# pc_twitter.py
import json
import time
from bs4 import BeautifulSoup
from utils.Logger import log
from utils.createBrowserDriver import create
from utils.filse import save_json
from api.index import importJson, getReptileTask, importJsonPath
from utils.index import convert_to_traditional, yt_dlp_download, convert_string_to_time, parse_twitter_time_string, \
    extract_image_format, create_directory_if_not_exists, delete_directory
# from pytube import YouTube
import os
import sys
from datetime import datetime
from utils.download_image import download_image
from config.settings import get_base_file_url
from config.settings import get_account

# ---------------   selenium dependencies start ----------------
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

# ---------------   selenium dependencies end ----------------

import platform

'''
Open twitter.com with Selenium, log in with the configured account if prompted, search for the given
keyword, scroll to load more tweets, then parse each tweet (text, images, video posters), download
images locally, and finally save the crawled data plus task metadata as JSON and close the driver.
'''


def reptile(browser=None, search_word=""):
    """

    :param browser: an existing Selenium WebDriver instance; a new one is created when None
    :param search_word: keyword to search for on Twitter
    """
    print(f"Search term: {search_word}")
    base_url = "https://twitter.com/"
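    # Reuse the driver passed in by the caller, or create a new headless driver that reuses saved user data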
    browser = browser or create(no_headless=False, using_user_data=True)
    # Open the page
    browser.get(base_url)
    time.sleep(2)
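    # Log in with the configured account if Twitter prompts for login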
    try:
        try:
            # Click the "Log in" entry link if it is present
            login_button = browser.find_element('xpath', "//a[@href='/login']")
            login_button.click()
            time.sleep(2)
        except Exception:
            # The login entry is not shown (already on the login form or logged in); continue
            pass

        # wait = WebDriverWait(browser, 20)
        # wait.until(EC.presence_of_element_located((By.XPATH, "//input[@autocomplete='username']")))
        # Check whether login is required
        login_input = browser.find_element('xpath', "//input[@autocomplete='username']")
        login_input.send_keys(get_account("twitter")["name"])
        # Locate the "Next" button
        buttons = browser.find_element('xpath', "//div[@role='button'][2]")
        buttons.click()
        wait = WebDriverWait(browser, 10)
        wait.until(EC.presence_of_element_located((By.XPATH, "//input[@autocomplete='current-password']")))
        password_input = browser.find_element('xpath', "//input[@autocomplete='current-password']")
        password_input.send_keys(get_account("twitter")["password"])
        # Locate the login button
        button_login = browser.find_element('xpath', "//div[@data-testid='LoginForm_Login_Button']")
        button_login.click()
        time.sleep(2)
    except Exception:
        # Login was not required or failed silently; proceed with the current session
        pass

    url = 'https://twitter.com/search?q=' + search_word + '&src=typed_query'
    browser.get(url)
    time.sleep(2)
    # Scroll the browser to the bottom of the page
    browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
    time.sleep(2)
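    # Wait until at least one tweet cell has rendered before collecting elements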
    wait = WebDriverWait(browser, 10)
    wait.until(
        EC.presence_of_element_located((By.XPATH, "//div[@data-testid='cellInnerDiv']//article/div/div/div[2]/div[2]")))
    base_xpath = "//div[@data-testid='cellInnerDiv']//article/div/div/div[2]/div[2]"
    # Tweet content blocks
    element_content_list = browser.find_elements('xpath', base_xpath)

    # Fewer than 10 items: scroll again to load more
    if len(element_content_list) < 10:
        browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(2)
        element_content_list = browser.find_elements('xpath', base_xpath)

    # Author links
    element_authors_list = browser.find_elements('xpath',
                                                 f"{base_xpath}//div[@data-testid='User-Name']/div[1]//a[@role='link']")
    length = len(element_authors_list)
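    # Content blocks and author links are matched positionally by index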
    for index in range(length):
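        # Parse the tweet's outerHTML with BeautifulSoup for further processing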
        soup = BeautifulSoup(element_content_list[index].get_attribute("outerHTML"), "html.parser")
        # Find the <time> tag to get the release time and the tweet link
        try:
            time_soup = soup.find('time')
            timestamp = datetime.fromisoformat(time_soup['datetime'].replace("Z", "+00:00")).timestamp()
            link_soup = time_soup.parent
            # The href is site-relative, so strip the leading slash before joining with base_url
            link_str = base_url + link_soup["href"].lstrip("/")
        except Exception:
            # Fall back to the current time when the tweet has no <time> tag
            link_str = ""
            timestamp = time.time()

        # Time filtering
        # Convert the timestamp to an integer for range comparison
        new_releaseTime = int(timestamp)
        if new_releaseTime < beginFiltrationTime or new_releaseTime > endFiltrationTime:
            # Skip this item if its release time is outside the configured range
            continue

        author = element_authors_list[index].text
        # Title: author + date
        title = f"{author}-{datetime.fromtimestamp(int(timestamp))}"

        video_list = soup.find_all("video")
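        # Tweets containing video: replace the video container with an <img> tag showing the poster frame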
        # lth = len(ignore_list)
        if len(video_list) > 0:
            # for key,element in enumerate(video_list):
            div_elements = soup.find("div").findChildren("div", recursive=False)
            # div_tags = soup.find_all("div", recursive=False)
            for item in video_list:
                div = soup.new_tag('div')
                img_tag = soup.new_tag('img')
                img_tag["src"] = item["poster"]
                div.append(img_tag)
                for items in div_elements:
                    # Blocks carrying aria-labelledby contain the embedded video player
                    attr = items.get("aria-labelledby")
                    if attr:
                        # Replace the div that contains the video with the <img> poster tag
                        items.replace_with(div)
                    else:
                        pass
        else:
            pass

        image_list = soup.find_all("img")
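        # Download each non-svg image locally and rewrite its src to the public file URL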
        picture_url = []
        if len(image_list) > 0:
            for key, element in enumerate(image_list):
                # If it is an svg (icon/emoji), remove it
                if str(element['src']).find("svg") != -1:
                    element.extract()
                else:
                    # Download the image locally and replace the src in the tag
                    # Use the timestamp plus tweet/image indexes so files saved in the same second do not collide
                    image_id = f"{int(time.time())}_{index}_{key}"
                    image_type = extract_image_format(element['src'])
                    # Local download path
                    download_dir = f'{os.path.join(local_path, f"{image_id}.{image_type}")}'
                    # Public access URL
                    access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{local_path_name}/{image_id}.{image_type}'
                    # Download status
                    status = download_image(element['src'], download_dir)
                    if status:
                        element['src'] = access_address
                        picture_url.append(download_dir)
        else:
            pass

        # Remove redundant wrapper divs
        # parent_div = soup.find("div")
        # Find all direct <div> children
        div_elements = soup.find("div").findChildren("div", recursive=False)
        for key, item in enumerate(div_elements):
            if key == 0 or key == len(div_elements) - 1:
                item.extract()

        content = soup.prettify()
        # ---------------- Determine content type start ----------
        # Content type
        content_type = ""
        try:
            # Find all img tags
            img_tags = soup.find_all('img')
            if len(img_tags) > 0:
                content_type = "图文"  # image + text
            else:
                content_type = "文字"  # text only
        except Exception:
            content_type = "文字"
        # ---------------- Determine content type end ----------

        # --------------- Assemble data start ---------------------
        obj = {
            "title": title,
            "content": content,
            "link": link_str,
            "reptileTime": str(int(time.time())),
            "type": content_type,
            "author": author,
            "releaseTime": str(int(timestamp)),
            "picture_url": ",".join(picture_url)
        }
        # --------------- Assemble data end ---------------------
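        # Example of a record appended to data (values are illustrative):
        # {"title": "user-2023-07-01 12:00:00", "content": "<div>...</div>", "link": "https://twitter.com/...",
        #  "reptileTime": "1688200000", "type": "图文", "author": "user", "releaseTime": "1688190000",
        #  "picture_url": "/path/img1.jpg,/path/img2.jpg"}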
        data.append(obj)
        soup = ""
        time.sleep(0.1)

    if len(data) > 0:
        # Save the JSON file locally
        json_path = os.path.join(local_path, "data.json")
        state_save = save_json(json_path, data)
        # Save the task metadata
        task = {
            # Crawl time
            "reptileTime": data[0]["reptileTime"],
            # Local path
            "localPath": local_path,
            "beginFiltrationTime": beginFiltrationTime,
            "endFiltrationTime": endFiltrationTime,
            "keyword": keyword
218 219
        }
        state_save = save_json(os.path.join(file_dir, "task.json"), task)
        if state_save:
            log.debug('save file success')
        else:
            log.debug('save file failed')
        script_close(browser)
    else:
        # No data was crawled
        log.info("No data was crawled")
        # Delete the task directory
        delete_directory(local_path)
        script_close(browser)


def script_close(browser):
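    """Close the browser driver and terminate the script."""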
    # Close the browser driver
    try:
        browser.close()
        browser.quit()
    except Exception:
        log.debug("Failed to close the browser driver")
    try:
        # Exit the script
        sys.exit()
    except SystemExit:
        raise  # Re-raise SystemExit so the script actually exits
    except Exception as e:
        # Any other exception means sys.exit() did not take effect
        print("sys.exit() failed")


def main():
    """
    Fetch the crawl task configuration from the API and start the Twitter crawl.
    """
    # Request the crawl task (keyword and filter settings)
    response = getReptileTask()
    global status_task
    global beginFiltrationTime
    global endFiltrationTime
    global keyword
    if response['status_code'] == 200 and response['data']['code'] == 200:
        log.debug("call success")
        search_word = ""
        for item in response['data']['rows']:
            if item['name'] == 'twitter':
                search_word = item['keyword']
                table_name = item['tableName']
                keyword = str(item["keyword"])
                status_task = int(item["status"])
                beginFiltrationTime = int(item["beginFiltrationTime"])
                endFiltrationTime = int(item["endFiltrationTime"])
        # Convert the keyword from simplified to traditional Chinese
        if status_task == 0 and len(search_word) > 0:
            reptile(None, convert_to_traditional(search_word))
        else:
            log.debug("Crawl task is not enabled")
    else:
        log.debug("call failed")
        # Request failed (e.g. timed out); crawl with the default keyword
        reptile(None, convert_to_traditional("新闻"))
        # upload_control()


# Global variables
data = []
# Task details
task = {}
table_name = "pms_twitter"

# Global fields
keyword = ""
# Filter start time (placeholder; overwritten in main())
beginFiltrationTime = 123
# Filter end time (placeholder; overwritten in main())
endFiltrationTime = 123

file_dir = f'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# Task directory name
local_path_name = str(int(time.time()))
# Task directory path
local_path = f'{os.path.join(file_dir, local_path_name)}'
# Whether the task directory was created
local_path_status = create_directory_if_not_exists(local_path)
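# Note: the task directory is created under the parent of the current working directory, at
# network-assets-reptile/reptile_data/twitter/<timestamp>/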
# Whether the crawl task is enabled
status_task = 0
# Run main() only when this module is executed as a script
if __name__ == "__main__":
    main()