Commit 15d41825 authored by liyang

feat: 1. Add the Liberty Times Net (自由时报) crawler script

2. Add the data-volume statistics script
parent a85b0f30
import io
import json
import re
import sys
import time
import loguru
# import pymysql.cursors
import requests
from bs4 import BeautifulSoup
import datetime
from api.index import importJson, getReptileTask, importJsonPath
from utils.Logger import log
from utils.index import convert_to_traditional, create_directory_if_not_exists, delete_directory, parse_ltn_time_string
# from requests_toolbelt import *
from utils.createBrowserDriver import create
import opencc
from utils.filse import save_json
import os
from config.settings import get_base_file_url
from utils.download_image import download_image
# --------------- selenium dependencies: start ----------------
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# --------------- selenium dependencies: end ----------------
'''
Crawl Liberty Times Net (自由时报) search results for a keyword, including each article's title and content (text and images).
Flow: create the driver -> open the browser -> open the search page -> walk the pagination -> scrape each result item -> filter by release time -> assemble and save the data.
'''
def reptile(browser=None, search_word=""):
    # Example search URL:
    # url = "https://search.ltn.com.tw/list?keyword=新闻&start_time=20230730&end_time=20230801&type=all&sort=date"
    # Convert the filter timestamps to datetime objects
    begin_dt_object = datetime.datetime.fromtimestamp(beginFiltrationTime)
    end_dt_object = datetime.datetime.fromtimestamp(endFiltrationTime)
    # Format the datetime objects as "YYYYMMDD" strings, e.g. "20230730"
    filter_start_date = begin_dt_object.strftime("%Y%m%d")
    filter_end_date = end_dt_object.strftime("%Y%m%d")
    # Base URL
    url = f"https://search.ltn.com.tw/list?keyword={search_word}&start_time={filter_start_date}&end_time={filter_end_date}&type=all&sort=date"
    browser = browser or create(no_headless=False, using_user_data=True)
    # Headed mode:
    # browser = browser or create()
    # Open the first results page
    browser.get(url + "&page=1")
    time.sleep(3)
    # Collect the pagination links
    page_list_element = browser.find_elements("xpath", "//div[@data-desc='分頁']/a[@class='p_num' or @class='active']")
    # Walk the pages; page 1 is already open, so only navigate for later pages
    for key, element in enumerate(page_list_element):
        if key > 0:
            # Load the next page directly by URL instead of element.click()
            browser.get(f"{url}&page={key + 1}")
            time.sleep(3)
            # Re-fetch the pagination links after navigation
            page_list_element = browser.find_elements("xpath", "//div[@data-desc='分頁']/a")
        # Scroll to the bottom of the page
        browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        # log.debug("Browser opened")
        classify_item_list = browser.find_elements('xpath', "//div[@class='page-name']//ul/li")
        for index, item_element in enumerate(classify_item_list):
            # Parse the item's HTML with BeautifulSoup
            soup = BeautifulSoup(item_element.get_attribute('outerHTML'), 'html.parser')
            # Release-time element
            element_release = item_element.find_element("xpath", "./div/span")
            # Find all img tags in the item
            image_list = soup.find_all('img')
            picture_url = []
            img_tag = soup.new_tag("img")
            for img_index, img_element in enumerate(image_list):
                # Download each image locally and point the new tag's src at the served copy
                image_id = f"{int(time.time())}_{img_index}"  # index suffix avoids same-second filename collisions
                # Local download path
                download_path = os.path.join(local_path, f"{image_id}.jpg")
                # Public access URL
                access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{local_path_name}/{image_id}.jpg'
                # Download and record the status
                status = download_image(img_element['src'], download_path)
                if status:
                    img_tag["src"] = access_address
                    picture_url.append(download_path)
            # Reassemble the item content as <div><img/><p>…</p></div>
            p = soup.new_tag("p")
            p.string = item_element.find_element("xpath", "./div/p").text
            div = soup.new_tag("div")
            div.append(img_tag)
            div.append(p)
            html = div.prettify()
            # Convert the release-time string (relative or "M月D日") to a Unix timestamp
            date_string = element_release.text
            release_time = parse_ltn_time_string(date_string)
            if release_time is None:
                # Skip items whose release time cannot be parsed
                continue
            # Keep only items inside the filter window
            if beginFiltrationTime <= release_time <= endFiltrationTime:
                # --------------- assemble data: start ---------------------
                obj = {
                    "title": item_element.find_element("xpath", "./div/a[1]").text,
                    "content": html,
                    "link": item_element.find_element("xpath", "./div/a[1]").get_attribute("href"),
                    "reptileTime": str(int(time.time())),
                    "type": "图文",
                    "author": "自由时报",
                    "releaseTime": str(release_time),
                    "picture_url": ",".join(picture_url)
                }
                # --------------- assemble data: end ---------------------
                data.append(obj)
    if len(data) > 0:
        # Save the scraped data as a local JSON file
        json_path = os.path.join(local_path, "data.json")
        state_save = save_json(json_path, data)
        # Save the task metadata
        task = {
            # Scrape time
            "reptileTime": data[0]["reptileTime"],
            # Local path
            "localPath": local_path,
            "beginFiltrationTime": beginFiltrationTime,
            "endFiltrationTime": endFiltrationTime,
            "keyword": keyword
        }
        state_save = save_json(os.path.join(file_dir, "task.json"), task)
        if state_save:
            log.debug('save file success')
        else:
            log.debug('save file failed')
        script_close(browser)
    else:
        # Nothing was scraped
        log.info("No data scraped")
        # Remove the empty task directory
        delete_directory(local_path)
        script_close(browser)
def script_close(browser):
    # Shut down the browser driver
    try:
        browser.close()
        browser.quit()
    except Exception:
        log.debug("Failed to close the browser driver")
    try:
        sys.exit()
    except SystemExit:
        raise  # re-raise SystemExit so the script actually exits
    except Exception:
        print("sys.exit() failed")
def main():
    """
    Fetch the crawl task, then start the crawler with its keyword.
    """
    # Request the task list
    response = getReptileTask()
    global status_task
    global beginFiltrationTime
    global endFiltrationTime
    global keyword
    global table_name
    if response['status_code'] == 200 and response['data']['code'] == 200:
        log.debug("call success")
        search_word = ""
        for item in response['data']['rows']:
            if item['name'] == 'libertyTimeNet-自由时报':
                search_word = item['keyword']
                table_name = item['tableName']
                status_task = int(item["status"])
                keyword = str(item["keyword"])
                beginFiltrationTime = int(item["beginFiltrationTime"])
                endFiltrationTime = int(item["endFiltrationTime"])
        # Convert Simplified to Traditional Chinese before searching
        if status_task == 0 and len(search_word) > 0:
            reptile(None, convert_to_traditional(search_word))
        else:
            log.debug("Crawl task is not enabled")
    else:
        log.debug("call failed")
        # Request failed or timed out: fall back to a default keyword
        reptile(None, convert_to_traditional("新闻"))
        # upload_control()
# Global variables
data = []
# Task details
task = {}
table_name = "pms_libertyTimeNet"
# Global fields
keyword = ""
# Filter window start (placeholder; overwritten by the task)
beginFiltrationTime = int(123)
# Filter window end (placeholder; overwritten by the task)
endFiltrationTime = int(123)
# Root directory for scraped files
file_dir = os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])
# Task directory name
local_path_name = str(int(time.time()))
# Task directory path
local_path = os.path.join(file_dir, local_path_name)
# Whether the task directory was created
local_path_status = create_directory_if_not_exists(local_path)
# Whether the task is enabled (0 = enabled)
status_task = 0
# Entry point
main()
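# ============================================================
# Second file in this commit: the data-volume statistics script.
# It walks every reptile_data/<site>/<task>/data.json file and
# sums the number of scraped entries across all crawl tasks.
# ============================================================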
import os
import json

# Main directory path
main_directory = os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data")
# Running total of entries across all data.json files
total_length = 0
# Walk the site directories under the main directory
for root, dirs, files in os.walk(main_directory):
    for dir_name in dirs:
        dir_path = os.path.join(root, dir_name)
        # Walk the task directories under each site directory
        for task_root, task_dirs, task_files in os.walk(dir_path):
            for task_dir in task_dirs:
                json_file_path = os.path.join(task_root, task_dir, 'data.json')
                # Check whether a data.json file exists for this task
                if os.path.exists(json_file_path):
                    with open(json_file_path, 'r', encoding='utf-8') as json_file:
                        try:
                            # Read the JSON array
                            json_data = json.load(json_file)
                            # Add its length to the running total
                            total_length += len(json_data)
                            # print(f"Found {len(json_data)} entries in {json_file_path}")
                        except json.JSONDecodeError:
                            # print(f"Error parsing JSON in {json_file_path}")
                            pass
                # else:
                #     print(f"Could not find data.json in {dir_path}")
print(f"Total entries: {total_length}")
@@ -107,6 +107,37 @@ def parse_time_string(time_str):
    return None
def parse_ltn_time_string(time_str):
    """
    Convert a Liberty Times Net (自由时报) time string to a Unix timestamp.
    :param time_str: e.g. "3天", "5分鐘", "2小時", or "8月1日"
    :return: Unix timestamp as int, or None if the string cannot be parsed
    """
    if "天" in time_str:
        # "N天" = N days ago
        number = int(time_str.split("天")[0])
        time_delta = datetime.timedelta(days=number)
        return int((datetime.datetime.now() - time_delta).timestamp())
    elif "分鐘" in time_str:
        # "N分鐘" = N minutes ago
        number = int(time_str.split("分鐘")[0])
        time_delta = datetime.timedelta(minutes=number)
        return int((datetime.datetime.now() - time_delta).timestamp())
    elif "小時" in time_str:
        # "N小時" = N hours ago
        number = int(time_str.split("小時")[0])
        time_delta = datetime.timedelta(hours=number)
        return int((datetime.datetime.now() - time_delta).timestamp())
    else:
        try:
            # "M月D日" -> that date in the current year
            datetime_str = time_str.replace("月", " ").replace("日", "")
            month, day = map(int, datetime_str.split())
            current_year = datetime.datetime.now().year
            datetime_obj = datetime.datetime(year=current_year, month=month, day=day)
            return int(datetime_obj.timestamp())
        except ValueError:
            return None
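# Quick sanity check for the conversions above (illustrative only; the exact
# timestamps depend on when this runs):
#   parse_ltn_time_string("5分鐘")   # now minus 5 minutes, as a Unix timestamp
#   parse_ltn_time_string("2小時")   # now minus 2 hours
#   parse_ltn_time_string("3天")     # now minus 3 days
#   parse_ltn_time_string("8月1日")  # August 1 of the current year
#   parse_ltn_time_string("bogus")   # None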
# Convert YouTube time strings
def convert_string_to_time(string):
"""
......