import io
import json
import re
import sys
import time
import loguru
# import pymysql.cursors
import requests
from bs4 import BeautifulSoup
import datetime
from api.index import importJson, getReptileTask, importJsonPath
from utils.Logger import log
from utils.index import convert_to_traditional, create_directory_if_not_exists, delete_directory, parse_ltn_time_string
# from requests_toolbelt import *
from utils.createBrowserDriver import create
import opencc
from utils.filse import save_json
import os
from config.settings import get_base_file_url
from utils.download_image import download_image
# --------------- selenium dependencies start ----------------
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# --------------- selenium dependencies end ----------------

'''
Crawl keyword search results from the Liberty Times (ltn.com.tw), including each article's
title and content (text and images).

Flow: create the driver --> open the browser --> open the search page --> loop over the
result pages --> parse each list item --> download its images --> filter by the configured
time range --> save the records as JSON.
'''


def reptile(browser=None, search_word=""):
    # Example:
    # url = "https://search.ltn.com.tw/list?keyword=新闻&start_time=20230730&end_time=20230801&type=all&sort=date"

    # Convert the filter timestamps to datetime objects
    begin_dt_object = datetime.datetime.fromtimestamp(beginFiltrationTime)
    end_dt_object = datetime.datetime.fromtimestamp(endFiltrationTime)
    # Format the datetime objects as "20230730"-style strings
    filter_start_date = begin_dt_object.strftime("%Y%m%d")
    filter_end_date = end_dt_object.strftime("%Y%m%d")
    # Base url
    url = f"https://search.ltn.com.tw/list?keyword={search_word}&start_time={filter_start_date}&end_time={filter_end_date}&type=all&sort=date"
    browser = browser or create(no_headless=False, using_user_data=True)
    # To run in headed mode:
    # browser = browser or create()

    # Open the first result page
    browser.get(url + "&page=1")
    time.sleep(2)
    # Pagination state
    page_next = True
    # page_next = browser.find_elements("xpath", "//div[@data-desc='分頁']/a[@class='p_next']")
    page_index = 1
    # Loop over the result pages
    while page_next:
        if page_index > 1:
            browser.get(f"{url}&page={page_index}")
            time.sleep(0.5)
        # Re-check whether a "next page" link exists
        try:
            page_next = browser.find_elements("xpath", "//div[@data-desc='分頁']/a[@class='p_next']")
        except Exception:
            # Assume there is only a single page of results
            # tag_list = browser.find_elements('xpath', "//div[@class='page-name']//ul/li")
            # if len(tag_list) <= 0:
            page_next = False
        # Scroll to the bottom of the page
        browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        # log.debug("browser opened")
        classify_item_list = browser.find_elements('xpath', "//div[@class='page-name']//ul/li")
        for index, item_element in enumerate(classify_item_list):
            # Parse the list item's HTML with BeautifulSoup
            soup = BeautifulSoup(item_element.get_attribute('outerHTML'), 'html.parser')
            # Release-time element
            element_release = item_element.find_element("xpath", "./div/span")
            # Find all img tags
            image_list = soup.find_all('img')
            picture_url = []
            img_tag = soup.new_tag("img")
            if len(image_list) > 0:
                for key, element in enumerate(image_list):
                    # Download the image locally and replace the src in the tag.
                    # Include the loop index so images fetched within the same second
                    # do not overwrite each other.
                    image_id = f"{int(time.time())}_{key}"
                    # Local download path
                    download_dir = os.path.join(local_path, f"{image_id}.jpg")
                    # Public access address
                    access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{local_path_name}/{image_id}.jpg'
                    # Skip placeholder images, otherwise download
                    if "default" in element['src']:
                        status = False
                    else:
                        status = download_image(element['src'], download_dir)
                    if status:
                        # element['src'] = access_address
                        img_tag["src"] = access_address
                        # Keep the local path of the downloaded image
                        picture_url.append(download_dir)
            else:
                # No images in this list item; the placeholder <img> stays empty
                pass
            p = soup.new_tag("p")
            p.string = item_element.find_element("xpath", "./div/p").text
            div = soup.new_tag("div")
            div.append(img_tag)
            div.append(p)
            html = div.prettify()
            date_string = element_release.text
            # date_format = "%a %b %d %H:%M:%S %Y"  # legacy strptime format, no longer used
            # Convert the date string to a timestamp
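            # Note: `parse_ltn_time_string` is a project helper (utils.index) whose source is
            # not shown here; the code below assumes it turns LTN's date strings into a value
            # that int() can convert to seconds since the epoch, with the try/except falling
            # back to the crawl time whenever that assumption does not hold.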
            date_time = parse_ltn_time_string(date_string)
            # print(date_time)
            # date_time = datetime.datetime.strptime(date_string, date_format)
            # Convert to a timestamp in seconds; fall back to the current time on failure
            try:
                release_time = int(date_time)
            except Exception:
                release_time = int(time.time())
            # Keep only records inside the configured time window
            if beginFiltrationTime <= release_time <= endFiltrationTime:
                # --------------- assemble record start ---------------------
                obj = {
                    "title": item_element.find_element("xpath", "./div/a[1]").text,
                    "content": html,
                    "link": item_element.find_element("xpath", "./div/a[1]").get_attribute("href"),
                    "reptileTime": str(int(time.time())),
                    "type": "图文",
                    "author": "自由时报",
                    "releaseTime": str(release_time),
                    "picture_url": ",".join(picture_url)
                }
                # --------------- assemble record end ---------------------
                data.append(obj)
        page_index = page_index + 1
        time.sleep(0.1)
        # Limit the crawl to 20 pages
        if page_index >= 21:
            page_next = False
            # Exit the pagination loop
            break

    if len(data) > 0:
        # Save the crawled records as a local JSON file
        json_path = os.path.join(local_path, "data.json")
        state_save = save_json(json_path, data)
        # Save the task metadata
        task = {
            # Crawl time
            "reptileTime": data[0]["reptileTime"],
            # Local path of this run's data
            "localPath": local_path,
            "beginFiltrationTime": beginFiltrationTime,
            "endFiltrationTime": endFiltrationTime,
            "keyword": keyword,
            "total": len(data)
        }
        state_save = save_json(os.path.join(file_dir, "task.json"), task)
        if state_save:
            log.debug('save file success')
        else:
            log.debug('save file failed')
        script_close(browser)
    else:
        # Nothing was crawled
        log.info("no reptile data")
        # Remove the empty task directory
        delete_directory(local_path)
        script_close(browser)


def script_close(browser):
    # Close the browser driver
    try:
        browser.close()
        browser.quit()
    except Exception:
        log.debug("failed to close the browser driver")
    try:
        sys.exit()
    except SystemExit:
        # Re-raise SystemExit so the script actually exits
        raise
    except Exception:
        print("sys.exit() failed")


def main():
    """
    Fetch the crawl-task configuration and start the LTN crawl if the task is enabled.
    """
    # Request the task/keyword configuration
    response = getReptileTask()
    global status_task
    global beginFiltrationTime
    global endFiltrationTime
    global keyword
    if response['status_code'] == 200 and response['data']['code'] == 200:
        log.debug("call success")
        search_word = ""
        for item in response['data']['rows']:
            if item['name'] == 'ltn-自由时报':
                search_word = item['keyword']
                table_name = item['tableName']
                status_task = int(item["status"])
                keyword = str(item["keyword"])
                beginFiltrationTime = int(item["beginFiltrationTime"])
                endFiltrationTime = int(item["endFiltrationTime"])
        # Convert the keyword from Simplified to Traditional Chinese before crawling
        if status_task == 0 and len(search_word) > 0:
            reptile(None, convert_to_traditional(search_word))
        else:
            log.debug("crawl task is not enabled")
    else:
        log.debug("call failed")
        # Request failed or timed out: fall back to the default keyword
        reptile(None, convert_to_traditional("新闻"))
        # upload_control()


# Global variables
data = []
# Task details
task = {}
table_name = "pms_ltn"
# Global fields
keyword = ""
# Filter window start (placeholder, overwritten by the task configuration)
beginFiltrationTime = 123
# Filter window end (placeholder, overwritten by the task configuration)
endFiltrationTime = 123
# Root data directory
file_dir = os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])
# Task directory name
local_path_name = str(int(time.time()))
# Task directory path
local_path = os.path.join(file_dir, local_path_name)
# Whether the task directory was created
local_path_status = create_directory_if_not_exists(local_path)
# Whether the task is enabled (0 = enabled)
status_task = 0


if __name__ == "__main__":
    main()
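# Output layout produced by a successful run (paths derived from the globals above):
#   <parent of cwd>/network-assets-reptile/reptile_data/ltn/<local_path_name>/
#       data.json              -- list of crawled article records (title, content, link, releaseTime, ...)
#       <timestamp>_<n>.jpg    -- images downloaded for the articles
#   <parent of cwd>/network-assets-reptile/reptile_data/ltn/task.json
#       -- metadata of the latest run (reptileTime, localPath, keyword, total, filter window)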