mirror of
https://github.com/yyhuni/xingrin.git
synced 2026-01-31 19:53:11 +08:00
- Add response_headers field to Endpoint and WebSite models as JSONField - Add response_headers field to EndpointSnapshot and WebsiteSnapshot models - Update all related DTOs to include response_headers with Dict[str, Any] type - Add GIN indexes on response_headers fields for optimized JSON queries - Update endpoint and website repositories to handle response_headers data - Update serializers to include response_headers in API responses - Update frontend components to display response headers in detail views - Add response_headers to fingerprint detection and site scan tasks - Update command templates and engine config to support header extraction - Add internationalization strings for response headers in en.json and zh.json - Update TypeScript types for endpoint and website to include response_headers - Enhance scan history and target detail pages to show response header information
114 lines
4.6 KiB
Python
114 lines
4.6 KiB
Python
"""
|
||
初始化默认扫描引擎
|
||
|
||
用法:
|
||
python manage.py init_default_engine # 只创建不存在的引擎(不覆盖已有)
|
||
python manage.py init_default_engine --force # 强制覆盖所有引擎配置
|
||
|
||
cd /root/my-vulun-scan/docker
|
||
docker compose exec server python backend/manage.py init_default_engine --force
|
||
|
||
功能:
|
||
- 读取 engine_config_example.yaml 作为默认配置
|
||
- 创建 full scan(默认引擎)+ 各扫描类型的子引擎
|
||
- 默认不覆盖已有配置,加 --force 才会覆盖
|
||
"""
|
||
|
||
from django.core.management.base import BaseCommand
|
||
from io import StringIO
|
||
from pathlib import Path
|
||
|
||
from ruamel.yaml import YAML
|
||
|
||
from apps.engine.models import ScanEngine
|
||
|
||
|
||
class Command(BaseCommand):
|
||
help = '初始化默认扫描引擎配置(默认不覆盖已有,加 --force 强制覆盖)'
|
||
|
||
def add_arguments(self, parser):
|
||
parser.add_argument(
|
||
'--force',
|
||
action='store_true',
|
||
help='强制覆盖已有的引擎配置',
|
||
)
|
||
|
||
def handle(self, *args, **options):
|
||
force = options.get('force', False)
|
||
# 读取默认配置文件
|
||
config_path = Path(__file__).resolve().parent.parent.parent.parent / 'scan' / 'configs' / 'engine_config_example.yaml'
|
||
|
||
if not config_path.exists():
|
||
self.stdout.write(self.style.ERROR(f'配置文件不存在: {config_path}'))
|
||
return
|
||
|
||
with open(config_path, 'r', encoding='utf-8') as f:
|
||
default_config = f.read()
|
||
|
||
# 使用 ruamel.yaml 解析,保留注释
|
||
yaml_parser = YAML()
|
||
yaml_parser.preserve_quotes = True
|
||
try:
|
||
config_dict = yaml_parser.load(default_config) or {}
|
||
except Exception as e:
|
||
self.stdout.write(self.style.ERROR(f'引擎配置 YAML 解析失败: {e}'))
|
||
return
|
||
|
||
# 1) full scan:保留完整配置
|
||
engine = ScanEngine.objects.filter(name='full scan').first()
|
||
if engine:
|
||
if force:
|
||
engine.configuration = default_config
|
||
engine.save()
|
||
self.stdout.write(self.style.SUCCESS(f'✓ 扫描引擎 full scan 配置已更新 (ID: {engine.id})'))
|
||
else:
|
||
self.stdout.write(self.style.WARNING(f' ⊘ full scan 已存在,跳过(使用 --force 覆盖)'))
|
||
else:
|
||
engine = ScanEngine.objects.create(
|
||
name='full scan',
|
||
configuration=default_config,
|
||
)
|
||
self.stdout.write(self.style.SUCCESS(f'✓ 扫描引擎 full scan 已创建 (ID: {engine.id})'))
|
||
|
||
# 2) 为每个扫描类型生成一个「单一扫描类型」的子引擎
|
||
# 例如:subdomain_discovery, port_scan, ...
|
||
from apps.scan.configs.command_templates import get_supported_scan_types
|
||
|
||
supported_scan_types = set(get_supported_scan_types())
|
||
|
||
for scan_type, scan_cfg in config_dict.items():
|
||
# 只处理受支持且结构为 {tools: {...}} 的扫描类型
|
||
if scan_type not in supported_scan_types:
|
||
continue
|
||
if not isinstance(scan_cfg, dict):
|
||
continue
|
||
# subdomain_discovery 使用 4 阶段新结构(无 tools 字段),其他扫描类型仍要求有 tools
|
||
if scan_type != 'subdomain_discovery' and 'tools' not in scan_cfg:
|
||
continue
|
||
|
||
# 构造只包含当前扫描类型配置的 YAML(保留注释)
|
||
single_config = {scan_type: scan_cfg}
|
||
try:
|
||
stream = StringIO()
|
||
yaml_parser.dump(single_config, stream)
|
||
single_yaml = stream.getvalue()
|
||
except Exception as e:
|
||
self.stdout.write(self.style.ERROR(f'生成子引擎 {scan_type} 配置失败: {e}'))
|
||
continue
|
||
|
||
engine_name = f"{scan_type}"
|
||
sub_engine = ScanEngine.objects.filter(name=engine_name).first()
|
||
if sub_engine:
|
||
if force:
|
||
sub_engine.configuration = single_yaml
|
||
sub_engine.save()
|
||
self.stdout.write(self.style.SUCCESS(f' ✓ 子引擎 {engine_name} 配置已更新 (ID: {sub_engine.id})'))
|
||
else:
|
||
self.stdout.write(self.style.WARNING(f' ⊘ {engine_name} 已存在,跳过(使用 --force 覆盖)'))
|
||
else:
|
||
sub_engine = ScanEngine.objects.create(
|
||
name=engine_name,
|
||
configuration=single_yaml,
|
||
)
|
||
self.stdout.write(self.style.SUCCESS(f' ✓ 子引擎 {engine_name} 已创建 (ID: {sub_engine.id})'))
|