Compare commits

...

8 Commits

Author SHA1 Message Date
yyhuni
3ba1ba427e fix: agent自动更新逻辑 2025-12-19 19:48:01 +08:00
yyhuni
6019555729 fix:ssl问题 2025-12-19 19:41:12 +08:00
github-actions[bot]
750f52c515 chore: bump version to v1.0.20 2025-12-19 11:28:27 +00:00
yyhuni
bb5ce66a31 fix:agent容器版本号匹配 2025-12-19 19:20:15 +08:00
github-actions[bot]
ac958571a5 chore: bump version to v1.0.19 2025-12-19 11:12:14 +00:00
yyhuni
bcb321f883 Merge branch 'main' of https://github.com/yyhuni/xingrin 2025-12-19 19:03:39 +08:00
yyhuni
fd3cdf8033 fix:远程worker 8888端口问题 2025-12-19 19:02:43 +08:00
github-actions[bot]
f3f9718df2 chore: bump version to v1.0.18 2025-12-19 10:47:10 +00:00
16 changed files with 270 additions and 31 deletions

View File

@@ -104,6 +104,8 @@ jobs:
tags: |
${{ env.IMAGE_PREFIX }}/${{ matrix.image }}:${{ steps.version.outputs.VERSION }}
${{ steps.version.outputs.IS_RELEASE == 'true' && format('{0}/{1}:latest', env.IMAGE_PREFIX, matrix.image) || '' }}
build-args: |
IMAGE_TAG=${{ steps.version.outputs.VERSION }}
cache-from: type=gha
cache-to: type=gha,mode=max
provenance: false

View File

@@ -1 +1 @@
v1.0.17
v1.0.20

View File

@@ -14,6 +14,10 @@ import os
import sys
import requests
import logging
import urllib3
# 禁用自签名证书的 SSL 警告(远程 Worker 场景)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger(__name__)
@@ -36,7 +40,8 @@ def fetch_config_and_setup_django():
print(f"[CONFIG] 正在从配置中心获取配置: {config_url}")
print(f"[CONFIG] IS_LOCAL={is_local}")
try:
resp = requests.get(config_url, timeout=10)
# verify=False: 远程 Worker 通过 HTTPS 访问时可能使用自签名证书
resp = requests.get(config_url, timeout=10, verify=False)
resp.raise_for_status()
config = resp.json()

View File

@@ -232,9 +232,9 @@ class TaskDistributor:
network_arg = f"--network {settings.DOCKER_NETWORK_NAME}"
server_url = f"http://server:{settings.SERVER_PORT}"
else:
# 远程:无需指定网络,使用公网地址
# 远程:通过 Nginx 反向代理访问(HTTPS,不直连 8888 端口)
network_arg = ""
server_url = f"http://{settings.PUBLIC_HOST}:{settings.SERVER_PORT}"
server_url = f"https://{settings.PUBLIC_HOST}"
# 挂载路径(所有节点统一使用固定路径)
host_results_dir = settings.HOST_RESULTS_DIR # /opt/xingrin/results

View File

@@ -134,5 +134,57 @@ class WorkerService:
logger.warning(f"[卸载] Worker {worker_id} 远程卸载异常: {e}")
return False, f"远程卸载异常: {str(e)}"
def execute_remote_command(
self,
ip_address: str,
ssh_port: int,
username: str,
password: str | None,
command: str
) -> tuple[bool, str]:
"""
在远程主机上执行命令
Args:
ip_address: SSH 主机地址
ssh_port: SSH 端口
username: SSH 用户名
password: SSH 密码
command: 要执行的命令
Returns:
(success, message) 元组
"""
if not password:
return False, "未配置 SSH 密码"
try:
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(
ip_address,
port=ssh_port,
username=username,
password=password,
timeout=30
)
stdin, stdout, stderr = ssh.exec_command(command, timeout=120)
exit_status = stdout.channel.recv_exit_status()
ssh.close()
if exit_status == 0:
return True, stdout.read().decode().strip()
else:
error = stderr.read().decode().strip()
return False, error
except Exception as e:
return False, str(e)
__all__ = ["WorkerService"]

View File

