Commit 8708ce28 authored by liyang

feat:init

parent 37523f73
reptile_data/files
*.zip
*.json
.DS_Store
venv
reptile_data
*.idea
import requests
headers = {}
baser_url = "http://192.168.0.127:8081/"
def importJson(file, form_data):
http_url = baser_url + "importJson/import"
response = requests.post(http_url, headers=headers, files={"file": file}, data=form_data)
# Parse the response body as JSON
return {"status_code": response.status_code, "data": response.json()}
def getReptileTask():
http_url = baser_url + "crawlerSetting/list"
response = requests.get(http_url, headers=headers)
# Parse the response body as JSON
return {"status_code": response.status_code, "data": response.json()}
# Log file directory
def get_log_path():
return "../"
import pymysql.cursors
# Connect to the database
connection = pymysql.connect(host='localhost',
user='user',
password='passwd',
database='db',
cursorclass=pymysql.cursors.DictCursor)
with connection:
with connection.cursor() as cursor:
# Create a new record
sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
cursor.execute(sql, ('webmaster@python.org', 'very-secret'))
# connection is not autocommit by default. So you must commit to save
# your changes.
connection.commit()
with connection.cursor() as cursor:
# Read a single record
sql = "SELECT `id`, `password` FROM `users` WHERE `email`=%s"
cursor.execute(sql, ('webmaster@python.org',))
result = cursor.fetchone()
print(result)
import json
import random
# Titles
CATEGORIES = [
"身体健康",
"心理健康",
"精神健康",
"泰国马沙文化咖喱",
"意大利那不勒斯披萨",
"墨西哥巧克力",
"日本寿司",
"马来西亚槟榔屿叻沙汤",
"中国北京烤鸭",
"德国汉堡",
"泰国冬阴功汤",
"美国冰激凌",
"蓬鸡肉曼巴",
"威尼斯旅游",
"阿姆斯特丹旅游",
"马尔代夫旅游",
"迪拜旅游",
"新加坡旅游",
"巴厘岛旅游",
"普吉岛旅游",
"夏威夷旅游",
"普罗旺斯旅游",
"毛里求斯旅游",
"国际新闻",
"国内新闻",
"政治新闻",
"经济新闻",
"财经新闻",
"科技新闻",
"娱乐新闻",
"体育新闻",
"文化艺术新闻",
"社会新闻",
"健康新闻",
"教育新闻",
"环境新闻",
"科学新闻",
"汽车新闻",
"房地产新闻",
"旅游新闻",
"美食新闻",
"时尚新闻",
"宏观经济",
"金融市场",
"公司业绩",
"国际贸易",
"就业市场",
"创新科技",
"房地产市场",
"能源与资源",
"农业与食品",
"产业与区域经济",
"一帆风顺",
"一劳永逸",
"一蹴而就",
"一视同仁",
"一丝不苟",
"四面楚歌",
"十全十美",
"百发百中",
"半途而废",
"画蛇添足",
"亡羊补牢",
"自相矛盾",
"雪中送炭",
"虎头蛇尾",
"掩耳盗铃",
"心灵手巧",
"言而无信",
"对牛弹琴",
"大言不惭",
"以身作则",
"守株待兔",
"有眼无珠",
"闭门造车",
"画龙点睛",
"一箭双雕",
"东山再起",
"狗尾续貂",
"珠光宝气",
"名不虚传",
"笑里藏刀",
"独一无二",
"千军万马",
"坐井观天",
"如鱼得水",
"束手无策",
"井底之蛙",
"守口如瓶",
"自由自在",
"卧薪尝胆",
"锦上添花",
"才高八斗",
"扬眉吐气",
"黔驴技穷",
"寸步不离",
"骑虎难下",
"一诺千金",
"心有灵犀",
"不可思议",
"班门弄斧",
"无独有偶",
"声东击西",
"目瞪口呆",
"画饼充饥",
"面面俱到",
"青梅竹马",
"鱼贯而入",
"纸上谈兵",
"风和日丽",
"开门见山",
"九牛一毛",
"天方夜谭",
"刮目相看",
"柳暗花明",
"卧虎藏龙",
"画地为牢",
"万紫千红",
"风驰电掣",
"鸟语花香",
"才子佳人",
"金玉满堂",
"情同手足",
"东张西望",
"返老还童",
"左右为难",
"一知半解",
"白纸黑字",
"枝繁叶茂",
"宜将剩勇追穷寇",
"腾讯",
"阿里巴巴",
"华为",
"百度",
"小米",
"京东",
"美团",
"滴滴",
"字节",
"苏宁",
"恒大",
"中信",
"万科",
"民生银行",
"平安保险",
"复星",
"联想",
"迅雷",
"宝能投资",
"长虹",
"华兴银行",
"海尔",
"兴业银行",
"金科",
"掌趣",
"广汽",
"红星美凯龙",
"新东方",
"海康威视",
"正威",
"世纪佳缘",
"亚洲石油",
"伊利",
"绿地",
"恒大地产",
"三一",
"华润",
"中国联通",
"洋河股份",
"碧桂园",
"潍柴动力",
"阳光城",
"葛洲坝",
"中粮",
"南方航空",
"浙商银行",
"欧亚集团",
"瑞幸咖啡",
"顺丰",
"万达",
"江苏宏宝",
"巨人网络",
"掌阅科技",
"中化",
"海尔智家",
"三安光电",
"招商银行",
"金科股份",
"完美世界",
"药明康德",
"卓越",
"中国重工",
"中远海发",
"东方明珠",
"和而泰",
"新华保险",
"传化智联",
"同仁堂",
"中国民航",
"拓普集团",
"大华股份",
"乐视网",
"航天科技",
"泸州老窖",
"国美",
"新城控股",
"华泰证券",
"新潮能源",
"海螺水泥",
"乐视致新",
"大商股份",
"中航工业",
"新东方在线",
"九芝堂",
"众安在线",
"白云山",
"光明乳业",
"美的集团",
"久其软件",
"仁和药业",
"华夏幸福",
"国药控股",
"博雅生物",
"粤高速",
"国信证券",
"上汽集团",
"世界上最值得住的房子",
"我,想去哈尔滨",
"景天科趣味应用与品种类别",
"飞碟幻想消亡史",
"传承灿烂中华文明",
"向世界展示“中国精彩”"
]
# Summaries
# Content
contentList = [
"中新网乌鲁木齐6月11日电 6月10日,新疆昌吉州第二届油菜花观光旅游节开幕。 此次活动以千亩油菜花田为载体,打造一场“赏、游、学、玩、摄、食、购”全身心贴近大自然,放松体验互动式观光游览节。",
"人民网北京6月11日电 (记者孙博洋)记者从市场监管总局获悉,6月9日,在第十六个世界认可日主题活动上,中国合格评定国家认可委员会(CNAS)发布认可工作服务质量强国建设促进贸易便利化十项措施。 "
]
def generate_test_data():
data = []
for a in range(100):
print("生成进度:" + str(a + 1) + "%")
for b in range(10000):
category = random.choice(CATEGORIES) + "【测试数据】"
summary = category
content = random.choice(contentList)  # pick a random item from the list
item = {
"title": category,
"summary": summary,
"content": content,
"appId": 106845,
"appName": "军科研战",
"notifyId": b + random.randint(100, 9999999) + 101
}
data.append(item)
return data
test_data = generate_test_data()
print("数组长度:" + str(len(test_data)))
output = test_data
# Save the data to a file
with open("reptile_data/mobileGatewayTestData.json", "w", encoding="utf-8") as file:
json.dump(output, file, indent=2, ensure_ascii=False)
import requests
from bs4 import BeautifulSoup
# URL of the page to scrape images from
url = 'https://www.woyaogexing.com/touxiang/'
# Send the HTTP request and parse the page content
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
# Collect all image links
img_links = []
# print(soup)
for img in soup.find_all('img'):
img_links.append("https:" + img.get('src'))
# Create the folder for saving images
# if not os.path.exists('images'):
# os.makedirs('images')
# print(img_links)
# Download the images and save them locally
dir = "/Users/macosx/Desktop/项目文档/python-demo/Crawlmages/avatar"
for link in img_links:
response = requests.get(link)
filename = link.split('/')[-1]
with open(f'{dir}/{filename}', 'wb') as f:
f.write(response.content)
from apscheduler.schedulers.blocking import BlockingScheduler
from reptile_task import reptile_task
def job():
reptile_task()
scheduler = BlockingScheduler()
scheduler.add_job(job, 'interval', seconds=10)  # run the task every 10 seconds
scheduler.start()
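# --- Hedged alternative sketch (not in the original file) ---
# The same task could also run on a cron-style schedule instead of a fixed interval;
# APScheduler's 'cron' trigger accepts the usual cron fields. The hour/minute values
# below are placeholders, not taken from the original code.
#
# cron_scheduler = BlockingScheduler()
# cron_scheduler.add_job(reptile_task, 'cron', hour=2, minute=0)  # run daily at 02:00
# cron_scheduler.start()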
import tkinter as tk
from pc_twitter import reptile
from utils.createBrowserDriver import create
def button_click():
user_input = entry.get()
cookies = browser.get_cookies()
session_id = browser.session_id
if len(user_input) > 0:
label.configure(text="Button Clicked!")
reptile(browser)
# create() (utils/createBrowserDriver) takes an optional list of Chrome arguments, not a URL; reptile() opens the page itself
browser = create()  # was: create("https://twitter.com/home?query=%E6%B0%B4")
window = tk.Tk()
window.geometry("400x300")
# Create the input field
entry = tk.Entry(window)
entry.pack()
label = tk.Label(window, text="Hello, Tkinter!")
label.pack()
button = tk.Button(window, text="开始爬取", command=button_click)
button.pack()
window.mainloop()
# Register your models here.
from django.apps import AppConfig
class AdminConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'admin'
# Create your models here.
# Create your tests here.
from django.urls import path
from . import views
urlpatterns = [
path("", views.index, name="index"),
path("user", views.user, name="user"),
path("pc", views.pc, name="pc"),
]
import http.client
import json
import sys
import time
import pymysql.cursors
from django.http import HttpResponse
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options
def index(request):
return HttpResponse("Hello, world. You're at the polls index.")
def user(request):
# Connect to the database
connection = pymysql.connect(host='10.211.55.34',
user='root',
password='123456',
database='test',
cursorclass=pymysql.cursors.DictCursor)
with connection:
# with connection.cursor() as cursor:
# # Create a new record
# sql = "INSERT INTO `users` (`email`, `password`) VALUES (%s, %s)"
# cursor.execute(sql, ('webmaster@python.org', 'very-secret'))
#
# # connection is not autocommit by default. So you must commit to save
# # your changes.
# connection.commit()
print("已连接")
with connection.cursor() as cursor:
# Read a single record
sql = "SELECT `id`,`password`,`name` FROM `user`"
cursor.execute(sql)
result = cursor.fetchone()
print(result)
return HttpResponse(json.dumps(result))
# the `with` blocks above close the cursor and the connection automatically, so no explicit close() is needed
def pc(request):
# Utility - download image
'''
Open the target page, use Selenium to click the "GDPR-accept" button, then repeatedly click the
"search-show-more-button" button to load more data until it is no longer clickable. Finally,
grab the fully paginated content and close the browser driver.
'''
# JSON data
data = []
image_key = 0
fileDir = "./reptile_data/news/nytimes/"
# year = datetime(2021, 1, 1)
# startDate = datetime(2020, 12, 31)  # start date
# endDate = datetime(2020, 12, 31)  # end date
url = "https://twitter.com/"
if sys.platform.startswith('linux'):
# print("Running on Linux")
# Load the driver on Linux
# Load the Chrome browser driver
chrome_options = Options()
# Remember to add these arguments when running on Linux ----------------------------
chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--disable-dev-shm-usage')
# -----------------------------------------------------
# Loading chromedriver -------------------------------------------------
# On Windows, chromedriver is loaded from chromedriver.exe in the current directory by default
# On Linux, the default chromedriver path is /usr/bin/chromedriver
# A custom path can also be supplied via executable_path
browser = webdriver.Chrome(options=chrome_options)
# -----------------------------------------------------------------
else:
# print("当前系统不是 Linux")
# 创建浏览器驱动对象
browser = webdriver.Chrome()
print(browser)
# browser = webdriver.Firefox(executable_path='/usr/local/bin/geckodriver')
# endDate = startDate = startDate + timedelta(days=i)
# Open the page
browser.get(url)
# WebDriverWait(browser,10).
# Open the login dialog
open_button_login = WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.XPATH, "//a[@data-testid='login']")))
open_button_login.click()
time.sleep(5)
# Get the username/password input fields
input_email_element = WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.XPATH, "//input[@autocomplete='username']")))
# Get the "Next" button
buttons = WebDriverWait(browser, 10).until(EC.presence_of_all_elements_located((By.XPATH, "//div[@role='button']")))
# for item in buttons:
# print(BeautifulSoup(item, 'html.parser'))
page_content = browser.page_source
soup = BeautifulSoup(page_content, 'html.parser')
browser.close()
# Close the browser driver
# browser.quit()
print(soup)
return HttpResponse(soup)
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == '__main__':
main()
import pymysql
pymysql.install_as_MySQLdb()
"""
ASGI config for mysite project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/4.2/howto/deployment/asgi/
"""
import os
from django.core.asgi import get_asgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')
application = get_asgi_application()
"""
Django settings for mysite project.
Generated by 'django-admin startproject' using Django 4.2.2.
For more information on this file, see
https://docs.djangoproject.com/en/4.2/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/4.2/ref/settings/
"""
import os
from pathlib import Path
# Default port setting
# If the `PORT` environment variable is not set, port 8000 is used
# Another port can be chosen by running `export PORT=8001` in the terminal
DEFAULT_PORT = os.getenv('PORT', 8000)
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/4.2/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'django-insecure-xx_0ss1znvbf8)_fnhv41no80fz#j0(wj*-1m6z$jv@!c0s6^$'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = False
# If DEBUG is set to False, ALLOWED_HOSTS must also be configured correctly.
# '10.211.55.34'
ALLOWED_HOSTS = ['127.0.0.1', 'localhost']
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'mysite.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'mysite.wsgi.application'
# Database
# https://docs.djangoproject.com/en/4.2/ref/settings/#databases
# DATABASES = {
# 'default': {
# # 'ENGINE': 'django.db.backends.sqlite3',
# 'ENGINE': 'django.db.backends.mysql',
# 'NAME': BASE_DIR / 'db.sqlite3',
# }
# }
# Password validation
# https://docs.djangoproject.com/en/4.2/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/4.2/topics/i18n/
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/4.2/howto/static-files/
STATIC_URL = 'static/'
# Default primary key field type
# https://docs.djangoproject.com/en/4.2/ref/settings/#default-auto-field
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
"""
URL configuration for mysite project.
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/4.2/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: path('', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.urls import path, include
urlpatterns = [
path("admin/", include("admin.urls")),
# path('admin/', admin.site.urls),
]
"""
WSGI config for mysite project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/4.2/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')
application = get_wsgi_application()
APScheduler==3.10.1
asgiref==3.7.2
async-generator==1.10
attrs==23.1.0
beautifulsoup4==4.12.2
certifi==2023.5.7
charset-normalizer==3.1.0
Django==4.2.2
docopt==0.6.2
exceptiongroup==1.1.1
h11==0.14.0
idna==3.4
loguru==0.7.0
lxml==4.9.2
outcome==1.2.0
pipreqs==0.4.13
PyMySQL==1.1.0
PySocks==1.7.1
pytube==15.0.0
pytz==2023.3
requests==2.31.0
selenium==4.10.0
six==1.16.0
sniffio==1.3.0
sortedcontainers==2.4.0
soupsieve==2.4.1
sqlparse==0.4.4
trio==0.22.0
trio-websocket==0.10.3
typing_extensions==4.7.0
tzlocal==5.0.1
urllib3==2.0.3
wsproto==1.2.0
yarg==0.1.9
import requests
from bs4 import BeautifulSoup
# import os
dir = "/reptile_data/files/app_logos"
# Create the folder for saving images
# if not os.path.exists(dir):
# os.makedirs(dir)
# Request the store page
url = 'https://cablemap.info/_default.aspx'
response = requests.get(url)
# Parse the page HTML
soup = BeautifulSoup(response.text, 'html.parser')
print(soup)
# Find the most popular app elements and iterate over them
# apps = soup.find_all('li', class_='category-item bottom-shadow J_cat_item')
# for app in apps[:100]:
# # Get the app name and logo URL
# app_name = app.find('h2', class_='category-name ellipsis').value
# print(app_name)
# logo_url = 'https:' + app.find('img', class_='')['src']
# # newName = app_name.encode('utf-8')
# # Download the logo image and save it to a local folder
# response = requests.get(logo_url)
# with open(f'{dir}/{app_name}.jpg', 'wb') as f:
# f.write(response.content)
#
# print(f'{app_name} 的Logo已保存')
# Import dependencies
# import os
import json
import requests
from bs4 import BeautifulSoup
from utils.download_image import download_image
# JSON data
data = []
# URL to scrape
url = "https://www.bbc.co.uk/search?q=2022&d=news_ps&page="
fileDir = "./news/bbc/"
image_key = 280
for a in range(28):
print(f'当前进度:{a + 1}')
_index = a + 1
response = requests.get(url + str(_index))
soup = BeautifulSoup(response.content, 'html.parser')
list_news = soup.find_all('div', {"class": "ssrcss-53phst-Promo ett16tt0"})
for index, item in enumerate(list_news):
# print(item)
# Grab the image
image_key = image_key + 1
url_element = item.find('img', {"class": "ssrcss-evoj7m-Image ee0ct7c0"})
image_url = url_element['src'] if url_element else ""
filename = ""  # make sure filename is defined even when no image is found
# print(url)
if image_url:
# print(url)
# # Download the image
#
filename = f"{image_key}.jpg"
# print(filename)
# sys.exit()
download_image(image_url, f'{fileDir}images/{filename}')  # download the image URL, not the search-page URL
# Grab the text
title_element = item.find('div', {"class": "ssrcss-1f3bvyz-Stack e1y4nx260"}).find('p', {
'class': 'ssrcss-6arcww-PromoHeadline e1f5wbog5'}).find('span')
introduction_element = item.find('div', {"class": "ssrcss-1f3bvyz-Stack e1y4nx260"}).find('p', {
'class': 'ssrcss-1q0x1qg-Paragraph eq5iqo00'})
title = title_element.get_text() if title_element else ""
introduction = introduction_element.get_text() if introduction_element else ""
news = {
"title": title,
"introduction": introduction,
"imageName": filename
}
data.append(news)
# print(data)
# Save the data to a file
with open(f'{fileDir}data1.json', "w", encoding="utf-8") as file:
json.dump(data, file, indent=2, ensure_ascii=False)
# Import dependencies
import time
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# Utility - download image
'''
Open the target page, use Selenium to click the "GDPR-accept" button, then repeatedly click the
"search-show-more-button" button to load more data until it is no longer clickable. Finally,
grab the fully paginated content and close the browser driver.
'''
# JSON data
data = []
image_key = 0
fileDir = "./reptile_data/news/nytimes/"
# year = datetime(2021, 1, 1)
# startDate = datetime(2020, 12, 31)  # start date
# endDate = datetime(2020, 12, 31)  # end date
url = "https://www.facebook.com/"
# Create the browser driver object
browser = webdriver.Chrome()
# endDate = startDate = startDate + timedelta(days=i)
# Open the page
browser.get(url)
# Get the email/password input fields
input_email_element = WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.XPATH, "//input[@name='email']")))
input_pwd_element = WebDriverWait(browser, 10).until(EC.presence_of_element_located((By.XPATH, "//input[@name='pass']")))
# Get the login button
button_login = WebDriverWait(browser, 10).until(EC.element_to_be_clickable((By.XPATH, "//button[@name='login']")))
input_email_element.send_keys("liyang19970814@gmail.com")
input_pwd_element.send_keys("xn89kiPT/^Kaeg#")
button_login.click()
# print("---------------")
# print(input_email_element)
# print(input_pwd_element)
# print(button_login)
# logger.debug(button)
# Simulate clicking the button repeatedly to load more data
# while button.is_enabled():
# time.sleep(2) # 等待一段时间,确保页面加载完毕
# try:
# button.click()
# button = WebDriverWait(browser, 5).until(
# EC.element_to_be_clickable((By.XPATH, "//button[@data-testid='search-show-more-button']")))
# except:
# break
time.sleep(3)
# Get the fully paginated content
page_content = browser.page_source
soup = BeautifulSoup(page_content, 'html.parser')
print("----------")
print(soup)
# list_news = soup.find_all('li', {"class": "css-1l4w6pd"})
# for index, item in enumerate(list_news):
# logger.debug(item)
# # 抓取图片
# image_key = image_key + 1
# url_element = item.find('img', {"class": "css-rq4mmj"})
# image_url = url_element['src'] if url_element else ""
# # logger.debug(url)
# if image_url:
# # logger.debug(url)
# # # 下载图片
# #
# filename = f"{image_key}.jpg"
# # logger.debug(filename)
# # sys.exit()
# download_image(image_url, f'{fileDir}images/{filename}')
# # 抓取文字
# title_element = item.find('h4', {"class": "css-2fgx4k"})
# introduction_element = item.find('p', {"class": "css-16nhkrn"})
# title = title_element.get_text() if title_element else ""
# introduction = introduction_element.get_text() if introduction_element else ""
# news = {
# "title": title,
# "introduction": introduction,
# "imageName": filename
# }
# data.append(news)
# logger.debug(data)
# Save the data to a file
# with open(f'{fileDir}data.json', "w", encoding="utf-8") as file:
# json.dump(data, file, indent=2, ensure_ascii=False)
browser.close()
# Close the browser driver
browser.quit()
# Import dependencies
import json
import time
from datetime import datetime, timedelta
from bs4 import BeautifulSoup
from loguru import logger
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
# Utility - download image
from utils.download_image import download_image
'''
Open the target page, use Selenium to click the "GDPR-accept" button, then repeatedly click the
"search-show-more-button" button to load more data until it is no longer clickable. Finally,
grab the fully paginated content and close the browser driver.
'''
# JSON data
data = []
image_key = 0
fileDir = "./reptile_data/news/nytimes/"
year = datetime(2021, 1, 1)
startDate = datetime(2020, 12, 31)  # start date
endDate = datetime(2020, 12, 31)  # end date
# Create the browser driver object
browser = webdriver.Chrome()
for i in range(1):
endDate = startDate = startDate + timedelta(days=i)
# Open the page
browser.get(
f'https://www.nytimes.com/search?dropmab=false&endDate={endDate.strftime("%Y%m%d")}&query={year.strftime("%Y")}&sort=best&startDate={startDate.strftime("%Y%m%d")}&types=interactivegraphics%2Carticle')
try:
accept = WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.XPATH, "//button[@data-testid='GDPR-accept']")))
accept.click()
finally:
logger.debug("")
# Wait for the 'load more' button to appear
button = WebDriverWait(browser, 10).until(
EC.element_to_be_clickable((By.XPATH, "//button[@data-testid='search-show-more-button']")))
# logger.debug(button)
# Simulate clicking the button repeatedly to load more data
while button.is_enabled():
time.sleep(2)  # wait a moment to make sure the page has finished loading
try:
button.click()
button = WebDriverWait(browser, 5).until(
EC.element_to_be_clickable((By.XPATH, "//button[@data-testid='search-show-more-button']")))
except:
break
# Get the fully paginated content
page_content = browser.page_source
soup = BeautifulSoup(page_content, 'html.parser')
list_news = soup.find_all('li', {"class": "css-1l4w6pd"})
for index, item in enumerate(list_news):
logger.debug(item)
# Grab the image
image_key = image_key + 1
url_element = item.find('img', {"class": "css-rq4mmj"})
image_url = url_element['src'] if url_element else ""
# logger.debug(url)
if image_url:
# logger.debug(url)
# # Download the image
#
filename = f"{image_key}.jpg"
# logger.debug(filename)
# sys.exit()
download_image(image_url, f'{fileDir}images/{filename}')
# Grab the text
title_element = item.find('h4', {"class": "css-2fgx4k"})
introduction_element = item.find('p', {"class": "css-16nhkrn"})
title = title_element.get_text() if title_element else ""
introduction = introduction_element.get_text() if introduction_element else ""
news = {
"title": title,
"introduction": introduction,
"imageName": filename
}
data.append(news)
# logger.debug(data)
# Save the data to a file
with open(f'{fileDir}data.json', "w", encoding="utf-8") as file:
json.dump(data, file, indent=2, ensure_ascii=False)
browser.close()
# Close the browser driver
browser.quit()
import io
import json
import re
import time
import loguru
import pymysql.cursors
import requests
from bs4 import BeautifulSoup
from api.index import importJson, getReptileTask
from utils.Logger import log
# from requests_toolbelt import *
from utils.createBrowserDriver import create
import opencc
'''
Scrape popular posts from Taiwan's PTT forum, including each post's title and content (text, images, video).
Flow: create the driver -> open the browser -> open the page -> scrape the board categories -> click through each ->
scrape the hot post titles -> click through each -> scrape the post details.
'''
data = []
def write_to_database(data):
# Connect to the database
connection = pymysql.connect(host='192.168.0.103',
user='root',
password='123456',
database='test',
cursorclass=pymysql.cursors.DictCursor)
try:
# Create a cursor
with connection.cursor() as cursor:
for data_item in data:
# Get the fields
title = data_item['title']
content = data_item['content']
log.debug("content 长度:" + str(len(content)))
# Run the SQL statement
sql = "INSERT INTO message (title, content) VALUES (%s, %s)"
cursor.execute(sql, (title, content))
# Commit the transaction
connection.commit()
# The transaction succeeded
log.debug("事务执行成功")
except pymysql.Error as e:
# Roll back the transaction on error
connection.rollback()
log.debug("事务执行失败:", e)
finally:
# Close the connection
connection.close()
def reptile(browser=None, search_word=""):
url = "https://www.ptt.cc/bbs/hotboards.html"
browser = browser or create(['--headless'])
# time.sleep(1)
# Open the page
browser.get(url)
classify_item_list = browser.find_elements('xpath', "//div[@class='board-class']")
# log.debug(classify_item_list)
length = len(classify_item_list)
for index in range(length):
if 0 < index < 5:
classify_item_list[index].click()
time.sleep(1)
element_list = browser.find_elements('xpath', "//div[@class='r-ent']//div[@class='title']//a")
length_two = len(element_list)
for index_two in range(length_two):
element_list[index_two].click()
time.sleep(1)
# Original link
browser_current_url = browser.current_url
log.debug('网页链接' + str(browser_current_url))
# Get the post details
element_title = browser.find_element('xpath',
"//div[@id='main-content']/div[3]//span[@class='article-meta-value']")
# The content may contain images and video and needs post-processing
element_content = browser.find_element('xpath', "//div[@id='main-content']")
# Remove <a> tags whose href points to an image
# ------------------------------------
# Parse the HTML with BeautifulSoup
soup = BeautifulSoup(element_content.get_attribute('innerHTML'), 'html.parser')
# Find all <a> tags that have an href
a_tags = soup.find_all('a', href=True)
# Loop over the <a> tags; the intent is to drop any <a> wrapping an <img>, though as written every matched <a> is removed
for tag in a_tags:
tag.decompose()
html = soup.prettify().replace('amp;', '')
# log.debug(html)
# log.debug('11111')
# ------------------------------------
# Assemble the record
obj = {
"title": element_title.text,
"content": html,
"link": browser_current_url,
"reptileTime": str(int(time.time()))
}
# Match the search keyword with a regular expression
matches = re.findall(search_word, element_title.text)
# Check the match result
if matches:
# log.debug(f"找到了匹配的字符串:{matches}")
data.append(obj)
else:
log.debug("未找到匹配的字符串")
# Go back to the previous page
browser.back()
element_list = browser.find_elements('xpath', "//div[@class='r-ent']//div[@class='title']//a")
# Go back to the previous page
browser.back()
time.sleep(1)
# Re-fetch the category elements
classify_item_list = browser.find_elements('xpath', "//div[@class='board-class']")
# Send the scraped data to the Java service
# Save a JSON file locally
file_dir = f'./reptile_data/ptt/{int(time.time())}'
# state_save = save_json(file_dir, data)
def upload_control():
# Form data
form_data = {
"tableName": "pms_ptt",
}
file = io.BytesIO(json.dumps(data).encode())
response = importJson(file, form_data)
if response['status_code'] == 200 and response['data']['code'] == 200:
log.debug("调用成功")
else:
log.debug("调用失败")
# upload_control()
if len(data) == 0:
log.debug("数据为空")
else:
upload_control()
# write_to_database(data)
# if state_save:
# log.debug('文件保存成功')
# else:
# log.debug('文件保存失败')
# Close the browser driver
# time.sleep(3)
browser.quit()
response = getReptileTask()
def convert_to_traditional(simplified_text):
converter = opencc.OpenCC('s2t.json')  # converter from Simplified to Traditional Chinese
traditional_text = converter.convert(simplified_text)  # run the conversion
return traditional_text
if response['status_code'] == 200 and response['data']['code'] == 200:
log.debug("调用成功")
search_word = ""
for item in response['data']['rows']:
if item['name'] == 'ptt':
search_word = item['keyword']
# print(convert_to_traditional(search_word))
reptile(None, convert_to_traditional(search_word))
else:
log.debug("调用失败")
# upload_control()
# Import dependencies
import time
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from utils.createBrowserDriver import create
# Utility - download image
'''
Open the target page, use Selenium to click the "GDPR-accept" button, then repeatedly click the
"search-show-more-button" button to load more data until it is no longer clickable. Finally,
grab the fully paginated content and close the browser driver.
'''
def reptile(browser):
# JSON data
data = []
image_key = 0
fileDir = "./reptile_data/news/nytimes/"
# year = datetime(2021, 1, 1)
# startDate = datetime(2020, 12, 31)  # start date
# endDate = datetime(2020, 12, 31)  # end date
url = "https://twitter.com/"
browser = browser or create()  # create() takes an optional list of Chrome arguments, not a bool
print(browser)
# browser = webdriver.Firefox(executable_path='/usr/local/bin/geckodriver')
# endDate = startDate = startDate + timedelta(days=i)
# Open the page
browser.get(url)
# WebDriverWait(browser,10).
# Open the login dialog
open_button_login = WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.XPATH, "//a[@data-testid='login']")))
open_button_login.click()
time.sleep(5)
# Get the username/password input fields
input_email_element = WebDriverWait(browser, 10).until(
EC.presence_of_element_located((By.XPATH, "//input[@autocomplete='username']")))
# Get the "Next" button
buttons = WebDriverWait(browser, 10).until(EC.presence_of_all_elements_located((By.XPATH, "//div[@role='button']")))
for item in buttons:
print(BeautifulSoup(item.get_attribute('outerHTML'), 'html.parser'))  # parse the element's HTML, not the WebElement itself
# soup = BeautifulSoup(page_content, 'html.parser')
# input_pwd_element = WebDriverWait(browser, 10).until(EC.presence_of_element_located((By.XPATH, "//input[@name='pass']")))
# # 获取登录按钮
# button_login = WebDriverWait(browser, 10).until(EC.element_to_be_clickable((By.XPATH, "//button[@name='login']")))
#
# input_email_element.send_keys("liyang19970814@gmail.com")
# input_pwd_element.send_keys("xn89kiPT/^Kaeg#")
# button_login.click()
# print("---------------")
# print(input_email_element)
# print(input_pwd_element)
# print(button_login)
# logger.debug(button)
# 模拟点击按钮多次加载更多数据
# while button.is_enabled():
# time.sleep(2) # 等待一段时间,确保页面加载完毕
# try:
# button.click()
# button = WebDriverWait(browser, 5).until(
# EC.element_to_be_clickable((By.XPATH, "//button[@data-testid='search-show-more-button']")))
# except:
# break
# time.sleep(3)
# Get the fully paginated content
page_content = browser.page_source
soup = BeautifulSoup(page_content, 'html.parser')
# print("----------")
# print(soup)
# list_news = soup.find_all('li', {"class": "css-1l4w6pd"})
# for index, item in enumerate(list_news):
# logger.debug(item)
# # 抓取图片
# image_key = image_key + 1
# url_element = item.find('img', {"class": "css-rq4mmj"})
# image_url = url_element['src'] if url_element else ""
# # logger.debug(url)
# if image_url:
# # logger.debug(url)
# # # 下载图片
# #
# filename = f"{image_key}.jpg"
# # logger.debug(filename)
# # sys.exit()
# download_image(image_url, f'{fileDir}images/{filename}')
# # 抓取文字
# title_element = item.find('h4', {"class": "css-2fgx4k"})
# introduction_element = item.find('p', {"class": "css-16nhkrn"})
# title = title_element.get_text() if title_element else ""
# introduction = introduction_element.get_text() if introduction_element else ""
# news = {
# "title": title,
# "introduction": introduction,
# "imageName": filename
# }
# data.append(news)
# logger.debug(data)
# Save the data to a file
# with open(f'{fileDir}data.json', "w", encoding="utf-8") as file:
# json.dump(data, file, indent=2, ensure_ascii=False)
browser.close()
# Close the browser driver
browser.quit()
import ssl
from pytube import YouTube
import json
import time
from bs4 import BeautifulSoup
from utils.createBrowserDriver import create
from utils.filse import save_json
import pymysql.cursors
ssl._create_default_https_context = ssl._create_stdlib_context
def Download(link,file_dir):
yt = YouTube(link)
# yt.register_on_complete_callback(callback)
yt.streams.filter(progressive=True, file_extension='mp4')
steam = yt.streams.get_by_itag(22)
try:
steam.download(file_dir)
return True
except:
print("下载失败")
return False
def write_to_database(data):
# Connect to the database
connection = pymysql.connect(host='192.168.0.103',
user='root',
password='123456',
database='test',
cursorclass=pymysql.cursors.DictCursor)
try:
# Create a cursor
with connection.cursor() as cursor:
for data_item in data:
# Get the fields
title = data_item['title']
content = data_item['content']
print("content 长度:" + str(len(content)))
# Run the SQL statement
sql = "INSERT INTO message (title, content) VALUES (%s, %s)"
cursor.execute(sql, (title, content))
# Commit the transaction
connection.commit()
# The transaction succeeded
print("事务执行成功")
except pymysql.Error as e:
# Roll back the transaction on error
connection.rollback()
print("事务执行失败:", e)
finally:
# Close the connection
connection.close()
def reptile(browser=None):
option = ['--headless']
browser = browser or create(option)  # create() expects a list of Chrome arguments; the page is opened below
# print(browser)
# Open the page
browser.get(url)
classify_video_list = browser.find_elements('xpath', "//div[@id='contents']//ytd-video-renderer//div[@id='title-wrapper']//a")
# print(classify_item_list)
length = len(classify_video_list)
for index in range(length):
if 0 < index < 2:
title = classify_video_list[index].get_attribute('title')
# link = classify_video_list[index].get_attribute('href')
link = "https://www.youtube.com/watch?v=7q88m5MQRhE"
# print(link)
file_url = './' + link.split('v=')[-1] + '.mp4'  # build a local path from the video id instead of the full URL
state_download = Download(link,file_url)
if state_download:
# Assemble the record
obj = {
"title": title,
"content": f'<video src="{file_url}"></video>',
"videoUrl": file_url,
}
data.append(obj)
else:
return False
# Save a JSON file locally
state_save = save_json('./youtubeData', data)
# write_to_database(data)
if state_save:
print('文件保存成功')
else:
print('文件保存失败')
browser.close()
# Close the browser driver
browser.quit()
url = "https://www.youtube.com/results?search_query=水"
data = []  # JSON array
reptile()
{
"folders": [
{
"path": "."
}
],
"settings": {}
}
# Import dependencies
import requests
from bs4 import BeautifulSoup
# URL to scrape
url = "https://play.google.com/store/apps"
# Send the HTTP request and parse the page content
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
# Find all app links and open each detail page
# The listing page only has summary info; to get the logo image we need to open each app's detail page, which we do by collecting the app links from the listing page
# Find all app links
app_links = []
for link in soup.find_all('a', href=True):
if '/store/apps/details?id=' in link['href']:
app_links.append(link['href'])
# Open each app's detail page
for app_link in app_links:
url = f'https://play.google.com{app_link}'
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
# Find the logo image URL on the detail page
logo = soup.find('img', {'class': 'T75of nm4vBd arM4bb'})['src']
# Download the image
dir = "/Users/macosx/Desktop/项目文档/python-demo/Crawlmages/app_logos"
response = requests.get(logo)
filename = f"{soup.find('h1', {'class': 'Fd93Bb ynrBgc xwcR9d'}).find('span').text}.png"
with open(f'{dir}/{filename}', 'wb') as f:
f.write(response.content)
print(f"Saved {filename}")
import os
import requests
from bs4 import BeautifulSoup
# Path where downloaded videos are saved
save_path = '/reptile_data/files/youtube-video'
# Create the save folder if it does not exist
os.makedirs(save_path, exist_ok=True)
# YouTube page URL
url = 'https://www.youtube.com/results?search_query=高清风景'
# Send an HTTP request to fetch the page content
response = requests.get(url)
soup = BeautifulSoup(response.text, 'html.parser')
print(soup)
# Parse the page for video links
video_links = soup.select('.yt-uix-tile-link')
# Iterate over the video links and download them
for link in video_links:
video_url = 'https://www.youtube.com' + link['href']
video_id = link['href'][9:]
download_url = f'https://www.ssyoutube.com/watch?v={video_id}'
# Send an HTTP request to get the video download link
response = requests.get(download_url)
soup = BeautifulSoup(response.text, 'html.parser')
# Parse the page for the download link
download_link = soup.select('.download-box a')[0]['href']
# Download the video and save it to the target path
video_response = requests.get(download_link, stream=True)
video_name = link.text.strip() + '.mp4'
video_path = os.path.join(save_path, video_name)
with open(video_path, 'wb') as f:
for chunk in video_response.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
print(f'已下载保存视频:{video_name}')
# reptile_task.py
def reptile_task():
print("定时任务执行了")
# logger.info("定时任务执行了")
# logger.add("")
APScheduler==3.10.1
asgiref==3.7.2
async-generator==1.10
attrs==23.1.0
beautifulsoup4==4.12.2
certifi==2023.5.7
charset-normalizer==3.1.0
Django==4.2.2
docopt==0.6.2
exceptiongroup==1.1.1
h11==0.14.0
idna==3.4
loguru==0.7.0
lxml==4.9.2
outcome==1.2.0
pipreqs==0.4.13
PyMySQL==1.1.0
PySocks==1.7.1
pytube==15.0.0
pytz==2023.3
requests==2.31.0
selenium==4.10.0
six==1.16.0
sniffio==1.3.0
sortedcontainers==2.4.0
soupsieve==2.4.1
sqlparse==0.4.4
trio==0.22.0
trio-websocket==0.10.3
typing_extensions==4.7.0
tzlocal==5.0.1
urllib3==2.0.3
wsproto==1.2.0
yarg==0.1.9
OpenCC==1.1.6
import time
from selenium import webdriver
o = webdriver.ChromeOptions()
prefs = {'profile.default_content_settings.popups': 0, 'download.default_directory': 'enter the download path here'}
o.add_experimental_option('prefs', prefs)
# Run headless
o.add_argument('--headless')
o.add_argument('--no-sandbox')
o.add_argument('--disable-gpu')
o.add_argument('--disable-dev-shm-usage')
driver = webdriver.Chrome(options=o)
driver.get('https://pypi.org/project/selenium/#files')
# Click to download
driver.find_element('partial link text', 'selenium-4.5.0-py3-none-any.whl').click()
print('success')
driver.close()
driver.quit()
import logging
import os
from logging import Logger, handlers
from config.settings import get_log_path
class MyLogger(Logger):
def __init__(self):
# log_name = '{}.log'.format(time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime()))
# log_path_file = os.path.join(get_log_path(), log_name)
# Get the log file paths
all_log_path_file = os.path.join(get_log_path(), "api.log")
error_log_path_file = os.path.join(get_log_path(), "error.log")
# Set the logger name and collection level
super().__init__("test_api", logging.DEBUG)
# Custom log format: instantiate a Formatter
fmt_str = '%(asctime)s %(levelname)s %(filename)s : %(funcName)s [line: %(lineno)s] %(message)s'
formatter = logging.Formatter(fmt_str)
# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Instantiate the console handler (StreamHandler)
sh = logging.StreamHandler()
# Set the console log level
sh.setLevel(logging.DEBUG)
# Set the handler's log format
sh.setFormatter(formatter)
# Attach the handler to the logger
self.addHandler(sh)
# Instantiate the file handler (FileHandler)
# fh = logging.FileHandler(log_path_file, mode='a', encoding="utf-8")
'''
Create a file handler; if api_test.log does not exist it is created automatically.
mode is set to append, and encoding is set to utf-8 to avoid garbled characters.
'''
fh = handlers.RotatingFileHandler(all_log_path_file, maxBytes=10 ** 6, backupCount=5,
encoding="utf-8", mode="a")
# Set the log level for file output
fh.setLevel(logging.DEBUG)
# Set the handler's log format
fh.setFormatter(formatter)
# Attach the file handler to the logger
self.addHandler(fh)
# When a log file reaches the maximum size, up to 5 backups are kept; once all 5 are full, only the newest logs are retained.
fh1 = handlers.RotatingFileHandler(error_log_path_file, maxBytes=10 ** 6, backupCount=5,
encoding="utf-8", mode="a")
# Set the log level for file output
fh1.setLevel(logging.ERROR)
# Set the handler's log format
fh1.setFormatter(formatter)
# Attach the file handler to the logger
self.addHandler(fh1)
fh.close()
fh1.close()
sh.close()
# Instantiate MyLogger once; other modules can just import `log` and use it
log = MyLogger()
if __name__ == '__main__':
log.error("this is a error log")
log.info("this is a info log")
log.debug("this is a debug log")
log.warning("this is a warning log")
import sys
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
'''
Create a browser instance
'''
def create(option=None):
chrome_options = webdriver.ChromeOptions()
if option is not None:
for value in option:
chrome_options.add_argument(value)
if sys.platform.startswith('linux'):
# print("Running on Linux")
# Remember to add these arguments when running on Linux ----------------------------
# chrome_options.add_argument('--headless')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--disable-dev-shm-usage')
# Loading chromedriver -------------------------------------------------
# On Windows, chromedriver is loaded from chromedriver.exe in the current directory by default
# On Linux, the default chromedriver path is /usr/local/bin/chromedriver
# A custom path can also be supplied via executable_path
browser = webdriver.Chrome(options=chrome_options)
# -----------------------------------------------------------------
else:
# print("Not running on Linux")
# Remember to add these arguments when running on Linux ----------------------------
# chrome_options.add_argument('--headless')  # enable headless mode
chrome_options.add_argument('--no-sandbox')  # disable the sandbox
# Create the browser driver object
browser = webdriver.Chrome(options=chrome_options)
return browser
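# --- Hedged usage sketch (not part of the original module) ---
# How the other scripts in this repo call create(): pass an optional list of Chrome
# arguments, then navigate explicitly (create() itself never opens a URL).
if __name__ == "__main__":
    demo_browser = create(["--headless"])
    demo_browser.get("https://www.ptt.cc/bbs/hotboards.html")  # URL borrowed from pc_ptt.py, used here purely as an example
    print(demo_browser.title)
    demo_browser.quit()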
import requests
def download_image(url, save_path):
response = requests.get(url, stream=True)
if response.status_code == 200:
with open(save_path, 'wb') as file:
for chunk in response.iter_content(1024):
file.write(chunk)
# print(f"图片下载成功:{save_path}")
else:
print(f"图片下载失败:{url}")
import json
# Save a JSON file
def save_json(file_dir=None, data=None):
if data is None:
data = []
if file_dir is None:
file_dir = "./data.json"
try:
file_dir = f'{file_dir}.json'
print("文件保存路径:" + file_dir)
with open(file_dir, "w", encoding="utf-8") as file:
json.dump(data, file, indent=2, ensure_ascii=False)
return True
except:
return False
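# --- Hedged usage sketch (not part of the original module) ---
# save_json() appends ".json" to the path itself, so pass the path without an extension.
# The path and payload below are placeholders.
if __name__ == "__main__":
    ok = save_json("./reptile_data/demo", [{"title": "demo", "content": "demo"}])
    print("saved" if ok else "save failed")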
# HTTP request wrapper
import requests
class HttpRequest:
# Required parameters:
# method: request method
# url: request URL
# param: request parameters
# headers: request headers
# cookie: cookie value for the request
def __init__(self, method, url, param=None, headers=None, cookie=None):
self.method = method
self.url = url
self.param = param
self.headers = headers
self.cookie = cookie
def http_request(self):
if self.method.lower() == "post":
# if self.headers is None:
# self.headers = {"Content-Type": "application/json;charset=UTF-8s"}
return requests.post(self.url, self.param,headers=self.headers, cookies=self.cookie)
elif self.method.lower() == "get":
if self.headers is None:
self.headers = {"Content-Type": "application/json;charset=UTF-8"}
return requests.get(self.url, self.param, headers=self.headers, cookies=self.cookie)
else:
print("请求方式错误:{0}".format(self.method))
# Main entry point
if __name__ == '__main__':
login_url = "http://www.qabujiaban.com/user/login"
data = {"username": "uuuu222都44", "password": "WJHasb124*1"}
# Log in
res = HttpRequest("Post", login_url, data).http_request()
print("登陆响应文本:", res.json()) #
cookie = res.cookies  # get the cookie
query_url = "http://www.qabujiaban.com/user/query"
query_headers = {"Content-Type": "application/json;charset=UTF-8"}
# Query
rqs = HttpRequest("Get", query_url, headers=query_headers, cookie=cookie).http_request()
print("查询响应文本:", rqs.json())
# print("Uu".lower())