liyang / network-assets-reptile · Commits

Commit ab634c7f
authored Jul 13, 2023 by liyang

fix: crawl Twitter

parent ab99c057
Showing 3 changed files with 69 additions and 84 deletions:

pc_ptt.py      +19  -7
pc_twitter.py  +42  -76
pc_youtube.py  +8   -1
pc_ptt.py

@@ -47,7 +47,7 @@ def reptile(browser=None, search_word=""):

    for index_two in range(length_two):
        # Skip titles that contain "公告" (announcement) or "看板" (board)
        if re.findall("公告", element_list[index_two].text) or re.findall("看板", element_list[index_two].text):
            a = 1
        else:
            log.debug(f"正在爬取分类:{type_title}-第{index_two + 1}条")  # "crawling category {type_title}, item {index_two + 1}"
            # Match with a regular expression
@@ -67,7 +67,8 @@ def reptile(browser=None, search_word=""):

            "//div[@id='main-content']/div[3]//span[@class='article-meta-value']"
        )
    except:
        log.error("xpath 找不到元素://div[@id='main-content']/div[3]//span[@class='article-meta-value']")  # "xpath element not found"
        log.debug(f'页面链接:{browser_current_url}')  # "page URL"
        # Go back to the previous page in the browser
        browser.back()
@@ -117,7 +118,7 @@ def reptile(browser=None, search_word=""):

        tag.decompose()
    except:
        # log.debug("查找所有的<a>标签失败")  # "failed to find the <a> tags"
        a = 1
    try:
        # Find all top-level `div` elements
        div_elements = soup.find_all('div')
@@ -133,7 +134,7 @@ def reptile(browser=None, search_word=""):

    except:
        # log.debug("删除第一级div失败")  # "failed to delete the top-level div"
        a = 2
    html = soup.prettify().replace('amp;', '')
    # ------------------ content filtering end --------------
@@ -149,7 +150,7 @@ def reptile(browser=None, search_word=""):

    }
    # --------------- assemble data end ---------------------
    if search_word is None or search_word == str(search_word):
        data.append(obj)
    else:
        # Match with a regular expression
@@ -161,7 +162,7 @@ def reptile(browser=None, search_word=""):

            data.append(obj)
        else:
            # log.debug("未找到匹配的字符串")  # "no matching string found"
            a = 3
    # Go back to the previous page in the browser
    browser.back()
    element_list = browser.find_elements('xpath', "//div[@class='r-ent']//div[@class='title']//a")
@@ -206,9 +207,14 @@ def reptile(browser=None, search_word=""):

    # time.sleep(3)
    browser.quit()


def main():
    """
    """
    # Request the crawl keywords
    response = getReptileTask()
    global status_task
    # print(response)
    if response['status_code'] == 200 and response['data']['code'] == 200:
        log.debug("call success")
@@ -217,8 +223,12 @@ def main():

        if item['name'] == 'ptt':
            search_word = item['keyword']
            table_name = item['tableName']
            status_task = item["status"]
            # Convert Simplified to Traditional Chinese
            if status_task == 0:
                reptile(None, convert_to_traditional(search_word))
            else:
                log.debug("爬取任务未启用")  # "crawl task not enabled"
    else:
        log.debug("call failed")
        reptile(None, '')
@@ -228,5 +238,7 @@ def main():

# Global variables
data = []
table_name = "pms_ptt"
# Whether the task is enabled
status_task = '0'
# Call main()
main()
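Note on the keyword filter in the hunk at line 150: `search_word == str(search_word)` is true for any string, so the condition `search_word is None or search_word == str(search_word)` always takes the first branch and the regex-matching branch below it is unreachable. A minimal sketch of a guard with the presumed intent (append only when no keyword is configured or the keyword occurs in the text); `matches_keyword` and its `text` argument are hypothetical names, not part of this commit:

import re

def matches_keyword(text, search_word):
    # Assumed intent: no keyword configured means keep every record.
    if not search_word:
        return True
    # Otherwise keep the record only when the keyword occurs in the text.
    return re.search(re.escape(search_word), text) is not None

The `a = 1`, `a = 2`, and `a = 3` placeholders in the except/else branches could likewise be `pass`, Python's idiomatic no-op.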
pc_twitter.py

@@ -22,85 +22,51 @@ def reptile(browser=None, search_word=""):

    # ['--headless']
    browser = browser or create()
    # print(browser)
    # browser = webdriver.Firefox(executable_path='/usr/local/bin/geckodriver')
    # endDate = startDate = startDate + timedelta(days=i)
    # Open the page
    browser.get(url)
    # WebDriverWait(browser,10).
    # Open the login window
    # open_button_login = WebDriverWait(browser, 10).until(
    #     EC.presence_of_element_located((By.XPATH, "//a[@data-testid='login']")))
    # open_button_login.click()
    # time.sleep(5)
    # Get the username and password inputs
    # input_email_element = WebDriverWait(browser, 10).until(
    #     EC.presence_of_element_located((By.XPATH, "//input[@autocomplete='username']")))
    # # Get the "next" button
    # buttons = WebDriverWait(browser, 10).until(EC.presence_of_all_elements_located((By.XPATH, "//div[@role='button']")))
    # for item in buttons:
    #     print(BeautifulSoup(item, 'html.parser'))
    # soup = BeautifulSoup(page_content, 'html.parser')
    # input_pwd_element = WebDriverWait(browser, 10).until(EC.presence_of_element_located((By.XPATH, "//input[@name='pass']")))
    # # Get the login button
    # button_login = WebDriverWait(browser, 10).until(EC.element_to_be_clickable((By.XPATH, "//button[@name='login']")))
    #
    # input_email_element.send_keys("liyang19970814@gmail.com")
    # input_pwd_element.send_keys("xn89kiPT/^Kaeg#")
    # button_login.click()
    # print("---------------")
    # print(input_email_element)
    # print(input_pwd_element)
    # print(button_login)
    # logger.debug(button)
    # Click the button repeatedly to load more data
    # while button.is_enabled():
    #     time.sleep(2)  # wait a moment so the page finishes loading
    #     try:
    #         button.click()
    #         button = WebDriverWait(browser, 5).until(
    #             EC.element_to_be_clickable((By.XPATH, "//button[@data-testid='search-show-more-button']")))
    #     except:
    #         break
    time.sleep(3)
    try:
        # Check whether a login is required
        login_input = browser.find_element('xpath', "//input[@autocomplete='username']")
        login_input.send_keys("liyang1851603")
        # Get the "next" button
        buttons = browser.find_element('xpath', "//div[@role='button'][2]")
        buttons.click()
        time.sleep(3)
        password_input = browser.find_element('xpath', "//input[@autocomplete='current-password']")
        password_input.send_keys("liyang19970814")
        # Get the login button
        button_login = browser.find_element('xpath', "//div[@data-testid='LoginForm_Login_Button']")
        button_login.click()
    except:
        print("------")
    # print(333333)
    # time.sleep(3)
    # Get the full paginated data
    # page_content = browser.page_source
    # soup = BeautifulSoup(page_content, 'html.parser')
    # print("----------")
    # print(soup)
    # list_news = soup.find_all('li', {"class": "css-1l4w6pd"})
    # for index, item in enumerate(list_news):
    #     logger.debug(item)
    #     # Grab the image
    #     image_key = image_key + 1
    #     url_element = item.find('img', {"class": "css-rq4mmj"})
    #     image_url = url_element['src'] if url_element else ""
    #     # logger.debug(url)
    #     if image_url:
    #         # logger.debug(url)
    #         # # Download the image
    #         filename = f"{image_key}.jpg"
    #         # logger.debug(filename)
    #         # sys.exit()
    #         download_image(image_url, f'{fileDir}images/{filename}')
    #     # Grab the text
    #     title_element = item.find('h4', {"class": "css-2fgx4k"})
    #     introduction_element = item.find('p', {"class": "css-16nhkrn"})
    #     title = title_element.get_text() if title_element else ""
    #     introduction = introduction_element.get_text() if introduction_element else ""
    #     news = {
    #         "title": title,
    #         "introduction": introduction,
    #         "imageName": filename
    #     }
    #     data.append(news)
    # logger.debug(data)
    # Save the data to a file
    # with open(f'{fileDir}data.json', "w", encoding="utf-8") as file:
    #     json.dump(data, file, indent=2, ensure_ascii=False)
    # Authors
    element_authors_list = browser.find_elements('xpath', "//div[@data-testid='cellInnerDiv']//article//div[@data-testid='User-Name']//a[@role='link']//div[@dir='ltr']")
    print(element_authors_list)
    print("2222")
    # Publication time
    element_release_list = browser.find_elements('xpath', "//div[@data-testid='cellInnerDiv']//article//div[@data-testid='User-Name']//div[2]//time[@datetime]")
    # Title
    # element_title_list = browser.find_element('xpath',)
    # Content
    element_content_list = browser.find_elements('xpath', "//div[@data-testid='cellInnerDiv']//article/div/div/div[2]/div[2]")
    # print(element_content_list)
    length = len(element_authors_list)
    print(length)
    for index in range(length):
        author = element_authors_list[index].text
        release_time = element_release_list[index].get_attribute("datetime")
        content = element_content_list[index]
        print(content)
        # Content filtering
        # Parse the HTML with BeautifulSoup
        soup = BeautifulSoup(content.get_attribute("innerHTML"), 'html.parser')
        print(soup)
        print("-----")
    # browser.close()
    # # Close the browser driver
    # browser.quit()
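Note on the new login block: it hard-codes an account and password, relies on fixed `time.sleep(3)` pauses, and a bare `except:` hides every failure, including changed selectors. A sketch of the same flow using the `WebDriverWait`/`expected_conditions` helpers already referenced in the commented-out code above, with credentials read from the environment; `TWITTER_USER` and `TWITTER_PASS` are assumed variable names, not part of this commit:

import os
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

def login_if_prompted(browser, timeout=10):
    # Wait briefly for the username field; its absence means no login prompt.
    try:
        login_input = WebDriverWait(browser, timeout).until(
            EC.presence_of_element_located((By.XPATH, "//input[@autocomplete='username']")))
    except TimeoutException:
        return  # already signed in, or the form never appeared
    login_input.send_keys(os.environ["TWITTER_USER"])  # assumed env var
    WebDriverWait(browser, timeout).until(
        EC.element_to_be_clickable((By.XPATH, "//div[@role='button'][2]"))).click()
    password_input = WebDriverWait(browser, timeout).until(
        EC.presence_of_element_located((By.XPATH, "//input[@autocomplete='current-password']")))
    password_input.send_keys(os.environ["TWITTER_PASS"])  # assumed env var
    WebDriverWait(browser, timeout).until(
        EC.element_to_be_clickable((By.XPATH, "//div[@data-testid='LoginForm_Login_Button']"))).click()

Catching only `TimeoutException` treats a missing login form as "already signed in" while letting real errors surface instead of swallowing them.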
pc_youtube.py

@@ -125,6 +125,7 @@ def main():

    # Request the crawl keywords
    response = getReptileTask()
    # print(response)
    global status_task
    if response['status_code'] == 200 and response['data']['code'] == 200:
        log.debug("call success")
        search_word = ""
@@ -132,7 +133,11 @@ def main():

        if item['name'] == 'youtube':
            search_word = item['keyword']
            table_name = item['tableName']
            status_task = item["status"]
            if status_task == 0:
                reptile(None, convert_to_traditional(search_word))
            else:
                log.debug("爬取任务未启用")  # "crawl task not enabled"
    else:
        log.debug("call failed")
        reptile(None, '')
@@ -142,5 +147,7 @@ def main():

# Global variables
data = []
table_name = "pms_youtube"
# Whether the task is enabled
status_task = '0'
# Call main()
main()
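Note on the status flag in both pc_ptt.py and pc_youtube.py: the global default is the string `'0'` while `main()` compares with the integer `0`, so whether the enabled check passes depends entirely on the type of `item["status"]` returned by the task API. A small sketch that normalizes the value before comparing; `task_enabled` is a hypothetical helper, and the reading that 0 means enabled follows the existing branch:

def task_enabled(status):
    # Accept 0, '0', and similar; anything unparsable counts as disabled.
    try:
        return int(status) == 0
    except (TypeError, ValueError):
        return False

Inside `main()`, `if status_task == 0:` would then become `if task_enabled(status_task):`.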