@@ -118,8 +118,25 @@ class WorkerNodeViewSet(viewsets.ModelViewSet):
@action(detail=True, methods=['post'])
def heartbeat(self, request, pk=None):
"""接收心跳上报(写 Redis首次心跳更新部署状态"""
"""
接收心跳上报(写 Redis,首次心跳更新部署状态,检查版本)
请求体:
{
"cpu_percent": 50.0,
"memory_percent": 60.0,
"version": "v1.0.9"
}
返回:
{
"status": "ok",
"need_update": true/false,
"server_version": "v1.0.19"
}
"""
from apps.engine.services.worker_load_service import worker_load_service
from django.conf import settings
worker = self.get_object()
info = request.data if request.data else {}
@@ -134,7 +151,96 @@ class WorkerNodeViewSet(viewsets.ModelViewSet):
worker.status = 'online'
worker.save(update_fields=['status'])
return Response({'status': 'ok'})
# 3. 版本检查:比较 agent 版本与 server 版本
agent_version = info.get('version', '')
server_version = settings.IMAGE_TAG # Server 当前版本
need_update = False
if agent_version and agent_version != 'unknown':
# 版本不匹配时通知 agent 更新
need_update = agent_version != server_version
if need_update:
logger.info(
f"Worker {worker.name} 版本不匹配: agent={agent_version}, server={server_version}"
)
# 远程 Worker服务端主动通过 SSH 触发更新
# 旧版 agent 不会解析 need_update所以需要服务端主动推送
if not worker.is_local and worker.ip_address:
self._trigger_remote_agent_update(worker, server_version)
return Response({
'status': 'ok',
'need_update': need_update,
'server_version': server_version
})
def _trigger_remote_agent_update(self, worker, target_version: str):
    """
    Trigger an agent update on a remote worker over SSH in a background thread.

    A Redis key ``agent_update_lock:<worker.id>`` is set with NX and a
    60-second expiry before starting; if the key already exists the call
    returns without doing anything. The key is deleted when the background
    job finishes.

    NOTE(review): because the key is deleted as soon as the job ends, a job
    that fails quickly can be re-triggered by the very next heartbeat, and
    the 60-second expiry is shorter than the SSH timeouts used by
    ``execute_remote_command`` — confirm this is the intended throttling.

    Args:
        worker: Worker node to update; its id, name, ip_address, ssh_port,
            username and password are read here, before the thread starts.
        target_version: Image tag the agent container should be updated to.
    """
    import threading

    import redis
    from django.conf import settings as django_settings

    redis_client = redis.from_url(django_settings.REDIS_URL)
    lock_key = f"agent_update_lock:{worker.id}"

    # SET NX EX 60: only one caller gets the lock; others skip.
    if not redis_client.set(lock_key, "1", nx=True, ex=60):
        logger.debug(f"Worker {worker.name} 更新已在进行中,跳过")
        return

    # Copy plain values now so the background thread never touches the ORM.
    worker_id = worker.id
    worker_name = worker.name
    ip_address = worker.ip_address
    ssh_port = worker.ssh_port
    username = worker.username
    password = worker.password

    def _async_update():
        try:
            logger.info(f"开始远程更新 Worker {worker_name}{target_version}")

            docker_user = getattr(django_settings, 'DOCKER_USER', 'yyhuni')
            image = f"{docker_user}/xingrin-agent:{target_version}"

            # Pull first; only when the pull succeeds replace the container.
            # The brace group is required: in a flat chain
            # `pull && stop || true && rm || true && run`, `&&` and `||`
            # associate left with equal precedence, so a failed pull would
            # NOT stop the later steps from running.
            update_cmd = (
                f'docker pull {image} && {{ '
                'docker stop xingrin-agent 2>/dev/null || true; '
                'docker rm xingrin-agent 2>/dev/null || true; '
                'docker run -d --pull=always '
                '--name xingrin-agent '
                '--restart always '
                f'-e HEARTBEAT_API_URL="https://{django_settings.PUBLIC_HOST}" '
                f'-e WORKER_ID="{worker_id}" '
                f'-e IMAGE_TAG="{target_version}" '
                '-v /proc:/host/proc:ro '
                f'{image}; }}'
            )

            success, message = self.worker_service.execute_remote_command(
                ip_address=ip_address,
                ssh_port=ssh_port,
                username=username,
                password=password,
                command=update_cmd
            )

            if success:
                logger.info(f"Worker {worker_name} 远程更新成功")
            else:
                logger.warning(f"Worker {worker_name} 远程更新失败: {message}")
        except Exception as e:
            logger.error(f"Worker {worker_name} 远程更新异常: {e}")
        finally:
            # Release the lock once the job is over (success or failure).
            redis_client.delete(lock_key)

    # Run in a daemon thread so the heartbeat response is not blocked.
    threading.Thread(target=_async_update, daemon=True).start()
