Compare commits

..

4 Commits

Author SHA1 Message Date
yyhuni
d75a3f6882 fix(task_distributor): adjust high load wait parameters and improve timeout handling
- Increase high load wait interval from 60 to 120 seconds (2 minutes)
- Increase max retries from 10 to 60 to support up to 2 hours total wait time
- Improve timeout message to show actual wait duration in minutes
- Remove duplicate return statement in worker selection logic
- Update notification message to reflect new wait parameters (2 minutes check interval, 2 hours max wait)
- Clean up trailing whitespace in task_distributor.py
- Remove redundant error message from install.sh about missing/incorrect image versions
- Better handling of high load scenarios with clearer logging and user communication
2026-01-11 16:41:05 +08:00
github-actions[bot]
59e48e5b15 chore: bump version to v1.5.10-dev 2026-01-11 08:19:39 +00:00
yyhuni
2d2ec93626 perf(screenshot): optimize memory usage and add URL collection fallback logic
- Add iterator(chunk_size=50) to ScreenshotSnapshot query to prevent BinaryField data caching and reduce memory consumption
- Implement fallback logic in URL collection: WebSite → HostPortMapping → Default URL with priority handling
- Update _collect_urls_from_provider to return tuple with data source information for better logging and debugging
- Add detailed logging to track which data source was used during URL collection
- Improve code documentation with clear return type hints and fallback priority explanation
- Prevents memory spikes when processing large screenshot datasets with binary image data
2026-01-11 16:14:56 +08:00
github-actions[bot]
ced9f811f4 chore: bump version to v1.5.8-dev 2026-01-11 08:09:37 +00:00
6 changed files with 52 additions and 26 deletions

View File

@@ -1 +1 @@
v1.5.7
v1.5.10-dev

View File

