Remove specs directory from version control

This commit is contained in:
yyhuni
2026-01-23 09:14:46 +08:00
parent 25ae325c69
commit c6dcfb0a5b
12 changed files with 34 additions and 2903 deletions

5
.gitignore vendored
View File

@@ -38,3 +38,8 @@ tmp/
temp/
.kiro/
# AI Assistant directories
codex/
openspec/
specs/

18
AGENTS.md Normal file
View File

@@ -0,0 +1,18 @@
<!-- OPENSPEC:START -->
# OpenSpec Instructions
These instructions are for AI assistants working in this project.
Always open `@/openspec/AGENTS.md` when the request:
- Mentions planning or proposals (words like proposal, spec, change, plan)
- Introduces new capabilities, breaking changes, architecture shifts, or big performance/security work
- Sounds ambiguous and you need the authoritative spec before coding
Use `@/openspec/AGENTS.md` to learn:
- How to create and apply change proposals
- Spec format and conventions
- Project structure and guidelines
Keep this managed block so 'openspec update' can refresh the instructions.
<!-- OPENSPEC:END -->

View File

@@ -1,2 +1,13 @@
model_provider = "custom"
model = "gpt-5.2-codex"
model_reasoning_effort = "high"
disable_response_storage = true
[model_providers.custom]
name = "custom"
base_url = "https://www.cursorhub.cloud/openai"
wire_api = "responses"
requires_openai_auth = true
[projects."/Users/yangyang/Desktop/orbit"]
trust_level = "trusted"

View File

@@ -1,60 +0,0 @@
# Specification Quality Checklist: WebSocket Agent System
**Purpose**: Validate specification completeness and quality before proceeding to planning
**Created**: 2026-01-21
**Feature**: [spec.md](../spec.md)
## Content Quality
- [x] No implementation details (languages, frameworks, APIs)
- [x] Focused on user value and business needs
- [x] Written for non-technical stakeholders
- [x] All mandatory sections completed
## Requirement Completeness
- [x] No [NEEDS CLARIFICATION] markers remain
- [x] Requirements are testable and unambiguous
- [x] Success criteria are measurable
- [x] Success criteria are technology-agnostic (no implementation details)
- [x] All acceptance scenarios are defined
- [x] Edge cases are identified
- [x] Scope is clearly bounded
- [x] Dependencies and assumptions identified
## Feature Readiness
- [x] All functional requirements have clear acceptance criteria
- [x] User scenarios cover primary flows
- [x] Feature meets measurable outcomes defined in Success Criteria
- [x] No implementation details leak into specification
## Validation Results
**All quality checks passed!**
### Content Quality Assessment
- The specification focuses on user needs and business value (Agent deployment, task execution, load monitoring)
- No implementation details in user stories (Docker, Go, WebSocket are mentioned only in technical requirements section where appropriate)
- Written in plain language accessible to non-technical stakeholders
- All mandatory sections (User Scenarios, Requirements, Success Criteria) are complete
### Requirement Completeness Assessment
- No [NEEDS CLARIFICATION] markers present
- All 20 functional requirements are specific and testable
- Success criteria include measurable metrics (time, percentage, counts)
- Edge cases comprehensively identified (network issues, resource constraints, concurrent operations)
- Scope clearly defined with "Out of Scope" section
- Dependencies and assumptions explicitly listed
### Feature Readiness Assessment
- 5 user stories prioritized (P1-P3) with independent test scenarios
- Each user story includes clear acceptance criteria in Given-When-Then format
- Success criteria are measurable and technology-agnostic
- Technical details appropriately separated into Requirements section
## Notes
The specification is ready for the next phase. You can proceed with:
- `/speckit.plan` - Create technical implementation plan
- `/speckit.clarify` - Ask clarification questions (optional, no clarifications needed)

View File

@@ -1,237 +0,0 @@
openapi: 3.0.3
info:
title: Agent HTTP API
description: HTTP API endpoints for Agent to interact with Server
version: 1.0.0
contact:
name: Orbit Team
servers:
- url: https://{server}/api/agent
description: Server API endpoint
variables:
server:
default: localhost:8080
description: Server address
security:
- AgentKeyAuth: []
paths:
/tasks/pull:
post:
summary: Pull a task from the queue
description: |
Agent calls this endpoint to pull the highest priority task from the queue.
Uses PostgreSQL row-level locking (FOR UPDATE SKIP LOCKED) to ensure
tasks are not assigned to multiple agents.
operationId: pullTask
responses:
'200':
description: Task assigned successfully
content:
application/json:
schema:
$ref: '#/components/schemas/TaskAssignment'
example:
taskId: 123
scanId: 456
stage: 0
workflowName: subdomain_discovery
targetId: 789
targetName: example.com
targetType: domain
workspaceDir: /opt/orbit/results/scan_456
config: |
subdomain_discovery:
recon:
enabled: true
bruteforce:
enabled: true
workerImage: yyhuni/orbit-worker:v1.0.19
'204':
description: No tasks available
'401':
$ref: '#/components/responses/Unauthorized'
'500':
$ref: '#/components/responses/InternalError'
/tasks/{taskId}/status:
patch:
summary: Update task status
description: |
Agent calls this endpoint to update task status after:
- Worker exits successfully (running → completed)
- Worker exits with error (running → failed)
- Task is cancelled (running → cancelled)
Note: pending → running is handled by Server when Agent pulls the task.
operationId: updateTaskStatus
parameters:
- name: taskId
in: path
required: true
schema:
type: integer
description: Task ID
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/TaskStatusUpdate'
examples:
completed:
summary: Worker completed successfully
value:
status: completed
failed:
summary: Worker failed
value:
status: failed
errorMessage: |
Container exited with code 1
Last 100 lines of logs:
[ERROR] Failed to resolve domain
[ERROR] DNS timeout after 30s
responses:
'200':
description: Status updated successfully (idempotent - same status returns 200)
'400':
description: Invalid status transition (only running→completed/failed/cancelled allowed from Agent)
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
'401':
$ref: '#/components/responses/Unauthorized'
'403':
description: Agent does not own this task (agent_id mismatch)
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
example:
error: Task not owned by this agent
'404':
description: Task not found
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
'500':
$ref: '#/components/responses/InternalError'
components:
securitySchemes:
AgentKeyAuth:
type: apiKey
in: header
name: X-Agent-Key
description: Agent API Key for authentication
schemas:
TaskAssignment:
type: object
required:
- taskId
- scanId
- stage
- workflowName
- targetId
- targetName
- targetType
- workspaceDir
- config
- workerImage
properties:
taskId:
type: integer
description: Unique task ID
example: 123
scanId:
type: integer
description: Parent scan ID
example: 456
stage:
type: integer
description: Task stage (0 for single workflow)
example: 0
workflowName:
type: string
description: Workflow name (e.g., subdomain_discovery)
example: subdomain_discovery
targetId:
type: integer
description: Target ID
example: 789
targetName:
type: string
description: Target name (domain, IP, etc.)
example: example.com
targetType:
type: string
description: Target type
enum: [domain, ip, cidr]
example: domain
workspaceDir:
type: string
description: Workspace directory path for results
example: /opt/orbit/results/scan_456
config:
type: string
description: YAML configuration for the workflow
example: |
subdomain_discovery:
recon:
enabled: true
workerImage:
type: string
description: Docker image name for Worker
example: yyhuni/orbit-worker:v1.0.19
TaskStatusUpdate:
type: object
required:
- status
properties:
status:
type: string
enum: [completed, failed, cancelled]
description: New task status (Agent can only report terminal states)
errorMessage:
type: string
description: Error message (required if status=failed, max 4KB)
maxLength: 4096
example: |
Container exited with code 1
Last 100 lines of logs:
[ERROR] Failed to resolve domain
Error:
type: object
required:
- error
properties:
error:
type: string
description: Error message
example: Invalid status transition
responses:
Unauthorized:
description: Authentication failed
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
example:
error: Invalid or missing API key
InternalError:
description: Internal server error
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
example:
error: Internal server error

View File

@@ -1,300 +0,0 @@
openapi: 3.0.3
info:
title: Agent Management API
description: HTTP API for managing Agents (for Web UI / Admin)
version: 1.0.0
servers:
- url: https://{server}/api
description: Server API endpoint
security:
- BearerAuth: []
paths:
/agents:
post:
summary: Create a new Agent
description: Creates a new Agent record and generates an API Key
operationId: createAgent
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/CreateAgentRequest'
responses:
'201':
description: Agent created successfully
content:
application/json:
schema:
$ref: '#/components/schemas/CreateAgentResponse'
'400':
description: Invalid request
'401':
$ref: '#/components/responses/Unauthorized'
get:
summary: List all Agents
description: Returns a paginated list of all Agents
operationId: listAgents
parameters:
- name: page
in: query
schema:
type: integer
default: 1
- name: pageSize
in: query
schema:
type: integer
default: 20
- name: status
in: query
schema:
type: string
enum: [pending, online, offline]
responses:
'200':
description: List of Agents
content:
application/json:
schema:
$ref: '#/components/schemas/AgentListResponse'
'401':
$ref: '#/components/responses/Unauthorized'
/agents/{id}:
get:
summary: Get Agent by ID
operationId: getAgent
parameters:
- name: id
in: path
required: true
schema:
type: integer
responses:
'200':
description: Agent details
content:
application/json:
schema:
$ref: '#/components/schemas/AgentResponse'
'404':
description: Agent not found
delete:
summary: Delete Agent
operationId: deleteAgent
parameters:
- name: id
in: path
required: true
schema:
type: integer
responses:
'204':
description: Agent deleted
'404':
description: Agent not found
/agents/{id}/config:
put:
summary: Update Agent configuration
description: Update scheduling configuration (maxTasks, thresholds)
operationId: updateAgentConfig
parameters:
- name: id
in: path
required: true
schema:
type: integer
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/UpdateAgentConfigRequest'
responses:
'200':
description: Configuration updated
'404':
description: Agent not found
/agents/{id}/regenerate-key:
post:
summary: Regenerate Agent API Key
description: Generates a new API Key for the Agent (old key becomes invalid)
operationId: regenerateAgentKey
parameters:
- name: id
in: path
required: true
schema:
type: integer
responses:
'200':
description: New API Key generated
content:
application/json:
schema:
type: object
properties:
apiKey:
type: string
'404':
description: Agent not found
/agents/install.sh:
get:
summary: Get installation script
description: |
Returns a bash script for one-click Agent installation.
The script embeds `SERVER_URL` derived by:
1) `PUBLIC_URL` config if set; otherwise
2) the request URL (Host/Proto/Port).
operationId: getInstallScript
parameters:
- name: key
in: query
required: true
schema:
type: string
description: Agent API Key (embedded in script)
responses:
'200':
description: Installation script
content:
text/plain:
schema:
type: string
components:
securitySchemes:
BearerAuth:
type: http
scheme: bearer
bearerFormat: JWT
schemas:
CreateAgentRequest:
type: object
required:
- name
properties:
name:
type: string
description: User-defined name for the Agent
example: "vps-us-east-1"
CreateAgentResponse:
type: object
properties:
id:
type: integer
example: 1
name:
type: string
example: "vps-us-east-1"
apiKey:
type: string
description: Generated API Key (only shown once)
example: "agent_abc123def456..."
status:
type: string
enum: [pending]
example: "pending"
installCommand:
type: string
description: One-click install command
example: "curl -sSL https://server/api/agents/install.sh?key=agent_abc123 | bash"
installScriptUrl:
type: string
description: URL to download install script
example: "https://server/api/agents/install.sh?key=agent_abc123"
createdAt:
type: string
format: date-time
AgentResponse:
type: object
properties:
id:
type: integer
name:
type: string
status:
type: string
enum: [pending, online, offline]
hostname:
type: string
ipAddress:
type: string
version:
type: string
maxTasks:
type: integer
cpuThreshold:
type: integer
memThreshold:
type: integer
diskThreshold:
type: integer
connectedAt:
type: string
format: date-time
lastHeartbeat:
type: string
format: date-time
createdAt:
type: string
format: date-time
AgentListResponse:
type: object
properties:
results:
type: array
items:
$ref: '#/components/schemas/AgentResponse'
total:
type: integer
page:
type: integer
pageSize:
type: integer
UpdateAgentConfigRequest:
type: object
properties:
maxTasks:
type: integer
minimum: 1
maximum: 20
cpuThreshold:
type: integer
minimum: 1
maximum: 100
memThreshold:
type: integer
minimum: 1
maximum: 100
diskThreshold:
type: integer
minimum: 1
maximum: 100
responses:
Unauthorized:
description: Authentication required
content:
application/json:
schema:
type: object
properties:
error:
type: string
example:
error: "Authorization header required"

