Commit 958dc177 authored by liyang

fix: crawl youtube

parent 5d9b5cc8
......@@ -8,3 +8,4 @@ user_data/**
log/**/*.json
app.log
reptile_data/**/*.json
*.mp4
\ No newline at end of file
import requests
from config.settings import get_base_url
headers = {}
baser_url = "http://192.168.0.127:8081/"
baser_url = get_base_url()
def importJson(file, form_data):
......
# Log file path
def get_log_path():
return "../"
def get_base_url():
return "http://192.168.0.127:8081/"
def get_base_file_url():
return "http://192.168.0.127:8186/"
\ No newline at end of file
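# Illustrative sketch only (endpoint path and video id below are hypothetical):
# these getters supply the API base URL and the static file-server base URL.
# api_url = get_base_url() + "some/import/endpoint"
# video_src = get_base_file_url() + "youtube/VIDEO_ID.mp4"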
......@@ -10,6 +10,7 @@ from bs4 import BeautifulSoup
from datetime import datetime
from api.index import importJson, getReptileTask, importJsonPath
from utils.Logger import log
from utils.index import convert_to_traditional
# from requests_toolbelt import *
from utils.createBrowserDriver import create
import opencc
......@@ -23,41 +24,6 @@ import os
'''
def write_to_database(data):
# Connect to the database
connection = pymysql.connect(host='192.168.0.103',
user='root',
password='123456',
database='test',
cursorclass=pymysql.cursors.DictCursor)
try:
# Create a cursor
with connection.cursor() as cursor:
for data_item in data:
# Get the fields to insert
title = data_item['title']
content = data_item['content']
log.debug("content length: " + str(len(content)))
# Execute the database operation
sql = "INSERT INTO message (title, content) VALUES (%s, %s)"
cursor.execute(sql, (title, content))
# Commit the transaction
connection.commit()
# Transaction succeeded
log.debug("Transaction committed successfully")
except pymysql.Error as e:
# Roll back the transaction on error
connection.rollback()
log.debug("Transaction failed: " + str(e))
finally:
# Close the connection
connection.close()
def reptile(browser=None, search_word=""):
url = "https://www.ptt.cc/bbs/hotboards.html"
# Run in headless mode
......@@ -229,13 +195,6 @@ def reptile(browser=None, search_word=""):
# time.sleep(3)
browser.quit()
def convert_to_traditional(simplified_text):
converter = opencc.OpenCC('s2t.json')  # Create a Simplified-to-Traditional Chinese converter
traditional_text = converter.convert(simplified_text)  # Perform the conversion
return traditional_text
def main():
# Request the crawl keywords
response = getReptileTask()
......@@ -247,7 +206,7 @@ def main():
if item['name'] == 'ptt':
search_word = item['keyword']
table_name = item['tableName']
# print(convert_to_traditional(search_word))
# Convert Simplified to Traditional Chinese
reptile(None, convert_to_traditional(search_word))
else:
log.debug("call failed")
......
import ssl
from pytube import YouTube
import json
import time
from bs4 import BeautifulSoup
from utils.Logger import log
from utils.createBrowserDriver import create
from utils.filse import save_json
import pymysql.cursors
ssl._create_default_https_context = ssl._create_stdlib_context
def Download(link,file_dir):
yt = YouTube(link)
# yt.register_on_complete_callback(callback)
yt.streams.filter(progressive=True, file_extension='mp4')
steam = yt.streams.get_by_itag(22)
try:
steam.download(file_dir)
return True
except:
print("下载失败")
return False
def write_to_database(data):
# Connect to the database
connection = pymysql.connect(host='192.168.0.103',
user='root',
password='123456',
database='test',
cursorclass=pymysql.cursors.DictCursor)
try:
# Create a cursor
with connection.cursor() as cursor:
for data_item in data:
# Get the fields to insert
title = data_item['title']
content = data_item['content']
print("content length: " + str(len(content)))
# Execute the database operation
sql = "INSERT INTO message (title, content) VALUES (%s, %s)"
cursor.execute(sql, (title, content))
# Commit the transaction
connection.commit()
# Transaction succeeded
print("Transaction committed successfully")
from api.index import importJson, getReptileTask, importJsonPath
from utils.index import convert_to_traditional, yt_dlp_download,convert_string_to_time
# from pytube import YouTube
from config.settings import get_base_file_url
def reptile(browser=None, search_word=""):
"""
except pymysql.Error as e:
# Roll back the transaction on error
connection.rollback()
print("Transaction failed:", e)
finally:
# Close the connection
connection.close()
def reptile(browser=None):
:param browser:
:param search_word:
:return:
"""
option = ['--headless']
browser = browser or create(url,None)
browser = browser or create(['--headless'])
# print(browser)
# Open the page
# browser.get(url)
classify_video_list = browser.find_elements('xpath', "//div[@id='contents']//ytd-video-renderer//div[@id='title-wrapper']//a")
url = f'https://www.youtube.com/results?search_query={search_word}'
browser.get(url)
classify_video_list = browser.find_elements('xpath',
"//div[@id='contents']//ytd-video-renderer//div[@id='title-wrapper']//a")
element_author_list = browser.find_elements('xpath',"//div[@id='contents']//ytd-video-renderer//ytd-channel-name//yt-formatted-string/a")
element_time_list = browser.find_elements('xpath',"//div[@id='contents']//ytd-video-renderer//ytd-video-meta-block//div[@id='metadata-line']/span[2]")
# print(classify_item_list)
length = len(classify_video_list)
for index in range(length):
if 0 < index < 2:
title = classify_video_list[index].get_attribute('title')
# link = classify_video_list[index].get_attribute('href')
link = "https://www.youtube.com/watch?v=7q88m5MQRhE"
link = classify_video_list[index].get_attribute('href')
# yt = YouTube(link)
# link = "https://www.youtube.com/watch?v=7q88m5MQRhE"
# print(link)
file_url = './'+link+'.mp4'
state_download = Download(link,file_url)
# author = element_author_list[index].text
# file_url = './' + link + '.mp4'
id = link.split("?")[1].split("&")[0].replace("v=","")
url = f'https://www.youtube.com/watch?v={id}'
base_urr = get_base_file_url()
log.debug(url)
state_download = yt_dlp_download(url, 'youtube')
file_http_src = f'{base_urr}/youtube/{id}.mp4'
if state_download:
# Assemble the data record
obj = {
"title": title,
"content": f'<video src="{file_url}"></video>',
"videoUrl": file_url,
"content": f'<video src="{file_http_src}"></video>',
"videoUrl": file_http_src,
"link": link,
"reptileTime": str(int(time.time())),
"type": '视频',
"author": element_author_list[index].text,
"releaseTime": convert_string_to_time(element_time_list[index].text)
}
data.append(obj)
else:
return False
if len(data) > 0:
# Save the JSON file locally
state_save = save_json('./youtubeData', data)
log.debug(os.path.abspath("../"))
file_dir = f'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", "ptt", str(int(time.time())) + ".json")}'
# file_dir = f'./reptile_data/ptt/{int(time.time())}.json'
state_save = save_json(file_dir, data)
# log.debug("-----------------------------")
# write_to_database(data)
if state_save:
print('File saved successfully')
log.debug('save file success')
# path = os.path.abspath(file_dir).join(file_dir).join(".json")
# log.debug('file_path:' + file_dir)
# form_data = {
# "path": file_dir,
# "tableName": table_name
# }
# response = importJsonPath(form_data)
else:
log.debug('save file failed')
else:
print('File save failed')
# No data was crawled
log.info("No data crawled")
# form_data = {
# "path": "",
# "tableName": table_name
# }
# response = importJsonPath(form_data)
browser.close()
# Shut down the browser driver
browser.quit()
url = "https://www.youtube.com/results?search_query=水"
data = []  # JSON array
def main():
"""
"""
# Request the crawl keywords
response = getReptileTask()
# print(response)
if response['status_code'] == 200 and response['data']['code'] == 200:
log.debug("call success")
search_word = ""
for item in response['data']['rows']:
if item['name'] == 'youtube':
search_word = item['keyword']
table_name = item['tableName']
reptile(None, convert_to_traditional(search_word))
else:
log.debug("call failed")
reptile(None, '')
# upload_control()
reptile()
\ No newline at end of file
# Global variables
data = []
table_name = "pms_youtube"
# Call the main function
main()
\ No newline at end of file
......@@ -13,6 +13,11 @@ from selenium.webdriver.support.ui import WebDriverWait
def create(option=None):
"""
:param option:
:return:
"""
chrome_options = webdriver.ChromeOptions()
if option is not None:
for value in option:
......@@ -21,7 +26,7 @@ def create(option=None):
# Enable a persistent browser session so login state and cookies are preserved
user_data_dir = os.path.join(os.path.abspath("../"), 'network-assets-reptile', 'user_data')
script = f'--user-data-dir={user_data_dir}'
print(script)
# print(script)
# log.debug(script)
chrome_options.add_argument(script) # Use a custom user-profile directory
......
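# Minimal usage sketch of create(): pass a list of Chrome flags; the persistent
# user_data profile shown above is added inside create() itself.
# from utils.createBrowserDriver import create
# browser = create(['--headless'])   # headless Chrome reusing the saved profile
# browser.get('https://www.youtube.com/results?search_query=test')
# browser.quit()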
import pymysql.cursors
def write_to_database(data):
"""
Write the data to the database.
Args:
- data: the data to write; a list of dicts.
Returns:
None.
Raises:
- pymysql.Error: raised when a database operation fails.
"""
# Connect to the database
connection = pymysql.connect(host='192.168.0.103',
user='root',
password='123456',
database='test',
cursorclass=pymysql.cursors.DictCursor)
try:
# Create a cursor
with connection.cursor() as cursor:
for data_item in data:
# Get the fields to insert
title = data_item['title']
content = data_item['content']
log.debug("content length: " + str(len(content)))
# Execute the database operation
sql = "INSERT INTO message (title, content) VALUES (%s, %s)"
cursor.execute(sql, (title, content))
# Commit the transaction
connection.commit()
# Transaction succeeded
log.debug("Transaction committed successfully")
except pymysql.Error as e:
# Roll back the transaction on error
connection.rollback()
log.debug("Transaction failed: " + str(e))
finally:
# Close the connection
connection.close()
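# Minimal usage sketch, assuming the hard-coded connection above is reachable;
# each item only needs 'title' and 'content' keys (values below are hypothetical).
# sample_data = [{"title": "demo title", "content": "<p>demo content</p>"}]
# write_to_database(sample_data)   # inserts one row into the message table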
import os.path
import opencc
from pytube import YouTube
import ssl
import subprocess
from utils.Logger import log
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
ssl._create_default_https_context = ssl._create_stdlib_context
def convert_string_to_time(string):
"""
Convert a relative-time string (e.g. "3天前" = "3 days ago") to a Unix timestamp.
:param string: relative-time text such as "3天前", "2周前", "1月前" or "1年前"
:return: float Unix timestamp
"""
current_time = datetime.now()
if "天前" in string:
days = int(string.split("天前")[0])
converted_time = current_time - relativedelta(days=days)
elif "周前" in string:
weeks = int(string.split("周前")[0])
converted_time = current_time - relativedelta(weeks=weeks)
elif "月前" in string:
months = int(string.split("月前")[0])
converted_time = current_time - relativedelta(months=months)
elif "年前" in string:
years = int(string.split("年前")[0])
converted_time = current_time - relativedelta(years=years)
else:
raise ValueError("Invalid string format")
timestamp = converted_time.timestamp()
return timestamp
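# Usage sketch: the relative times scraped from YouTube ("3周前", "1月前", ...)
# map to timestamps relative to now; strings outside the four patterns above raise ValueError.
# ts = convert_string_to_time("3周前")        # "3 weeks ago" -> now minus 3 weeks
# print(datetime.fromtimestamp(ts))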
def convert_to_traditional(simplified_text):
"""
Convert Simplified Chinese text to Traditional Chinese.
Args:
simplified_text (str): the Simplified Chinese text to convert.
Returns:
str: the converted Traditional Chinese text.
"""
converter = opencc.OpenCC('s2t.json')  # Create a Simplified-to-Traditional Chinese converter
traditional_text = converter.convert(simplified_text)  # Perform the conversion
return traditional_text
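# Usage sketch: search keywords from getReptileTask() are converted before querying, e.g.
# print(convert_to_traditional("网络资产"))   # -> "網絡資產"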
def pytube_download(link, file_dir):
"""
Download the YouTube video at the given link and save it to the given directory.
Args:
- link: str, the URL of the YouTube video.
- file_dir: str, the directory to save the video into.
Returns:
- True if the download succeeds.
- False if the download fails.
"""
yt = YouTube(link)  # Create the YouTube object
# yt.register_on_complete_callback(callback)
yt.streams.filter(progressive=True, file_extension='mp4')  # Filter progressive MP4 streams (result is not stored)
steam = yt.streams.get_by_itag(22)  # Get the stream with itag 22 (720p progressive MP4)
try:
steam.download(file_dir)  # Download the video into the given directory
return True
except:
print("Download failed")  # Catch the download error and report it
return False
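# Illustrative call (video id reused from the link elsewhere in this commit); note that
# itag 22 is not available for every video, in which case get_by_itag() returns None
# and the bare except above returns False.
# ok = pytube_download("https://www.youtube.com/watch?v=7q88m5MQRhE", "./downloads")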
def yt_dlp_download(url, name):
file_dir = os.path.abspath("../")
options = f'-v'
network_options = f'-o "{os.path.join(file_dir, "network-assets-reptile", "reptile_data", name, "%(id)s.%(ext)s")}"'
geo = ""
# --get-url
video_selection = f''
download_options = ""
other_options = f'--verbose'
# The shell command to execute
command = f'yt-dlp {options} {network_options} {geo} {video_selection} {download_options} {other_options} -- {url}'
# Run the command via subprocess
result = subprocess.run(command, shell=True, capture_output=True, text=True)
# Check the exit code
if result.returncode == 0:
# Command succeeded; output is available on result.stdout
# log.debug(str(result.stdout))
return True
else:
# Command failed; error details are on result.stderr
# log.debug(str(result.stderr))
return False
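# For clarity: with the option groups above (only -v, -o and --verbose are non-empty),
# the assembled command expands to roughly:
#   yt-dlp -v -o "<parent>/network-assets-reptile/reptile_data/youtube/%(id)s.%(ext)s" --verbose -- <url>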
# Lines starting with # are comments
# Always extract audio
#-x
# Do not copy the mtime
#--no-mtime
# Use this proxy
#--proxy 127.0.0.1:3128
# Save all videos under Movies directory in your home directory
-o /Users/macosx/Desktop/项目文档/网络资产管理系统/network-assets-reptile/youtube-dl/%(id)s.%(ext)s
\ No newline at end of file
from utils.index import yt_dlp_download
url = 'https://www.youtube.com/watch?v=ntoO0h6-RH4'
flag = yt_dlp_download(url, 'youtube')
print(flag)
Example invocation:
youtube-dl --config-location "/Users/macosx/Desktop/项目文档/网络资产管理系统/network-assets-reptile/youtube-dl/youtube-dl.text" --geo-bypass URL "https://www.youtube.com/watch?v=BoGepY86__A"
\ No newline at end of file