import json
import time
from bs4 import BeautifulSoup
from utils.Logger import log
from utils.createBrowserDriver import create
from utils.filse import save_json
from api.index import importJson, getReptileTask, importJsonPath
from utils.index import convert_to_traditional, yt_dlp_download, convert_string_to_time, parse_time_string
# from pytube import YouTube
from datetime import datetime
from utils.download_image import download_image  # helper: download an image to a local path
import os
from config.settings import get_base_file_url
import sys

'''
Open the Facebook search page for the given keyword, log in if the login form is shown,
scroll to the bottom of the page to load more results, click every "展开" (See more) button
to expand truncated posts, then parse each post's content, author and release time, download
any images, assemble the records and save them as a JSON file before closing the browser driver.
'''

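# parse_time_string is imported from utils.index and its implementation is not shown in this
# file. Based on how it is used below (it converts Facebook's relative release-time labels
# such as "3分钟", "2小时" or "5天" into a Unix timestamp), the illustrative helper below is a
# minimal sketch of the assumed behaviour. It is not used anywhere; the real implementation
# in utils/index.py may differ.
def _example_parse_time_string(label: str) -> int:
    """Illustrative only: convert a relative time label into an approximate Unix timestamp."""
    import re
    now = int(time.time())
    match = re.search(r"(\d+)", label)
    amount = int(match.group(1)) if match else 0
    if "分钟" in label:   # "... minutes ago"
        return now - amount * 60
    if "小时" in label:   # "... hours ago"
        return now - amount * 3600
    if "天" in label:     # "... days ago"
        return now - amount * 86400
    return now            # fall back to "now" for absolute or unrecognised labels
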
def reptile(browser=None, search_word=""):
    print(f"Search term: {search_word}")
    url = "https://www.facebook.com/"
    option = ['--headless']
    browser = browser or create(option, True)
    # Open the home page first
    browser.get(url)
    try:
        # Check whether the login form is shown
        login_input = browser.find_element('xpath', "//input[@name='email']")
        password_input = browser.find_element('xpath', "//input[@name='pass']")
        login_input.send_keys("liyang19970814@gmail.com")
        password_input.send_keys("xn89kiPT/^Kaeg#")
        # Locate and click the login button
        button_login = browser.find_element('xpath', "//button[@name='login']")
        button_login.click()
        time.sleep(3)
    except Exception:
        # Login form not found: we are already logged in
        print("Already logged in")

    url = f"https://www.facebook.com/search/top?q={search_word}"
    browser.get(url)
    # Scroll to the bottom of the page via JavaScript to load more results
    browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
    time.sleep(3)

    # Post content
    element_content_list = browser.find_elements('xpath', "//div[@role='feed']/div//div[@aria-describedby]/div/div/div/div/div/div[2]/div/div/div[3]")
    # Authors
    element_authors_list = browser.find_elements('xpath', "//div[@role='feed']/div//div[@aria-describedby]//h3/span[1]")
    # Release times
    element_release_list = browser.find_elements('xpath', "//div[@role='feed']/div//div[@aria-describedby]//span[@dir]/span//a[@role='link' and @aria-label]")
    # Find every "展开" (See more) button and click it so the full text is loaded before parsing
    elements_expand_list = browser.find_elements('xpath', "//div[@role='feed']/div//div[@aria-describedby]//div[@role='button' and text()='展开']")
    for key, element in enumerate(elements_expand_list):
        try:
            # Click via JavaScript to avoid interception by overlays
            browser.execute_script("arguments[0].click();", element)
        except Exception as e:
            print("Clicking element failed: " + str(e))

    # The three element lists are matched by position, so only iterate over the common length
    length = min(len(element_content_list), len(element_authors_list), len(element_release_list))
    for index in range(length):
        author = element_authors_list[index].text
        release_time_timestamp = int(parse_time_string(element_release_list[index].text))
        release_time = str(release_time_timestamp)

        # Parse the post HTML with BeautifulSoup
        soup = BeautifulSoup(element_content_list[index].get_attribute('outerHTML'), 'html.parser')
        # Check whether the post contains videos or images
        video_list = soup.find_all("video")
        image_list = soup.find_all("img")

        if len(video_list) > 0:
            # Find the parent element that holds the two <div> children
            parent_div = soup.find('div')
            # Find all direct <div> children
            div_elements = parent_div.find_all('div', recursive=False)
            # Make sure there are at least two <div> children
            if len(div_elements) >= 2:
                # Take the second <div> (the rendered video player) and remove it from its parent
                div_to_remove = div_elements[1]
                div_to_remove.extract()
                # Append an empty <video> tag as a placeholder
                custom_video = soup.new_tag("video")
                custom_video["src"] = ""
                parent_div.append(custom_video)

        picture_url = []
        if len(image_list) > 0:
            for key, element in enumerate(image_list):
                # Download the image locally and point the tag's src at our own copy; the loop
                # index is appended so images downloaded within the same second do not overwrite each other
                image_id = f"{int(time.time())}_{key}"
                # Local download path
                download_dir = os.path.join(file_dir, f"{image_id}.jpg")
                # Public access URL
                access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{image_id}.jpg'
                # Download and check the result
                status = download_image(element.get('src', ''), download_dir)
                if status:
                    element['src'] = access_address
                    picture_url.append(access_address)

        content = soup.prettify()
        # Title: author + release date
        title = f"{author}-{datetime.fromtimestamp(release_time_timestamp)}"

        # ---------------- determine content type start ----------------
        # "图文" = text with images, "文字" = text only
        content_type = ""
        try:
            img_tags = soup.find_all('img')
            if len(img_tags) > 0:
                content_type = "图文"
            else:
                content_type = "文字"
        except Exception:
            content_type = "文字"
        # ---------------- determine content type end ------------------

        # ---------------- assemble record start ----------------
        obj = {
            "title": title,
            "content": content,
            "link": element_release_list[index].get_attribute("href"),
            "reptileTime": str(int(time.time())),
            "type": content_type,
            "author": author,
            "releaseTime": release_time,
            "picture_url": ",".join(picture_url)
        }
        # ---------------- assemble record end ------------------
        data.append(obj)

    if len(data) > 0:
        # Save the scraped records as a JSON file
        state_save = save_json(os.path.join(file_dir, str(int(time.time())) + ".json"), data)
        if state_save:
            log.debug('save file success')
        else:
            log.debug('save file failed')
    else:
        log.info("no data scraped")

    # Shut down the browser driver
    try:
        browser.close()
        browser.quit()
    except Exception:
        log.debug("failed to close the browser driver")
    sys.exit()

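# For reference, each record appended to `data` (and later written to the JSON file) has the
# shape below. The field names come directly from the `obj` dict above; the comments describe
# the values as they are built in reptile().
#
#   {
#       "title":       "<author>-<release datetime>",
#       "content":     "<prettified post HTML, image src rewritten, videos replaced by a placeholder>",
#       "link":        "<post URL taken from the release-time anchor>",
#       "reptileTime": "<Unix timestamp of the crawl, as a string>",
#       "type":        "图文 or 文字",
#       "author":      "<author name>",
#       "releaseTime": "<Unix timestamp of the post, as a string>",
#       "picture_url": "<comma-joined list of downloaded image URLs>"
#   }
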
def main():
    """Fetch the crawl task and run the Facebook reptile for the configured keyword."""
    # Request the crawl task / keyword
    response = getReptileTask()
    global status_task, table_name
    if response['status_code'] == 200 and response['data']['code'] == 200:
        log.debug("call success")
        search_word = ""
        status_task = -1  # default in case no "facebook" task row is returned
        for item in response['data']['rows']:
            if item['name'] == 'facebook':
                search_word = item['keyword']
                table_name = item['tableName']  # note: file_dir below is built from the default table_name at import time
                status_task = int(item["status"])
        # Convert the keyword from Simplified to Traditional Chinese before searching
        if status_task == 0 and len(search_word) > 0:
            reptile(None, convert_to_traditional(search_word))
        else:
            log.debug("crawl task is not enabled")
    else:
        log.debug("call failed")
        # Request failed or timed out: fall back to the default keyword "新闻" (news)
        reptile(None, convert_to_traditional("新闻"))
        # upload_control()

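# The response returned by getReptileTask() is not defined in this file. Inferred from the
# field accesses in main() above, it is assumed to look roughly like this; the real API may
# carry additional fields:
#
#   {
#       "status_code": 200,
#       "data": {
#           "code": 200,
#           "rows": [
#               {"name": "facebook", "keyword": "<search keyword>", "tableName": "pms_facebook", "status": 0}
#           ]
#       }
#   }
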
# Global state shared by reptile() and main()
data = []
table_name = "pms_facebook"
file_dir = os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])
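# Safeguard added on the assumption that save_json and download_image do not create missing
# directories themselves: make sure the output directory exists before crawling starts.
os.makedirs(file_dir, exist_ok=True)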
# Run the crawl
main()