View File

@@ -1,243 +0,0 @@
openapi: 3.0.3
info:
title: Agent HTTP API
description: HTTP API for Agent to pull tasks and update status
version: 1.0.0
servers:
- url: https://{server}/api/agent
description: Server API endpoint
variables:
server:
default: localhost:8080
security:
- AgentKey: []
paths:
/tasks/pull:
post:
summary: Pull a pending task
description: |
Agent pulls a pending task from the queue.
Server uses PostgreSQL row-level locking (FOR UPDATE SKIP LOCKED) to ensure atomic assignment.
Returns 204 if no tasks available.
operationId: pullTask
responses:
'200':
description: Task assigned successfully
content:
application/json:
schema:
$ref: '#/components/schemas/TaskResponse'
'204':
description: No tasks available
'401':
description: Authentication failed
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
'500':
description: Internal server error
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
/tasks/{taskId}/status:
patch:
summary: Update task status
description: |
Agent updates task status after Worker execution.
Server validates:
- Agent ownership (agent_id matches)
- Valid state transitions (pending→running, running→completed/failed/cancelled)
- Idempotency (duplicate status updates return 200)
operationId: updateTaskStatus
parameters:
- name: taskId
in: path
required: true
schema:
type: integer
description: Task ID
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/StatusUpdateRequest'
responses:
'200':
description: Status updated successfully
content:
application/json:
schema:
$ref: '#/components/schemas/StatusUpdateResponse'
'400':
description: Invalid status transition
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
'401':
description: Authentication failed
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
'403':
description: Agent does not own this task
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
'404':
description: Task not found
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
'500':
description: Internal server error
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
components:
securitySchemes:
AgentKey:
type: apiKey
in: header
name: X-Agent-Key
description: Agent API key for authentication
schemas:
TaskResponse:
type: object
required:
- taskId
- scanId
- workflowName
- version
- config
properties:
taskId:
type: integer
description: Task ID
example: 123
scanId:
type: integer
description: Scan ID
example: 456
workflowName:
type: string
description: Workflow name (e.g., subdomain_discovery)
example: "subdomain_discovery"
version:
type: string
description: Worker version (Agent constructs image name as yyhuni/orbit-worker:v{version})
example: "v1.5.12-dev"
config:
type: string
description: YAML configuration for the task
example: |
target: example.com
depth: 3
timeout: 3600
StatusUpdateRequest:
type: object
required:
- status
properties:
status:
type: string
enum: [running, completed, failed, cancelled]
description: New task status
example: "completed"
errorMessage:
type: string
maxLength: 4096
description: Error message (required if status=failed, truncated to 4KB)
example: "Worker container exited with code 1: connection timeout"
StatusUpdateResponse:
type: object
required:
- taskId
- status
properties:
taskId:
type: integer
description: Task ID
example: 123
status:
type: string
description: Updated status
example: "completed"
Error:
type: object
required:
- error
properties:
error:
type: string
description: Error message
example: "Invalid status transition"
x-api-docs:
authentication:
description: |
All API requests must include the X-Agent-Key header with the Agent's API key.
The API key is generated when creating an Agent via the Web UI.
task-pull-flow:
description: |
1. Agent sends POST /api/agent/tasks/pull
2. Server uses FOR UPDATE SKIP LOCKED to atomically select a pending task
3. Server updates task status to 'running' and sets agent_id
4. Server returns task details including version
5. Agent constructs Worker image name: yyhuni/orbit-worker:v{version}
6. Agent starts Worker container with the constructed image name
7. If no tasks available, Server returns 204 and Agent backs off
status-update-flow:
description: |
1. Worker container exits
2. Agent reads exit code and logs
3. Agent sends PATCH /api/agent/tasks/{taskId}/status
4. Server validates agent_id ownership
5. Server validates status transition
6. Server updates scan_task.status and scan.status
7. Server returns 200 on success
error-handling:
ownership-validation: |
Server checks that the requesting Agent owns the task (agent_id matches).
Returns 403 if ownership check fails.
idempotency: |
Duplicate status updates (same status) return 200 without error.
This handles network retries gracefully.
state-transitions: |
Valid transitions:
- pending → running (task pull)
- running → completed (Worker exit code 0)
- running → failed (Worker exit code ≠ 0)
- running → cancelled (user cancellation)
Invalid transitions return 400.
pull-strategy:
description: |
Agent pull strategy with backoff:
1. Receive task_available WebSocket notification → pull immediately
2. Pull returns 204 (no tasks) → backoff: 5s, 10s, 30s, max 60s
3. Receive new task_available → reset backoff, pull immediately
This reduces unnecessary polling while maintaining responsiveness.

View File

@@ -1,312 +0,0 @@
openapi: 3.0.3
info:
title: WebSocket Message Protocol
description: WebSocket message protocol between Agent and Server
version: 1.0.0
paths: {}
components:
schemas:
# Base message structure
Message:
type: object
required:
- type
- payload
- timestamp
properties:
type:
type: string
description: Message type
payload:
type: object
description: Message payload (type-specific)
timestamp:
type: string
format: date-time
description: Message timestamp (ISO 8601)
example: "2026-01-21T10:30:00Z"
# Agent -> Server messages
HeartbeatMessage:
allOf:
- $ref: '#/components/schemas/Message'
- type: object
properties:
type:
type: string
enum: [heartbeat]
payload:
$ref: '#/components/schemas/HeartbeatPayload'
example:
type: heartbeat
payload:
cpu: 45.2
mem: 62.1
disk: 78.5
tasks: 2
version: "1.0.0"
hostname: "vps-1"
uptime: 86400
timestamp: "2026-01-21T10:30:00Z"
HeartbeatPayload:
type: object
required:
- cpu
- mem
- disk
- tasks
- version
- hostname
- uptime
properties:
cpu:
type: number
format: float
minimum: 0
maximum: 100
description: CPU usage percentage
example: 45.2
mem:
type: number
format: float
minimum: 0
maximum: 100
description: Memory usage percentage
example: 62.1
disk:
type: number
format: float
minimum: 0
maximum: 100
description: Disk usage percentage
example: 78.5
tasks:
type: integer
minimum: 0
description: Number of running tasks
example: 2
version:
type: string
description: Agent version
example: "1.0.0"
hostname:
type: string
description: Agent hostname
example: "vps-1"
uptime:
type: integer
minimum: 0
description: Agent uptime in seconds
example: 86400
# Server -> Agent messages
TaskAvailableMessage:
allOf:
- $ref: '#/components/schemas/Message'
- type: object
properties:
type:
type: string
enum: [task_available]
payload:
type: object
description: Empty payload (notification only)
example:
type: task_available
payload: {}
timestamp: "2026-01-21T10:30:00Z"
TaskCancelMessage:
allOf:
- $ref: '#/components/schemas/Message'
- type: object
properties:
type:
type: string
enum: [task_cancel]
payload:
$ref: '#/components/schemas/TaskCancelPayload'
example:
type: task_cancel
payload:
taskId: 123
timestamp: "2026-01-21T10:30:00Z"
TaskCancelPayload:
type: object
required:
- taskId
properties:
taskId:
type: integer
description: Task ID to cancel
example: 123
ConfigUpdateMessage:
allOf:
- $ref: '#/components/schemas/Message'
- type: object
properties:
type:
type: string
enum: [config_update]
payload:
$ref: '#/components/schemas/ConfigUpdatePayload'
example:
type: config_update
payload:
maxTasks: 10
cpuThreshold: 90
memThreshold: 90
diskThreshold: 95
timestamp: "2026-01-21T10:30:00Z"
ConfigUpdatePayload:
type: object
properties:
maxTasks:
type: integer
minimum: 1
maximum: 20
description: Maximum concurrent tasks
example: 10
cpuThreshold:
type: integer
minimum: 1
maximum: 100
description: CPU threshold percentage
example: 90
memThreshold:
type: integer
minimum: 1
maximum: 100
description: Memory threshold percentage
example: 90
diskThreshold:
type: integer
minimum: 1
maximum: 100
description: Disk threshold percentage
example: 95
UpdateRequiredMessage:
allOf:
- $ref: '#/components/schemas/Message'
- type: object
properties:
type:
type: string
enum: [update_required]
payload:
$ref: '#/components/schemas/UpdateRequiredPayload'
example:
type: update_required
payload:
version: "1.0.19"
image: "yyhuni/orbit-agent"
timestamp: "2026-01-21T10:30:00Z"
UpdateRequiredPayload:
type: object
required:
- version
- image
properties:
version:
type: string
description: New version to update to
example: "1.0.19"
image:
type: string
description: Docker image name (without tag)
example: "yyhuni/orbit-agent"
PingMessage:
allOf:
- $ref: '#/components/schemas/Message'
- type: object
properties:
type:
type: string
enum: [ping]
payload:
type: object
description: Empty payload
example:
type: ping
payload: {}
timestamp: "2026-01-21T10:30:00Z"
# Protocol Documentation
x-protocol-docs:
connection:
endpoint: wss://{server}/api/agents/ws
authentication:
- X-Agent-Key header
- Query parameter: ?key=<api_key>
description: |
Agent initiates WebSocket connection to Server.
Server validates API Key (from header or query) and registers the connection.
Query parameter is supported for environments that don't allow custom WebSocket headers.
heartbeat:
interval: 5 seconds
timeout: 120 seconds
description: |
Agent sends heartbeat every 5 seconds.
Server marks Agent offline if no heartbeat for 120 seconds.
message-flow:
agent-to-server:
- heartbeat: Periodic system metrics
server-to-agent:
- task_available: Notify Agent of new tasks (optional optimization)
- task_cancel: Cancel a running task
- config_update: Update Agent configuration
- update_required: Trigger Agent self-update
- ping: Keep-alive check
reconnection:
strategy: Exponential backoff
initial-delay: 1 second
max-delay: 60 seconds
backoff-multiplier: 2
description: |
Agent reconnects with exponential backoff on connection loss:
1s, 2s, 4s, 8s, 16s, 32s, 60s (max)
error-handling:
authentication-failure:
action: Log error and exit
reason: Invalid API Key cannot be recovered
connection-lost:
action: Automatic reconnection with backoff
message-parse-error:
action: Log error and ignore message
examples:
connection-flow: |
1. Agent connects to wss://server/api/agents/ws
2. Server validates X-Agent-Key header
3. Server sends config_update with initial config
4. Agent starts sending heartbeat every 5s
5. Server sends task_available when tasks are ready
6. Agent pulls tasks via HTTP API
update-flow: |
1. Agent sends heartbeat with version="1.0.8"
2. Server detects version mismatch (expects 1.0.19)
3. Server sends update_required message
4. Agent pulls new image: orbit-agent:1.0.19
5. Agent starts new container with same config
6. Agent exits current process
7. New Agent connects and reports version="1.0.19"
cancel-flow: |
1. User clicks "Cancel" in Web UI
2. Server sends task_cancel via WebSocket
3. Agent stops Worker container
4. Agent updates task status to "cancelled" via HTTP API

