import json
import time
from bs4 import BeautifulSoup
from utils.Logger import log
from utils.createBrowserDriver import create
from utils.filse import save_json
from api.index import importJson, getReptileTask, importJsonPath
from utils.index import convert_to_traditional, yt_dlp_download, convert_string_to_time, parse_twitter_time_string, \
    extract_image_format, create_directory_if_not_exists, delete_directory
# from pytube import YouTube
import os
import sys
from datetime import datetime
from utils.download_image import download_image
from config.settings import get_base_file_url
from config.settings import get_account
# --------------- selenium dependencies start ----------------
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# --------------- selenium dependencies end ----------------
import platform

'''
Open the target page and use Selenium to simulate a click on the "GDPR-accept" button,
then click the "search-show-more-button" button in a loop to load more data until it is
no longer clickable. Finally, collect the full paginated data and close the browser driver.
'''


def reptile(browser=None, search_word=""):
    """
    :param browser: an existing Selenium driver to reuse; a new one is created if None
    :param search_word: the query string to search for on Twitter
    """
    print(f"Search word: {search_word}")
    base_url = "https://twitter.com/"
    browser = browser or create(no_headless=False, using_user_data=True)
    # Open the page
    browser.get(base_url)
    time.sleep(2)
    try:
        try:
            login_button = browser.find_element('xpath', "//a[@href='/login']")
            login_button.click()
            time.sleep(2)
        except Exception:
            # The login link is absent when a session already exists
            pass
        # wait = WebDriverWait(browser, 20)
        # wait.until(EC.presence_of_element_located((By.XPATH, "//input[@autocomplete='username']")))
        # Check whether login is required
        login_input = browser.find_element('xpath', "//input[@autocomplete='username']")
        login_input.send_keys(get_account("twitter")["name"])
        # Locate the "Next" button
        buttons = browser.find_element('xpath', "//div[@role='button'][2]")
        buttons.click()
        wait = WebDriverWait(browser, 10)
        wait.until(EC.presence_of_element_located((By.XPATH, "//input[@autocomplete='current-password']")))
        password_input = browser.find_element('xpath', "//input[@autocomplete='current-password']")
        password_input.send_keys(get_account("twitter")["password"])
        # Locate the login button
        button_login = browser.find_element('xpath', "//div[@data-testid='LoginForm_Login_Button']")
        button_login.click()
        time.sleep(2)
    except Exception:
        # Already logged in (or the login flow failed); continue with the search
        pass

    url = 'https://twitter.com/search?q=' + search_word + '&src=typed_query'
    browser.get(url)
    time.sleep(2)
    # Scroll the browser to the bottom of the page
    browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
    time.sleep(2)
    wait = WebDriverWait(browser, 10)
    wait.until(
        EC.presence_of_element_located(
            (By.XPATH, "//div[@data-testid='cellInnerDiv']//article/div/div/div[2]/div[2]")))
    base_xpath = "//div[@data-testid='cellInnerDiv']//article/div/div/div[2]/div[2]"
    # Content blocks
    element_content_list = browser.find_elements('xpath', base_xpath)
    # Fewer than 10 items: scroll again to load the next page
    if len(element_content_list) < 10:
        browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(2)
        element_content_list = browser.find_elements('xpath', base_xpath)
    # Authors
    element_authors_list = browser.find_elements(
        'xpath', f"{base_xpath}//div[@data-testid='User-Name']/div[1]//a[@role='link']")
    length = len(element_authors_list)
    for index in range(length):
        soup = BeautifulSoup(element_content_list[index].get_attribute("outerHTML"), "html.parser")
        # Find the <time> tag
        try:
            time_soup = soup.find('time')
            timestamp = datetime.fromisoformat(time_soup['datetime'].replace("Z", "+00:00")).timestamp()
            link_soup = time_soup.parent
            link_str = base_url + link_soup["href"]
        except Exception:
            link_str = ""
            timestamp = time.time()
        # Time filter
        # If 'releaseTime' is not an integer, convert it to one
        new_releaseTime = int(timestamp)
        # beginFiltrationTime / endFiltrationTime are filter bounds defined elsewhere in this file (not shown in this excerpt)
        if new_releaseTime < beginFiltrationTime or new_releaseTime > endFiltrationTime:
            # Skip this item if 'new_releaseTime' is outside the configured range
            continue
        author = element_authors_list[index].text
        # Title: author + date
        title = f"{author}-{datetime.fromtimestamp(int(timestamp))}"
        video_list = soup.find_all("video")
        # lth = len(ignore_list)
        if len(video_list) > 0:
            # for key, element in enumerate(video_list):
            div_elements = soup.find("div").findChildren("div", recursive=False)
            # div_tags = soup.find_all("div", recursive=False)
            for item in video_list:
                div = soup.new_tag('div')
                img_tag = soup.new_tag('img')
                img_tag["src"] = item["poster"]
                div.append(img_tag)
                for items in div_elements:
                    try:
                        attr = items["aria-labelledby"]
                    except Exception:
                        attr = False
                    if attr:
                        # div["aria-labelledby"] = "sdfsf"
                        # Replace div[@aria-labelledby="xx"] with an <img> tag
                        # (blocks that contain a video are swapped for their poster image)
                        items.replaceWith(div)
        image_list = soup.find_all("img")
        picture_url = []
        if len(image_list) > 0:
            for key, element in enumerate(image_list):
                # Drop SVGs (icons); keep real images
                if str(element['src']).find("svg") != -1:
                    element.extract()
                else:
                    # Download the image locally and replace the src attribute in the tag
                    id = str(int(time.time()))
                    image_type = extract_image_format(element['src'])
                    # Download path (local_path / local_path_name / table_name are set elsewhere in this file)
                    download_dir = f'{os.path.join(local_path, f"{id}.{image_type}")}'
                    # Access URL
                    access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{local_path_name}/{id}.{image_type}'
                    # Download status
                    status = download_image(element['src'], download_dir)
                    if status:
                        element['src'] = access_address
                        picture_url.append(download_dir)
        # Remove redundant divs
        # parent_div = soup.find("div")
        # Find all the