import io
import json
import re
import sys
import time
import loguru
# import pymysql.cursors
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from api.index import importJson, getReptileTask, importJsonPath
from utils.Logger import log
from utils.index import convert_to_traditional, create_directory_if_not_exists, delete_directory
# from requests_toolbelt import *
from utils.createBrowserDriver import create
import opencc
from utils.filse import save_json
import os
from config.settings import get_base_file_url
from utils.download_image import download_image
# --------------- selenium dependencies start ----------------
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# --------------- selenium dependencies end ----------------
'''
Crawl hot posts from Taiwan's PTT forum, including each post's title and content (text, images, video).
Crawl flow: create the driver -> open the browser -> open the page -> collect the board categories ->
click into each category -> collect the hot post titles -> click into each post -> collect the post details.
'''


def reptile(browser=None, search_word=""):
    url = "https://www.ptt.cc/bbs/hotboards.html"
    # Run with a visible (headed) browser, reusing the persisted user profile
    browser = browser or create(no_headless=False, using_user_data=True)
    # browser = browser or create()
    # Open the hot-boards page
    browser.get(url)
    classify_item_list = browser.find_elements('xpath', "//div[@class='board-class']")
    for index in range(len(classify_item_list)):
        # For now only crawl the first four categories
        if 0 <= index < 4:
            type_title = classify_item_list[index].text
            # Enter the category page
            classify_item_list[index].click()
            time.sleep(0.1)
            # The first category requires confirming the visitor is at least 18 years old
            if index == 0:
                try:
                    button = browser.find_element("xpath", "//form/div[1]//button")
                    button.click()
                except Exception:
                    pass
            wait = WebDriverWait(browser, 10)
            wait.until(EC.presence_of_element_located((By.XPATH, "//div[@class='r-ent']")))
            element_list = browser.find_elements('xpath', "//div[@class='r-ent']//div[@class='title']//a")
            for index_two in range(len(element_list)):
                # element_list is re-fetched after every navigation, so index into the fresh list
                # instead of holding on to stale element references
                item = element_list[index_two]
                log.debug(f"Crawling category: {type_title} - item {index_two + 1}")
                # Match the search keyword against the post title
                if not re.findall(search_word, item.text):
                    # The title does not match the keyword; skip this post
                    continue
                # Skip announcement ("公告") and board-info ("看板") posts
                if re.findall("公告", item.text) or re.findall("看板", item.text):
                    continue
                item.click()
                time.sleep(0.1)
                # Original post link
                browser_current_url = browser.current_url
                try:
                    # Post title
                    element_title = browser.find_element(
                        'xpath', "//div[@id='main-content']/div[3]//span[@class='article-meta-value']")
                except Exception:
                    log.error("xpath element not found: //div[@id='main-content']/div[3]//span[@class='article-meta-value']")
                    log.debug(f'Page link: {browser_current_url}')
                    # Go back to the post list and stop processing this category
                    browser.back()
                    element_list = browser.find_elements('xpath', "//div[@class='r-ent']//div[@class='title']//a")
                    break
                # The content may contain images and videos and needs post-processing
                element_content = browser.find_element('xpath', "//div[@id='main-content']")
                # Parse the post HTML with BeautifulSoup
                soup = BeautifulSoup(element_content.get_attribute('outerHTML'), 'html.parser')
                # Author
                element_author = browser.find_element(
                    'xpath', "//div[@id='main-content']/div[@class='article-metaline'][1]/span[2]")
                # Release time
                element_release = browser.find_element(
                    'xpath', "//div[@id='main-content']/div[@class='article-metaline'][3]/span[2]")
