feat: init slmcp

This commit is contained in:
姚凯
2025-04-02 21:51:51 +08:00
committed by jjtac
parent d4f31efdea
commit bb0b1187a0
17 changed files with 348 additions and 0 deletions

1
.gitignore vendored
View File

@@ -5,3 +5,4 @@
*.tar.gz
build.sh
compose.yml
__pycache__

1
slmcp/.python-version Normal file
View File

@@ -0,0 +1 @@
3.13

11
slmcp/Dockerfile Normal file
View File

@@ -0,0 +1,11 @@
FROM ghcr.io/astral-sh/uv:python3.13-alpine
COPY requirements.txt /app/requirements.txt
WORKDIR /app
RUN uv pip sync requirements.txt --system -i https://pypi.tuna.tsinghua.edu.cn/simple
COPY . /app
CMD ["python", "__main__.py"]

3
slmcp/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from .config import Config
GLOBAL_CONFIG = Config.from_env()

4
slmcp/__main__.py Normal file
View File

@@ -0,0 +1,4 @@
from server import main
if __name__ == "__main__":
main()

3
slmcp/config/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from .config import Config
GLOBAL_CONFIG = Config.from_env()

27
slmcp/config/config.py Normal file
View File

@@ -0,0 +1,27 @@
import os
class Config:
SAFELINE_ADDRESS: str
SAFELINE_API_TOKEN: str
SECRET: str
LISTEN_PORT: int
LISTEN_ADDRESS: str
def __init__(self):
self.SAFELINE_ADDRESS = os.getenv("SAFELINE_ADDRESS")
self.SAFELINE_API_TOKEN = os.getenv("SAFELINE_API_TOKEN")
self.SECRET = os.getenv("SAFELINE_SECRET")
env_listen_port = os.getenv("LISTEN_PORT")
if env_listen_port and env_listen_port.isdigit():
self.LISTEN_PORT = int(env_listen_port)
else:
self.LISTEN_PORT = 5678
env_listen_address = os.getenv("LISTEN_ADDRESS")
if env_listen_address:
self.LISTEN_ADDRESS = env_listen_address
else:
self.LISTEN_ADDRESS = "0.0.0.0"
@staticmethod
def from_env():
return Config()

12
slmcp/docker-compose.yaml Normal file
View File

@@ -0,0 +1,12 @@
services:
slmcp:
image: slmcp:0.1
container_name: slmcp
ports:
- "5678:5678"
environment:
- SAFELINE_SECRET=your_secret_key
- SAFELINE_ADDRESS=https://your_safeline_ip:9443
- SAFELINE_API_TOKEN=your_safeline_api_token
- LISTEN_PORT=5678
- LISTEN_ADDRESS=0.0.0.0

9
slmcp/pyproject.toml Normal file
View File

@@ -0,0 +1,9 @@
[project]
name = "slmcp"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.13"
dependencies = [
"mcp[cli]>=1.6.0",
]

76
slmcp/requirements.txt Normal file
View File

@@ -0,0 +1,76 @@
# This file was autogenerated by uv via the following command:
# uv pip compile pyproject.toml -o requirements.txt
annotated-types==0.7.0
# via pydantic
anyio==4.9.0
# via
# httpx
# mcp
# sse-starlette
# starlette
certifi==2025.1.31
# via
# httpcore
# httpx
click==8.1.8
# via
# typer
# uvicorn
h11==0.14.0
# via
# httpcore
# uvicorn
httpcore==1.0.7
# via httpx
httpx==0.28.1
# via mcp
httpx-sse==0.4.0
# via mcp
idna==3.10
# via
# anyio
# httpx
markdown-it-py==3.0.0
# via rich
mcp==1.6.0
# via slmcp (pyproject.toml)
mdurl==0.1.2
# via markdown-it-py
pydantic==2.11.1
# via
# mcp
# pydantic-settings
pydantic-core==2.33.0
# via pydantic
pydantic-settings==2.8.1
# via mcp
pygments==2.19.1
# via rich
python-dotenv==1.1.0
# via
# mcp
# pydantic-settings
rich==14.0.0
# via typer
shellingham==1.5.4
# via typer
sniffio==1.3.1
# via anyio
sse-starlette==2.2.1
# via mcp
starlette==0.46.1
# via
# mcp
# sse-starlette
typer==0.15.2
# via mcp
typing-extensions==4.13.0
# via
# pydantic
# pydantic-core
# typer
# typing-inspection
typing-inspection==0.4.0
# via pydantic
uvicorn==0.34.0
# via mcp

47
slmcp/server.py Normal file
View File

