From cd83f52f35fa98b88e88305eb53886f0c549f1bc Mon Sep 17 00:00:00 2001 From: yyhuni Date: Sat, 27 Dec 2025 11:39:26 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=8C=87=E7=BA=B9=E8=AF=86?= =?UTF-8?q?=E5=88=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scan/flows/fingerprint_detect_flow.py | 403 ++++++++++++++++++ .../fingerprint_detect/export_urls_task.py | 137 ++++++ .../fingerprint_detect/run_xingfinger_task.py | 300 +++++++++++++ .../apps/scan/utils/fingerprint_helpers.py | 52 ++- docs/scan-flow-architecture.md | 14 +- 5 files changed, 900 insertions(+), 6 deletions(-) create mode 100644 backend/apps/scan/flows/fingerprint_detect_flow.py create mode 100644 backend/apps/scan/tasks/fingerprint_detect/export_urls_task.py create mode 100644 backend/apps/scan/tasks/fingerprint_detect/run_xingfinger_task.py diff --git a/backend/apps/scan/flows/fingerprint_detect_flow.py b/backend/apps/scan/flows/fingerprint_detect_flow.py new file mode 100644 index 00000000..a4d39114 --- /dev/null +++ b/backend/apps/scan/flows/fingerprint_detect_flow.py @@ -0,0 +1,403 @@ +""" +指纹识别 Flow + +负责编排指纹识别的完整流程 + +架构: +- Flow 负责编排多个原子 Task +- 在 site_scan 后串行执行 +- 使用 xingfinger 工具识别技术栈 +- 流式处理输出,批量更新数据库 +""" + +# Django 环境初始化(导入即生效) +from apps.common.prefect_django_setup import setup_django_for_prefect + +import logging +import os +from datetime import datetime +from pathlib import Path + +from prefect import flow + +from apps.scan.handlers.scan_flow_handlers import ( + on_scan_flow_running, + on_scan_flow_completed, + on_scan_flow_failed, +) +from apps.scan.tasks.fingerprint_detect import ( + export_urls_for_fingerprint_task, + run_xingfinger_and_stream_update_tech_task, +) +from apps.scan.utils import build_scan_command +from apps.scan.utils.fingerprint_helpers import get_fingerprint_paths + +logger = logging.getLogger(__name__) + + +def calculate_fingerprint_detect_timeout( + url_count: int, + base_per_url: float = 3.0, + min_timeout: int = 60 +) -> int: + """ + 根据 URL 数量计算超时时间 + + 公式:超时时间 = URL 数量 × 每 URL 基础时间 + 最小值:60秒 + 无上限 + + Args: + url_count: URL 数量 + base_per_url: 每 URL 基础时间(秒),默认 3秒 + min_timeout: 最小超时时间(秒),默认 60秒 + + Returns: + int: 计算出的超时时间(秒) + + 示例: + 100 URL × 3秒 = 300秒 + 1000 URL × 3秒 = 3000秒(50分钟) + 10000 URL × 3秒 = 30000秒(8.3小时) + """ + timeout = int(url_count * base_per_url) + return max(min_timeout, timeout) + + +def _setup_fingerprint_detect_directory(scan_workspace_dir: str) -> Path: + """ + 创建并验证指纹识别工作目录 + + Args: + scan_workspace_dir: 扫描工作空间目录 + + Returns: + Path: 指纹识别目录路径 + + Raises: + RuntimeError: 目录创建或验证失败 + """ + fingerprint_dir = Path(scan_workspace_dir) / 'fingerprint_detect' + fingerprint_dir.mkdir(parents=True, exist_ok=True) + + if not fingerprint_dir.is_dir(): + raise RuntimeError(f"指纹识别目录创建失败: {fingerprint_dir}") + if not os.access(fingerprint_dir, os.W_OK): + raise RuntimeError(f"指纹识别目录不可写: {fingerprint_dir}") + + return fingerprint_dir + + +def _export_urls( + target_id: int, + fingerprint_dir: Path, + target_name: str = None, + source: str = 'website' +) -> tuple[str, int]: + """ + 导出 URL 到文件 + + Args: + target_id: 目标 ID + fingerprint_dir: 指纹识别目录 + target_name: 目标名称(用于懒加载) + source: 数据源类型 + + Returns: + tuple: (urls_file, total_count) + """ + logger.info("Step 1: 导出 URL 列表 (source=%s)", source) + + urls_file = str(fingerprint_dir / 'urls.txt') + export_result = export_urls_for_fingerprint_task( + target_id=target_id, + output_file=urls_file, + target_name=target_name, + source=source, + batch_size=1000 + ) + + total_count = export_result['total_count'] + + logger.info( + "✓ URL 导出完成 - 文件: %s, 数量: %d", + export_result['output_file'], + total_count + ) + + return export_result['output_file'], total_count + + +def _run_fingerprint_detect( + enabled_tools: dict, + urls_file: str, + url_count: int, + fingerprint_dir: Path, + scan_id: int, + target_id: int, + source: str +) -> tuple[dict, list]: + """ + 执行指纹识别任务 + + Args: + enabled_tools: 已启用的工具配置字典 + urls_file: URL 文件路径 + url_count: URL 总数 + fingerprint_dir: 指纹识别目录 + scan_id: 扫描任务 ID + target_id: 目标 ID + source: 数据源类型 + + Returns: + tuple: (tool_stats, failed_tools) + """ + tool_stats = {} + failed_tools = [] + + for tool_name, tool_config in enabled_tools.items(): + # 1. 获取指纹库路径 + lib_names = tool_config.get('fingerprint_libs', ['ehole']) + fingerprint_paths = get_fingerprint_paths(lib_names) + + if not fingerprint_paths: + reason = f"没有可用的指纹库: {lib_names}" + logger.warning(reason) + failed_tools.append({'tool': tool_name, 'reason': reason}) + continue + + # 2. 将指纹库路径合并到 tool_config(用于命令构建) + tool_config_with_paths = {**tool_config, **fingerprint_paths} + + # 3. 构建命令 + try: + command = build_scan_command( + tool_name=tool_name, + scan_type='fingerprint_detect', + command_params={ + 'urls_file': urls_file + }, + tool_config=tool_config_with_paths + ) + except Exception as e: + reason = f"命令构建失败: {str(e)}" + logger.error("构建 %s 命令失败: %s", tool_name, e) + failed_tools.append({'tool': tool_name, 'reason': reason}) + continue + + # 4. 计算超时时间 + timeout = calculate_fingerprint_detect_timeout(url_count) + + # 5. 生成日志文件路径 + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + log_file = fingerprint_dir / f"{tool_name}_{timestamp}.log" + + logger.info( + "开始执行 %s 指纹识别 - URL数: %d, 超时: %ds, 指纹库: %s", + tool_name, url_count, timeout, list(fingerprint_paths.keys()) + ) + + # 6. 执行扫描任务 + try: + result = run_xingfinger_and_stream_update_tech_task( + cmd=command, + tool_name=tool_name, + scan_id=scan_id, + target_id=target_id, + source=source, + cwd=str(fingerprint_dir), + timeout=timeout, + log_file=str(log_file), + batch_size=100 + ) + + tool_stats[tool_name] = { + 'command': command, + 'result': result, + 'timeout': timeout, + 'fingerprint_libs': list(fingerprint_paths.keys()) + } + + logger.info( + "✓ 工具 %s 执行完成 - 处理记录: %d, 更新: %d, 未找到: %d", + tool_name, + result.get('processed_records', 0), + result.get('updated_count', 0), + result.get('not_found_count', 0) + ) + + except Exception as exc: + failed_tools.append({'tool': tool_name, 'reason': str(exc)}) + logger.error("工具 %s 执行失败: %s", tool_name, exc, exc_info=True) + + if failed_tools: + logger.warning( + "以下指纹识别工具执行失败: %s", + ', '.join([f['tool'] for f in failed_tools]) + ) + + return tool_stats, failed_tools + + +@flow( + name="fingerprint_detect", + log_prints=True, + on_running=[on_scan_flow_running], + on_completion=[on_scan_flow_completed], + on_failure=[on_scan_flow_failed], +) +def fingerprint_detect_flow( + scan_id: int, + target_name: str, + target_id: int, + scan_workspace_dir: str, + enabled_tools: dict +) -> dict: + """ + 指纹识别 Flow + + 主要功能: + 1. 从数据库导出目标下所有 WebSite URL 到文件 + 2. 使用 xingfinger 进行技术栈识别 + 3. 解析结果并更新 WebSite.tech 字段(合并去重) + + 工作流程: + Step 0: 创建工作目录 + Step 1: 导出 URL 列表 + Step 2: 解析配置,获取启用的工具 + Step 3: 执行 xingfinger 并解析结果 + + Args: + scan_id: 扫描任务 ID + target_name: 目标名称 + target_id: 目标 ID + scan_workspace_dir: 扫描工作空间目录 + enabled_tools: 启用的工具配置(xingfinger) + + Returns: + dict: { + 'success': bool, + 'scan_id': int, + 'target': str, + 'scan_workspace_dir': str, + 'urls_file': str, + 'url_count': int, + 'processed_records': int, + 'updated_count': int, + 'not_found_count': int, + 'executed_tasks': list, + 'tool_stats': dict + } + """ + try: + logger.info( + "="*60 + "\n" + + "开始指纹识别\n" + + f" Scan ID: {scan_id}\n" + + f" Target: {target_name}\n" + + f" Workspace: {scan_workspace_dir}\n" + + "="*60 + ) + + # 参数验证 + if scan_id is None: + raise ValueError("scan_id 不能为空") + if not target_name: + raise ValueError("target_name 不能为空") + if target_id is None: + raise ValueError("target_id 不能为空") + if not scan_workspace_dir: + raise ValueError("scan_workspace_dir 不能为空") + + # 数据源类型(当前只支持 website) + source = 'website' + + # Step 0: 创建工作目录 + fingerprint_dir = _setup_fingerprint_detect_directory(scan_workspace_dir) + + # Step 1: 导出 URL(支持懒加载) + urls_file, url_count = _export_urls(target_id, fingerprint_dir, target_name, source) + + if url_count == 0: + logger.warning("目标下没有可用的 URL,跳过指纹识别") + return { + 'success': True, + 'scan_id': scan_id, + 'target': target_name, + 'scan_workspace_dir': scan_workspace_dir, + 'urls_file': urls_file, + 'url_count': 0, + 'processed_records': 0, + 'updated_count': 0, + 'created_count': 0, + 'executed_tasks': ['export_urls_for_fingerprint'], + 'tool_stats': { + 'total': 0, + 'successful': 0, + 'failed': 0, + 'successful_tools': [], + 'failed_tools': [], + 'details': {} + } + } + + # Step 2: 工具配置信息 + logger.info("Step 2: 工具配置信息") + logger.info("✓ 启用工具: %s", ', '.join(enabled_tools.keys())) + + # Step 3: 执行指纹识别 + logger.info("Step 3: 执行指纹识别") + tool_stats, failed_tools = _run_fingerprint_detect( + enabled_tools=enabled_tools, + urls_file=urls_file, + url_count=url_count, + fingerprint_dir=fingerprint_dir, + scan_id=scan_id, + target_id=target_id, + source=source + ) + + logger.info("="*60 + "\n✓ 指纹识别完成\n" + "="*60) + + # 动态生成已执行的任务列表 + executed_tasks = ['export_urls_for_fingerprint'] + executed_tasks.extend([f'run_xingfinger ({tool})' for tool in tool_stats.keys()]) + + # 汇总所有工具的结果 + total_processed = sum(stats['result'].get('processed_records', 0) for stats in tool_stats.values()) + total_updated = sum(stats['result'].get('updated_count', 0) for stats in tool_stats.values()) + total_created = sum(stats['result'].get('created_count', 0) for stats in tool_stats.values()) + + successful_tools = [name for name in enabled_tools.keys() + if name not in [f['tool'] for f in failed_tools]] + + return { + 'success': True, + 'scan_id': scan_id, + 'target': target_name, + 'scan_workspace_dir': scan_workspace_dir, + 'urls_file': urls_file, + 'url_count': url_count, + 'processed_records': total_processed, + 'updated_count': total_updated, + 'created_count': total_created, + 'executed_tasks': executed_tasks, + 'tool_stats': { + 'total': len(enabled_tools), + 'successful': len(successful_tools), + 'failed': len(failed_tools), + 'successful_tools': successful_tools, + 'failed_tools': failed_tools, + 'details': tool_stats + } + } + + except ValueError as e: + logger.error("配置错误: %s", e) + raise + except RuntimeError as e: + logger.error("运行时错误: %s", e) + raise + except Exception as e: + logger.exception("指纹识别失败: %s", e) + raise diff --git a/backend/apps/scan/tasks/fingerprint_detect/export_urls_task.py b/backend/apps/scan/tasks/fingerprint_detect/export_urls_task.py new file mode 100644 index 00000000..4e2fe7ea --- /dev/null +++ b/backend/apps/scan/tasks/fingerprint_detect/export_urls_task.py @@ -0,0 +1,137 @@ +""" +导出 URL 任务 + +用于指纹识别前导出目标下的 URL 到文件 +支持懒加载模式:如果数据库为空,根据 Target 类型生成默认 URL +""" + +import ipaddress +import importlib +import logging +from pathlib import Path + +from prefect import task + +logger = logging.getLogger(__name__) + + +# 数据源映射:source → (module_path, model_name, url_field) +SOURCE_MODEL_MAP = { + 'website': ('apps.asset.models', 'WebSite', 'url'), + # 以后扩展: + # 'endpoint': ('apps.asset.models', 'Endpoint', 'url'), + # 'directory': ('apps.asset.models', 'Directory', 'url'), +} + + +def _get_model_class(source: str): + """ + 根据数据源类型获取 Model 类 + """ + if source not in SOURCE_MODEL_MAP: + raise ValueError(f"不支持的数据源: {source},支持的类型: {list(SOURCE_MODEL_MAP.keys())}") + + module_path, model_name, _ = SOURCE_MODEL_MAP[source] + module = importlib.import_module(module_path) + return getattr(module, model_name) + + +@task(name="export_urls_for_fingerprint") +def export_urls_for_fingerprint_task( + target_id: int, + output_file: str, + target_name: str = None, + source: str = 'website', + batch_size: int = 1000 +) -> dict: + """ + 导出目标下的 URL 到文件(用于指纹识别) + + 支持多种数据源,预留扩展: + - website: WebSite 表(当前实现) + - endpoint: Endpoint 表(以后扩展) + - directory: Directory 表(以后扩展) + + 懒加载模式: + - 如果数据库为空,根据 Target 类型生成默认 URL + - DOMAIN: http(s)://domain + - IP: http(s)://ip + - CIDR: 展开为所有 IP 的 URL + - URL: 直接使用目标 URL + + Args: + target_id: 目标 ID + output_file: 输出文件路径 + target_name: 目标名称(用于懒加载) + source: 数据源类型 + batch_size: 批量读取大小 + + Returns: + dict: {'output_file': str, 'total_count': int, 'source': str} + """ + from apps.targets.services import TargetService + from apps.targets.models import Target + + logger.info("开始导出 URL - target_id=%s, source=%s, output=%s", target_id, source, output_file) + + Model = _get_model_class(source) + _, _, url_field = SOURCE_MODEL_MAP[source] + + output_path = Path(output_file) + + # 分批导出 + total_count = 0 + with open(output_path, 'w', encoding='utf-8') as f: + queryset = Model.objects.filter(target_id=target_id).values_list(url_field, flat=True) + for url in queryset.iterator(chunk_size=batch_size): + if url: + f.write(url + '\n') + total_count += 1 + + # ==================== 懒加载模式:根据 Target 类型生成默认 URL ==================== + if total_count == 0: + target_service = TargetService() + target = target_service.get_target(target_id) + + if target: + target_name = target.name + target_type = target.type + + logger.info("懒加载模式:Target 类型=%s, 名称=%s", target_type, target_name) + + with open(output_path, 'w', encoding='utf-8') as f: + if target_type == Target.TargetType.DOMAIN: + f.write(f"http://{target_name}\n") + f.write(f"https://{target_name}\n") + total_count = 2 + + elif target_type == Target.TargetType.IP: + f.write(f"http://{target_name}\n") + f.write(f"https://{target_name}\n") + total_count = 2 + + elif target_type == Target.TargetType.CIDR: + try: + network = ipaddress.ip_network(target_name, strict=False) + for ip in network.hosts(): + f.write(f"http://{ip}\n") + f.write(f"https://{ip}\n") + total_count += 2 + except ValueError as e: + logger.warning("CIDR 解析失败: %s", e) + + elif target_type == Target.TargetType.URL: + f.write(f"{target_name}\n") + total_count = 1 + + logger.info("✓ 懒加载生成默认 URL - 数量: %d", total_count) + else: + logger.warning("Target ID %d 不存在,无法生成默认 URL", target_id) + + logger.info("✓ URL 导出完成 - 数量: %d, 文件: %s", total_count, output_file) + + return { + 'output_file': output_file, + 'total_count': total_count, + 'source': source + } diff --git a/backend/apps/scan/tasks/fingerprint_detect/run_xingfinger_task.py b/backend/apps/scan/tasks/fingerprint_detect/run_xingfinger_task.py new file mode 100644 index 00000000..d24f6d36 --- /dev/null +++ b/backend/apps/scan/tasks/fingerprint_detect/run_xingfinger_task.py @@ -0,0 +1,300 @@ +""" +xingfinger 执行任务 + +流式执行 xingfinger 命令并实时更新 tech 字段 +""" + +import importlib +import json +import logging +import subprocess +from typing import Optional, Generator +from urllib.parse import urlparse + +from django.db import connection +from prefect import task + +from apps.scan.utils import execute_stream + +logger = logging.getLogger(__name__) + + +# 数据源映射:source → (module_path, model_name, url_field) +SOURCE_MODEL_MAP = { + 'website': ('apps.asset.models', 'WebSite', 'url'), + # 以后扩展: + # 'endpoint': ('apps.asset.models', 'Endpoint', 'url'), + # 'directory': ('apps.asset.models', 'Directory', 'url'), +} + + +def _get_model_class(source: str): + """根据数据源类型获取 Model 类""" + if source not in SOURCE_MODEL_MAP: + raise ValueError(f"不支持的数据源: {source}") + + module_path, model_name, _ = SOURCE_MODEL_MAP[source] + module = importlib.import_module(module_path) + return getattr(module, model_name) + + +def parse_xingfinger_line(line: str) -> tuple[str, list[str]] | None: + """ + 解析 xingfinger 单行 JSON 输出 + + xingfinger 静默模式输出格式: + {"url": "https://example.com", "cms": "WordPress,PHP,nginx", ...} + + Returns: + tuple: (url, tech_list) 或 None(解析失败时) + """ + try: + item = json.loads(line) + url = item.get('url', '').strip() + cms = item.get('cms', '') + + if not url or not cms: + return None + + # cms 字段按逗号分割,去除空白 + techs = [t.strip() for t in cms.split(',') if t.strip()] + + return (url, techs) if techs else None + + except json.JSONDecodeError: + return None + + +def bulk_merge_tech_field( + source: str, + url_techs_map: dict[str, list[str]], + target_id: int +) -> dict: + """ + 批量合并 tech 数组字段(PostgreSQL 原生 SQL) + + 使用 PostgreSQL 原生 SQL 实现高效的数组合并去重操作。 + 如果 URL 对应的记录不存在,会自动创建新记录。 + + Returns: + dict: {'updated_count': int, 'created_count': int} + """ + Model = _get_model_class(source) + table_name = Model._meta.db_table + + updated_count = 0 + created_count = 0 + + with connection.cursor() as cursor: + for url, techs in url_techs_map.items(): + if not techs: + continue + + # 先尝试更新(PostgreSQL 数组合并去重) + sql = f""" + UPDATE {table_name} + SET tech = ( + SELECT ARRAY(SELECT DISTINCT unnest( + COALESCE(tech, ARRAY[]::varchar[]) || %s::varchar[] + )) + ) + WHERE url = %s AND target_id = %s + """ + + cursor.execute(sql, [techs, url, target_id]) + + if cursor.rowcount > 0: + updated_count += cursor.rowcount + else: + # 记录不存在,创建新记录 + try: + # 从 URL 提取 host + parsed = urlparse(url) + host = parsed.hostname or '' + + # 插入新记录(带冲突处理) + insert_sql = f""" + INSERT INTO {table_name} (target_id, url, host, tech, created_at) + VALUES (%s, %s, %s, %s::varchar[], NOW()) + ON CONFLICT (target_id, url) DO UPDATE SET + tech = ( + SELECT ARRAY(SELECT DISTINCT unnest( + COALESCE({table_name}.tech, ARRAY[]::varchar[]) || EXCLUDED.tech + )) + ) + """ + cursor.execute(insert_sql, [target_id, url, host, techs]) + created_count += 1 + + except Exception as e: + logger.warning("创建 %s 记录失败 (url=%s): %s", source, url, e) + + return { + 'updated_count': updated_count, + 'created_count': created_count + } + + +def _parse_xingfinger_stream_output( + cmd: str, + tool_name: str, + cwd: Optional[str] = None, + timeout: Optional[int] = None, + log_file: Optional[str] = None +) -> Generator[tuple[str, list[str]], None, None]: + """ + 流式解析 xingfinger 命令输出 + + 基于 execute_stream 实时处理 xingfinger 命令的 stdout,将每行 JSON 输出 + 转换为 (url, tech_list) 格式 + """ + logger.info("开始流式解析 xingfinger 命令输出 - 命令: %s", cmd) + + total_lines = 0 + valid_records = 0 + + try: + for line in execute_stream(cmd=cmd, tool_name=tool_name, cwd=cwd, shell=True, timeout=timeout, log_file=log_file): + total_lines += 1 + + # 解析单行 JSON + result = parse_xingfinger_line(line) + if result is None: + continue + + valid_records += 1 + yield result + + # 每处理 500 条记录输出一次进度 + if valid_records % 500 == 0: + logger.info("已解析 %d 条有效记录...", valid_records) + + except subprocess.TimeoutExpired as e: + error_msg = f"xingfinger 命令执行超时 - 超过 {timeout} 秒" + logger.warning(error_msg) + raise RuntimeError(error_msg) from e + except Exception as e: + logger.error("流式解析 xingfinger 输出失败: %s", e, exc_info=True) + raise + + logger.info("流式解析完成 - 总行数: %d, 有效记录: %d", total_lines, valid_records) + + +@task(name="run_xingfinger_and_stream_update_tech") +def run_xingfinger_and_stream_update_tech_task( + cmd: str, + tool_name: str, + scan_id: int, + target_id: int, + source: str, + cwd: str, + timeout: int, + log_file: str, + batch_size: int = 100 +) -> dict: + """ + 流式执行 xingfinger 命令并实时更新 tech 字段 + + 根据 source 参数更新对应表的 tech 字段: + - website → WebSite.tech + - endpoint → Endpoint.tech(以后扩展) + + 处理流程: + 1. 流式执行 xingfinger 命令 + 2. 实时解析 JSON 输出 + 3. 累积到 batch_size 条后批量更新数据库 + 4. 使用 PostgreSQL 原生 SQL 进行数组合并去重 + 5. 如果记录不存在,自动创建 + + Returns: + dict: { + 'processed_records': int, + 'updated_count': int, + 'created_count': int, + 'batch_count': int + } + """ + logger.info( + "开始执行 xingfinger 并更新 tech - target_id=%s, source=%s, timeout=%s秒", + target_id, source, timeout + ) + + data_generator = None + + try: + # 初始化统计 + processed_records = 0 + updated_count = 0 + created_count = 0 + batch_count = 0 + + # 当前批次的 URL -> techs 映射 + url_techs_map = {} + + # 流式处理 + data_generator = _parse_xingfinger_stream_output( + cmd=cmd, + tool_name=tool_name, + cwd=cwd, + timeout=timeout, + log_file=log_file + ) + + for url, techs in data_generator: + processed_records += 1 + + # 累积到 url_techs_map + if url in url_techs_map: + # 合并同一 URL 的多次识别结果 + url_techs_map[url].extend(techs) + else: + url_techs_map[url] = techs + + # 达到批次大小,执行批量更新 + if len(url_techs_map) >= batch_size: + batch_count += 1 + result = bulk_merge_tech_field(source, url_techs_map, target_id) + updated_count += result['updated_count'] + created_count += result.get('created_count', 0) + + logger.debug( + "批次 %d 完成 - 更新: %d, 创建: %d", + batch_count, result['updated_count'], result.get('created_count', 0) + ) + + # 清空批次 + url_techs_map = {} + + # 处理最后一批 + if url_techs_map: + batch_count += 1 + result = bulk_merge_tech_field(source, url_techs_map, target_id) + updated_count += result['updated_count'] + created_count += result.get('created_count', 0) + + logger.info( + "✓ xingfinger 执行完成 - 处理记录: %d, 更新: %d, 创建: %d, 批次: %d", + processed_records, updated_count, created_count, batch_count + ) + + return { + 'processed_records': processed_records, + 'updated_count': updated_count, + 'created_count': created_count, + 'batch_count': batch_count + } + + except subprocess.TimeoutExpired: + logger.warning("⚠️ xingfinger 执行超时 - target_id=%s, timeout=%s秒", target_id, timeout) + raise + except Exception as e: + error_msg = f"xingfinger 执行失败: {e}" + logger.error(error_msg, exc_info=True) + raise RuntimeError(error_msg) from e + finally: + # 清理资源 + if data_generator is not None: + try: + data_generator.close() + except Exception as e: + logger.debug("关闭生成器时出错: %s", e) diff --git a/backend/apps/scan/utils/fingerprint_helpers.py b/backend/apps/scan/utils/fingerprint_helpers.py index 686f652e..16873e34 100644 --- a/backend/apps/scan/utils/fingerprint_helpers.py +++ b/backend/apps/scan/utils/fingerprint_helpers.py @@ -1,7 +1,7 @@ """指纹文件本地缓存工具 提供 Worker 侧的指纹文件缓存和版本校验功能,用于: -- 指纹识别扫描 (fingerprint_scan_flow) +- 指纹识别扫描 (fingerprint_detect_flow) """ import logging @@ -12,6 +12,18 @@ from django.conf import settings logger = logging.getLogger(__name__) +# 指纹库映射:lib_name → ensure_func_name +# 以后扩展其他指纹库时,在此添加映射 +FINGERPRINT_LIB_MAP = { + 'ehole': 'ensure_ehole_fingerprint_local', + # 以后扩展: + # 'goby': 'ensure_goby_fingerprint_local', + # 'wappalyzer': 'ensure_wappalyzer_fingerprint_local', + # 'fingers': 'ensure_fingers_fingerprint_local', + # 'fingerprinthub': 'ensure_fingerprinthub_fingerprint_local', +} + + def ensure_ehole_fingerprint_local() -> str: """ 确保本地存在最新的 EHole 指纹文件(带缓存) @@ -70,4 +82,40 @@ def ensure_ehole_fingerprint_local() -> str: return cache_file -__all__ = ["ensure_ehole_fingerprint_local"] +def get_fingerprint_paths(lib_names: list) -> dict: + """ + 获取多个指纹库的本地路径 + + Args: + lib_names: 指纹库名称列表,如 ['ehole', 'goby'] + + Returns: + dict: {lib_name: local_path},如 {'ehole': '/opt/xingrin/fingerprints/ehole.json'} + + 示例: + paths = get_fingerprint_paths(['ehole']) + # {'ehole': '/opt/xingrin/fingerprints/ehole.json'} + """ + paths = {} + for lib_name in lib_names: + if lib_name not in FINGERPRINT_LIB_MAP: + logger.warning("不支持的指纹库: %s,跳过", lib_name) + continue + + ensure_func_name = FINGERPRINT_LIB_MAP[lib_name] + # 获取当前模块中的函数 + ensure_func = globals().get(ensure_func_name) + if ensure_func is None: + logger.warning("指纹库 %s 的导出函数 %s 未实现,跳过", lib_name, ensure_func_name) + continue + + try: + paths[lib_name] = ensure_func() + except Exception as e: + logger.error("获取指纹库 %s 路径失败: %s", lib_name, e) + continue + + return paths + + +__all__ = ["ensure_ehole_fingerprint_local", "get_fingerprint_paths", "FINGERPRINT_LIB_MAP"] diff --git a/docs/scan-flow-architecture.md b/docs/scan-flow-architecture.md index f51f9214..becdcfcc 100644 --- a/docs/scan-flow-architecture.md +++ b/docs/scan-flow-architecture.md @@ -40,8 +40,13 @@ flowchart TB HTTPX1[httpx
Web Service Detection] end + subgraph FINGER["Fingerprint Detect"] + XINGFINGER[xingfinger
Tech Stack Detection] + end + RESOLVE --> NAABU NAABU --> HTTPX1 + HTTPX1 --> XINGFINGER end TARGET --> SUBFINDER @@ -69,9 +74,9 @@ flowchart TB end end - HTTPX1 --> WAYMORE - HTTPX1 --> KATANA - HTTPX1 --> FFUF + XINGFINGER --> WAYMORE + XINGFINGER --> KATANA + XINGFINGER --> FFUF subgraph STAGE3["Stage 3: Vulnerability Sequential"] direction TB @@ -105,7 +110,7 @@ flowchart TB ```python # backend/apps/scan/configs/command_templates.py EXECUTION_STAGES = [ - {'mode': 'sequential', 'flows': ['subdomain_discovery', 'port_scan', 'site_scan']}, + {'mode': 'sequential', 'flows': ['subdomain_discovery', 'port_scan', 'site_scan', 'fingerprint_detect']}, {'mode': 'parallel', 'flows': ['url_fetch', 'directory_scan']}, {'mode': 'sequential', 'flows': ['vuln_scan']}, ] @@ -118,6 +123,7 @@ EXECUTION_STAGES = [ | subdomain_discovery | subfinder, amass, sublist3r, assetfinder, puredns | Subdomain | | port_scan | naabu | HostPortMapping | | site_scan | httpx | WebSite | +| fingerprint_detect | xingfinger | WebSite.tech(更新) | | url_fetch | waymore, katana, uro, httpx | Endpoint | | directory_scan | ffuf | Directory | | vuln_scan | dalfox, nuclei | Vulnerability |