nuclei加入website扫描为默认

This commit is contained in:
yyhuni
2026-01-11 12:13:27 +08:00
parent 1bd2a6ed88
commit 8342f196db
7 changed files with 355 additions and 33 deletions

View File

@@ -203,7 +203,7 @@ VULN_SCAN_COMMANDS = {
# -silent: 静默模式
# -l: 输入 URL 列表文件
# -t: 模板目录路径(支持多个仓库,多次 -t 由 template_args 直接拼接)
'base': "nuclei -j -silent -l '{endpoints_file}' {template_args}",
'base': "nuclei -j -silent -l '{input_file}' {template_args}",
'optional': {
'concurrency': '-c {concurrency}', # 并发数(默认 25)
'rate_limit': '-rl {rate_limit}', # 每秒请求数限制
@@ -214,7 +214,12 @@ VULN_SCAN_COMMANDS = {
'tags': '-tags {tags}', # 过滤标签
'exclude_tags': '-etags {exclude_tags}', # 排除标签
},
'input_type': 'endpoints_file',
# 支持多种输入类型,用户通过 scan_endpoints/scan_websites 选择
'input_types': ['endpoints_file', 'websites_file'],
'defaults': {
'scan_endpoints': False, # 默认不扫描 endpoints
'scan_websites': True, # 默认扫描 websites
},
},
}

View File

@@ -158,7 +158,9 @@ vuln_scan:
nuclei:
enabled: true
# timeout: auto # 自动计算(根据 endpoints 行数)
# timeout: auto # 自动计算(根据输入 URL 行数)
scan-endpoints: false # 是否扫描 endpoints(默认关闭)
scan-websites: true # 是否扫描 websites(默认开启)
template-repo-names: # 模板仓库列表,对应「Nuclei 模板」中的仓库名
- nuclei-templates
# - nuclei-custom # 可追加自定义仓库

View File

