Commit 023912ec authored by liyang

fix: update facebook and twitter

parent ab634c7f
import requests
from requests.exceptions import Timeout
from config.settings import get_base_url

headers = {}
baser_url = get_base_url()
# Request timeout in seconds
timeout = 3


def custom_request(method, url, *args, **kwargs):
    """
    Wrap requests.request with a timeout and return a plain dict.
    :param method: HTTP method, e.g. 'GET' or 'POST'
    :param url: full request url
    :param args: positional arguments passed through to requests
    :param kwargs: keyword arguments passed through to requests
    :return: {"status_code": ..., "data": ...}; {"status_code": 500} on timeout
    """
    try:
        response = requests.request(method, url, *args, timeout=timeout, **kwargs)
        # Parse the response body as JSON
        response = {"status_code": response.status_code, "data": response.json()}
    except Timeout:
        response = {"status_code": 500}
    return response


def importJson(file, form_data):
    """
    Upload a json file of crawled data to the Java service.
    :param file: file object to upload
    :param form_data: extra form fields
    :return: response dict from custom_request
    """
    http_url = baser_url + "importJson/import"
    response = custom_request('POST', http_url, headers=headers, files={"file": file}, data=form_data)
    return response


def getReptileTask():
    """
    Fetch the crawler task list (keywords, target tables, status).
    :return: response dict from custom_request
    """
    http_url = baser_url + "crawlerSetting/list"
    response = custom_request('GET', http_url)
    return response


def importJsonPath(form_data):
    """
    Tell the Java service to import crawled data from a local json file path.
    :param form_data: payload with "path" and "tableName"
    :return: response dict from custom_request
    """
    header = {"Content-Type": "application/json"}
    http_url = baser_url + "importJson/importJsonPath"
    response = custom_request('POST', http_url, headers=header, data=form_data)
    return response


def runingPython(form_data):
    """
    Ask the Java service to start a python script.
    :param form_data: payload describing the script to run
    :return: response dict from custom_request
    """
    header = {"Content-Type": "application/json"}
    http_url = baser_url + "python/startPy"
    response = custom_request('POST', http_url, headers=header, data=form_data)
    return response
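

# --- Illustrative usage (added sketch, not part of the original module) -------
# A minimal example of how these wrappers are called from the crawler scripts,
# assuming get_base_url() points at a reachable Java service.
if __name__ == "__main__":
    tasks = getReptileTask()
    if tasks["status_code"] == 200 and tasks["data"]["code"] == 200:
        for row in tasks["data"]["rows"]:
            print(row["name"], row["keyword"], row["tableName"], row["status"])
    else:
        # custom_request returns {"status_code": 500} when the request times out
        print("request failed or timed out")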
import json
import time
from bs4 import BeautifulSoup
from utils.Logger import log
from utils.createBrowserDriver import create
from utils.filse import save_json
from api.index import importJson, getReptileTask, importJsonPath
from utils.index import convert_to_traditional, yt_dlp_download, convert_string_to_time, parse_time_string
# from pytube import YouTube
import os
from config.settings import get_base_file_url
'''
Open the Facebook page, log in if the login form is shown, load the search
results for the keyword, scroll to the bottom to load more posts, then collect
each post's author, release time and content before closing the browser driver.
'''
def reptile(browser=None, search_word=""):
url = "https://www.facebook.com/"
option = ['--headless']
# ['--headless']
browser = browser or create(option)
# year = datetime(2021, 1, 1)
# startDate = datetime(2020, 12, 31) # 初始日期
# endDate = datetime(2020, 12, 31) # 结束日期
# print(browser)
# Open the page
browser.get(url)
# print("00000000000000000")
# time.sleep(3)
try:
'''
Email: liyang19970814@gmail.com
Account: abayomi7742
Full name: Abayomi
Password: 3Z84UMt)34NZj;T
'''
# Check whether the login form is shown
login_input = browser.find_element('xpath', "//input[@name='email']")
password_input = browser.find_element('xpath', "//input[@name='pass']")
login_input.send_keys("liyang19970814@gmail.com")
password_input.send_keys("xn89kiPT/^Kaeg#")
# Locate the login button
button_login = browser.find_element('xpath', "//button[@name='login']")
button_login.click()
time.sleep(3)
except Exception:
# Already logged in or the login form was not found
pass
# time.sleep(3)
url = f"https://www.facebook.com/search/top?q={search_word}"
browser.get(url)
# Scroll to the bottom of the page with JavaScript
browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(3)
# Collection of post blocks
elements = browser.find_elements('xpath', "//div[@role='feed']/div//div[@aria-describedby]")
# print(333333)
# time.sleep(3)
# Author
element_authors_list = browser.find_elements('xpath',
"//div[@role='feed']/div//div[@aria-describedby]//h3/span[1]")
# print(element_authors_list)
# print("2222")
# Release time
element_release_list = browser.find_elements('xpath',
"//div[@role='feed']/div//div[@aria-describedby]//span[@dir]/span//a[@role='link' and @aria-label]")
# Find all "展开" (see more) buttons and click each one before reading the content
elements_expand_list = browser.find_elements('xpath', "//div[@role='feed']/div//div[@aria-describedby]//div[@role='button' and text()='展开']")
for item in elements_expand_list:
item.click()
# time.sleep(2)
# Content
element_content_list = browser.find_elements('xpath', "//div[@role='feed']/div//div[@aria-describedby]/div/div/div/div/div/div[2]/div/div/div[3]")
# print(element_content_list)
length = len(elements)
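# Assumption (added comment): the author/time/content element lists above are
# index-aligned with `elements`; if a post lacks one of these sub-elements the
# lists drift out of step and the wrong author or time gets paired with a post.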
# print(length)
for index in range(length):
author = element_authors_list[index].text
# el = element_release_list[index]
# # datetime_el = el.get_attribute("datetime")
# html = el.text
# Strip the html tags contained in the time string
# BeautifulSoup(element_release_list[index].get_attribute("innerHTML"),"html.parser").get_text()
release_time = str(int(parse_time_string(element_release_list[index].text)))
content = element_content_list[index].get_attribute("innerHTML")
# Parse the HTML with BeautifulSoup
soup = BeautifulSoup(element_content_list[index].get_attribute('innerHTML'), 'html.parser')
# ---------------- Determine content type: start ----------
# Type
content_type = ""
try:
# Find all img tags
img_tags = soup.find_all('img')
if len(img_tags) > 0:
content_type = "图文"
else:
content_type = "文字"
except Exception:
content_type = "文字"
# ---------------- Determine content type: end ----------
# --------------- Assemble data: start ---------------------
obj = {
"title": "",
"content": content,
"link": element_release_list[index].get_attribute("href"),
"reptileTime": str(int(time.time())),
"type": content_type,
"author": author,
"releaseTime": release_time
}
# --------------- Assemble data: end ---------------------
data.append(obj)
# print(content)
# Content filtering
# Parse the HTML with BeautifulSoup
# soup = BeautifulSoup(content.get_attribute("innerHTML"), 'html.parser')
# print(soup)
# print("-----")
# print("---------------")
# print(input_email_element)
# print(input_pwd_element)
# print(button_login)
# logger.debug(button)
# Simulate clicking the button repeatedly to load more data
# while button.is_enabled():
# time.sleep(2) # Wait a moment to make sure the page has finished loading
# try:
# button.click()
# button = WebDriverWait(browser, 5).until(
# EC.element_to_be_clickable((By.XPATH, "//button[@data-testid='search-show-more-button']")))
# except:
# break
# time.sleep(3)
# Get the full paginated content
# page_content = browser.page_source
# soup = BeautifulSoup(page_content, 'html.parser')
# print("----------")
# print(soup)
# list_news = soup.find_all('li', {"class": "css-1l4w6pd"})
# for index, item in enumerate(list_news):
# logger.debug(item)
# # Grab the image
# image_key = image_key + 1
# url_element = item.find('img', {"class": "css-rq4mmj"})
# image_url = url_element['src'] if url_element else ""
# # logger.debug(url)
# if image_url:
# # logger.debug(url)
# # # Download the image
# #
# filename = f"{image_key}.jpg"
# # logger.debug(filename)
# # sys.exit()
# download_image(image_url, f'{fileDir}images/{filename}')
# # Grab the text
# title_element = item.find('h4', {"class": "css-2fgx4k"})
# introduction_element = item.find('p', {"class": "css-16nhkrn"})
# title = title_element.get_text() if title_element else ""
# introduction = introduction_element.get_text() if introduction_element else ""
# news = {
# "title": title,
# "introduction": introduction,
# "imageName": filename
# }
# data.append(news)
# logger.debug(data)
# Save the data to a file
# with open(f'{fileDir}data.json', "w", encoding="utf-8") as file:
# json.dump(data, file, indent=2, ensure_ascii=False)
# Send the crawled data to the Java service
# print('----------------------')
# print(data)
if len(data) > 0:
# Save the json file locally
log.debug(os.path.abspath("../"))
file_dir = f'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", "facebook", str(int(time.time())) + ".json")}'
# file_dir = f'./reptile_data/ptt/{int(time.time())}.json'
state_save = save_json(file_dir, data)
# log.debug("-----------------------------")
# write_to_database(data)
if state_save:
log.debug('save file success')
# path = os.path.abspath(file_dir).join(file_dir).join(".json")
# log.debug('file_path:' + file_dir)
# form_data = {
# "path": file_dir,
# "tableName": table_name
# }
# response = importJsonPath(form_data)
else:
log.debug('save file failed')
else:
# No data was crawled
log.info("No data crawled")
# form_data = {
# "path": "",
# "tableName": table_name
# }
# response = importJsonPath(form_data)
browser.quit()
def main():
"""
"""
# Request the crawler keywords
response = getReptileTask()
global status_task
# print(response)
if response['status_code'] == 200 and response['data']['code'] == 200:
log.debug("call success")
search_word = ""
for item in response['data']['rows']:
if item['name'] == 'ptt':
search_word = item['keyword']
table_name = item['tableName']
status_task = item["status"]
# Convert Simplified to Traditional Chinese
if status_task == 0 and len(search_word) > 0:
reptile(None, convert_to_traditional(search_word))
else:
log.debug("Reptile task is not enabled")
else:
log.debug("call failed")
# Request timed out; fall back to the default keyword
reptile(None, convert_to_traditional("新闻"))
# upload_control()
# Global variables
data = []
table_name = "pms_facebook"
# Call the main function
main()
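

# --- Illustrative example (added, not part of the original file) --------------
# Shape of one record appended to `data` and written to the json file; the
# values below are made up for illustration only.
example_record = {
    "title": "",
    "content": "<div>...post html...</div>",
    "link": "https://www.facebook.com/...",
    "reptileTime": "1690000000",   # crawl time, Unix seconds as a string
    "type": "图文",                 # "图文" (image + text) or "文字" (text only)
    "author": "Abayomi",
    "releaseTime": "1689990000"    # parsed post time, Unix seconds as a string
}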
@@ -150,19 +150,16 @@ def reptile(browser=None, search_word=""):
}
# --------------- Assemble data: end ---------------------
if search_word is None or search_word == str(search_word):
# Match the keyword with a regular expression
# log.debug(f"Keyword: {search_word}-{element_title.text}")
matches = re.findall(search_word, element_title.text)
# Print the match result
if matches:
# log.debug(f"Found a matching string: {matches}")
data.append(obj)
else:
# Match the keyword with a regular expression
# log.debug(f"Keyword: {search_word}-{element_title.text}")
matches = re.findall(search_word, element_title.text)
# Print the match result
if matches:
# log.debug(f"Found a matching string: {matches}")
data.append(obj)
else:
# log.debug("No matching string found")
a = 3
# log.debug("No matching string found")
a = 3
# Browser goes back to the previous page
browser.back()
element_list = browser.find_elements('xpath', "//div[@class='r-ent']//div[@class='title']//a")
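
# --- Illustrative sketch (added, not part of the original file) ---------------
# Minimal standalone example of the keyword filter used above: re.findall
# returns a (possibly empty) list of matches, which is truthy only on a hit.
# The title and keyword below are hypothetical.
import re

title = "台灣新聞快報"
search_word = "新聞"
if re.findall(search_word, title):
    print("matched, keep the post")
else:
    print("no match, skip the post")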
@@ -225,13 +222,14 @@ def main():
table_name = item['tableName']
status_task = item["status"]
# Convert Simplified to Traditional Chinese
if status_task == 0:
if status_task == 0 and len(search_word) > 0:
reptile(None, convert_to_traditional(search_word))
else:
log.debug("Reptile task is not enabled")
else:
log.debug("call failed")
reptile(None, '')
# Request timed out
reptile(None, convert_to_traditional("新闻"))
# upload_control()
......
@@ -5,7 +5,7 @@ from utils.Logger import log
from utils.createBrowserDriver import create
from utils.filse import save_json
from api.index import importJson, getReptileTask, importJsonPath
from utils.index import convert_to_traditional, yt_dlp_download,convert_string_to_time
from utils.index import convert_to_traditional, yt_dlp_download, convert_string_to_time
# from pytube import YouTube
import os
from config.settings import get_base_file_url
@@ -20,56 +20,108 @@ def reptile(browser=None, search_word=""):
url = "https://twitter.com/"
option = ['--headless']
# ['--headless']
browser = browser or create()
browser = browser or create(option, False)
# print(browser)
# Open the page
browser.get(url)
time.sleep(3)
try:
# Check whether login is required
login_input = browser.find_element('xpath',"//input[@autocomplete='username']")
login_input = browser.find_element('xpath', "//input[@autocomplete='username']")
login_input.send_keys("liyang1851603")
# Locate the Next button
buttons = browser.find_element('xpath', "//div[@role='button'][2]")
buttons.click()
time.sleep(3)
password_input = browser.find_element('xpath', "//input[@autocomplete='current-password']")
password_input = browser.find_element('xpath', "//input[@autocomplete='current-password']")
password_input.send_keys("liyang19970814")
# Locate the login button
button_login = browser.find_element('xpath',"//div[@data-testid='LoginForm_Login_Button']")
button_login = browser.find_element('xpath', "//div[@data-testid='LoginForm_Login_Button']")
button_login.click()
except:
print("------")
# print(333333)
# time.sleep(3)
time.sleep(2)
url = 'https://twitter.com/search?q='+search_word+'&src=typed_query'
browser.get(url)
time.sleep(3)
# Content blocks
element_content_list = browser.find_elements('xpath',
"//div[@data-testid='cellInnerDiv']//article/div/div/div[2]/div[2]")
# Author
element_authors_list = browser.find_elements('xpath',
"//div[@data-testid='cellInnerDiv']//article//div[@data-testid='User-Name']//a[@role='link']//div[@dir='ltr']")
print(element_authors_list)
print("2222")
"//div[@data-testid='cellInnerDiv']//article//div[@data-testid='User-Name']/div[1]//a[@role='link']")
# Release time
element_release_list = browser.find_elements('xpath',
"//div[@data-testid='cellInnerDiv']//article//div[@data-testid='User-Name']//div[2]//time[@datetime]")
# Title
# element_title_list = browser.find_element('xpath',)
# Content
element_content_list = browser.find_elements('xpath',"//div[@data-testid='cellInnerDiv']//article/div/div/div[2]/div[2]")
"//div[@data-testid='cellInnerDiv']//article//div[@data-testid='User-Name']//div[2]//time[@datetime]")
# print(element_content_list)
length = len(element_authors_list)
print(length)
for index in range(length):
author = element_authors_list[index].text
release_time = element_release_list[index].get_attribute("datetime")
content = element_content_list[index]
print(content)
release_time = str(int(parse_twitter_time_string(element_release_list[index].text)))
content = element_content_list[index].get_attribute("innerHTML")
# print(content)
# Content filtering
# Parse the HTML with BeautifulSoup
soup = BeautifulSoup(content.get_attribute("innerHTML"), 'html.parser')
print(soup)
print("-----")
# browser.close()
# # Close the browser driver
# browser.quit()
soup = BeautifulSoup(content, 'html.parser')
# ---------------- Determine content type: start ----------
# Type
content_type = ""
try:
# Find all img tags
img_tags = soup.find_all('img')
if len(img_tags) > 0:
content_type = "图文"
else:
content_type = "文字"
except Exception:
content_type = "文字"
# ---------------- Determine content type: end ----------
# --------------- Assemble data: start ---------------------
obj = {
"title": "",
"content": content,
"link": "",
"reptileTime": str(int(time.time())),
"type": content_type,
"author": author,
"releaseTime": release_time
}
# --------------- Assemble data: end ---------------------
# Send the crawled data to the Java service
# print('----------------------')
# print(data)
if len(data) > 0:
# Save the json file locally
log.debug(os.path.abspath("../"))
file_dir = f'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", "twitter", str(int(time.time())) + ".json")}'
# file_dir = f'./reptile_data/ptt/{int(time.time())}.json'
state_save = save_json(file_dir, data)
# log.debug("-----------------------------")
# write_to_database(data)
if state_save:
log.debug('save file success')
# path = os.path.abspath(file_dir).join(file_dir).join(".json")
# log.debug('file_path:' + file_dir)
# form_data = {
# "path": file_dir,
# "tableName": table_name
# }
# response = importJsonPath(form_data)
else:
log.debug('save file failed')
else:
# No data was crawled
log.info("No data crawled")
# form_data = {
# "path": "",
# "tableName": table_name
# }
# response = importJsonPath(form_data)
# Close the browser driver
# time.sleep(3)
browser.quit()
def main():
......@@ -78,23 +130,32 @@ def main():
"""
# Request the crawler keywords
response = getReptileTask()
global status_task
# print(response)
if response['status_code'] == 200 and response['data']['code'] == 200:
log.debug("call success")
search_word = ""
for item in response['data']['rows']:
if item['name'] == 'twitter':
if item['name'] == 'ptt':
search_word = item['keyword']
table_name = item['tableName']
reptile(None, convert_to_traditional(search_word))
status_task = item["status"]
# Convert Simplified to Traditional Chinese
if status_task == 0 and len(search_word) > 0:
reptile(None, convert_to_traditional(search_word))
else:
log.debug("Reptile task is not enabled")
else:
log.debug("call failed")
reptile(None, '')
# Request timed out
reptile(None, convert_to_traditional("新闻"))
# upload_control()
# Global variables
data = []
table_name = "pms_twitter"
# Whether the task is enabled
status_task = '0'
# Call the main function
main()
@@ -61,9 +61,9 @@ def reptile(browser=None, search_word=""):
base_urr = get_base_file_url()
releaseTime = ""
try:
releaseTime = str(convert_string_to_time(element_time_list[index].text))
releaseTime = str(int(convert_string_to_time(element_time_list[index].text)))
except:
releaseTime = str(time.time())
releaseTime = str(int(time.time()))
# Download the video
state_download = yt_dlp_download(url, 'youtube')
log.debug(url)
@@ -124,23 +124,25 @@ def main():
"""
# Request the crawler keywords
response = getReptileTask()
# print(response)
global status_task
# print(response)
if response['status_code'] == 200 and response['data']['code'] == 200:
log.debug("call success")
search_word = ""
for item in response['data']['rows']:
if item['name'] == 'youtube':
if item['name'] == 'ptt':
search_word = item['keyword']
table_name = item['tableName']
status_task = item["status"]
if status_task == 0:
# Convert Simplified to Traditional Chinese
if status_task == 0 and len(search_word) > 0:
reptile(None, convert_to_traditional(search_word))
else:
log.debug("Reptile task is not enabled")
else:
log.debug("call failed")
reptile(None, '')
# Request timed out
reptile(None, convert_to_traditional("新闻"))
# upload_control()
......
@@ -12,9 +12,10 @@ from selenium.webdriver.support.ui import WebDriverWait
'''
def create(option=None):
def create(option=None, using_user_data=True):
"""
:param using_user_data:
:param option:
:return:
"""
@@ -28,7 +29,8 @@ def create(option=None):
script = f'--user-data-dir={user_data_dir}'
# print(script)
# log.debug(script)
chrome_options.add_argument(script)  # Set a custom user profile directory
if using_user_data:
chrome_options.add_argument(script)  # Set a custom user profile directory
if sys.platform.startswith('linux'):
# print("当前系统是 Linux")
......
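
# --- Illustrative usage (added sketch, not part of the original file) ---------
# How the new using_user_data switch is used by the callers above: the facebook
# scraper keeps the persistent Chrome profile, while the twitter scraper passes
# False so no --user-data-dir argument is added. Import path assumed from this repo.
from utils.createBrowserDriver import create

browser = create(['--headless'])          # default: reuse the custom user profile
browser2 = create(['--headless'], False)  # clean profile, no --user-data-dir
browser.quit()
browser2.quit()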
@@ -5,12 +5,46 @@ from pytube import YouTube
import ssl
import subprocess
from utils.Logger import log
from datetime import datetime, timedelta
import datetime
from dateutil import parser
# from dateutil.relativedelta import relativedelta
ssl._create_default_https_context = ssl._create_stdlib_context
# Convert facebook's post-time label into a Unix timestamp
def parse_time_string(time_str):
"""
:param time_str:
:return:
"""
log.debug(f'Converting facebook release time: {time_str}')
if "天" in time_str:
number = int(time_str.split("天")[0])
time_delta = datetime.timedelta(days=number)
return int((datetime.datetime.now() + time_delta).timestamp())
elif "分钟" in time_str:
number = int(time_str.split("分钟")[0])
time_delta = datetime.timedelta(minutes=number)
return int((datetime.datetime.now() + time_delta).timestamp())
elif "小时" in time_str:
number = int(time_str.split("小时")[0])
time_delta = datetime.timedelta(hours=number)
return int((datetime.datetime.now() + time_delta).timestamp())
else:
try:
datetime_str = time_str.replace("月", " ").replace("日", "")
month, day, hour, minute = map(int, datetime_str.split())
current_year = datetime.datetime.now().year
datetime_obj = datetime.datetime(year=current_year, month=month, day=day, hour=hour, minute=minute)
return int(datetime_obj.timestamp())
except ValueError:
return None
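
# Illustrative calls (added, not part of the original file); the strings mirror
# the zh-CN labels Facebook renders for post times:
#   parse_time_string("3小时")    # relative label, offset of 3 hours
#   parse_time_string("2天")      # relative label, offset of 2 days
#   parse_time_string("5分钟")    # relative label, offset of 5 minutes
# Any other label is parsed as "<month>月<day>日 ..." and, failing that, returns None.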
# Convert youtube's post time
def convert_string_to_time(string):
"""
@@ -38,6 +72,18 @@ def convert_string_to_time(string):
timestamp = int(converted_time.timestamp())
return timestamp
# Convert twitter's post time
def parse_twitter_time_string(time_str):
"""
:param time_str:
:return:
"""
# Parse the (possibly relative) time string with dateutil and return a Unix timestamp
return parser.parse(time_str, fuzzy=True).timestamp()
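
# Illustrative call (added, not part of the original file):
#   parse_twitter_time_string("Jul 27, 2023")  # -> Unix timestamp via dateutil's fuzzy parser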
def convert_to_traditional(simplified_text):
"""
Convert Simplified Chinese text to Traditional Chinese text.
@@ -85,7 +131,7 @@ def yt_dlp_download(url, name):
# --get-url
video_selection = f''
# Video quality (yt-dlp format code)
definition = f'18' # 360p
definition = f'18' # 360p
# definition = f'18' # 720p
# definition = f'24' # 1080p
download_options = f'-f {definition} -vU'
......
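
# --- Added note (not part of the original file) --------------------------------
# With definition = '18' the options string above becomes "-f 18 -vU", so the
# download is roughly equivalent to running:
#   yt-dlp -f 18 -vU <video-url>
# where yt-dlp format 18 is YouTube's 360p MP4 stream, -v enables verbose
# output and -U asks yt-dlp to self-update before downloading.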