feat: enhance workspace handling with concurrency control and improved retry logic

This commit is contained in:
Haoxin Li
2025-08-13 18:04:38 +08:00
parent 65feb7c6c9
commit 4381f86f4c
2 changed files with 89 additions and 29 deletions

View File

@@ -45,15 +45,16 @@ type HeartbeatData struct {
}
type SocketHandler struct {
config *config.Config
logger *slog.Logger
workspaceService domain.WorkspaceFileUsecase
workspaceUsecase domain.WorkspaceUsecase
userService domain.UserUsecase
io *socketio.Io
mu sync.Mutex
workspaceCache map[string]*domain.Workspace
cacheMutex sync.RWMutex
config *config.Config
logger *slog.Logger
workspaceService domain.WorkspaceFileUsecase
workspaceUsecase domain.WorkspaceUsecase
userService domain.UserUsecase
io *socketio.Io
mu sync.Mutex
workspaceCache map[string]*domain.Workspace
cacheMutex sync.RWMutex
workspaceProcessing sync.Map
}
func NewSocketHandler(config *config.Config, logger *slog.Logger, workspaceService domain.WorkspaceFileUsecase, workspaceUsecase domain.WorkspaceUsecase, userService domain.UserUsecase) (*SocketHandler, error) {
@@ -439,18 +440,42 @@ func (h *SocketHandler) processFileUpdateAsync(socket *socketio.Socket, updateDa
// ensureWorkspace ensures that a workspace exists for the given workspacePath
func (h *SocketHandler) ensureWorkspace(ctx context.Context, userID, workspacePath string) (string, error) {
if workspacePath != "" {
// Use EnsureWorkspace to create or update workspace based on path
workspace, err := h.workspaceUsecase.EnsureWorkspace(ctx, userID, workspacePath, "")
if err != nil {
h.logger.Error("Error ensuring workspace", "path", workspacePath, "error", err)
return "", fmt.Errorf("failed to ensure workspace: %w", err)
}
return workspace.ID, nil
if workspacePath == "" {
return "", fmt.Errorf("no workspace path provided")
}
// If no workspacePath provided, return an error
return "", fmt.Errorf("no workspace path provided")
// 创建处理键,防止同一个 workspace 的并发处理
processingKey := fmt.Sprintf("%s:%s", userID, workspacePath)
// 检查是否已经在处理中
if _, processing := h.workspaceProcessing.LoadOrStore(processingKey, true); processing {
h.logger.Debug("workspace already being processed, waiting", "userID", userID, "workspacePath", workspacePath)
// 等待一段时间后重试
maxWaitRetries := 10
for i := 0; i < maxWaitRetries; i++ {
time.Sleep(50 * time.Millisecond)
if _, stillProcessing := h.workspaceProcessing.Load(processingKey); !stillProcessing {
break
}
}
// 如果仍在处理中,直接调用 EnsureWorkspace此时应该会很快返回现有的workspace
h.logger.Debug("proceeding with workspace creation after wait", "userID", userID, "workspacePath", workspacePath)
}
// 确保在函数结束时清理处理标记
defer h.workspaceProcessing.Delete(processingKey)
// Use EnsureWorkspace to create or update workspace based on path
workspace, err := h.workspaceUsecase.EnsureWorkspace(ctx, userID, workspacePath, "")
if err != nil {
h.logger.Error("Error ensuring workspace", "userID", userID, "path", workspacePath, "error", err)
return "", fmt.Errorf("failed to ensure workspace: %w", err)
}
h.logger.Debug("workspace ensured successfully", "userID", userID, "workspacePath", workspacePath, "workspaceID", workspace.ID)
return workspace.ID, nil
}
func (h *SocketHandler) handleTestPing(socket *socketio.Socket, data string) {

View File

@@ -6,7 +6,9 @@ import (
"encoding/hex"
"fmt"
"log/slog"
"math"
"strings"
"sync"
"time"
"github.com/chaitin/MonkeyCode/backend/config"
@@ -16,9 +18,10 @@ import (
)
type WorkspaceUsecase struct {
repo domain.WorkspaceRepo
config *config.Config
logger *slog.Logger
repo domain.WorkspaceRepo
config *config.Config
logger *slog.Logger
ensureLocks sync.Map // map[string]*sync.Mutex
}
type WorkspaceFileUsecase struct {
@@ -134,6 +137,17 @@ func (u *WorkspaceUsecase) Delete(ctx context.Context, id string) error {
}
func (u *WorkspaceUsecase) EnsureWorkspace(ctx context.Context, userID, rootPath, name string) (*domain.Workspace, error) {
// 创建锁的唯一键
lockKey := fmt.Sprintf("%s:%s", userID, rootPath)
// 获取或创建针对这个 userID+rootPath 的锁
lockValue, _ := u.ensureLocks.LoadOrStore(lockKey, &sync.Mutex{})
lock := lockValue.(*sync.Mutex)
// 加锁,防止并发创建
lock.Lock()
defer lock.Unlock()
// 自动生成工作区名称(如果未提供)
if name == "" {
name = u.generateWorkspaceName(rootPath)
@@ -149,6 +163,8 @@ func (u *WorkspaceUsecase) EnsureWorkspace(ctx context.Context, userID, rootPath
})
if err != nil {
u.logger.Warn("failed to update workspace last accessed time", "error", err, "id", workspace.ID)
// 即使更新访问时间失败,也返回工作区
return (&domain.Workspace{}).From(workspace), nil
}
return (&domain.Workspace{}).From(updated), nil
}
@@ -158,8 +174,8 @@ func (u *WorkspaceUsecase) EnsureWorkspace(ctx context.Context, userID, rootPath
return nil, fmt.Errorf("failed to check workspace existence: %w", err)
}
// 使用重试机制来处理并发创建的情况
maxRetries := 3
// 使用改进的重试机制来处理并发创建的情况
maxRetries := 5
for i := range maxRetries {
createReq := &domain.CreateWorkspaceReq{
UserID: userID,
@@ -171,16 +187,25 @@ func (u *WorkspaceUsecase) EnsureWorkspace(ctx context.Context, userID, rootPath
workspace, err := u.Create(ctx, createReq)
if err == nil {
u.logger.Info("workspace created successfully", "userID", userID, "rootPath", rootPath, "retry", i)
return workspace, nil
}
// 如果是唯一约束错误,说明工作区已经被其他请求创建了
if strings.Contains(err.Error(), "duplicate key value violates unique constraint") {
// 等待一小段时间,然后尝试获取已创建的工作区
time.Sleep(10 * time.Millisecond)
u.logger.Debug("workspace creation conflict, retrying", "userID", userID, "rootPath", rootPath, "retry", i, "error", err)
// 使用指数退避等待
waitTime := time.Duration(math.Pow(2, float64(i))) * 25 * time.Millisecond
if waitTime > 500*time.Millisecond {
waitTime = 500 * time.Millisecond
}
time.Sleep(waitTime)
// 尝试获取已创建的工作区
existing, err := u.repo.GetByUserAndPath(ctx, userID, rootPath)
if err == nil {
u.logger.Info("found existing workspace after conflict", "userID", userID, "rootPath", rootPath, "retry", i)
// 更新最后访问时间
updated, err := u.repo.Update(ctx, existing.ID.String(), func(up *db.WorkspaceUpdateOne) error {
up.SetLastAccessedAt(time.Now())
@@ -188,15 +213,25 @@ func (u *WorkspaceUsecase) EnsureWorkspace(ctx context.Context, userID, rootPath
})
if err != nil {
u.logger.Warn("failed to update workspace last accessed time", "error", err, "id", existing.ID)
// 即使更新访问时间失败,也返回工作区
return (&domain.Workspace{}).From(existing), nil
}
return (&domain.Workspace{}).From(updated), nil
} else {
u.logger.Warn("failed to get workspace after conflict", "userID", userID, "rootPath", rootPath, "retry", i, "error", err)
}
// 如果是最后一次重试,返回错误
if i == maxRetries-1 {
u.logger.Error("failed to resolve workspace creation conflict after all retries", "userID", userID, "rootPath", rootPath, "maxRetries", maxRetries)
return nil, fmt.Errorf("workspace creation conflict persists after %d retries: %w", maxRetries, err)
}
continue
}
// 如果不是唯一约束错误,直接返回错误
if i == maxRetries-1 {
return nil, err
}
u.logger.Error("workspace creation failed with non-conflict error", "userID", userID, "rootPath", rootPath, "retry", i, "error", err)
return nil, fmt.Errorf("failed to create workspace: %w", err)
}
return nil, fmt.Errorf("failed to create workspace after %d retries", maxRetries)