diff --git a/backend/apps/scan/services/__init__.py b/backend/apps/scan/services/__init__.py
index 091e8fab..66b8ea81 100644
--- a/backend/apps/scan/services/__init__.py
+++ b/backend/apps/scan/services/__init__.py
@@ -17,7 +17,12 @@ from .scan_state_service import ScanStateService
 from .scan_control_service import ScanControlService
 from .scan_stats_service import ScanStatsService
 from .scheduled_scan_service import ScheduledScanService
-from .target_export_service import TargetExportService
+from .target_export_service import (
+    TargetExportService,
+    create_export_service,
+    export_urls_with_fallback,
+    DataSource,
+)
 
 __all__ = [
     'ScanService',  # Main entry point (backward compatible)
@@ -27,5 +32,8 @@ __all__ = [
     'ScanStatsService',
     'ScheduledScanService',
     'TargetExportService',  # Target export service
+    'create_export_service',
+    'export_urls_with_fallback',
+    'DataSource',
 ]
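
With the re-exports above, the new helpers resolve from the package root as well as the module path; a minimal sketch (the constant values come from the DataSource class added below):

# All four names are now importable from the package root.
from apps.scan.services import (
    DataSource,
    TargetExportService,
    create_export_service,
    export_urls_with_fallback,
)

# DataSource is a plain namespace of string constants used by the fallback chain.
assert DataSource.ENDPOINT == "endpoint"
assert DataSource.WEBSITE == "website"
assert DataSource.HOST_PORT == "host_port"
assert DataSource.DEFAULT == "default"
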
diff --git a/backend/apps/scan/services/target_export_service.py b/backend/apps/scan/services/target_export_service.py
index 03b244fc..9d4b015b 100644
--- a/backend/apps/scan/services/target_export_service.py
+++ b/backend/apps/scan/services/target_export_service.py
@@ -2,7 +2,9 @@
 Target export service
 
 Provides unified target extraction and file export, supporting:
-- URL export (streaming writes + default-value fallback)
+- URL export (pure export, no implicit fallback)
+- Default URL generation (standalone method)
+- URL export with a fallback chain (orchestrated at the use-case layer)
 - Domain/IP export (for port scanning)
 - Blacklist filter integration
 """
@@ -10,7 +12,7 @@
 
 import ipaddress
 import logging
 from pathlib import Path
-from typing import Dict, Any, Optional, List
+from typing import Dict, Any, Optional, List, Callable
 
 from django.db.models import QuerySet
 
@@ -19,6 +21,14 @@
 from apps.common.utils import BlacklistFilter
 
 logger = logging.getLogger(__name__)
 
 
+class DataSource:
+    """Data source type constants"""
+    ENDPOINT = "endpoint"
+    WEBSITE = "website"
+    HOST_PORT = "host_port"
+    DEFAULT = "default"
+
+
 def create_export_service(target_id: int) -> 'TargetExportService':
     """
     Factory function: create an export service with blacklist filtering
@@ -36,21 +46,129 @@ def create_export_service(target_id: int) -> 'TargetExportService':
     return TargetExportService(blacklist_filter=blacklist_filter)
 
 
+def export_urls_with_fallback(
+    target_id: int,
+    output_file: str,
+    sources: List[str],
+    batch_size: int = 1000
+) -> Dict[str, Any]:
+    """
+    Use-case function: URL export with a fallback chain
+
+    Tries each data source in the order given by sources until one returns data.
+
+    Fallback logic:
+    1. Iterate over the sources list
+    2. For each source, build a queryset and call export_urls()
+    3. If total_count > 0, return
+    4. If queryset_count > 0 but total_count == 0 (all rows blacklist-filtered), do not fall back
+    5. If source == "default", call generate_default_urls()
+
+    Args:
+        target_id: Target ID
+        output_file: Output file path
+        sources: Data source priority list, e.g. ["endpoint", "website", "default"]
+        batch_size: Batch size
+
+    Returns:
+        dict: {
+            'success': bool,
+            'output_file': str,
+            'total_count': int,
+            'source': str,               # data source actually used
+            'tried_sources': List[str],  # data sources attempted
+        }
+    """
+    from apps.asset.models import Endpoint, WebSite
+
+    export_service = create_export_service(target_id)
+    tried_sources = []
+
+    for source in sources:
+        tried_sources.append(source)
+
+        if source == DataSource.DEFAULT:
+            # Default URL generation
+            result = export_service.generate_default_urls(target_id, output_file)
+            return {
+                'success': result['success'],
+                'output_file': result['output_file'],
+                'total_count': result['total_count'],
+                'source': DataSource.DEFAULT,
+                'tried_sources': tried_sources,
+            }
+
+        # Build the queryset for this data source
+        if source == DataSource.ENDPOINT:
+            queryset = Endpoint.objects.filter(target_id=target_id).values_list('url', flat=True)
+        elif source == DataSource.WEBSITE:
+            queryset = WebSite.objects.filter(target_id=target_id).values_list('url', flat=True)
+        else:
+            logger.warning("Unknown data source type: %s, skipping", source)
+            continue
+
+        result = export_service.export_urls(
+            target_id=target_id,
+            output_path=output_file,
+            queryset=queryset,
+            batch_size=batch_size
+        )
+
+        # Data was written: return
+        if result['total_count'] > 0:
+            logger.info("Source %s exported %d URLs", source, result['total_count'])
+            return {
+                'success': result['success'],
+                'output_file': result['output_file'],
+                'total_count': result['total_count'],
+                'source': source,
+                'tried_sources': tried_sources,
+            }
+
+        # Data exists but was entirely blacklist-filtered: do not fall back
+        if result['queryset_count'] > 0:
+            logger.info(
+                "Source %s has %d rows, but all were blacklist-filtered (filtered=%d); not falling back",
+                source, result['queryset_count'], result['filtered_count']
+            )
+            return {
+                'success': result['success'],
+                'output_file': result['output_file'],
+                'total_count': 0,
+                'source': source,
+                'tried_sources': tried_sources,
+            }
+
+        # Data source is empty: try the next one
+        logger.info("Source %s is empty, trying the next data source", source)
+
+    # All data sources are empty
+    logger.warning("All data sources are empty; cannot export URLs")
+    return {
+        'success': True,
+        'output_file': output_file,
+        'total_count': 0,
+        'source': 'none',
+        'tried_sources': tried_sources,
+    }
+
+
 class TargetExportService:
     """
     Target export service - provides unified target extraction and file export
 
     Usage:
-        from apps.common.services import BlacklistService
-        from apps.common.utils import BlacklistFilter
+        # Option 1: use the use-case function (recommended)
+        from apps.scan.services.target_export_service import export_urls_with_fallback, DataSource
 
-        # Fetch rules and create the filter
-        blacklist_service = BlacklistService()
-        rules = blacklist_service.get_rules(target_id)
-        blacklist_filter = BlacklistFilter(rules)
+        result = export_urls_with_fallback(
+            target_id=1,
+            output_file='/path/to/output.txt',
+            sources=[DataSource.ENDPOINT, DataSource.WEBSITE, DataSource.DEFAULT]
+        )
 
-        # Use the export service
-        export_service = TargetExportService(blacklist_filter=blacklist_filter)
+        # Option 2: use the Service directly (pure export, no fallback)
+        export_service = create_export_service(target_id)
         result = export_service.export_urls(target_id, output_path, queryset)
     """
 
@@ -72,16 +190,14 @@ class TargetExportService:
         batch_size: int = 1000
     ) -> Dict[str, Any]:
         """
-        Unified URL export function
+        Pure URL export function - only writes queryset data to the file
 
-        Automatically checks whether the database has data:
-        - with data: stream the database rows to the file
-        - without data: call the default-value generator to produce URLs
+        Performs no implicit fallback or default URL generation.
 
         Args:
             target_id: Target ID
             output_path: Output file path
-            queryset: Data source queryset (built by the Task layer; should be values_list flat=True)
+            queryset: Data source queryset (built by the caller; should be values_list flat=True)
             url_field: URL field name (used for blacklist filtering)
             batch_size: Batch size
 
@@ -89,7 +205,9 @@
         Returns:
             dict: {
                 'success': bool,
                 'output_file': str,
-                'total_count': int
+                'total_count': int,      # rows actually written
+                'queryset_count': int,   # raw row count (iteration count)
+                'filtered_count': int,   # rows removed by the blacklist
             }
 
         Raises:
@@ -102,9 +220,12 @@
 
         total_count = 0
         filtered_count = 0
+        queryset_count = 0
+
        try:
             with open(output_file, 'w', encoding='utf-8', buffering=8192) as f:
                 for url in queryset.iterator(chunk_size=batch_size):
+                    queryset_count += 1
                     if url:
                         # Blacklist filtering
                         if self.blacklist_filter and not self.blacklist_filter.is_allowed(url):
@@ -122,25 +243,26 @@
         if filtered_count > 0:
             logger.info("Blacklist filtering: filtered %d URLs", filtered_count)
 
-        # Default-value fallback mode
-        if total_count == 0:
-            total_count = self._generate_default_urls(target_id, output_file)
-
-        logger.info("✓ URL export complete - count: %d, file: %s", total_count, output_path)
+        logger.info(
+            "✓ URL export complete - written: %d, raw: %d, filtered: %d, file: %s",
+            total_count, queryset_count, filtered_count, output_path
+        )
 
         return {
             'success': True,
             'output_file': str(output_file),
-            'total_count': total_count
+            'total_count': total_count,
+            'queryset_count': queryset_count,
+            'filtered_count': filtered_count,
         }
 
-    def _generate_default_urls(
+    def generate_default_urls(
         self,
         target_id: int,
-        output_path: Path
-    ) -> int:
+        output_path: str
+    ) -> Dict[str, Any]:
         """
-        Default-value generator (internal)
+        Default URL generator
 
         Generates default URLs based on the Target type:
         - DOMAIN: http(s)://domain
@@ -153,26 +275,37 @@ class TargetExportService:
             output_path: Output file path
 
         Returns:
-            int: total number of URLs written
+            dict: {
+                'success': bool,
+                'output_file': str,
+                'total_count': int,
+            }
         """
         from apps.targets.services import TargetService
         from apps.targets.models import Target
 
+        output_file = Path(output_path)
+        output_file.parent.mkdir(parents=True, exist_ok=True)
+
         target_service = TargetService()
         target = target_service.get_target(target_id)
 
         if not target:
             logger.warning("Target ID %d does not exist; cannot generate default URLs", target_id)
-            return 0
+            return {
+                'success': True,
+                'output_file': str(output_file),
+                'total_count': 0,
+            }
 
         target_name = target.name
         target_type = target.type
 
-        logger.info("Lazy-load mode: Target type=%s, name=%s", target_type, target_name)
+        logger.info("Generating default URLs: Target type=%s, name=%s", target_type, target_name)
 
         total_urls = 0
 
-        with open(output_path, 'w', encoding='utf-8', buffering=8192) as f:
+        with open(output_file, 'w', encoding='utf-8', buffering=8192) as f:
             if target_type == Target.TargetType.DOMAIN:
                 urls = [f"http://{target_name}", f"https://{target_name}"]
                 for url in urls:
@@ -221,8 +354,13 @@ class TargetExportService:
             else:
                 logger.warning("Unsupported Target type: %s", target_type)
 
-        logger.info("✓ Lazily generated default URLs - count: %d", total_urls)
-        return total_urls
+        logger.info("✓ Default URL generation complete - count: %d", total_urls)
+
+        return {
+            'success': True,
+            'output_file': str(output_file),
+            'total_count': total_urls,
+        }
 
     def _should_write_url(self, url: str) -> bool:
         """Check whether a URL should be written (passes the blacklist filter)"""
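
Taken together, the new surface splits into an orchestration function and a pure writer. A minimal usage sketch of both (the target ID and paths are illustrative, and a configured Django environment is assumed):

from apps.asset.models import WebSite
from apps.scan.services.target_export_service import (
    DataSource,
    create_export_service,
    export_urls_with_fallback,
)

# Option 1: let the use-case function walk the fallback chain.
result = export_urls_with_fallback(
    target_id=1,
    output_file='/tmp/site_urls.txt',
    sources=[DataSource.WEBSITE, DataSource.DEFAULT],
)
print(result['source'], result['total_count'], result['tried_sources'])

# Option 2: pure export; the caller builds the queryset and reads the
# raw counters to make its own fallback decision.
queryset = WebSite.objects.filter(target_id=1).values_list('url', flat=True)
service = create_export_service(target_id=1)
result = service.export_urls(
    target_id=1,
    output_path='/tmp/site_urls.txt',
    queryset=queryset,
)
print(result['total_count'], result['queryset_count'], result['filtered_count'])
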
diff --git a/backend/apps/scan/tasks/directory_scan/export_sites_task.py b/backend/apps/scan/tasks/directory_scan/export_sites_task.py
index 088274e6..9a350cd9 100644
--- a/backend/apps/scan/tasks/directory_scan/export_sites_task.py
+++ b/backend/apps/scan/tasks/directory_scan/export_sites_task.py
@@ -1,15 +1,16 @@
 """
 Task that exports site URLs to a TXT file
 
-Uses TargetExportService to centralize export logic and default-value fallback
-Data source: WebSite.url
+Uses the export_urls_with_fallback use-case function to run the fallback chain
+Data source: WebSite.url → Default
 """
 import logging
 
 from prefect import task
 
-from apps.asset.models import WebSite
-from apps.scan.services import TargetExportService
-from apps.scan.services.target_export_service import create_export_service
+from apps.scan.services.target_export_service import (
+    export_urls_with_fallback,
+    DataSource,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -23,13 +24,9 @@ def export_sites_task(
     """
     Export all site URLs under a target to a TXT file
 
-    Data source: WebSite.url
-
-    Lazy-load mode:
-    - If the database is empty, generate default URLs from the Target type
-    - DOMAIN: http(s)://domain
-    - IP: http(s)://ip
-    - CIDR: expand to URLs for every IP
+    Data source priority (fallback chain):
+    1. WebSite table - site-level URLs
+    2. Default generation - http(s)://target_name derived from the Target type
 
     Args:
         target_id: Target ID
@@ -47,25 +44,21 @@ def export_sites_task(
         ValueError: invalid arguments
         IOError: file write failed
     """
-    # Build the data source queryset (the Task layer picks the source)
-    queryset = WebSite.objects.filter(target_id=target_id).values_list('url', flat=True)
-
-    # Create the export service via the factory function
-    export_service = create_export_service(target_id)
-
-    result = export_service.export_urls(
+    result = export_urls_with_fallback(
         target_id=target_id,
-        output_path=output_file,
-        queryset=queryset,
-        batch_size=batch_size
+        output_file=output_file,
+        sources=[DataSource.WEBSITE, DataSource.DEFAULT],
+        batch_size=batch_size,
+    )
+
+    logger.info(
+        "Site URL export complete - source=%s, count=%d",
+        result['source'], result['total_count']
     )
 
     # Keep the return format unchanged (backward compatible)
     return {
         'success': result['success'],
         'output_file': result['output_file'],
-        'total_count': result['total_count']
+        'total_count': result['total_count'],
     }
-
-
-
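
For callers, the task's contract is unchanged; a quick sketch of invoking it directly (values are illustrative, and .fn, Prefect's handle on the undecorated function, is used to call it outside a flow run):

from apps.scan.tasks.directory_scan.export_sites_task import export_sites_task

result = export_sites_task.fn(target_id=1, output_file='/tmp/sites.txt')
# The return shape is kept for backward compatibility:
assert set(result) == {'success', 'output_file', 'total_count'}
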
diff --git a/backend/apps/scan/tasks/fingerprint_detect/export_urls_task.py b/backend/apps/scan/tasks/fingerprint_detect/export_urls_task.py
index 69de97ca..0451343c 100644
--- a/backend/apps/scan/tasks/fingerprint_detect/export_urls_task.py
+++ b/backend/apps/scan/tasks/fingerprint_detect/export_urls_task.py
@@ -2,15 +2,17 @@
 URL export task
 
 Exports a target's URLs to a file before fingerprint detection
-Uses TargetExportService to centralize export logic and default-value fallback
+Uses the export_urls_with_fallback use-case function to run the fallback chain
 """
 
 import logging
 
 from prefect import task
 
-from apps.asset.models import WebSite
-from apps.scan.services.target_export_service import create_export_service
+from apps.scan.services.target_export_service import (
+    export_urls_with_fallback,
+    DataSource,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -19,46 +21,40 @@ logger = logging.getLogger(__name__)
 def export_urls_for_fingerprint_task(
     target_id: int,
     output_file: str,
-    source: str = 'website',
+    source: str = 'website',  # kept for old callers (actual value decided by the fallback chain)
     batch_size: int = 1000
 ) -> dict:
     """
     Export a target's URLs to a file (for fingerprint detection)
 
-    Data source: WebSite.url
-
-    Lazy-load mode:
-    - If the database is empty, generate default URLs from the Target type
-    - DOMAIN: http(s)://domain
-    - IP: http(s)://ip
-    - CIDR: expand to URLs for every IP
-    - URL: use the target URL directly
+    Data source priority (fallback chain):
+    1. WebSite table - site-level URLs
+    2. Default generation - http(s)://target_name derived from the Target type
 
     Args:
         target_id: Target ID
         output_file: Output file path
-        source: Data source type (kept for old callers)
+        source: Data source type (kept for old callers; actual value decided by the fallback chain)
         batch_size: Batch read size
 
     Returns:
         dict: {'output_file': str, 'total_count': int, 'source': str}
     """
-    # Build the data source queryset (the Task layer picks the source)
-    queryset = WebSite.objects.filter(target_id=target_id).values_list('url', flat=True)
-
-    # Create the export service via the factory function
-    export_service = create_export_service(target_id)
-
-    result = export_service.export_urls(
+    result = export_urls_with_fallback(
         target_id=target_id,
-        output_path=output_file,
-        queryset=queryset,
-        batch_size=batch_size
+        output_file=output_file,
+        sources=[DataSource.WEBSITE, DataSource.DEFAULT],
+        batch_size=batch_size,
     )
 
-    # Keep the return format unchanged (backward compatible)
+    logger.info(
+        "Fingerprint URL export complete - source=%s, count=%d",
+        result['source'], result['total_count']
+    )
+
+    # Return the data source actually used (no longer hard-coded to "website")
     return {
         'output_file': result['output_file'],
         'total_count': result['total_count'],
-        'source': source
+        'source': result['source'],
     }
diff --git a/backend/apps/scan/tasks/site_scan/export_site_urls_task.py b/backend/apps/scan/tasks/site_scan/export_site_urls_task.py
index d885d369..72c7f741 100644
--- a/backend/apps/scan/tasks/site_scan/export_site_urls_task.py
+++ b/backend/apps/scan/tasks/site_scan/export_site_urls_task.py
@@ -2,7 +2,7 @@
 Task that exports site URLs to a file
 
 Queries host+port pairs directly from the HostPortMapping table, assembles them into URLs, and writes them to a file
-Uses TargetExportService for default-value fallback
+Uses TargetExportService.generate_default_urls() for default-value fallback
 
 Special cases:
 - Port 80: generate only an HTTP URL (omit the port number)
@@ -46,18 +46,15 @@ def export_site_urls_task(
     """
     Export all site URLs under a target to a file (based on the HostPortMapping table)
 
-    Data source: HostPortMapping (host + port)
+    Data source: HostPortMapping (host + port) → Default
 
     Special cases:
     - Port 80: generate only an HTTP URL (omit the port number)
     - Port 443: generate only an HTTPS URL (omit the port number)
     - Other ports: generate both HTTP and HTTPS URLs (with the port number)
 
-    Lazy-load mode:
-    - If the database is empty, generate default URLs from the Target type
-    - DOMAIN: http(s)://domain
-    - IP: http(s)://ip
-    - CIDR: expand to URLs for every IP
+    Fallback logic:
+    - If HostPortMapping is empty, generate default URLs via generate_default_urls()
 
     Args:
         target_id: Target ID
@@ -69,7 +66,8 @@ def export_site_urls_task(
             'success': bool,
             'output_file': str,
             'total_urls': int,
-            'association_count': int  # number of host-port associations
+            'association_count': int,  # number of host-port associations
+            'source': str,  # data origin: "host_port" | "default"
         }
 
     Raises:
@@ -94,6 +92,7 @@ def export_site_urls_task(
 
     total_urls = 0
     association_count = 0
+    filtered_count = 0
 
     # Stream to the file (special port logic)
     with open(output_path, 'w', encoding='utf-8', buffering=8192) as f:
@@ -104,6 +103,7 @@ def export_site_urls_task(
 
             # Validate the host first; only generate URLs if it passes
             if not blacklist_filter.is_allowed(host):
+                filtered_count += 1
                 continue
 
             # Generate URLs based on the port number
@@ -114,19 +114,40 @@ def export_site_urls_task(
             if association_count % 1000 == 0:
                 logger.info("Processed %d associations, generated %d URLs...", association_count, total_urls)
 
+    if filtered_count > 0:
+        logger.info("Blacklist filtering: filtered %d associations", filtered_count)
+
     logger.info(
         "✓ Site URL export complete - associations: %d, total URLs: %d, file: %s",
         association_count, total_urls, str(output_path)
     )
 
-    # Default-value fallback mode: create the export service via the factory function
+    # Determine the data origin
+    source = "host_port"
+
+    # Data exists but was entirely filtered: do not fall back
+    if association_count > 0 and total_urls == 0:
+        logger.info("HostPortMapping has %d rows, but all were blacklist-filtered; not falling back", association_count)
+        return {
+            'success': True,
+            'output_file': str(output_path),
+            'total_urls': 0,
+            'association_count': association_count,
+            'source': source,
+        }
+
+    # Data source is empty: fall back to default URL generation
     if total_urls == 0:
+        logger.info("HostPortMapping is empty; generating default URLs")
         export_service = create_export_service(target_id)
-        total_urls = export_service._generate_default_urls(target_id, output_path)
+        result = export_service.generate_default_urls(target_id, str(output_path))
+        total_urls = result['total_count']
+        source = "default"
 
     return {
         'success': True,
         'output_file': str(output_path),
         'total_urls': total_urls,
-        'association_count': association_count
+        'association_count': association_count,
+        'source': source,
    }
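
The port-to-URL rule described above is small enough to restate as a self-contained sketch; urls_for is a hypothetical helper, not a function from the diff:

from typing import List

def urls_for(host: str, port: int) -> List[str]:
    # Port 80: HTTP only, port omitted; 443: HTTPS only, port omitted;
    # anything else: both schemes with an explicit port.
    if port == 80:
        return [f"http://{host}"]
    if port == 443:
        return [f"https://{host}"]
    return [f"http://{host}:{port}", f"https://{host}:{port}"]

assert urls_for("example.com", 80) == ["http://example.com"]
assert urls_for("example.com", 443) == ["https://example.com"]
assert urls_for("example.com", 8080) == [
    "http://example.com:8080",
    "https://example.com:8080",
]
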
diff --git a/backend/apps/scan/tasks/url_fetch/export_sites_task.py b/backend/apps/scan/tasks/url_fetch/export_sites_task.py
index 0c2e6654..2e932cdb 100644
--- a/backend/apps/scan/tasks/url_fetch/export_sites_task.py
+++ b/backend/apps/scan/tasks/url_fetch/export_sites_task.py
@@ -1,16 +1,17 @@
 """
 Site URL list export task
 
-Uses TargetExportService to centralize export logic and default-value fallback
-Data source: WebSite.url (for crawlers such as katana)
+Uses the export_urls_with_fallback use-case function to run the fallback chain
+Data source: WebSite.url → Default (for crawlers such as katana)
 """
 import logging
 
 from prefect import task
-from typing import Optional
 
-from apps.asset.models import WebSite
-from apps.scan.services.target_export_service import create_export_service
+from apps.scan.services.target_export_service import (
+    export_urls_with_fallback,
+    DataSource,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -29,13 +30,9 @@ def export_sites_task(
     """
     Export a site URL list to a file (for crawlers such as katana)
 
-    Data source: WebSite.url
-
-    Lazy-load mode:
-    - If the database is empty, generate default URLs from the Target type
-    - DOMAIN: http(s)://domain
-    - IP: http(s)://ip
-    - CIDR: expand to URLs for every IP
+    Data source priority (fallback chain):
+    1. WebSite table - site-level URLs
+    2. Default generation - http(s)://target_name derived from the Target type
 
     Args:
         output_file: Output file path
@@ -53,17 +50,16 @@ def export_sites_task(
         ValueError: invalid arguments
         RuntimeError: execution failed
     """
-    # Build the data source queryset (the Task layer picks the source)
-    queryset = WebSite.objects.filter(target_id=target_id).values_list('url', flat=True)
-
-    # Create the export service via the factory function
-    export_service = create_export_service(target_id)
-
-    result = export_service.export_urls(
+    result = export_urls_with_fallback(
         target_id=target_id,
-        output_path=output_file,
-        queryset=queryset,
-        batch_size=batch_size
+        output_file=output_file,
+        sources=[DataSource.WEBSITE, DataSource.DEFAULT],
+        batch_size=batch_size,
+    )
+
+    logger.info(
+        "Site URL export complete - source=%s, count=%d",
+        result['source'], result['total_count']
     )
 
     # Keep the return format unchanged (backward compatible)
diff --git a/backend/apps/scan/tasks/vuln_scan/export_endpoints_task.py b/backend/apps/scan/tasks/vuln_scan/export_endpoints_task.py
index 1c4d2804..5f5ab9fe 100644
--- a/backend/apps/scan/tasks/vuln_scan/export_endpoints_task.py
+++ b/backend/apps/scan/tasks/vuln_scan/export_endpoints_task.py
@@ -1,6 +1,6 @@
 """Task that exports Endpoint URLs to a file
 
-Uses TargetExportService to centralize export logic and default-value fallback
+Uses the export_urls_with_fallback use-case function to run the fallback chain
 
 Data source priority (fallback chain):
 1. Endpoint.url - the most fine-grained URLs (paths, parameters, etc.)
@@ -9,13 +9,14 @@
 """
 
 import logging
-from pathlib import Path
 from typing import Dict
 
 from prefect import task
 
-from apps.asset.models import Endpoint, WebSite
-from apps.scan.services.target_export_service import create_export_service
+from apps.scan.services.target_export_service import (
+    export_urls_with_fallback,
+    DataSource,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -43,55 +44,24 @@ def export_endpoints_task(
             "success": bool,
             "output_file": str,
             "total_count": int,
-            "source": str,  # data origin: "endpoint" | "website" | "default"
+            "source": str,  # data origin: "endpoint" | "website" | "default" | "none"
         }
     """
-    export_service = create_export_service(target_id)
-    output_path = Path(output_file)
-    output_path.parent.mkdir(parents=True, exist_ok=True)
-
-    # 1. Export from the Endpoint table first
-    endpoint_queryset = Endpoint.objects.filter(target_id=target_id).values_list('url', flat=True)
-    result = export_service.export_urls(
+    result = export_urls_with_fallback(
         target_id=target_id,
-        output_path=output_file,
-        queryset=endpoint_queryset,
-        batch_size=batch_size
+        output_file=output_file,
+        sources=[DataSource.ENDPOINT, DataSource.WEBSITE, DataSource.DEFAULT],
+        batch_size=batch_size,
     )
 
-    if result['total_count'] > 0:
-        logger.info("Exported %d URLs from the Endpoint table", result['total_count'])
-        return {
-            "success": True,
-            "output_file": result['output_file'],
-            "total_count": result['total_count'],
-            "source": "endpoint",
-        }
-
-    # 2. Endpoint is empty; fall back to the WebSite table
-    logger.info("Endpoint table is empty; falling back to the WebSite table")
-    website_queryset = WebSite.objects.filter(target_id=target_id).values_list('url', flat=True)
-    result = export_service.export_urls(
-        target_id=target_id,
-        output_path=output_file,
-        queryset=website_queryset,
-        batch_size=batch_size
+    logger.info(
+        "URL export complete - source=%s, count=%d, tried=%s",
+        result['source'], result['total_count'], result['tried_sources']
     )
 
-    if result['total_count'] > 0:
-        logger.info("Exported %d URLs from the WebSite table", result['total_count'])
-        return {
-            "success": True,
-            "output_file": result['output_file'],
-            "total_count": result['total_count'],
-            "source": "website",
-        }
-
-    # 3. WebSite is also empty; generate default URLs (handled inside export_urls)
-    logger.info("WebSite table is also empty; generating default URLs")
     return {
-        "success": True,
+        "success": result['success'],
         "output_file": result['output_file'],
         "total_count": result['total_count'],
-        "source": "default",
+        "source": result['source'],
    }
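
One rule in the chain deserves emphasis: an empty source falls through to the next one, while a source whose rows were all blacklist-filtered stops the chain. A standalone sketch of that decision rule, with hypothetical counters standing in for the export_urls results:

from typing import Dict, List, Tuple

def pick_source(counts: Dict[str, Tuple[int, int]], sources: List[str]) -> str:
    """counts maps source -> (queryset_count, total_count written)."""
    for source in sources:
        queryset_count, total_count = counts.get(source, (0, 0))
        if total_count > 0:
            return source  # data written: use this source
        if queryset_count > 0:
            return source  # all rows filtered: stop, do not fall back
    return "none"          # every source was empty

# Endpoint empty; WebSite has rows but all were blacklist-filtered:
# the chain stops at "website" instead of falling through to "default".
assert pick_source({"endpoint": (0, 0), "website": (5, 0)},
                   ["endpoint", "website", "default"]) == "website"
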