Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
N
network-assets-reptile
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
liyang
network-assets-reptile
Commits
d0b2cdfa
Commit
d0b2cdfa
authored
Jul 31, 2023
by
liyang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
feat:任务爬取数据存放在独立目录
parent
602bca1c
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
238 additions
and
41 deletions
+238
-41
pc_dcard.py
pc_dcard.py
+34
-7
pc_facebook.py
pc_facebook.py
+39
-7
pc_instagram.py
pc_instagram.py
+27
-6
pc_ptt.py
pc_ptt.py
+34
-7
pc_twitter.py
pc_twitter.py
+34
-7
pc_youtube.py
pc_youtube.py
+33
-7
index.py
utils/index.py
+37
-0
No files found.
pc_dcard.py
View file @
d0b2cdfa
...
...
@@ -6,7 +6,7 @@ from utils.createBrowserDriver import create
from
utils.filse
import
save_json
from
api.index
import
importJson
,
getReptileTask
,
importJsonPath
from
utils.index
import
convert_to_traditional
,
yt_dlp_download
,
convert_string_to_time
,
parse_twitter_time_string
,
\
is_base64_image
,
save_base64_image
,
get_screen_resolution
is_base64_image
,
save_base64_image
,
get_screen_resolution
,
create_directory_if_not_exists
,
delete_directory
# from pytube import YouTube
from
selenium.common.exceptions
import
NoSuchElementException
import
os
...
...
@@ -115,9 +115,9 @@ def reptile(browser=None, search_word=""):
# 下载图片至本地,替换标签中的src
id
=
str
(
int
(
time
.
time
()))
# 下载地址
download_dir
=
f
'{os.path.join(
file_dir
, f"{id}.jpg")}'
download_dir
=
f
'{os.path.join(
local_path
, f"{id}.jpg")}'
# 访问地址
access_address
=
f
'{get_base_file_url()}{table_name.split("_")[1]}/{id}.jpg'
access_address
=
f
'{get_base_file_url()}{table_name.split("_")[1]}/{
local_path_name}/{
id}.jpg'
if
is_base64_image
(
element
[
'src'
])
==
False
:
log
.
debug
(
"图片属于 url"
)
# 下载状态
...
...
@@ -173,8 +173,16 @@ def reptile(browser=None, search_word=""):
# print(data)
if
len
(
data
)
>
0
:
# 保存json文件到本地
# log.debug(os.path.abspath("../"))
state_save
=
save_json
(
os
.
path
.
join
(
file_dir
,
str
(
int
(
time
.
time
()))
+
".json"
),
data
)
json_path
=
os
.
path
.
join
(
local_path
,
"data.json"
)
state_save
=
save_json
(
json_path
,
data
)
# 保存task
task
=
{
# 爬取时间
"reptileTime"
:
data
[
0
][
"reptileTime"
],
# 本地路径
"localPath"
:
local_path
}
state_save
=
save_json
(
os
.
path
.
join
(
file_dir
,
"task.json"
),
task
)
if
state_save
:
log
.
debug
(
'save file success'
)
else
:
...
...
@@ -183,6 +191,8 @@ def reptile(browser=None, search_word=""):
else
:
# 爬取数据为空
log
.
info
(
"未爬取到数据"
)
# 删除目录
delete_directory
(
local_path
)
script_close
(
browser
)
...
...
@@ -193,7 +203,15 @@ def script_close(browser):
browser
.
quit
()
except
:
log
.
debug
(
"浏览器驱动关闭失败"
)
sys
.
exit
()
try
:
# 一些代码...
sys
.
exit
()
except
SystemExit
:
raise
# 重新抛出SystemExit异常,让脚本退出
except
Exception
as
e
:
# 异常处理代码...
print
(
"sys.exit() 执行失败"
)
def
main
():
...
...
@@ -229,13 +247,22 @@ def main():
# 全局变量
data
=
[]
# 任务详情
task
=
{}
table_name
=
"pms_dcard"
# 过滤时间开始
filter_time_start
=
int
(
123
)
# 过滤时间结束
filter_time_end
=
int
(
123
)
# 文件根目录
file_dir
=
f
'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# 任务目录名称
local_path_name
=
str
(
int
(
time
.
time
()))
# 任务目录路径
local_path
=
f
'{os.path.join(file_dir, local_path_name)}'
# 任务目录是否创建
local_path_status
=
create_directory_if_not_exists
(
local_path
)
# 是否启用
status_task
=
'0'
status_task
=
0
# 调用main函数
main
()
pc_facebook.py
View file @
d0b2cdfa
...
...
@@ -6,7 +6,8 @@ from utils.Logger import log
from
utils.createBrowserDriver
import
create
from
utils.filse
import
save_json
from
api.index
import
importJson
,
getReptileTask
,
importJsonPath
from
utils.index
import
convert_to_traditional
,
yt_dlp_download
,
convert_string_to_time
,
parse_time_string
from
utils.index
import
convert_to_traditional
,
yt_dlp_download
,
convert_string_to_time
,
\
parse_time_string
,
create_directory_if_not_exists
,
delete_directory
# from pytube import YouTube
from
datetime
import
datetime
from
utils.download_image
import
download_image
...
...
@@ -30,7 +31,7 @@ from selenium.webdriver.support import expected_conditions as EC
def
reptile
(
browser
=
None
,
search_word
=
""
):
print
(
f
"搜索词:{search_word}"
)
url
=
"https://www.facebook.com/"
browser
=
browser
or
create
(
no_headless
=
False
,
using_user_data
=
Tru
e
)
browser
=
browser
or
create
(
no_headless
=
False
,
using_user_data
=
Fals
e
)
# 打开网页
browser
.
get
(
url
)
time
.
sleep
(
2
)
...
...
@@ -50,8 +51,10 @@ def reptile(browser=None, search_word=""):
log
.
debug
(
"facebook login complete"
)
url
=
f
"https://www.facebook.com/search/top?q={search_word}"
browser
.
get
(
url
)
# time.sleep(1)
# 使用 JavaScript 将网页滚动到底部
browser
.
execute_script
(
"window.scrollTo(0, document.body.scrollHeight);"
)
# time.sleep(1)
# 等待内容出现,设置最长等待时间为10秒
wait
=
WebDriverWait
(
browser
,
10
)
# 通过 expected_conditions 来定义等待条件,这里以弹窗内容的某个元素为例
...
...
@@ -124,9 +127,9 @@ def reptile(browser=None, search_word=""):
# 下载图片至本地,替换标签中的src
id
=
str
(
int
(
time
.
time
()))
# 下载地址
download_dir
=
f
'{os.path.join(
file_dir
, f"{id}.jpg")}'
download_dir
=
f
'{os.path.join(
local_path
, f"{id}.jpg")}'
# 访问地址
access_address
=
f
'{get_base_file_url()}{table_name.split("_")[1]}/{id}.jpg'
access_address
=
f
'{get_base_file_url()}{table_name.split("_")[1]}/{
local_path_name}/{
id}.jpg'
# 下载状态
status
=
download_image
(
element
[
'src'
],
download_dir
)
if
status
:
...
...
@@ -169,8 +172,16 @@ def reptile(browser=None, search_word=""):
if
len
(
data
)
>
0
:
# 保存json文件到本地
# log.debug(os.path.abspath("../"))
state_save
=
save_json
(
os
.
path
.
join
(
file_dir
,
str
(
int
(
time
.
time
()))
+
".json"
),
data
)
json_path
=
os
.
path
.
join
(
local_path
,
"data.json"
)
state_save
=
save_json
(
json_path
,
data
)
# 保存task
task
=
{
# 爬取时间
"reptileTime"
:
data
[
0
][
"reptileTime"
],
# 本地路径
"localPath"
:
local_path
}
state_save
=
save_json
(
os
.
path
.
join
(
file_dir
,
"task.json"
),
task
)
if
state_save
:
log
.
debug
(
'save file success'
)
else
:
...
...
@@ -179,6 +190,8 @@ def reptile(browser=None, search_word=""):
else
:
# 爬取数据为空
log
.
info
(
"未爬取到数据"
)
# 删除目录
delete_directory
(
local_path
)
script_close
(
browser
)
...
...
@@ -189,7 +202,15 @@ def script_close(browser):
browser
.
quit
()
except
:
log
.
debug
(
"浏览器驱动关闭失败"
)
sys
.
exit
()
try
:
# 一些代码...
sys
.
exit
()
except
SystemExit
:
raise
# 重新抛出SystemExit异常,让脚本退出
except
Exception
as
e
:
# 异常处理代码...
print
(
"sys.exit() 执行失败"
)
def
main
():
...
...
@@ -225,11 +246,22 @@ def main():
# 全局变量
data
=
[]
# 任务详情
task
=
{}
table_name
=
"pms_facebook"
# 过滤时间开始
filter_time_start
=
int
(
123
)
# 过滤时间结束
filter_time_end
=
int
(
123
)
# 文件根目录
file_dir
=
f
'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# 任务目录名称
local_path_name
=
str
(
int
(
time
.
time
()))
# 任务目录路径
local_path
=
f
'{os.path.join(file_dir, local_path_name)}'
# 任务目录是否创建
local_path_status
=
create_directory_if_not_exists
(
local_path
)
# 是否启用
status_task
=
0
# 调用main函数
main
()
pc_instagram.py
View file @
d0b2cdfa
...
...
@@ -5,7 +5,8 @@ from utils.Logger import log
from
utils.createBrowserDriver
import
create
from
utils.filse
import
save_json
from
api.index
import
importJson
,
getReptileTask
,
importJsonPath
from
utils.index
import
convert_to_traditional
,
yt_dlp_download
,
convert_string_to_time
,
parse_twitter_time_string
from
utils.index
import
convert_to_traditional
,
yt_dlp_download
,
convert_string_to_time
,
\
parse_twitter_time_string
,
create_directory_if_not_exists
,
delete_directory
# from pytube import YouTube
from
config.settings
import
get_account
import
os
...
...
@@ -120,12 +121,13 @@ def reptile(browser=None, search_word=""):
img_soup
=
a_soup
.
find
(
"img"
)
img_soup
[
"style"
]
=
"width:100
%
"
src
=
img_soup
[
"src"
]
str_list
=
link_str
.
split
(
"/"
)
img_id
=
str_list
[
len
(
str_list
)
-
2
]
# 下载地址
download_dir
=
f
'{os.path.join(
file_dir
, f"{img_id}.jpg")}'
download_dir
=
f
'{os.path.join(
local_path
, f"{img_id}.jpg")}'
# 访问地址
access_address
=
f
'{get_base_file_url()}{table_name.split("_")[1]}/{img_id}.jpg'
access_address
=
f
'{get_base_file_url()}{table_name.split("_")[1]}/{
local_path_name}/{
img_id}.jpg'
# 下载状态
status
=
download_image
(
src
,
download_dir
)
if
status
:
...
...
@@ -161,8 +163,16 @@ def reptile(browser=None, search_word=""):
if
len
(
data
)
>
0
:
# 保存json文件到本地
# log.debug(os.path.abspath("../"))
state_save
=
save_json
(
os
.
path
.
join
(
file_dir
,
str
(
int
(
time
.
time
()))
+
".json"
),
data
)
json_path
=
os
.
path
.
join
(
local_path
,
"data.json"
)
state_save
=
save_json
(
json_path
,
data
)
# 保存task
task
=
{
# 爬取时间
"reptileTime"
:
data
[
0
][
"reptileTime"
],
# 本地路径
"localPath"
:
local_path
}
state_save
=
save_json
(
os
.
path
.
join
(
file_dir
,
"task.json"
),
task
)
if
state_save
:
log
.
debug
(
'save file success'
)
else
:
...
...
@@ -171,6 +181,8 @@ def reptile(browser=None, search_word=""):
else
:
# 爬取数据为空
log
.
info
(
"未爬取到数据"
)
# 删除目录
delete_directory
(
local_path
)
script_close
(
browser
)
...
...
@@ -224,13 +236,22 @@ def main():
# 全局变量
data
=
[]
# 任务详情
task
=
{}
table_name
=
"pms_instagram"
# 过滤时间开始
filter_time_start
=
int
(
123
)
# 过滤时间结束
filter_time_end
=
int
(
123
)
# 文件根目录
file_dir
=
f
'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# 任务目录名称
local_path_name
=
str
(
int
(
time
.
time
()))
# 任务目录路径
local_path
=
f
'{os.path.join(file_dir, local_path_name)}'
# 任务目录是否创建
local_path_status
=
create_directory_if_not_exists
(
local_path
)
# 是否启用
status_task
=
'0'
status_task
=
0
# 调用main函数
main
()
pc_ptt.py
View file @
d0b2cdfa
...
...
@@ -10,7 +10,7 @@ from bs4 import BeautifulSoup
from
datetime
import
datetime
from
api.index
import
importJson
,
getReptileTask
,
importJsonPath
from
utils.Logger
import
log
from
utils.index
import
convert_to_traditional
from
utils.index
import
convert_to_traditional
,
create_directory_if_not_exists
,
delete_directory
# from requests_toolbelt import *
from
utils.createBrowserDriver
import
create
import
opencc
...
...
@@ -155,9 +155,9 @@ def reptile(browser=None, search_word=""):
# 下载图片至本地,替换标签中的src
id
=
str
(
int
(
time
.
time
()))
# 下载地址
download_dir
=
f
'{os.path.join(
file_dir
, f"{id}.jpg")}'
download_dir
=
f
'{os.path.join(
local_path
, f"{id}.jpg")}'
# 访问地址
access_address
=
f
'{get_base_file_url()}{table_name.split("_")[1]}/{id}.jpg'
access_address
=
f
'{get_base_file_url()}{table_name.split("_")[1]}/{
local_path_name}/{
id}.jpg'
# 下载状态
status
=
download_image
(
element
[
'src'
],
download_dir
)
if
status
:
...
...
@@ -223,8 +223,16 @@ def reptile(browser=None, search_word=""):
# print(data)
if
len
(
data
)
>
0
:
# 保存json文件到本地
# log.debug(os.path.abspath("../"))
state_save
=
save_json
(
os
.
path
.
join
(
file_dir
,
str
(
int
(
time
.
time
()))
+
".json"
),
data
)
json_path
=
os
.
path
.
join
(
local_path
,
"data.json"
)
state_save
=
save_json
(
json_path
,
data
)
# 保存task
task
=
{
# 爬取时间
"reptileTime"
:
data
[
0
][
"reptileTime"
],
# 本地路径
"localPath"
:
local_path
}
state_save
=
save_json
(
os
.
path
.
join
(
file_dir
,
"task.json"
),
task
)
if
state_save
:
log
.
debug
(
'save file success'
)
else
:
...
...
@@ -233,6 +241,8 @@ def reptile(browser=None, search_word=""):
else
:
# 爬取数据为空
log
.
info
(
"未爬取到数据"
)
# 删除目录
delete_directory
(
local_path
)
script_close
(
browser
)
...
...
@@ -243,7 +253,15 @@ def script_close(browser):
browser
.
quit
()
except
:
log
.
debug
(
"浏览器驱动关闭失败"
)
sys
.
exit
()
try
:
# 一些代码...
sys
.
exit
()
except
SystemExit
:
raise
# 重新抛出SystemExit异常,让脚本退出
except
Exception
as
e
:
# 异常处理代码...
print
(
"sys.exit() 执行失败"
)
def
main
():
...
...
@@ -279,13 +297,22 @@ def main():
# 全局变量
data
=
[]
# 任务详情
task
=
{}
table_name
=
"pms_ptt"
# 过滤时间开始
filter_time_start
=
int
(
123
)
# 过滤时间结束
filter_time_end
=
int
(
123
)
# 文件根目录
file_dir
=
f
'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# 任务目录名称
local_path_name
=
str
(
int
(
time
.
time
()))
# 任务目录路径
local_path
=
f
'{os.path.join(file_dir, local_path_name)}'
# 任务目录是否创建
local_path_status
=
create_directory_if_not_exists
(
local_path
)
# 是否启用
status_task
=
'0'
status_task
=
0
# 调用main函数
main
()
pc_twitter.py
View file @
d0b2cdfa
...
...
@@ -5,7 +5,8 @@ from utils.Logger import log
from
utils.createBrowserDriver
import
create
from
utils.filse
import
save_json
from
api.index
import
importJson
,
getReptileTask
,
importJsonPath
from
utils.index
import
convert_to_traditional
,
yt_dlp_download
,
convert_string_to_time
,
parse_twitter_time_string
,
extract_image_format
from
utils.index
import
convert_to_traditional
,
yt_dlp_download
,
convert_string_to_time
,
parse_twitter_time_string
,
\
extract_image_format
,
create_directory_if_not_exists
,
delete_directory
# from pytube import YouTube
import
os
import
sys
...
...
@@ -140,9 +141,9 @@ def reptile(browser=None, search_word=""):
id
=
str
(
int
(
time
.
time
()))
image_type
=
extract_image_format
(
element
[
'src'
])
# 下载地址
download_dir
=
f
'{os.path.join(
file_dir
, f"{id}.{image_type}")}'
download_dir
=
f
'{os.path.join(
local_path
, f"{id}.{image_type}")}'
# 访问地址
access_address
=
f
'{get_base_file_url()}{table_name.split("_")[1]}/{id}.{image_type}'
access_address
=
f
'{get_base_file_url()}{table_name.split("_")[1]}/{
local_path_name}/{
id}.{image_type}'
# 下载状态
status
=
download_image
(
element
[
'src'
],
download_dir
)
if
status
:
...
...
@@ -192,8 +193,16 @@ def reptile(browser=None, search_word=""):
if
len
(
data
)
>
0
:
# 保存json文件到本地
# log.debug(os.path.abspath("../"))
state_save
=
save_json
(
os
.
path
.
join
(
file_dir
,
str
(
int
(
time
.
time
()))
+
".json"
),
data
)
json_path
=
os
.
path
.
join
(
local_path
,
"data.json"
)
state_save
=
save_json
(
json_path
,
data
)
# 保存task
task
=
{
# 爬取时间
"reptileTime"
:
data
[
0
][
"reptileTime"
],
# 本地路径
"localPath"
:
local_path
}
state_save
=
save_json
(
os
.
path
.
join
(
file_dir
,
"task.json"
),
task
)
if
state_save
:
log
.
debug
(
'save file success'
)
else
:
...
...
@@ -202,6 +211,8 @@ def reptile(browser=None, search_word=""):
else
:
# 爬取数据为空
log
.
info
(
"未爬取到数据"
)
# 删除目录
delete_directory
(
local_path
)
script_close
(
browser
)
...
...
@@ -212,7 +223,15 @@ def script_close(browser):
browser
.
quit
()
except
:
log
.
debug
(
"浏览器驱动关闭失败"
)
sys
.
exit
()
try
:
# 一些代码...
sys
.
exit
()
except
SystemExit
:
raise
# 重新抛出SystemExit异常,让脚本退出
except
Exception
as
e
:
# 异常处理代码...
print
(
"sys.exit() 执行失败"
)
def
main
():
...
...
@@ -248,13 +267,21 @@ def main():
# 全局变量
data
=
[]
# 任务详情
task
=
{}
table_name
=
"pms_twitter"
# 过滤时间开始
filter_time_start
=
int
(
123
)
# 过滤时间结束
filter_time_end
=
int
(
123
)
file_dir
=
f
'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# 任务目录名称
local_path_name
=
str
(
int
(
time
.
time
()))
# 任务目录路径
local_path
=
f
'{os.path.join(file_dir, local_path_name)}'
# 任务目录是否创建
local_path_status
=
create_directory_if_not_exists
(
local_path
)
# 是否启用
status_task
=
'0'
status_task
=
0
# 调用main函数
main
()
pc_youtube.py
View file @
d0b2cdfa
...
...
@@ -6,7 +6,7 @@ from utils.Logger import log
from
utils.createBrowserDriver
import
create
from
utils.filse
import
save_json
from
api.index
import
importJson
,
getReptileTask
,
importJsonPath
from
utils.index
import
convert_to_traditional
,
yt_dlp_download
,
convert_string_to_time
from
utils.index
import
convert_to_traditional
,
yt_dlp_download
,
convert_string_to_time
,
create_directory_if_not_exists
,
delete_directory
from
pytube
import
YouTube
from
datetime
import
datetime
import
os
...
...
@@ -78,9 +78,9 @@ def reptile(browser=None, search_word=""):
video_url
=
[]
# 下载地址
download_dir
=
f
'{os.path.join(
file_dir
, f"{id}.mp4")}'
download_dir
=
f
'{os.path.join(
local_path
, f"{id}.mp4")}'
# 访问地址
access_address
=
f
'{get_base_file_url()}{table_name.split("_")[1]}/{id}.mp4'
access_address
=
f
'{get_base_file_url()}{table_name.split("_")[1]}/{
local_path_name}/{
id}.mp4'
# 下载视频
state_download
=
yt_dlp_download
(
url
,
'youtube'
)
video_url
.
append
(
download_dir
)
...
...
@@ -103,8 +103,16 @@ def reptile(browser=None, search_word=""):
error
=
""
if
len
(
data
)
>
0
:
# 保存json文件到本地
# log.debug(os.path.abspath("../"))
state_save
=
save_json
(
os
.
path
.
join
(
file_dir
,
str
(
int
(
time
.
time
()))
+
".json"
),
data
)
json_path
=
os
.
path
.
join
(
local_path
,
"data.json"
)
state_save
=
save_json
(
json_path
,
data
)
# 保存task
task
=
{
# 爬取时间
"reptileTime"
:
data
[
0
][
"reptileTime"
],
# 本地路径
"localPath"
:
local_path
}
state_save
=
save_json
(
os
.
path
.
join
(
file_dir
,
"task.json"
),
task
)
if
state_save
:
log
.
debug
(
'save file success'
)
else
:
...
...
@@ -113,6 +121,8 @@ def reptile(browser=None, search_word=""):
else
:
# 爬取数据为空
log
.
info
(
"未爬取到数据"
)
# 删除目录
delete_directory
(
local_path
)
script_close
(
browser
)
...
...
@@ -123,7 +133,15 @@ def script_close(browser):
browser
.
quit
()
except
:
log
.
debug
(
"浏览器驱动关闭失败"
)
sys
.
exit
()
try
:
# 一些代码...
sys
.
exit
()
except
SystemExit
:
raise
# 重新抛出SystemExit异常,让脚本退出
except
Exception
as
e
:
# 异常处理代码...
print
(
"sys.exit() 执行失败"
)
def
main
():
...
...
@@ -160,13 +178,21 @@ def main():
# 全局变量
data
=
[]
# 任务详情
task
=
{}
table_name
=
"pms_youtube"
# 过滤时间开始
filter_time_start
=
int
(
123
)
# 过滤时间结束
filter_time_end
=
int
(
123
)
file_dir
=
f
'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# 任务目录名称
local_path_name
=
str
(
int
(
time
.
time
()))
# 任务目录路径
local_path
=
f
'{os.path.join(file_dir, local_path_name)}'
# 任务目录是否创建
local_path_status
=
create_directory_if_not_exists
(
local_path
)
# 是否启用
status_task
=
'0'
status_task
=
0
# 调用main函数
main
()
utils/index.py
View file @
d0b2cdfa
import
os.path
import
re
import
shutil
import
time
from
hanziconv
import
HanziConv
...
...
@@ -77,6 +78,7 @@ def is_base64_image(url):
# 转换 facebook 的时间
def
parse_time_string
(
time_str
):
"""
转换 facebook 的时间
:param time_str:
:return:
...
...
@@ -108,6 +110,7 @@ def parse_time_string(time_str):
# 转换 youtube 的时间
def
convert_string_to_time
(
string
):
"""
转换 youtube 的时间
:param string:
:return:
...
...
@@ -137,6 +140,7 @@ def convert_string_to_time(string):
# 转换 twitter 的时间
def
parse_twitter_time_string
(
time_str
):
"""
转换 twitter 的时间
:param time_str:
:return:
...
...
@@ -212,3 +216,36 @@ def yt_dlp_download(url, name):
# 命令执行失败,输出错误信息
# log.debug(str(result.stderr))
return
False
def
create_directory_if_not_exists
(
directory_path
):
"""
创建目录
:param directory_path:
:return:
"""
if
not
os
.
path
.
exists
(
directory_path
):
try
:
os
.
makedirs
(
directory_path
)
return
True
except
OSError
as
e
:
return
False
else
:
return
True
def
delete_directory
(
directory_path
):
"""
删除目录
:param directory_path:
:return:
"""
try
:
shutil
.
rmtree
(
directory_path
)
# print(f"Directory '{directory_path}' and its contents have been deleted successfully.")
return
True
except
OSError
as
e
:
# print(f"Error: Failed to delete directory '{directory_path}'. {e}")
return
False
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment