Commit 8de9ad90 authored by liyang

feat: load multiple pages

parent 835fc16a
......@@ -7,7 +7,7 @@ from utils.createBrowserDriver import create
from utils.filse import save_json
from api.index import importJson, getReptileTask, importJsonPath
from utils.index import convert_to_traditional, yt_dlp_download, convert_string_to_time, \
- parse_time_string,create_directory_if_not_exists, delete_directory
+ parse_time_string, create_directory_if_not_exists, delete_directory
# from pytube import YouTube
from datetime import datetime
from utils.download_image import download_image
......@@ -31,7 +31,7 @@ from selenium.webdriver.support import expected_conditions as EC
def reptile(browser=None, search_word=""):
print(f"搜索词:{search_word}")
url = "https://www.facebook.com/"
- browser = browser or create(no_headless=False, using_user_data=False)
+ browser = browser or create(no_headless=False, using_user_data=True)
# Open the page
browser.get(url)
time.sleep(2)
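The flip to using_user_data=True reuses a saved browser profile so the Facebook login from a previous run survives restarts. A minimal sketch of what a helper like create() typically does under that flag, assuming Selenium with Chrome; the option wiring, profile path, and function name are illustrative, not the project's actual implementation:

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

def create_driver(no_headless=False, using_user_data=True):
    options = Options()
    if not no_headless:
        options.add_argument("--headless=new")
    if using_user_data:
        # Reuse the cookies/sessions stored in a local Chrome profile directory
        options.add_argument("--user-data-dir=./chrome-user-data")
    return webdriver.Chrome(options=options)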
......@@ -51,14 +51,15 @@ def reptile(browser=None, search_word=""):
log.debug("facebook login complete")
url = f"https://www.facebook.com/search/top?q={search_word}"
browser.get(url)
- # time.sleep(1)
+ time.sleep(2)
# Use JavaScript to scroll the page to the bottom
browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
- # time.sleep(1)
+ time.sleep(6)
# Wait for the content to appear, with a maximum wait of 10 seconds
wait = WebDriverWait(browser, 10)
# Define the wait condition via expected_conditions, here using an element of the popup content as an example
wait.until(EC.presence_of_element_located((By.XPATH, "//div[@role='feed']")))
# Content
element_content_list = browser.find_elements('xpath',
"//div[@role='feed']/div//div[@aria-describedby]/div/div/div/div/div/div[2]/div/div/div[3]")
......@@ -78,11 +79,13 @@ def reptile(browser=None, search_word=""):
print("Clicking element failed: " + str(e))
length = len(element_content_list)
# print(length)
for index in range(length):
- author = element_authors_list[index].text
- release_time_timestamp = int(parse_time_string(element_release_list[index].text))
+ author_soup = BeautifulSoup(element_authors_list[index].get_attribute("outerHTML"), "html")
+ time_soup = BeautifulSoup(element_release_list[index].get_attribute("outerHTML"), "html")
+ # author = element_authors_list[index].text
+ author = author_soup.find_all("a")[0].text
+ time_text = time_soup.find_all("a")[0].text
+ release_time_timestamp = int(parse_time_string(time_text))
release_time = str(release_time_timestamp)
# Filter by time
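The hunk above stops reading Selenium's element.text directly and instead parses each element's outerHTML with BeautifulSoup: .text returns only visible text, so collapsed or lazily rendered nodes can come back empty, while the raw HTML keeps every anchor. A minimal sketch of the pattern, with illustrative sample HTML:

from bs4 import BeautifulSoup

outer_html = '<div><a href="/user/1">Jane Doe</a> · <a href="/post/2">3小时前</a></div>'
soup = BeautifulSoup(outer_html, "html.parser")
links = soup.find_all("a")
author = links[0].text      # "Jane Doe"
time_text = links[1].text   # "3小时前", later fed to parse_time_string()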
......@@ -119,8 +122,8 @@ def reptile(browser=None, search_word=""):
custom_video["src"] = ""
parent_div.append(custom_video)
else:
print("")
# print("")
error = ""
picture_url = []
if len(image_list) > 0:
for key, element in enumerate(image_list):
......@@ -136,7 +139,8 @@ def reptile(browser=None, search_word=""):
element['src'] = access_address
picture_url.append(download_dir)
else:
print("")
# print("")
error = ""
content = soup.prettify()
# Title: author + date
title = f"{author}-{datetime.fromtimestamp(release_time_timestamp)}"
......@@ -215,7 +219,6 @@ def script_close(browser):
print("sys.exit() 执行失败")
def main():
"""
......
......@@ -48,25 +48,23 @@ def reptile(browser=None, search_word=""):
# Run in headed mode
# browser = browser or create()
# Open the page
- browser.get(url+"&page=1")
+ browser.get(url + "&page=1")
time.sleep(2)
# Get the pagination links
- page_list_element = browser.find_elements("xpath", "//div[@data-desc='分頁']/a[@class='p_num' or @class='active']")
+ page_next = False
+ page_next = browser.find_elements("xpath", "//div[@data-desc='分頁']/a[@class='p_next']")
+ page_index = 1
- # Loop over the pages
- for key, element in enumerate(page_list_element):
-     if key > 0 and key <= len(page_list_element) - 1:
-         # Click through to the page
-         browser.get(f"{url}&page={key+1}")
-         # element.click()
-         time.sleep(2)
-         # Re-fetch the pagination links
-         page_list_element = browser.find_elements("xpath", "//div[@data-desc='分頁']/a")
-     elif key == len(page_list_element) - 1:
-         # Skip this loop iteration
-         continue
+ while page_next:
+     if page_index > 1:
+         browser.get(f"{url}&page={page_index}")
+         time.sleep(0.5)
+     # Re-fetch the next-page link
+     try:
+         page_next = browser.find_elements("xpath", "//div[@data-desc='分頁']/a[@class='p_next']")
+     except:
+         page_next = False
# Scroll to the bottom
browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
# log.debug("Browser opened")
......@@ -137,6 +135,14 @@ def reptile(browser=None, search_word=""):
}
# --------------- assemble data end ---------------------
data.append(obj)
+ page_index = page_index + 1
+ time.sleep(0.1)
+ # Cap at a maximum of 20 pages
+ if page_index >= 20:
+     page_next = False
+     # Exit the loop
+     break
if len(data) > 0:
# Save the JSON file locally
......
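The rewritten loop follows the pager's next link ('p_next') instead of enumerating the visible page numbers, so it keeps working when the pager only shows a window of pages, and the 20-page cap plus the falsy empty list guarantee termination. A minimal sketch of the pattern against the same '分頁' markup; the function wrapper and parameter defaults are illustrative:

import time

def crawl_pages(browser, url, max_pages=20):
    next_xpath = "//div[@data-desc='分頁']/a[@class='p_next']"
    page_index = 1
    page_next = browser.find_elements("xpath", next_xpath)
    while page_next and page_index < max_pages:
        if page_index > 1:
            browser.get(f"{url}&page={page_index}")
            time.sleep(0.5)
        # ... scrape the current page into `data` here ...
        page_index += 1
        # find_elements returns [] (falsy) once no next link remains
        page_next = browser.find_elements("xpath", next_xpath)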
......@@ -5,8 +5,8 @@ from utils.Logger import log
from utils.createBrowserDriver import create
from utils.filse import save_json
from api.index import importJson, getReptileTask, importJsonPath
- from utils.index import convert_to_traditional, yt_dlp_download, convert_string_to_time, parse_twitter_time_string,\
-     extract_image_format,create_directory_if_not_exists, delete_directory
+ from utils.index import convert_to_traditional, yt_dlp_download, convert_string_to_time, parse_twitter_time_string, \
+     extract_image_format, create_directory_if_not_exists, delete_directory
# from pytube import YouTube
import os
import sys
......@@ -70,12 +70,23 @@ def reptile(browser=None, search_word=""):
url = 'https://twitter.com/search?q=' + search_word + '&src=typed_query'
browser.get(url)
time.sleep(2)
# Scroll the browser to the bottom
browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2)
wait = WebDriverWait(browser, 10)
wait.until(
EC.presence_of_element_located((By.XPATH, "//div[@data-testid='cellInnerDiv']//article/div/div/div[2]/div[2]")))
base_xpath = "//div[@data-testid='cellInnerDiv']//article/div/div/div[2]/div[2]"
# Content blocks
element_content_list = browser.find_elements('xpath', base_xpath)
# Fewer than 10 items: load the next page
if len(element_content_list) < 10:
browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
time.sleep(2)
element_content_list = browser.find_elements('xpath', base_xpath)
# Authors
element_authors_list = browser.find_elements('xpath',
f"{base_xpath}//div[@data-testid='User-Name']/div[1]//a[@role='link']")
......@@ -125,7 +136,7 @@ def reptile(browser=None, search_word=""):
# Replace div[@aria-labelledby="xx"] with an img tag (content containing video is replaced with an img tag)
items.replaceWith(div)
else:
error =""
error = ""
else:
error = ""
......@@ -155,7 +166,7 @@ def reptile(browser=None, search_word=""):
# Remove redundant divs
# parent_div = soup.find("div")
# Find all <div> child elements
- div_elements = soup.find("div").findChildren("div",recursive=False)
+ div_elements = soup.find("div").findChildren("div", recursive=False)
for key, item in enumerate(div_elements):
if key == 0 or key == len(div_elements) - 1:
item.extract()
......@@ -236,7 +247,6 @@ def script_close(browser):
print("sys.exit() 执行失败")
def main():
"""
......
......@@ -59,7 +59,7 @@ def reptile(browser=None, search_word=""):
video_duration = int(YouTube(url).length) // 60
# For now, only take 6 items
- if index < 6 and video_duration < 60:
+ if video_duration < 60:
# print(str(id))
# print("Video link: " + str(link))
# print("Video duration: " + str(video_duration))
......@@ -73,7 +73,7 @@ def reptile(browser=None, search_word=""):
# Filter by time
# # If 'releaseTime' is not an integer, convert it to an integer
new_releaseTime = int(releaseTime)
- if new_releaseTime < beginFiltrationTime or new_releaseTime > endFiltrationTime:
+ if not (beginFiltrationTime <= new_releaseTime <= endFiltrationTime):
# If 'new_releaseTime' is outside the specified range, skip this iteration and continue with the next item
continue
......
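The original comparison and the corrected chained form skip exactly the same items; a quick equivalence check:

for t in (5, 10, 15, 20, 25):
    begin, end = 10, 20
    assert (t < begin or t > end) == (not (begin <= t <= end))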
......@@ -14,7 +14,11 @@ def download_image(url, save_path):
# print(f"图片文件已存在:{save_path}")
return True
- response = requests.get(url, stream=True)
+ response = ""
+ try:
+     response = requests.get(url, stream=True)
+ except:
+     return False
if response.status_code == 200:
with open(save_path, 'wb') as file:
......
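The try/except above keeps a single failed request from crashing the whole crawl. A minimal sketch of the same guard with two assumed refinements, a request timeout and a streamed copy to disk; requests is the library already in use, the function name is illustrative:

import requests

def download_image_safe(url, save_path):
    try:
        response = requests.get(url, stream=True, timeout=10)
    except requests.RequestException:
        # Covers DNS failures, refused connections, timeouts, ...
        return False
    if response.status_code != 200:
        return False
    with open(save_path, "wb") as file:
        for chunk in response.iter_content(chunk_size=8192):
            file.write(chunk)
    return True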
......@@ -97,14 +97,15 @@ def parse_time_string(time_str):
time_delta = datetime.timedelta(hours=number)
return int((datetime.datetime.now() + time_delta).timestamp())
else:
-     try:
-         datetime_str = time_str.replace("月", " ").replace("日", "")
-         month, day = map(int, datetime_str.split())
-         current_year = datetime.datetime.now().year
-         datetime_obj = datetime.datetime(year=current_year, month=month, day=day)
-         return int(datetime_obj.timestamp())
-     except ValueError:
-         return None
+     # Not of the form "n days ago", "n hours ago", or "n minutes ago"
+     if "年" in time_str:
+         new_time_str = time_str.replace("年", "/").replace("月", "/").replace("日", "")
+         dt_object = datetime.datetime.strptime(new_time_str, '%Y/%m/%d')
+     else:
+         new_time_str = time_str.replace("月", "/").replace("日", "")
+         dt_object = datetime.datetime.strptime(new_time_str, '%m/%d')
+     return dt_object.timestamp()
def parse_ltn_time_string(time_str):
......
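The new branch parses both '2023年7月21日' and '7月21日' style dates. Two caveats: strptime('%m/%d') defaults the year to 1900, and dt_object.timestamp() returns a float while the other branches return int. A minimal sketch with those two assumed fixes applied:

import datetime

def parse_cn_date(time_str):
    if "年" in time_str:
        new_time_str = time_str.replace("年", "/").replace("月", "/").replace("日", "")
        dt_object = datetime.datetime.strptime(new_time_str, '%Y/%m/%d')
    else:
        new_time_str = time_str.replace("月", "/").replace("日", "")
        # strptime without a year defaults to 1900; substitute the current year
        dt_object = datetime.datetime.strptime(new_time_str, '%m/%d')
        dt_object = dt_object.replace(year=datetime.datetime.now().year)
    return int(dt_object.timestamp())

# e.g. parse_cn_date("2023年7月21日"), parse_cn_date("7月21日")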