Files
SafeLine/mcp_server/tools/create_ip_custom_rule.py

55 lines
1.6 KiB
Python
Raw Normal View History

2025-04-03 16:21:13 +08:00
from pydantic import BaseModel, Field
from utils.request import post_slce_api
from tools import Tool, ABCTool, tools
2025-04-03 17:44:57 +08:00
import ipaddress
2025-04-03 16:21:13 +08:00
# Tool that creates a WAF custom rule allowing or blocking a single client IP.
#
# NOTE: documented with comments instead of a class docstring on purpose:
# pydantic copies a model's docstring into the generated JSON schema, which
# would alter the inputSchema returned by tool().
@tools.register
class CreateIPCustomRule(BaseModel, ABCTool):
    ip: str = Field(description="request ip to allow or block")
    # ge/le are pydantic's numeric bound keywords. The former min=/max=
    # keywords were not constraints, so out-of-range actions passed validation.
    action: int = Field(ge=0, le=1, description="1: block, 0: allow")

    @classmethod
    async def run(cls, arguments: dict) -> str:
        """Validate *arguments* and create the IP rule through the policy API.

        Args:
            arguments: Raw tool arguments; expected keys are ``ip`` and
                ``action`` (0 = allow, 1 = block).

        Returns:
            The validation error message when the arguments are rejected,
            otherwise the result of ``post_slce_api`` for ``/api/open/policy``.
        """
        try:
            req = cls.model_validate(arguments)
            # Rejects anything that is not a literal IPv4/IPv6 address,
            # including the empty string.
            ipaddress.ip_address(req.ip)
        except ValueError as e:
            # pydantic's ValidationError and the ipaddress parse error are
            # both ValueError subclasses; report them to the caller as text.
            return str(e)

        # action is guaranteed to be 0 or 1 by the field bounds above.
        verb = "block" if req.action == 1 else "allow"
        name = f"{verb} ip: {req.ip}"

        return await post_slce_api("/api/open/policy", {
            "name": name,
            "is_enabled": True,
            "pattern": [
                [
                    {
                        "k": "src_ip",
                        "op": "eq",
                        "v": [req.ip],
                        "sub_k": "",
                    },
                ]
            ],
            "action": req.action,
        })

    @classmethod
    def tool(cls) -> Tool:
        """Return the Tool descriptor (name, description, input JSON schema)."""
        return Tool(
            name="waf_create_ip_custom_rule",
            description="以 客户端 IP 地址为条件,在雷池 WAF 上创建一个黑/白名单",
            inputSchema=cls.model_json_schema()
        )