diff --git a/backend/apps/scan/handlers/scan_flow_handlers.py b/backend/apps/scan/handlers/scan_flow_handlers.py index 1caf545b..28fe4706 100644 --- a/backend/apps/scan/handlers/scan_flow_handlers.py +++ b/backend/apps/scan/handlers/scan_flow_handlers.py @@ -6,14 +6,20 @@ 职责: - 更新各阶段的进度状态(running/completed/failed) - 发送扫描阶段的通知 +- 记录 Flow 性能指标 """ import logging from prefect import Flow from prefect.client.schemas import FlowRun, State +from apps.scan.utils.performance import FlowPerformanceTracker + logger = logging.getLogger(__name__) +# 存储每个 flow_run 的性能追踪器 +_flow_trackers: dict[str, FlowPerformanceTracker] = {} + def _get_stage_from_flow_name(flow_name: str) -> str | None: """ @@ -35,6 +41,7 @@ def on_scan_flow_running(flow: Flow, flow_run: FlowRun, state: State) -> None: 职责: - 更新阶段进度为 running - 发送扫描开始通知 + - 启动性能追踪 Args: flow: Prefect Flow 对象 @@ -47,6 +54,13 @@ def on_scan_flow_running(flow: Flow, flow_run: FlowRun, state: State) -> None: flow_params = flow_run.parameters or {} scan_id = flow_params.get('scan_id') target_name = flow_params.get('target_name', 'unknown') + target_id = flow_params.get('target_id') + + # 启动性能追踪 + if scan_id: + tracker = FlowPerformanceTracker(flow.name, scan_id) + tracker.start(target_id=target_id, target_name=target_name) + _flow_trackers[str(flow_run.id)] = tracker # 更新阶段进度 stage = _get_stage_from_flow_name(flow.name) @@ -67,6 +81,7 @@ def on_scan_flow_completed(flow: Flow, flow_run: FlowRun, state: State) -> None: 职责: - 更新阶段进度为 completed - 发送扫描完成通知(可选) + - 记录性能指标 Args: flow: Prefect Flow 对象 @@ -79,6 +94,18 @@ def on_scan_flow_completed(flow: Flow, flow_run: FlowRun, state: State) -> None: flow_params = flow_run.parameters or {} scan_id = flow_params.get('scan_id') + # 获取 flow result + result = None + try: + result = state.result() if state.result else None + except Exception: + pass + + # 记录性能指标 + tracker = _flow_trackers.pop(str(flow_run.id), None) + if tracker: + tracker.finish(success=True) + # 更新阶段进度 stage = _get_stage_from_flow_name(flow.name) if scan_id and stage: @@ -86,7 +113,6 @@ def on_scan_flow_completed(flow: Flow, flow_run: FlowRun, state: State) -> None: from apps.scan.services import ScanService service = ScanService() # 从 flow result 中提取 detail(如果有) - result = state.result() if state.result else None detail = None if isinstance(result, dict): detail = result.get('detail') @@ -109,6 +135,7 @@ def on_scan_flow_failed(flow: Flow, flow_run: FlowRun, state: State) -> None: 职责: - 更新阶段进度为 failed - 发送扫描失败通知 + - 记录性能指标(含错误信息) Args: flow: Prefect Flow 对象 @@ -125,6 +152,11 @@ def on_scan_flow_failed(flow: Flow, flow_run: FlowRun, state: State) -> None: # 提取错误信息 error_message = str(state.message) if state.message else "未知错误" + # 记录性能指标(失败情况) + tracker = _flow_trackers.pop(str(flow_run.id), None) + if tracker: + tracker.finish(success=False, error_message=error_message) + # 更新阶段进度 stage = _get_stage_from_flow_name(flow.name) if scan_id and stage: diff --git a/backend/apps/scan/utils/__init__.py b/backend/apps/scan/utils/__init__.py index 194a2e3b..e3daaae9 100644 --- a/backend/apps/scan/utils/__init__.py +++ b/backend/apps/scan/utils/__init__.py @@ -9,6 +9,7 @@ from .command_builder import build_scan_command from .command_executor import execute_and_wait, execute_stream from .wordlist_helpers import ensure_wordlist_local from .nuclei_helpers import ensure_nuclei_templates_local +from .performance import FlowPerformanceTracker, CommandPerformanceTracker from . import config_parser __all__ = [ @@ -23,6 +24,9 @@ __all__ = [ 'ensure_wordlist_local', # 确保本地字典文件(含 hash 校验) # Nuclei 模板 'ensure_nuclei_templates_local', # 确保本地模板(含 commit hash 校验) + # 性能监控 + 'FlowPerformanceTracker', # Flow 性能追踪器(含系统资源采样) + 'CommandPerformanceTracker', # 命令性能追踪器 # 配置解析 'config_parser', ] diff --git a/backend/apps/scan/utils/command_executor.py b/backend/apps/scan/utils/command_executor.py index dd391fa3..5904e53c 100644 --- a/backend/apps/scan/utils/command_executor.py +++ b/backend/apps/scan/utils/command_executor.py @@ -4,6 +4,10 @@ 统一管理所有命令执行方式: - execute_and_wait(): 等待式执行,适合输出到文件的工具 - execute_stream(): 流式执行,适合实时处理输出的工具 + +性能监控: +- 自动记录命令执行耗时、内存使用 +- 输出到 performance logger """ import logging @@ -26,6 +30,12 @@ except ImportError: # 运行环境缺少 psutil 时降级为无动态负载控 logger = logging.getLogger(__name__) +# 延迟导入,避免循环依赖 +def _get_command_tracker(tool_name: str, command: str): + """获取命令性能追踪器(延迟导入)""" + from apps.scan.utils.performance import CommandPerformanceTracker + return CommandPerformanceTracker(tool_name, command) + # 常量定义 GRACEFUL_SHUTDOWN_TIMEOUT = 5 # 进程优雅退出的超时时间(秒) MAX_LOG_TAIL_LINES = 1000 # 日志文件读取的最大行数 @@ -193,6 +203,10 @@ class CommandExecutor: # 记录开始时间(用于计算执行时间) start_time = datetime.now() + # 初始化性能追踪器 + perf_tracker = _get_command_tracker(tool_name, command) + perf_tracker.start() + process = None log_file_handle = None acquired_slot = False # 标记是否已增加全局活动命令计数 @@ -278,6 +292,9 @@ class CommandExecutor: else: logger.info("✓ 扫描工具 %s 执行完成 (执行时间: %.2f秒)", tool_name, duration) + # 记录性能日志 + perf_tracker.finish(success=success, duration=duration, timeout=timeout) + return { 'success': success, 'returncode': returncode, @@ -294,6 +311,9 @@ class CommandExecutor: if log_file_path and ENABLE_COMMAND_LOGGING: self._write_command_end_footer(log_file_path, tool_name, duration, -1, False) + # 记录性能日志(超时) + perf_tracker.finish(success=False, duration=duration, timeout=timeout, is_timeout=True) + error_msg = f"扫描工具 {tool_name} 执行超时({timeout}秒,实际执行: {duration:.2f}秒)" logger.error(error_msg) if log_file_path and log_file_path.exists(): @@ -377,6 +397,10 @@ class CommandExecutor: start_time = datetime.now() acquired_slot = False + # 初始化性能追踪器 + perf_tracker = _get_command_tracker(tool_name, cmd) + perf_tracker.start() + # 准备日志文件路径 log_file_path = Path(log_file) if log_file else None if log_file_path: @@ -554,13 +578,16 @@ class CommandExecutor: log_file_handle.close() # 5. 追加命令结束信息(如果开启且有日志文件) + duration = (datetime.now() - start_time).total_seconds() + success = not timed_out_event.is_set() and (exit_code == 0 if exit_code is not None else True) + if log_file_path and ENABLE_COMMAND_LOGGING: - duration = (datetime.now() - start_time).total_seconds() - success = not timed_out_event.is_set() and (exit_code == 0 if exit_code is not None else True) - # 追加结束信息到日志文件末尾 self._write_command_end_footer(log_file_path, tool_name, duration, exit_code or 0, success) + # 6. 记录性能日志 + perf_tracker.finish(success=success, duration=duration, timeout=timeout, is_timeout=timed_out_event.is_set()) + if acquired_slot: if _ACTIVE_COMMANDS_LOCK: with _ACTIVE_COMMANDS_LOCK: diff --git a/backend/apps/scan/utils/performance.py b/backend/apps/scan/utils/performance.py new file mode 100644 index 00000000..76d09c38 --- /dev/null +++ b/backend/apps/scan/utils/performance.py @@ -0,0 +1,317 @@ +""" +性能监控工具模块 + +提供 Flow 层的性能监控能力 + +功能: +1. Flow 性能监控 - 记录整体流程耗时、系统资源(CPU/内存) +2. 定时采样 - 每 N 秒记录一次系统资源状态 + +使用方式: + # Flow 层(在 handlers 中使用) + from apps.scan.utils.performance import FlowPerformanceTracker + tracker = FlowPerformanceTracker(flow_name, scan_id) + tracker.start() + # ... 执行流程 ... + tracker.finish(success=True, result=result) +""" + +import logging +import threading +import time +import os +from dataclasses import dataclass, field +from typing import Optional + +try: + import psutil +except ImportError: + psutil = None + +# 性能日志使用专门的 logger +perf_logger = logging.getLogger('performance') +logger = logging.getLogger(__name__) + +# 采样间隔(秒) +SAMPLE_INTERVAL = 30 + + +def _get_system_stats() -> dict: + """ + 获取当前系统资源状态 + + Returns: + dict: {'cpu_percent': float, 'memory_gb': float} + """ + if not psutil: + return {'cpu_percent': 0.0, 'memory_gb': 0.0} + + try: + cpu_percent = psutil.cpu_percent(interval=0.1) + memory = psutil.virtual_memory() + memory_gb = memory.used / (1024 ** 3) + return { + 'cpu_percent': cpu_percent, + 'memory_gb': memory_gb + } + except Exception: + return {'cpu_percent': 0.0, 'memory_gb': 0.0} + + +@dataclass +class FlowPerformanceMetrics: + """Flow 性能指标""" + flow_name: str + scan_id: int + target_id: Optional[int] = None + target_name: Optional[str] = None + + # 时间指标 + start_time: float = 0.0 + end_time: float = 0.0 + duration_seconds: float = 0.0 + + # 系统资源指标 + cpu_start: float = 0.0 + cpu_end: float = 0.0 + cpu_peak: float = 0.0 + memory_gb_start: float = 0.0 + memory_gb_end: float = 0.0 + memory_gb_peak: float = 0.0 + + # 执行结果 + success: bool = False + error_message: Optional[str] = None + + +class FlowPerformanceTracker: + """ + Flow 性能追踪器 + + 用于追踪 Prefect Flow 的执行性能,包括: + - 执行耗时 + - 系统 CPU 和内存使用 + - 定时采样(每 30 秒) + + 使用方式: + tracker = FlowPerformanceTracker("directory_scan", scan_id=1) + tracker.start(target_id=1, target_name="example.com") + # ... flow 执行 ... + tracker.finish(success=True, result={'created_count': 100}) + """ + + def __init__(self, flow_name: str, scan_id: int): + self.metrics = FlowPerformanceMetrics( + flow_name=flow_name, + scan_id=scan_id + ) + self._sampler_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._samples: list[dict] = [] + + def start( + self, + target_id: Optional[int] = None, + target_name: Optional[str] = None + ) -> None: + """开始追踪""" + self.metrics.start_time = time.time() + self.metrics.target_id = target_id + self.metrics.target_name = target_name + + # 记录初始系统状态 + stats = _get_system_stats() + self.metrics.cpu_start = stats['cpu_percent'] + self.metrics.memory_gb_start = stats['memory_gb'] + self.metrics.cpu_peak = stats['cpu_percent'] + self.metrics.memory_gb_peak = stats['memory_gb'] + + # 记录开始日志 + perf_logger.info( + "📊 Flow 开始 - %s, scan_id=%d, 系统: CPU %.1f%%, 内存 %.1fGB", + self.metrics.flow_name, + self.metrics.scan_id, + stats['cpu_percent'], + stats['memory_gb'] + ) + + # 启动采样线程 + self._stop_event.clear() + self._sampler_thread = threading.Thread( + target=self._sample_loop, + daemon=True, + name=f"perf-sampler-{self.metrics.flow_name}-{self.metrics.scan_id}" + ) + self._sampler_thread.start() + + def _sample_loop(self) -> None: + """定时采样循环""" + elapsed = 0 + while not self._stop_event.wait(timeout=SAMPLE_INTERVAL): + elapsed += SAMPLE_INTERVAL + stats = _get_system_stats() + + # 更新峰值 + if stats['cpu_percent'] > self.metrics.cpu_peak: + self.metrics.cpu_peak = stats['cpu_percent'] + if stats['memory_gb'] > self.metrics.memory_gb_peak: + self.metrics.memory_gb_peak = stats['memory_gb'] + + # 记录采样 + self._samples.append({ + 'elapsed': elapsed, + 'cpu': stats['cpu_percent'], + 'memory_gb': stats['memory_gb'] + }) + + # 输出采样日志 + perf_logger.info( + "📊 Flow 执行中 - %s [%ds], 系统: CPU %.1f%%, 内存 %.1fGB", + self.metrics.flow_name, + elapsed, + stats['cpu_percent'], + stats['memory_gb'] + ) + + def finish( + self, + success: bool = True, + error_message: Optional[str] = None + ) -> None: + """ + 结束追踪并记录性能日志 + + Args: + success: 是否成功 + error_message: 错误信息 + """ + # 停止采样线程 + self._stop_event.set() + if self._sampler_thread and self._sampler_thread.is_alive(): + self._sampler_thread.join(timeout=1.0) + + # 记录结束时间和状态 + self.metrics.end_time = time.time() + self.metrics.duration_seconds = self.metrics.end_time - self.metrics.start_time + self.metrics.success = success + self.metrics.error_message = error_message + + # 记录结束时的系统状态 + stats = _get_system_stats() + self.metrics.cpu_end = stats['cpu_percent'] + self.metrics.memory_gb_end = stats['memory_gb'] + + # 更新峰值(最后一次采样) + if stats['cpu_percent'] > self.metrics.cpu_peak: + self.metrics.cpu_peak = stats['cpu_percent'] + if stats['memory_gb'] > self.metrics.memory_gb_peak: + self.metrics.memory_gb_peak = stats['memory_gb'] + + # 记录结束日志 + status = "✓" if success else "✗" + perf_logger.info( + "📊 Flow 结束 - %s %s, scan_id=%d, 耗时: %.1fs, " + "CPU: %.1f%%→%.1f%%(峰值%.1f%%), 内存: %.1fGB→%.1fGB(峰值%.1fGB)", + self.metrics.flow_name, + status, + self.metrics.scan_id, + self.metrics.duration_seconds, + self.metrics.cpu_start, + self.metrics.cpu_end, + self.metrics.cpu_peak, + self.metrics.memory_gb_start, + self.metrics.memory_gb_end, + self.metrics.memory_gb_peak + ) + + if not success and error_message: + perf_logger.warning( + "📊 Flow 失败原因 - %s: %s", + self.metrics.flow_name, + error_message + ) + + +class CommandPerformanceTracker: + """ + 命令执行性能追踪器 + + 用于追踪单个命令的执行性能,包括: + - 执行耗时 + - 系统 CPU 和内存使用(开始/结束) + + 使用方式: + tracker = CommandPerformanceTracker("ffuf", command="ffuf -u http://...") + tracker.start() + # ... 执行命令 ... + tracker.finish(success=True, duration=45.2) + """ + + def __init__(self, tool_name: str, command: str = ""): + self.tool_name = tool_name + self.command = command + self.start_time: float = 0.0 + self.cpu_start: float = 0.0 + self.memory_gb_start: float = 0.0 + + def start(self) -> None: + """开始追踪,记录初始系统状态""" + self.start_time = time.time() + stats = _get_system_stats() + self.cpu_start = stats['cpu_percent'] + self.memory_gb_start = stats['memory_gb'] + + # 截断过长的命令 + cmd_display = self.command[:200] + "..." if len(self.command) > 200 else self.command + + perf_logger.info( + "📊 命令开始 - %s, 系统: CPU %.1f%%, 内存 %.1fGB, 命令: %s", + self.tool_name, + self.cpu_start, + self.memory_gb_start, + cmd_display + ) + + def finish( + self, + success: bool = True, + duration: Optional[float] = None, + timeout: Optional[int] = None, + is_timeout: bool = False + ) -> None: + """ + 结束追踪并记录性能日志 + + Args: + success: 是否成功 + duration: 执行耗时(秒),如果不传则自动计算 + timeout: 超时配置(秒) + is_timeout: 是否超时 + """ + # 计算耗时 + if duration is None: + duration = time.time() - self.start_time + + # 获取结束时的系统状态 + stats = _get_system_stats() + cpu_end = stats['cpu_percent'] + memory_gb_end = stats['memory_gb'] + + status = "✓" if success else ("⏱ 超时" if is_timeout else "✗") + + # 截断过长的命令 + cmd_display = self.command[:200] + "..." if len(self.command) > 200 else self.command + + perf_logger.info( + "📊 命令结束 - %s %s, 耗时: %.2fs%s, " + "CPU: %.1f%%→%.1f%%, 内存: %.1fGB→%.1fGB, 命令: %s", + self.tool_name, + status, + duration, + f", 超时配置: {timeout}s" if timeout else "", + self.cpu_start, + cpu_end, + self.memory_gb_start, + memory_gb_end, + cmd_display + ) diff --git a/backend/config/logging_config.py b/backend/config/logging_config.py index 0af0fb43..a9a0c913 100644 --- a/backend/config/logging_config.py +++ b/backend/config/logging_config.py @@ -94,21 +94,21 @@ def get_logging_config(debug: bool = False): 'level': 'ERROR', # 只记录 ERROR 及以上级别 } - # JSON 结构化日志(便于日志分析和监控) - log_handlers.append('json_file') - logging_handlers['json_file'] = { - 'class': 'logging.handlers.RotatingFileHandler', - 'formatter': 'json', - 'filename': str(log_path / 'xingrin_json.log'), - 'maxBytes': 100 * 1024 * 1024, # 100MB - 'backupCount': 5, - 'encoding': 'utf-8', - } + # JSON 结构化日志(暂时关闭,需要时再开启) + # log_handlers.append('json_file') + # logging_handlers['json_file'] = { + # 'class': 'logging.handlers.RotatingFileHandler', + # 'formatter': 'json', + # 'filename': str(log_path / 'xingrin_json.log'), + # 'maxBytes': 100 * 1024 * 1024, # 100MB + # 'backupCount': 5, + # 'encoding': 'utf-8', + # } - # 性能指标日志(专门记录性能相关信息) + # 性能指标日志(可读格式,便于人工查看) logging_handlers['performance_file'] = { 'class': 'logging.handlers.RotatingFileHandler', - 'formatter': 'json', + 'formatter': 'standard', # 使用可读格式,不用 JSON 'filename': str(log_path / 'performance.log'), 'maxBytes': 100 * 1024 * 1024, # 100MB 'backupCount': 5,