xingrin/backend/apps/engine/services/task_distributor.py

"""
负载感知任务分发器
根据 Worker 负载动态分发任务,支持本地和远程 Worker。
核心逻辑:
1. 查询所有在线 Worker 的负载(从心跳数据)
2. 选择负载最低的 Worker可能是本地或远程
3. 本地 Worker直接执行 docker run
4. 远程 Worker通过 SSH 执行 docker run
5. 任务执行完自动销毁容器(--rm
镜像版本管理:
- 版本锁定:使用 settings.IMAGE_TAG 确保 server 和 worker 版本一致
- 预拉取策略:安装时预拉取镜像,执行时使用 --pull=missing
- 本地开发:可通过 TASK_EXECUTOR_IMAGE 环境变量指向本地镜像
环境变量注入:
- Worker 容器不使用 env_file通过 docker run -e 动态注入
- 只注入 SERVER_URL容器启动后从配置中心获取完整配置
- 本地 WorkerSERVER_URL = http://server:{port}Docker 内部网络)
- 远程 WorkerSERVER_URL = http://{public_host}:{port}(公网地址)
任务启动流程:
1. Server 调用 execute_scan_flow() 等方法提交任务
2. select_best_worker() 从 Redis 读取心跳数据,选择负载最低的节点
3. _build_docker_command() 构建完整的 docker run 命令:
- 设置网络(本地加入 Docker 网络,远程不指定)
- 注入环境变量(-e SERVER_URL=...
- 挂载结果和日志目录(-v
- 指定执行脚本python -m apps.scan.scripts.xxx
4. _execute_docker_command() 执行命令:
- 本地subprocess.run() 直接执行
- 远程paramiko SSH 执行
5. docker run -d 立即返回容器 ID任务在后台执行
特点:
- 负载感知:任务优先分发到最空闲的机器
- 统一调度:本地和远程 Worker 使用相同的选择逻辑
- 资源隔离:每个任务独立容器
- 按需创建:空闲时零占用
- 版本一致:所有节点使用相同版本的 worker 镜像
"""
import logging
import time
from typing import Optional, Dict, Any
from django.conf import settings
from apps.engine.models import WorkerNode
from apps.engine.services.docker_client_manager import DockerClientManager
logger = logging.getLogger(__name__)
class TaskDistributor:
    """
    Load-aware task distributor.

    Picks the optimal node for each task based on worker load.
    - Local worker (is_local=True): runs docker commands directly.
    - Remote worker (is_local=False): runs docker commands over SSH.

    Load-balancing strategy:
    - Heartbeat interval: 3 s (agents report to Redis).
    - Task submit interval: TASK_SUBMIT_INTERVAL, default 5 s (lets heartbeats
      refresh between submissions).
    - High-load threshold: 85% (nodes above this CPU or memory usage are skipped).
    - Online check: Redis TTL of 15 s; an expired key means the node is offline.
    """
    # Timestamp of the last task submission (class-level, shared by all instances)
    _last_submit_time: float = 0

    def __init__(self):
        self.docker_image = settings.TASK_EXECUTOR_IMAGE
        if not self.docker_image:
            raise ValueError(
                "TASK_EXECUTOR_IMAGE is not configured; make sure the "
                "IMAGE_TAG environment variable is set"
            )
        # All paths live under /opt/xingrin
        self.logs_mount = "/opt/xingrin/logs"
        self.submit_interval = getattr(settings, 'TASK_SUBMIT_INTERVAL', 5)
        # Docker client manager
        self.docker_manager = DockerClientManager()
    def get_online_workers(self) -> list[WorkerNode]:
        """
        Return all online workers.

        Criteria:
        - status in ('online', 'offline') means the node has been deployed
        - Redis holds heartbeat data for it (TTL not yet expired)
        """
        from apps.engine.services.worker_load_service import worker_load_service

        # 1. Fetch all deployed nodes (online/offline both mean "deployed")
        workers = WorkerNode.objects.filter(status__in=['online', 'offline'])
        # 2. Keep only those with live heartbeat data in Redis (i.e. online)
        online_workers = []
        for worker in workers:
            if worker_load_service.is_online(worker.id):
                online_workers.append(worker)
        return online_workers
    def select_best_worker(self) -> Optional[WorkerNode]:
        """
        Select the online worker with the lowest load.

        Strategy:
        - Read real-time load data from Redis.
        - Weighted score: CPU 70%, memory 30%.
        - Exclude machines with CPU > 85% or memory > 85%.

        Returns:
            The best worker, or None if none is available.
        """
        from apps.engine.services.worker_load_service import worker_load_service

        workers = self.get_online_workers()
        if not workers:
            logger.warning("No online worker available")
            return None

        # Batch-fetch load data from Redis
        worker_ids = [w.id for w in workers]
        loads = worker_load_service.get_all_loads(worker_ids)

        # Compute a load score for every worker
        scored_workers = []
        high_load_workers = []  # high-load workers, kept as a fallback
        for worker in workers:
            # Load data from Redis
            load = loads.get(worker.id)
            if not load:
                # No data in Redis; skip the node (should not happen, since
                # get_online_workers has already filtered on heartbeats)
                logger.warning(f"Worker {worker.name} has no load data, skipping")
                continue
            cpu = load.get('cpu', 0)
            mem = load.get('mem', 0)
            # Weighted score (lower is better)
            score = cpu * 0.7 + mem * 0.3
            # Separate normal from high-load nodes (conservative 85% threshold)
            if cpu > 85 or mem > 85:
                high_load_workers.append((worker, score, cpu, mem))
                logger.debug(
                    "High-load worker: %s (CPU: %.1f%%, MEM: %.1f%%)",
                    worker.name, cpu, mem
                )
            else:
                scored_workers.append((worker, score, cpu, mem))

        # Fallback: if no normally loaded worker exists, wait and re-check in a loop
        if not scored_workers:
            if high_load_workers:
                # High-load wait parameters (default: check every 60 s, up to 10 times)
                high_load_wait = getattr(settings, 'HIGH_LOAD_WAIT_SECONDS', 60)
                high_load_max_retries = getattr(settings, 'HIGH_LOAD_MAX_RETRIES', 10)
                # Send a high-load notification before starting to wait
                high_load_workers.sort(key=lambda x: x[1])
                _, _, first_cpu, first_mem = high_load_workers[0]
                from apps.common.signals import all_workers_high_load
                all_workers_high_load.send(
                    sender=self.__class__,
                    worker_name="all nodes",
                    cpu=first_cpu,
                    mem=first_mem
                )
                for retry in range(high_load_max_retries):
                    logger.warning(
                        "All workers under high load, retrying in %d s... (%d/%d)",
                        high_load_wait, retry + 1, high_load_max_retries
                    )
                    time.sleep(high_load_wait)
                    # Re-fetch load data
                    loads = worker_load_service.get_all_loads(worker_ids)
                    # Re-evaluate
                    scored_workers = []
                    high_load_workers = []
                    for worker in workers:
                        load = loads.get(worker.id)
                        if not load:
                            continue
                        cpu = load.get('cpu', 0)
                        mem = load.get('mem', 0)
                        score = cpu * 0.7 + mem * 0.3
                        if cpu > 85 or mem > 85:
                            high_load_workers.append((worker, score, cpu, mem))
                        else:
                            scored_workers.append((worker, score, cpu, mem))
                    # A normally loaded worker appeared: stop waiting
                    if scored_workers:
                        logger.info("Normally loaded worker detected, ending wait")
                        break
                # Timed out or still high load: pick the least loaded node
                if not scored_workers and high_load_workers:
                    high_load_workers.sort(key=lambda x: x[1])
                    best_worker, _, cpu, mem = high_load_workers[0]
                    logger.warning(
                        "Wait timed out, forcing dispatch to high-load worker: "
                        "%s (CPU: %.1f%%, MEM: %.1f%%)",
                        best_worker.name, cpu, mem
                    )
                    return best_worker
                if not scored_workers:
                    # All candidates disappeared while waiting (e.g. heartbeats expired)
                    logger.warning("No worker available after waiting")
                    return None
            else:
                logger.warning("No worker available")
                return None

        # Pick the lowest score
        scored_workers.sort(key=lambda x: x[1])
        best_worker, score, cpu, mem = scored_workers[0]
        logger.info(
            "Selected worker: %s (CPU: %.1f%%, MEM: %.1f%%, Score: %.1f)",
            best_worker.name, cpu, mem, score
        )
        return best_worker
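    # Worked example of the scoring above (loads hypothetical): with worker A
    # at cpu=40, mem=60 and worker B at cpu=70, mem=10,
    #   score_A = 40 * 0.7 + 60 * 0.3 = 46.0
    #   score_B = 70 * 0.7 + 10 * 0.3 = 52.0
    # so worker A wins even though worker B has far more free memory.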
    def _build_container_command(self, script_module: str, script_args: Dict[str, Any]) -> list:
        """Build the container command.

        Args:
            script_module: Python module path (e.g. 'apps.scan.scripts.run_initiate_scan')
            script_args: script argument dict

        Returns:
            Command list of the form
            ['sh', '-c', '... python -m <module> --arg=value ...']
        """
        import shlex

        # Log file path (inside the container)
        log_file = f"{self.logs_mount}/container_{script_module.split('.')[-1]}.log"
        # Build the argument list (shlex.quote escapes special characters)
        args = [f"--{k}={shlex.quote(str(v))}" for k, v in script_args.items()]
        # Full command: rotate the log, then run the script
        command = [
            'sh', '-c',
            f'tail -n 10000 {log_file} > {log_file}.tmp 2>/dev/null; '
            f'mv {log_file}.tmp {log_file} 2>/dev/null; '
            f'python -m {script_module} {" ".join(args)} >> {log_file} 2>&1'
        ]
        return command
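    # Example output (arguments hypothetical): for
    #   script_module='apps.scan.scripts.run_initiate_scan'
    #   script_args={'scan_id': 5, 'target_name': 'example.com scan'}
    # shlex.quote() wraps the value containing a space, so the sh -c payload
    # ends with
    #   python -m apps.scan.scripts.run_initiate_scan --scan_id=5 \
    #       --target_name='example.com scan' >> <log_file> 2>&1
    # Without the quoting, sh would split the value on the space and the
    # script's argument parser would reject the stray token with an
    # "unrecognized arguments" error.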
    def _build_container_environment(self, worker: WorkerNode) -> Dict[str, str]:
        """Build the container environment variables.

        Args:
            worker: worker node

        Returns:
            Environment variable dict
        """
        # Derive the server address from the worker type
        if worker.is_local:
            # Local: use the internal Docker network service name
            server_url = f"http://server:{settings.SERVER_PORT}"
        else:
            # Remote: go through the Nginx reverse proxy
            server_url = f"https://{settings.PUBLIC_HOST}:{settings.PUBLIC_PORT}"
        is_local_str = "true" if worker.is_local else "false"
        return {
            'SERVER_URL': server_url,
            'IS_LOCAL': is_local_str,
            'PREFECT_HOME': '/tmp/.prefect',
            'PREFECT_SERVER_EPHEMERAL_ENABLED': 'true',
            'PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS': '120',
            'PREFECT_SERVER_DATABASE_CONNECTION_URL': 'sqlite+aiosqlite:////tmp/.prefect/prefect.db',
            'PREFECT_LOGGING_LEVEL': 'WARNING',
        }
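    # Example result for a local worker (assuming SERVER_PORT=8000, a value not
    # fixed by this file): the container receives SERVER_URL=http://server:8000
    # and IS_LOCAL=true plus the Prefect settings above; a remote worker would
    # instead get SERVER_URL=https://{PUBLIC_HOST}:{PUBLIC_PORT} and IS_LOCAL=false.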
    def _build_container_volumes(self) -> Dict[str, Dict[str, str]]:
        """Build the container volume mounts.

        Returns:
            Volume configuration dict
        """
        host_xingrin_dir = "/opt/xingrin"
        return {
            host_xingrin_dir: {
                'bind': host_xingrin_dir,
                'mode': 'rw'
            }
        }
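    # The mapping above is the Docker SDK equivalent of
    #   docker run -v /opt/xingrin:/opt/xingrin:rw
    # i.e. the host directory is mounted read-write at the same path inside the
    # container, so result and log paths are identical on host and container.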
    def _wait_for_submit_interval(self):
        """
        Wait out the task submit interval (runs in a background thread, so it
        does not block the API).

        Guarantees a minimum gap between consecutive submissions so heartbeats
        have time to refresh the load data. If the interval has already passed
        since the last submission, no waiting occurs.
        """
        if TaskDistributor._last_submit_time > 0:
            elapsed = time.time() - TaskDistributor._last_submit_time
            if elapsed < self.submit_interval:
                time.sleep(self.submit_interval - elapsed)
        TaskDistributor._last_submit_time = time.time()
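    # Timing illustration (numbers hypothetical): with submit_interval=5, if
    # the previous submission happened 2.0 s ago this call sleeps 3.0 s before
    # proceeding; if it happened 7.0 s ago it proceeds immediately.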
    def execute_scan_flow(
        self,
        scan_id: int,
        target_name: str,
        target_id: int,
        scan_workspace_dir: str,
        engine_name: str,
        scheduled_scan_name: str | None = None,
    ) -> tuple[bool, str, Optional[str], Optional[int]]:
        """
        Execute a scan flow on a local or remote worker.

        Args:
            scan_id: scan task ID
            target_name: target name
            target_id: target ID
            scan_workspace_dir: scan workspace directory
            engine_name: engine name
            scheduled_scan_name: scheduled-scan name (optional)

        Returns:
            (success, message, container_id, worker_id) tuple

        Note:
            engine_config is fetched inside the flow by querying the database
            with scan_id.
        """
        logger.info("=" * 60)
        logger.info("execute_scan_flow started")
        logger.info("  scan_id: %s", scan_id)
        logger.info("  target_name: %s", target_name)
        logger.info("  target_id: %s", target_id)
        logger.info("  scan_workspace_dir: %s", scan_workspace_dir)
        logger.info("  engine_name: %s", engine_name)
        logger.info("  docker_image: %s", self.docker_image)
        logger.info("=" * 60)

        # 1. Wait out the submit interval (background thread; does not block the API)
        logger.info("Waiting for the submit interval...")
        self._wait_for_submit_interval()
        logger.info("Submit interval wait finished")

        # 2. Select the best worker
        worker = self.select_best_worker()
        if not worker:
            return False, "No available worker", None, None

        # 3. Build the container configuration
        script_args = {
            'scan_id': scan_id,
            'target_name': target_name,
            'target_id': target_id,
            'scan_workspace_dir': scan_workspace_dir,
            'engine_name': engine_name,
        }
        if scheduled_scan_name:
            script_args['scheduled_scan_name'] = scheduled_scan_name
        # Command-line arguments
        command = self._build_container_command(
            script_module='apps.scan.scripts.run_initiate_scan',
            script_args=script_args,
        )
        # Environment variables
        environment = self._build_container_environment(worker)
        # Volume mounts
        volumes = self._build_container_volumes()
        # Network configuration (only local workers need one)
        network = settings.DOCKER_NETWORK_NAME if worker.is_local else None

        logger.info(
            "Submitting scan task to worker: %s - Scan ID: %d, Target: %s",
            worker.name, scan_id, target_name
        )

        # 4. Run the container via the Docker SDK
        success, output = self.docker_manager.run_container(
            worker=worker,
            image=self.docker_image,
            command=command,
            environment=environment,
            volumes=volumes,
            network=network,
            detach=True,
            remove=True,
        )
        if success:
            container_id = output  # the SDK returns the full container ID
            logger.info(
                "Scan task submitted - Scan ID: %d, Worker: %s, Container: %s",
                scan_id, worker.name, container_id[:12]
            )
            return True, f"Task submitted to {worker.name}", container_id, worker.id
        else:
            logger.error(
                "Scan task submission failed - Scan ID: %d, Worker: %s, Error: %s",
                scan_id, worker.name, output
            )
            return False, output, None, None
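    # Minimal usage sketch for the method above (argument values hypothetical):
    #   distributor = get_task_distributor()
    #   ok, msg, container_id, worker_id = distributor.execute_scan_flow(
    #       scan_id=42,
    #       target_name='example.com',
    #       target_id=7,
    #       scan_workspace_dir='/opt/xingrin/results/scan_42',
    #       engine_name='default',
    #   )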
    def execute_cleanup_on_all_workers(
        self,
        retention_days: int = 7,
    ) -> list[dict]:
        """
        Run the cleanup task on every worker.

        Args:
            retention_days: retention period in days (default 7)

        Returns:
            List of per-worker execution results
        """
        results = []
        # All online workers
        workers = self.get_online_workers()
        if not workers:
            logger.warning("No worker available for the cleanup task")
            return results
        logger.info(f"Starting the cleanup task on {len(workers)} worker(s)")
        for worker in workers:
            try:
                # Container configuration
                script_args = {
                    'results_dir': '/opt/xingrin/results',
                    'retention_days': retention_days,
                }
                command = self._build_container_command(
                    script_module='apps.scan.scripts.run_cleanup',
                    script_args=script_args,
                )
                environment = self._build_container_environment(worker)
                volumes = self._build_container_volumes()
                network = settings.DOCKER_NETWORK_NAME if worker.is_local else None
                # Run the container via the Docker SDK
                success, output = self.docker_manager.run_container(
                    worker=worker,
                    image=self.docker_image,
                    command=command,
                    environment=environment,
                    volumes=volumes,
                    network=network,
                    detach=True,
                    remove=True,
                )
                results.append({
                    'worker_id': worker.id,
                    'worker_name': worker.name,
                    'success': success,
                    'output': output[:500] if output else None,
                })
                if success:
                    logger.info(f"✓ Cleanup task started on worker {worker.name}")
                else:
                    logger.warning(f"✗ Cleanup task failed to start on worker {worker.name}: {output}")
            except Exception as e:
                logger.error(f"Cleanup task raised an exception on worker {worker.name}: {e}")
                results.append({
                    'worker_id': worker.id,
                    'worker_name': worker.name,
                    'success': False,
                    'error': str(e),
                })
        return results
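    # Example return value of the method above (shape from the code, values
    # hypothetical):
    #   [{'worker_id': 1, 'worker_name': 'local-worker', 'success': True,
    #     'output': '<container-id>'},
    #    {'worker_id': 2, 'worker_name': 'remote-1', 'success': False,
    #     'error': 'connection refused'}]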
    def execute_delete_task(
        self,
        task_type: str,
        ids: list[int],
    ) -> tuple[bool, str, str | None]:
        """
        Dispatch a delete task to the optimal worker.

        Unified entry point; picks the delete script matching task_type.

        Args:
            task_type: task type ('targets', 'organizations', 'scans')
            ids: IDs to delete

        Returns:
            (success, message, container_id) tuple
        """
        import json

        # Map task type to script module
        script_map = {
            'targets': 'apps.targets.scripts.run_delete_targets',
            'organizations': 'apps.targets.scripts.run_delete_organizations',
            'scans': 'apps.scan.scripts.run_delete_scans',
        }
        # Map task type to parameter name
        param_map = {
            'targets': 'target_ids',
            'organizations': 'organization_ids',
            'scans': 'scan_ids',
        }
        if task_type not in script_map:
            return False, f"Unsupported task type: {task_type}", None

        # Select the best worker
        worker = self.select_best_worker()
        if not worker:
            return False, "No available worker", None

        # Build arguments (the ID list is JSON-serialized)
        script_args = {
            param_map[task_type]: json.dumps(ids),
        }
        # Build the container configuration (same path as the other task types)
        command = self._build_container_command(
            script_module=script_map[task_type],
            script_args=script_args,
        )
        environment = self._build_container_environment(worker)
        volumes = self._build_container_volumes()
        network = settings.DOCKER_NETWORK_NAME if worker.is_local else None

        logger.info(
            "Dispatching delete task - type: %s, count: %d, worker: %s",
            task_type, len(ids), worker.name
        )

        # Run the container via the Docker SDK
        success, output = self.docker_manager.run_container(
            worker=worker,
            image=self.docker_image,
            command=command,
            environment=environment,
            volumes=volumes,
            network=network,
            detach=True,
            remove=True,
        )
        if success:
            container_id = output.strip() if output else None
            logger.info(
                "✓ Delete task dispatched - type: %s, container: %s",
                task_type, container_id
            )
            return True, f"Task submitted to {worker.name}", container_id
        else:
            logger.error(
                "✗ Delete task dispatch failed - type: %s, error: %s",
                task_type, output
            )
            return False, output, None
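    # Usage sketch (IDs hypothetical):
    #   ok, msg, container_id = distributor.execute_delete_task('targets', [1, 2, 3])
    # The ID list reaches the script as --target_ids='[1, 2, 3]' (JSON-encoded,
    # then shell-quoted by _build_container_command).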
# Singleton
_distributor: Optional[TaskDistributor] = None

def get_task_distributor() -> TaskDistributor:
    """Return the task-distributor singleton."""
    global _distributor
    if _distributor is None:
        _distributor = TaskDistributor()
    return _distributor
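# Usage note: the singleton is process-local, so each server process holds its
# own TaskDistributor, while _last_submit_time is a class attribute and is
# therefore shared by every instance within one process; the submit throttle
# is per-process, not cluster-wide.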