Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
N
network-assets-reptile
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
liyang
network-assets-reptile
Commits
602bca1c
Commit
602bca1c
authored
Jul 28, 2023
by
liyang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
fix:时间过滤
parent
c959a447
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
150 additions
and
76 deletions
+150
-76
pc_dcard.py
pc_dcard.py
+44
-30
pc_facebook.py
pc_facebook.py
+17
-7
pc_instagram.py
pc_instagram.py
+20
-6
pc_ptt.py
pc_ptt.py
+32
-21
pc_twitter.py
pc_twitter.py
+15
-0
pc_youtube.py
pc_youtube.py
+22
-12
No files found.
pc_dcard.py
View file @
602bca1c
...
...
@@ -35,7 +35,7 @@ def reptile(browser=None, search_word=""):
"""
print
(
f
"搜索词:{search_word}"
)
base_url
=
"https://www.dcard.tw"
browser
=
browser
or
create
(
no_headless
=
Tru
e
,
using_user_data
=
True
)
browser
=
browser
or
create
(
no_headless
=
Fals
e
,
using_user_data
=
True
)
# 打开网页
browser
.
get
(
f
"{base_url}/search?query={search_word}"
)
time
.
sleep
(
6
)
...
...
@@ -53,10 +53,17 @@ def reptile(browser=None, search_word=""):
# 评论
element_comment_list
=
browser
.
find_elements
(
'xpath'
,
f
"{base_xpath}/div[3]/div[2]/div/span"
)
length
=
len
(
element_content_list
)
for
index
in
range
(
length
):
for
index
,
item
in
enumerate
(
element_content_list
):
# 提取时间,并转为时间戳
timestamp
=
datetime
.
fromisoformat
(
element_time_list
[
index
]
.
get_attribute
(
"datetime"
)[:
-
1
])
.
timestamp
()
# 过滤时间
# # 如果'releaseTime'不是整数,则将其转换为整数
new_releaseTime
=
int
(
timestamp
)
if
new_releaseTime
<
filter_time_start
or
new_releaseTime
>
filter_time_end
:
# 如果'new_releaseTime'不在指定范围内,则跳过当前迭代,继续下一个项目
continue
# 提取作者
author
=
element_authors_list
[
index
]
.
text
# 提取标题
...
...
@@ -103,32 +110,32 @@ def reptile(browser=None, search_word=""):
# ------------- 处理内容中的图片 start ------------------
picture_url
=
[]
image_list
=
soup
.
find_all
(
"img"
)
#
if len(image_list) > 0:
#
for key, element in enumerate(image_list):
#
# 下载图片至本地,替换标签中的src
#
id = str(int(time.time()))
#
# 下载地址
#
download_dir = f'{os.path.join(file_dir, f"{id}.jpg")}'
#
# 访问地址
#
access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{id}.jpg'
#
if is_base64_image(element['src']) == False:
#
log.debug("图片属于 url")
#
# 下载状态
#
status = download_image(element['src'], download_dir)
#
if status:
#
element['src'] = access_address
#
picture_url.append(access_address)
#
else:
#
log.debug("图片属于 base64")
#
# 下载base 64
#
# 下载状态
#
status = save_base64_image(element['src'], download_dir)
#
if status:
#
element['src'] = access_address
#
picture_url.append(access_address)
#
#
else:
#
print("")
if
len
(
image_list
)
>
0
:
for
key
,
element
in
enumerate
(
image_list
):
# 下载图片至本地,替换标签中的src
id
=
str
(
int
(
time
.
time
()))
# 下载地址
download_dir
=
f
'{os.path.join(file_dir, f"{id}.jpg")}'
# 访问地址
access_address
=
f
'{get_base_file_url()}{table_name.split("_")[1]}/{id}.jpg'
if
is_base64_image
(
element
[
'src'
])
==
False
:
log
.
debug
(
"图片属于 url"
)
# 下载状态
status
=
download_image
(
element
[
'src'
],
download_dir
)
if
status
:
element
[
'src'
]
=
access_address
picture_url
.
append
(
access_address
)
else
:
log
.
debug
(
"图片属于 base64"
)
# 下载base 64
# 下载状态
status
=
save_base64_image
(
element
[
'src'
],
download_dir
)
if
status
:
element
[
'src'
]
=
access_address
picture_url
.
append
(
access_address
)
else
:
print
(
""
)
# ------------- 处理内容中的图片 end ------------------
content
=
soup
.
prettify
()
...
...
@@ -196,7 +203,8 @@ def main():
# 请求关键词
response
=
getReptileTask
()
global
status_task
# print(response)
global
filter_time_start
global
filter_time_end
if
response
[
'status_code'
]
==
200
and
response
[
'data'
][
'code'
]
==
200
:
log
.
debug
(
"call success"
)
search_word
=
""
...
...
@@ -205,6 +213,8 @@ def main():
search_word
=
item
[
'keyword'
]
table_name
=
item
[
'tableName'
]
status_task
=
int
(
item
[
"status"
])
filter_time_start
=
int
(
item
[
"beginFiltrationTime"
])
filter_time_end
=
int
(
item
[
"endFiltrationTime"
])
# 简体转繁体
if
status_task
==
0
and
len
(
search_word
)
>
0
:
reptile
(
None
,
convert_to_traditional
(
search_word
))
...
...
@@ -220,6 +230,10 @@ def main():
# 全局变量
data
=
[]
table_name
=
"pms_dcard"
# 过滤时间开始
filter_time_start
=
int
(
123
)
# 过滤时间结束
filter_time_end
=
int
(
123
)
file_dir
=
f
'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# 是否启用
status_task
=
'0'
...
...
pc_facebook.py
View file @
602bca1c
...
...
@@ -78,18 +78,21 @@ def reptile(browser=None, search_word=""):
# print(length)
for
index
in
range
(
length
):
author
=
element_authors_list
[
index
]
.
text
# print(element_release_list[index].text)
# print(parse_time_string(element_release_list[index].text))
release_time_timestamp
=
int
(
parse_time_string
(
element_release_list
[
index
]
.
text
))
release_time
=
str
(
release_time_timestamp
)
# release_time = ""
# content = element_content_list[index].get_attribute("outerHTML")
# 使用BeautifulSoup解析HTML
# 过滤时间
# # 如果'releaseTime'不是整数,则将其转换为整数
new_releaseTime
=
int
(
release_time
)
if
new_releaseTime
<
filter_time_start
or
new_releaseTime
>
filter_time_end
:
# 如果'new_releaseTime'不在指定范围内,则跳过当前迭代,继续下一个项目
continue
text
=
element_content_list
[
index
]
.
text
soup
=
BeautifulSoup
(
element_content_list
[
index
]
.
get_attribute
(
'outerHTML'
),
'html.parser'
)
soup_str
=
soup
.
prettify
()
# 查找是否含有视频
# ignore_list = soup.find_all("div", {"data-visualcompletion": "video"})
video_list
=
soup
.
find_all
(
"video"
)
image_list
=
soup
.
find_all
(
"img"
)
# lth = len(ignore_list)
...
...
@@ -196,7 +199,8 @@ def main():
# 请求关键词
response
=
getReptileTask
()
global
status_task
# print(response)
global
filter_time_start
global
filter_time_end
if
response
[
'status_code'
]
==
200
and
response
[
'data'
][
'code'
]
==
200
:
log
.
debug
(
"call success"
)
search_word
=
""
...
...
@@ -205,6 +209,8 @@ def main():
search_word
=
item
[
'keyword'
]
table_name
=
item
[
'tableName'
]
status_task
=
int
(
item
[
"status"
])
filter_time_start
=
int
(
item
[
"beginFiltrationTime"
])
filter_time_end
=
int
(
item
[
"endFiltrationTime"
])
# 简体转繁体
if
status_task
==
0
and
len
(
search_word
)
>
0
:
reptile
(
None
,
convert_to_traditional
(
search_word
))
...
...
@@ -220,6 +226,10 @@ def main():
# 全局变量
data
=
[]
table_name
=
"pms_facebook"
# 过滤时间开始
filter_time_start
=
int
(
123
)
# 过滤时间结束
filter_time_end
=
int
(
123
)
file_dir
=
f
'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# 调用main函数
main
()
pc_instagram.py
View file @
602bca1c
...
...
@@ -40,7 +40,7 @@ def reptile(browser=None, search_word=""):
# 打开网页
browser
.
get
(
base_url
)
# 等待加载完成
time
.
sleep
(
2
)
time
.
sleep
(
4
)
# wait = WebDriverWait(browser, 10)
# wait.until(EC.presence_of_element_located((By.XPATH, "//input[@name='username']")))
try
:
...
...
@@ -54,8 +54,7 @@ def reptile(browser=None, search_word=""):
button_login
.
click
()
time
.
sleep
(
4
)
except
:
print
(
"------"
)
# print("1111")
error
=
""
log
.
debug
(
"instagram login complete"
)
url
=
f
"{base_url}explore/tags/{search_word}/"
browser
.
get
(
url
)
...
...
@@ -83,6 +82,14 @@ def reptile(browser=None, search_word=""):
# 提取时间,并转为时间戳
timestamp
=
datetime
.
fromisoformat
(
time_element
.
get_attribute
(
"datetime"
)[:
-
1
])
.
timestamp
()
# 过滤时间
# # 如果'releaseTime'不是整数,则将其转换为整数
new_releaseTime
=
int
(
timestamp
)
if
new_releaseTime
<
filter_time_start
or
new_releaseTime
>
filter_time_end
:
# 如果'new_releaseTime'不在指定范围内,则跳过当前迭代,继续下一个项目
continue
# 提取弹窗内容
soup
=
BeautifulSoup
(
content_element
.
get_attribute
(
"outerHTML"
),
"html.parser"
)
# 提取图片、视频
...
...
@@ -100,14 +107,14 @@ def reptile(browser=None, search_word=""):
title
=
title_str_list
[
1
]
else
:
# 提取图片中的文字
title
=
""
title
=
author
.
text
+
"-"
+
str
(
time
.
time
())
img_soup
=
BeautifulSoup
(
item
.
get_attribute
(
"outerHTML"
),
"html.parser"
)
.
find
(
"img"
)
del
img_soup
[
"srcset"
]
img_soup
[
"style"
]
=
"width:100
%
"
src
=
item
.
get_attribute
(
"src"
)
else
:
# 有视频,图片链接从列表中提取
title
=
""
title
=
author
.
text
+
"-"
+
str
(
time
.
time
())
a_soup
=
BeautifulSoup
(
element_link_list
[
index
]
.
get_attribute
(
"outerHTML"
),
"html.parser"
)
# img_element = element_link_list[index].find_element("xpath","img")
img_soup
=
a_soup
.
find
(
"img"
)
...
...
@@ -191,7 +198,8 @@ def main():
# 请求关键词
response
=
getReptileTask
()
global
status_task
# print(response)
global
filter_time_start
global
filter_time_end
if
response
[
'status_code'
]
==
200
and
response
[
'data'
][
'code'
]
==
200
:
log
.
debug
(
"call success"
)
search_word
=
""
...
...
@@ -200,6 +208,8 @@ def main():
search_word
=
item
[
'keyword'
]
table_name
=
item
[
'tableName'
]
status_task
=
int
(
item
[
"status"
])
filter_time_start
=
int
(
item
[
"beginFiltrationTime"
])
filter_time_end
=
int
(
item
[
"endFiltrationTime"
])
# 简体转繁体
if
status_task
==
0
and
len
(
search_word
)
>
0
:
reptile
(
None
,
convert_to_traditional
(
search_word
))
...
...
@@ -215,6 +225,10 @@ def main():
# 全局变量
data
=
[]
table_name
=
"pms_instagram"
# 过滤时间开始
filter_time_start
=
int
(
123
)
# 过滤时间结束
filter_time_end
=
int
(
123
)
file_dir
=
f
'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# 是否启用
status_task
=
'0'
...
...
pc_ptt.py
View file @
602bca1c
...
...
@@ -41,7 +41,7 @@ def reptile(browser=None, search_word=""):
# log.debug("已打开浏览器")
classify_item_list
=
browser
.
find_elements
(
'xpath'
,
"//div[@class='board-class']"
)
for
index
,
item_element
in
enumerate
(
classify_item_list
):
for
index
,
item_element
in
enumerate
(
classify_item_list
):
# 暂时先爬取 第2个 分类
if
0
<=
index
<
4
:
type_title
=
classify_item_list
[
index
]
.
text
...
...
@@ -118,12 +118,7 @@ def reptile(browser=None, search_word=""):
# 发布时间
element_release
=
browser
.
find_element
(
'xpath'
,
"//div[@id='main-content']/div[@class='article-metaline'][3]/span[2]"
)
date_string
=
element_release
.
text
date_format
=
"
%
a
%
b
%
d
%
H:
%
M:
%
S
%
Y"
# 将日期字符串转换为datetime对象
date_time
=
datetime
.
strptime
(
date_string
,
date_format
)
# 将datetime对象转换为时间戳(以秒为单位)
release_time
=
int
(
date_time
.
timestamp
())
# log.debug('开始判断类型')
try
:
# 找到所有第一级标签为 `div` 的元素
...
...
@@ -187,19 +182,28 @@ def reptile(browser=None, search_word=""):
html
=
soup
.
prettify
()
.
replace
(
'amp;'
,
''
)
# ------------------ content 过滤 end--------------
# --------------- 组装数据 start---------------------
obj
=
{
"title"
:
element_title
.
text
,
"content"
:
html
,
"link"
:
browser_current_url
,
"reptileTime"
:
str
(
int
(
time
.
time
())),
"type"
:
content_type
,
"author"
:
element_author
.
text
,
"releaseTime"
:
str
(
release_time
),
"picture_url"
:
","
.
join
(
picture_url
)
}
# --------------- 组装数据 end---------------------
data
.
append
(
obj
)
date_string
=
element_release
.
text
date_format
=
"
%
a
%
b
%
d
%
H:
%
M:
%
S
%
Y"
# 将日期字符串转换为datetime对象
date_time
=
datetime
.
strptime
(
date_string
,
date_format
)
# 将datetime对象转换为时间戳(以秒为单位)
release_time
=
int
(
date_time
.
timestamp
())
# 过滤时间
if
filter_time_start
<=
release_time
<=
filter_time_end
:
# --------------- 组装数据 start---------------------
obj
=
{
"title"
:
element_title
.
text
,
"content"
:
html
,
"link"
:
browser_current_url
,
"reptileTime"
:
str
(
int
(
time
.
time
())),
"type"
:
content_type
,
"author"
:
element_author
.
text
,
"releaseTime"
:
str
(
release_time
),
"picture_url"
:
","
.
join
(
picture_url
)
}
# --------------- 组装数据 end---------------------
data
.
append
(
obj
)
# 浏览器返回上一页
browser
.
back
()
time
.
sleep
(
0.1
)
...
...
@@ -249,7 +253,8 @@ def main():
# 请求关键词
response
=
getReptileTask
()
global
status_task
# print(response)
global
filter_time_start
global
filter_time_end
if
response
[
'status_code'
]
==
200
and
response
[
'data'
][
'code'
]
==
200
:
log
.
debug
(
"call success"
)
search_word
=
""
...
...
@@ -258,6 +263,8 @@ def main():
search_word
=
item
[
'keyword'
]
table_name
=
item
[
'tableName'
]
status_task
=
int
(
item
[
"status"
])
filter_time_start
=
int
(
item
[
"beginFiltrationTime"
])
filter_time_end
=
int
(
item
[
"endFiltrationTime"
])
# 简体转繁体
if
status_task
==
0
and
len
(
search_word
)
>
0
:
reptile
(
None
,
convert_to_traditional
(
search_word
))
...
...
@@ -273,6 +280,10 @@ def main():
# 全局变量
data
=
[]
table_name
=
"pms_ptt"
# 过滤时间开始
filter_time_start
=
int
(
123
)
# 过滤时间结束
filter_time_end
=
int
(
123
)
file_dir
=
f
'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# 是否启用
status_task
=
'0'
...
...
pc_twitter.py
View file @
602bca1c
...
...
@@ -91,6 +91,13 @@ def reptile(browser=None, search_word=""):
link_str
=
""
timestamp
=
time
.
time
()
# 过滤时间
# # 如果'releaseTime'不是整数,则将其转换为整数
new_releaseTime
=
int
(
timestamp
)
if
new_releaseTime
<
filter_time_start
or
new_releaseTime
>
filter_time_end
:
# 如果'new_releaseTime'不在指定范围内,则跳过当前迭代,继续下一个项目
continue
author
=
element_authors_list
[
index
]
.
text
# 标题取:作者+日期
title
=
f
"{author}-{datetime.fromtimestamp(int(timestamp))}"
...
...
@@ -215,6 +222,8 @@ def main():
# 请求关键词
response
=
getReptileTask
()
global
status_task
global
filter_time_start
global
filter_time_end
if
response
[
'status_code'
]
==
200
and
response
[
'data'
][
'code'
]
==
200
:
log
.
debug
(
"call success"
)
search_word
=
""
...
...
@@ -223,6 +232,8 @@ def main():
search_word
=
item
[
'keyword'
]
table_name
=
item
[
'tableName'
]
status_task
=
int
(
item
[
"status"
])
filter_time_start
=
int
(
item
[
"beginFiltrationTime"
])
filter_time_end
=
int
(
item
[
"endFiltrationTime"
])
# 简体转繁体
if
status_task
==
0
and
len
(
search_word
)
>
0
:
reptile
(
None
,
convert_to_traditional
(
search_word
))
...
...
@@ -238,6 +249,10 @@ def main():
# 全局变量
data
=
[]
table_name
=
"pms_twitter"
# 过滤时间开始
filter_time_start
=
int
(
123
)
# 过滤时间结束
filter_time_end
=
int
(
123
)
file_dir
=
f
'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# 是否启用
status_task
=
'0'
...
...
pc_youtube.py
View file @
602bca1c
...
...
@@ -28,31 +28,25 @@ def reptile(browser=None, search_word=""):
:param search_word:
:return:
"""
browser
=
browser
or
create
(
no_headless
=
Tru
e
,
using_user_data
=
False
)
browser
=
browser
or
create
(
no_headless
=
Fals
e
,
using_user_data
=
False
)
# print(browser)
# 打开网页
print
(
f
"搜索词:{search_word}"
)
url
=
f
'https://www.youtube.com/results?search_query={search_word}'
browser
.
get
(
url
)
# print(browser.page_source)
if
platform
.
system
()
==
"Linux"
:
time
.
sleep
(
3
)
else
:
wait
=
WebDriverWait
(
browser
,
10
)
wait
.
until
(
EC
.
presence_of_element_located
((
By
.
XPATH
,
"//div[@id='contents']"
)))
time
.
sleep
(
3
)
log
.
debug
(
"youtube login complete"
)
video_list
=
browser
.
find_elements
(
'xpath'
,
"//div[@id='contents']//ytd-video-renderer"
)
# print(video_list[0].get_attribute("outerHTML"))
length
=
len
(
video_list
)
for
index
in
range
(
length
):
for
index
,
item
in
enumerate
(
video_list
):
# 查找标题
author_element
=
video_list
[
index
]
.
find_element
(
"xpath"
,
"./div[1]/div/div[2]//ytd-channel-name//yt-formatted-string/a"
)
author_element
=
item
.
find_element
(
"xpath"
,
"./div[1]/div/div[2]//ytd-channel-name//yt-formatted-string/a"
)
# print(author_element.get_attribute("outerHTML"))
title_element
=
video_list
[
index
]
.
find_element
(
"xpath"
,
".//div[@id='title-wrapper']//a"
)
title_element
=
item
.
find_element
(
"xpath"
,
".//div[@id='title-wrapper']//a"
)
# print(title_element.get_attribute("outerHTML"))
time_element
=
video_list
[
index
]
.
find_element
(
"xpath"
,
".
//ytd-video-meta-block//div[@id='metadata-line']/span[2]"
)
time_element
=
item
.
find_element
(
"xpath"
,
"
//ytd-video-meta-block//div[@id='metadata-line']/span[2]"
)
# print(time_element.get_attribute("outerHTML"))
title
=
title_element
.
get_attribute
(
'title'
)
...
...
@@ -74,6 +68,14 @@ def reptile(browser=None, search_word=""):
releaseTime
=
str
(
int
(
convert_string_to_time
(
time_element
.
text
)))
except
:
releaseTime
=
str
(
int
(
time
.
time
()))
# 过滤时间
# # 如果'releaseTime'不是整数,则将其转换为整数
new_releaseTime
=
int
(
releaseTime
)
if
new_releaseTime
<
filter_time_start
or
new_releaseTime
>
filter_time_end
:
# 如果'new_releaseTime'不在指定范围内,则跳过当前迭代,继续下一个项目
continue
video_url
=
[]
# 下载地址
download_dir
=
f
'{os.path.join(file_dir, f"{id}.mp4")}'
...
...
@@ -131,6 +133,8 @@ def main():
# 请求关键词
response
=
getReptileTask
()
global
status_task
global
filter_time_start
global
filter_time_end
# print(response)
if
response
[
'status_code'
]
==
200
and
response
[
'data'
][
'code'
]
==
200
:
log
.
debug
(
"call success"
)
...
...
@@ -140,6 +144,8 @@ def main():
search_word
=
item
[
'keyword'
]
table_name
=
item
[
'tableName'
]
status_task
=
int
(
item
[
"status"
])
filter_time_start
=
int
(
item
[
"beginFiltrationTime"
])
filter_time_end
=
int
(
item
[
"endFiltrationTime"
])
# 简体转繁体
if
status_task
==
0
and
len
(
search_word
)
>
0
:
reptile
(
None
,
convert_to_traditional
(
search_word
))
...
...
@@ -155,6 +161,10 @@ def main():
# 全局变量
data
=
[]
table_name
=
"pms_youtube"
# 过滤时间开始
filter_time_start
=
int
(
123
)
# 过滤时间结束
filter_time_end
=
int
(
123
)
file_dir
=
f
'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# 是否启用
status_task
=
'0'
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment