Commit db9c2b5b
Authored Jul 21, 2023 by liyang
feat: dcard crawler
parent e6a5eb4e
Showing 8 changed files with 326 additions and 24 deletions (+326 −24)
.gitignore                      +5    −1
browser/.gitkeep                +0    −0
log/error.log                   +1    −0
pc_dcard.py                     +226  −0
pc_facebook.py                  +2    −2
pc_youtube.py                   +6    −4
utils/createBrowserDriver.py    +45   −11
utils/index.py                  +41   −6
.gitignore

@@ -12,4 +12,8 @@ info.log
 reptile_data/**/*.json
 *.mp4
 *.webm
-*.jpg
\ No newline at end of file
+*.jpg
+*.app
+*.exe
+*.deb
+browser/*chrome*
\ No newline at end of file
browser/.gitkeep (new file, 0 → 100644, empty)
log/error.log

@@ -10,3 +10,4 @@
 2023-07-13 16:39:23,710 ERROR pc_ptt.py : reptile [line: 66] xpath element not found: //div[@id='main-content']/div[3]//span[@class='article-meta-value']
 2023-07-13 16:41:30,332 ERROR pc_ptt.py : reptile [line: 66] xpath element not found: //div[@id='main-content']/div[3]//span[@class='article-meta-value']
 2023-07-13 16:43:37,394 ERROR pc_ptt.py : reptile [line: 67] xpath element not found: //div[@id='main-content']/div[3]//span[@class='article-meta-value']
+2023-07-21 10:54:17,501 ERROR pc_ptt.py : reptile [line: 73] xpath element not found: //div[@id='main-content']/div[3]//span[@class='article-meta-value']
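Editor's note: these repeated entries come from bare find_element calls failing on a changed page layout. A minimal sketch of the lookup-with-logging pattern the log implies; the helper name find_or_log is hypothetical, only the log object and the error wording come from this repo:

from selenium.common.exceptions import NoSuchElementException

def find_or_log(browser, xpath, log):
    """Return the element at xpath, or None after logging which xpath failed."""
    try:
        return browser.find_element("xpath", xpath)
    except NoSuchElementException:
        # Mirrors the error.log entries above
        log.error(f"xpath element not found: {xpath}")
        return None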
pc_dcard.py (new file, 0 → 100644)
import json
import time
from bs4 import BeautifulSoup
from utils.Logger import log
from utils.createBrowserDriver import create
from utils.filse import save_json
from api.index import importJson, getReptileTask, importJsonPath
from utils.index import convert_to_traditional, yt_dlp_download, convert_string_to_time, parse_twitter_time_string, \
    is_base64_image, save_base64_image, get_screen_resolution
# from pytube import YouTube
from selenium.common.exceptions import NoSuchElementException
import os
import sys
from datetime import datetime
from utils.download_image import download_image
from config.settings import get_base_file_url
# --------------- selenium dependencies: start ----------------
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
# --------------- selenium dependencies: end ----------------


# Utility function - download images
'''
Open the target page and use Selenium to click the "GDPR-accept" button, then click the
"search-show-more-button" button in a loop to load more data until it is no longer clickable.
Finally, collect the full paginated data and close the browser driver.
'''


def reptile(browser=None, search_word=""):
    """
    :param browser:
    :param search_word:
    """
    print(f"Search word: {search_word}")
    base_url = "https://www.dcard.tw"
    option = ['--headless']
    # ['--headless']
    browser = browser or create(None, False)
    # Open the page
    # browser.get(base_url)
    browser.get(f"{base_url}/search?query={search_word}")
    base_xpath = "//div[@role='main']//div[@data-key]//article"
    # Content blocks
    element_content_list = browser.find_elements('xpath', base_xpath)
    # Authors
    element_authors_list = browser.find_elements('xpath', f"{base_xpath}/div[1]/div[1]/div[2]/div/div[1]")
    # Timestamps
    element_time_list = browser.find_elements('xpath', f"{base_xpath}/div[1]/div[1]/div[2]/div/div[2]/time")
    # Titles
    element_title_list = browser.find_elements('xpath', f"{base_xpath}//h2")
    # Like counts
    element_like_list = browser.find_elements('xpath', f"{base_xpath}/div[3]/div[1]/div/div[2]")
    # Comment counts
    element_comment_list = browser.find_elements('xpath', f"{base_xpath}/div[3]/div[2]/div/span")
    length = len(element_content_list)
    for index in range(length):
        # Extract the time and convert it to a timestamp
        timestamp = datetime.fromisoformat(element_time_list[index].get_attribute("datetime")[:-1]).timestamp()
        # Extract the author
        author = element_authors_list[index].text
        # Extract the title
        title = element_title_list[index].text
        # Extract the like count
        like = element_like_list[index].text
        # Extract the comment count
        comment = element_comment_list[index].text
        # ------------- Extract the content ---------------
        element_content_list[index].click()
        # Wait for the popup content to appear, with a maximum wait of 10 seconds
        wait = WebDriverWait(browser, 10)
        # Define the wait condition via expected_conditions, keyed here on an element of the popup
        wait.until(EC.presence_of_element_located((By.XPATH, "//div[@data-testid='overlay']")))
        time.sleep(3)
        click_dom = browser.find_element("xpath", "//div[@data-testid='overlay']")
        # Handle the case where the popup content failed to load
        try:
            browser.find_element("xpath", "//div[@data-testid='overlay']//h2[text()='發生錯誤']")
            error_status = True
        except NoSuchElementException:
            error_status = False
        if error_status:
            # click_dom.click()
            browser.back()
            time.sleep(0.5)
            # continue
        close_button = browser.find_element("xpath", "//div[@data-testid='overlay']/div[2]/div[1]/div/div//button[@aria-label='關閉']")
        content_element = browser.find_element("xpath", "//div[@data-testid='overlay']//article//div[3]")
        soup = BeautifulSoup(content_element.get_attribute("outerHTML"), "html.parser")
        # Extract the link
        link_str = browser.current_url
        # ------------- handle videos in the content: start ------------------
        video_list = soup.find_all("video")
        for key, item in enumerate(video_list):
            item['src'] = ""
        # ------------- handle videos in the content: end ------------------
        # ------------- handle images in the content: start ------------------
        picture_url = []
        image_list = soup.find_all("img")
        # if len(image_list) > 0:
        #     for key, element in enumerate(image_list):
        #         # Download the image locally and replace the src in the tag
        #         id = str(int(time.time()))
        #         # Download path
        #         download_dir = f'{os.path.join(file_dir, f"{id}.jpg")}'
        #         # Access URL
        #         access_address = f'{get_base_file_url()}{table_name.split("_")[1]}/{id}.jpg'
        #         if is_base64_image(element['src']) == False:
        #             log.debug("Image is a URL")
        #             # Download status
        #             status = download_image(element['src'], download_dir)
        #             if status:
        #                 element['src'] = access_address
        #                 picture_url.append(access_address)
        #         else:
        #             log.debug("Image is base64")
        #             # Save the base64 data
        #             # Download status
        #             status = save_base64_image(element['src'], download_dir)
        #             if status:
        #                 element['src'] = access_address
        #                 picture_url.append(access_address)
        #
        # else:
        #     print("")
        # ------------- handle images in the content: end ------------------
        content = soup.prettify()
        # ---------------- determine the content type: start ----------
        content_type = ""
        try:
            # Find all img tags
            img_tags = soup.find_all('img')
            if len(img_tags) > 0:
                content_type = "图文"  # image and text
            else:
                content_type = "文字"  # text only
        except:
            content_type = "文字"
        # ---------------- determine the content type: end ----------
        # --------------- assemble the data: start ---------------------
        obj = {
            "title": title,
            "content": content,
            "link": link_str,
            "reptileTime": str(int(time.time())),
            "type": content_type,
            "author": author,
            "releaseTime": str(int(timestamp)),
            "picture_url": ",".join(picture_url)
        }
        # --------------- assemble the data: end ---------------------
        data.append(obj)
        close_button.click()
    # Send the scraped data to the Java service
    # print('----------------------')
    # print(data)
    if len(data) > 0:
        # Save the JSON file locally
        # log.debug(os.path.abspath("../"))
        state_save = save_json(os.path.join(file_dir, str(int(time.time())) + ".json"), data)
        if state_save:
            log.debug('save file success')
        else:
            log.debug('save file failed')
    else:
        # No data was scraped
        log.info("No data scraped")
    # Close the browser driver
    try:
        browser.close()
        browser.quit()
    except:
        log.debug("Failed to close the browser driver")
    sys.exit()


def main():
    """
    """
    # Request the keywords
    response = getReptileTask()
    global status_task
    # print(response)
    if response['status_code'] == 200 and response['data']['code'] == 200:
        log.debug("call success")
        search_word = ""
        for item in response['data']['rows']:
            if item['name'] == 'dcard':
                search_word = item['keyword']
                table_name = item['tableName']
                status_task = int(item["status"])
        # Convert Simplified Chinese to Traditional
        if status_task == 0 and len(search_word) > 0:
            reptile(None, convert_to_traditional(search_word))
        else:
            log.debug("Crawl task not enabled")
    else:
        log.debug("call failed")
        # Request timed out
        reptile(None, convert_to_traditional("新闻"))
        # upload_control()


# Global variables
data = []
table_name = "pms_dcard"
file_dir = f'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
# Whether the task is enabled
status_task = '0'
# Call the main function
main()
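Editor's note: one detail worth calling out in reptile above. Dcard's <time> elements carry an ISO-8601 datetime attribute ending in "Z", which datetime.fromisoformat rejects before Python 3.11 — hence the [:-1] slice. A standalone sketch of that conversion; the sample attribute value is illustrative, and, as in the committed code, the naive result is interpreted in local time:

from datetime import datetime

# Illustrative value of get_attribute("datetime") on a Dcard <time> element
raw = "2023-07-21T02:54:17.501Z"

# Strip the trailing "Z" so fromisoformat accepts it on Python < 3.11,
# then convert to a Unix timestamp the way the crawler does
timestamp = int(datetime.fromisoformat(raw[:-1]).timestamp())
print(timestamp)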
pc_facebook.py

@@ -22,7 +22,7 @@ def reptile(browser=None, search_word=""):
     print(f"Search word: {search_word}")
     url = "https://www.facebook.com/"
     option = ['--headless']
-    browser = browser or create(option, True)
+    browser = browser or create(None, True)
     # Open the page
     browser.get(url)
     try:
@@ -34,7 +34,7 @@ def reptile(browser=None, search_word=""):
         # Get the login button
         button_login = browser.find_element('xpath', "//button[@name='login']")
         button_login.click()
-        time.sleep(3)
+        time.sleep(6)
     except:
         print("Already logged in")
     url = f"https://www.facebook.com/search/top?q={search_word}"
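Editor's note: the second hunk only lengthens a fixed sleep after clicking the login button. A hedged alternative sketch using an explicit wait instead of a fixed delay; the post-login xpath below is a placeholder assumption, not taken from this repo:

from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

def wait_for_login(browser, timeout=10):
    """Block until a post-login element appears instead of sleeping a fixed time."""
    # "//div[@role='banner']" is a hypothetical marker of a logged-in page
    WebDriverWait(browser, timeout).until(
        EC.presence_of_element_located((By.XPATH, "//div[@role='banner']"))
    )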
pc_youtube.py

@@ -13,6 +13,7 @@ from config.settings import get_base_file_url
 from selenium.webdriver.common.action_chains import ActionChains
+import sys


 def reptile(browser=None, search_word=""):
     """
@@ -21,7 +22,7 @@ def reptile(browser=None, search_word=""):
     :return:
     """
     option = ['--headless']
-    browser = browser or create(['--headless'], True)
+    browser = browser or create(['--headless'], True)
     # print(browser)
     # Open the page
     url = f'https://www.youtube.com/results?search_query={search_word}'
@@ -63,13 +64,13 @@ def reptile(browser=None, search_word=""):
                 "releaseTime": releaseTime
             }
             data.append(obj)
-        else:
+        else:
             print("")
     if len(data) > 0:
         # Save the JSON file locally
         # log.debug(os.path.abspath("../"))
-        state_save = save_json(os.path.join(file_dir, str(int(time.time())) + ".json"), data)
+        state_save = save_json(os.path.join(file_dir, str(int(time.time())) + ".json"), data)
         if state_save:
             log.debug('save file success')
         else:
@@ -87,6 +88,7 @@ def reptile(browser=None, search_word=""):
         log.debug("Failed to close the browser driver")
+    sys.exit()


 def main():
     """
@@ -118,7 +120,7 @@ def main():
 # Global variables
 data = []
 table_name = "pms_youtube"
-file_dir = f'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data",table_name.split("_")[1])}'
+file_dir = f'{os.path.join(os.path.abspath("../"), "network-assets-reptile", "reptile_data", table_name.split("_")[1])}'
 # Whether the task is enabled
 status_task = '0'
 # Call the main function
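Editor's note: both this crawler and pc_dcard.py name their output <unix-timestamp>.json under reptile_data/<platform>. A small sketch of that convention; save_json is this repo's helper, while the wrapper name below is hypothetical:

import os
import time

def timestamped_json_path(file_dir):
    """Build the <unix-timestamp>.json output path both crawlers use."""
    return os.path.join(file_dir, str(int(time.time())) + ".json")

# e.g. reptile_data/youtube/1689907200.json
print(timestamped_json_path(os.path.join("reptile_data", "youtube")))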
utils/createBrowserDriver.py

import os
import platform
import sys
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
# --------------- selenium dependencies: start ----------------
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.firefox.firefox_profile import FirefoxProfile
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.chrome.service import Service as ChromiumService
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.core.utils import ChromeType
from selenium.webdriver.firefox.service import Service as FirefoxService
from webdriver_manager.firefox import GeckoDriverManager
# --------------- selenium dependencies: end ----------------
from utils.index import get_screen_resolution
# from mozprofile import FirefoxProfile

'''
@@ -48,18 +51,25 @@ def create(option=None, using_user_data=True, web_browser="firefox"):
     if using_user_data:
         # Add the user-data-dir argument to enable a persistent browser session that keeps login state and cookies
         if web_browser == "firefox":
-            firefox_profile_path = user_data_dir
-            # Replace this with your Firefox user data directory path
-            profile = FirefoxProfile(profile_directory=firefox_profile_path)
+            # Replace this with your Firefox user data directory path
+            profile = FirefoxProfile(profile_directory=user_data_dir)
             options.profile = profile
             # options.add_argument(f'--user-data-dir={user_data_dir}')
-        else:
+        elif web_browser == "chrome":
             options.add_argument(f'--user-data-dir={user_data_dir}')
+        elif web_browser == "chromium":
+            options.add_argument(f'--user-data-dir={user_data_dir}')
+        elif web_browser == "chrome_test":
+            options.add_argument(f'--user-data-dir={user_data_dir}')
+        else:
+            print("")
     # if sys.platform.startswith('linux'):
     #     print("The current system is Linux")
     # Remember to add these arguments when running under Linux ----------------------------
     # chrome_options.add_argument('--headless')
     # options.add_argument("--window-size=1920x1080")  # set the window size; a common fully headless setting
     # options.add_argument("--start-maximized")  # maximize the window
     options.add_argument('--no-sandbox')
     options.add_argument('--disable-gpu')
     options.add_argument('--disable-dev-shm-usage')
@@ -70,8 +80,32 @@ def create(option=None, using_user_data=True, web_browser="firefox"):
     if web_browser == "firefox":
         # Create the Firefox browser object and pass in the options
         browser = webdriver.Firefox(options=options, service=FirefoxService(GeckoDriverManager().install()))
-    else:
+    elif web_browser == "chrome":
         # Create the Chrome browser object and pass in the options
-        browser = webdriver.Chrome(options=options, service=ChromeService(ChromeDriverManager().install()))
+        web_browser = webdriver.Chrome(options=options, service=ChromeService(ChromeDriverManager().install()))
+    elif web_browser == "chrome_test":
+        binary_location = ""
+        if platform.system() == "Windows":
+            binary_location = os.path.join(os.path.abspath("../"), 'network-assets-reptile', 'browser', "chrome-win64", "chrome")
+        elif platform.system() == "Linux":
+            binary_location = os.path.join(os.path.abspath("../"), 'network-assets-reptile', 'browser', "chrome-linux64", "chrome")
+        elif platform.system() == "Darwin":
+            binary_location = os.path.join(os.path.abspath("../"), 'network-assets-reptile', 'browser', "chrome-mac-x64", "chrome")
+        else:
+            print("")
+        # Specify the browser binary path
+        print(binary_location)
+        options.binary_location = binary_location
+        browser = webdriver.Chrome(options=options, service=ChromiumService(ChromeDriverManager(chrome_type=ChromeType.CHROMIUM).install()))
+    else:
+        print("")
     # Get the screen resolution
     width, height = get_screen_resolution()
     # Set the window size to full screen
     browser.set_window_size(width, height)
     return browser
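Editor's note: a minimal usage sketch of create as the crawlers in this commit call it; the call signatures are taken from this diff, and headless behaviour depends on the options list passed through:

from utils.createBrowserDriver import create

# Fresh Firefox session without a persisted profile, as pc_dcard.py does
browser = create(None, False)

# Headless session reusing the saved user-data directory, as pc_youtube.py does
# browser = create(['--headless'], True)

browser.get("https://www.dcard.tw")
print(browser.title)
browser.quit()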
utils/index.py

@@ -3,18 +3,53 @@ import re
 import time
 import opencc
-import datetime
 from pytube import YouTube
 import ssl
 import subprocess
 from utils.Logger import log
+import datetime
 import utils.Logger
 from dateutil import parser
+import base64
+import tkinter as tk
+import io
+from PIL import Image
 # from dateutil.relativedelta import relativedelta
 ssl._create_default_https_context = ssl._create_stdlib_context


+def get_screen_resolution():
+    try:
+        root = tk.Tk()
+        screen_width = root.winfo_screenwidth()
+        screen_height = root.winfo_screenheight()
+        root.destroy()  # Close the tkinter window
+        return screen_width, screen_height
+    except tk.TclError as e:
+        print("Unable to get the screen resolution:", e)
+        return 1920, 1080
+
+
+def save_base64_image(base64_string, file_path):
+    try:
+        # Decode the base64 data
+        image_data = base64.b64decode(base64_string.split(',')[1])
+        # Write the decoded data to an image file
+        with open(file_path, "wb") as file:
+            file.write(image_data)
+        # print(f"Image downloaded successfully: {save_path}")
+        return True
+    except Exception as e:
+        print(f"Image download failed: {file_path}")
+        return False
+
+
+def is_base64_image(url):
+    pattern = re.compile(r'^data:image\/[a-z]+;base64,')
+    return bool(pattern.match(url))
+
+
 # Convert Facebook's time format
 def parse_time_string(time_str):
     """
@@ -57,17 +92,17 @@ def convert_string_to_time(string):
     if "天前" in string:
         days = int(string.split("天前")[0])
-        converted_time = current_time - timedelta(days=days)
+        converted_time = current_time - datetime.timedelta(days=days)
     elif "周前" in string:
         weeks = int(string.split("周前")[0])
-        converted_time = current_time - timedelta(weeks=weeks)
+        converted_time = current_time - datetime.timedelta(weeks=weeks)
     elif "月前" in string:
         cleaned_string = re.sub(r'\D', '', string.split("月前")[0])
         months = int(cleaned_string)
-        converted_time = current_time - timedelta(days=months * 30)
+        converted_time = current_time - datetime.timedelta(days=months * 30)
     elif "年前" in string:
         years = int(string.split("年前")[0])
-        converted_time = current_time - timedelta(days=years * 365)
+        converted_time = current_time - datetime.timedelta(days=years * 365)
     else:
         raise ValueError("Invalid string format")
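Editor's note: a quick round-trip sketch of the two image helpers added in this file. The payload below is illustrative, not a real image — save_base64_image only decodes the base64 section after the comma and writes the bytes:

import base64
from utils.index import is_base64_image, save_base64_image

# Illustrative data URL; any bytes work since the helper does no image validation
data_url = "data:image/png;base64," + base64.b64encode(b"not-a-real-png").decode()

assert is_base64_image(data_url)                         # matches the data:image/...;base64, prefix
assert not is_base64_image("https://example.com/a.jpg")  # plain URLs do not
save_base64_image(data_url, "test.png")                  # writes the decoded bytes to test.png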