@action(detail=False, methods=['post'])
def register(self, request):

View File

@@ -3,10 +3,14 @@
import logging
import time
import requests
import urllib3
from .models import Notification, NotificationSettings
from .types import NotificationLevel, NotificationCategory
from .repositories import DjangoNotificationRepository, NotificationSettingsRepository
# 禁用自签名证书的 SSL 警告(远程 Worker 回调场景)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger(__name__)
@@ -314,7 +318,8 @@ def _push_via_api_callback(notification: Notification, server_url: str) -> None:
'created_at': notification.created_at.isoformat()
}
resp = requests.post(callback_url, json=data, timeout=5)
# verify=False: 远程 Worker 回调 Server 时可能使用自签名证书
resp = requests.post(callback_url, json=data, timeout=5, verify=False)
resp.raise_for_status()
logger.debug(f"通知回调推送成功 - ID: {notification.id}")

View File

@@ -7,6 +7,7 @@
import logging
import os
import ssl
from pathlib import Path
from urllib import request as urllib_request
from urllib import parse as urllib_parse
@@ -89,7 +90,12 @@ def ensure_wordlist_local(wordlist_name: str) -> str:
logger.info("从后端下载字典: %s -> %s", download_url, local_path)
try:
with urllib_request.urlopen(download_url) as resp:
# 创建不验证 SSL 的上下文(远程 Worker 可能使用自签名证书)
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
with urllib_request.urlopen(download_url, context=ssl_context) as resp:
if resp.status != 200:
raise RuntimeError(f"下载字典失败HTTP {resp.status}")
data = resp.read()

View File

