import json
import time
from bs4 import BeautifulSoup
from utils.Logger import log
from utils.createBrowserDriver import create
from utils.filse import save_json
from api.index import importJson, getReptileTask, importJsonPath
from utils.index import convert_to_traditional, yt_dlp_download, convert_string_to_time, parse_twitter_time_string, \
    is_base64_image, save_base64_image, get_screen_resolution, create_directory_if_not_exists, delete_directory
# from pytube import YouTube
from selenium.common.exceptions import NoSuchElementException
import os
import sys
from datetime import datetime
from utils.download_image import download_image
from config.settings import get_base_file_url
# --------------- selenium dependencies: start ----------------
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# --------------- selenium dependencies: end ----------------

'''
Open the Dcard search page for the given keyword, iterate over the result cards,
open each post in its overlay, extract title/author/time/content, download the
images and rewrite their src attributes to local copies, and collect the records.
Finally the data is saved as JSON and the browser driver is closed.
'''


def reptile(browser=None, search_word=""):
    """
    :param browser: optional existing Selenium driver; a new one is created when None
    :param search_word: keyword to search for on Dcard
    """
    print(f"Search word: {search_word}")
    base_url = "https://www.dcard.tw"
    browser = browser or create(no_headless=False, using_user_data=True)
    # Open the search results page
    browser.get(f"{base_url}/search?query={search_word}")
    time.sleep(6)
    base_xpath = "//div[@role='main']//div[@data-key]//article"
    # Post cards
    element_content_list = browser.find_elements('xpath', base_xpath)
    # Authors
    element_authors_list = browser.find_elements('xpath', f"{base_xpath}/div[1]/div[1]/div[2]/div/div[1]")
    # Publish times
    element_time_list = browser.find_elements('xpath', f"{base_xpath}/div[1]/div[1]/div[2]/div/div[2]/time")
    # Titles
    element_title_list = browser.find_elements('xpath', f"{base_xpath}//h2")
    # Like counts
    element_like_list = browser.find_elements('xpath', f"{base_xpath}/div[3]/div[1]/div/div[2]")
    # Comment counts
    element_comment_list = browser.find_elements('xpath', f"{base_xpath}/div[3]/div[2]/div/span")

    for index, item in enumerate(element_content_list):
        # Extract the publish time and convert it to a timestamp.
        # The datetime attribute is ISO-8601 with a trailing "Z"; the "Z" is stripped
        # because datetime.fromisoformat() does not accept it before Python 3.11.
        timestamp = datetime.fromisoformat(element_time_list[index].get_attribute("datetime")[:-1]).timestamp()
        # Filter by time window (as an integer number of seconds)
        new_releaseTime = int(timestamp)
        if new_releaseTime < beginFiltrationTime or new_releaseTime > endFiltrationTime:
            # Release time is outside the configured window: skip this post
            continue
        # Author
        author = element_authors_list[index].text
        # Title
        title = element_title_list[index].text
        # Like count (currently not included in the saved record)
        like = element_like_list[index].text
        # Comment count (currently not included in the saved record)
        comment = element_comment_list[index].text

        # ------------- Extract the post content ---------------
        element_content_list[index].click()
        # Wait up to 10 seconds for the post overlay to appear
        wait = WebDriverWait(browser, 10)
        wait.until(EC.presence_of_element_located((By.XPATH, "//div[@data-testid='overlay']")))
        time.sleep(3)
        click_dom = browser.find_element("xpath", "//div[@data-testid='overlay']")
        # Handle the case where the overlay content failed to load
        try:
            browser.find_element("xpath", "//div[@data-testid='overlay']//h2[text()='發生錯誤']")
            error_status = True
        except NoSuchElementException:
            error_status = False
        if error_status:
            # click_dom.click()
            browser.back()
            time.sleep(0.5)
            # The overlay showed an error page, so there is nothing to extract: skip this post
            continue
        close_button = browser.find_element(
            "xpath", "//div[@data-testid='overlay']/div[2]/div[1]/div/div//button[@aria-label='關閉']")
        content_element = browser.find_element("xpath", "//div[@data-testid='overlay']//article//div[3]")
        soup = BeautifulSoup(content_element.get_attribute("outerHTML"), "html.parser")
        # Post link
        link_str = browser.current_url

        # ------------- Handle videos in the content: start ------------------
        # Clear video sources so they are not embedded in the saved HTML
        video_list = soup.find_all("video")
        for video in video_list:
            video['src'] = ""
        # ------------- Handle videos in the content: end ------------------

        # ------------- Handle images in the content: start ------------------
        picture_url = []
        image_list = soup.find_all("img")
        for key, element in enumerate(image_list):
            # Download each image locally and replace the src in the saved HTML.
            # The loop index is appended so images downloaded within the same second
            # do not overwrite each other.
            image_id = f"{int(time.time())}_{key}"
            # Local download path
            download_dir = os.path.join(local_path, f"{image_id}.jpg")
            # Public access URL
            access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{local_path_name}/{image_id}.jpg'
            if not is_base64_image(element['src']):
                log.debug("image src is a url")
                status = download_image(element['src'], download_dir)
            else:
                log.debug("image src is base64")
                status = save_base64_image(element['src'], download_dir)
            if status:
                element['src'] = access_address
                picture_url.append(access_address)
        # ------------- Handle images in the content: end ------------------

        content = soup.prettify()

        # ---------------- Determine the post type ----------------
        # "图文" (text with images) when the content contains <img> tags, otherwise "文字" (text only)
        content_type = "图文" if len(image_list) > 0 else "文字"

        # --------------- Assemble the record: start ---------------------
        obj = {
            "title": title,
            "content": content,
            "link": link_str,
            "reptileTime": str(int(time.time())),
            "type": content_type,
            "author": author,
            "releaseTime": str(int(timestamp)),
            "picture_url": ",".join(picture_url)
        }
        # --------------- Assemble the record: end ---------------------
        data.append(obj)
        close_button.click()

    # Send the scraped data to the Java service
    if len(data) > 0:
        # Save the records to a local JSON file
        json_path = os.path.join(local_path, "data.json")
        state_save = save_json(json_path, data)
        # Save the task metadata
        task = {
            # Scrape time
            "reptileTime": data[0]["reptileTime"],
            # Local path
            "localPath": local_path,
            "beginFiltrationTime": beginFiltrationTime,
            "endFiltrationTime": endFiltrationTime,
            "keyword": keyword
        }
        state_save = save_json(os.path.join(file_dir, "task.json"), task)
        if state_save:
            log.debug('save file success')
        else:
            log.debug('save file failed')
        script_close(browser)
    else:
        # No data was scraped
        log.info("no data scraped")
        # Remove the (empty) task directory
        delete_directory(local_path)
        script_close(browser)
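
# A minimal sketch (not used by the original flow above): the inline parsing in
# reptile() strips the trailing "Z" and lets fromisoformat() treat the value as a
# naive local time. If the timestamps should be interpreted strictly as UTC, a
# helper along these lines could be used instead; the function name is illustrative.
def iso_utc_to_timestamp(iso_str):
    # "2023-08-01T12:34:56.789Z" -> epoch seconds, interpreted as UTC
    return datetime.fromisoformat(iso_str.replace("Z", "+00:00")).timestamp()
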
def script_close(browser):
    # Shut down the browser driver
    try:
        browser.close()
        browser.quit()
    except Exception:
        log.debug("failed to close the browser driver")
    try:
        sys.exit()
    except SystemExit:
        # Re-raise SystemExit so the script actually exits
        raise
    except Exception:
        print("sys.exit() failed")
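
# For reference: layout of the files written by reptile() (field names as used above).
#   <local_path>/data.json : list of records with keys
#       title, content, link, reptileTime, type, author, releaseTime, picture_url
#   <file_dir>/task.json   : reptileTime, localPath, beginFiltrationTime,
#       endFiltrationTime, keyword
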
print("sys.exit() 执行失败") def main(): """ """ # 请求关键词 response = getReptileTask() global status_task global beginFiltrationTime global endFiltrationTime global keyword if response['status_code'] == 200 and response['data']['code'] == 200: log.debug("call success") search_word = "" for item in response['data']['rows']: if item['name'] == 'dcard': search_word = item['keyword'] table_name = item['tableName'] status_task = int(item["status"]) keyword = str(item["keyword"]) beginFiltrationTime = int(item["beginFiltrationTime"]) endFiltrationTime = int(item["endFiltrationTime"]) # 简体转繁体 if status_task == 0 and len(search_word) > 0: reptile(None, convert_to_traditional(search_word)) else: log.debug("爬取任务未启用") else: log.debug("call failed") # 请求超时 reptile(None, convert_to_traditional("新闻")) # upload_control() # 全局变量 data = [] # 任务详情 task = {} table_name = "pms_dcard" # 全局字段 keyword = "" # 过滤时间开始 beginFiltrationTime = int(123) # 过滤时间结束 endFiltrationTime = int(123) # 文件根目录 file_dir = f'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}' # 任务目录名称 local_path_name = str(int(time.time())) # 任务目录路径 local_path = f'{os.path.join(file_dir, local_path_name)}' # 任务目录是否创建 local_path_status = create_directory_if_not_exists(local_path) # 是否启用 status_task = 0 # 调用main函数 main()