import json
import time
from bs4 import BeautifulSoup
from utils.Logger import log
from utils.createBrowserDriver import create
from utils.filse import save_json
from api.index import importJson, getReptileTask, importJsonPath
from utils.index import convert_to_traditional, yt_dlp_download, convert_string_to_time, parse_twitter_time_string
# from pytube import YouTube
from config.settings import get_account
import os
import sys
from datetime import datetime, timezone
from utils.download_image import download_image  # utility: download an image to disk
from config.settings import get_base_file_url

# --------------- selenium dependencies start ----------------
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# --------------- selenium dependencies end ----------------

'''
Open the Instagram tag-search page for a keyword (logging in first if a login
form is shown), then step through each post dialog to extract the author,
caption, publish time, and images (downloading each image locally and
rewriting its src). Finally, save the collected records as a JSON file and
close the browser driver.
'''


def reptile(browser=None, search_word=""):
    """
    :param browser: an existing Selenium driver, or None to create one
    :param search_word: the tag keyword to search for
    """
    print(f"Search term: {search_word}")
    base_url = "https://www.instagram.com/"
    option = ['--headless']  # launch flags; currently unused, create() is called with None below
    browser = browser or create(None, True)
    # Open the home page
    browser.get(base_url)
    # Wait for the page to load
    time.sleep(2)
    try:
        # Check whether a login form is present
        login_input = browser.find_element('xpath', "//input[@name='username']")
        password_input = browser.find_element('xpath', "//input[@name='password']")
        login_input.send_keys(get_account("instagram")["name"])
        password_input.send_keys(get_account("instagram")["password"])
        # Submit the login form
        button_login = browser.find_element('xpath', "//button[@type='submit']")
        button_login.click()
        time.sleep(2)
    except Exception:
        # No login form found; assume the session is already authenticated
        pass

    # Open the tag-search page and wait for the post links to render
    url = f"{base_url}explore/tags/{search_word}/"
    browser.get(url)
    wait = WebDriverWait(browser, 10)
    wait.until(EC.presence_of_element_located((By.XPATH, "//article//a")))
    # Post links
    element_link_list = browser.find_elements('xpath', "//article//a")
    length = len(element_link_list)
    for index in range(length):
        # Open the post dialog (a JS click avoids overlay interception)
        browser.execute_script("arguments[0].click();", element_link_list[index])
        # Wait for the dialog to load
        wait = WebDriverWait(browser, 10)
        wait.until(EC.presence_of_element_located((By.XPATH, "//div[@role='dialog']/div/div[2]")))
        # Extract author, caption, and publish time
        author = browser.find_element("xpath", "//div[@role='dialog']/div//article/div/div[2]/div/div/div[1]//a")
        content_element = browser.find_element("xpath", "//div[@role='dialog']/div//article/div/div[2]/div/div/div[2]/div[1]//div[@role='button']//h1")
        time_element = browser.find_element("xpath", "//div[@role='dialog']/div//article/div/div[2]/div/div/div[2]/div[2]//time")
        link_str = browser.current_url
        # The datetime attribute is ISO 8601 with a trailing "Z" (UTC); strip it
        # for fromisoformat(), mark the value as UTC, and convert to a Unix timestamp
        timestamp = datetime.fromisoformat(time_element.get_attribute("datetime")[:-1]) \
            .replace(tzinfo=timezone.utc).timestamp()
        # Parse the caption markup
        soup = BeautifulSoup(content_element.get_attribute("outerHTML"), "html.parser")
        # Extract images and videos
        picture_url = []
        title = ""  # default, in case the post is a video or has no images
        img_list = browser.find_elements("xpath", "//div[@role='dialog']/div//article/div/div[1]/div/div[1]//img")
        # Used to skip image handling for video posts
        video_list = browser.find_elements("xpath", "//div[@role='dialog']/div//article/div/div[1]/div/div[1]//video")
        for key, item in enumerate(img_list):
            if len(video_list) == 0:
                if key == 0:
                    # Use the first image's alt text as the title
                    title = item.get_attribute("alt")
                # Download the image locally and rewrite the tag's src
                image_id = f"{int(time.time())}_{index}_{key}"  # unique per post and image; time alone collides within a second
                # Local download path
                download_dir = os.path.join(file_dir, f"{image_id}.jpg")
                # Public access URL
                access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{image_id}.jpg'
                # Download status
                status = download_image(item.get_attribute("src"), download_dir)
                if status:
                    # Append the rewritten <img> tag to the caption content
                    img_soup = BeautifulSoup(item.get_attribute("outerHTML"), "html.parser")
                    img_soup.img["src"] = access_address
                    soup.append(img_soup)
                    picture_url.append(access_address)
        content = soup.prettify()
        # Record type ("图文" = image-and-text)
        content_type = "图文"
        # --------------- assemble the record start ---------------------
        obj = {
            "title": title or "",
            "content": content,
            "link": link_str,
            "reptileTime": str(int(time.time())),
            "type": content_type,
            "author": author.text,
            "releaseTime": str(int(timestamp)),
            "picture_url": ",".join(picture_url)
        }
        # --------------- assemble the record end ---------------------
        data.append(obj)
        # The dialog's last button advances to the next post
        next_buttons = browser.find_elements("xpath", "//div[@role='dialog']/div/div[1]//button")
        if index < length - 1 and next_buttons:
            next_buttons[-1].click()

    if len(data) > 0:
        # Save the collected records as a local JSON file
        state_save = save_json(os.path.join(file_dir, str(int(time.time())) + ".json"), data)
        if state_save:
            log.debug('save file success')
        else:
            log.debug('save file failed')
        script_close(browser)
    else:
        # Nothing was scraped
        log.info("no data scraped")
        script_close(browser)


def script_close(browser):
    # Shut down the browser driver
    try:
        browser.close()
        browser.quit()
    except Exception:
        log.debug("failed to close the browser driver")

    try:
        sys.exit()
    except SystemExit:
        raise  # re-raise SystemExit so the script actually exits
    except Exception:
        print("sys.exit() failed")


def main():
    """
    Fetch the scraping task, then run the scraper when the task is enabled.
    """
    # Request the task definition (keyword, target table, status)
    response = getReptileTask()
    global status_task
    global table_name  # note: file_dir is derived from the module-level default at import time
    if response['status_code'] == 200 and response['data']['code'] == 200:
        log.debug("call success")
        search_word = ""
        for item in response['data']['rows']:
            if item['name'] == 'pms_instagram':
                search_word = item['keyword']
                table_name = item['tableName']
                status_task = int(item["status"])
        # Convert the Simplified Chinese keyword to Traditional before searching
        if status_task == 0 and len(search_word) > 0:
            reptile(None, convert_to_traditional(search_word))
        else:
            log.debug("scraping task is not enabled")
    else:
        log.debug("call failed")
        # Request failed or timed out; fall back to a default keyword ("新闻" = news)
        reptile(None, convert_to_traditional("新闻"))
        # upload_control()


# --------------- module-level state ---------------------
# Collected records
data = []
table_name = "pms_instagram"
# Local directory for image downloads and the JSON output
file_dir = os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])
# Task status (0 = enabled)
status_task = 0

if __name__ == '__main__':
    main()
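
# ---------------------------------------------------------------------------
# For reference: the shape of the getReptileTask() response that main()
# assumes, reconstructed only from the fields it reads. The values below are
# illustrative, not taken from the real API:
#
# {
#     "status_code": 200,
#     "data": {
#         "code": 200,
#         "rows": [
#             {
#                 "name": "pms_instagram",
#                 "keyword": "...",               # search keyword
#                 "tableName": "pms_instagram",   # target table
#                 "status": 0                     # 0 = enabled
#             }
#         ]
#     }
# }
# ---------------------------------------------------------------------------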
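
# ---------------------------------------------------------------------------
# Reference sketch (assumption, not part of this repo): download_image and
# save_json are imported from utils/ and their implementations are not shown
# here. Judging only from their call sites above — download_image(url, path)
# and save_json(path, data), each returning a truthy success flag — minimal
# stand-ins could look like the commented code below. The real helpers may
# differ; this only makes the expected contract explicit.
#
# import json
# import requests
#
# def download_image(url, path):
#     """Fetch `url` and write the bytes to `path`; return True on success."""
#     try:
#         resp = requests.get(url, timeout=10)
#         resp.raise_for_status()
#         with open(path, "wb") as fh:
#             fh.write(resp.content)
#         return True
#     except Exception:
#         return False
#
# def save_json(path, payload):
#     """Serialize `payload` to `path` as UTF-8 JSON; return True on success."""
#     try:
#         with open(path, "w", encoding="utf-8") as fh:
#             json.dump(payload, fh, ensure_ascii=False, indent=2)
#         return True
#     except Exception:
#         return False
# ---------------------------------------------------------------------------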