1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import requests
from requests.exceptions import Timeout
from config.settings import get_base_url
# Default request headers (empty) passed along by importJson.
headers = {}
# Base URL that every endpoint path in this module is appended to with "+";
# presumably it already ends with "/" -- verify in config.settings.
baser_url = get_base_url()
# Request timeout, in seconds, handed to requests by custom_request.
# NOTE(review): the previous comment said 5 seconds while the value is 3 --
# confirm which one is intended.
timeout = 3
def custom_request(method, url, *args, **kwargs):
    """
    Send an HTTP request through ``requests`` and return a simplified result.

    :param method: HTTP method name, e.g. ``'GET'`` or ``'POST'``
    :param url: URL to request
    :param args: extra positional arguments forwarded to ``requests.request``
    :param kwargs: extra keyword arguments forwarded to ``requests.request``;
        a ``timeout`` supplied here overrides the module-level default
    :return: ``{"status_code": ..., "data": ...}`` when a response arrives
        (``data`` is the decoded JSON body, or ``None`` if the body is not
        valid JSON), or ``{"status_code": 500}`` when the request timed out
    """
    # Apply the module default only when the caller did not pick a timeout.
    # Passing timeout= explicitly next to **kwargs raised TypeError
    # ("multiple values for keyword argument") for such callers.
    kwargs.setdefault("timeout", timeout)
    try:
        response = requests.request(method, url, *args, **kwargs)
        # Parse the response content as JSON. Empty or non-JSON bodies make
        # .json() raise a ValueError subclass; report them as data=None so
        # the caller still gets the real status code.
        try:
            data = response.json()
        except ValueError:
            data = None
    except Timeout:
        # A timeout is reported as a synthetic 500 rather than raised; any
        # other requests exception still propagates to the caller.
        return {"status_code": 500}
    return {"status_code": response.status_code, "data": data}
def importJson(file, form_data):
    """
    POST a file together with form fields to the ``importJson/import`` endpoint.

    :param file: value sent as the upload part named ``"file"``
    :param form_data: value forwarded as the request's ``data``
    :return: the result produced by ``custom_request``
    """
    request_options = {
        "headers": headers,
        "files": {"file": file},
        "data": form_data,
    }
    return custom_request('POST', baser_url + "importJson/import", **request_options)
def getReptileTask():
    """
    GET the ``crawlerSetting/list`` endpoint.

    :return: the result produced by ``custom_request``
    """
    return custom_request('GET', baser_url + "crawlerSetting/list")
def importJsonPath(form_data):
    """
    POST a payload to the ``importJson/importJsonPath`` endpoint.

    The request is labelled ``Content-Type: application/json`` while the
    payload is passed as ``data``, so ``form_data`` is presumably an already
    serialized JSON string -- verify against callers.

    :param form_data: request body forwarded as ``data``
    :return: the result produced by ``custom_request``
    """
    endpoint = baser_url + "importJson/importJsonPath"
    json_headers = {"Content-Type": "application/json"}
    return custom_request('POST', endpoint, headers=json_headers, data=form_data)
def runingPython(form_data):
    """
    POST a payload to the ``python/startPy`` endpoint.

    The request is labelled ``Content-Type: application/json`` while the
    payload is passed as ``data``, so ``form_data`` is presumably an already
    serialized JSON string -- verify against callers.

    :param form_data: request body forwarded as ``data``
    :return: the result produced by ``custom_request``
    """
    endpoint = baser_url + "python/startPy"
    json_headers = {"Content-Type": "application/json"}
    return custom_request('POST', endpoint, headers=json_headers, data=form_data)