Commit 1526bcd6 authored by liyang

feat: scrape Instagram

parent 2fe822c2
@@ -13,8 +13,5 @@ reptile_data/**/*.json
*.mp4
*.webm
*.jpg
*.app
*.exe
*.deb
browser/*chrome*
browser/**/chromedriver
\ No newline at end of file
@@ -2,8 +2,26 @@
def get_log_path():
    return "../"
def get_base_url():
    return "http://192.168.0.118:8081/"
def get_base_file_url():
    return "http://192.168.0.118:8186/"
\ No newline at end of file
    return "http://192.168.0.118:8186/"
def get_account(name):
    data = {}
    if name == "twitter":
        data["name"] = "liyang1851603"
        data["password"] = "liyang19970814"
    elif name == "facebook":
        data["name"] = "liyang19970814@gmail.com"
        data["password"] = "xn89kiPT/^Kaeg#"
    elif name == "instagram":
        data["name"] = "anthonymills7693"
        data["password"] = "unm8rgoab52"
    else:
        # Unknown platform name: fall through and return an empty dict
        print("")
    return data
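# --- editor note: a hedged sketch, not part of this commit, showing how these
# --- hard-coded credentials could instead come from environment variables.
# --- Variable names such as INSTAGRAM_USERNAME are illustrative assumptions.
import os

def get_account_from_env(name):
    data = get_account(name)  # fall back to the defaults above
    prefix = name.upper()  # e.g. "INSTAGRAM"
    data["name"] = os.environ.get(f"{prefix}_USERNAME", data.get("name", ""))
    data["password"] = os.environ.get(f"{prefix}_PASSWORD", data.get("password", ""))
    return data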
@@ -11,6 +11,7 @@ from datetime import datetime
from utils.download_image import download_image
import os
from config.settings import get_base_file_url
from config.settings import get_account
import sys
# Utility function - download image
'''
@@ -29,8 +30,8 @@ def reptile(browser=None, search_word=""):
    # Check whether login is required
    login_input = browser.find_element('xpath', "//input[@name='email']")
    password_input = browser.find_element('xpath', "//input[@name='pass']")
    login_input.send_keys("liyang19970814@gmail.com")
    password_input.send_keys("xn89kiPT/^Kaeg#")
    login_input.send_keys(get_account("facebook")["name"])
    password_input.send_keys(get_account("facebook")["password"])
    # Locate the login button
    button_login = browser.find_element('xpath', "//button[@name='login']")
    button_login.click()
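# --- editor note: a minimal sketch, not part of this commit, of the same login
# --- flow with explicit waits; it assumes the locators above are still valid.
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

def login_facebook(browser, account):
    wait = WebDriverWait(browser, 10)
    # wait for the form to render instead of failing immediately
    wait.until(EC.presence_of_element_located((By.XPATH, "//input[@name='email']"))).send_keys(account["name"])
    browser.find_element(By.XPATH, "//input[@name='pass']").send_keys(account["password"])
    wait.until(EC.element_to_be_clickable((By.XPATH, "//button[@name='login']"))).click()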
@@ -5,10 +5,20 @@ from utils.Logger import log
from utils.createBrowserDriver import create
from utils.filse import save_json
from api.index import importJson, getReptileTask, importJsonPath
from utils.index import convert_to_traditional, yt_dlp_download,convert_string_to_time,parse_time_string
from utils.index import convert_to_traditional, yt_dlp_download, convert_string_to_time, parse_twitter_time_string
# from pytube import YouTube
from config.settings import get_account
import os
import sys
from datetime import datetime
from utils.download_image import download_image
from config.settings import get_base_file_url
# --------------- selenium dependencies: start ----------------
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# --------------- selenium dependencies: end ----------------
# Utility function - download image
'''
@@ -17,196 +27,123 @@ def reptile(browser=None, search_word=""):
def reptile(browser=None, search_word=""):
    url = "https://www.facebook.com/"
    """
    :param browser:
    :param search_word:
    """
    print(f"Search word: {search_word}")
    base_url = "https://www.instagram.com/"
    option = ['--headless']
    # ['--headless']
    browser = browser or create(option)
    # year = datetime(2021, 1, 1)
    # startDate = datetime(2020, 12, 31)  # start date
    # endDate = datetime(2020, 12, 31)  # end date
    browser = browser or create(None, True)
    # Open the page
    browser.get(url)
    browser.get(base_url)
    time.sleep(3)
    try:
        '''
        Email: liyang19970814@gmail.com
        Username: abayomi7742
        Full name: Abayomi
        Password: 3Z84UMt)34NZj;T
        '''
        # Check whether login is required
        login_input = browser.find_element('xpath', "//input[@name='email']")
        password_input = browser.find_element('xpath', "//input[@name='pass']")
        login_input.send_keys("liyang19970814@gmail.com")
        password_input.send_keys("xn89kiPT/^Kaeg#")
        login_input = browser.find_element('xpath', "//input[@name='username']")
        password_input = browser.find_element('xpath', "//input[@name='password']")
        login_input.send_keys(get_account("instagram")["name"])
        password_input.send_keys(get_account("instagram")["password"])
        # Locate the login button
        button_login = browser.find_element('xpath', "//button[@name='login']")
        button_login = browser.find_element('xpath', "//button[@type='submit']")
        button_login.click()
        time.sleep(3)
    except:
        # Login form not found; assume the session is already authenticated
        pass
url = f"https://www.facebook.com/search/top?q={search_word}"
print("------")
# print("1111")
url = f"{base_url}explore/tags/{search_word}/"
browser.get(url)
# 使用 JavaScript 将网页滚动到底部
browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(3)
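    # --- editor sketch (assumption, not in this commit): a single scroll only loads
    # --- one extra batch; scrolling until the page height stops growing loads more
    # --- posts before scraping begins:
    # last_height = browser.execute_script("return document.body.scrollHeight")
    # while True:
    #     browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
    #     time.sleep(2)
    #     new_height = browser.execute_script("return document.body.scrollHeight")
    #     if new_height == last_height:
    #         break
    #     last_height = new_height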
    # Post blocks
    elements = browser.find_elements('xpath', "//div[@role='feed']/div//div[@aria-describedby]")
    # Authors
    element_authors_list = browser.find_elements('xpath',
                                                 "//div[@role='feed']/div//div[@aria-describedby]//h3/span[1]")
    # Release time
    element_release_list = browser.find_elements('xpath',
                                                 "//div[@role='feed']/div//div[@aria-describedby]//span[@dir]/span//a[@role='link' and @aria-label]")
    # Find every "expand" button and click it before reading the content
    elements_expand_list = browser.find_elements('xpath', "//div[@role='feed']/div//div[@aria-describedby]//div[@role='button' and text()='展开']")
    for item in elements_expand_list:
        item.click()
        # time.sleep(2)
    # Content
    element_content_list = browser.find_elements('xpath', "//div[@role='feed']/div//div[@aria-describedby]/div/div/div/div/div/div[2]/div/div/div[3]")
    length = len(elements)
    # Links
    element_link_list = browser.find_elements('xpath', "//article//a")
    length = len(element_link_list)
    for index in range(length):
        element_link_list[index].click()
        # Wait for the post dialog to finish loading
        wait = WebDriverWait(browser, 10)
        wait.until(EC.presence_of_element_located((By.XPATH, "//div[@role='dialog']/div/div[2]")))
        author = element_authors_list[index].text
        # Strip the html tags embedded in the time string
        release_time = str(int(parse_time_string(element_release_list[index].text)))
        content = element_content_list[index].get_attribute("innerHTML")
        # Parse the HTML with BeautifulSoup
        soup = BeautifulSoup(element_content_list[index].get_attribute('innerHTML'), 'html.parser')
        # ---------------- determine the content type: start ----------
        # Extract the post metadata
        author = browser.find_element("xpath", "//div[@role='dialog']/div//article/div/div[2]/div/div/div[1]//a")
        content_element = browser.find_element("xpath", "//div[@role='dialog']/div//article/div/div[2]/div/div/div[2]/div[1]//div[@role='button']//h1")
        time_element = browser.find_element("xpath", "//div[@role='dialog']/div//article/div/div[2]/div/div/div[2]/div[2]//time")
        link_str = browser.current_url
        # Extract the time and convert it to a unix timestamp
        timestamp = datetime.fromisoformat(time_element.get_attribute("datetime")[:-1]).timestamp()
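        # --- editor sketch: a timezone-safe alternative to the line above. Slicing
        # --- off the trailing "Z" yields a naive datetime, which .timestamp() then
        # --- interprets in the scraper's local timezone; mapping "Z" to "+00:00"
        # --- keeps the value in UTC:
        # iso_value = time_element.get_attribute("datetime").replace("Z", "+00:00")
        # timestamp = datetime.fromisoformat(iso_value).timestamp()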
        # Extract images and videos
        picture_url = []
        title = ""  # default so the record still has a title when the post carries no images
        img_list = browser.find_elements("xpath", "//div[@role='dialog']/div//article/div/div[1]/div/div[1]//img")
        local_src = {}  # image index -> mirrored address, filled on successful download
        for key, item in enumerate(img_list):
            if key == 0:
                title = item.get_attribute("alt")
            # Download the image locally, then point the tag at the local copy
            id = f"{int(time.time())}_{key}"  # index suffix avoids same-second filename collisions
            # download path
            download_dir = f'{os.path.join(file_dir, f"{id}.jpg")}'
            # public access address
            access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{id}.jpg'
            # download status (WebElement attributes must be read via get_attribute)
            status = download_image(item.get_attribute("src"), download_dir)
            if status:
                local_src[key] = access_address
                picture_url.append(access_address)
        # Extract the dialog content
        soup = BeautifulSoup(content_element.get_attribute("outerHTML"), "html.parser")
        # Merge the images into the content
        for key, item in enumerate(img_list):
            img = BeautifulSoup(item.get_attribute("outerHTML"), "html.parser")
            if key in local_src:
                # rewrite the src to the locally mirrored copy
                img.find("img")["src"] = local_src[key]
            soup.append(img)
        content = soup.prettify()
        # content type
        content_type = ""
        try:
            # Look for img tags
            img_tags = soup.find_all('img')
            if len(img_tags) > 0:
                content_type = "图文"
            else:
                content_type = "文字"
        except:
            content_type = "文字"
        # ---------------- determine the content type: end ----------
        content_type = "图文"
        # --------------- assemble the record: start ---------------------
        obj = {
            "title": "",
            "title": title or "",
            "content": content,
            "link": element_release_list[index].get_attribute("href"),
            "link": link_str,
            "reptileTime": str(int(time.time())),
            "type": content_type,
            "author": author,
            "releaseTime": release_time,
            "author": author.text,
            "releaseTime": str(int(timestamp)),
            "picture_url": ",".join(picture_url)
        }
        # --------------- assemble the record: end ---------------------
        data.append(obj)
        # Content filtering
        # soup = BeautifulSoup(content.get_attribute("innerHTML"), 'html.parser')
        # Legacy flow (kept commented out): click the button repeatedly to load more data
        # while button.is_enabled():
        #     time.sleep(2)  # wait for the page to finish loading
        #     try:
        #         button.click()
        #         button = WebDriverWait(browser, 5).until(
        #             EC.element_to_be_clickable((By.XPATH, "//button[@data-testid='search-show-more-button']")))
        #     except:
        #         break
        # time.sleep(3)
        # Fetch the fully paginated data
        # page_content = browser.page_source
        # soup = BeautifulSoup(page_content, 'html.parser')
        # list_news = soup.find_all('li', {"class": "css-1l4w6pd"})
        # for index, item in enumerate(list_news):
        #     logger.debug(item)
        #     # Grab the image
        #     image_key = image_key + 1
        #     url_element = item.find('img', {"class": "css-rq4mmj"})
        #     image_url = url_element['src'] if url_element else ""
        #     if image_url:
        #         # Download the image
        #         filename = f"{image_key}.jpg"
        #         download_image(image_url, f'{fileDir}images/{filename}')
        #     # Grab the text
        #     title_element = item.find('h4', {"class": "css-2fgx4k"})
        #     introduction_element = item.find('p', {"class": "css-16nhkrn"})
        #     title = title_element.get_text() if title_element else ""
        #     introduction = introduction_element.get_text() if introduction_element else ""
        #     news = {
        #         "title": title,
        #         "introduction": introduction,
        #         "imageName": filename
        #     }
        #     data.append(news)
        # logger.debug(data)
        # Save the data to a local file
        # with open(f'{fileDir}data.json', "w", encoding="utf-8") as file:
        #     json.dump(data, file, indent=2, ensure_ascii=False)
        # Locate the next-page button (the last button inside the dialog)
        next_buttons = browser.find_elements("xpath", "//div[@role='dialog']/div/div[1]//button")
        if index < length - 1:
            for key, item in enumerate(next_buttons):
                if key + 1 == len(next_buttons):
                    item.click()
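        # --- editor sketch (assumption): the same "last button advances the dialog"
        # --- step with an explicit wait, which tolerates buttons being re-rendered:
        # if index < length - 1:
        #     WebDriverWait(browser, 10).until(EC.element_to_be_clickable(
        #         (By.XPATH, "(//div[@role='dialog']/div/div[1]//button)[last()]"))).click()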
    # Send the scraped data to the java service
    if len(data) > 0:
        # Save the json file locally
        log.debug(os.path.abspath("../"))
        file_dir = f'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", "facebook", str(int(time.time())) + ".json")}'
        state_save = save_json(file_dir, data)
        # write_to_database(data)
        state_save = save_json(os.path.join(file_dir, str(int(time.time())) + ".json"), data)
        if state_save:
            log.debug('save file success')
            # form_data = {
            #     "path": file_dir,
            #     "tableName": table_name
            # }
            # response = importJsonPath(form_data)
        else:
            log.debug('save file failed')
    else:
        # Nothing was scraped
        log.info("No data was scraped")
        # form_data = {
        #     "path": "",
        #     "tableName": table_name
        # }
        # response = importJsonPath(form_data)
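    # --- editor note: save_json (utils.filse) is not shown in this diff; it is
    # --- assumed to wrap json.dump along these lines:
    # def save_json(path, payload):
    #     os.makedirs(os.path.dirname(path), exist_ok=True)
    #     with open(path, "w", encoding="utf-8") as fp:
    #         json.dump(payload, fp, indent=2, ensure_ascii=False)
    #     return True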
    browser.quit()
    # Close the browser driver
    try:
        browser.close()
        browser.quit()
    except:
        log.debug("Failed to close the browser driver")
    sys.exit()
def main():
"""
@@ -220,7 +157,7 @@ def main():
log.debug("call success")
search_word = ""
for item in response['data']['rows']:
if item['name'] == 'ptt':
if item['name'] == 'pms_instagram':
search_word = item['keyword']
table_name = item['tableName']
status_task = int(item["status"])
@@ -238,6 +175,9 @@
# Global variables
data = []
table_name = "pms_facebook"
table_name = "pms_instagram"
file_dir = f'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# Task enabled flag
status_task = '0'
# Invoke the main function
main()
@@ -12,7 +12,7 @@ import sys
from datetime import datetime
from utils.download_image import download_image
from config.settings import get_base_file_url
from config.settings import get_account
# Utility function - download image
'''
Open the given page, use Selenium to click the "GDPR-accept" button, then click the "search-show-more-button" button in a loop to load more data until it is no longer clickable. Finally, fetch the fully paginated data and close the browser driver.
@@ -37,13 +37,13 @@ def reptile(browser=None, search_word=""):
    try:
        # Check whether login is required
        login_input = browser.find_element('xpath', "//input[@autocomplete='username']")
        login_input.send_keys("liyang1851603")
        login_input.send_keys(get_account("twitter")["name"])
        # Locate the next-step button
        buttons = browser.find_element('xpath', "//div[@role='button'][2]")
        buttons.click()
        time.sleep(3)
        password_input = browser.find_element('xpath', "//input[@autocomplete='current-password']")
        password_input.send_keys("liyang19970814")
        password_input.send_keys(get_account("twitter")["password"])
        # Locate the login button
        button_login = browser.find_element('xpath', "//div[@data-testid='LoginForm_Login_Button']")
        button_login.click()
APScheduler==3.10.1
asgiref==3.7.2
async-generator==1.10
attrs==23.1.0
beautifulsoup4==4.12.2
certifi==2023.5.7
charset-normalizer==3.1.0
Django==4.2.2
docopt==0.6.2
exceptiongroup==1.1.1
h11==0.14.0
idna==3.4
loguru==0.7.0
lxml==4.9.2
outcome==1.2.0
pipreqs==0.4.13
PyMySQL==1.1.0
PySocks==1.7.1
pytube==15.0.0
pytz==2023.3
requests==2.31.0
selenium==4.10.0
six==1.16.0
sniffio==1.3.0
sortedcontainers==2.4.0
soupsieve==2.4.1
sqlparse==0.4.4
trio==0.22.0
trio-websocket==0.10.3
typing_extensions==4.7.0
tzlocal==5.0.1
urllib3==2.0.3
wsproto==1.2.0
yarg==0.1.9
OpenCC~=1.1.1
python-dateutil~=2.8.2
\ No newline at end of file
@@ -28,7 +28,7 @@ from utils.index import get_screen_resolution
'''
def create(option=None, using_user_data=True, web_browser="chrome_test"):
def create(option=None, using_user_data=True, web_browser="chromium"):
    """
    :param web_browser:
@@ -50,7 +50,8 @@ def create(option=None, using_user_data=True, web_browser="chrome_test"):
        options = webdriver.ChromeOptions()
    elif web_browser == "edge":
        options = webdriver.EdgeOptions()
    elif web_browser == "chromium":
        options = webdriver.ChromeOptions()
    if option is not None:
        for value in option:
            options.add_argument(value)
@@ -64,7 +65,7 @@ def create(option=None, using_user_data=True, web_browser="chrome_test"):
        # options.add_argument(f'--user-data-dir={user_data_dir}')
    elif web_browser == "chrome":
        options.add_argument(f'--user-data-dir={user_data_dir}')
    elif web_browser == "chrome_test":
    elif web_browser == "chromium":
        options.add_argument(f'--user-data-dir={user_data_dir}')
    elif web_browser == "chrome_test":
        options.add_argument(f'--user-data-dir={user_data_dir}')
@@ -92,7 +93,7 @@ def create(option=None, using_user_data=True, web_browser="chrome_test"):
elif web_browser == "chrome":
# 创建Chrome浏览器对象并传入选项
web_browser = webdriver.Chrome(options=options, service=ChromeService(ChromeDriverManager().install()))
elif web_browser == "chrome_test":
elif web_browser == "chromium":
binary_location = ""
webdriver_location = ""
if platform.system() == "Windows":
@@ -115,7 +116,7 @@ def create(option=None, using_user_data=True, web_browser="chrome_test"):
        # Specify the browser binary path
        # print(binary_location)
        options.binary_location = binary_location
        # options.binary_location = binary_location
        # options.browser_version = "114"
        # Set the path of the driver executable
        # service = ChromeService(executable_path=webdriver_location)
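# --- editor note: a hedged usage sketch of the create() factory above; the import
# --- path mirrors the repository layout and is an assumption, not shown in the diff.
from utils.createBrowserDriver import create

browser = create(option=["--headless"], using_user_data=False)
try:
    browser.get("https://www.instagram.com/")
    print(browser.title)
finally:
    browser.quit()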
@@ -2,7 +2,7 @@ import os.path
import re
import time
import opencc
from hanziconv import HanziConv
import datetime
from pytube import YouTube
import ssl
@@ -135,8 +135,8 @@ def convert_to_traditional(simplified_text):
    Returns:
        str: the converted Traditional Chinese text.
    """
    converter = opencc.OpenCC('s2t.json')  # build a Simplified-to-Traditional converter
    traditional_text = converter.convert(simplified_text)  # run the conversion
    # converter = opencc.OpenCC('s2t.json')  # build a Simplified-to-Traditional converter
    traditional_text = HanziConv.toTraditional(simplified_text)  # run the conversion
    return traditional_text
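# --- editor note: a quick usage sketch of the swapped-in HanziConv path; note that
# --- hanziconv does not appear in the requirements.txt shown above.
from hanziconv import HanziConv

print(HanziConv.toTraditional("简体中文"))  # -> "簡體中文"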