# pc_dcard.py
import json
import time
from bs4 import BeautifulSoup
from utils.Logger import log
from utils.createBrowserDriver import create
from utils.filse import save_json
from api.index import getReptileTask
from utils.index import convert_to_traditional, is_base64_image, save_base64_image, \
    create_directory_if_not_exists, delete_directory
# from pytube import YouTube
from selenium.common.exceptions import NoSuchElementException
import os
import sys
from datetime import datetime
from utils.download_image import download_image
from config.settings import get_base_file_url

# ---------------   selenium dependencies start ----------------
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# ---------------   selenium dependencies end ----------------
'''
Open the Dcard search page for the given keyword, walk through the result cards,
open each post's overlay to extract its title, author, release time and body
(downloading any images locally), then close the overlay and continue. The
collected records are saved as JSON and the browser driver is closed at the end.
'''


def reptile(browser=None, search_word=""):
    """

    :param browser:
    :param search_word:
    """
    print(f"搜索词:{search_word}")
    base_url = "https://www.dcard.tw"
    browser = browser or create(no_headless=False, using_user_data=True)
    # Open the search results page
    browser.get(f"{base_url}/search?query={search_word}")
    time.sleep(6)
    base_xpath = "//div[@role='main']//div[@data-key]//article"
    # Content cards
    element_content_list = browser.find_elements('xpath', base_xpath)
    # Authors
    element_authors_list = browser.find_elements('xpath', f"{base_xpath}/div[1]/div[1]/div[2]/div/div[1]")
    # Timestamps
    element_time_list = browser.find_elements('xpath', f"{base_xpath}/div[1]/div[1]/div[2]/div/div[2]/time")
    # Titles
    element_title_list = browser.find_elements('xpath', f"{base_xpath}//h2")
    # Like counts
    element_like_list = browser.find_elements('xpath', f"{base_xpath}/div[3]/div[1]/div/div[2]")
    # Comment counts
    element_comment_list = browser.find_elements('xpath', f"{base_xpath}/div[3]/div[2]/div/span")
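    # Note: the cards and their fields are read as parallel, index-aligned element lists
    # that all share base_xpath as a prefix, so index i in every list refers to the same
    # card. The loop below relies on all six lists having the same length; a Dcard layout
    # change that breaks one of the relative XPaths would surface here as an IndexError.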

    for index, item in enumerate(element_content_list):
        # Extract the ISO time, strip the trailing "Z", and convert it to a Unix timestamp
        timestamp = datetime.fromisoformat(element_time_list[index].get_attribute("datetime")[:-1]).timestamp()

        # Time filter: skip posts whose release time falls outside the configured window
        new_releaseTime = int(timestamp)
        if new_releaseTime < filter_time_start or new_releaseTime > filter_time_end:
            continue

        # Extract the author
        author = element_authors_list[index].text
        # Extract the title
        title = element_title_list[index].text
        # Extract the like count
        like = element_like_list[index].text
        # Extract the comment count
        comment = element_comment_list[index].text

        # ------------- Extract the post content ---------------
        element_content_list[index].click()
        # Wait up to 10 seconds for the post overlay to appear
        wait = WebDriverWait(browser, 10)
        wait.until(EC.presence_of_element_located((By.XPATH, "//div[@data-testid='overlay']")))
        time.sleep(3)

        # Handle the case where the overlay content failed to load
        try:
            browser.find_element("xpath", "//div[@data-testid='overlay']//h2[text()='發生錯誤']")
            error_status = True
        except NoSuchElementException:
            error_status = False

        if error_status:
            # Loading failed: go back to the search results and skip this post
            browser.back()
            time.sleep(0.5)
            continue

        close_button = browser.find_element("xpath",
                                            "//div[@data-testid='overlay']/div[2]/div[1]/div/div//button[@aria-label='關閉']")
        content_element = browser.find_element("xpath", "//div[@data-testid='overlay']//article//div[3]")
        soup = BeautifulSoup(content_element.get_attribute("outerHTML"), "html.parser")
        # Extract the post link
        link_str = browser.current_url
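        # Assumption: clicking a card routes the single-page app to the post's own URL
        # while the overlay is open, so current_url can be stored as the post link.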
        # ------------- Handle videos in the content: start ------------------
        video_list = soup.find_all("video")
        for video in video_list:
            video['src'] = ""
        # ------------- Handle videos in the content: end ------------------
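        # Note: video sources are only blanked, not downloaded, so the saved HTML keeps
        # the <video> tags without a playable src.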

        # ------------- Handle images in the content: start ------------------
        picture_url = []
        image_list = soup.find_all("img")
        if len(image_list) > 0:
            for key, element in enumerate(image_list):
                # Download each image locally and point the tag's src at the served copy.
                # Use the timestamp plus the image index so files saved within the same second don't collide.
                image_id = f"{int(time.time())}_{key}"
                # Local download path
                download_dir = f'{os.path.join(local_path, f"{image_id}.jpg")}'
                # Public access URL
                access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{local_path_name}/{image_id}.jpg'
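                # Assumption: the backend exposes reptile_data/<table suffix>/<task dir>/
                # as static files under get_base_file_url(), so access_address resolves
                # to the file written at download_dir.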
                if not is_base64_image(element['src']):
                    log.debug("Image src is a URL")
                    # Download the remote image
                    status = download_image(element['src'], download_dir)
                    if status:
                        element['src'] = access_address
                        picture_url.append(access_address)
                else:
                    log.debug("Image src is base64 data")
                    # Decode and save the base64 image
                    status = save_base64_image(element['src'], download_dir)
                    if status:
                        element['src'] = access_address
                        picture_url.append(access_address)

        # ------------- Handle images in the content: end ------------------

        content = soup.prettify()
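        # `content` now holds the re-serialized DOM, so the stored HTML carries the
        # rewritten local image URLs and the blanked video sources.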

        # ---------------- Determine the content type: start ----------
        content_type = ""
        try:
            # If the body contains any <img> tags, classify the post as image-and-text
            img_tags = soup.find_all('img')
            if len(img_tags) > 0:
                content_type = "图文"
            else:
                content_type = "文字"
        except Exception:
            content_type = "文字"
        # ---------------- Determine the content type: end ----------
        # --------------- Assemble the record: start ---------------------
        obj = {
            "title": title,
            "content": content,
            "link": link_str,
            "reptileTime": str(int(time.time())),
            "type": content_type,
            "author": author,
            "releaseTime": str(int(timestamp)),
            "picture_url": ",".join(picture_url)
        }
        # --------------- Assemble the record: end ---------------------
        data.append(obj)
        close_button.click()
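        # Closing the overlay returns to the results view; the loop assumes the card
        # elements located before the loop are still attached to the DOM (they are not
        # re-queried), otherwise a stale element error would surface on the next click.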

    # Send the scraped data to the Java service
    if len(data) > 0:
        # Save the JSON data locally
        json_path = os.path.join(local_path, "data.json")
        state_save = save_json(json_path, data)
        # Save the task descriptor
        task = {
            # Scrape time
            "reptileTime": data[0]["reptileTime"],
            # Local path of this run
            "localPath": local_path
        }
        state_save = save_json(os.path.join(file_dir, "task.json"), task)
        if state_save:
            log.debug('save file success')
        else:
            log.debug('save file failed')
        script_close(browser)
    else:
        # No data was scraped
        log.info("No data scraped")
        # Remove the empty task directory
        delete_directory(local_path)
        script_close(browser)


def script_close(browser):
    # Close the browser driver
    try:
        browser.close()
        browser.quit()
    except Exception:
        log.debug("Failed to close the browser driver")
    try:
        sys.exit()
    except SystemExit:
        raise  # Re-raise SystemExit so the script actually exits
    except Exception:
        print("sys.exit() failed")



def main():
    """

    """
    # 请求关键词
    response = getReptileTask()
    global status_task
    global filter_time_start
    global filter_time_end
    if response['status_code'] == 200 and response['data']['code'] == 200:
        log.debug("call success")
        search_word = ""
        for item in response['data']['rows']:
            if item['name'] == 'dcard':
                search_word = item['keyword']
                table_name = item['tableName']
                status_task = int(item["status"])
                filter_time_start = int(item["beginFiltrationTime"])
                filter_time_end = int(item["endFiltrationTime"])
        # 简体转繁体
        if status_task == 0 and len(search_word) > 0:
            reptile(None, convert_to_traditional(search_word))
        else:
            log.debug("爬取任务未启用")
    else:
        log.debug("call failed")
        # 请求超时
        reptile(None, convert_to_traditional("新闻"))
        # upload_control()


# Global variables
data = []
# Task details
task = {}
table_name = "pms_dcard"
# Filter window start (Unix timestamp; overwritten by main())
filter_time_start = int(123)
# Filter window end (Unix timestamp; overwritten by main())
filter_time_end = int(123)
# Root directory for scraped files
file_dir = f'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# Name of this run's task directory (Unix timestamp)
local_path_name = str(int(time.time()))
# Path of this run's task directory
local_path = f'{os.path.join(file_dir, local_path_name)}'
# Whether the task directory was created
local_path_status = create_directory_if_not_exists(local_path)
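# Resulting on-disk layout for one run: <file_dir>/task.json describes the run, while
# <file_dir>/<local_path_name>/ holds data.json and the downloaded images.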
# Whether the task is enabled (0 means enabled; see the check in main())
status_task = 0
# Entry point
if __name__ == "__main__":
    main()