diff --git a/backend/internal/socket/handler/socket.go b/backend/internal/socket/handler/socket.go index 344d51f..30257e5 100644 --- a/backend/internal/socket/handler/socket.go +++ b/backend/internal/socket/handler/socket.go @@ -45,15 +45,16 @@ type HeartbeatData struct { } type SocketHandler struct { - config *config.Config - logger *slog.Logger - workspaceService domain.WorkspaceFileUsecase - workspaceUsecase domain.WorkspaceUsecase - userService domain.UserUsecase - io *socketio.Io - mu sync.Mutex - workspaceCache map[string]*domain.Workspace - cacheMutex sync.RWMutex + config *config.Config + logger *slog.Logger + workspaceService domain.WorkspaceFileUsecase + workspaceUsecase domain.WorkspaceUsecase + userService domain.UserUsecase + io *socketio.Io + mu sync.Mutex + workspaceCache map[string]*domain.Workspace + cacheMutex sync.RWMutex + workspaceProcessing sync.Map } func NewSocketHandler(config *config.Config, logger *slog.Logger, workspaceService domain.WorkspaceFileUsecase, workspaceUsecase domain.WorkspaceUsecase, userService domain.UserUsecase) (*SocketHandler, error) { @@ -439,18 +440,42 @@ func (h *SocketHandler) processFileUpdateAsync(socket *socketio.Socket, updateDa // ensureWorkspace ensures that a workspace exists for the given workspacePath func (h *SocketHandler) ensureWorkspace(ctx context.Context, userID, workspacePath string) (string, error) { - if workspacePath != "" { - // Use EnsureWorkspace to create or update workspace based on path - workspace, err := h.workspaceUsecase.EnsureWorkspace(ctx, userID, workspacePath, "") - if err != nil { - h.logger.Error("Error ensuring workspace", "path", workspacePath, "error", err) - return "", fmt.Errorf("failed to ensure workspace: %w", err) - } - return workspace.ID, nil + if workspacePath == "" { + return "", fmt.Errorf("no workspace path provided") } - // If no workspacePath provided, return an error - return "", fmt.Errorf("no workspace path provided") + // 创建处理键,防止同一个 workspace 的并发处理 + processingKey := fmt.Sprintf("%s:%s", userID, workspacePath) + + // 检查是否已经在处理中 + if _, processing := h.workspaceProcessing.LoadOrStore(processingKey, true); processing { + h.logger.Debug("workspace already being processed, waiting", "userID", userID, "workspacePath", workspacePath) + + // 等待一段时间后重试 + maxWaitRetries := 10 + for i := 0; i < maxWaitRetries; i++ { + time.Sleep(50 * time.Millisecond) + if _, stillProcessing := h.workspaceProcessing.Load(processingKey); !stillProcessing { + break + } + } + + // 如果仍在处理中,直接调用 EnsureWorkspace(此时应该会很快返回现有的workspace) + h.logger.Debug("proceeding with workspace creation after wait", "userID", userID, "workspacePath", workspacePath) + } + + // 确保在函数结束时清理处理标记 + defer h.workspaceProcessing.Delete(processingKey) + + // Use EnsureWorkspace to create or update workspace based on path + workspace, err := h.workspaceUsecase.EnsureWorkspace(ctx, userID, workspacePath, "") + if err != nil { + h.logger.Error("Error ensuring workspace", "userID", userID, "path", workspacePath, "error", err) + return "", fmt.Errorf("failed to ensure workspace: %w", err) + } + + h.logger.Debug("workspace ensured successfully", "userID", userID, "workspacePath", workspacePath, "workspaceID", workspace.ID) + return workspace.ID, nil } func (h *SocketHandler) handleTestPing(socket *socketio.Socket, data string) { diff --git a/backend/internal/workspace/usecase/workspace.go b/backend/internal/workspace/usecase/workspace.go index f4c27ed..3f2c406 100644 --- a/backend/internal/workspace/usecase/workspace.go +++ b/backend/internal/workspace/usecase/workspace.go @@ -6,7 +6,9 @@ import ( "encoding/hex" "fmt" "log/slog" + "math" "strings" + "sync" "time" "github.com/chaitin/MonkeyCode/backend/config" @@ -16,9 +18,10 @@ import ( ) type WorkspaceUsecase struct { - repo domain.WorkspaceRepo - config *config.Config - logger *slog.Logger + repo domain.WorkspaceRepo + config *config.Config + logger *slog.Logger + ensureLocks sync.Map // map[string]*sync.Mutex } type WorkspaceFileUsecase struct { @@ -134,6 +137,17 @@ func (u *WorkspaceUsecase) Delete(ctx context.Context, id string) error { } func (u *WorkspaceUsecase) EnsureWorkspace(ctx context.Context, userID, rootPath, name string) (*domain.Workspace, error) { + // 创建锁的唯一键 + lockKey := fmt.Sprintf("%s:%s", userID, rootPath) + + // 获取或创建针对这个 userID+rootPath 的锁 + lockValue, _ := u.ensureLocks.LoadOrStore(lockKey, &sync.Mutex{}) + lock := lockValue.(*sync.Mutex) + + // 加锁,防止并发创建 + lock.Lock() + defer lock.Unlock() + // 自动生成工作区名称(如果未提供) if name == "" { name = u.generateWorkspaceName(rootPath) @@ -149,6 +163,8 @@ func (u *WorkspaceUsecase) EnsureWorkspace(ctx context.Context, userID, rootPath }) if err != nil { u.logger.Warn("failed to update workspace last accessed time", "error", err, "id", workspace.ID) + // 即使更新访问时间失败,也返回工作区 + return (&domain.Workspace{}).From(workspace), nil } return (&domain.Workspace{}).From(updated), nil } @@ -158,8 +174,8 @@ func (u *WorkspaceUsecase) EnsureWorkspace(ctx context.Context, userID, rootPath return nil, fmt.Errorf("failed to check workspace existence: %w", err) } - // 使用重试机制来处理并发创建的情况 - maxRetries := 3 + // 使用改进的重试机制来处理并发创建的情况 + maxRetries := 5 for i := range maxRetries { createReq := &domain.CreateWorkspaceReq{ UserID: userID, @@ -171,16 +187,25 @@ func (u *WorkspaceUsecase) EnsureWorkspace(ctx context.Context, userID, rootPath workspace, err := u.Create(ctx, createReq) if err == nil { + u.logger.Info("workspace created successfully", "userID", userID, "rootPath", rootPath, "retry", i) return workspace, nil } // 如果是唯一约束错误,说明工作区已经被其他请求创建了 if strings.Contains(err.Error(), "duplicate key value violates unique constraint") { - // 等待一小段时间,然后尝试获取已创建的工作区 - time.Sleep(10 * time.Millisecond) + u.logger.Debug("workspace creation conflict, retrying", "userID", userID, "rootPath", rootPath, "retry", i, "error", err) + // 使用指数退避等待 + waitTime := time.Duration(math.Pow(2, float64(i))) * 25 * time.Millisecond + if waitTime > 500*time.Millisecond { + waitTime = 500 * time.Millisecond + } + time.Sleep(waitTime) + + // 尝试获取已创建的工作区 existing, err := u.repo.GetByUserAndPath(ctx, userID, rootPath) if err == nil { + u.logger.Info("found existing workspace after conflict", "userID", userID, "rootPath", rootPath, "retry", i) // 更新最后访问时间 updated, err := u.repo.Update(ctx, existing.ID.String(), func(up *db.WorkspaceUpdateOne) error { up.SetLastAccessedAt(time.Now()) @@ -188,15 +213,25 @@ func (u *WorkspaceUsecase) EnsureWorkspace(ctx context.Context, userID, rootPath }) if err != nil { u.logger.Warn("failed to update workspace last accessed time", "error", err, "id", existing.ID) + // 即使更新访问时间失败,也返回工作区 + return (&domain.Workspace{}).From(existing), nil } return (&domain.Workspace{}).From(updated), nil + } else { + u.logger.Warn("failed to get workspace after conflict", "userID", userID, "rootPath", rootPath, "retry", i, "error", err) } + + // 如果是最后一次重试,返回错误 + if i == maxRetries-1 { + u.logger.Error("failed to resolve workspace creation conflict after all retries", "userID", userID, "rootPath", rootPath, "maxRetries", maxRetries) + return nil, fmt.Errorf("workspace creation conflict persists after %d retries: %w", maxRetries, err) + } + continue } // 如果不是唯一约束错误,直接返回错误 - if i == maxRetries-1 { - return nil, err - } + u.logger.Error("workspace creation failed with non-conflict error", "userID", userID, "rootPath", rootPath, "retry", i, "error", err) + return nil, fmt.Errorf("failed to create workspace: %w", err) } return nil, fmt.Errorf("failed to create workspace after %d retries", maxRetries)