# index.py
import requests
from requests.exceptions import Timeout
from config.settings import get_base_url

# Headers passed along with the multipart upload request; empty by default.
headers = {}

# Base URL of the backend API, fetched once at import time; endpoint paths
# are appended to it directly, so it is expected to end with a "/".
baser_url = get_base_url()

# Default request timeout in seconds (the value in effect is 3, not 5).
timeout = 3



def custom_request(method, url, *args, **kwargs):
    """
    Send an HTTP request and return its status code with the decoded JSON body.

    :param method: HTTP method name handed to ``requests.request``.
    :param url: Target URL.
    :param args: Extra positional arguments forwarded to ``requests.request``.
    :param kwargs: Extra keyword arguments forwarded to ``requests.request``.
        A ``timeout`` given here overrides the module-level default.
    :return: ``{"status_code": <int>, "data": <decoded JSON or None>}``.
        ``data`` is ``None`` when the body is not valid JSON. When the request
        times out, ``status_code`` is 500 and ``data`` is ``None``.
    :raises requests.exceptions.RequestException: Any failure other than a
        timeout (connection errors, invalid URLs, ...) is not handled here
        and propagates to the caller.
    """
    # Apply the module default only when the caller did not pick a timeout;
    # passing ``timeout`` both explicitly and via kwargs raises TypeError.
    kwargs.setdefault("timeout", timeout)

    try:
        resp = requests.request(method, url, *args, **kwargs)
    except Timeout:
        # Same shape as the success result so callers can index "data" safely.
        return {"status_code": 500, "data": None}

    # Parse the response body as JSON. Empty or non-JSON bodies raise a
    # ValueError subclass; report them as "no data" and keep the real status.
    try:
        payload = resp.json()
    except ValueError:
        payload = None

    return {"status_code": resp.status_code, "data": payload}


def importJson(file, form_data):
    """
    Upload a file together with form fields to the ``importJson/import`` endpoint.

    :param file: Value sent as the ``file`` part of the multipart upload.
    :param form_data: Form fields sent as the request body data.
    :return: The result dictionary produced by ``custom_request``.
    """
    return custom_request(
        'POST',
        baser_url + "importJson/import",
        headers=headers,
        files={"file": file},
        data=form_data,
    )


def getReptileTask():
    """
    Issue a GET request to the ``crawlerSetting/list`` endpoint.

    :return: The result dictionary produced by ``custom_request``.
    """
    return custom_request('GET', baser_url + "crawlerSetting/list")


def importJsonPath(form_data):
    """
    POST a payload to the ``importJson/importJsonPath`` endpoint.

    The request declares a JSON content type and sends ``form_data`` through
    the ``data`` argument unchanged.
    NOTE(review): presumably callers pass an already-serialized JSON string;
    confirm against the call sites.

    :param form_data: Request body forwarded as ``data``.
    :return: The result dictionary produced by ``custom_request``.
    """
    json_headers = {"Content-Type": "application/json"}
    endpoint = baser_url + "importJson/importJsonPath"
    return custom_request('POST', endpoint, headers=json_headers, data=form_data)


def runingPython(form_data):
    """
    POST a payload to the ``python/startPy`` endpoint.

    The request declares a JSON content type and sends ``form_data`` through
    the ``data`` argument unchanged.
    NOTE(review): presumably callers pass an already-serialized JSON string;
    confirm against the call sites.

    :param form_data: Request body forwarded as ``data``.
    :return: The result dictionary produced by ``custom_request``.
    """
    json_headers = {"Content-Type": "application/json"}
    endpoint = baser_url + "python/startPy"
    return custom_request('POST', endpoint, headers=json_headers, data=form_data)