@@ -0,0 +1,47 @@
from mcp.server import Server
from mcp.types import Tool, TextContent
import mcp.server.sse
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.requests import Request
import uvicorn
from starlette.responses import PlainTextResponse
import tools
from config import GLOBAL_CONFIG
# Create an MCP server
slmcp = Server("SafeLine WAF mcp server")
sse = mcp.server.sse.SseServerTransport("/messages/")
@slmcp.list_tools()
async def list_tools() -> list[Tool]:
return tools.ALL_TOOLS
@slmcp.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
result = await tools.run(name, arguments)
return [TextContent(
type="text",
text=f"{name} result: {result}"
)]
async def handle_sse(request: Request) -> None:
if GLOBAL_CONFIG.SECRET != "" and request.headers.get("Secret") != GLOBAL_CONFIG.SECRET:
return PlainTextResponse("Unauthorized", status_code=401)
async with sse.connect_sse(
request.scope, request.receive, request._send
) as [read_stream, write_stream]:
await slmcp.run(
read_stream, write_stream, slmcp.create_initialization_options()
)
def main():
starlette_app = Starlette(debug=True,routes=[
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
])
uvicorn.run(starlette_app, host=GLOBAL_CONFIG.LISTEN_ADDRESS, port=GLOBAL_CONFIG.LISTEN_PORT)

17
slmcp/tools/__init__.py Normal file
View File

@@ -0,0 +1,17 @@
from mcp.types import Tool
from typing import Callable
ALL_TOOLS = []
TOOL_FUNC_MAP = {}
def register_tool(tool: Tool, func: Callable):
ALL_TOOLS.append(tool)
TOOL_FUNC_MAP[tool.name] = func
async def run(name:str, arguments:dict) -> str:
if name not in TOOL_FUNC_MAP:
return f"Unknown tool: {name}"
return await TOOL_FUNC_MAP[name](arguments)
from . import create_black_custom_rule, create_http_application

View File

@@ -0,0 +1,60 @@
from pydantic import BaseModel, Field
from utils.request import post_slce_api
from tools import register_tool, Tool
class CreateBlackCustomRule(BaseModel):
path: str = Field(default="",description="request path to block, if path is empty, block all paths")
ip: str = Field(default="",description="request ip to block, if ip is empty, block all ips")
async def create_black_custom_rule(arguments:dict) -> str:
"""
Create a new black custom rule.
Args:
path: request path to block, if path is empty, block all paths
ip: request ip to block, if ip is empty, block all ips
"""
pattern = []
name = "block "
path = arguments["path"]
ip = arguments["ip"]
if path is not None and path != "":
pattern.append({
"k": "uri_no_query",
"op": "eq",
"v": [path],
"sub_k": ""
})
name += f"path: {path} "
if ip is not None and ip != "":
pattern.append({
"k": "src_ip",
"op": "eq",
"v": [ip],
"sub_k": ""
})
name += f"ip: {ip}"
if len(pattern) == 0:
return "path and ip cannot be empty"
return await post_slce_api("/api/open/policy",{
"name": name,
"is_enabled": True,
"pattern": [pattern],
"action": 1
})
register_tool(
Tool(
name="create_black_custom_rule",
description="在雷池 WAF 上创建一个黑名单自定义规则",
inputSchema=CreateBlackCustomRule.model_json_schema()
),
create_black_custom_rule
)

View File

@@ -0,0 +1,54 @@
from pydantic import BaseModel, Field
from utils.request import post_slce_api
from tools import register_tool, Tool
from urllib.parse import urlparse
class CreateHttpApplication(BaseModel):
domain: str = Field(default="",description="application domain, if empty, match all domain")
port: int = Field(description="application listen port, must between 1 and 65535")
upstream: str = Field(description="application proxy address, must be a valid url")
async def create_http_application(arguments:dict) -> str:
"""
Create a new HTTP application.
Args:
domain: application domain
port: application listen port
upstream: application proxy address
"""
port = arguments["port"]
upstream = arguments["upstream"]
domain = arguments["domain"]
if port is None or port < 1 or port > 65535:
return "invalid port"
parsed_upstream = urlparse(upstream)
if parsed_upstream.scheme != "https" and parsed_upstream.scheme != "http":
return "invalid upstream scheme"
if parsed_upstream.hostname == "":
return "invalid upstream host"
return await post_slce_api("/api/open/site",{
"server_names": [domain],
"ports": [ str(port) ],
"upstreams": [ upstream ],
"type": 0,
"static_default": 1,
"health_check": True,
"load_balance": {
"balance_type": 1
}
})
register_tool(
Tool(
name="create_http_application",
description="在雷池 WAF 上创建一个站点应用",
inputSchema=CreateHttpApplication.model_json_schema()
),
create_http_application
)

0
slmcp/utils/__init__.py Normal file
View File

23
slmcp/utils/request.py Normal file
View File

@@ -0,0 +1,23 @@
from httpx import AsyncClient
from config import GLOBAL_CONFIG
import httpx
def check_slce_response(response: httpx.Response) -> str:
if response.status_code != 200:
return f"response status code: {response.status_code}"
data = response.json()
if data["msg"] is not None and data["msg"] != "":
return f"request SafeLine API failed: {data['msg']}"
if data["err"] is not None and data["err"] != "":
return f"request SafeLine API failed: {data['err']}"
return "success"
async def post_slce_api(path: str,req_body: dict) -> str:
async with AsyncClient(verify=False) as client:
response = await client.post(f"{GLOBAL_CONFIG.SAFELINE_ADDRESS}{path}", json=req_body, headers={
"X-SLCE-API-TOKEN": f"{GLOBAL_CONFIG.SAFELINE_API_TOKEN}"
})
return check_slce_response(response)

0
slmcp/uv.lock generated Normal file
View File