# pc_youtube.py
import json
import platform
import time
from bs4 import BeautifulSoup
from utils.Logger import log
from utils.createBrowserDriver import create
from utils.filse import save_json
from api.index import importJson, getReptileTask, importJsonPath
from utils.index import convert_to_traditional, yt_dlp_download, convert_string_to_time, create_directory_if_not_exists, \
    delete_directory
from pytube import YouTube
from datetime import datetime
import os
from config.settings import get_base_file_url
from selenium.webdriver.common.action_chains import ActionChains
import sys
# ---------------   selenium 依赖 start ----------------
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC


# ---------------   selenium 依赖 end ----------------

def reptile(browser=None, search_word=""):
    """Scrape YouTube search results for ``search_word`` and download videos.

    Each result shorter than 60 minutes whose release time falls inside the
    configured filter window is downloaded via yt-dlp; a record is appended to
    the module-level ``data`` list and persisted as ``data.json``/``task.json``
    before the process exits through ``script_close``.

    :param browser: optional selenium WebDriver; a headless one is created when None
    :param search_word: query string for the YouTube search page
    :return: None (always terminates the process via ``script_close``)
    """
    browser = browser or create(no_headless=False, using_user_data=False)
    print(f"搜索词:{search_word}")
    url = f'https://www.youtube.com/results?search_query={search_word}'
    browser.get(url)
    # Give the results page a moment to render before scraping.
    time.sleep(3)
    log.debug("youtube login complete")
    video_list = browser.find_elements('xpath', "//div[@id='contents']//ytd-video-renderer")
    for index, item in enumerate(video_list):
        # Author / title elements inside this result card.
        author_element = item.find_element("xpath", "./div[1]/div/div[2]//ytd-channel-name//yt-formatted-string/a")
        title_element = item.find_element("xpath", ".//div[@id='title-wrapper']//a")
        # BUG FIX: the xpath must be relative (".//"); the original absolute
        # "//" search matched the FIRST result on the page for every item.
        time_element = item.find_element("xpath", ".//ytd-video-meta-block//div[@id='metadata-line']/span[2]")

        title = title_element.get_attribute('title')
        link = title_element.get_attribute('href')
        # Extract the "v=" id from the watch link (renamed from `id`,
        # which shadowed the builtin).
        video_id = link.split("?")[1].split("&")[0].replace("v=", "")
        url = f'https://www.youtube.com/watch?v={video_id}'

        # Duration in minutes (pytube reports length in seconds).
        video_duration = int(YouTube(url).length) // 60

        # Only keep videos shorter than 60 minutes.
        if video_duration < 60:
            try:
                releaseTime = str(int(convert_string_to_time(time_element.text)))
            except Exception:
                # Fall back to "now" when the relative time text cannot be parsed.
                releaseTime = str(int(time.time()))

            # BUG FIX: skip items OUTSIDE the configured time window; the
            # original condition was inverted and skipped items inside it.
            new_releaseTime = int(releaseTime)
            if not (beginFiltrationTime <= new_releaseTime <= endFiltrationTime):
                continue

            video_url = []
            # Local download target for this video.
            download_dir = os.path.join(local_path, f"{video_id}.mp4")
            # Public URL the saved record will point at.
            access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{local_path_name}/{video_id}.mp4'
            # Download the video; truthy on success.
            state_download = yt_dlp_download(url, local_path)
            video_url.append(download_dir)
            if state_download:
                # Assemble the scraped record.
                data.append({
                    "title": title,
                    "content": f"<video controls style='width:100%' src='{access_address}'></video>",
                    "videoUrl": ",".join(video_url),
                    "link": link,
                    "reptileTime": str(int(time.time())),
                    "type": '视频',
                    "author": author_element.text,
                    "releaseTime": releaseTime
                })
            else:
                log.debug(f"download failed: {url}")
    if len(data) > 0:
        # Persist the scraped records next to the downloaded files.
        json_path = os.path.join(local_path, "data.json")
        save_json(json_path, data)
        # Persist task metadata one level up so runs can be indexed.
        task = {
            # Scrape timestamp of the first record
            "reptileTime": data[0]["reptileTime"],
            # Local directory of this run
            "localPath": local_path,
            "beginFiltrationTime": beginFiltrationTime,
            "endFiltrationTime": endFiltrationTime,
            "keyword": keyword,
            "total": len(data)
        }
        state_save = save_json(os.path.join(file_dir, "task.json"), task)
        if state_save:
            log.debug('save file success')
        else:
            log.debug('save file failed')
        script_close(browser)
    else:
        # Nothing scraped: drop the empty run directory and exit.
        log.info("未爬取到数据")
        delete_directory(local_path)
        script_close(browser)


def script_close(browser):
    """Shut down the browser driver and terminate the script.

    :param browser: selenium WebDriver instance (may already be closed)
    :raises SystemExit: always, via ``sys.exit()``
    """
    try:
        # close() closes the current window, quit() shuts the driver down.
        browser.close()
        browser.quit()
    except Exception:
        log.debug("浏览器驱动关闭失败")
    # sys.exit() raises SystemExit unconditionally; the original wrapped it in
    # a try/except whose Exception branch was unreachable dead code.
    sys.exit()


def main():
    """Fetch the crawl task configuration and launch the YouTube scrape.

    Reads the 'youtube' task row from the API response, publishes its settings
    into the module-level globals consumed by ``reptile``, then runs the crawl
    with the keyword converted to traditional Chinese. Falls back to a default
    keyword when the API call fails.
    """
    # 请求关键词
    response = getReptileTask()
    global status_task
    global beginFiltrationTime
    global endFiltrationTime
    global keyword
    # BUG FIX: without this declaration, table_name was assigned to a local at
    # the line below and reptile() kept reading the stale module default.
    global table_name
    if response['status_code'] == 200 and response['data']['code'] == 200:
        log.debug("call success")
        search_word = ""
        for item in response['data']['rows']:
            if item['name'] == 'youtube':
                search_word = item['keyword']
                table_name = item['tableName']
                status_task = int(item["status"])
                keyword = str(item["keyword"])
                beginFiltrationTime = int(item["beginFiltrationTime"])
                endFiltrationTime = int(item["endFiltrationTime"])
        # 简体转繁体; status 0 means the task is enabled (see check below).
        if status_task == 0 and len(search_word) > 0:
            reptile(None, convert_to_traditional(search_word))
        else:
            log.debug("爬取任务未启用")
    else:
        log.debug("call failed")
        # Request failed/timed out: crawl with a default keyword instead.
        reptile(None, convert_to_traditional("新闻"))
        # upload_control()


# ---- module-level state shared by reptile()/main() ----
# Records accumulated by reptile()
data = []
# Task metadata (written out as task.json)
task = {}
table_name = "pms_youtube"

# Search keyword supplied by the task API
keyword = ""
# Release-time filter window start (unix seconds, placeholder default)
beginFiltrationTime = 123
# Release-time filter window end (unix seconds, placeholder default)
endFiltrationTime = 123

# Root directory for this table's scraped assets
file_dir = os.path.join(
    os.path.abspath("../"),
    "network-assets-reptile",
    "reptile_data",
    table_name.split("_")[1],
)
# Per-run directory named after the launch timestamp
local_path_name = str(int(time.time()))
# Full path of the per-run directory
local_path = os.path.join(file_dir, local_path_name)
# Make sure the run directory exists before scraping starts
local_path_status = create_directory_if_not_exists(local_path)
# Task enabled flag (0 = enabled, see main())
status_task = 0

# Kick off the crawl
main()