@@ -146,7 +146,9 @@ class ScreenshotService:
"""
from apps.asset.models import Screenshot, ScreenshotSnapshot
snapshots = ScreenshotSnapshot.objects.filter(scan_id=scan_id)
# 使用 iterator() 避免 QuerySet 缓存大量 BinaryField 数据导致内存飙升
# chunk_size=50: 每次只加载 50 条记录,处理完后释放内存
snapshots = ScreenshotSnapshot.objects.filter(scan_id=scan_id).iterator(chunk_size=50)
count = 0
for snapshot in snapshots:

View File

@@ -156,10 +156,10 @@ class TaskDistributor:
# 降级策略:如果没有正常负载的,循环等待后重新检测
if not scored_workers:
if high_load_workers:
# 高负载等待参数(默认每 60 秒检测一次,最多 10 次
high_load_wait = getattr(settings, 'HIGH_LOAD_WAIT_SECONDS', 60)
high_load_max_retries = getattr(settings, 'HIGH_LOAD_MAX_RETRIES', 10)
# 高负载等待参数(每 2 分钟检测一次,最多等待 2 小时
high_load_wait = getattr(settings, 'HIGH_LOAD_WAIT_SECONDS', 120)
high_load_max_retries = getattr(settings, 'HIGH_LOAD_MAX_RETRIES', 60)
# 开始等待前发送高负载通知
high_load_workers.sort(key=lambda x: x[1])
_, _, first_cpu, first_mem = high_load_workers[0]
@@ -170,51 +170,51 @@ class TaskDistributor:
cpu=first_cpu,
mem=first_mem
)
for retry in range(high_load_max_retries):
logger.warning(
"所有 Worker 高负载,等待 %d 秒后重试... (%d/%d)",
high_load_wait, retry + 1, high_load_max_retries
)
time.sleep(high_load_wait)
# 重新获取负载数据
loads = worker_load_service.get_all_loads(worker_ids)
# 重新评估
scored_workers = []
high_load_workers = []
for worker in workers:
load = loads.get(worker.id)
if not load:
continue
cpu = load.get('cpu', 0)
mem = load.get('mem', 0)
score = cpu * 0.7 + mem * 0.3
if cpu > 85 or mem > 85:
high_load_workers.append((worker, score, cpu, mem))
else:
scored_workers.append((worker, score, cpu, mem))
# 如果有正常负载的 Worker跳出循环
if scored_workers:
logger.info("检测到正常负载 Worker结束等待")
break
# 超时或仍然高负载,选择负载最低的
# 超时后强制派发到负载最低的 Worker
if not scored_workers and high_load_workers:
high_load_workers.sort(key=lambda x: x[1])
best_worker, _, cpu, mem = high_load_workers[0]
logger.warning(
"等待超时,强制分发到高负载 Worker: %s (CPU: %.1f%%, MEM: %.1f%%)",
"等待 %d 分钟后仍高负载,强制分发到 Worker: %s (CPU: %.1f%%, MEM: %.1f%%)",
(high_load_wait * high_load_max_retries) // 60,
best_worker.name, cpu, mem
)
return best_worker
return best_worker
else:
logger.warning("没有可用的 Worker")
return None

View File

@@ -31,10 +31,35 @@ def _parse_screenshot_config(enabled_tools: dict) -> dict:
}
def _collect_urls_from_provider(provider: TargetProvider) -> list[str]:
"""从 Provider 收集网站 URL"""
def _collect_urls_from_provider(provider: TargetProvider) -> tuple[list[str], str]:
"""
从 Provider 收集网站 URL带回退逻辑
优先级WebSite → HostPortMapping → Default URL
Returns:
tuple: (urls, source)
- urls: URL 列表
- source: 数据来源 ('website' | 'host_port' | 'default')
"""
logger.info("从 Provider 获取网站 URL - Provider: %s", type(provider).__name__)
return list(provider.iter_websites())
# 优先从 WebSite 获取
urls = list(provider.iter_websites())
if urls:
logger.info("使用 WebSite 数据源 - 数量: %d", len(urls))
return urls, "website"
# 回退到 HostPortMapping
urls = list(provider.iter_host_port_urls())
if urls:
logger.info("WebSite 为空,回退到 HostPortMapping - 数量: %d", len(urls))
return urls, "host_port"
# 最终回退到默认 URL
urls = list(provider.iter_default_urls())
logger.info("HostPortMapping 为空,回退到默认 URL - 数量: %d", len(urls))
return urls, "default"
def _build_empty_result(scan_id: int, target_name: str) -> dict:
@@ -96,9 +121,9 @@ def screenshot_flow(
concurrency = config['concurrency']
logger.info("截图配置 - 并发: %d", concurrency)
# Step 2: 从 Provider 收集 URL 列表
urls = _collect_urls_from_provider(provider)
logger.info("URL 收集完成 - 数量: %d", len(urls))
# Step 2: 从 Provider 收集 URL 列表(带回退逻辑)
urls, source = _collect_urls_from_provider(provider)
logger.info("URL 收集完成 - 来源: %s, 数量: %d", source, len(urls))
if not urls:
logger.warning("没有可截图的 URL跳过截图任务")

View File

@@ -87,7 +87,7 @@ def on_all_workers_high_load(sender, worker_name, cpu, mem, **kwargs):
"""所有 Worker 高负载时的通知处理"""
create_notification(
title="系统负载较高",
message=f"所有节点负载较高(最低负载节点 CPU: {cpu:.1f}%, 内存: {mem:.1f}%),系统将等待最多 10 分钟后分发任务,扫描速度可能受影响",
message=f"所有节点负载较高(最低负载节点 CPU: {cpu:.1f}%, 内存: {mem:.1f}%),系统将每 2 分钟检测一次,最多等待 2 小时后分发任务",
level=NotificationLevel.MEDIUM,
category=NotificationCategory.SYSTEM
)

View File

@@ -703,7 +703,6 @@ else
warn "镜像加速已配置,但拉取仍然失败,可能原因:"
echo -e " 1. 镜像源暂时不可用,请稍后重试"
echo -e " 2. 网络连接问题"
echo -e " 3. 镜像不存在或版本错误"
fi
echo
exit 1