import io
import json
import re
import sys
import time
import loguru
# import pymysql.cursors
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from api.index import importJson, getReptileTask, importJsonPath
from utils.Logger import log
from utils.index import convert_to_traditional
# from requests_toolbelt import *
from utils.createBrowserDriver import create
import opencc
from utils.filse import save_json
import os
from config.settings import get_base_file_url
from utils.download_image import download_image
# --------------- selenium dependencies: start ----------------
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# --------------- selenium dependencies: end ----------------

'''
Scrapes popular posts from Taiwan's PTT forum, including each post's title and content (text, images, video).
Crawl flow: create driver -> open browser -> open page -> collect board categories -> click each category
-> collect hot post titles -> click each title -> scrape the post details.
'''


def reptile(browser=None, search_word=""):
    url = "https://www.ptt.cc/bbs/hotboards.html"
    # run with a visible (headed) browser
    browser = browser or create(no_headless=True, using_user_data=True)
    # browser = browser or create()
    # open the page
    browser.get(url)
    classify_item_list = browser.find_elements('xpath', "//div[@class='board-class']")
    length = len(classify_item_list)
    for index in range(length):
        # for now only scrape the first four categories
        if 0 <= index < 4:
            type_title = classify_item_list[index].text
            classify_item_list[index].click()
            time.sleep(0.1)
            if index == 0:
                # some boards show a confirmation page (e.g. the over-18 notice); click through it if present
                try:
                    button = browser.find_element("xpath", "//form/div[1]//button")
                    button.click()
                except Exception:
                    pass
            wait = WebDriverWait(browser, 10)
            wait.until(EC.presence_of_element_located((By.XPATH, "//div[@class='r-ent']")))
            element_list = browser.find_elements('xpath', "//div[@class='r-ent']//div[@class='title']//a")
            length_two = len(element_list)
            for index_two in range(length_two):
                try:
                    re.findall("公告", element_list[index_two].text)
                except IndexError:
                    # the re-fetched post list can be shorter than the original; log where we were
                    log.debug(f"scraping board {type_title} - item {index_two + 1}")
                    print("current url: " + str(browser.current_url))
                    if data:
                        print(data[-1]["title"])
                # skip posts whose title contains "公告" (announcement) or "看板" (board info)
                if not (re.findall("公告", element_list[index_two].text)
                        or re.findall("看板", element_list[index_two].text)):
                    element_list[index_two].click()
                    time.sleep(0.1)
                    # original post link
                    browser_current_url = browser.current_url
                    try:
                        # post title element
                        element_title = browser.find_element(
                            'xpath', "//div[@id='main-content']/div[3]//span[@class='article-meta-value']")
                    except Exception:
                        log.error(
                            "xpath element not found: //div[@id='main-content']/div[3]//span[@class='article-meta-value']")
                        log.debug(f'page url: {browser_current_url}')
                        # go back, re-fetch the post list, then give up on this board
                        browser.back()
                        element_list = browser.find_elements('xpath', "//div[@class='r-ent']//div[@class='title']//a")
                        break
                    # the content may contain images and video and needs post-processing
                    element_content = browser.find_element('xpath', "//div[@id='main-content']")
                    # remove <a> tags whose href contains 'img'
                    # ------------------------------------
                    # parse the HTML with BeautifulSoup
                    soup = BeautifulSoup(element_content.get_attribute('outerHTML'), 'html.parser')
                    # author
                    element_author = browser.find_element(
                        'xpath', "//div[@id='main-content']/div[@class='article-metaline'][1]/span[2]")
                    # publish time
                    element_release = browser.find_element(
                        'xpath', "//div[@id='main-content']/div[@class='article-metaline'][3]/span[2]")
                    date_string = element_release.text
                    date_format = "%a %b %d %H:%M:%S %Y"
                    # convert the date string to a datetime object
                    date_time = datetime.strptime(date_string, date_format)
                    # convert the datetime object to a Unix timestamp (seconds)
                    release_time = int(date_time.timestamp())
                    try:
                        # find the top-level div elements and remove every one except the first,
                        # which holds the post body
                        div_elements = soup.find_all('div')
                        for key, div in enumerate(div_elements):
                            if key > 0:
                                div.extract()
                        # remove the top-level span elements (article meta lines)
                        span_element = soup.find_all('span')
                        for span in span_element:
                            span.extract()
                    except Exception:
                        pass
                    # ---------------- determine content type: start ----------
                    content_type = ""
                    # find all img tags
                    image_list = soup.find_all('img')
                    try:
                        if len(image_list) > 0:
                            content_type = "图文"
                        else:
                            content_type = "文字"
                    except Exception:
                        content_type = "文字"
                    picture_url = []
                    if len(image_list) > 0:
                        for key, element in enumerate(image_list):
                            # download the image locally and replace the tag's src
                            image_id = str(int(time.time()))
                            # local download path
                            download_dir = os.path.join(file_dir, f"{image_id}.jpg")
                            # public access URL
                            access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{image_id}.jpg'
                            # download status
                            status = download_image(element['src'], download_dir)
                            if status:
                                element['src'] = access_address
                                picture_url.append(download_dir)
                    # ---------------- determine content type: end ----------
                    # ------------------ content filtering: start --------------
                    try:
                        # find all <a> tags that carry an href and remove them
                        a_tags = soup.find_all('a', href=True)
                        for tag in a_tags:
                            tag.decompose()
                    except Exception:
                        pass
                    html = soup.prettify().replace('amp;', '')
                    # ------------------ content filtering: end --------------
                    # --------------- assemble data: start ---------------------
                    obj = {
                        "title": element_title.text,
                        "content": html,
                        "link": browser_current_url,
                        "reptileTime": str(int(time.time())),
                        "type": content_type,
                        "author": element_author.text,
                        "releaseTime": str(release_time),
                        "picture_url": ",".join(picture_url)
                    }
                    # --------------- assemble data: end ---------------------
                    # keep the post only if the search keyword matches the title
                    matches = re.findall(search_word, element_title.text)
                    if matches:
                        data.append(obj)
                    # go back to the post list and re-fetch it (the old references are stale)
                    browser.back()
                    time.sleep(0.1)
                    element_list = browser.find_elements('xpath', "//div[@class='r-ent']//div[@class='title']//a")
            # go back to the board list
            browser.back()
            if index == 0:
                # the first board also needed the confirmation click, so go back once more
                browser.back()
            time.sleep(0.1)
            # re-fetch the category elements
            classify_item_list = browser.find_elements('xpath', "//div[@class='board-class']")
    # send the scraped data to the Java service
    if len(data) > 0:
        # save a JSON file locally
        state_save = save_json(os.path.join(file_dir, str(int(time.time())) + ".json"), data)
        if state_save:
            log.debug('save file success')
        else:
            log.debug('save file failed')
        # script_close(browser)
    else:
        # nothing was scraped
        log.info("no data scraped")
        # script_close(browser)


def script_close(browser):
    # close the browser driver
    try:
        browser.close()
        browser.quit()
    except Exception:
        log.debug("failed to close the browser driver")
    sys.exit()


def main():
    """
    Fetch the scraping task from the Java service and run the PTT scraper.
    """
    # request the task keyword
    response = getReptileTask()
    global status_task
    if response['status_code'] == 200 and response['data']['code'] == 200:
        log.debug("call success")
        search_word = ""
        for item in response['data']['rows']:
            if item['name'] == 'ptt':
                search_word = item['keyword']
                # note: this only sets a local; the module-level table_name stays "pms_ptt"
                table_name = item['tableName']
                status_task = int(item["status"])
        # convert the simplified-Chinese keyword to traditional Chinese
        if status_task == 0 and len(search_word) > 0:
            reptile(None, convert_to_traditional(search_word))
        else:
            log.debug("scraping task is not enabled")
    else:
        log.debug("call failed")
        # request failed or timed out: fall back to a default keyword
        reptile(None, convert_to_traditional("新闻"))
        # upload_control()


# global state shared by main() and reptile()
data = []
table_name = "pms_ptt"
file_dir = os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])
# whether the scraping task is enabled
status_task = '0'
# run the entry point
main()
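
# Usage sketch (an assumption, not part of the task flow above): to exercise the scraper
# without the Java task service, reptile() can be called directly with a traditional-Chinese
# keyword, mirroring the fallback call in main(), e.g.:
#
#     reptile(create(no_headless=True, using_user_data=True), convert_to_traditional("新闻"))
#
# Each post whose title matches the keyword is appended to the global `data` list as a dict with
# the keys "title", "content", "link", "reptileTime", "type", "author", "releaseTime" and
# "picture_url", and the list is written to a timestamped JSON file under `file_dir` by save_json().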