"""
负载感知任务分发器
根据 Worker 负载动态分发任务支持本地和远程 Worker
核心逻辑
1. 查询所有在线 Worker 的负载从心跳数据
2. 选择负载最低的 Worker可能是本地或远程
3. 本地 Worker直接执行 docker run
4. 远程 Worker通过 SSH 执行 docker run
5. 任务执行完自动销毁容器--rm
镜像版本管理
- 版本锁定使用 settings.IMAGE_TAG 确保 server worker 版本一致
- 预拉取策略安装时预拉取镜像执行时使用 --pull=missing
- 本地开发可通过 TASK_EXECUTOR_IMAGE 环境变量指向本地镜像
环境变量注入
- Worker 容器不使用 env_file通过 docker run -e 动态注入
- 只注入 SERVER_URL容器启动后从配置中心获取完整配置
- 本地 WorkerSERVER_URL = http://server:{port}Docker 内部网络
- 远程 WorkerSERVER_URL = http://{public_host}:{port}公网地址
任务启动流程
1. Server 调用 execute_scan_flow() 等方法提交任务
2. select_best_worker() Redis 读取心跳数据选择负载最低的节点
3. _build_docker_command() 构建完整的 docker run 命令
- 设置网络本地加入 Docker 网络远程不指定
- 注入环境变量-e SERVER_URL=...
- 挂载结果和日志目录-v
- 指定执行脚本python -m apps.scan.scripts.xxx
4. _execute_docker_command() 执行命令
- 本地subprocess.run() 直接执行
- 远程paramiko SSH 执行
5. docker run -d 立即返回容器 ID任务在后台执行
特点
- 负载感知任务优先分发到最空闲的机器
- 统一调度本地和远程 Worker 使用相同的选择逻辑
- 资源隔离每个任务独立容器
- 按需创建空闲时零占用
- 版本一致所有节点使用相同版本的 worker 镜像
"""
import logging
import time
from typing import Optional, Dict, Any
import paramiko
from django.conf import settings
from apps.engine.models import WorkerNode
logger = logging.getLogger(__name__)
class TaskDistributor:
    """
    Load-aware task distributor.

    Chooses the least-loaded worker node to execute a task:
    - Local worker (is_local=True): runs the docker command directly.
    - Remote worker (is_local=False): runs the docker command over SSH.

    Load-balancing parameters:
    - Heartbeat interval: 3s, agents report load to Redis.
    - Submit interval: 6s, guarantees heartbeats have refreshed.
    - High-load threshold: 85% — a node above 85% CPU or memory is skipped.
    - Online check: Redis TTL — 15s without a heartbeat means offline.
    """

    # Timestamp of the most recent task submission
    # (class-level so it is shared across all instances).
    _last_submit_time: float = 0

    def __init__(self):
        # Worker image; the tag is pinned via settings so server and
        # worker always run matching versions.
        self.docker_image = settings.TASK_EXECUTOR_IMAGE
        if not self.docker_image:
            raise ValueError("TASK_EXECUTOR_IMAGE 未配置,请确保 IMAGE_TAG 环境变量已设置")
        # Everything lives under /opt/xingrin.
        self.logs_mount = "/opt/xingrin/logs"
        # Minimum spacing between consecutive task submissions (seconds).
        self.submit_interval = getattr(settings, 'TASK_SUBMIT_INTERVAL', 5)