@@ -1,7 +1,7 @@
#!/bin/bash
# ============================================
# XingRin Agent
# 用途:心跳上报 + 负载监控
# 用途:心跳上报 + 负载监控 + 版本检查
# 适用:远程 VPS 或 Docker 容器内
# ============================================
@@ -17,6 +17,9 @@ SRC_DIR="${MARKER_DIR}/src"
ENV_FILE="${SRC_DIR}/backend/.env"
INTERVAL=${AGENT_INTERVAL:-3}
# Agent 版本(从环境变量获取,由 Docker 镜像构建时注入)
AGENT_VERSION="${IMAGE_TAG:-unknown}"
# 颜色定义
GREEN='\033[0;32m'
RED='\033[0;31m'
@@ -90,7 +93,7 @@ register_worker() {
EOF
)
RESPONSE=$(curl -s -X POST \
RESPONSE=$(curl -k -s -X POST \
-H "Content-Type: application/json" \
-d "$REGISTER_DATA" \
"${API_URL}/api/workers/register/" 2>/dev/null)
@@ -172,22 +175,72 @@ while true; do
fi
# 构建 JSON 数据(使用数值而非字符串,便于比较和排序)
# 包含版本号,供 Server 端检查版本一致性
JSON_DATA=$(cat <<EOF
{
"cpu_percent": $CPU_PERCENT,
"memory_percent": $MEM_PERCENT
"memory_percent": $MEM_PERCENT,
"version": "$AGENT_VERSION"
}
EOF
)
# 发送心跳
RESPONSE=$(curl -k -s -o /dev/null -w "%{http_code}" -X POST \
# 发送心跳,获取响应内容
RESPONSE_FILE=$(mktemp)
HTTP_CODE=$(curl -k -s -o "$RESPONSE_FILE" -w "%{http_code}" -X POST \
-H "Content-Type: application/json" \
-d "$JSON_DATA" \
"${API_URL}/api/workers/${WORKER_ID}/heartbeat/" 2>/dev/null || echo "000")
RESPONSE_BODY=$(cat "$RESPONSE_FILE" 2>/dev/null)
rm -f "$RESPONSE_FILE"
if [ "$RESPONSE" != "200" ] && [ "$RESPONSE" != "201" ]; then
log "${YELLOW}心跳发送失败 (HTTP $RESPONSE)${NC}"
if [ "$HTTP_CODE" != "200" ] && [ "$HTTP_CODE" != "201" ]; then
log "${YELLOW}心跳发送失败 (HTTP $HTTP_CODE)${NC}"
else
# 检查是否需要更新
NEED_UPDATE=$(echo "$RESPONSE_BODY" | grep -oE '"need_update":\s*(true|false)' | grep -oE '(true|false)')
if [ "$NEED_UPDATE" = "true" ]; then
SERVER_VERSION=$(echo "$RESPONSE_BODY" | grep -oE '"server_version":\s*"[^"]+"' | sed 's/.*"\([^"]*\)"$/\1/')
log "${YELLOW}检测到版本不匹配: Agent=$AGENT_VERSION, Server=$SERVER_VERSION${NC}"
log "${GREEN}正在自动更新...${NC}"
# 执行自动更新
if [ "$RUN_MODE" = "container" ]; then
# 容器模式:通知外部重启(退出后由 docker-compose restart policy 重启)
log "容器模式:退出以触发重启更新"
exit 0
else
# 远程模式:拉取新镜像并重启 agent 容器
log "远程模式:更新 agent 镜像..."
DOCKER_USER="${DOCKER_USER:-yyhuni}"
NEW_IMAGE="${DOCKER_USER}/xingrin-agent:${SERVER_VERSION}"
# 拉取新镜像
if $DOCKER_CMD pull "$NEW_IMAGE" 2>/dev/null; then
log "${GREEN}镜像拉取成功: $NEW_IMAGE${NC}"
# 停止当前容器并用新镜像重启
CONTAINER_NAME="xingrin-agent"
$DOCKER_CMD stop "$CONTAINER_NAME" 2>/dev/null || true
$DOCKER_CMD rm "$CONTAINER_NAME" 2>/dev/null || true
# 重新启动(使用相同的环境变量)
$DOCKER_CMD run -d \
--name "$CONTAINER_NAME" \
--restart unless-stopped \
-e HEARTBEAT_API_URL="$API_URL" \
-e WORKER_ID="$WORKER_ID" \
-e IMAGE_TAG="$SERVER_VERSION" \
-v /proc:/host/proc:ro \
"$NEW_IMAGE"
log "${GREEN}Agent 已更新到 $SERVER_VERSION${NC}"
exit 0
else
log "${RED}镜像拉取失败: $NEW_IMAGE${NC}"
fi
fi
fi
fi
# 休眠

View File

@@ -60,12 +60,14 @@ start_agent() {
log_info "=========================================="
log_info "启动 agent 容器..."
# --pull=missing 只在本地没有镜像时才拉取,避免意外更新
# --pull=missing: 本地没有镜像时才拉取
# 版本更新由服务端通过 SSH 显式 docker pull 触发
docker run -d --pull=missing \
--name ${CONTAINER_NAME} \
--restart always \
-e SERVER_URL="${PRESET_SERVER_URL}" \
-e WORKER_ID="${PRESET_WORKER_ID}" \
-e IMAGE_TAG="${IMAGE_TAG}" \
-v /proc:/host/proc:ro \
${IMAGE}

View File

@@ -1,12 +1,15 @@
# ============================================
# XingRin Agent - 轻量心跳上报镜像
# 用途:心跳上报 + 负载监控
# 用途:心跳上报 + 负载监控 + 版本检查
# 基础镜像Alpine Linux (~5MB)
# 最终大小:~10MB
# ============================================
FROM alpine:3.19
# 构建参数:版本号
ARG IMAGE_TAG=unknown
# 安装必要工具
RUN apk add --no-cache \
bash \
@@ -17,6 +20,9 @@ RUN apk add --no-cache \
COPY backend/scripts/worker-deploy/agent.sh /app/agent.sh
RUN chmod +x /app/agent.sh
# 将版本号写入环境变量(运行时可用)
ENV IMAGE_TAG=${IMAGE_TAG}
# 工作目录
WORKDIR /app

View File

@@ -54,19 +54,19 @@ services:
retries: 3
start_period: 60s
# Agent心跳上报 + 负载监控
# Agent心跳上报 + 负载监控 + 版本检查
agent:
build:
context: ..
dockerfile: docker/worker/Dockerfile
dockerfile: docker/agent/Dockerfile
args:
IMAGE_TAG: ${IMAGE_TAG:-dev}
restart: always
env_file:
- .env
environment:
- SERVER_URL=http://server:8888
- WORKER_NAME=本地节点
- IS_LOCAL=true
command: bash /app/backend/scripts/worker-deploy/agent.sh
- IMAGE_TAG=${IMAGE_TAG:-dev}
depends_on:
server:
condition: service_healthy

View File

@@ -72,6 +72,7 @@ services:
- SERVER_URL=http://server:8888
- WORKER_NAME=本地节点
- IS_LOCAL=true
- IMAGE_TAG=${IMAGE_TAG}
depends_on:
server:
condition: service_healthy

View File

@@ -26,7 +26,7 @@ fi
# 检查是否已有交换分区
CURRENT_SWAP_KB=$(grep SwapTotal /proc/meminfo | awk '{print $2}')
CURRENT_SWAP_GB=$((CURRENT_SWAP_KB / 1024 / 1024))
CURRENT_SWAP_GB=$(awk "BEGIN {printf \"%.0f\", $CURRENT_SWAP_KB / 1024 / 1024}")
if [ "$CURRENT_SWAP_GB" -gt 0 ]; then
log_warn "系统已有 ${CURRENT_SWAP_GB}GB 交换分区"
swapon --show
@@ -37,9 +37,9 @@ if [ "$CURRENT_SWAP_GB" -gt 0 ]; then
fi
fi
# 获取系统内存大小GB
# 获取系统内存大小GB,四舍五入
TOTAL_MEM_KB=$(grep MemTotal /proc/meminfo | awk '{print $2}')
TOTAL_MEM_GB=$((TOTAL_MEM_KB / 1024 / 1024))
TOTAL_MEM_GB=$(awk "BEGIN {printf \"%.0f\", $TOTAL_MEM_KB / 1024 / 1024}")
# 确定交换分区大小
if [ -n "$1" ]; then
@@ -56,8 +56,8 @@ SWAP_FILE="/swapfile_xingrin"
log_info "系统内存: ${TOTAL_MEM_GB}GB"
log_info "将创建 ${SWAP_SIZE_GB}GB 交换分区: $SWAP_FILE"
# 检查磁盘空间
AVAILABLE_GB=$(df / | tail -1 | awk '{print int($4/1024/1024)}')
# 检查磁盘空间(四舍五入到最接近的 GB;注意 %.0f 不是向下取整)
AVAILABLE_GB=$(df / | tail -1 | awk '{printf "%.0f", $4/1024/1024}')
if [ "$AVAILABLE_GB" -lt "$SWAP_SIZE_GB" ]; then
log_error "磁盘空间不足!可用: ${AVAILABLE_GB}GB需要: ${SWAP_SIZE_GB}GB"
exit 1

View File

@@ -135,6 +135,7 @@ if [ "$DEV_MODE" = true ]; then
fi
else
# 生产模式:拉取 Docker Hub 镜像
# pull 后 up -d 会自动检测镜像变化并重建容器
if [ "$WITH_FRONTEND" = true ]; then
echo -e "${CYAN}[PULL]${NC} 拉取最新镜像..."
${COMPOSE_CMD} ${COMPOSE_ARGS} pull

View File

@@ -282,13 +282,13 @@ fi
# 交换分区配置(仅 Linux
# ==============================================================================
if [[ "$OSTYPE" == "linux-gnu"* ]]; then
# 获取当前内存大小GB纯 bash 算术
# 获取当前内存大小GB四舍五入
TOTAL_MEM_KB=$(grep MemTotal /proc/meminfo | awk '{print $2}')
TOTAL_MEM_GB=$((TOTAL_MEM_KB / 1024 / 1024))
TOTAL_MEM_GB=$(awk "BEGIN {printf \"%.0f\", $TOTAL_MEM_KB / 1024 / 1024}")
# 获取当前交换分区大小GB
# 获取当前交换分区大小GB,四舍五入
CURRENT_SWAP_KB=$(grep SwapTotal /proc/meminfo | awk '{print $2}')
CURRENT_SWAP_GB=$((CURRENT_SWAP_KB / 1024 / 1024))
CURRENT_SWAP_GB=$(awk "BEGIN {printf \"%.0f\", $CURRENT_SWAP_KB / 1024 / 1024}")
# 推荐交换分区大小与内存相同最小1G最大8G
RECOMMENDED_SWAP=$TOTAL_MEM_GB