View File

@@ -1,411 +0,0 @@
# Data Model: WebSocket Agent System
**Feature**: WebSocket Agent System
**Branch**: 001-websocket-agent
**Date**: 2026-01-21
## Overview
本文档定义 WebSocket Agent 系统的数据模型,包括数据库表结构、实体关系和状态机。
## Database Schema
### 1. agent 表
Agent 元数据和配置信息。
```sql
CREATE TABLE agent (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL, -- 自动生成(如 "agent-hostname"
api_key VARCHAR(8) NOT NULL UNIQUE, -- 认证密钥8字符hex4字节随机
status VARCHAR(20) DEFAULT 'online', -- online/offline注册时直接 online
hostname VARCHAR(255), -- 主机名
ip_address VARCHAR(45), -- 最后连接的 IP
version VARCHAR(20), -- Agent 版本号
-- 调度配置(可通过 API 动态修改)
max_tasks INT DEFAULT 5, -- 最大并发任务数
cpu_threshold INT DEFAULT 85, -- CPU 负载阈值 (%)
mem_threshold INT DEFAULT 85, -- 内存负载阈值 (%)
disk_threshold INT DEFAULT 90, -- 磁盘空间阈值 (%)
-- 自注册相关
registration_token VARCHAR(8), -- 注册时使用的 token用于审计
-- 时间戳
connected_at TIMESTAMP, -- 最后连接时间
last_heartbeat TIMESTAMP, -- 最后心跳时间
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_agent_status ON agent(status);
CREATE INDEX idx_agent_api_key ON agent(api_key);
```
**字段说明**
- `name`: 自动生成(格式:`agent-{hostname}`
- `api_key`: 8 字符的随机字符串,用于 WebSocket 和 HTTP API 认证
- `status`:
- `online`: Agent 已连接,心跳正常
- `offline`: Agent 断开连接或心跳超时(>120 秒)
- `hostname`: Agent 上报的主机名
- `ip_address`: 从 WebSocket 连接中提取的客户端 IP
- `version`: Agent 上报的版本号,用于自动更新判断
- `registration_token`: 注册时使用的 token用于审计追溯
### 2. registration_token 表
注册令牌表,用于控制 Agent 自注册的准入权限。
```sql
CREATE TABLE registration_token (
id SERIAL PRIMARY KEY,
token VARCHAR(8) NOT NULL UNIQUE, -- 注册令牌8字符hex
expires_at TIMESTAMP NOT NULL DEFAULT (NOW() + INTERVAL '1 hour'), -- 过期时间固定1小时
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_registration_token_token ON registration_token(token);
CREATE INDEX idx_registration_token_expires ON registration_token(expires_at);
```
**字段说明**
- `token`: 8 字符的随机字符串hex用于 Agent 注册时的身份验证
- `expires_at`: 过期时间,固定为创建后 1 小时
- `created_at`: 创建时间
**使用场景**
- 批量部署:创建 token 后 1 小时内完成所有 Agent 部署
- Token 在 1 小时内可以无限次使用
- 过期后需要重新生成新的 token
### 3. scan_task 表
任务队列,支持优先级调度。
```sql
CREATE TABLE scan_task (
id SERIAL PRIMARY KEY,
scan_id INT NOT NULL REFERENCES scan(id) ON DELETE CASCADE,
stage INT NOT NULL DEFAULT 0, -- 当前阶段(单 workflow 时固定为 0
workflow_name VARCHAR(100) NOT NULL, -- e.g. 'subdomain_discovery'
status VARCHAR(20) DEFAULT 'pending', -- pending/running/completed/failed/cancelled
version VARCHAR(20), -- Worker 版本号(从 VERSION 文件读取)
-- 分配信息
agent_id INT REFERENCES agent(id), -- 分配给哪个 Agent
config TEXT, -- YAML 配置
error_message VARCHAR(4096), -- 错误信息Agent 截断,对齐 K8s termination message
-- 重试控制
retry_count INT DEFAULT 0, -- 已重试次数
-- 时间戳
created_at TIMESTAMP DEFAULT NOW(),
started_at TIMESTAMP, -- Worker 启动时间
completed_at TIMESTAMP -- 完成时间
);
CREATE INDEX idx_scan_task_pending_order ON scan_task(status, stage DESC, created_at ASC);
CREATE INDEX idx_scan_task_agent_id ON scan_task(agent_id);
CREATE INDEX idx_scan_task_scan_id ON scan_task(scan_id);
```
**字段说明**
- `stage`: 任务阶段,用于多 workflow 串联(当前固定为 0
- `workflow_name`: 工作流名称,对应 Worker 的 templates.yaml 中的定义
- `status`: 任务状态pending/running/completed/failed/cancelled
- `version`: Worker 版本号Server 从 VERSION 文件读取Agent 拼接镜像名称yyhuni/orbit-worker:v{VERSION}
- `agent_id`: 分配给哪个 Agent拉取时写入
- `config`: YAML 格式的任务配置
### 4. Agent 自注册流程
系统采用 **Token 控制的自注册模式**Agent 通过注册令牌自动注册到系统。
**完整流程**
```
1. Admin 生成注册 Token
→ POST /api/registration-tokens
→ Server 返回 token 和安装命令token 1小时后过期
2. Admin 批量部署
→ for vps in vps-{1..100}; do
ssh $vps "curl install.sh | bash -s <token>"
done
3. Agent 使用 Token 注册
→ POST /api/agents/register
{ "token": "<token>", "hostname": "vps-1", "version": "1.0.0" }
→ Server 验证 token检查是否过期
→ Server 生成专属 api_key创建 Agent 记录status=online
→ 返回 api_key 给 Agent
4. Agent 保存 api_key 并发送心跳
→ Agent 将 api_key 保存到本地配置文件
→ 使用 api_key 建立 WebSocket 连接
→ 发送心跳数据
```
**Token 生命周期**
```
创建 Token (默认1小时后过期)
VPS-1 注册 → 返回 key_001
VPS-2 注册 → 返回 key_002
...
VPS-N 注册 → 返回 key_N (在过期前可无限次使用)
Token 过期 → 新注册请求被拒绝
已注册的 Agent 继续使用各自的 api_key 正常工作
```
**Token 验证逻辑**
```sql
-- 验证 token 是否有效
SELECT * FROM registration_token
WHERE token = $1
AND expires_at > NOW();
```
**适用场景**
- 批量部署(一个 token 部署多台服务器)
- 快速上手(一键安装命令)
- 公网环境(需要准入控制)
- 临时授权(设置过期时间)
### 5. Redis 缓存结构
#### 心跳数据
```
Key: agent:{agent_id}:heartbeat
Value: {
"cpu": 45.2, // CPU 使用率 (0-100)
"mem": 62.1, // 内存使用率 (0-100)
"disk": 78.5, // 磁盘使用率 (0-100)
"tasks": 2, // 运行中任务数
"version": "1.0.0", // Agent 版本
"hostname": "vps-1", // 主机名
"uptime": 86400, // 运行时长(秒)
"updated_at": "2026-01-21T10:30:00Z"
}
TTL: 15 秒
```
## Entity Relationships
```
┌──────────────────────┐
│ RegistrationToken │
│ (注册令牌) │
└──────────┬───────────┘
│ 1
│ (用于注册)
│ N
┌─────────────────┐
│ Agent │
│ (常驻服务) │
└────────┬────────┘
│ 1
│ N
┌─────────────────┐ ┌─────────────────┐
│ ScanTask │ N 1 │ Scan │
│ (任务队列) │◀────────│ (扫描记录) │
└─────────────────┘ └─────────────────┘
│ depends_on (预留)
┌─────────────────┐
│ ScanTask │
│ (下游任务) │
└─────────────────┘
```
**关系说明**
- 一个 RegistrationToken 可以用于注册多个 Agent在过期前无限次使用
- 一个 Agent 可以执行多个 ScanTask
- 一个 Scan 对应一个或多个 ScanTask当前为 1:1未来支持 1:N
- ScanTask 之间可以有依赖关系(预留,当前未使用)
## State Machines
### Agent 状态机
```
online ──────────────→ offline
(心跳超时/断开)
└──────────────────→ online
(重连成功)
```
**状态转换规则**
- Agent 注册时直接创建为 `online` 状态
- `online → offline`: 120 秒未收到心跳或连接断开
- `offline → online`: Agent 重连成功
### ScanTask 状态机
**统一状态**scan.status 和 scan_task.status 使用相同的 5 个状态):
```
pending ────────────────→ running ───────────────→ completed
(Agent 拉取并启动) (退出码=0)
├──→ failed
│ (退出码≠0)
└──→ cancelled
(用户取消)
```
**状态转换职责**
- **Server**:
- 创建 scan_task: `status=pending`
- 分配任务Agent pull 成功): `pending → running`(分配和启动合并为一步)
- 任务回收Agent 离线): `running → pending`(重试)或 `→ failed`(超过重试次数)
- **Agent**:
- Worker 退出: `running → completed/failed/cancelled`
**未来多 workflow 扩展**(预留):
- 依赖通过查询条件过滤,不新增状态
### 任务回收机制Agent 离线时)
当 Agent 离线(心跳超时 >120 秒)时,其名下所有 `running` 状态的任务需要回收:
**回收规则**
- `retry_count < 3`: 重置为 `pending``retry_count += 1`,重新进入队列
- `retry_count >= 3`: 标记为 `failed`,错误信息为 "Agent lost, max retries exceeded"
**后台 Job 逻辑**(每分钟执行一次):
```sql
-- Step 1: 标记心跳超时的 Agent 为 offline
UPDATE agent
SET status = 'offline'
WHERE status = 'online'
AND last_heartbeat < NOW() - INTERVAL '120 seconds';
-- Step 2: 回收离线 Agent 的任务
UPDATE scan_task
SET
status = CASE WHEN retry_count >= 3 THEN 'failed' ELSE 'pending' END,
agent_id = NULL,
retry_count = retry_count + 1,
error_message = CASE WHEN retry_count >= 3 THEN 'Agent lost, max retries exceeded' ELSE NULL END
WHERE status = 'running'
AND agent_id IN (SELECT id FROM agent WHERE status = 'offline');
-- Step 3: 同步更新受影响的 scan.status状态已统一直接复制
UPDATE scan s
SET status = t.status
FROM scan_task t
WHERE s.id = t.scan_id
AND t.status IN ('pending', 'failed')
AND s.status NOT IN ('completed', 'failed', 'cancelled');
```
**设计理由**
- Agent 离线 ≠ 任务本身有问题,换个 Agent 跑大概率能成功
- 重试次数限制防止无限循环
- 扫描任务是幂等的,偶发重复执行不会造成数据错误
## Data Validation Rules
### Agent
- `name`: 1-100 字符,非空,自动生成格式:`agent-{hostname}`
- `api_key`: 8 字符,唯一,自动生成
- `status`: 枚举值 `online|offline`
- `max_tasks`: 1-100默认 5
- `cpu_threshold`: 1-100默认 85
- `mem_threshold`: 1-100默认 85
- `disk_threshold`: 1-100默认 90
- `registration_token`: 8 字符(可选,仅自注册时填充)
### RegistrationToken
- `token`: 8 字符唯一自动生成hex
- `expires_at`: 时间戳,必填,默认创建后 1 小时过期
- **业务规则**
- 过期的 token 不能用于注册
- Token 在过期前可以无限次使用
### ScanTask
- `workflow_name`: 非空,必须在 Worker templates.yaml 中定义
- `status`: 枚举值 `pending|running|completed|failed|cancelled`
- `config`: 有效的 YAML 格式
- `error_message`: 最大 4KBAgent 截断,对齐 K8s termination message 限制)
## Indexes and Performance
### 关键索引
- `idx_scan_task_pending_order`: 支持任务拉取查询(按 stage DESC, created_at ASC 排序)
- `idx_agent_status`: 支持在线 Agent 查询
- `idx_scan_task_agent_id`: 支持按 Agent 查询任务
### 查询优化
- 任务拉取使用 `FOR UPDATE SKIP LOCKED` 避免锁竞争
- 心跳数据存储在 Redis避免频繁写入 PostgreSQL
- Agent 状态更新使用乐观锁(`updated_at` 检查)
### 任务拉取 SQL 示例
```sql
-- 原子操作:选取一个 pending 任务并设置为 running
WITH selected AS (
SELECT id FROM scan_task
WHERE status = 'pending'
ORDER BY stage DESC, created_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE scan_task t
SET status = 'running', agent_id = $1, started_at = NOW()
FROM selected
WHERE t.id = selected.id
RETURNING t.*;
```
## Migration Strategy
### 新增表
- `agent`: 全新表,替代现有的 `worker_node`
- `scan_task`: 全新表
- `registration_token`: 全新表,用于自注册模式
### 修改现有表
- `scan`: 删除 `worker_id` 字段(任务分配关系移至 `scan_task.agent_id`
- `scan.status``scan_task.status` 同步更新(当前 1:1 映射)
### 弃用表
- `worker_node`: SSH 方式已弃用,由 `agent` 表替代
### 数据迁移
- 新创建的 `scan` 自动创建对应的 `scan_task`(在同一事务中)
- 状态统一迁移:
```sql
-- 将旧状态统一为新的 5 状态模型
UPDATE scan SET status = 'pending' WHERE status IN ('initiated', 'scheduled');
```
## Future Extensions
### 多 Workflow 支持(预留)
- `scan_task.depends_on`: 存储依赖的 task ID 数组
- 依赖通过拉取时的查询条件过滤,不新增状态
### 任务排序规则
- 拉取时按 `ORDER BY stage DESC, created_at ASC` 排序
- 高 stage 优先(让流水线尽快完成)
- 同 stage 内先创建的先执行(防止饿死)
### Agent 分组(可选)
- 新增 `agent_group` 表
- 支持将 Agent 分配到不同的组
- 任务可以指定目标 Agent 组

View File

@@ -1,829 +0,0 @@
# Implementation Plan: WebSocket Agent System
**Branch**: `001-websocket-agent` | **Date**: 2026-01-21 | **Spec**: [spec.md](spec.md)
**Input**: Feature specification from [specs/001-websocket-agent/spec.md](spec.md)
## Summary
实现基于 WebSocket 的轻量级 Agent 系统,替代现有的 SSH 任务分发方式。Agent 作为常驻服务运行在远程 VPS 上,通过 WebSocket 主动连接 Server使用 Pull 模式拉取任务并启动临时 Worker 容器执行扫描。系统支持心跳监控、负载均衡、动态配置更新和自动版本升级。
核心技术方案:
- Agent 使用 Go 编译为单个二进制文件,通过 Docker SDK 管理 Worker 容器
- Server 端使用 PostgreSQL 作为任务队列,通过行级锁保证并发安全
- WebSocket 用于控制通道心跳、配置更新、取消指令HTTP API 用于任务拉取和状态上报
- 采用 Pull 模式而非 Push 模式Agent 根据自身负载主动拉取任务
## Technical Context
**Language/Version**: Go 1.21+
**Primary Dependencies**:
- gorilla/websocket (WebSocket 客户端)
- docker/docker (Docker SDK)
- shirou/gopsutil/v3 (系统负载采集)
- gin-gonic/gin (Server 端 HTTP 框架,已有)
- gorm (Server 端 ORM已有)
**Storage**:
- PostgreSQL (任务队列和 Agent 元数据)
- Redis (心跳数据缓存)
**Testing**:
- Go testing package (单元测试)
- testify (断言库)
- Docker-in-Docker (集成测试)
**Target Platform**: Linux amd64/arm64 (Agent), Linux server (Server)
**Project Type**: Backend service (Agent + Server 扩展)
**Performance Goals**:
- 任务拉取延迟 < 5 秒
- 状态更新延迟 < 2 秒
- 心跳间隔 5 秒
- 支持单 Agent 5+ 并发任务
- Agent 内存占用 < 50MB
**Constraints**:
- Agent 必须在容器 cgroup 限制下正确采集系统负载
- 网络断开后 120 秒内自动重连
- Worker 容器必须 100% 清理,不留僵尸容器
- 数据库锁必须防止任务重复分配
**Scale/Scope**:
- 支持 10+ 个 Agent 同时连接
- 单个 Agent 管理 5+ 个并发 Worker
- 任务队列支持 1000+ 待处理任务
## Constitution Check
*GATE: Must pass before Phase 0 research. Re-check after Phase 1 design.*
### Simplicity Gates
-**Single Language**: Go for Agent, existing Go backend for Server
-**Minimal Dependencies**: 使用成熟的标准库gorilla/websocket, gopsutil, Docker SDK
-**No Over-Engineering**: Pull 模式简单直接,避免复杂的消息队列
-**Standard Patterns**: REST API + WebSocket业界标准架构
### Architecture Gates
-**Clear Separation**: Agent (任务执行) / Server (任务调度) / Worker (扫描执行)
-**Stateless Agent**: Agent 不存储任务状态,所有状态在 Server 端
-**Idempotent Operations**: 任务拉取和状态更新都是幂等的
### No Violations
所有设计决策符合项目宪法要求,无需额外说明。
## Project Structure
### Documentation (this feature)
```text
specs/001-websocket-agent/
├── spec.md # Feature specification (completed)
├── plan.md # This file (in progress)
├── research.md # Technology decisions and rationale
├── data-model.md # Database schema and entity relationships
├── contracts/ # API contracts (OpenAPI specs)
│ ├── agent-api.yaml # Agent HTTP API endpoints
│ └── websocket.yaml # WebSocket message protocol
├── quickstart.md # Development setup guide
└── tasks.md # Implementation tasks (generated by /speckit.tasks)
```
### Source Code (repository root)
```text
agent/ # New Agent project (独立 Go 模块)
├── cmd/
│ └── agent/
│ └── main.go # Agent 入口
├── internal/
│ ├── config/
│ │ └── config.go # 配置管理
│ ├── connection/
│ │ ├── client.go # WebSocket 客户端
│ │ └── reconnect.go # 重连策略
│ ├── heartbeat/
│ │ └── reporter.go # 心跳上报
│ ├── task/
│ │ ├── manager.go # 任务管理
│ │ ├── executor.go # 任务执行Pull 模式)
│ │ └── http_client.go # HTTP API 客户端
│ ├── docker/
│ │ └── runner.go # Docker 容器管理
│ └── system/
│ └── metrics.go # 系统负载采集
├── go.mod
├── go.sum
├── Makefile
└── Dockerfile
server/ # 现有 Server 项目扩展
├── internal/
│ ├── websocket/ # 新增WebSocket 管理
│ │ ├── hub.go # 连接管理中心
│ │ ├── client.go # 单个 Agent 连接
│ │ └── message.go # 消息定义
│ ├── handler/
│ │ ├── agent_ws.go # 新增WebSocket 端点
│ │ └── agent_task.go # 新增Agent 任务 API
│ ├── service/
│ │ ├── agent_service.go # 新增Agent 管理服务
│ │ └── task_dispatcher.go # 新增:任务分发服务
│ ├── repository/
│ │ ├── agent_repository.go # 新增Agent 数据访问
│ │ └── scan_task_repository.go # 新增:任务队列数据访问
│ └── model/
│ ├── agent.go # 新增Agent 模型
│ └── scan_task.go # 新增ScanTask 模型
└── migrations/
└── 00X_add_agent_tables.sql # 新增:数据库迁移
scripts/
└── install-agent.sh # 新增Agent 一键安装脚本
```
**Structure Decision**:
- Agent 作为独立的 Go 模块(`agent/`),可以单独编译和分发
- Server 端扩展现有项目结构,添加 WebSocket 和 Agent 管理功能
- 遵循现有的分层架构handler → service → repository → model
## Complexity Tracking
无复杂性违规需要说明。所有设计决策都遵循简单性原则。
## Phase 0: Research & Technology Decisions
### 1. WebSocket Library Selection
**Decision**: gorilla/websocket
**Rationale**:
- Go 社区事实标准,成熟稳定
- 支持自动重连和心跳检测
- 性能优秀,内存占用低
- 与 Gin 框架集成良好
**Alternatives Considered**:
- nhooyr.io/websocket: 更现代但社区较小
- golang.org/x/net/websocket: 官方库但功能较少
### 2. Docker SDK vs CLI
**Decision**: Docker SDK (github.com/docker/docker/client)
**Rationale**:
- 原生 Go API类型安全
- 更好的错误处理和日志获取
- 避免解析 CLI 输出
- 支持流式日志读取
**Alternatives Considered**:
- Docker CLI: 需要解析文本输出,不够可靠
- Containerd API: 过于底层,增加复杂度
### 3. Task Queue: PostgreSQL vs Redis
**Decision**: PostgreSQL with row-level locking
**Rationale**:
- 任务持久化,不怕丢失
- 支持复杂查询(优先级、依赖关系)
- ACID 事务保证
- `FOR UPDATE SKIP LOCKED` 完美支持并发拉取
- 扫描任务是分钟级,不需要 Redis 的毫秒级性能
**Alternatives Considered**:
- Redis: 性能过剩,持久化不可靠
- RabbitMQ/Kafka: 过度工程,增加运维复杂度
### 4. System Metrics Collection
**Decision**: gopsutil v3
**Rationale**:
- 跨平台支持Linux amd64/arm64
- 正确处理容器 cgroup 限制
- 活跃维护,社区广泛使用
- API 简单直观
**Alternatives Considered**:
- 直接读取 /proc: 需要处理各种边缘情况
- prometheus/client_golang: 过于重量级
### 5. Agent Update Strategy
**Decision**: Self-update via Docker SDK
**Rationale**:
- 无需 SSH 访问远程机器
- Agent 自己拉取镜像并重启
- 失败时不影响当前运行
- 符合容器化最佳实践
**Alternatives Considered**:
- SSH 远程更新: 需要存储凭证,安全风险
- 手动更新: 运维负担重
## Phase 1: Design & Contracts
### Data Model
详见 [data-model.md](data-model.md)
核心实体:
- **Agent**: 常驻服务元数据
- **ScanTask**: 任务队列记录
- **Heartbeat**: 心跳数据Redis 缓存)
### API Contracts
详见 [contracts/](contracts/) 目录
核心 API
- `POST /api/agent/tasks/pull`: Agent 拉取任务
- `PATCH /api/agent/tasks/:id/status`: Agent 更新任务状态
- `GET /api/agents/ws`: WebSocket 连接端点
### WebSocket Message Protocol
详见 [contracts/websocket.yaml](contracts/websocket.yaml)
消息类型:
- Agent → Server: `heartbeat`
- Server → Agent: `task_available`, `task_cancel`, `config_update`, `update_required`, `ping`
## Implementation Phases
### Phase 1: Agent 基础框架
- 配置管理和命令行参数解析
- WebSocket 客户端和重连逻辑
- 心跳上报机制
- 系统负载采集
### Phase 2: Server 端 WebSocket 支持
- WebSocket Hub 连接管理
- Agent 认证和注册
- 心跳接收和存储
- Agent 状态管理online/offline
### Phase 3: 任务拉取和执行
- Agent HTTP 客户端
- 任务拉取循环Pull 模式)
- 负载检查逻辑
- Server 端任务分配 API
### Phase 4: Docker 容器管理
- Worker 容器启动和参数传递
- 容器退出码监控
- 错误日志读取和截断
- 容器自动清理
### Phase 5: 任务状态管理
- Agent 状态上报 API
- Server 端状态更新逻辑
- scan_task 和 scan 状态同步
- 任务取消机制
### Phase 6: 高级功能
- 配置动态更新
- Agent 自动更新
- 一键安装脚本
- 监控和日志
## Testing Strategy
### Unit Tests
- 配置解析和验证
- 重连退避算法
- 系统负载采集
- 消息序列化/反序列化
### Integration Tests
- WebSocket 连接和认证
- 心跳发送和接收
- 任务拉取和状态更新
- Docker 容器生命周期
### End-to-End Tests
- 完整任务执行流程
- 断线重连场景
- 多 Agent 并发拉取
- 负载均衡验证
## Security Considerations
- API Key 存储和传输安全
- WebSocket 使用 wss:// 加密
- Docker socket 权限控制
- 错误日志脱敏
- Agent OOM 保护oom-score-adj
## Deployment
### Agent 部署
1. 用户在 Web 界面创建 Agent获取 API Key
2. 执行一键安装命令:`curl ... | bash -s -- --server <ip> --key <key>`
3. Agent 容器启动并连接 Server
4. Web 界面显示 Agent 在线状态
### Server 部署
1. 运行数据库迁移添加 agent 和 scan_task 表
2. 更新 Server 镜像
3. 重启 Server 服务
4. 验证 WebSocket 端点可访问
## Monitoring
- Agent 心跳数据CPU/内存/磁盘/任务数)
- 任务队列长度和等待时间
- 任务成功率和失败率
- Agent 连接状态和重连次数
- Worker 容器资源使用
## Rollback Plan
- Agent 更新失败时继续运行旧版本
- Server 可以独立回滚,不影响 Agent
- 数据库迁移支持回滚
- 保留旧版本镜像用于快速恢复
## Code Examples
### Agent Core Components
#### 1. Config Structure
```go
// agent/internal/config/config.go
package config
type Config struct {
// CLI 参数
ServerURL string // 基础地址(不含协议)
APIKey string
AgentName string
LogLevel string
// Server 动态下发的配置
MaxTasks int
CPUThreshold float64
MemThreshold float64
DiskThreshold float64
}
// 注意Worker 镜像名称由 Agent 根据 task.Version 动态构造
// 格式yyhuni/orbit-worker:v{version}
// 不再需要在配置中指定 WorkerImage
func LoadConfig() (*Config, error) {
// 解析命令行参数
// 派生 WebSocket URL: wss://<server>/api/agents/ws
// 派生 HTTP URL: https://<server>
}
```
#### 2. WebSocket Client
```go
// agent/internal/connection/client.go
package connection
import (
"github.com/gorilla/websocket"
)
type Client struct {
conn *websocket.Conn
serverURL string
apiKey string
msgChan chan *Message
}
func (c *Client) Connect(ctx context.Context) error {
// 连接 wss://<server>/api/agents/ws
// 设置 X-Agent-Key header
}
func (c *Client) reconnectLoop(ctx context.Context) {
backoff := 1 * time.Second
maxBackoff := 60 * time.Second
for {
err := c.Connect(ctx)
if err == nil {
backoff = 1 * time.Second
c.handleMessages(ctx)
}
time.Sleep(backoff)
backoff = min(backoff*2, maxBackoff)
}
}
```
#### 3. Task Executor (Pull Mode)
```go
// agent/internal/task/executor.go
package task
type Executor struct {
httpClient *http.Client
docker *docker.Runner
runningTasks sync.Map
maxTasks int
cpuThreshold float64
memThreshold float64
diskThreshold float64
}
func (e *Executor) Run(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
if !e.canAcceptTask(ctx) {
time.Sleep(e.getPullInterval())
continue
}
task, err := e.pullTask(ctx)
if err != nil || task == nil {
time.Sleep(e.getPullInterval())
continue
}
go e.executeTask(ctx, task)
time.Sleep(e.getPullInterval()) // 拉取成功后根据负载等待
}
}
}
// 根据当前负载动态调整拉取间隔,实现自动负载均衡
func (e *Executor) getPullInterval() time.Duration {
cpu, mem, disk, _ := getSystemLoad()
load := max(cpu, mem, disk) // 取最高负载
if load < 50 {
return 1 * time.Second // 低负载,快速拉取
} else if load < 80 {
return 3 * time.Second // 中负载,减慢拉取
} else {
return 10 * time.Second // 高负载,大幅减慢
}
}
func (e *Executor) canAcceptTask(ctx context.Context) bool {
if e.RunningCount() >= e.maxTasks {
return false
}
cpu, mem, disk, _ := getSystemLoad()
return cpu < e.cpuThreshold &&
mem < e.memThreshold &&
disk < e.diskThreshold
}
func (e *Executor) pullTask(ctx context.Context) (*Task, error) {
req, _ := http.NewRequestWithContext(ctx, "POST",
e.serverURL+"/api/agent/tasks/pull", nil)
req.Header.Set("X-Agent-Key", e.apiKey)
resp, err := e.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == 204 {
return nil, nil // 无任务
}
var task Task
json.NewDecoder(resp.Body).Decode(&task)
return &task, nil
}
```
#### 4. System Metrics Collection
```go
// agent/internal/system/metrics.go
package system
import (
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/mem"
"github.com/shirou/gopsutil/v3/disk"
)
func GetSystemLoad() (cpuPercent, memPercent, diskPercent float64, err error) {
// CPU 使用率
cpuPercents, err := cpu.Percent(0, false)
if err != nil || len(cpuPercents) == 0 {
return 0, 0, 0, err
}
// 内存使用率(正确处理容器 cgroup 限制)
memInfo, err := mem.VirtualMemory()
if err != nil {
return 0, 0, 0, err
}
// 磁盘使用率
diskInfo, err := disk.Usage("/")
if err != nil {
return 0, 0, 0, err
}
return cpuPercents[0], memInfo.UsedPercent, diskInfo.UsedPercent, nil
}
```
### Server Core Components
#### 1. WebSocket Hub
```go
// server/internal/websocket/hub.go
package websocket
type Hub struct {
clients map[int]*Client // agentID -> Client
register chan *Client
unregister chan *Client
broadcast chan *Message
mu sync.RWMutex
}
func (h *Hub) Run() {
for {
select {
case client := <-h.register:
h.mu.Lock()
h.clients[client.agentID] = client
h.mu.Unlock()
case client := <-h.unregister:
h.mu.Lock()
delete(h.clients, client.agentID)
h.mu.Unlock()
case msg := <-h.broadcast:
h.broadcastMessage(msg)
}
}
}
func (h *Hub) SendTo(agentID int, msg *Message) error {
h.mu.RLock()
client, ok := h.clients[agentID]
h.mu.RUnlock()
if !ok {
return errors.New("agent not connected")
}
return client.Send(msg)
}
```
#### 2. Task Pull Handler
```go
// server/internal/handler/agent_task.go
package handler
func (h *AgentTaskHandler) PullTask(c *gin.Context) {
agentID := c.GetInt("agentID") // 从中间件获取
task, err := h.taskRepo.PullTask(c.Request.Context(), agentID)
if err != nil {
c.JSON(500, gin.H{"error": err.Error()})
return
}
if task == nil {
c.Status(204) // No Content
return
}
c.JSON(200, dto.TaskAssignResponse{
TaskID: task.ID,
ScanID: task.ScanID,
WorkflowName: task.WorkflowName,
Config: task.Config,
Version: task.Version,
})
}
```
#### 3. Task Repository (with Locking)
```go
// server/internal/repository/scan_task_repository.go
package repository
func (r *scanTaskRepository) PullTask(ctx context.Context, agentID int) (*model.ScanTask, error) {
var task model.ScanTask
err := r.db.WithContext(ctx).Raw(`
WITH c AS (
SELECT id
FROM scan_task
WHERE status = 'pending'
ORDER BY stage DESC, created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
UPDATE scan_task t
SET status = 'running',
dispatched_at = NOW(),
agent_id = ?
FROM c
WHERE t.id = c.id
RETURNING t.*
`, agentID).Scan(&task).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
// 同步更新 scan.status
if err == nil {
r.db.Exec("UPDATE scan SET status = 'running' WHERE id = ?", task.ScanID)
}
return &task, err
}
```
#### 4. Heartbeat Handler
```go
// server/internal/websocket/client.go
package websocket
func (c *Client) handleHeartbeat(payload HeartbeatPayload) {
// 存储到 Redis
key := fmt.Sprintf("agent:%d:heartbeat", c.agentID)
data, _ := json.Marshal(payload)
c.redis.Set(context.Background(), key, data, 60*time.Second)
// 更新 Agent 最后心跳时间
c.db.Exec("UPDATE agent SET last_heartbeat = NOW(), status = 'online' WHERE id = ?", c.agentID)
// 检查版本是否需要更新
if payload.Version != c.expectedVersion {
c.Send(&Message{
Type: "update_required",
Payload: map[string]string{
"version": c.expectedVersion,
"image": "yyhuni/orbit-agent",
},
})
}
}
```
### Docker Integration
```go
// agent/internal/docker/runner.go
package docker
import (
"github.com/docker/docker/client"
)
type Runner struct {
cli *client.Client
}
func (r *Runner) Run(ctx context.Context, task *Task) (containerID string, err error) {
// 构造镜像名称yyhuni/orbit-worker:v{version}
workerImage := fmt.Sprintf("yyhuni/orbit-worker:v%s", task.Version)
// 拉取镜像(如果本地没有)
r.cli.ImagePull(ctx, workerImage, types.ImagePullOptions{})
// 创建容器
resp, err := r.cli.ContainerCreate(ctx, &container.Config{
Image: workerImage,
Env: []string{
fmt.Sprintf("TASK_ID=%d", task.ID),
fmt.Sprintf("SCAN_ID=%d", task.ScanID),
fmt.Sprintf("SERVER_URL=https://%s", serverURL),
fmt.Sprintf("CONFIG=%s", task.Config),
},
}, &container.HostConfig{
AutoRemove: false, // 不使用自动清理,退出后先读取日志再删除
OomScoreAdj: 500, // Worker 优先被 OOM 杀死
Binds: []string{
"/opt/orbit:/opt/orbit",
},
}, nil, nil, "")
if err != nil {
return "", err
}
// 启动容器
err = r.cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{})
return resp.ID, err
}
func (r *Runner) Wait(containerID string) (exitCode int, err error) {
statusCh, errCh := r.cli.ContainerWait(context.Background(),
containerID, container.WaitConditionNotRunning)
select {
case err := <-errCh:
return -1, err
case status := <-statusCh:
return int(status.StatusCode), nil
}
}
func (r *Runner) GetLogs(containerID string, lines int) string {
options := types.ContainerLogsOptions{
ShowStdout: true,
ShowStderr: true,
Tail: fmt.Sprintf("%d", lines),
}
logs, _ := r.cli.ContainerLogs(context.Background(), containerID, options)
defer logs.Close()
buf := new(bytes.Buffer)
buf.ReadFrom(logs)
// 截断到 4KB
if buf.Len() > 4096 {
return "[truncated]\n" + buf.String()[buf.Len()-4096:]
}
return buf.String()
}
```
### Message Protocol Implementation
```go
// agent/internal/connection/message.go
package connection
type Message struct {
Type string `json:"type"`
Payload map[string]interface{} `json:"payload"`
Timestamp time.Time `json:"timestamp"`
}
type HeartbeatPayload struct {
CPU float64 `json:"cpu"`
Mem float64 `json:"mem"`
Disk float64 `json:"disk"`
Tasks int `json:"tasks"`
Version string `json:"version"`
Hostname string `json:"hostname"`
Uptime int64 `json:"uptime"`
}
func (c *Client) SendHeartbeat(payload HeartbeatPayload) error {
msg := &Message{
Type: "heartbeat",
Payload: structToMap(payload),
Timestamp: time.Now(),
}
return c.Send(msg)
}
```
## Quick Reference
### Key Files to Implement
**Agent (Priority Order)**:
1. [agent/internal/config/config.go](agent/internal/config/config.go) - 配置管理
2. [agent/internal/connection/client.go](agent/internal/connection/client.go) - WebSocket 客户端
3. [agent/internal/task/executor.go](agent/internal/task/executor.go) - 任务执行器
4. [agent/internal/docker/runner.go](agent/internal/docker/runner.go) - Docker 管理
5. [agent/internal/system/metrics.go](agent/internal/system/metrics.go) - 系统监控
**Server (Priority Order)**:
1. [server/internal/model/agent.go](server/internal/model/agent.go) - Agent 模型
2. [server/internal/model/scan_task.go](server/internal/model/scan_task.go) - ScanTask 模型
3. [server/internal/websocket/hub.go](server/internal/websocket/hub.go) - WebSocket Hub
4. [server/internal/repository/scan_task_repository.go](server/internal/repository/scan_task_repository.go) - 任务队列
5. [server/internal/handler/agent_task.go](server/internal/handler/agent_task.go) - Agent API
### Critical Implementation Details
1. **任务拉取必须使用 `FOR UPDATE SKIP LOCKED`** 避免重复分配
2. **Agent 必须使用 gopsutil** 正确处理容器 cgroup 限制
3. **Worker 容器不使用 AutoRemove**,退出后先读取日志再手动删除
4. **错误日志必须截断到 4KB** 避免数据库字段溢出
5. **重连必须使用指数退避** 避免过度重连
6. **心跳数据存储在 Redis** 避免频繁写入 PostgreSQL

View File

@@ -1,238 +0,0 @@
# Feature Specification: WebSocket Agent System
**Feature Branch**: `001-websocket-agent`
**Created**: 2026-01-21
**Status**: Draft
**Input**: 实现基于 WebSocket 的轻量级 Agent 系统,用于替代 SSH 任务分发方式。Agent 作为常驻服务运行在远程 VPS 上,主动连接 Server 建立长连接,通过 Pull 模式拉取任务并启动临时 Worker 容器执行扫描。
## User Scenarios & Testing
### User Story 1 - Agent 部署和连接 (Priority: P1)
作为运维人员,我希望能够快速在远程 VPS 上部署 Agent 并连接到 Server以便开始接收和执行扫描任务。
**Why this priority**: 这是整个系统的基础,没有 Agent 连接就无法执行任何任务。这是 MVP 的核心功能。
**Independent Test**: 可以通过在远程机器上运行安装命令,验证 Agent 成功连接到 Server 并显示在线状态来独立测试。
**Acceptance Scenarios**:
1. **Given** 我在 Web 界面创建了一个新 Agent**When** 我在远程 VPS 上执行提供的安装命令,**Then** Agent 应该成功启动并在 Web 界面显示为在线状态
2. **Given** Agent 已连接到 Server**When** 网络临时断开,**Then** Agent 应该自动重连并恢复在线状态
3. **Given** 我提供了错误的 API Key**When** Agent 尝试连接,**Then** 连接应该被拒绝并显示认证失败错误
---
### User Story 2 - 任务执行和状态跟踪 (Priority: P1)
作为系统管理员,我希望 Agent 能够自动拉取任务并执行扫描,同时实时更新任务状态,以便我能够监控扫描进度。
**Why this priority**: 这是系统的核心价值所在Agent 必须能够执行任务才能替代 SSH 方式。
**Independent Test**: 可以通过创建一个扫描任务,验证 Agent 自动拉取、执行并更新状态来独立测试。
**Acceptance Scenarios**:
1. **Given** Agent 处于空闲状态且系统负载正常,**When** Server 有新的扫描任务,**Then** Agent 应该自动拉取任务并启动 Worker 容器执行
2. **Given** Worker 容器正在执行扫描,**When** 扫描完成(退出码为 0**Then** 任务状态应该更新为 completed
3. **Given** Worker 容器执行失败(退出码非 0**When** 容器退出,**Then** 任务状态应该更新为 failed 并包含错误日志
4. **Given** 用户在 Web 界面取消任务,**When** 取消指令发送到 Agent**Then** Agent 应该停止对应的 Worker 容器并更新状态为 cancelled
---
### User Story 3 - 负载监控和智能调度 (Priority: P2)
作为系统管理员,我希望 Agent 能够监控自身负载并智能决定是否接受新任务,以便避免系统过载。
**Why this priority**: 这确保了系统的稳定性和可靠性,防止单个 Agent 过载导致任务失败。
**Independent Test**: 可以通过模拟高负载场景,验证 Agent 拒绝新任务来独立测试。
**Acceptance Scenarios**:
1. **Given** Agent 的 CPU 使用率超过阈值(默认 85%**When** 检查是否可以接受新任务,**Then** Agent 应该等待直到负载降低
2. **Given** Agent 已达到最大并发任务数(默认 5**When** 尝试拉取新任务,**Then** Agent 应该等待直到有任务完成
3. **Given** Agent 定期上报心跳,**When** Server 收到心跳数据,**Then** Server 应该记录 Agent 的 CPU、内存、磁盘使用率和当前任务数
---
### User Story 4 - 配置动态更新 (Priority: P2)
作为系统管理员,我希望能够在 Web 界面动态调整 Agent 的配置参数,以便根据实际情况优化性能。
**Why this priority**: 这提供了灵活性,允许在不重启 Agent 的情况下调整配置。
**Independent Test**: 可以通过在 Web 界面修改配置,验证 Agent 立即应用新配置来独立测试。
**Acceptance Scenarios**:
1. **Given** Agent 已连接到 Server**When** 管理员在 Web 界面修改最大任务数,**Then** Agent 应该立即接收并应用新配置
2. **Given** Agent 使用旧的负载阈值,**When** Server 推送新的阈值配置,**Then** Agent 应该使用新阈值进行负载检查
---
### User Story 5 - 自动更新 (Priority: P3)
作为运维人员,我希望 Agent 能够自动更新到最新版本,以便无需手动干预即可获得新功能和修复。
**Why this priority**: 这是便利性功能,可以简化运维工作,但不是核心功能。
**Independent Test**: 可以通过发布新版本,验证 Agent 自动拉取镜像并重启来独立测试。
**Acceptance Scenarios**:
1. **Given** Agent 运行旧版本,**When** Server 检测到版本不匹配,**Then** Server 应该发送更新指令
2. **Given** Agent 收到更新指令,**When** 开始更新流程,**Then** Agent 应该拉取新镜像、启动新容器并退出旧容器
3. **Given** 镜像拉取失败,**When** 更新失败,**Then** Agent 应该记录错误并继续运行当前版本
---
### Edge Cases
- 当 Agent 正在执行任务时收到更新指令,应该等待任务完成后再更新
- 当网络不稳定导致频繁断连时,应该使用指数退避策略避免过度重连
- 当 Docker 守护进程不可用时Agent 应该记录错误并等待 Docker 恢复
- 当磁盘空间不足时Agent 应该拒绝新任务并上报警告
- 当 Worker 容器长时间未响应时,应该有超时机制强制停止
- 当多个 Agent 同时拉取任务时,应该使用数据库锁避免重复分配
## Requirements
### Functional Requirements
- **FR-001**: Agent 必须作为独立的 Go 二进制文件编译,支持 Linux amd64 和 arm64 架构
- **FR-002**: Agent 必须支持通过命令行参数配置 Server 地址和 API Key
- **FR-003**: Agent 必须主动连接 Server 的 WebSocket 端点并进行认证
- **FR-004**: Agent 必须在连接失败时使用指数退避策略自动重连1s, 2s, 4s, 8s, 最大 60s
- **FR-005**: Agent 必须每 5 秒发送一次心跳消息,包含 CPU、内存、磁盘使用率、任务数、版本号和主机名
- **FR-006**: Server 必须在 120 秒未收到心跳时将 Agent 标记为离线
- **FR-007**: Agent 必须通过 HTTP API 主动拉取任务Pull 模式API 路径为 `/api/agent/tasks/*`(操作 scan_task非 scan
- **FR-026**: Agent 拉取策略:收到 WS task_available 通知时立即拉取;拉取返回 204 后退避等待5s/10s/30s最大 60s收到新通知时重置退避拉取间隔根据当前负载动态调整负载 <50% 时 1 秒50-80% 时 3 秒,>80% 时 10 秒),实现自动负载均衡
- **FR-008**: Agent 必须在满足以下条件时拉取任务:当前任务数 < max_tasks 且 CPU/内存/磁盘使用率低于阈值
- **FR-009**: Server 必须使用 PostgreSQL 行级锁FOR UPDATE SKIP LOCKED确保任务不被重复分配
- **FR-010**: Agent 必须使用 Docker SDK 启动 Worker 容器,并传递任务参数作为环境变量
- **FR-011**: Agent 必须监控 Worker 容器的退出码并根据退出码更新任务状态0=completed, 非0=failed
- **FR-012**: Agent 必须在 Worker 失败时读取容器最后 100 行日志作为错误信息(超过 4KB 时截断,数据库字段 error_message 为 VARCHAR(4096)
- **FR-013**: Agent 必须响应 Server 的任务取消指令,停止对应的 Worker 容器
- **FR-014**: Agent 必须在 Worker 容器退出后先读取日志再删除容器(不使用 --rm手动清理
- **FR-025**: Agent 必须对每个 Worker 容器设置最大运行时长(默认 7 天),超时强制停止并标记任务为 failed
- **FR-015**: Server 必须提供一键安装脚本,包含 Agent 部署命令和 API Key。脚本应包含拉取 Agent Docker 镜像、使用提供的 API Key 和 Server 地址启动 Agent 容器(挂载 Docker socket 和 /opt/orbit 目录)
- **Server 地址生成规则**(最少交互):
1) 若配置了 `PUBLIC_URL`(完整 URL含协议/域名/端口),直接使用;
2) 否则从用户访问 `GET /api/agents/install.sh` 的请求 URL 推断(基于 Host/Proto/Port 头)。
- 安装脚本内写入 `SERVER_URL=<PUBLIC_URL or inferred URL>`Agent 启动后使用该值访问 HTTP API并将 `https→wss``http→ws` 自动转换用于 WebSocket 连接。
- **FR-016**: Agent 必须支持接收 Server 推送的配置更新maxTasks、cpuThreshold、memThreshold、diskThreshold
- **FR-017**: Agent 必须在收到更新指令时拉取新版本镜像、启动新容器并退出当前进程
- **FR-018**: Worker 容器必须使用 oom-score-adj=500 提高被 OOM 杀死的优先级,保护 Agentoom-score-adj=-500
- **FR-019**: Agent 必须使用 gopsutil 库正确处理容器 cgroup 限制来采集系统负载(在容器环境中从 /sys/fs/cgroup 读取指标)
- **FR-020**: Server 必须在分配任务时将 scan_task.status 更新为 running并同步更新 scan.status 为 running分配和启动合并为一步
- **FR-027**: Server 必须在创建 Scan 时同时创建对应的 scan_task 记录status=pending, workflow_name 取 YamlConfiguration 的第一个顶层 key即扫描配置 YAML 中定义的工作流名称,如 "subdomain_discovery"version 从 VERSION 文件读取Agent 收到后自行拼接镜像名称为 yyhuni/orbit-worker:v{VERSION}
- **FR-028**: Server 必须运行后台 Job每分钟执行负责1) 标记心跳超时的 Agent 为 offline2) 回收离线 Agent 的任务
- **FR-021**: Server 必须在 Agent 离线时回收其名下的 running 任务,重试次数 <3 时重置为 pending否则标记为 failed
- **FR-022**: Server 必须校验状态更新请求的 Agent 所有权agent_id 匹配),不匹配返回 403
- **FR-023**: Server 必须保证状态更新幂等,重复上报相同状态返回 200
- **FR-024**: Server 必须拒绝非法状态转换(仅允许 pending→running、running→completed/failed/cancelled
### Key Entities
- **Agent**: 常驻服务包含属性ID、名称、API Key、状态pending/online/offline、主机名、IP 地址、版本号、调度配置max_tasks、cpu_threshold、mem_threshold、disk_threshold、连接时间、最后心跳时间
- **ScanTask**: 扫描任务包含属性ID、scan_id、stage、workflow_name、状态pending/running/completed/failed/cancelled、agent_id、worker_image、配置YAML、错误信息、retry_count、时间戳
- **Heartbeat**: 心跳数据包含属性CPU 使用率、内存使用率、磁盘使用率、运行中任务数、版本号、主机名、运行时长
- **Worker**: 临时容器,执行具体扫描任务,完成后自动删除
## Success Criteria
### Measurable Outcomes
- **SC-001**: Agent 安装和连接过程在 2 分钟内完成(从执行安装命令到显示在线状态)
- **SC-002**: Agent 在网络恢复后 120 秒内自动重连成功
- **SC-003**: 任务从创建到被 Agent 拉取的延迟不超过 5 秒(在 Agent 空闲且负载正常的情况下)
- **SC-004**: 任务状态更新的延迟不超过 2 秒(从 Worker 退出到状态更新完成)
- **SC-005**: Agent 在 CPU 使用率超过 85% 时不接受新任务,确保系统稳定性
- **SC-006**: 单个 Agent 支持同时运行至少 5 个并发任务
- **SC-007**: 心跳数据每 5 秒更新一次Server 在 120 秒未收到心跳时准确标记 Agent 离线
- **SC-008**: 配置更新在推送后 5 秒内被 Agent 应用
- **SC-009**: Agent 自动更新过程在 5 分钟内完成(包括镜像拉取和容器重启)
- **SC-010**: 多个 Agent 同时拉取任务时,不会出现任务重复分配(通过数据库锁保证)
- **SC-011**: Agent 内存占用不超过 50MB空闲状态
- **SC-012**: Worker 容器在任务完成后 100% 被清理,不留下僵尸容器
## Assumptions
- Docker 已在远程 VPS 上安装并正常运行
- Server 和 Agent 之间的网络连接支持 WebSocketwss://
- PostgreSQL 数据库版本支持 FOR UPDATE SKIP LOCKED 语法9.5+
- Agent 运行的机器有足够的磁盘空间存储 Worker 镜像和扫描结果
- Server 使用 HTTPS/WSS 协议TLS 证书校验默认关闭(用于自签名证书场景)
- 单个 Worker 容器的资源需求不会超过 Agent 机器的总资源
- 扫描任务的执行时间通常在分钟到小时级别,不需要毫秒级的任务调度
- Agent 和 Worker 使用相同的 /opt/orbit 目录进行数据交换
## Out of Scope
- Agent 的 Web 管理界面(通过 Server 的 Web 界面管理)
- Agent 之间的直接通信(所有通信通过 Server 中转)
- 任务优先级的手动调整(优先级由系统自动计算)
- 多 Workflow 串联执行(当前仅支持单个 subdomain_discovery workflow多 workflow 功能预留)
- Agent 的日志聚合和分析Agent 只负责本地日志记录)
- Worker 容器的资源限制配置(使用 Docker 默认设置)
- Agent 的健康检查端点(通过心跳机制实现健康监控)
- 任务级别的超时控制(由 Worker 内部实现Agent 仅实现容器级别的 7 天超时保护,见 FR-025
## Dependencies
- Docker EngineAgent 使用 Docker SDK 管理容器)
- PostgreSQL 数据库Server 端任务队列存储)
- RedisServer 端心跳数据缓存)
- Go 1.21+Agent 开发语言)
- gopsutil v3系统负载采集库
- gorilla/websocketWebSocket 客户端库)
## Version Management
**统一版本管理**所有组件Server、Agent、Worker的版本号统一由项目根目录的 `VERSION` 文件管理。
**版本文件位置**`/Users/yangyang/Desktop/orbit/VERSION`
**版本使用方式**
- **Server**:启动时读取 VERSION 文件,创建 scan_task 时拼接 Worker 镜像名称(格式:`yyhuni/orbit-worker:v{VERSION}`
- **Agent**:启动时读取 VERSION 文件,在心跳消息中上报版本号给 Server
- **Worker**Docker 镜像的 tag 使用 VERSION 文件内容(如:`yyhuni/orbit-worker:v1.5.12-dev`
**版本升级流程**
1. 更新 `VERSION` 文件内容(如:`v1.5.12-dev``v1.5.13`
2. 重新构建并发布 Worker 镜像tag 使用新版本号)
3. 重启 Server读取新版本号
4. 新创建的 scan_task 自动使用新版本 Worker 镜像
5. Agent 拉取任务时获取新镜像名称,自动拉取并使用新版本 Worker
**优势**
- 统一管理:只需修改一个文件即可更新所有组件版本
- 版本一致:确保 Server、Agent、Worker 版本同步
- 自动升级:无需手动修改配置,新任务自动使用新版本
- 易于追踪:所有组件版本号一致,便于问题排查和回滚
## Security Considerations
### 认证体系3 种调用者)
| 调用者 | 认证方式 | Header | 用途 |
|--------|---------|--------|------|
| 用户(前端) | JWT | `Authorization: Bearer <token>` | Web 界面操作 |
| Worker容器 | 全局静态 Token | `X-Worker-Token` | 保存扫描结果 |
| Agent | 每个 Agent 独立 Key | `X-Agent-Key` | 任务拉取、状态更新 |
- Agent 的 `api_key` 存储在 `agent` 表,每个 Agent 一个独立的 key
- Worker 的 token 是全局配置Worker 是 Agent 启动的临时容器,不需要独立认证)
- WebSocket 认证:由于部分环境不支持自定义 Header支持两种方式
- Header: `X-Agent-Key: <key>`
- Query: `wss://server/api/agents/ws?key=<key>`
### 其他安全要求
- API Key 必须安全存储,不应出现在日志或错误信息中
- WebSocket 连接必须使用 wss:// 协议(加密传输)
- Agent 不应信任来自 Worker 的任何输入Worker 只能通过 HTTP API 上报结果)
- Docker socket 挂载到 Agent 容器时需要注意权限控制
- 错误日志在上报前应该截断,避免泄露敏感信息
- Agent 的 oom-score-adj 设置为 -500确保在内存不足时优先保护 Agent

