import io
import json
import re
import sys
import time
import loguru
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from api.index import importJson, getReptileTask, importJsonPath
from utils.Logger import log
from utils.index import convert_to_traditional
from utils.createBrowserDriver import create
import opencc
from utils.filse import save_json
import os
from config.settings import get_base_file_url
from utils.download_image import download_image
# --------------- selenium dependencies start ----------------
from selenium.common.exceptions import NoSuchElementException
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# --------------- selenium dependencies end ----------------
'''
Crawl the hot posts of Taiwan's PTT forum, including each post's title and content (text, images, videos).
Crawl flow: create the driver -> open the browser -> open the page -> collect the board categories -> click each category -> collect the hot post titles -> click each post -> collect the post details.
'''
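# NOTE: data, table_name and file_dir are module-level globals defined at the bottom of
# this file; reptile() reads them and appends the crawled posts to data.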
def reptile(browser=None, search_word=""):
url = "https://www.ptt.cc/bbs/hotboards.html"
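    # Reuse the caller's browser if one is passed in; otherwise create a headed browser
    # that reuses the saved user data directory (create() comes from utils.createBrowserDriver)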
browser = browser or create(no_headless=True, using_user_data=True)
    # Open the hot-boards page
browser.get(url)
# log.debug("已打开浏览器")
classify_item_list = browser.find_elements('xpath', "//div[@class='board-class']")
# log.debug(classify_item_list)
# classify_item_list = item_list.copy()
length = len(classify_item_list)
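    # Iterate over the board categories by index; element references go stale after page
    # navigation, so classify_item_list is re-fetched at the end of each iteration.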
for index in range(length):
        # For now only crawl the first four board categories
if 0 <= index < 4:
type_title = classify_item_list[index].text
classify_item_list[index].click()
time.sleep(0.1)
            if index == 0:
                # The first board may show a confirmation page (e.g. the over-18 agreement)
                # before its post list; click the button if it is present, otherwise move on.
                try:
                    button = browser.find_element("xpath", "//form/div[1]//button")
                    button.click()
                except NoSuchElementException:
                    pass
            # Wait until the post entries of the board are present
            wait = WebDriverWait(browser, 10)
            wait.until(EC.presence_of_element_located((By.XPATH, "//div[@class='r-ent']")))
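            # Collect the title links of every post listed in the current board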
element_list = browser.find_elements('xpath', "//div[@class='r-ent']//div[@class='title']//a")
length_two = len(element_list)
for index_two in range(length_two):
                log.debug(f"Crawling board {type_title}, post {index_two + 1} of {length_two}")
                # Skip posts whose title contains "公告" (announcement) or "看板" (board info)
                if re.findall("公告", element_list[index_two].text) or re.findall("看板", element_list[index_two].text):
                    continue
                else:
element_list[index_two].click()
time.sleep(0.1)
                    # Link of the post that was just opened
                    browser_current_url = browser.current_url
                    try:
                        # Post title from the article header
                        element_title = browser.find_element('xpath',
                                                              "//div[@id='main-content']/div[3]//span[@class='article-meta-value']")
                    except NoSuchElementException:
                        log.error(
                            "xpath element not found: //div[@id='main-content']/div[3]//span[@class='article-meta-value']")
                        log.debug(f'page url: {browser_current_url}')
                        # Go back to the post list, re-fetch the (now stale) title links and skip this post
                        browser.back()
                        element_list = browser.find_elements('xpath', "//div[@class='r-ent']//div[@class='title']//a")
                        continue
                    # The body may contain images and videos and needs post-processing
                    element_content = browser.find_element('xpath', "//div[@id='main-content']")
                    # Parse the post HTML with BeautifulSoup so unwanted tags can be stripped below
                    soup = BeautifulSoup(element_content.get_attribute('outerHTML'), 'html.parser')
                    # Author
                    element_author = browser.find_element('xpath',
                                                           "//div[@id='main-content']/div[@class='article-metaline'][1]/span[2]")
                    # Release time
                    element_release = browser.find_element('xpath',
                                                            "//div[@id='main-content']/div[@class='article-metaline'][3]/span[2]")
                    date_string = element_release.text
                    date_format = "%a %b %d %H:%M:%S %Y"
                    # Parse the date string into a datetime object
                    date_time = datetime.strptime(date_string, date_format)
                    # Convert the datetime to a Unix timestamp in seconds
                    release_time = int(date_time.timestamp())
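                    # Strip nested markup (meta rows, push comments) so only the post body remains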
                    try:
                        # find_all('div') returns the #main-content wrapper first (index 0),
                        # followed by the nested divs; remove everything except the wrapper
                        div_elements = soup.find_all('div')
                        for key, div in enumerate(div_elements):
                            if key > 0:
                                div.extract()
                        # Remove the span tags as well
                        span_element = soup.find_all('span')
                        for span in span_element:
                            span.extract()
                    except Exception:
                        log.debug("failed to strip div/span tags from the post content")
                    # ---------------- determine content type start ----------
                    # If any img tags survive the cleanup the post is "图文" (image+text), otherwise "文字" (text only)
                    image_list = soup.find_all('img')
                    content_type = "图文" if len(image_list) > 0 else "文字"
                    picture_url = []
                    if len(image_list) > 0:
                        for key, element in enumerate(image_list):
                            # Download each image locally and point the tag's src at the served copy.
                            # The loop index is part of the file name so images fetched within the
                            # same second do not overwrite each other.
                            image_id = f"{int(time.time())}_{key}"
                            # Local download path
                            download_dir = f'{os.path.join(file_dir, f"{image_id}.jpg")}'
                            # Public access URL
                            access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{image_id}.jpg'
                            # Download and record whether it succeeded
                            status = download_image(element['src'], download_dir)
                            if status:
                                element['src'] = access_address
                                picture_url.append(download_dir)
                            else:
                                log.debug(f"image download failed: {element['src']}")
                    # ---------------- determine content type end ----------
                    # ------------------ content filtering start --------------
                    try:
                        # Remove every <a> tag that has an href attribute;
                        # bare link markup is not kept in the stored content
                        a_tags = soup.find_all('a', href=True)
                        for tag in a_tags:
                            tag.decompose()
                    except Exception:
                        log.debug("failed to remove <a> tags from the post content")
                    # Serialize the cleaned HTML and strip stray 'amp;' left over from entity escaping
                    html = soup.prettify().replace('amp;', '')
                    # ------------------ content filtering end --------------
                    # --------------- assemble data start ---------------------
obj = {
"title": element_title.text,
"content": html,
"link": browser_current_url,
"reptileTime": str(int(time.time())),
"type": content_type,
"author": element_author.text,
"releaseTime": str(release_time),
"picture_url": ",".join(picture_url)
}
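                    # reptileTime and releaseTime are string Unix timestamps; picture_url is a
                    # comma-separated list of the local paths the images were downloaded to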
                    # --------------- assemble data end ---------------------
                    # Keep the post only when its title matches the search keyword
                    # (the keyword is used as a regular-expression pattern)
                    matches = re.findall(search_word, element_title.text)
                    if matches:
                        data.append(obj)
                    else:
                        # Title does not match the keyword; discard this post
                        pass
                    # Go back to the post list and re-fetch the title links
                    # (the previous references are stale after navigating away)
                    browser.back()
                    time.sleep(0.1)
                    element_list = browser.find_elements('xpath', "//div[@class='r-ent']//div[@class='title']//a")
            # Go back from the board's post list to the hot-boards page
            browser.back()
            if index == 0:
                # The first board needed the extra confirmation click, so go back one page more
                browser.back()
            time.sleep(0.1)
            # Re-fetch the board category elements; they are stale after navigation
            classify_item_list = browser.find_elements('xpath', "//div[@class='board-class']")
    if len(data) > 0:
        # Save the crawled posts as a JSON file under file_dir
        state_save = save_json(os.path.join(file_dir, str(int(time.time())) + ".json"), data)
        if state_save:
            log.debug('save file success')
        else:
            log.debug('save file failed')
        # script_close(browser)
    else:
        # Nothing was crawled
        log.info("no data crawled")
        # script_close(browser)


def script_close(browser):
    # Close the browser window and shut down the driver, then exit the script
    try:
        browser.close()
        browser.quit()
    except Exception:
        log.debug("failed to close the browser driver")
    sys.exit()


def main():
    """
    Fetch the crawl task from the backend and run the PTT crawler when the task is enabled.
    """
    # Request the crawl task (keyword, target table, status) from the backend
    response = getReptileTask()
    global status_task, table_name
    if response['status_code'] == 200 and response['data']['code'] == 200:
        log.debug("call success")
        search_word = ""
        for item in response['data']['rows']:
            if item['name'] == 'ptt':
                search_word = item['keyword']
                table_name = item['tableName']
                status_task = int(item["status"])
        # Convert the Simplified Chinese keyword to Traditional Chinese before searching PTT
        if status_task == 0 and len(search_word) > 0:
            reptile(None, convert_to_traditional(search_word))
        else:
            log.debug("crawl task is not enabled")
    else:
        log.debug("call failed")
        # Task request failed (e.g. timed out); fall back to a default keyword
        reptile(None, convert_to_traditional("新闻"))
    # upload_control()


# Global variables
data = []
table_name = "pms_ptt"
file_dir = f'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# Make sure the output directory exists before images and JSON files are written into it
os.makedirs(file_dir, exist_ok=True)
# Task status; 0 means the crawl task is enabled
status_task = 0
# Run the crawler
main()