diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 7b4d956..6cce901 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -55,6 +55,8 @@ type AgentLoop struct { running bool intentMu sync.RWMutex intentHints map[string]string + sessionRunMu sync.Mutex + sessionRunLocks map[string]*sync.Mutex } type executionTxnState struct { @@ -236,6 +238,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers audit: newTriggerAudit(workspace), running: false, intentHints: map[string]string{}, + sessionRunLocks: map[string]*sync.Mutex{}, } // 注入递归运行逻辑,使 subagent 具备 full tool-calling 能力 @@ -295,6 +298,22 @@ func (al *AgentLoop) buildSessionShards(ctx context.Context) []chan bus.InboundM return shards } +func (al *AgentLoop) lockSessionRun(sessionKey string) func() { + key := strings.TrimSpace(sessionKey) + if key == "" { + key = "default" + } + al.sessionRunMu.Lock() + mu, ok := al.sessionRunLocks[key] + if !ok { + mu = &sync.Mutex{} + al.sessionRunLocks[key] = mu + } + al.sessionRunMu.Unlock() + mu.Lock() + return func() { mu.Unlock() } +} + func (al *AgentLoop) processInbound(ctx context.Context, msg bus.InboundMessage) { response, err := al.processMessage(ctx, msg) if err != nil { @@ -419,6 +438,8 @@ func (al *AgentLoop) ProcessDirect(ctx context.Context, content, sessionKey stri } func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) (string, error) { + unlock := al.lockSessionRun(msg.SessionKey) + defer unlock() // Add message preview to log preview := truncate(msg.Content, 80) logger.InfoCF("agent", fmt.Sprintf("Processing message from %s:%s: %s", msg.Channel, msg.SenderID, preview),