import time
from utils.Logger import log
from utils.createBrowserDriver import create
from utils.filse import save_json
from api.index import getReptileTask
from utils.index import convert_to_traditional, yt_dlp_download, convert_string_to_time, create_directory_if_not_exists, \
    delete_directory
from pytube import YouTube
import os
from config.settings import get_base_file_url
import sys


def reptile(browser=None, search_word=""):
    """
    Scrape YouTube search results for a keyword and download matching videos.

    :param browser: optional Selenium WebDriver; a new one is created if None
    :param search_word: search keyword
    :return: None
    """
    browser = browser or create(no_headless=False, using_user_data=False)
    # open the search results page
    print(f"search word: {search_word}")
    url = f'https://www.youtube.com/results?search_query={search_word}'
    browser.get(url)
    time.sleep(3)
    log.debug("youtube search page loaded")
    video_list = browser.find_elements('xpath', "//div[@id='contents']//ytd-video-renderer")
    for item in video_list:
        # channel author
        author_element = item.find_element("xpath", "./div[1]/div/div[2]//ytd-channel-name//yt-formatted-string/a")
        # video title
        title_element = item.find_element("xpath", ".//div[@id='title-wrapper']//a")
        # upload time; the xpath must start with "./" so the search stays relative to this item
        time_element = item.find_element("xpath", ".//ytd-video-meta-block//div[@id='metadata-line']/span[2]")
        title = title_element.get_attribute('title')
        link = title_element.get_attribute('href')
        video_id = link.split("?")[1].split("&")[0].replace("v=", "")
        url = f'https://www.youtube.com/watch?v={video_id}'
        # YouTube(...).length is reported in seconds; convert to minutes
        video_duration = int(YouTube(url).length) // 60
        # only keep videos shorter than 60 minutes
        if video_duration < 60:
            try:
                # parse the relative upload time (e.g. "3 days ago") into epoch seconds
                releaseTime = str(int(convert_string_to_time(time_element.text)))
            except Exception:
                # fall back to the current time if parsing fails
                releaseTime = str(int(time.time()))
            # skip items whose release time falls outside the configured filter window
            new_releaseTime = int(releaseTime)
            if not (beginFiltrationTime <= new_releaseTime <= endFiltrationTime):
                continue
            video_url = []
            # local download path
            download_dir = os.path.join(local_path, f"{video_id}.mp4")
            # public access address of the downloaded file (currently unused)
            access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{local_path_name}/{video_id}.mp4'
            # download the video into the task directory
            state_download = yt_dlp_download(url, local_path)
            video_url.append(download_dir)
            if state_download:
                # assemble one record
                obj = {
                    "title": title,
                    "content": "",
                    "videoUrl": ",".join(video_url),
                    "link": link,
                    "reptileTime": str(int(time.time())),
                    "type": '视频',
                    "author": author_element.text,
                    "releaseTime": releaseTime
                }
                data.append(obj)
            else:
                log.debug(f"download failed: {url}")
    if len(data) > 0:
        # save the scraped records to a local json file
        json_path = os.path.join(local_path, "data.json")
        state_save = save_json(json_path, data)
        # save the task metadata
        task = {
            # scrape time
            "reptileTime": data[0]["reptileTime"],
            # local path
            "localPath": local_path,
            "beginFiltrationTime": beginFiltrationTime,
            "endFiltrationTime": endFiltrationTime,
            "keyword": keyword,
            "total": len(data)
        }
        # report failure if either file failed to save
        state_save = save_json(os.path.join(file_dir, "task.json"), task) and state_save
        if state_save:
            log.debug('save file success')
        else:
            log.debug('save file failed')
        script_close(browser)
    else:
        # nothing was scraped; remove the task directory
        log.info("no data scraped")
        delete_directory(local_path)
        script_close(browser)


def script_close(browser):
    # shut down the browser driver
    try:
        browser.close()
        browser.quit()
    except Exception:
        log.debug("failed to close the browser driver")
    try:
        # remaining cleanup could go here...
        sys.exit()
    except SystemExit:
        # re-raise SystemExit so the script actually exits
        raise
    except Exception:
        print("sys.exit() failed")


def main():
    """
    Fetch the scraping task configuration and start the scraper.
    """
    # request the task configuration (keyword, filter window, status)
    response = getReptileTask()
    global status_task
    global beginFiltrationTime
    global endFiltrationTime
    global keyword
    # declared global so the fetched tableName actually replaces the default
    global table_name
    if response['status_code'] == 200 and response['data']['code'] == 200:
        log.debug("call success")
        search_word = ""
        for item in response['data']['rows']:
            if item['name'] == 'youtube':
                search_word = item['keyword']
                table_name = item['tableName']
                status_task = int(item["status"])
                keyword = str(item["keyword"])
                beginFiltrationTime = int(item["beginFiltrationTime"])
                endFiltrationTime = int(item["endFiltrationTime"])
        # convert the keyword from Simplified to Traditional Chinese
        if status_task == 0 and len(search_word) > 0:
            reptile(None, convert_to_traditional(search_word))
        else:
            log.debug("scraping task is not enabled")
    else:
        log.debug("call failed")
        # the request failed or timed out; fall back to a default keyword
        reptile(None, convert_to_traditional("新闻"))
        # upload_control()


# global variables
data = []
# task details
task = {}
table_name = "pms_youtube"
# global fields
keyword = ""
# filter window start (epoch seconds; placeholder default)
beginFiltrationTime = 123
# filter window end (epoch seconds; placeholder default)
endFiltrationTime = 123
file_dir = os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])
# task directory name
local_path_name = str(int(time.time()))
# task directory path
local_path = os.path.join(file_dir, local_path_name)
# whether the task directory was created
local_path_status = create_directory_if_not_exists(local_path)
# task status (0 means the task runs)
status_task = 0

if __name__ == '__main__':
    main()
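# For reference: the response shape main() assumes from getReptileTask(),
# reconstructed from the fields read above. The real API may return more keys;
# every value below is a hypothetical placeholder.
#
# {
#     "status_code": 200,
#     "data": {
#         "code": 200,
#         "rows": [
#             {
#                 "name": "youtube",
#                 "keyword": "新闻",
#                 "tableName": "pms_youtube",
#                 "status": 0,
#                 "beginFiltrationTime": 1690000000,
#                 "endFiltrationTime": 1700000000
#             }
#         ]
#     }
# }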