"//div[@id='main-content']/div[@class='article-metaline'][3]/span[2]") # log.debug('开始判断类型') try: # 找到所有第一级标签为 `div` 的元素 div_elements = soup.find_all('div') # log.debug("一级div数量:" + str(len(div_elements))) # 逐个删除这些元素 for key, div in enumerate(div_elements): if key > 0: div.extract() # 删除第一级span span_element = soup.find_all('span') # log.debug("一级span数量:" + str(len(span_element))) for span in span_element: span.extract() except: # log.debug("删除第一级div失败") a = 2 # ---------------- 判断类型 start ---------- # 类型 content_type = "" # 查找所有img标签 image_list = soup.find_all('img') try: if len(image_list) > 0: content_type = "图文" else: content_type = "文字" except: content_type = "文字" picture_url = [] if len(image_list) > 0: for key, element in enumerate(image_list): # 下载图片至本地,替换标签中的src id = str(int(time.time())) # 下载地址 download_dir = f'{os.path.join(local_path, f"{id}.jpg")}' # 访问地址 access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{local_path_name}/{id}.jpg' # 下载状态 status = download_image(element['src'], download_dir) if status: element['src'] = access_address picture_url.append(download_dir) else: # print("") error = "" # ---------------- 判断类型 end ---------- # log.debug('开始内容过滤') # ------------------ content 过滤 start-------------- try: # 查找所有的<a>标签 a_tags = soup.find_all('a', href=True) # log.debug("a标签数量:" + str(len(a_tags))) # 循环遍历<a>标签,检查每个<a>标签是否包含<img>元素,如果包含则删除该<a>标签 for tag in a_tags: tag.decompose() except: # log.debug("查找所有的<a>标签失败") a = 1 html = soup.prettify().replace('amp;', '') # ------------------ content 过滤 end-------------- date_string = element_release.text date_format = "%a %b %d %H:%M:%S %Y" # 将日期字符串转换为datetime对象 date_time = datetime.strptime(date_string, date_format) # 将datetime对象转换为时间戳(以秒为单位) release_time = int(date_time.timestamp()) # 过滤时间 if filter_time_start <= release_time <= filter_time_end: # --------------- 组装数据 start--------------------- obj = { "title": element_title.text, "content": html, "link": browser_current_url, "reptileTime": str(int(time.time())), "type": content_type, "author": element_author.text, "releaseTime": str(release_time), "picture_url": ",".join(picture_url) } # --------------- 组装数据 end--------------------- data.append(obj) # 浏览器返回上一页 browser.back() time.sleep(0.1) element_list = browser.find_elements('xpath', "//div[@class='r-ent']//div[@class='title']//a") print("循环结束") # 浏览器返回上一页 browser.back() if index == 0: browser.back() time.sleep(0.1) # 重新获取 classify_item_list = browser.find_elements('xpath', "//div[@class='board-class']") # 发送爬取数据到java服务 # print('----------------------') # print(data) if len(data) > 0: # 保存json文件到本地 json_path = os.path.join(local_path, "data.json") state_save = save_json(json_path, data) # 保存task task = { # 爬取时间 "reptileTime": data[0]["reptileTime"], # 本地路径 "localPath": local_path } state_save = save_json(os.path.join(file_dir, "task.json"), task) if state_save: log.debug('save file success') else: log.debug('save file failed') script_close(browser) else: # 爬取数据为空 log.info("未爬取到数据") # 删除目录 delete_directory(local_path) script_close(browser) def script_close(browser): # 关闭浏览器驱动 try: browser.close() browser.quit() except: log.debug("浏览器驱动关闭失败") try: # 一些代码... sys.exit() except SystemExit: raise # 重新抛出SystemExit异常,让脚本退出 except Exception as e: # 异常处理代码... 
print("sys.exit() 执行失败") def main(): """ """ # 请求关键词 response = getReptileTask() global status_task global filter_time_start global filter_time_end if response['status_code'] == 200 and response['data']['code'] == 200: log.debug("call success") search_word = "" for item in response['data']['rows']: if item['name'] == 'ptt': search_word = item['keyword'] table_name = item['tableName'] status_task = int(item["status"]) filter_time_start = int(item["beginFiltrationTime"]) filter_time_end = int(item["endFiltrationTime"]) # 简体转繁体 if status_task == 0 and len(search_word) > 0: reptile(None, convert_to_traditional(search_word)) else: log.debug("爬取任务未启用") else: log.debug("call failed") # 请求超时 reptile(None, convert_to_traditional("新闻")) # upload_control() # 全局变量 data = [] # 任务详情 task = {} table_name = "pms_ptt" # 过滤时间开始 filter_time_start = int(123) # 过滤时间结束 filter_time_end = int(123) # 文件根目录 file_dir = f'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}' # 任务目录名称 local_path_name = str(int(time.time())) # 任务目录路径 local_path = f'{os.path.join(file_dir, local_path_name)}' # 任务目录是否创建 local_path_status = create_directory_if_not_exists(local_path) # 是否启用 status_task = 0 # 调用main函数 main()