diff --git a/backend/apps/scan/flows/fingerprint_detect_flow.py b/backend/apps/scan/flows/fingerprint_detect_flow.py
new file mode 100644
index 00000000..a4d39114
--- /dev/null
+++ b/backend/apps/scan/flows/fingerprint_detect_flow.py
@@ -0,0 +1,403 @@
+"""
+指纹识别 Flow
+
+负责编排指纹识别的完整流程
+
+架构:
+- Flow 负责编排多个原子 Task
+- 在 site_scan 后串行执行
+- 使用 xingfinger 工具识别技术栈
+- 流式处理输出,批量更新数据库
+"""
+
+# Django 环境初始化(导入即生效)
+from apps.common.prefect_django_setup import setup_django_for_prefect
+
+import logging
+import os
+from datetime import datetime
+from pathlib import Path
+
+from prefect import flow
+
+from apps.scan.handlers.scan_flow_handlers import (
+ on_scan_flow_running,
+ on_scan_flow_completed,
+ on_scan_flow_failed,
+)
+from apps.scan.tasks.fingerprint_detect import (
+ export_urls_for_fingerprint_task,
+ run_xingfinger_and_stream_update_tech_task,
+)
+from apps.scan.utils import build_scan_command
+from apps.scan.utils.fingerprint_helpers import get_fingerprint_paths
+
+logger = logging.getLogger(__name__)
+
+
+def calculate_fingerprint_detect_timeout(
+    url_count: int,
+    base_per_url: float = 3.0,
+    min_timeout: int = 60
+) -> int:
+    """
+    Derive the fingerprint-detection timeout from the number of URLs.
+
+    The timeout is ``url_count * base_per_url`` truncated to an int and
+    clamped from below by ``min_timeout``. There is no upper bound.
+
+    Args:
+        url_count: Number of URLs to scan.
+        base_per_url: Seconds budgeted per URL (default 3 seconds).
+        min_timeout: Lower bound of the result in seconds (default 60).
+
+    Returns:
+        int: Timeout in seconds.
+
+    Examples:
+        100 URLs x 3s = 300s
+        1000 URLs x 3s = 3000s (50 minutes)
+        10000 URLs x 3s = 30000s (about 8.3 hours)
+    """
+    scaled = int(url_count * base_per_url)
+    # Only exceed the floor when the scaled value is strictly larger.
+    return scaled if scaled > min_timeout else min_timeout
+
+
+def _setup_fingerprint_detect_directory(scan_workspace_dir: str) -> Path:
+    """
+    Create the ``fingerprint_detect`` working directory and verify it.
+
+    Args:
+        scan_workspace_dir: Root directory of the scan workspace.
+
+    Returns:
+        Path: The ``fingerprint_detect`` directory inside the workspace.
+
+    Raises:
+        RuntimeError: The directory does not exist after creation, or it is
+            not writable.
+    """
+    workspace = Path(scan_workspace_dir)
+    target_dir = workspace / 'fingerprint_detect'
+    target_dir.mkdir(parents=True, exist_ok=True)
+
+    exists = target_dir.is_dir()
+    if not exists:
+        raise RuntimeError(f"指纹识别目录创建失败: {target_dir}")
+
+    writable = os.access(target_dir, os.W_OK)
+    if not writable:
+        raise RuntimeError(f"指纹识别目录不可写: {target_dir}")
+
+    return target_dir
+
+
+def _export_urls(
+    target_id: int,
+    fingerprint_dir: Path,
+    target_name: str = None,
+    source: str = 'website'
+) -> tuple[str, int]:
+    """
+    Export the target's URLs into ``urls.txt`` inside the working directory.
+
+    Delegates to ``export_urls_for_fingerprint_task`` with a batch size of
+    1000 and unpacks its result.
+
+    Args:
+        target_id: Target ID.
+        fingerprint_dir: Fingerprint-detection working directory.
+        target_name: Target name, forwarded to the export task.
+        source: Data-source type.
+
+    Returns:
+        tuple: (urls_file, total_count) as reported by the export task.
+    """
+    logger.info("Step 1: 导出 URL 列表 (source=%s)", source)
+
+    destination = fingerprint_dir / 'urls.txt'
+    outcome = export_urls_for_fingerprint_task(
+        target_id=target_id,
+        output_file=str(destination),
+        target_name=target_name,
+        source=source,
+        batch_size=1000
+    )
+
+    exported_total = outcome['total_count']
+    exported_file = outcome['output_file']
+
+    logger.info("✓ URL 导出完成 - 文件: %s, 数量: %d", exported_file, exported_total)
+
+    return exported_file, exported_total
+
+
+def _run_fingerprint_detect(
+    enabled_tools: dict,
+    urls_file: str,
+    url_count: int,
+    fingerprint_dir: Path,
+    scan_id: int,
+    target_id: int,
+    source: str
+) -> tuple[dict, list]:
+    """
+    Run every enabled fingerprint tool against the exported URL file.
+
+    For each tool this resolves the fingerprint library paths, builds the
+    command, and runs the streaming task. A failure of one tool is recorded
+    in ``failed_tools`` and does not stop the remaining tools.
+
+    Args:
+        enabled_tools: Mapping of tool name to its configuration dict.
+        urls_file: Path of the exported URL file.
+        url_count: Number of URLs in ``urls_file`` (drives the timeout).
+        fingerprint_dir: Working directory; also receives the per-tool logs.
+        scan_id: Scan task ID.
+        target_id: Target ID.
+        source: Data-source type.
+
+    Returns:
+        tuple: ``(tool_stats, failed_tools)``. ``tool_stats`` maps a tool
+        name to ``{'command', 'result', 'timeout', 'fingerprint_libs'}``;
+        ``failed_tools`` is a list of ``{'tool', 'reason'}`` dicts.
+    """
+    tool_stats = {}
+    failed_tools = []
+
+    # The timeout depends only on the URL count, so compute it once.
+    timeout = calculate_fingerprint_detect_timeout(url_count)
+
+    for tool_name, tool_config in enabled_tools.items():
+        # 1. Resolve the fingerprint library paths. A config that sets the
+        #    key explicitly to None falls back to the default library
+        #    instead of crashing the whole flow.
+        lib_names = tool_config.get('fingerprint_libs', ['ehole'])
+        if lib_names is None:
+            lib_names = ['ehole']
+        fingerprint_paths = get_fingerprint_paths(lib_names)
+
+        if not fingerprint_paths:
+            reason = f"没有可用的指纹库: {lib_names}"
+            logger.warning(reason)
+            failed_tools.append({'tool': tool_name, 'reason': reason})
+            continue
+
+        # 2. Merge the library paths into the tool config for command building.
+        tool_config_with_paths = {**tool_config, **fingerprint_paths}
+
+        # 3. Build the command.
+        try:
+            command = build_scan_command(
+                tool_name=tool_name,
+                scan_type='fingerprint_detect',
+                command_params={
+                    'urls_file': urls_file
+                },
+                tool_config=tool_config_with_paths
+            )
+        except Exception as e:
+            reason = f"命令构建失败: {str(e)}"
+            logger.error("构建 %s 命令失败: %s", tool_name, e)
+            failed_tools.append({'tool': tool_name, 'reason': reason})
+            continue
+
+        # 4. Per-tool log file, timestamped so reruns do not overwrite it.
+        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+        log_file = fingerprint_dir / f"{tool_name}_{timestamp}.log"
+
+        logger.info(
+            "开始执行 %s 指纹识别 - URL数: %d, 超时: %ds, 指纹库: %s",
+            tool_name, url_count, timeout, list(fingerprint_paths.keys())
+        )
+
+        # 5. Run the scan task.
+        try:
+            result = run_xingfinger_and_stream_update_tech_task(
+                cmd=command,
+                tool_name=tool_name,
+                scan_id=scan_id,
+                target_id=target_id,
+                source=source,
+                cwd=str(fingerprint_dir),
+                timeout=timeout,
+                log_file=str(log_file),
+                batch_size=100
+            )
+
+            tool_stats[tool_name] = {
+                'command': command,
+                'result': result,
+                'timeout': timeout,
+                'fingerprint_libs': list(fingerprint_paths.keys())
+            }
+
+            # The task reports 'created_count' (not 'not_found_count'), so
+            # log the key it actually returns.
+            logger.info(
+                "✓ 工具 %s 执行完成 - 处理记录: %d, 更新: %d, 创建: %d",
+                tool_name,
+                result.get('processed_records', 0),
+                result.get('updated_count', 0),
+                result.get('created_count', 0)
+            )
+
+        except Exception as exc:
+            failed_tools.append({'tool': tool_name, 'reason': str(exc)})
+            logger.error("工具 %s 执行失败: %s", tool_name, exc, exc_info=True)
+
+    if failed_tools:
+        logger.warning(
+            "以下指纹识别工具执行失败: %s",
+            ', '.join([f['tool'] for f in failed_tools])
+        )
+
+    return tool_stats, failed_tools
+
+
+@flow(
+    name="fingerprint_detect",
+    log_prints=True,
+    on_running=[on_scan_flow_running],
+    on_completion=[on_scan_flow_completed],
+    on_failure=[on_scan_flow_failed],
+)
+def fingerprint_detect_flow(
+    scan_id: int,
+    target_name: str,
+    target_id: int,
+    scan_workspace_dir: str,
+    enabled_tools: dict
+) -> dict:
+    """
+    Fingerprint-detection flow.
+
+    What it does:
+        1. Exports the target's WebSite URLs to a file.
+        2. Runs the enabled fingerprint tool(s) (xingfinger) on that file.
+        3. Aggregates the per-tool results returned by the scan task.
+
+    Steps:
+        Step 0: create the working directory
+        Step 1: export the URL list
+        Step 2: log the enabled tools
+        Step 3: run the tools and collect their results
+
+    Args:
+        scan_id: Scan task ID.
+        target_name: Target name.
+        target_id: Target ID.
+        scan_workspace_dir: Scan workspace directory.
+        enabled_tools: Mapping of enabled tool name to its configuration.
+
+    Returns:
+        dict: {
+            'success': bool,
+            'scan_id': int,
+            'target': str,
+            'scan_workspace_dir': str,
+            'urls_file': str,
+            'url_count': int,
+            'processed_records': int,
+            'updated_count': int,
+            'created_count': int,
+            'executed_tasks': list,
+            'tool_stats': dict
+        }
+
+    Raises:
+        ValueError: A required argument is missing.
+        RuntimeError: The working directory cannot be created or written.
+    """
+    try:
+        logger.info(
+            "="*60 + "\n" +
+            "开始指纹识别\n" +
+            f" Scan ID: {scan_id}\n" +
+            f" Target: {target_name}\n" +
+            f" Workspace: {scan_workspace_dir}\n" +
+            "="*60
+        )
+
+        # Argument validation
+        if scan_id is None:
+            raise ValueError("scan_id 不能为空")
+        if not target_name:
+            raise ValueError("target_name 不能为空")
+        if target_id is None:
+            raise ValueError("target_id 不能为空")
+        if not scan_workspace_dir:
+            raise ValueError("scan_workspace_dir 不能为空")
+
+        # Treat a missing tool mapping like an empty one, so the flow does
+        # not fail with AttributeError after the export has already run.
+        enabled_tools = enabled_tools or {}
+
+        # Data-source type (only 'website' is supported for now)
+        source = 'website'
+
+        # Step 0: create the working directory
+        fingerprint_dir = _setup_fingerprint_detect_directory(scan_workspace_dir)
+
+        # Step 1: export URLs
+        urls_file, url_count = _export_urls(target_id, fingerprint_dir, target_name, source)
+
+        if url_count == 0:
+            logger.warning("目标下没有可用的 URL,跳过指纹识别")
+            return {
+                'success': True,
+                'scan_id': scan_id,
+                'target': target_name,
+                'scan_workspace_dir': scan_workspace_dir,
+                'urls_file': urls_file,
+                'url_count': 0,
+                'processed_records': 0,
+                'updated_count': 0,
+                'created_count': 0,
+                'executed_tasks': ['export_urls_for_fingerprint'],
+                'tool_stats': {
+                    'total': 0,
+                    'successful': 0,
+                    'failed': 0,
+                    'successful_tools': [],
+                    'failed_tools': [],
+                    'details': {}
+                }
+            }
+
+        # Step 2: tool configuration
+        logger.info("Step 2: 工具配置信息")
+        logger.info("✓ 启用工具: %s", ', '.join(enabled_tools.keys()))
+
+        # Step 3: run fingerprint detection
+        logger.info("Step 3: 执行指纹识别")
+        tool_stats, failed_tools = _run_fingerprint_detect(
+            enabled_tools=enabled_tools,
+            urls_file=urls_file,
+            url_count=url_count,
+            fingerprint_dir=fingerprint_dir,
+            scan_id=scan_id,
+            target_id=target_id,
+            source=source
+        )
+
+        logger.info("="*60 + "\n✓ 指纹识别完成\n" + "="*60)
+
+        # Executed tasks: the export plus one entry per tool that ran
+        executed_tasks = ['export_urls_for_fingerprint']
+        executed_tasks.extend(f'run_xingfinger ({tool})' for tool in tool_stats)
+
+        # Aggregate the counters over all tools that produced a result
+        results = [stats['result'] for stats in tool_stats.values()]
+        total_processed = sum(r.get('processed_records', 0) for r in results)
+        total_updated = sum(r.get('updated_count', 0) for r in results)
+        total_created = sum(r.get('created_count', 0) for r in results)
+
+        # Build the failed-name set once instead of once per enabled tool
+        failed_names = {entry['tool'] for entry in failed_tools}
+        successful_tools = [name for name in enabled_tools if name not in failed_names]
+
+        return {
+            'success': True,
+            'scan_id': scan_id,
+            'target': target_name,
+            'scan_workspace_dir': scan_workspace_dir,
+            'urls_file': urls_file,
+            'url_count': url_count,
+            'processed_records': total_processed,
+            'updated_count': total_updated,
+            'created_count': total_created,
+            'executed_tasks': executed_tasks,
+            'tool_stats': {
+                'total': len(enabled_tools),
+                'successful': len(successful_tools),
+                'failed': len(failed_tools),
+                'successful_tools': successful_tools,
+                'failed_tools': failed_tools,
+                'details': tool_stats
+            }
+        }
+
+    except ValueError as e:
+        logger.error("配置错误: %s", e)
+        raise
+    except RuntimeError as e:
+        logger.error("运行时错误: %s", e)
+        raise
+    except Exception as e:
+        logger.exception("指纹识别失败: %s", e)
+        raise
diff --git a/backend/apps/scan/tasks/fingerprint_detect/export_urls_task.py b/backend/apps/scan/tasks/fingerprint_detect/export_urls_task.py
new file mode 100644
index 00000000..4e2fe7ea
--- /dev/null
+++ b/backend/apps/scan/tasks/fingerprint_detect/export_urls_task.py
@@ -0,0 +1,137 @@
+"""
+导出 URL 任务
+
+用于指纹识别前导出目标下的 URL 到文件
+支持懒加载模式:如果数据库为空,根据 Target 类型生成默认 URL
+"""
+
+import ipaddress
+import importlib
+import logging
+from pathlib import Path
+
+from prefect import task
+
+logger = logging.getLogger(__name__)
+
+
+# Data-source registry: source -> (module_path, model_name, url_field)
+SOURCE_MODEL_MAP = {
+    'website': ('apps.asset.models', 'WebSite', 'url'),
+    # Planned extensions:
+    # 'endpoint': ('apps.asset.models', 'Endpoint', 'url'),
+    # 'directory': ('apps.asset.models', 'Directory', 'url'),
+}
+
+
+def _get_model_class(source: str):
+    """
+    Resolve the model class registered for a data-source type.
+
+    Raises:
+        ValueError: ``source`` is not a key of ``SOURCE_MODEL_MAP``.
+    """
+    entry = SOURCE_MODEL_MAP.get(source)
+    if entry is None:
+        raise ValueError(f"不支持的数据源: {source},支持的类型: {list(SOURCE_MODEL_MAP)}")
+
+    module_path, model_name = entry[0], entry[1]
+    # Imported lazily by dotted path, at call time.
+    return getattr(importlib.import_module(module_path), model_name)
+
+
+def _format_url_host(host) -> str:
+    """Return ``host`` as it must appear in a URL authority (IPv6 in brackets)."""
+    text = str(host)
+    try:
+        parsed = ipaddress.ip_address(text)
+    except ValueError:
+        # Not an IP literal (e.g. a domain name): use it unchanged.
+        return text
+    return f"[{text}]" if parsed.version == 6 else text
+
+
+@task(name="export_urls_for_fingerprint")
+def export_urls_for_fingerprint_task(
+    target_id: int,
+    output_file: str,
+    target_name: str = None,
+    source: str = 'website',
+    batch_size: int = 1000
+) -> dict:
+    """
+    Export the URLs of a target to a file (input for fingerprint detection).
+
+    Data sources are looked up in ``SOURCE_MODEL_MAP``; only ``website`` is
+    registered at the moment.
+
+    Lazy-load mode: when the source table yields no URL for the target,
+    default URLs are generated from the Target type:
+        - DOMAIN: http(s)://domain
+        - IP: http(s)://ip (IPv6 literals are wrapped in brackets)
+        - CIDR: http(s)://host for every host of the network
+        - URL: the target name itself
+
+    Args:
+        target_id: Target ID.
+        output_file: Path of the file to write (one URL per line).
+        target_name: Target name. In lazy-load mode it is replaced by the
+            name stored on the Target record.
+        source: Data-source type.
+        batch_size: Chunk size used when iterating the queryset.
+
+    Returns:
+        dict: {'output_file': str, 'total_count': int, 'source': str}
+
+    Raises:
+        ValueError: ``source`` is not a supported data source.
+    """
+    from apps.targets.services import TargetService
+    from apps.targets.models import Target
+
+    logger.info("开始导出 URL - target_id=%s, source=%s, output=%s", target_id, source, output_file)
+
+    Model = _get_model_class(source)
+    _, _, url_field = SOURCE_MODEL_MAP[source]
+
+    output_path = Path(output_file)
+    # Make sure the destination directory exists before opening the file.
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+
+    # Stream the stored URLs in chunks
+    total_count = 0
+    with open(output_path, 'w', encoding='utf-8') as f:
+        queryset = Model.objects.filter(target_id=target_id).values_list(url_field, flat=True)
+        for url in queryset.iterator(chunk_size=batch_size):
+            if url:
+                f.write(url + '\n')
+                total_count += 1
+
+    # ==================== Lazy-load mode: default URLs by Target type ====================
+    if total_count == 0:
+        target_service = TargetService()
+        target = target_service.get_target(target_id)
+
+        if target:
+            target_name = target.name
+            target_type = target.type
+
+            logger.info("懒加载模式:Target 类型=%s, 名称=%s", target_type, target_name)
+
+            with open(output_path, 'w', encoding='utf-8') as f:
+                if target_type in (Target.TargetType.DOMAIN, Target.TargetType.IP):
+                    # A bare IPv6 address is not a valid URL authority, so
+                    # IP literals are bracketed when needed.
+                    host = _format_url_host(target_name)
+                    f.write(f"http://{host}\n")
+                    f.write(f"https://{host}\n")
+                    total_count = 2
+
+                elif target_type == Target.TargetType.CIDR:
+                    try:
+                        network = ipaddress.ip_network(target_name, strict=False)
+                    except ValueError as e:
+                        logger.warning("CIDR 解析失败: %s", e)
+                    else:
+                        # NOTE(review): a very large network (e.g. an IPv6 /64)
+                        # expands to an enormous host list - consider a cap.
+                        for ip in network.hosts():
+                            host = _format_url_host(ip)
+                            f.write(f"http://{host}\n")
+                            f.write(f"https://{host}\n")
+                            total_count += 2
+
+                elif target_type == Target.TargetType.URL:
+                    f.write(f"{target_name}\n")
+                    total_count = 1
+
+            logger.info("✓ 懒加载生成默认 URL - 数量: %d", total_count)
+        else:
+            logger.warning("Target ID %d 不存在,无法生成默认 URL", target_id)
+
+    logger.info("✓ URL 导出完成 - 数量: %d, 文件: %s", total_count, output_file)
+
+    return {
+        'output_file': output_file,
+        'total_count': total_count,
+        'source': source
+    }
diff --git a/backend/apps/scan/tasks/fingerprint_detect/run_xingfinger_task.py b/backend/apps/scan/tasks/fingerprint_detect/run_xingfinger_task.py
new file mode 100644
index 00000000..d24f6d36
--- /dev/null
+++ b/backend/apps/scan/tasks/fingerprint_detect/run_xingfinger_task.py
@@ -0,0 +1,300 @@
+"""
+xingfinger 执行任务
+
+流式执行 xingfinger 命令并实时更新 tech 字段
+"""
+
+import importlib
+import json
+import logging
+import subprocess
+from typing import Optional, Generator
+from urllib.parse import urlparse
+
+from django.db import connection
+from prefect import task
+
+from apps.scan.utils import execute_stream
+
+logger = logging.getLogger(__name__)
+
+
+# Data-source registry: source -> (module_path, model_name, url_field)
+SOURCE_MODEL_MAP = {
+    'website': ('apps.asset.models', 'WebSite', 'url'),
+    # Planned extensions:
+    # 'endpoint': ('apps.asset.models', 'Endpoint', 'url'),
+    # 'directory': ('apps.asset.models', 'Directory', 'url'),
+}
+
+
+def _get_model_class(source: str):
+    """Return the model class registered for ``source``; ValueError if unknown."""
+    registered = SOURCE_MODEL_MAP.get(source)
+    if registered is None:
+        raise ValueError(f"不支持的数据源: {source}")
+
+    module_path, model_name, _url_field = registered
+    owner = importlib.import_module(module_path)
+    return getattr(owner, model_name)
+
+
+def parse_xingfinger_line(line: str) -> tuple[str, list[str]] | None:
+    """
+    Parse one line of xingfinger JSON output.
+
+    Expected silent-mode format:
+        {"url": "https://example.com", "cms": "WordPress,PHP,nginx", ...}
+
+    Lines that cannot be used are skipped by returning None rather than
+    raising, so one odd line cannot abort the whole stream. That covers
+    invalid JSON, JSON that is not an object, and a ``url`` or ``cms`` that
+    is missing, empty, or not a string.
+
+    Args:
+        line: A single line of tool output.
+
+    Returns:
+        tuple: ``(url, tech_list)`` with whitespace-stripped entries, or
+        None when the line carries no usable fingerprint.
+    """
+    try:
+        item = json.loads(line)
+    except (json.JSONDecodeError, TypeError):
+        return None
+
+    # json.loads may return a list, number, string, ...; only objects count.
+    if not isinstance(item, dict):
+        return None
+
+    url = item.get('url')
+    cms = item.get('cms')
+    if not isinstance(url, str) or not isinstance(cms, str):
+        return None
+
+    url = url.strip()
+    if not url:
+        return None
+
+    # Split the cms field on commas and drop blank entries.
+    techs = [t.strip() for t in cms.split(',') if t.strip()]
+
+    return (url, techs) if techs else None
+
+
+def bulk_merge_tech_field(
+    source: str,
+    url_techs_map: dict[str, list[str]],
+    target_id: int
+) -> dict:
+    """
+    Merge detected technologies into the ``tech`` array column, URL by URL.
+
+    For every URL an UPDATE appends the new entries to the existing array
+    and de-duplicates it with raw PostgreSQL SQL. When the UPDATE matches
+    no row, an INSERT with ``ON CONFLICT (target_id, url)`` is attempted; a
+    failing INSERT is logged as a warning and skipped.
+
+    Args:
+        source: Data-source type used to resolve the model and its table.
+        url_techs_map: Mapping of URL to the technologies detected for it.
+        target_id: Target ID the rows belong to.
+
+    Returns:
+        dict: {'updated_count': int, 'created_count': int}
+    """
+    table_name = _get_model_class(source)._meta.db_table
+
+    # Both statements depend only on the table name, so build them once.
+    update_sql = f"""
+        UPDATE {table_name}
+        SET tech = (
+        SELECT ARRAY(SELECT DISTINCT unnest(
+        COALESCE(tech, ARRAY[]::varchar[]) || %s::varchar[]
+        ))
+        )
+        WHERE url = %s AND target_id = %s
+        """
+    insert_sql = f"""
+        INSERT INTO {table_name} (target_id, url, host, tech, created_at)
+        VALUES (%s, %s, %s, %s::varchar[], NOW())
+        ON CONFLICT (target_id, url) DO UPDATE SET
+        tech = (
+        SELECT ARRAY(SELECT DISTINCT unnest(
+        COALESCE({table_name}.tech, ARRAY[]::varchar[]) || EXCLUDED.tech
+        ))
+        )
+        """
+
+    counters = {'updated_count': 0, 'created_count': 0}
+
+    with connection.cursor() as cursor:
+        for url, techs in url_techs_map.items():
+            if not techs:
+                continue
+
+            # Try the UPDATE first (array merge + de-duplication).
+            cursor.execute(update_sql, [techs, url, target_id])
+            affected = cursor.rowcount
+            if affected > 0:
+                counters['updated_count'] += affected
+                continue
+
+            # No matching row: create one, deriving host from the URL.
+            try:
+                host = urlparse(url).hostname or ''
+                cursor.execute(insert_sql, [target_id, url, host, techs])
+                counters['created_count'] += 1
+            except Exception as e:
+                logger.warning("创建 %s 记录失败 (url=%s): %s", source, url, e)
+
+    return counters
+
+
+def _parse_xingfinger_stream_output(
+    cmd: str,
+    tool_name: str,
+    cwd: Optional[str] = None,
+    timeout: Optional[int] = None,
+    log_file: Optional[str] = None
+) -> Generator[tuple[str, list[str]], None, None]:
+    """
+    Stream the xingfinger command output as ``(url, tech_list)`` records.
+
+    Each line produced by ``execute_stream`` is passed to
+    ``parse_xingfinger_line``; lines it rejects are skipped.
+
+    Raises:
+        RuntimeError: The command timed out (wraps
+            ``subprocess.TimeoutExpired``).
+        Exception: Any other error is logged and re-raised unchanged.
+    """
+    logger.info("开始流式解析 xingfinger 命令输出 - 命令: %s", cmd)
+
+    line_total = 0
+    record_total = 0
+
+    try:
+        stream = execute_stream(
+            cmd=cmd,
+            tool_name=tool_name,
+            cwd=cwd,
+            shell=True,
+            timeout=timeout,
+            log_file=log_file,
+        )
+        for raw_line in stream:
+            line_total += 1
+
+            parsed = parse_xingfinger_line(raw_line)
+            if parsed is not None:
+                record_total += 1
+                yield parsed
+
+                # Progress report every 500 valid records
+                if record_total % 500 == 0:
+                    logger.info("已解析 %d 条有效记录...", record_total)
+
+    except subprocess.TimeoutExpired as e:
+        error_msg = f"xingfinger 命令执行超时 - 超过 {timeout} 秒"
+        logger.warning(error_msg)
+        raise RuntimeError(error_msg) from e
+    except Exception as e:
+        logger.error("流式解析 xingfinger 输出失败: %s", e, exc_info=True)
+        raise
+
+    logger.info("流式解析完成 - 总行数: %d, 有效记录: %d", line_total, record_total)
+
+
+@task(name="run_xingfinger_and_stream_update_tech")
+def run_xingfinger_and_stream_update_tech_task(
+    cmd: str,
+    tool_name: str,
+    scan_id: int,
+    target_id: int,
+    source: str,
+    cwd: str,
+    timeout: int,
+    log_file: str,
+    batch_size: int = 100
+) -> dict:
+    """
+    Run xingfinger as a stream and merge its findings into the tech field.
+
+    The table to update is selected by ``source`` (only ``website`` is
+    registered at the moment).
+
+    Processing:
+        1. Stream the command output and parse it line by line.
+        2. Accumulate results per URL until ``batch_size`` distinct URLs
+           are pending.
+        3. Flush each batch through ``bulk_merge_tech_field``.
+        4. Flush whatever is left when the stream ends.
+
+    Returns:
+        dict: {
+            'processed_records': int,
+            'updated_count': int,
+            'created_count': int,
+            'batch_count': int
+        }
+
+    Raises:
+        RuntimeError: Any failure other than a directly raised
+            ``subprocess.TimeoutExpired``, wrapped with the original cause.
+    """
+    logger.info(
+        "开始执行 xingfinger 并更新 tech - target_id=%s, source=%s, timeout=%s秒",
+        target_id, source, timeout
+    )
+
+    record_stream = None
+
+    try:
+        processed_records = updated_count = created_count = batch_count = 0
+
+        # URL -> techs for the batch currently being filled
+        pending = {}
+
+        record_stream = _parse_xingfinger_stream_output(
+            cmd=cmd,
+            tool_name=tool_name,
+            cwd=cwd,
+            timeout=timeout,
+            log_file=log_file
+        )
+
+        for url, techs in record_stream:
+            processed_records += 1
+
+            # Combine repeated detections of the same URL within a batch
+            known = pending.get(url)
+            if known is None:
+                pending[url] = techs
+            else:
+                known.extend(techs)
+
+            if len(pending) < batch_size:
+                continue
+
+            # Batch is full: write it out
+            batch_count += 1
+            merged = bulk_merge_tech_field(source, pending, target_id)
+            batch_updated = merged['updated_count']
+            batch_created = merged.get('created_count', 0)
+            updated_count += batch_updated
+            created_count += batch_created
+
+            logger.debug(
+                "批次 %d 完成 - 更新: %d, 创建: %d",
+                batch_count, batch_updated, batch_created
+            )
+
+            pending = {}
+
+        # Flush the final, partially filled batch
+        if pending:
+            batch_count += 1
+            merged = bulk_merge_tech_field(source, pending, target_id)
+            updated_count += merged['updated_count']
+            created_count += merged.get('created_count', 0)
+
+        logger.info(
+            "✓ xingfinger 执行完成 - 处理记录: %d, 更新: %d, 创建: %d, 批次: %d",
+            processed_records, updated_count, created_count, batch_count
+        )
+
+        return {
+            'processed_records': processed_records,
+            'updated_count': updated_count,
+            'created_count': created_count,
+            'batch_count': batch_count
+        }
+
+    except subprocess.TimeoutExpired:
+        logger.warning("⚠️ xingfinger 执行超时 - target_id=%s, timeout=%s秒", target_id, timeout)
+        raise
+    except Exception as e:
+        error_msg = f"xingfinger 执行失败: {e}"
+        logger.error(error_msg, exc_info=True)
+        raise RuntimeError(error_msg) from e
+    finally:
+        # Close the generator explicitly when leaving, on any path
+        if record_stream is not None:
+            try:
+                record_stream.close()
+            except Exception as e:
+                logger.debug("关闭生成器时出错: %s", e)
diff --git a/backend/apps/scan/utils/fingerprint_helpers.py b/backend/apps/scan/utils/fingerprint_helpers.py
index 686f652e..16873e34 100644
--- a/backend/apps/scan/utils/fingerprint_helpers.py
+++ b/backend/apps/scan/utils/fingerprint_helpers.py
@@ -1,7 +1,7 @@
"""指纹文件本地缓存工具
提供 Worker 侧的指纹文件缓存和版本校验功能,用于:
-- 指纹识别扫描 (fingerprint_scan_flow)
+- 指纹识别扫描 (fingerprint_detect_flow)
"""
import logging
@@ -12,6 +12,18 @@ from django.conf import settings
logger = logging.getLogger(__name__)
+# Fingerprint library registry: lib_name -> name of the ensure_* function
+# To support another fingerprint library, add its mapping here
+FINGERPRINT_LIB_MAP = {
+    'ehole': 'ensure_ehole_fingerprint_local',
+    # Planned extensions:
+    # 'goby': 'ensure_goby_fingerprint_local',
+    # 'wappalyzer': 'ensure_wappalyzer_fingerprint_local',
+    # 'fingers': 'ensure_fingers_fingerprint_local',
+    # 'fingerprinthub': 'ensure_fingerprinthub_fingerprint_local',
+}
+
+
def ensure_ehole_fingerprint_local() -> str:
"""
确保本地存在最新的 EHole 指纹文件(带缓存)
@@ -70,4 +82,40 @@ def ensure_ehole_fingerprint_local() -> str:
return cache_file
-__all__ = ["ensure_ehole_fingerprint_local"]
+def get_fingerprint_paths(lib_names: list) -> dict:
+    """
+    Resolve the local file path of each requested fingerprint library.
+
+    Every name is looked up in ``FINGERPRINT_LIB_MAP`` and the mapped
+    ``ensure_*`` function of this module is called to obtain the path.
+    Unsupported names, unimplemented functions, and functions that raise
+    are logged and skipped.
+
+    Args:
+        lib_names: Fingerprint library names, e.g. ['ehole', 'goby']. A
+            single string is treated as a one-element list and ``None`` as
+            an empty list; duplicates are resolved only once.
+
+    Returns:
+        dict: {lib_name: local_path}, e.g.
+        {'ehole': '/opt/xingrin/fingerprints/ehole.json'}
+
+    Example:
+        paths = get_fingerprint_paths(['ehole'])
+        # {'ehole': '/opt/xingrin/fingerprints/ehole.json'}
+    """
+    if not lib_names:
+        return {}
+    if isinstance(lib_names, str):
+        # Iterating a bare string would yield single characters.
+        lib_names = [lib_names]
+
+    paths = {}
+    for lib_name in lib_names:
+        if lib_name in paths:
+            # Already resolved; do not run the ensure function again.
+            continue
+
+        if lib_name not in FINGERPRINT_LIB_MAP:
+            logger.warning("不支持的指纹库: %s,跳过", lib_name)
+            continue
+
+        ensure_func_name = FINGERPRINT_LIB_MAP[lib_name]
+        # Look the function up in this module's globals
+        ensure_func = globals().get(ensure_func_name)
+        if ensure_func is None:
+            logger.warning("指纹库 %s 的导出函数 %s 未实现,跳过", lib_name, ensure_func_name)
+            continue
+
+        try:
+            paths[lib_name] = ensure_func()
+        except Exception as e:
+            logger.error("获取指纹库 %s 路径失败: %s", lib_name, e)
+            continue
+
+    return paths
+
+
+__all__ = ["ensure_ehole_fingerprint_local", "get_fingerprint_paths", "FINGERPRINT_LIB_MAP"]
diff --git a/docs/scan-flow-architecture.md b/docs/scan-flow-architecture.md
index f51f9214..becdcfcc 100644
--- a/docs/scan-flow-architecture.md
+++ b/docs/scan-flow-architecture.md
@@ -40,8 +40,13 @@ flowchart TB
HTTPX1[httpx<br/>Web Service Detection]
end
+ subgraph FINGER["Fingerprint Detect"]
+ XINGFINGER[xingfinger<br/>Tech Stack Detection]
+ end
+
RESOLVE --> NAABU
NAABU --> HTTPX1
+ HTTPX1 --> XINGFINGER
end
TARGET --> SUBFINDER
@@ -69,9 +74,9 @@ flowchart TB
end
end
- HTTPX1 --> WAYMORE
- HTTPX1 --> KATANA
- HTTPX1 --> FFUF
+ XINGFINGER --> WAYMORE
+ XINGFINGER --> KATANA
+ XINGFINGER --> FFUF
subgraph STAGE3["Stage 3: Vulnerability Sequential"]
direction TB
@@ -105,7 +110,7 @@ flowchart TB
```python
# backend/apps/scan/configs/command_templates.py
EXECUTION_STAGES = [
- {'mode': 'sequential', 'flows': ['subdomain_discovery', 'port_scan', 'site_scan']},
+ {'mode': 'sequential', 'flows': ['subdomain_discovery', 'port_scan', 'site_scan', 'fingerprint_detect']},
{'mode': 'parallel', 'flows': ['url_fetch', 'directory_scan']},
{'mode': 'sequential', 'flows': ['vuln_scan']},
]
@@ -118,6 +123,7 @@ EXECUTION_STAGES = [
| subdomain_discovery | subfinder, amass, sublist3r, assetfinder, puredns | Subdomain |
| port_scan | naabu | HostPortMapping |
| site_scan | httpx | WebSite |
+| fingerprint_detect | xingfinger | WebSite.tech(更新) |
| url_fetch | waymore, katana, uro, httpx | Endpoint |
| directory_scan | ffuf | Directory |
| vuln_scan | dalfox, nuclei | Vulnerability |