View File

@@ -1,273 +0,0 @@
# Implementation Tasks: WebSocket Agent System
**Feature Branch**: `001-websocket-agent`
**Generated**: 2026-01-22
**Total Tasks**: 87
## Overview
This document breaks down the WebSocket Agent System implementation into executable tasks organized by user story. Each phase represents a complete, independently testable increment of functionality.
## Implementation Strategy
**MVP Scope**: Phase 3 (User Story 1 - Agent Deployment and Connection)
- Delivers core value: Agent can connect to Server and maintain connection
- Enables early testing and validation
- Foundation for all subsequent features
**Incremental Delivery**:
1. Setup + Foundational → US1 (MVP) → US2 → US3 → US4 → US5 → Polish
2. Each user story phase is independently testable
3. Parallel execution opportunities marked with [P]
## Phase 1: Setup (Project Initialization)
**Goal**: Initialize project structure and dependencies for both Agent and Server components.
### Tasks
- [ ] T001 Create agent/ module directory structure in /Users/yangyang/Desktop/orbit/agent/
- [ ] T002 Initialize Go module for agent in agent/go.mod with module name github.com/yyhuni/orbit/agent
- [ ] T003 Create agent subdirectories: cmd/agent/, internal/config/, internal/websocket/, internal/task/, internal/docker/, internal/metrics/
- [ ] T004 [P] Add gorilla/websocket dependency to agent/go.mod (go get github.com/gorilla/websocket)
- [ ] T005 [P] Add docker/docker SDK dependency to agent/go.mod (go get github.com/docker/docker)
- [ ] T006 [P] Add gopsutil v3 dependency to agent/go.mod (go get github.com/shirou/gopsutil/v3)
- [ ] T007 Create server extensions directory structure in server/internal/handler/agent.go
- [ ] T008 [P] Add gin-gonic/gin dependency to server/go.mod if not present
- [ ] T009 [P] Add gorilla/websocket dependency to server/go.mod if not present
## Phase 2: Foundational (Blocking Prerequisites)
**Goal**: Implement database schema, base models, and core infrastructure needed by all user stories.
**Independent Test**: Database migrations run successfully, base models can be instantiated, WebSocket infrastructure accepts connections.
### Tasks
- [ ] T010 Create database migration for agent table in server/migrations/YYYYMMDD_HHMMSS_create_agent_table.sql (use timestamp format: 20260122_143000)
- [ ] T011 Create database migration for scan_task table in server/migrations/YYYYMMDD_HHMMSS_create_scan_task_table.sql (includes version field, no worker_image field)
- [ ] T012 Create database indexes migration in server/migrations/YYYYMMDD_HHMMSS_create_indexes.sql
- [ ] T013 [P] Implement Agent model in server/internal/model/agent.go with GORM tags
- [ ] T014 [P] Implement ScanTask model in server/internal/model/scan_task.go with GORM tags (includes version field, no worker_image field)
- [ ] T015 Implement Agent repository interface in server/internal/repository/agent.go
- [ ] T016 Implement ScanTask repository interface in server/internal/repository/scan_task.go
- [ ] T017 [P] Implement Redis client wrapper in server/internal/cache/redis.go
- [ ] T018 [P] Implement heartbeat cache operations (set/get/delete) in server/internal/cache/heartbeat.go
- [ ] T019 Create WebSocket hub for connection management in server/internal/websocket/hub.go
- [ ] T020 Implement WebSocket authentication middleware in server/internal/middleware/agent_auth.go
## Phase 3: User Story 1 - Agent Deployment and Connection (P1) 🎯 MVP
**Story Goal**: As an operations person, I can quickly deploy Agent on remote VPS and connect to Server to start receiving tasks.
**Independent Test**:
1. Create Agent via Web UI → receive API key
2. Run installation command on remote machine → Agent shows online status
3. Disconnect network → Agent automatically reconnects within 120s
4. Provide wrong API key → connection rejected with auth error
### Tasks
- [ ] T021 [US1] Implement Config struct in agent/internal/config/config.go with ServerURL, APIKey, MaxTasks, thresholds
- [ ] T022 [US1] Implement config loading from environment variables in agent/internal/config/loader.go
- [ ] T023 [US1] Implement WebSocket client with exponential backoff in agent/internal/websocket/client.go
- [ ] T024 [US1] Implement connection authentication in agent/internal/websocket/auth.go (support header and query param)
- [ ] T025 [US1] Implement reconnection logic with backoff strategy (1s, 2s, 4s, 8s, max 60s) in agent/internal/websocket/reconnect.go
- [ ] T026 [US1] Implement main Agent entry point in agent/cmd/agent/main.go
- [ ] T027 [P] [US1] Implement Server WebSocket endpoint handler in server/internal/handler/agent_ws.go at /api/agents/ws
- [ ] T028 [P] [US1] Implement Agent registration on first connection in server/internal/service/agent_service.go
- [ ] T029 [P] [US1] Implement Agent status update to online in server/internal/repository/agent.go
- [ ] T030 [US1] Implement WebSocket message router in server/internal/websocket/router.go
- [ ] T031 [US1] Create Agent creation API endpoint in server/internal/handler/agent.go POST /api/agents
- [ ] T032 [US1] Implement API key generation (8 char hex string, 4 bytes random) in server/internal/service/agent_service.go
- [ ] T033 [US1] Implement Agent list API endpoint in server/internal/handler/agent.go GET /api/agents
## Phase 4: User Story 2 - Task Execution and Status Tracking (P1)
**Story Goal**: As a system admin, I want Agent to automatically pull tasks and execute scans, with real-time status updates for monitoring progress.
**Independent Test**:
1. Create scan task → Agent pulls and starts Worker container
2. Worker completes (exit code 0) → task status updates to completed
3. Worker fails (exit code ≠ 0) → task status updates to failed with error log
4. Cancel task in Web UI → Agent stops Worker and updates status to cancelled
### Tasks
- [ ] T034 [US2] Implement task pull HTTP client in agent/internal/task/client.go for POST /api/agent/tasks/pull
- [ ] T035 [US2] Implement task status update client in agent/internal/task/client.go for PATCH /api/agent/tasks/{taskId}/status
- [ ] T036 [US2] Implement Docker client wrapper in agent/internal/docker/client.go
- [ ] T037 [US2] Implement Worker container launcher in agent/internal/docker/runner.go (constructs image name as yyhuni/orbit-worker:v{version} from task version field, passes environment variables)
- [ ] T038 [US2] Implement container exit code monitoring in agent/internal/docker/monitor.go
- [ ] T039 [US2] Implement container log reader (last 100 lines, 4KB truncation) in agent/internal/docker/logs.go
- [ ] T040 [US2] Implement container cleanup logic in agent/internal/docker/cleanup.go (manual, not --rm)
- [ ] T041 [US2] Implement task executor orchestration in agent/internal/task/executor.go
- [ ] T042 [US2] Implement Worker timeout mechanism (default 7 days) in agent/internal/task/timeout.go
- [ ] T043 [P] [US2] Implement task pull endpoint in server/internal/handler/agent_task.go POST /api/agent/tasks/pull
- [ ] T044 [P] [US2] Implement task assignment with FOR UPDATE SKIP LOCKED in server/internal/repository/scan_task.go
- [ ] T045 [P] [US2] Implement task status update endpoint in server/internal/handler/agent_task.go PATCH /api/agent/tasks/{taskId}/status
- [ ] T046 [P] [US2] Implement status update validation in server/internal/service/scan_task_service.go (FR-022: ownership check - agent_id must match; FR-023: idempotency - duplicate status returns 200; FR-024: state transition validation - only allow pending→running, running→completed/failed/cancelled)
- [ ] T047 [P] [US2] Implement scan.status synchronization with scan_task.status in server/internal/service/scan_service.go
- [ ] T048 [US2] Implement task_available WebSocket notification in server/internal/websocket/notifier.go
- [ ] T049 [US2] Implement task_cancel WebSocket message handler in agent/internal/websocket/handlers.go
- [ ] T050 [US2] Implement scan_task creation on scan creation in server/internal/service/scan_service.go (reads VERSION file and sets version field)
- [ ] T051 [US2] Implement Agent pull strategy with backoff (5s/10s/30s, max 60s) and dynamic pull interval based on load (<50%: 1s, 50-80%: 3s, >80%: 10s) in agent/internal/task/puller.go
## Phase 5: User Story 3 - Load Monitoring and Smart Scheduling (P2)
**Story Goal**: As a system admin, I want Agent to monitor its load and intelligently decide whether to accept new tasks to avoid system overload.
**Independent Test**:
1. Agent CPU exceeds 85% → Agent waits before pulling new tasks
2. Agent reaches max concurrent tasks (5) → Agent waits until task completes
3. Agent sends heartbeat → Server records CPU, memory, disk, task count
4. Agent heartbeat timeout (>120s) → Server marks Agent offline
### Tasks
- [ ] T052 [US3] Implement system metrics collector using gopsutil in agent/internal/metrics/collector.go
- [ ] T053 [US3] Implement cgroup-aware metrics for containerized Agent in agent/internal/metrics/cgroup.go
- [ ] T054 [US3] Implement heartbeat message builder in agent/internal/websocket/heartbeat.go
- [ ] T055 [US3] Implement heartbeat sender (every 5 seconds) in agent/internal/websocket/sender.go
- [ ] T056 [US3] Implement load check before task pull in agent/internal/task/scheduler.go
- [ ] T057 [US3] Implement concurrent task counter in agent/internal/task/counter.go
- [ ] T058 [P] [US3] Implement heartbeat WebSocket message handler in server/internal/websocket/handlers.go
- [ ] T059 [P] [US3] Integrate heartbeat handler with cache layer (call T018 operations) in server/internal/websocket/handlers.go
- [ ] T060 [P] [US3] Implement Agent offline detection background job in server/internal/job/agent_monitor.go (runs every minute)
- [ ] T061 [P] [US3] Implement task recovery for offline Agents in server/internal/job/task_recovery.go
- [ ] T062 [US3] Implement heartbeat API endpoint for Web UI in server/internal/handler/agent.go GET /api/agents/{id}/heartbeat
## Phase 6: User Story 4 - Dynamic Configuration Updates (P2)
**Story Goal**: As a system admin, I can dynamically adjust Agent configuration parameters in Web UI to optimize performance without restarting Agent.
**Independent Test**:
1. Modify max tasks in Web UI → Agent receives and applies new config immediately
2. Update load thresholds → Agent uses new thresholds for load checks
### Tasks
- [ ] T063 [US4] Implement config_update WebSocket message handler in agent/internal/websocket/handlers.go
- [ ] T064 [US4] Implement dynamic config update in agent/internal/config/updater.go
- [ ] T065 [P] [US4] Implement Agent config update API endpoint in server/internal/handler/agent.go PATCH /api/agents/{id}/config
- [ ] T066 [P] [US4] Implement config_update WebSocket message sender in server/internal/websocket/notifier.go
## Phase 7: User Story 5 - Auto-Update (P3)
**Story Goal**: As an operations person, I want Agent to automatically update to the latest version without manual intervention.
**Independent Test**:
1. Agent reports old version → Server sends update_required message
2. Agent receives update → pulls new image, starts new container, exits old process
3. Image pull fails → Agent logs error and continues running current version
### Tasks
- [ ] T067 [US5] Implement update_required WebSocket message handler in agent/internal/websocket/handlers.go
- [ ] T068 [US5] Implement self-update logic in agent/internal/update/updater.go (pull image, start container, exit)
- [ ] T069 [US5] Implement graceful shutdown waiting for tasks in agent/internal/task/shutdown.go
- [ ] T070 [P] [US5] Implement version check in server/internal/service/agent_service.go
- [ ] T071 [P] [US5] Implement update_required message sender in server/internal/websocket/notifier.go
## Phase 8: Polish & Cross-Cutting Concerns
**Goal**: Add production-ready features, error handling, logging, and documentation.
### Tasks
- [ ] T072 [P] Add structured logging to Agent using zerolog in agent/internal/logger/
- [ ] T073 [P] Add structured logging to Server Agent handlers using existing logger
- [ ] T074 [P] Implement API key masking in logs for both Agent and Server
- [ ] T075 [P] Add OOM score adjustment (-500 for Agent, 500 for Worker) in agent/internal/docker/runner.go
- [ ] T076 [P] Implement ping/pong WebSocket keep-alive in agent/internal/websocket/keepalive.go
- [ ] T077 [P] Add error message truncation (4KB limit) in agent/internal/docker/logs.go
- [ ] T078 [P] Create Agent Dockerfile in agent/Dockerfile
- [ ] T079 [P] Create Agent installation script in scripts/install-agent.sh (implement FR-015: support PUBLIC_URL config or infer from request headers; generate SERVER_URL for Agent; auto-convert https→wss, http→ws for WebSocket)
- [ ] T080 [P] Add Agent deployment documentation in docs/agent-deployment.md
- [ ] T081 [P] Add WebSocket protocol documentation in docs/websocket-protocol.md
- [ ] T082 Add integration test for Agent connection flow in agent/test/integration/connection_test.go
- [ ] T083 Add integration test for task execution flow in agent/test/integration/task_test.go
- [ ] T084 Add unit tests for WebSocket reconnection logic in agent/internal/websocket/reconnect_test.go
- [ ] T085 Add unit tests for task assignment with database locks in server/internal/repository/scan_task_test.go
- [ ] T086 Add unit tests for status update validation in server/internal/service/scan_task_service_test.go
- [ ] T087 Add unit tests for heartbeat processing in server/internal/websocket/handlers_test.go
## Dependencies
### User Story Completion Order
```
Setup (Phase 1) → Foundational (Phase 2)
US1 (MVP) ← Must complete first
US2 ← Depends on US1 (connection required for task execution)
┌───┴───┐
US3 US4 ← Can be done in parallel after US2
└───┬───┘
US5 ← Depends on US1 (connection) and US3 (heartbeat/version)
Polish
```
### Critical Path
1. **Setup + Foundational** (T001-T020): Required for all subsequent work
2. **US1: Connection** (T021-T033): Blocking for all other user stories
3. **US2: Task Execution** (T034-T051): Blocking for US3, US4, US5
4. **US3 + US4** (T052-T066): Can be implemented in parallel
5. **US5: Auto-Update** (T067-T071): Requires US1 and US3
6. **Polish** (T072-T087): Can be done incrementally throughout
## Parallel Execution Opportunities
### Phase 1 (Setup)
- T004, T005, T006: Agent dependencies (parallel)
- T008, T009: Server dependencies (parallel)
### Phase 2 (Foundational)
- T013, T014: Models (parallel)
- T017, T018: Redis cache (parallel after T017)
### Phase 3 (US1)
- T027, T028, T029: Server-side connection handling (parallel)
### Phase 4 (US2)
- T043, T044, T045, T046, T047: Server-side task APIs (parallel)
### Phase 5 (US3)
- T058, T059, T060, T061: Server-side heartbeat and monitoring (parallel)
### Phase 6 (US4)
- T065, T066: Server-side config update (parallel)
### Phase 7 (US5)
- T070, T071: Server-side version check (parallel)
### Phase 8 (Polish)
- T072, T073, T074, T075, T076, T077, T078, T079, T080, T081: Documentation and logging (all parallel)
- T082-T087: Tests (can be done in parallel with implementation)
## Task Metrics
- **Total Tasks**: 87
- **Setup Phase**: 9 tasks
- **Foundational Phase**: 11 tasks
- **User Story 1 (P1)**: 13 tasks - MVP
- **User Story 2 (P1)**: 18 tasks
- **User Story 3 (P2)**: 11 tasks
- **User Story 4 (P2)**: 4 tasks
- **User Story 5 (P3)**: 5 tasks
- **Polish Phase**: 16 tasks
- **Parallelizable Tasks**: 42 tasks (48%)
## Notes
- All file paths are absolute from project root: `/Users/yangyang/Desktop/orbit/`
- Tasks marked with [P] can be executed in parallel with other [P] tasks in the same phase
- Tasks marked with [US1], [US2], etc. belong to specific user stories
- Each user story phase is independently testable
- MVP scope (Phase 3) delivers core value and enables early validation
- Tests are included in Polish phase but can be written incrementally during implementation