mirror of
https://github.com/yyhuni/xingrin.git
synced 2026-01-31 11:46:16 +08:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3ba1ba427e |
@@ -134,5 +134,57 @@ class WorkerService:
|
||||
logger.warning(f"[卸载] Worker {worker_id} 远程卸载异常: {e}")
|
||||
return False, f"远程卸载异常: {str(e)}"
|
||||
|
||||
def execute_remote_command(
|
||||
self,
|
||||
ip_address: str,
|
||||
ssh_port: int,
|
||||
username: str,
|
||||
password: str | None,
|
||||
command: str
|
||||
) -> tuple[bool, str]:
|
||||
"""
|
||||
在远程主机上执行命令
|
||||
|
||||
Args:
|
||||
ip_address: SSH 主机地址
|
||||
ssh_port: SSH 端口
|
||||
username: SSH 用户名
|
||||
password: SSH 密码
|
||||
command: 要执行的命令
|
||||
|
||||
Returns:
|
||||
(success, message) 元组
|
||||
"""
|
||||
if not password:
|
||||
return False, "未配置 SSH 密码"
|
||||
|
||||
try:
|
||||
import paramiko
|
||||
|
||||
ssh = paramiko.SSHClient()
|
||||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
|
||||
ssh.connect(
|
||||
ip_address,
|
||||
port=ssh_port,
|
||||
username=username,
|
||||
password=password,
|
||||
timeout=30
|
||||
)
|
||||
|
||||
stdin, stdout, stderr = ssh.exec_command(command, timeout=120)
|
||||
exit_status = stdout.channel.recv_exit_status()
|
||||
|
||||
ssh.close()
|
||||
|
||||
if exit_status == 0:
|
||||
return True, stdout.read().decode().strip()
|
||||
else:
|
||||
error = stderr.read().decode().strip()
|
||||
return False, error
|
||||
|
||||
except Exception as e:
|
||||
return False, str(e)
|
||||
|
||||
|
||||
__all__ = ["WorkerService"]
|
||||
|
||||
@@ -163,6 +163,11 @@ class WorkerNodeViewSet(viewsets.ModelViewSet):
|
||||
logger.info(
|
||||
f"Worker {worker.name} 版本不匹配: agent={agent_version}, server={server_version}"
|
||||
)
|
||||
|
||||
# 远程 Worker:服务端主动通过 SSH 触发更新
|
||||
# 旧版 agent 不会解析 need_update,所以需要服务端主动推送
|
||||
if not worker.is_local and worker.ip_address:
|
||||
self._trigger_remote_agent_update(worker, server_version)
|
||||
|
||||
return Response({
|
||||
'status': 'ok',
|
||||
@@ -170,6 +175,73 @@ class WorkerNodeViewSet(viewsets.ModelViewSet):
|
||||
'server_version': server_version
|
||||
})
|
||||
|
||||
def _trigger_remote_agent_update(self, worker, target_version: str):
|
||||
"""
|
||||
通过 SSH 触发远程 agent 更新(后台执行,不阻塞心跳响应)
|
||||
|
||||
使用 Redis 锁防止重复触发(同一 worker 60秒内只触发一次)
|
||||
"""
|
||||
import redis
|
||||
from django.conf import settings as django_settings
|
||||
|
||||
redis_client = redis.from_url(django_settings.REDIS_URL)
|
||||
lock_key = f"agent_update_lock:{worker.id}"
|
||||
|
||||
# 尝试获取锁(60秒过期,防止重复触发)
|
||||
if not redis_client.set(lock_key, "1", nx=True, ex=60):
|
||||
logger.debug(f"Worker {worker.name} 更新已在进行中,跳过")
|
||||
return
|
||||
|
||||
# 提取数据避免后台线程访问 ORM
|
||||
worker_id = worker.id
|
||||
worker_name = worker.name
|
||||
ip_address = worker.ip_address
|
||||
ssh_port = worker.ssh_port
|
||||
username = worker.username
|
||||
password = worker.password
|
||||
|
||||
def _async_update():
|
||||
try:
|
||||
logger.info(f"开始远程更新 Worker {worker_name} 到 {target_version}")
|
||||
|
||||
# 构建更新命令:拉取新镜像并重启 agent
|
||||
docker_user = getattr(django_settings, 'DOCKER_USER', 'yyhuni')
|
||||
update_cmd = f'''
|
||||
docker pull {docker_user}/xingrin-agent:{target_version} && \
|
||||
docker stop xingrin-agent 2>/dev/null || true && \
|
||||
docker rm xingrin-agent 2>/dev/null || true && \
|
||||
docker run -d --pull=always \
|
||||
--name xingrin-agent \
|
||||
--restart always \
|
||||
-e HEARTBEAT_API_URL="https://{django_settings.PUBLIC_HOST}" \
|
||||
-e WORKER_ID="{worker_id}" \
|
||||
-e IMAGE_TAG="{target_version}" \
|
||||
-v /proc:/host/proc:ro \
|
||||
{docker_user}/xingrin-agent:{target_version}
|
||||
'''
|
||||
|
||||
success, message = self.worker_service.execute_remote_command(
|
||||
ip_address=ip_address,
|
||||
ssh_port=ssh_port,
|
||||
username=username,
|
||||
password=password,
|
||||
command=update_cmd
|
||||
)
|
||||
|
||||
if success:
|
||||
logger.info(f"Worker {worker_name} 远程更新成功")
|
||||
else:
|
||||
logger.warning(f"Worker {worker_name} 远程更新失败: {message}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Worker {worker_name} 远程更新异常: {e}")
|
||||
finally:
|
||||
# 释放锁
|
||||
redis_client.delete(lock_key)
|
||||
|
||||
# 后台执行,不阻塞心跳响应
|
||||
threading.Thread(target=_async_update, daemon=True).start()
|
||||
|
||||
@action(detail=False, methods=['post'])
|
||||
def register(self, request):
|
||||
"""
|
||||
|
||||
@@ -60,12 +60,14 @@ start_agent() {
|
||||
log_info "=========================================="
|
||||
|
||||
log_info "启动 agent 容器..."
|
||||
# --pull=missing 只在本地没有镜像时才拉取,避免意外更新
|
||||
# --pull=missing: 本地没有镜像时才拉取
|
||||
# 版本更新由服务端通过 SSH 显式 docker pull 触发
|
||||
docker run -d --pull=missing \
|
||||
--name ${CONTAINER_NAME} \
|
||||
--restart always \
|
||||
-e SERVER_URL="${PRESET_SERVER_URL}" \
|
||||
-e WORKER_ID="${PRESET_WORKER_ID}" \
|
||||
-e IMAGE_TAG="${IMAGE_TAG}" \
|
||||
-v /proc:/host/proc:ro \
|
||||
${IMAGE}
|
||||
|
||||
|
||||
@@ -135,6 +135,7 @@ if [ "$DEV_MODE" = true ]; then
|
||||
fi
|
||||
else
|
||||
# 生产模式:拉取 Docker Hub 镜像
|
||||
# pull 后 up -d 会自动检测镜像变化并重建容器
|
||||
if [ "$WITH_FRONTEND" = true ]; then
|
||||
echo -e "${CYAN}[PULL]${NC} 拉取最新镜像..."
|
||||
${COMPOSE_CMD} ${COMPOSE_ARGS} pull
|
||||
|
||||
Reference in New Issue
Block a user