@@ -108,7 +108,7 @@ def endpoints_vuln_scan_flow(
template_args = " ".join(f"-t {p}" for p in template_paths)
# 构建命令参数
command_params = {"endpoints_file": str(endpoints_file)}
command_params = {"input_file": str(endpoints_file)}
if template_args:
command_params["template_args"] = template_args

View File

@@ -14,32 +14,48 @@ from apps.scan.handlers.scan_flow_handlers import (
from apps.scan.configs.command_templates import get_command_template
from apps.scan.utils import user_log, wait_for_system_load
from .endpoints_vuln_scan_flow import endpoints_vuln_scan_flow
from .websites_vuln_scan_flow import websites_vuln_scan_flow
logger = logging.getLogger(__name__)
def _classify_vuln_tools(enabled_tools: Dict[str, dict]) -> Tuple[Dict[str, dict], Dict[str, dict]]:
"""根据命令模板中的 input_type 对漏洞扫描工具进行分类。
def _classify_vuln_tools(
enabled_tools: Dict[str, dict]
) -> Tuple[Dict[str, dict], Dict[str, dict], Dict[str, dict]]:
"""根据用户配置分类漏洞扫描工具。
当前支持
- endpoints_file: 以端点列表文件为输入(例如 Dalfox XSS
预留:
- 其他 input_type 将被归类到 other_tools暂不处理。
分类逻辑
- 读取 scan_endpoints / scan_websites 配置
- 默认值从模板的 defaults 或 input_type 推断
Returns:
(endpoints_tools, websites_tools, other_tools) 三元组
"""
endpoints_tools: Dict[str, dict] = {}
websites_tools: Dict[str, dict] = {}
other_tools: Dict[str, dict] = {}
for tool_name, tool_config in enabled_tools.items():
template = get_command_template("vuln_scan", tool_name) or {}
input_type = template.get("input_type", "endpoints_file")
defaults = template.get("defaults", {})
if input_type == "endpoints_file":
# 根据 input_type 推断默认值(兼容老工具)
input_type = template.get("input_type")
default_endpoints = defaults.get("scan_endpoints", input_type == "endpoints_file")
default_websites = defaults.get("scan_websites", input_type == "websites_file")
scan_endpoints = tool_config.get("scan_endpoints", default_endpoints)
scan_websites = tool_config.get("scan_websites", default_websites)
if scan_endpoints:
endpoints_tools[tool_name] = tool_config
else:
if scan_websites:
websites_tools[tool_name] = tool_config
if not scan_endpoints and not scan_websites:
other_tools[tool_name] = tool_config
return endpoints_tools, other_tools
return endpoints_tools, websites_tools, other_tools
@flow(
@@ -60,7 +76,7 @@ def vuln_scan_flow(
支持工具:
- dalfox_xss: XSS 漏洞扫描(流式保存)
- nuclei: 通用漏洞扫描(流式保存,支持模板 commit hash 同步
- nuclei: 通用漏洞扫描(流式保存,支持 endpoints 和 websites 两种输入
"""
try:
# 负载检查:等待系统资源充足
@@ -84,11 +100,12 @@ def vuln_scan_flow(
user_log(scan_id, "vuln_scan", "Starting vulnerability scan")
# Step 1: 分类工具
endpoints_tools, other_tools = _classify_vuln_tools(enabled_tools)
endpoints_tools, websites_tools, other_tools = _classify_vuln_tools(enabled_tools)
logger.info(
"漏洞扫描工具分类 - endpoints_file: %s, 其他: %s",
"漏洞扫描工具分类 - endpoints: %s, websites: %s, 其他: %s",
list(endpoints_tools.keys()) or "",
list(websites_tools.keys()) or "",
list(other_tools.keys()) or "",
)
@@ -98,28 +115,58 @@ def vuln_scan_flow(
list(other_tools.keys()),
)
if not endpoints_tools:
raise ValueError("漏洞扫描需要至少启用一个以 endpoints_file 为输入的工具(如 dalfox_xss、nuclei")
if not endpoints_tools and not websites_tools:
raise ValueError(
"漏洞扫描需要至少启用一个工具endpoints 或 websites 模式)"
)
# Step 2: 执行 Endpoint 漏洞扫描子 Flow串行
endpoint_result = endpoints_vuln_scan_flow(
scan_id=scan_id,
target_id=target_id,
scan_workspace_dir=scan_workspace_dir,
enabled_tools=endpoints_tools,
provider=provider,
)
total_vulns = 0
results = {}
# Step 2: 执行 Endpoint 漏洞扫描子 Flow
if endpoints_tools:
logger.info("执行 Endpoint 漏洞扫描 - 工具: %s", list(endpoints_tools.keys()))
endpoint_result = endpoints_vuln_scan_flow(
scan_id=scan_id,
target_id=target_id,
scan_workspace_dir=scan_workspace_dir,
enabled_tools=endpoints_tools,
provider=provider,
)
results["endpoints"] = endpoint_result
total_vulns += sum(
r.get("created_vulns", 0)
for r in endpoint_result.get("tool_results", {}).values()
)
# Step 3: 执行 WebSite 漏洞扫描子 Flow
if websites_tools:
logger.info("执行 WebSite 漏洞扫描 - 工具: %s", list(websites_tools.keys()))
website_result = websites_vuln_scan_flow(
scan_id=scan_id,
target_id=target_id,
scan_workspace_dir=scan_workspace_dir,
enabled_tools=websites_tools,
provider=provider,
)
results["websites"] = website_result
total_vulns += sum(
r.get("created_vulns", 0)
for r in website_result.get("tool_results", {}).values()
)
# 记录 Flow 完成
total_vulns = sum(
r.get("created_vulns", 0)
for r in endpoint_result.get("tool_results", {}).values()
)
logger.info("✓ 漏洞扫描完成 - 新增漏洞: %d", total_vulns)
user_log(scan_id, "vuln_scan", f"vuln_scan completed: found {total_vulns} vulnerabilities")
# 目前只有一个子 Flow直接返回其结果
return endpoint_result
return {
"success": True,
"scan_id": scan_id,
"target": target_name,
"scan_workspace_dir": scan_workspace_dir,
"total_vulns": total_vulns,
"sub_flow_results": results,
}
except Exception as e:
logger.exception("漏洞扫描主 Flow 失败: %s", e)

View File

@@ -0,0 +1,192 @@
"""
基于 WebSite 的漏洞扫描 Flow
与 endpoints_vuln_scan_flow 类似,但数据源是 WebSite 而不是 Endpoint。
主要用于 nuclei 扫描已存活的网站。
"""
import logging
from datetime import datetime
from typing import Dict
from prefect import flow
from apps.scan.utils import build_scan_command, ensure_nuclei_templates_local, user_log
from apps.scan.tasks.vuln_scan import run_and_stream_save_nuclei_vulns_task
from apps.scan.tasks.vuln_scan.export_websites_task import export_websites_task
from .utils import calculate_timeout_by_line_count
logger = logging.getLogger(__name__)
@flow(
    name="websites_vuln_scan_flow",
    log_prints=True,
)
def websites_vuln_scan_flow(
    scan_id: int,
    target_id: int,
    scan_workspace_dir: str,
    enabled_tools: Dict[str, dict],
    provider,
) -> dict:
    """Run vulnerability scanning against the target's WebSite URLs.

    The flow exports the WebSite URLs to ``input_websites.txt`` inside the
    ``vuln_scan`` directory of the scan workspace, then runs each enabled tool
    on that file. Only ``nuclei`` is handled; any other tool name is logged
    and skipped.

    Args:
        scan_id: ID of the scan; must not be ``None``.
        target_id: ID of the target; must not be ``None``.
        scan_workspace_dir: Root workspace directory of the scan.
        enabled_tools: Mapping of tool name to its config dict. For nuclei the
            config must contain ``template_repo_names`` (list or tuple) and
            may contain ``timeout`` (an int-like value or ``"auto"``).
        provider: Object supplying ``get_target_name()`` and the data that
            ``export_websites_task`` reads.

    Returns:
        A dict with ``success``, ``scan_id``, ``target``,
        ``scan_workspace_dir``, ``websites_file``, ``website_count``,
        ``executed_tools`` (tools that were actually submitted for execution)
        and ``tool_results`` (per-tool summary for tools that finished
        without raising).

    Raises:
        ValueError: If a required argument is missing/empty or a tool's
            ``timeout`` cannot be converted to ``int``.
        Exception: Any other error is logged and re-raised unchanged.
    """
    try:
        target_name = provider.get_target_name()
        if not target_name:
            raise ValueError("无法获取 Target 名称")
        if scan_id is None:
            raise ValueError("scan_id 不能为空")
        if target_id is None:
            raise ValueError("target_id 不能为空")
        if not scan_workspace_dir:
            raise ValueError("scan_workspace_dir 不能为空")
        if not enabled_tools:
            raise ValueError("enabled_tools 不能为空")

        import shlex

        from apps.scan.utils import setup_scan_directory

        vuln_scan_dir = setup_scan_directory(scan_workspace_dir, 'vuln_scan')
        websites_file = vuln_scan_dir / "input_websites.txt"

        # Step 1: export the WebSite URLs that the tools will scan.
        export_result = export_websites_task(
            output_file=str(websites_file),
            provider=provider,
        )
        total_websites = export_result.get("total_count", 0)

        if total_websites == 0:
            # Nothing to scan: report success with no executed tools.
            logger.warning("目标下没有可用 WebSite跳过漏洞扫描")
            return {
                "success": True,
                "scan_id": scan_id,
                "target": target_name,
                "scan_workspace_dir": scan_workspace_dir,
                "websites_file": str(websites_file),
                "website_count": 0,
                "executed_tools": [],
                "tool_results": {},
            }

        logger.info("WebSite 导出完成,共 %d 条,开始执行漏洞扫描", total_websites)

        tool_results: Dict[str, dict] = {}
        tool_futures: Dict[str, dict] = {}

        # Step 2: submit every supported tool.
        for tool_name, tool_config in enabled_tools.items():
            # Only nuclei is supported by this flow for now.
            if tool_name != "nuclei":
                logger.warning("websites_vuln_scan_flow 暂不支持工具: %s", tool_name)
                continue

            # nuclei needs at least one template repository.
            repo_names = tool_config.get("template_repo_names")
            if not repo_names or not isinstance(repo_names, (list, tuple)):
                logger.error("Nuclei 配置缺少 template_repo_names数组跳过")
                continue

            template_paths = []
            try:
                for repo_name in repo_names:
                    path = ensure_nuclei_templates_local(repo_name)
                    template_paths.append(path)
                    logger.info("Nuclei 模板路径 [%s]: %s", repo_name, path)
            except Exception as e:
                logger.error("获取 Nuclei 模板失败: %s,跳过 nuclei 扫描", e)
                continue

            # The command is executed with shell=True, so quote every path;
            # shlex.quote leaves plain paths untouched.
            template_args = " ".join(
                f"-t {shlex.quote(str(p))}" for p in template_paths
            )

            # Build the command with the exported websites file as input.
            command_params = {
                "input_file": str(websites_file),
                "template_args": template_args,
            }
            command = build_scan_command(
                tool_name=tool_name,
                scan_type="vuln_scan",
                command_params=command_params,
                tool_config=tool_config,
            )

            # Resolve the timeout: "auto" derives it from the input file,
            # anything else must be convertible to int.
            raw_timeout = tool_config.get("timeout", 600)
            if isinstance(raw_timeout, str) and raw_timeout == "auto":
                timeout = calculate_timeout_by_line_count(
                    tool_config=tool_config,
                    file_path=str(websites_file),
                    base_per_time=30,
                )
            else:
                try:
                    timeout = int(raw_timeout)
                except (TypeError, ValueError) as e:
                    raise ValueError(
                        f"工具 {tool_name} 的 timeout 配置无效: {raw_timeout!r}"
                    ) from e

            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            log_file = vuln_scan_dir / f"{tool_name}_websites_{timestamp}.log"

            logger.info("开始执行 %s 漏洞扫描WebSite 模式)", tool_name)
            user_log(scan_id, "vuln_scan", f"Running {tool_name} (websites): {command}")

            future = run_and_stream_save_nuclei_vulns_task.submit(
                cmd=command,
                tool_name=tool_name,
                scan_id=scan_id,
                target_id=target_id,
                cwd=str(vuln_scan_dir),
                shell=True,
                batch_size=1,
                timeout=timeout,
                log_file=str(log_file),
            )
            tool_futures[tool_name] = {
                "future": future,
                "command": command,
                "timeout": timeout,
                "log_file": str(log_file),
            }

        # Step 3: collect results; a failing tool is logged but does not
        # abort the flow.
        for tool_name, meta in tool_futures.items():
            future = meta["future"]
            try:
                result = future.result()
                created_vulns = result.get("created_vulns", 0)
                tool_results[tool_name] = {
                    "command": meta["command"],
                    "timeout": meta["timeout"],
                    "processed_records": result.get("processed_records"),
                    "created_vulns": created_vulns,
                    "command_log_file": meta["log_file"],
                }
                logger.info("✓ 工具 %s (websites) 执行完成 - 漏洞: %d", tool_name, created_vulns)
                user_log(
                    scan_id, "vuln_scan",
                    f"{tool_name} (websites) completed: found {created_vulns} vulnerabilities"
                )
            except Exception as e:
                reason = str(e)
                logger.error("工具 %s 执行失败: %s", tool_name, e, exc_info=True)
                user_log(scan_id, "vuln_scan", f"{tool_name} failed: {reason}", "error")

        return {
            "success": True,
            "scan_id": scan_id,
            "target": target_name,
            "scan_workspace_dir": scan_workspace_dir,
            "websites_file": str(websites_file),
            "website_count": total_websites,
            # Only tools that were really submitted, not every enabled tool
            # (skipped/unsupported tools never ran).
            "executed_tools": list(tool_futures),
            "tool_results": tool_results,
        }

    except Exception as e:
        logger.exception("WebSite 漏洞扫描失败: %s", e)
        raise

View File

@@ -2,18 +2,21 @@
包含:
- export_endpoints_task: 导出端点 URL 到文件
- export_websites_task: 导出网站 URL 到文件
- run_vuln_tool_task: 执行漏洞扫描工具(非流式)
- run_and_stream_save_dalfox_vulns_task: Dalfox 流式执行并保存漏洞结果
- run_and_stream_save_nuclei_vulns_task: Nuclei 流式执行并保存漏洞结果
"""
from .export_endpoints_task import export_endpoints_task
from .export_websites_task import export_websites_task
from .run_vuln_tool_task import run_vuln_tool_task
from .run_and_stream_save_dalfox_vulns_task import run_and_stream_save_dalfox_vulns_task
from .run_and_stream_save_nuclei_vulns_task import run_and_stream_save_nuclei_vulns_task
__all__ = [
"export_endpoints_task",
"export_websites_task",
"run_vuln_tool_task",
"run_and_stream_save_dalfox_vulns_task",
"run_and_stream_save_nuclei_vulns_task",

View File

@@ -0,0 +1,73 @@
"""导出 WebSite URL 到文件的 Task
使用 TargetProvider 从任意数据源导出 URL。
数据源WebSite为空时回退到默认 URL
"""
import logging
from pathlib import Path
from prefect import task
from apps.scan.providers import TargetProvider
logger = logging.getLogger(__name__)
@task(name="export_websites_for_vuln_scan")
def export_websites_task(
    output_file: str,
    provider: TargetProvider,
) -> dict:
    """Write the target's WebSite URLs to a text file, one URL per line.

    URLs come from ``provider.iter_websites()``; when that yields nothing the
    task falls back to ``provider.iter_default_urls()``.

    Args:
        output_file: Path of the file to write (described by the original
            docstring as an absolute path). Missing parent directories are
            created.
        provider: TargetProvider instance used as the data source.

    Returns:
        dict with ``success`` (bool), ``output_file`` (str), ``total_count``
        (number of lines written) and ``source`` (``"website"`` or
        ``"default"``).

    Raises:
        ValueError: If ``provider`` is ``None``.
    """
    if provider is None:
        raise ValueError("必须提供 provider 参数")

    logger.info("导出 URL - Provider: %s", type(provider).__name__)

    destination = Path(output_file)
    destination.parent.mkdir(parents=True, exist_ok=True)

    # Prefer real WebSite records; fall back to the default URLs if none exist.
    source = "website"
    entries = list(provider.iter_websites())
    if len(entries) == 0:
        logger.info("WebSite 为空,生成默认 URL")
        source = "default"
        entries = list(provider.iter_default_urls())

    # One URL per line.
    with open(destination, 'w', encoding='utf-8', buffering=8192) as handle:
        handle.writelines(f"{entry}\n" for entry in entries)
    written = len(entries)

    logger.info(
        "✓ URL 导出完成 - 来源: %s, 总数: %d, 文件: %s",
        source, written, str(destination)
    )

    summary = {
        "success": True,
        "output_file": str(destination),
        "total_count": written,
        "source": source,
    }
    return summary