重构:回退逻辑

This commit is contained in:
yyhuni
2026-01-07 08:45:27 +08:00
parent abf2d95f6f
commit c521bdb511
7 changed files with 284 additions and 162 deletions

View File

@@ -17,7 +17,12 @@ from .scan_state_service import ScanStateService
from .scan_control_service import ScanControlService
from .scan_stats_service import ScanStatsService
from .scheduled_scan_service import ScheduledScanService
from .target_export_service import TargetExportService
from .target_export_service import (
TargetExportService,
create_export_service,
export_urls_with_fallback,
DataSource,
)
__all__ = [
'ScanService', # 主入口(向后兼容)
@@ -27,5 +32,8 @@ __all__ = [
'ScanStatsService',
'ScheduledScanService',
'TargetExportService', # 目标导出服务
'create_export_service',
'export_urls_with_fallback',
'DataSource',
]

View File

@@ -2,7 +2,9 @@
目标导出服务
提供统一的目标提取和文件导出功能,支持:
- URL 导出(流式写入 + 默认值回退)
- URL 导出(纯导出,不做隐式回退)
- 默认 URL 生成(独立方法)
- 带回退链的 URL 导出(用例层编排)
- 域名/IP 导出(用于端口扫描)
- 黑名单过滤集成
"""
@@ -10,7 +12,7 @@
import ipaddress
import logging
from pathlib import Path
from typing import Dict, Any, Optional, List
from typing import Dict, Any, Optional, List, Callable
from django.db.models import QuerySet
@@ -19,6 +21,14 @@ from apps.common.utils import BlacklistFilter
logger = logging.getLogger(__name__)
class DataSource:
"""数据源类型常量"""
ENDPOINT = "endpoint"
WEBSITE = "website"
HOST_PORT = "host_port"
DEFAULT = "default"
def create_export_service(target_id: int) -> 'TargetExportService':
"""
工厂函数:创建带黑名单过滤的导出服务
@@ -36,21 +46,129 @@ def create_export_service(target_id: int) -> 'TargetExportService':
return TargetExportService(blacklist_filter=blacklist_filter)
def export_urls_with_fallback(
target_id: int,
output_file: str,
sources: List[str],
batch_size: int = 1000
) -> Dict[str, Any]:
"""
带回退链的 URL 导出用例函数
按 sources 顺序尝试每个数据源,直到有数据返回。
回退逻辑:
1. 遍历 sources 列表
2. 对每个 source 构建 queryset 并调用 export_urls()
3. 如果 total_count > 0返回
4. 如果 queryset_count > 0 但 total_count == 0全被黑名单过滤不回退
5. 如果 source == "default",调用 generate_default_urls()
Args:
target_id: 目标 ID
output_file: 输出文件路径
sources: 数据源优先级列表,如 ["endpoint", "website", "default"]
batch_size: 批次大小
Returns:
dict: {
'success': bool,
'output_file': str,
'total_count': int,
'source': str, # 实际使用的数据源
'tried_sources': List[str], # 尝试过的数据源
}
"""
from apps.asset.models import Endpoint, WebSite
export_service = create_export_service(target_id)
tried_sources = []
for source in sources:
tried_sources.append(source)
if source == DataSource.DEFAULT:
# 默认 URL 生成
result = export_service.generate_default_urls(target_id, output_file)
return {
'success': result['success'],
'output_file': result['output_file'],
'total_count': result['total_count'],
'source': DataSource.DEFAULT,
'tried_sources': tried_sources,
}
# 构建对应数据源的 queryset
if source == DataSource.ENDPOINT:
queryset = Endpoint.objects.filter(target_id=target_id).values_list('url', flat=True)
elif source == DataSource.WEBSITE:
queryset = WebSite.objects.filter(target_id=target_id).values_list('url', flat=True)
else:
logger.warning("未知的数据源类型: %s,跳过", source)
continue
result = export_service.export_urls(
target_id=target_id,
output_path=output_file,
queryset=queryset,
batch_size=batch_size
)
# 有数据写入,返回
if result['total_count'] > 0:
logger.info("%s 导出 %d 条 URL", source, result['total_count'])
return {
'success': result['success'],
'output_file': result['output_file'],
'total_count': result['total_count'],
'source': source,
'tried_sources': tried_sources,
}
# 数据存在但全被黑名单过滤,不回退
if result['queryset_count'] > 0:
logger.info(
"%s%d 条数据但全被黑名单过滤filtered=%d),不回退",
source, result['queryset_count'], result['filtered_count']
)
return {
'success': result['success'],
'output_file': result['output_file'],
'total_count': 0,
'source': source,
'tried_sources': tried_sources,
}
# 数据源为空,继续尝试下一个
logger.info("%s 为空,尝试下一个数据源", source)
# 所有数据源都为空
logger.warning("所有数据源都为空,无法导出 URL")
return {
'success': True,
'output_file': output_file,
'total_count': 0,
'source': 'none',
'tried_sources': tried_sources,
}
class TargetExportService:
"""
目标导出服务 - 提供统一的目标提取和文件导出功能
使用方式:
from apps.common.services import BlacklistService
from apps.common.utils import BlacklistFilter
# 方式 1使用用例函数推荐
from apps.scan.services.target_export_service import export_urls_with_fallback, DataSource
# 获取规则并创建过滤器
blacklist_service = BlacklistService()
rules = blacklist_service.get_rules(target_id)
blacklist_filter = BlacklistFilter(rules)
result = export_urls_with_fallback(
target_id=1,
output_file='/path/to/output.txt',
sources=[DataSource.ENDPOINT, DataSource.WEBSITE, DataSource.DEFAULT]
)
# 使用导出服务
export_service = TargetExportService(blacklist_filter=blacklist_filter)
# 方式 2直接使用 Service纯导出不带回退
export_service = create_export_service(target_id)
result = export_service.export_urls(target_id, output_path, queryset)
"""
@@ -72,16 +190,14 @@ class TargetExportService:
batch_size: int = 1000
) -> Dict[str, Any]:
"""
统一 URL 导出函数
URL 导出函数 - 只负责将 queryset 数据写入文件
自动判断数据库有无数据:
- 有数据:流式写入数据库数据到文件
- 无数据:调用默认值生成器生成 URL
不做任何隐式回退或默认 URL 生成。
Args:
target_id: 目标 ID
output_path: 输出文件路径
queryset: 数据源 queryset Task 层构建,应为 values_list flat=True
queryset: 数据源 queryset调用方构建,应为 values_list flat=True
url_field: URL 字段名(用于黑名单过滤)
batch_size: 批次大小
@@ -89,7 +205,9 @@ class TargetExportService:
dict: {
'success': bool,
'output_file': str,
'total_count': int
'total_count': int, # 实际写入数量
'queryset_count': int, # 原始数据数量(迭代计数)
'filtered_count': int, # 被黑名单过滤的数量
}
Raises:
@@ -102,9 +220,12 @@ class TargetExportService:
total_count = 0
filtered_count = 0
queryset_count = 0
try:
with open(output_file, 'w', encoding='utf-8', buffering=8192) as f:
for url in queryset.iterator(chunk_size=batch_size):
queryset_count += 1
if url:
# 黑名单过滤
if self.blacklist_filter and not self.blacklist_filter.is_allowed(url):
@@ -122,25 +243,26 @@ class TargetExportService:
if filtered_count > 0:
logger.info("黑名单过滤: 过滤 %d 个 URL", filtered_count)
# 默认值回退模式
if total_count == 0:
total_count = self._generate_default_urls(target_id, output_file)
logger.info("✓ URL 导出完成 - 数量: %d, 文件: %s", total_count, output_path)
logger.info(
"✓ URL 导出完成 - 写入: %d, 原始: %d, 过滤: %d, 文件: %s",
total_count, queryset_count, filtered_count, output_path
)
return {
'success': True,
'output_file': str(output_file),
'total_count': total_count
'total_count': total_count,
'queryset_count': queryset_count,
'filtered_count': filtered_count,
}
def _generate_default_urls(
def generate_default_urls(
self,
target_id: int,
output_path: Path
) -> int:
output_path: str
) -> Dict[str, Any]:
"""
默认值生成器(内部函数)
默认 URL 生成器
根据 Target 类型生成默认 URL
- DOMAIN: http(s)://domain
@@ -153,26 +275,37 @@ class TargetExportService:
output_path: 输出文件路径
Returns:
int: 写入的 URL 总数
dict: {
'success': bool,
'output_file': str,
'total_count': int,
}
"""
from apps.targets.services import TargetService
from apps.targets.models import Target
output_file = Path(output_path)
output_file.parent.mkdir(parents=True, exist_ok=True)
target_service = TargetService()
target = target_service.get_target(target_id)
if not target:
logger.warning("Target ID %d 不存在,无法生成默认 URL", target_id)
return 0
return {
'success': True,
'output_file': str(output_file),
'total_count': 0,
}
target_name = target.name
target_type = target.type
logger.info("懒加载模式Target 类型=%s, 名称=%s", target_type, target_name)
logger.info("生成默认 URLTarget 类型=%s, 名称=%s", target_type, target_name)
total_urls = 0
with open(output_path, 'w', encoding='utf-8', buffering=8192) as f:
with open(output_file, 'w', encoding='utf-8', buffering=8192) as f:
if target_type == Target.TargetType.DOMAIN:
urls = [f"http://{target_name}", f"https://{target_name}"]
for url in urls:
@@ -221,8 +354,13 @@ class TargetExportService:
else:
logger.warning("不支持的 Target 类型: %s", target_type)
logger.info("懒加载生成默认 URL - 数量: %d", total_urls)
return total_urls
logger.info("✓ 默认 URL 生成完成 - 数量: %d", total_urls)
return {
'success': True,
'output_file': str(output_file),
'total_count': total_urls,
}
def _should_write_url(self, url: str) -> bool:
"""检查 URL 是否应该写入(通过黑名单过滤)"""

View File

@@ -1,15 +1,16 @@
"""
导出站点 URL 到 TXT 文件的 Task
使用 TargetExportService 统一处理导出逻辑和默认值回退
数据源: WebSite.url
使用 export_urls_with_fallback 用例函数处理回退链逻辑
数据源: WebSite.url → Default
"""
import logging
from prefect import task
from apps.asset.models import WebSite
from apps.scan.services import TargetExportService
from apps.scan.services.target_export_service import create_export_service
from apps.scan.services.target_export_service import (
export_urls_with_fallback,
DataSource,
)
logger = logging.getLogger(__name__)
@@ -23,13 +24,9 @@ def export_sites_task(
"""
导出目标下的所有站点 URL 到 TXT 文件
数据源: WebSite.url
懒加载模式:
- 如果数据库为空,根据 Target 类型生成默认 URL
- DOMAIN: http(s)://domain
- IP: http(s)://ip
- CIDR: 展开为所有 IP 的 URL
数据源优先级(回退链):
1. WebSite 表 - 站点级别 URL
2. 默认生成 - 根据 Target 类型生成 http(s)://target_name
Args:
target_id: 目标 ID
@@ -47,25 +44,21 @@ def export_sites_task(
ValueError: 参数错误
IOError: 文件写入失败
"""
# 构建数据源 querysetTask 层决定数据源)
queryset = WebSite.objects.filter(target_id=target_id).values_list('url', flat=True)
# 使用工厂函数创建导出服务
export_service = create_export_service(target_id)
result = export_service.export_urls(
result = export_urls_with_fallback(
target_id=target_id,
output_path=output_file,
queryset=queryset,
batch_size=batch_size
output_file=output_file,
sources=[DataSource.WEBSITE, DataSource.DEFAULT],
batch_size=batch_size,
)
logger.info(
"站点 URL 导出完成 - source=%s, count=%d",
result['source'], result['total_count']
)
# 保持返回值格式不变(向后兼容)
return {
'success': result['success'],
'output_file': result['output_file'],
'total_count': result['total_count']
'total_count': result['total_count'],
}

View File

@@ -2,15 +2,17 @@
导出 URL 任务
用于指纹识别前导出目标下的 URL 到文件
使用 TargetExportService 统一处理导出逻辑和默认值回退
使用 export_urls_with_fallback 用例函数处理回退链逻辑
"""
import logging
from prefect import task
from apps.asset.models import WebSite
from apps.scan.services.target_export_service import create_export_service
from apps.scan.services.target_export_service import (
export_urls_with_fallback,
DataSource,
)
logger = logging.getLogger(__name__)
@@ -19,46 +21,40 @@ logger = logging.getLogger(__name__)
def export_urls_for_fingerprint_task(
target_id: int,
output_file: str,
source: str = 'website',
source: str = 'website', # 保留参数,兼容旧调用(实际值由回退链决定)
batch_size: int = 1000
) -> dict:
"""
导出目标下的 URL 到文件(用于指纹识别)
数据源: WebSite.url
懒加载模式:
- 如果数据库为空,根据 Target 类型生成默认 URL
- DOMAIN: http(s)://domain
- IP: http(s)://ip
- CIDR: 展开为所有 IP 的 URL
- URL: 直接使用目标 URL
数据源优先级(回退链):
1. WebSite 表 - 站点级别 URL
2. 默认生成 - 根据 Target 类型生成 http(s)://target_name
Args:
target_id: 目标 ID
output_file: 输出文件路径
source: 数据源类型(保留参数,兼容旧调用)
source: 数据源类型(保留参数,兼容旧调用,实际值由回退链决定
batch_size: 批量读取大小
Returns:
dict: {'output_file': str, 'total_count': int, 'source': str}
"""
# 构建数据源 querysetTask 层决定数据源)
queryset = WebSite.objects.filter(target_id=target_id).values_list('url', flat=True)
# 使用工厂函数创建导出服务
export_service = create_export_service(target_id)
result = export_service.export_urls(
result = export_urls_with_fallback(
target_id=target_id,
output_path=output_file,
queryset=queryset,
batch_size=batch_size
output_file=output_file,
sources=[DataSource.WEBSITE, DataSource.DEFAULT],
batch_size=batch_size,
)
# 保持返回值格式不变(向后兼容)
logger.info(
"指纹识别 URL 导出完成 - source=%s, count=%d",
result['source'], result['total_count']
)
# 返回实际使用的数据源(不再固定为 "website"
return {
'output_file': result['output_file'],
'total_count': result['total_count'],
'source': source
'source': result['source'],
}

View File

@@ -2,7 +2,7 @@
导出站点URL到文件的Task
直接使用 HostPortMapping 表查询 host+port 组合拼接成URL格式写入文件
使用 TargetExportService 处理默认值回退逻辑
使用 TargetExportService.generate_default_urls() 处理默认值回退逻辑
特殊逻辑:
- 80 端口:只生成 HTTP URL省略端口号
@@ -46,18 +46,15 @@ def export_site_urls_task(
"""
导出目标下的所有站点URL到文件基于 HostPortMapping 表)
数据源: HostPortMapping (host + port)
数据源: HostPortMapping (host + port) → Default
特殊逻辑:
- 80 端口:只生成 HTTP URL省略端口号
- 443 端口:只生成 HTTPS URL省略端口号
- 其他端口:生成 HTTP 和 HTTPS 两个URL带端口号
懒加载模式
- 如果数据库为空,根据 Target 类型生成默认 URL
- DOMAIN: http(s)://domain
- IP: http(s)://ip
- CIDR: 展开为所有 IP 的 URL
回退逻辑
- 如果 HostPortMapping 为空,使用 generate_default_urls() 生成默认 URL
Args:
target_id: 目标ID
@@ -69,7 +66,8 @@ def export_site_urls_task(
'success': bool,
'output_file': str,
'total_urls': int,
'association_count': int # 主机端口关联数量
'association_count': int, # 主机端口关联数量
'source': str, # 数据来源: "host_port" | "default"
}
Raises:
@@ -94,6 +92,7 @@ def export_site_urls_task(
total_urls = 0
association_count = 0
filtered_count = 0
# 流式写入文件(特殊端口逻辑)
with open(output_path, 'w', encoding='utf-8', buffering=8192) as f:
@@ -104,6 +103,7 @@ def export_site_urls_task(
# 先校验 host通过了再生成 URL
if not blacklist_filter.is_allowed(host):
filtered_count += 1
continue
# 根据端口号生成URL
@@ -114,19 +114,40 @@ def export_site_urls_task(
if association_count % 1000 == 0:
logger.info("已处理 %d 条关联,生成 %d 个URL...", association_count, total_urls)
if filtered_count > 0:
logger.info("黑名单过滤: 过滤 %d 条关联", filtered_count)
logger.info(
"✓ 站点URL导出完成 - 关联数: %d, 总URL数: %d, 文件: %s",
association_count, total_urls, str(output_path)
)
# 默认值回退模式:使用工厂函数创建导出服务
# 判断数据来源
source = "host_port"
# 数据存在但全被过滤,不回退
if association_count > 0 and total_urls == 0:
logger.info("HostPortMapping 有 %d 条数据,但全被黑名单过滤,不回退", association_count)
return {
'success': True,
'output_file': str(output_path),
'total_urls': 0,
'association_count': association_count,
'source': source,
}
# 数据源为空,回退到默认 URL 生成
if total_urls == 0:
logger.info("HostPortMapping 为空,使用默认 URL 生成")
export_service = create_export_service(target_id)
total_urls = export_service._generate_default_urls(target_id, output_path)
result = export_service.generate_default_urls(target_id, str(output_path))
total_urls = result['total_count']
source = "default"
return {
'success': True,
'output_file': str(output_path),
'total_urls': total_urls,
'association_count': association_count
'association_count': association_count,
'source': source,
}

View File

@@ -1,16 +1,17 @@
"""
导出站点 URL 列表任务
使用 TargetExportService 统一处理导出逻辑和默认值回退
数据源: WebSite.url用于 katana 等爬虫工具)
使用 export_urls_with_fallback 用例函数处理回退链逻辑
数据源: WebSite.url → Default(用于 katana 等爬虫工具)
"""
import logging
from prefect import task
from typing import Optional
from apps.asset.models import WebSite
from apps.scan.services.target_export_service import create_export_service
from apps.scan.services.target_export_service import (
export_urls_with_fallback,
DataSource,
)
logger = logging.getLogger(__name__)
@@ -29,13 +30,9 @@ def export_sites_task(
"""
导出站点 URL 列表到文件(用于 katana 等爬虫工具)
数据源: WebSite.url
懒加载模式:
- 如果数据库为空,根据 Target 类型生成默认 URL
- DOMAIN: http(s)://domain
- IP: http(s)://ip
- CIDR: 展开为所有 IP 的 URL
数据源优先级(回退链):
1. WebSite 表 - 站点级别 URL
2. 默认生成 - 根据 Target 类型生成 http(s)://target_name
Args:
output_file: 输出文件路径
@@ -53,17 +50,16 @@ def export_sites_task(
ValueError: 参数错误
RuntimeError: 执行失败
"""
# 构建数据源 querysetTask 层决定数据源)
queryset = WebSite.objects.filter(target_id=target_id).values_list('url', flat=True)
# 使用工厂函数创建导出服务
export_service = create_export_service(target_id)
result = export_service.export_urls(
result = export_urls_with_fallback(
target_id=target_id,
output_path=output_file,
queryset=queryset,
batch_size=batch_size
output_file=output_file,
sources=[DataSource.WEBSITE, DataSource.DEFAULT],
batch_size=batch_size,
)
logger.info(
"站点 URL 导出完成 - source=%s, count=%d",
result['source'], result['total_count']
)
# 保持返回值格式不变(向后兼容)

View File

@@ -1,6 +1,6 @@
"""导出 Endpoint URL 到文件的 Task
使用 TargetExportService 统一处理导出逻辑和默认值回退
使用 export_urls_with_fallback 用例函数处理回退链逻辑
数据源优先级(回退链):
1. Endpoint.url - 最精细的 URL含路径、参数等
@@ -9,13 +9,14 @@
"""
import logging
from pathlib import Path
from typing import Dict
from prefect import task
from apps.asset.models import Endpoint, WebSite
from apps.scan.services.target_export_service import create_export_service
from apps.scan.services.target_export_service import (
export_urls_with_fallback,
DataSource,
)
logger = logging.getLogger(__name__)
@@ -43,55 +44,24 @@ def export_endpoints_task(
"success": bool,
"output_file": str,
"total_count": int,
"source": str, # 数据来源: "endpoint" | "website" | "default"
"source": str, # 数据来源: "endpoint" | "website" | "default" | "none"
}
"""
export_service = create_export_service(target_id)
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
# 1. 优先从 Endpoint 表导出
endpoint_queryset = Endpoint.objects.filter(target_id=target_id).values_list('url', flat=True)
result = export_service.export_urls(
result = export_urls_with_fallback(
target_id=target_id,
output_path=output_file,
queryset=endpoint_queryset,
batch_size=batch_size
output_file=output_file,
sources=[DataSource.ENDPOINT, DataSource.WEBSITE, DataSource.DEFAULT],
batch_size=batch_size,
)
if result['total_count'] > 0:
logger.info("从 Endpoint 表导出 %d 条 URL", result['total_count'])
return {
"success": True,
"output_file": result['output_file'],
"total_count": result['total_count'],
"source": "endpoint",
}
# 2. Endpoint 为空,回退到 WebSite 表
logger.info("Endpoint 表为空,回退到 WebSite 表")
website_queryset = WebSite.objects.filter(target_id=target_id).values_list('url', flat=True)
result = export_service.export_urls(
target_id=target_id,
output_path=output_file,
queryset=website_queryset,
batch_size=batch_size
logger.info(
"URL 导出完成 - source=%s, count=%d, tried=%s",
result['source'], result['total_count'], result['tried_sources']
)
if result['total_count'] > 0:
logger.info("从 WebSite 表导出 %d 条 URL", result['total_count'])
return {
"success": True,
"output_file": result['output_file'],
"total_count": result['total_count'],
"source": "website",
}
# 3. WebSite 也为空,生成默认 URLexport_urls 内部已处理)
logger.info("WebSite 表也为空,使用默认 URL 生成")
return {
"success": True,
"success": result['success'],
"output_file": result['output_file'],
"total_count": result['total_count'],
"source": "default",
"source": result['source'],
}