pc_twitter.py

import json
import time
from bs4 import BeautifulSoup
from utils.Logger import log
from utils.createBrowserDriver import create
from utils.filse import save_json
from api.index import importJson, getReptileTask, importJsonPath
from utils.index import convert_to_traditional, yt_dlp_download, convert_string_to_time, parse_twitter_time_string
# from pytube import YouTube
import os
import sys
from datetime import datetime
from utils.download_image import download_image
from config.settings import get_base_file_url
from config.settings import get_account

# ---------------   selenium dependencies start ----------------
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# ---------------   selenium dependencies end ----------------

import platform
'''
Open Twitter in a Selenium-driven browser, log in with the configured account if the
login form appears, search for the given keyword, parse each tweet (author, time, link,
text, images and a video placeholder), download the images locally, and finally save the
collected records as a JSON file before closing the browser driver.
'''
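# A minimal sketch of the record shape assembled for each tweet in reptile() below;
# the field values here are illustrative only, not taken from a real scrape:
# {
#     "title": "SomeUser-2023-07-20 08:15:30",         # author + release date
#     "content": "<div>...</div>",                      # prettified tweet HTML
#     "link": "https://twitter.com/SomeUser/status/1",  # built from the <time> parent link
#     "reptileTime": "1689841000",                      # crawl time, epoch seconds
#     "type": "图文",                                    # "图文" (image + text) or "文字" (text only)
#     "author": "SomeUser",
#     "releaseTime": "1689840930",                      # tweet time, epoch seconds
#     "picture_url": ".../reptile_data/twitter/1689840935.jpg"
# }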


def reptile(browser=None, search_word=""):
    """

    :param browser:
    :param search_word:
    """
    print(f"搜索词:{search_word}")
    base_url = "https://twitter.com/"
    if platform.system() == "Windows":
        browser = browser or create(no_headless=True, using_user_data=True)
    else:
        browser = browser or create(no_headless=True, using_user_data=True)
    # print(browser)
    # Open the page
    browser.get(base_url)
    time.sleep(2)
    try:
        # wait = WebDriverWait(browser, 20)
        # wait.until(EC.presence_of_element_located((By.XPATH, "//input[@autocomplete='username']")))
        # Check whether login is required
        login_input = browser.find_element('xpath', "//input[@autocomplete='username']")
        login_input.send_keys(get_account("twitter")["name"])
        # Get the "Next" button
        buttons = browser.find_element('xpath', "//div[@role='button'][2]")
        buttons.click()
        wait = WebDriverWait(browser, 10)
        wait.until(EC.presence_of_element_located((By.XPATH, "//input[@autocomplete='current-password']")))
        password_input = browser.find_element('xpath', "//input[@autocomplete='current-password']")
        password_input.send_keys(get_account("twitter")["password"])
        # Get the login button
        button_login = browser.find_element('xpath', "//div[@data-testid='LoginForm_Login_Button']")
        button_login.click()
        time.sleep(1)
    except Exception:
        # Login form not found, assume the session is already logged in
        print("------")
    url = 'https://twitter.com/search?q=' + search_word + '&src=typed_query'
    browser.get(url)
    wait = WebDriverWait(browser, 10)
    wait.until(
        EC.presence_of_element_located((By.XPATH, "//div[@data-testid='cellInnerDiv']//article/div/div/div[2]/div[2]")))
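    # Each node matched by this XPath is the inner content block of one tweet card;
    # it is parsed with BeautifulSoup below for the time tag, text, images and videos.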
    base_xpath = "//div[@data-testid='cellInnerDiv']//article/div/div/div[2]/div[2]"
    # Content blocks
    element_content_list = browser.find_elements('xpath', base_xpath)
    # Authors
    element_authors_list = browser.find_elements('xpath',
                                                 f"{base_xpath}//div[@data-testid='User-Name']/div[1]//a[@role='link']")
    length = len(element_authors_list)
    for index in range(length):
        # print(index)
        soup = BeautifulSoup(element_content_list[index].get_attribute("outerHTML"), "html.parser")
        # Find the <time> tag
        try:
            time_soup = soup.find('time')
            timestamp = datetime.fromisoformat(time_soup['datetime'].replace("Z", "+00:00")).timestamp()
            link_soup = time_soup.parent
            # href starts with "/"; strip it so base_url's trailing slash is not doubled
            link_str = base_url + link_soup["href"].lstrip("/")
        except Exception:
            link_str = ""
            timestamp = time.time()
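        # A minimal worked example of the conversion above, using an assumed sample
        # value (not taken from a real tweet):
        #   datetime.fromisoformat("2023-07-20T08:15:30.000+00:00").timestamp()
        #   -> 1689840930.0  (epoch seconds, UTC)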
        author = element_authors_list[index].text
        # Title: author + release date
        title = f"{author}-{datetime.fromtimestamp(int(timestamp))}"

        video_list = soup.find_all("video")
        image_list = soup.find_all("img")
        # lth = len(ignore_list)
        if len(video_list) > 0:
            # for key,element in enumerate(video_list):
            # Remove the second child element
            # Find the parent element that contains the <div> children
            parent_div = soup.find('div')
            # Find all direct <div> children
            div_elements = parent_div.find_all('div', recursive=False)
            # div_tags = soup.find_all("div", recursive=False)
            # Make sure there are at least two <div> children
            if len(div_elements) >= 2:
                # Take the second <div> and remove it from its parent
                div_to_remove = div_elements[1]
                div_to_remove.extract()
                # Delete
                # div.decompose()
                # Create a placeholder <video> tag
                custom_video = soup.new_tag("video")
                custom_video["src"] = ""
                parent_div.append(custom_video)
        else:
            print("")

        picture_url = []
        if len(image_list) > 0:
            for key, element in enumerate(image_list):
                # Download the image locally and replace the src in the tag
                image_id = str(int(time.time()))
                # Local download path
                download_dir = f'{os.path.join(file_dir, f"{image_id}.jpg")}'
                # Public access URL on the file server
                access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{image_id}.jpg'
                # Download status
                status = download_image(element['src'], download_dir)
                if status:
                    element['src'] = access_address
                    picture_url.append(download_dir)
        else:
            print("")
        content = soup.prettify()

        # ---------------- Determine content type start ----------
        # Type
        content_type = ""
        try:
            # Find all img tags
            img_tags = soup.find_all('img')
            if len(img_tags) > 0:
                content_type = "图文"
            else:
                content_type = "文字"
        except Exception:
            content_type = "文字"
        # ---------------- Determine content type end ----------
        # --------------- Assemble data start ---------------------
        obj = {
            "title": title,
            "content": content,
            "link": link_str,
            "reptileTime": str(int(time.time())),
            "type": content_type,
            "author": author,
            "releaseTime": str(int(timestamp)),
            "picture_url": ",".join(picture_url)
        }
        # --------------- Assemble data end ---------------------
        data.append(obj)

    # Send the scraped data to the Java service
    # print('----------------------')
    # print(data)
    if len(data) > 0:
        # Save the JSON file locally
        # log.debug(os.path.abspath("../"))
        state_save = save_json(os.path.join(file_dir, str(int(time.time())) + ".json"), data)
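        # The call above writes a file like the following, assuming the default
        # table_name "pms_twitter" (the path shown is illustrative only):
        #   .../network-assets-reptile/reptile_data/twitter/1689841000.json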
        if state_save:
            log.debug('save file success')
        else:
            log.debug('save file failed')
        script_close(browser)
    else:
        # No data was scraped
        log.info("No data scraped")
        script_close(browser)


def script_close(browser):
    # Close the browser driver
    try:
        browser.close()
        browser.quit()
    except Exception:
        log.debug("Failed to close the browser driver")
    sys.exit()


def main():
    """

    """
    # 请求关键词
    response = getReptileTask()
    global status_task
    # print(response)
    if response['status_code'] == 200 and response['data']['code'] == 200:
        log.debug("call success")
        search_word = ""
        for item in response['data']['rows']:
            if item['name'] == 'twitter':
                search_word = item['keyword']
                table_name = item['tableName']
                status_task = int(item["status"])
        # Convert Simplified Chinese to Traditional Chinese
        if status_task == 0 and len(search_word) > 0:
            reptile(None, convert_to_traditional(search_word))
        else:
            log.debug("Crawl task is not enabled")
    else:
        log.debug("call failed")
        # Request timed out
        reptile(None, convert_to_traditional("新闻"))
        # upload_control()


# Global variables
data = []
table_name = "pms_twitter"
file_dir = f'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# Whether the task is enabled
status_task = 0
# Call the main function
main()