Files
SafeLine/mcp_server/utils/request.py

62 lines
1.9 KiB
Python
Raw Normal View History

2025-04-02 21:51:51 +08:00
from httpx import AsyncClient
from config import GLOBAL_CONFIG
import httpx
2025-04-07 18:46:55 +08:00
import json
2025-04-02 21:51:51 +08:00
2025-04-07 18:46:55 +08:00
def get_response_data(response: httpx.Response) -> dict:
    """Extract the ``data`` payload from a SafeLine API response.

    Args:
        response: Response object exposing ``status_code`` and ``json()``.

    Returns:
        The value stored under the ``data`` key of the JSON body, or ``None``
        when the body has no such key. The value is returned as-is, so it is
        not necessarily a dict.

    Raises:
        Exception: If the status code is not 200, the body is not a JSON
            object, or the body carries a non-empty ``msg`` or ``err`` field.
    """
    if response.status_code != 200:
        raise Exception(f"response status code: {response.status_code}")

    data = response.json()
    if not isinstance(data, dict):
        # Callers in this module hand str(exception) back to the user, so fail
        # with a readable message rather than a TypeError from indexing.
        raise Exception(
            "request SafeLine API failed: unexpected response body type "
            f"{type(data).__name__}"
        )

    # ``msg`` is checked before ``err``. Using .get() means a body that omits
    # either field is treated as carrying no error instead of raising a bare
    # KeyError (whose text would be just the quoted key name).
    for field in ("msg", "err"):
        detail = data.get(field)
        if detail is not None and detail != "":
            raise Exception(f"request SafeLine API failed: {detail}")

    return data.get("data")
def check_slce_response(response: httpx.Response) -> str:
    """Summarize a SafeLine API response as a status string.

    Args:
        response: Response to validate with ``get_response_data``.

    Returns:
        ``"success"`` when ``get_response_data`` accepts the response;
        otherwise the text of the exception it raised. The payload itself is
        discarded.
    """
    try:
        get_response_data(response)
    except Exception as exc:
        outcome = str(exc)
    else:
        outcome = "success"
    return outcome
2025-04-07 18:46:55 +08:00
def check_slce_get_response(response: httpx.Response) -> str:
    """Render the payload of a SafeLine API response as a string.

    Args:
        response: Response to validate with ``get_response_data``.

    Returns:
        The payload serialized with ``json.dumps`` when it is truthy,
        ``"empty response data"`` when it is falsy, or the text of any
        exception raised while validating or serializing.
    """
    try:
        payload = get_response_data(response)
        # Serialization stays inside the try so its failures are also
        # reported as a string instead of propagating.
        return json.dumps(payload) if payload else "empty response data"
    except Exception as exc:
        return str(exc)
2025-04-07 07:41:42 +00:00
async def get_slce_api(path: str) -> str:
    """Send a GET request to the SafeLine API and return the result as text.

    Args:
        path: API path; a leading ``/`` is added when missing.

    Returns:
        The string produced by ``check_slce_get_response`` for the response,
        or the text of any exception raised while building or sending the
        request.
    """
    endpoint = path if path.startswith("/") else f"/{path}"
    try:
        # NOTE(review): verify=False disables TLS certificate verification.
        # Presumably intended for self-signed SafeLine deployments; confirm.
        async with AsyncClient(verify=False) as client:
            url = f"{GLOBAL_CONFIG.SAFELINE_ADDRESS}{endpoint}"
            auth_headers = {
                "X-SLCE-API-TOKEN": f"{GLOBAL_CONFIG.SAFELINE_API_TOKEN}",
            }
            response = await client.get(url, headers=auth_headers)
            return check_slce_get_response(response)
    except Exception as exc:
        return str(exc)
async def post_slce_api(path: str, req_body: dict) -> str:
    """Send a POST request with a JSON body to the SafeLine API.

    Args:
        path: API path; a leading ``/`` is added when missing.
        req_body: Object sent as the JSON request body.

    Returns:
        The string produced by ``check_slce_response`` for the response, or
        the text of any exception raised while building or sending the
        request.
    """
    endpoint = path if path.startswith("/") else f"/{path}"
    try:
        # NOTE(review): verify=False disables TLS certificate verification.
        # Presumably intended for self-signed SafeLine deployments; confirm.
        async with AsyncClient(verify=False) as client:
            url = f"{GLOBAL_CONFIG.SAFELINE_ADDRESS}{endpoint}"
            auth_headers = {
                "X-SLCE-API-TOKEN": f"{GLOBAL_CONFIG.SAFELINE_API_TOKEN}",
            }
            response = await client.post(url, json=req_body, headers=auth_headers)
            return check_slce_response(response)
    except Exception as exc:
        return str(exc)