def get_online_workers(self) -> list[WorkerNode]:
"""
获取所有在线的 Worker
判断条件
- status in ('online', 'offline') 表示已部署
- Redis 中有心跳数据TTL 未过期
"""
from apps.engine.services.worker_load_service import worker_load_service
# 1. 获取所有已部署的节点online/offline 表示已部署)
workers = WorkerNode.objects.filter(status__in=['online', 'offline'])
# 2. 过滤出 Redis 中有心跳数据的(在线)
online_workers = []
for worker in workers:
if worker_load_service.is_online(worker.id):
online_workers.append(worker)
return online_workers
def select_best_worker(self) -> Optional[WorkerNode]:
"""
选择负载最低的在线 Worker
选择策略
- Redis 读取实时负载数据
- CPU 权重 70%内存权重 30%
- 排除 CPU > 85% 内存 > 85% 的机器
Returns:
最优 Worker如果没有可用的返回 None
"""
from apps.engine.services.worker_load_service import worker_load_service
workers = self.get_online_workers()
if not workers:
logger.warning("没有可用的在线 Worker")
return None
# 从 Redis 批量获取负载数据
worker_ids = [w.id for w in workers]
loads = worker_load_service.get_all_loads(worker_ids)
# 计算每个 Worker 的负载分数
scored_workers = []
high_load_workers = [] # 高负载 Worker降级备选
for worker in workers:
# 从 Redis 获取负载数据
load = loads.get(worker.id)
if not load:
# Redis 无数据,跳过该节点(不应该发生,因为 get_online_workers 已过滤)
logger.warning(f"Worker {worker.name} 无负载数据,跳过")
continue
cpu = load.get('cpu', 0)
mem = load.get('mem', 0)
# 加权分数(越低越好)
score = cpu * 0.7 + mem * 0.3
# 区分正常和高负载(阈值降到 85%,更保守)
if cpu > 85 or mem > 85:
high_load_workers.append((worker, score, cpu, mem))
logger.debug(
"高负载 Worker: %s (CPU: %.1f%%, MEM: %.1f%%)",
worker.name, cpu, mem
)
else:
scored_workers.append((worker, score, cpu, mem))
2025-12-27 12:49:14 +08:00
# 降级策略:如果没有正常负载的,循环等待后重新检测
2025-12-12 18:04:57 +08:00
if not scored_workers:
if high_load_workers:
2025-12-27 12:49:14 +08:00
# 高负载等待参数(默认每 60 秒检测一次,最多 10 次)
high_load_wait = getattr(settings, 'HIGH_LOAD_WAIT_SECONDS', 60)
2025-12-27 12:49:14 +08:00
high_load_max_retries = getattr(settings, 'HIGH_LOAD_MAX_RETRIES', 10)
2025-12-27 12:49:14 +08:00
# 开始等待前发送高负载通知
2025-12-19 12:11:20 +08:00
high_load_workers.sort(key=lambda x: x[1])
2025-12-27 12:49:14 +08:00
_, _, first_cpu, first_mem = high_load_workers[0]
2025-12-19 12:11:20 +08:00
from apps.common.signals import all_workers_high_load
all_workers_high_load.send(
sender=self.__class__,
2025-12-27 12:49:14 +08:00
worker_name="所有节点",
cpu=first_cpu,
mem=first_mem
2025-12-19 12:11:20 +08:00
)
2025-12-27 12:49:14 +08:00
for retry in range(high_load_max_retries):
logger.warning(
"所有 Worker 高负载,等待 %d 秒后重试... (%d/%d)",
high_load_wait, retry + 1, high_load_max_retries
)
time.sleep(high_load_wait)
# 重新获取负载数据
loads = worker_load_service.get_all_loads(worker_ids)
# 重新评估
scored_workers = []
high_load_workers = []
for worker in workers:
load = loads.get(worker.id)
if not load:
continue
cpu = load.get('cpu', 0)
mem = load.get('mem', 0)
score = cpu * 0.7 + mem * 0.3
if cpu > 85 or mem > 85:
high_load_workers.append((worker, score, cpu, mem))
else:
scored_workers.append((worker, score, cpu, mem))
# 如果有正常负载的 Worker跳出循环
if scored_workers:
logger.info("检测到正常负载 Worker结束等待")
break
# 超时或仍然高负载,选择负载最低的
if not scored_workers and high_load_workers:
high_load_workers.sort(key=lambda x: x[1])
best_worker, _, cpu, mem = high_load_workers[0]
logger.warning(
"等待超时,强制分发到高负载 Worker: %s (CPU: %.1f%%, MEM: %.1f%%)",
best_worker.name, cpu, mem
)
return best_worker
return best_worker
2025-12-12 18:04:57 +08:00
else:
logger.warning("没有可用的 Worker")
return None
# 选择分数最低的
scored_workers.sort(key=lambda x: x[1])
best_worker, score, cpu, mem = scored_workers[0]
logger.info(
"选择 Worker: %s (CPU: %.1f%%, MEM: %.1f%%, Score: %.1f)",
best_worker.name, cpu, mem, score
)
return best_worker
2025-12-30 20:35:51 +08:00
def _wait_for_submit_interval(self):
"""
等待任务提交间隔后台线程中执行不阻塞 API
确保连续任务提交之间有足够的间隔让心跳有时间更新负载数据
如果距上次提交已超过间隔则不等待
"""
if TaskDistributor._last_submit_time > 0:
elapsed = time.time() - TaskDistributor._last_submit_time
if elapsed < self.submit_interval:
time.sleep(self.submit_interval - elapsed)
TaskDistributor._last_submit_time = time.time()
def _build_docker_command(
self,
worker: WorkerNode,
script_module: str,
script_args: Dict[str, Any],
) -> str:
"""
构建 docker run 命令
容器只需要 SERVER_URL启动后从配置中心获取完整配置
2025-12-12 18:04:57 +08:00
Args:
2025-12-30 20:35:51 +08:00
worker: 目标 Worker用于区分本地/远程网络
script_module: 脚本模块路径 apps.scan.scripts.run_initiate_scan
script_args: 脚本参数会转换为命令行参数
Returns:
2025-12-30 20:35:51 +08:00
完整的 docker run 命令
2025-12-12 18:04:57 +08:00
"""
import shlex
2025-12-30 20:35:51 +08:00
# 根据 Worker 类型确定网络和 Server 地址
if worker.is_local:
# 本地:加入 Docker 网络,使用内部服务名
network_arg = f"--network {settings.DOCKER_NETWORK_NAME}"
server_url = f"http://server:{settings.SERVER_PORT}"
else:
# 远程:通过 Nginx 反向代理访问HTTPS不直连 8888 端口)
network_arg = ""
server_url = f"https://{settings.PUBLIC_HOST}:{settings.PUBLIC_PORT}"
# 挂载路径(统一挂载 /opt/xingrin扫描工具在 /opt/xingrin-tools/bin 不受影响)
host_xingrin_dir = "/opt/xingrin"
2025-12-12 18:04:57 +08:00
2025-12-30 20:35:51 +08:00
# 环境变量SERVER_URL + IS_LOCAL其他配置容器启动时从配置中心获取
# IS_LOCAL 用于 Worker 向配置中心声明身份,决定返回的数据库地址
# Prefect 本地模式配置:启用 ephemeral server本地临时服务器
is_local_str = "true" if worker.is_local else "false"
env_vars = [
f"-e SERVER_URL={shlex.quote(server_url)}",
f"-e IS_LOCAL={is_local_str}",
f"-e WORKER_API_KEY={shlex.quote(settings.WORKER_API_KEY)}", # Worker API 认证密钥
2025-12-30 20:35:51 +08:00
"-e PREFECT_HOME=/tmp/.prefect", # 设置 Prefect 数据目录到可写位置
"-e PREFECT_SERVER_EPHEMERAL_ENABLED=true", # 启用 ephemeral server本地临时服务器
"-e PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS=120", # 增加启动超时时间
"-e PREFECT_SERVER_DATABASE_CONNECTION_URL=sqlite+aiosqlite:////tmp/.prefect/prefect.db", # 使用 /tmp 下的 SQLite
"-e PREFECT_LOGGING_LEVEL=WARNING", # 日志级别(减少 DEBUG 噪音)
]
2025-12-12 18:04:57 +08:00
2025-12-30 20:35:51 +08:00
# 挂载卷(统一挂载整个 /opt/xingrin 目录)
volumes = [
f"-v {host_xingrin_dir}:{host_xingrin_dir}",
]
2025-12-12 18:04:57 +08:00
2025-12-30 20:35:51 +08:00
# 构建命令行参数
# 使用 shlex.quote 处理特殊字符,确保参数在 shell 中正确解析
args_str = " ".join([f"--{k}={shlex.quote(str(v))}" for k, v in script_args.items()])
# 日志文件路径(容器内),保留最近 10000 行
log_file = f"{self.logs_mount}/container_{script_module.split('.')[-1]}.log"
# 构建内部命令(日志轮转 + 执行脚本)
inner_cmd = f'tail -n 10000 {log_file} > {log_file}.tmp 2>/dev/null; mv {log_file}.tmp {log_file} 2>/dev/null; python -m {script_module} {args_str} >> {log_file} 2>&1'
# 完整命令
# 镜像拉取策略:--pull=missing
# - 本地 Workerinstall.sh 已预拉取镜像,直接使用本地版本
# - 远程 Workerdeploy 时已预拉取镜像,直接使用本地版本
# - 避免每次任务都检查 Docker Hub提升性能和稳定性
2025-12-31 12:01:53 +08:00
cmd = f'''docker run --rm -d --pull=missing {network_arg} \\
{' '.join(env_vars)} \\
{' '.join(volumes)} \\
{self.docker_image} \\
2025-12-30 20:35:51 +08:00
sh -c "{inner_cmd}"'''
return cmd
2025-12-30 20:35:51 +08:00
def _execute_docker_command(
self,
worker: WorkerNode,
docker_cmd: str,
) -> tuple[bool, str]:
"""
Worker 上执行 docker run 命令
docker run -d 会立即返回容器 ID无需等待任务完成
Args:
2025-12-30 20:35:51 +08:00
worker: 目标 Worker
docker_cmd: docker run 命令
2025-12-12 18:04:57 +08:00
Returns:
2025-12-30 20:35:51 +08:00
(success, container_id) 元组
2025-12-12 18:04:57 +08:00
"""
2025-12-30 20:35:51 +08:00
logger.info("准备执行 Docker 命令 - Worker: %s, Local: %s", worker.name, worker.is_local)
logger.info("Docker 命令: %s", docker_cmd[:200] + '...' if len(docker_cmd) > 200 else docker_cmd)
2025-12-12 18:04:57 +08:00
if worker.is_local:
2025-12-30 20:35:51 +08:00
return self._execute_local_docker(docker_cmd)
2025-12-12 18:04:57 +08:00
else:
2025-12-30 20:35:51 +08:00
return self._execute_ssh_docker(worker, docker_cmd)
2025-12-12 18:04:57 +08:00
2025-12-30 20:35:51 +08:00
def _execute_local_docker(
self,
docker_cmd: str,
) -> tuple[bool, str]:
2025-12-12 18:04:57 +08:00
"""
2025-12-30 20:35:51 +08:00
在本地执行 docker run 命令
2025-12-12 18:04:57 +08:00
2025-12-30 20:35:51 +08:00
docker run -d 立即返回容器 ID
"""
import subprocess
logger.info("开始执行本地 Docker 命令...")
try:
result = subprocess.run(
docker_cmd,
shell=True,
capture_output=True,
text=True,
)
if result.returncode != 0:
logger.error(
"本地 Docker 执行失败 - Exit: %d, Stderr: %s, Stdout: %s",
result.returncode, result.stderr[:500], result.stdout[:500]
)
return False, result.stderr
container_id = result.stdout.strip()
logger.info("本地 Docker 执行成功 - Container ID: %s", container_id[:12] if container_id else 'N/A')
return True, container_id
except Exception as e:
logger.error("本地 Docker 执行异常: %s", e, exc_info=True)
return False, f"执行异常: {e}"
2025-12-12 18:04:57 +08:00
2025-12-30 20:35:51 +08:00
def _execute_ssh_docker(
self,
worker: WorkerNode,
docker_cmd: str,
) -> tuple[bool, str]:
2025-12-12 18:04:57 +08:00
"""
2025-12-30 20:35:51 +08:00
在远程 Worker 上通过 SSH 执行 docker run 命令
2025-12-12 18:04:57 +08:00
2025-12-30 20:35:51 +08:00
docker run -d 立即返回容器 ID无需长时间等待
Args:
worker: 目标 Worker
docker_cmd: docker run 命令
Returns:
(success, container_id) 元组
2025-12-12 18:04:57 +08:00
"""
2025-12-30 20:35:51 +08:00
ssh = None
logger.info("开始 SSH Docker 执行 - Worker: %s (%s:%d)", worker.name, worker.ip_address, worker.ssh_port)
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接SSH 连接超时 10 秒足够)
ssh.connect(
hostname=worker.ip_address,
port=worker.ssh_port,
username=worker.username,
password=worker.password if worker.password else None,
timeout=10,
)
logger.debug("SSH 连接成功 - Worker: %s", worker.name)
# 执行 docker run-d 模式立即返回)
stdin, stdout, stderr = ssh.exec_command(docker_cmd)
exit_code = stdout.channel.recv_exit_status()
output = stdout.read().decode().strip()
error = stderr.read().decode().strip()
if exit_code != 0:
logger.error(
"SSH Docker 执行失败 - Worker: %s, Exit: %d, Stderr: %s, Stdout: %s",
worker.name, exit_code, error[:500], output[:500]
)
return False, error
logger.info("SSH Docker 执行成功 - Worker: %s, Container ID: %s", worker.name, output[:12] if output else 'N/A')
return True, output
except paramiko.AuthenticationException as e:
logger.error("SSH 认证失败 - Worker: %s, Error: %s", worker.name, e)
return False, f"认证失败: {e}"
except paramiko.SSHException as e:
logger.error("SSH 连接错误 - Worker: %s, Error: %s", worker.name, e)
return False, f"SSH 错误: {e}"
except Exception as e:
logger.error("SSH Docker 执行异常 - Worker: %s, Error: %s", worker.name, e)
return False, f"执行异常: {e}"
finally:
if ssh:
ssh.close()
2025-12-12 18:04:57 +08:00
def execute_scan_flow(
self,
scan_id: int,
target_name: str,
target_id: int,
scan_workspace_dir: str,
engine_name: str,
scheduled_scan_name: str | None = None,
) -> tuple[bool, str, Optional[str], Optional[int]]:
"""
在远程或本地 Worker 上执行扫描 Flow
Args:
scan_id: 扫描任务 ID
target_name: 目标名称
target_id: 目标 ID
scan_workspace_dir: 扫描工作目录
engine_name: 引擎名称
scheduled_scan_name: 定时扫描任务名称可选
Returns:
(success, message, container_id, worker_id) 元组
Note:
engine_config Flow 内部通过 scan_id 查询数据库获取
"""
logger.info("="*60)
logger.info("execute_scan_flow 开始")
logger.info(" scan_id: %s", scan_id)
logger.info(" target_name: %s", target_name)
logger.info(" target_id: %s", target_id)
logger.info(" scan_workspace_dir: %s", scan_workspace_dir)
logger.info(" engine_name: %s", engine_name)
logger.info(" docker_image: %s", self.docker_image)
logger.info("="*60)
2025-12-12 18:04:57 +08:00
# 1. 等待提交间隔(后台线程执行,不阻塞 API
logger.info("等待提交间隔...")
2025-12-12 18:04:57 +08:00
self._wait_for_submit_interval()
logger.info("提交间隔等待完成")
2025-12-12 18:04:57 +08:00
# 2. 选择最佳 Worker
worker = self.select_best_worker()
if not worker:
return False, "没有可用的 Worker", None, None
2025-12-30 20:35:51 +08:00
# 3. 构建 docker run 命令
2025-12-12 18:04:57 +08:00
script_args = {
'scan_id': scan_id,
'target_name': target_name,
'target_id': target_id,
'scan_workspace_dir': scan_workspace_dir,
'engine_name': engine_name,
}
if scheduled_scan_name:
script_args['scheduled_scan_name'] = scheduled_scan_name
2025-12-30 20:35:51 +08:00
docker_cmd = self._build_docker_command(
worker=worker,
2025-12-12 18:04:57 +08:00
script_module='apps.scan.scripts.run_initiate_scan',
script_args=script_args,
)
logger.info(
"提交扫描任务到 Worker: %s - Scan ID: %d, Target: %s",
worker.name, scan_id, target_name
)
2025-12-30 20:35:51 +08:00
# 4. 执行 docker run本地直接执行远程通过 SSH
success, output = self._execute_docker_command(worker, docker_cmd)
2025-12-12 18:04:57 +08:00
if success:
2025-12-30 20:35:51 +08:00
container_id = output[:12] if output else None
2025-12-12 18:04:57 +08:00
logger.info(
"扫描任务已提交 - Scan ID: %d, Worker: %s, Container: %s",
2025-12-30 20:35:51 +08:00
scan_id, worker.name, container_id
2025-12-12 18:04:57 +08:00
)
return True, f"任务已提交到 {worker.name}", container_id, worker.id
else:
logger.error(
"扫描任务提交失败 - Scan ID: %d, Worker: %s, Error: %s",
scan_id, worker.name, output
)
return False, output, None, None
def execute_cleanup_on_all_workers(
self,
retention_days: int = 7,
) -> list[dict]:
"""
在所有 Worker 上执行清理任务
Args:
retention_days: 保留天数默认7天
Returns:
Worker 的执行结果列表
"""
results = []
# 获取所有在线的 Worker
workers = self.get_online_workers()
if not workers:
logger.warning("没有可用的 Worker 执行清理任务")
return results
logger.info(f"开始在 {len(workers)} 个 Worker 上执行清理任务")
for worker in workers:
try:
2025-12-30 20:35:51 +08:00
# 构建 docker run 命令(清理过期扫描结果目录)
2025-12-12 18:04:57 +08:00
script_args = {
'results_dir': '/opt/xingrin/results',
2025-12-12 18:04:57 +08:00
'retention_days': retention_days,
}
2025-12-30 20:35:51 +08:00
docker_cmd = self._build_docker_command(
worker=worker,
2025-12-12 18:04:57 +08:00
script_module='apps.scan.scripts.run_cleanup',
script_args=script_args,
)
2025-12-30 20:35:51 +08:00
# 执行清理命令
success, output = self._execute_docker_command(worker, docker_cmd)
2025-12-12 18:04:57 +08:00
results.append({
'worker_id': worker.id,
'worker_name': worker.name,
'success': success,
'output': output[:500] if output else None,
})
if success:
logger.info(f"✓ Worker {worker.name} 清理任务已启动")
else:
logger.warning(f"✗ Worker {worker.name} 清理任务启动失败: {output}")
except Exception as e:
logger.error(f"Worker {worker.name} 清理任务执行异常: {e}")
results.append({
'worker_id': worker.id,
'worker_name': worker.name,
'success': False,
'error': str(e),
})
return results
def execute_delete_task(
self,
task_type: str,
ids: list[int],
) -> tuple[bool, str, str | None]:
"""
分发删除任务到最优 Worker
统一入口根据 task_type 选择对应的删除脚本执行
Args:
task_type: 任务类型 ('targets', 'organizations', 'scans')
ids: 要删除的 ID 列表
Returns:
(success, message, container_id) 元组
"""
import json
# 映射任务类型到脚本
script_map = {
'targets': 'apps.targets.scripts.run_delete_targets',
'organizations': 'apps.targets.scripts.run_delete_organizations',
'scans': 'apps.scan.scripts.run_delete_scans',
}
# 映射任务类型到参数名
param_map = {
'targets': 'target_ids',
'organizations': 'organization_ids',
'scans': 'scan_ids',
}
if task_type not in script_map:
return False, f"不支持的任务类型: {task_type}", None
# 选择最佳 Worker
worker = self.select_best_worker()
if not worker:
return False, "没有可用的 Worker", None
# 构建参数ID 列表需要 JSON 序列化)
script_args = {
param_map[task_type]: json.dumps(ids),
}
# 构建 docker run 命令
docker_cmd = self._build_docker_command(
worker=worker,
script_module=script_map[task_type],
script_args=script_args,
)
logger.info(
"分发删除任务 - 类型: %s, 数量: %d, Worker: %s",
task_type, len(ids), worker.name
)
# 执行命令
success, output = self._execute_docker_command(worker, docker_cmd)
if success:
container_id = output.strip() if output else None
logger.info(
"✓ 删除任务已分发 - 类型: %s, Container: %s",
task_type, container_id
)
return True, f"任务已提交到 {worker.name}", container_id
else:
logger.error(
"✗ 删除任务分发失败 - 类型: %s, Error: %s",
task_type, output
)
return False, output, None
# Module-level singleton instance, created lazily on first use.
_distributor: Optional[TaskDistributor] = None


def get_task_distributor() -> TaskDistributor:
    """Return the process-wide TaskDistributor singleton, creating it lazily."""
    global _distributor
    if _distributor is None:
        _distributor = TaskDistributor()
    return _distributor