diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 3127d9b..6c91572 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -34,7 +34,6 @@ import ( var errGatewayNotRunningSlash = errors.New("gateway not running") -const perSessionQueueSize = 64 const autoLearnDefaultInterval = 10 * time.Minute const autoLearnMinInterval = 30 * time.Second const autonomyDefaultIdleInterval = 30 * time.Minute @@ -67,9 +66,11 @@ const compactionAttemptTimeout = 8 * time.Second const compactionRetryPerCandidate = 1 type sessionWorker struct { - queue chan bus.InboundMessage - cancelMu sync.Mutex - cancel context.CancelFunc + cancelMu sync.Mutex + cancel context.CancelFunc + queueMu sync.Mutex + queue []bus.InboundMessage + queueNotify chan struct{} } type autoLearner struct { @@ -250,14 +251,6 @@ type runControlIntentLLMResponse struct { } var defaultParallelSafeToolNames = []string{"read_file", "list_files", "find_files", "grep_files", "memory_search", "web_search", "repo_map", "system_info"} -var autonomyIntentKeywords = []string{ - "autonomy", "autonomous", "autonomy mode", "self-driven", "self driven", - "自主", "自驱", "自主模式", "自动执行", "自治模式", -} -var autoLearnIntentKeywords = []string{ - "auto-learn", "autolearn", "learning loop", "learn loop", - "自学习", "学习循环", "自动学习", "学习模式", -} type stageReporter struct { onUpdate func(content string) @@ -685,7 +678,11 @@ func isStopCommand(content string) bool { } func (al *AgentLoop) handleStopCommand(msg bus.InboundMessage) { - worker := al.getWorker(msg.SessionKey) + al.cancelSessionWorkerRun(msg.SessionKey) +} + +func (al *AgentLoop) cancelSessionWorkerRun(sessionKey string) { + worker := al.getWorker(sessionKey) if worker == nil { return } @@ -702,18 +699,9 @@ func (al *AgentLoop) handleStopCommand(msg bus.InboundMessage) { } func (al *AgentLoop) enqueueMessage(ctx context.Context, msg bus.InboundMessage) { + _ = ctx worker := al.getOrCreateWorker(ctx, msg.SessionKey) - select { - case worker.queue <- msg: - case <-ctx.Done(): - case <-time.After(5 * time.Second): - al.bus.PublishOutbound(bus.OutboundMessage{ - Buttons: nil, - Channel: msg.Channel, - ChatID: msg.ChatID, - Content: "The message queue is currently busy. Please try again shortly.", - }) - } + worker.enqueue(msg) } func (al *AgentLoop) getWorker(sessionKey string) *sessionWorker { @@ -731,7 +719,8 @@ func (al *AgentLoop) getOrCreateWorker(ctx context.Context, sessionKey string) * } w := &sessionWorker{ - queue: make(chan bus.InboundMessage, perSessionQueueSize), + queue: make([]bus.InboundMessage, 0, 16), + queueNotify: make(chan struct{}, 1), } al.workers[sessionKey] = w @@ -741,58 +730,94 @@ func (al *AgentLoop) getOrCreateWorker(ctx context.Context, sessionKey string) * func (al *AgentLoop) runSessionWorker(ctx context.Context, sessionKey string, worker *sessionWorker) { for { - select { - case <-ctx.Done(): + msg, ok := worker.dequeue(ctx) + if !ok { al.clearWorkerCancel(worker) al.removeWorker(sessionKey, worker) return - case msg := <-worker.queue: - func() { - taskCtx, cancel := context.WithCancel(ctx) - taskCtx, tokenTotals := withTokenUsageTotals(taskCtx) - worker.cancelMu.Lock() - worker.cancel = cancel - worker.cancelMu.Unlock() + } - defer func() { - cancel() - al.clearWorkerCancel(worker) - if r := recover(); r != nil { - logger.ErrorCF("agent", "Session worker recovered from panic", map[string]interface{}{ - "session_key": sessionKey, - "panic": fmt.Sprintf("%v", r), - }) - } - }() + func() { + taskCtx, cancel := context.WithCancel(ctx) + taskCtx, tokenTotals := withTokenUsageTotals(taskCtx) + worker.cancelMu.Lock() + worker.cancel = cancel + worker.cancelMu.Unlock() - response, err := al.processMessage(taskCtx, msg) - if err != nil { - if errors.Is(err, context.Canceled) { - return - } - response = al.formatProcessingErrorMessage(taskCtx, msg, err) - } - - if response != "" && shouldPublishSyntheticResponse(msg) { - if al != nil && al.sessions != nil && tokenTotals != nil { - al.sessions.AddTokenUsage( - msg.SessionKey, - tokenTotals.input, - tokenTotals.output, - tokenTotals.total, - ) - } - response += formatTokenUsageSuffix( - tokenTotals, - ) - al.bus.PublishOutbound(bus.OutboundMessage{ - Buttons: nil, - Channel: msg.Channel, - ChatID: msg.ChatID, - Content: response, + defer func() { + cancel() + al.clearWorkerCancel(worker) + if r := recover(); r != nil { + logger.ErrorCF("agent", "Session worker recovered from panic", map[string]interface{}{ + "session_key": sessionKey, + "panic": fmt.Sprintf("%v", r), }) } }() + + response, err := al.processMessage(taskCtx, msg) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + response = al.formatProcessingErrorMessage(taskCtx, msg, err) + } + + if response != "" && shouldPublishSyntheticResponse(msg) { + if al != nil && al.sessions != nil && tokenTotals != nil { + al.sessions.AddTokenUsage( + msg.SessionKey, + tokenTotals.input, + tokenTotals.output, + tokenTotals.total, + ) + } + response += formatTokenUsageSuffix( + tokenTotals, + ) + al.bus.PublishOutbound(bus.OutboundMessage{ + Buttons: nil, + Channel: msg.Channel, + ChatID: msg.ChatID, + Content: response, + }) + } + }() + } +} + +func (w *sessionWorker) enqueue(msg bus.InboundMessage) { + if w == nil { + return + } + w.queueMu.Lock() + w.queue = append(w.queue, msg) + w.queueMu.Unlock() + select { + case w.queueNotify <- struct{}{}: + default: + } +} + +func (w *sessionWorker) dequeue(ctx context.Context) (bus.InboundMessage, bool) { + if w == nil { + return bus.InboundMessage{}, false + } + for { + w.queueMu.Lock() + if len(w.queue) > 0 { + msg := w.queue[0] + w.queue[0] = bus.InboundMessage{} + w.queue = w.queue[1:] + w.queueMu.Unlock() + return msg, true + } + w.queueMu.Unlock() + + select { + case <-ctx.Done(): + return bus.InboundMessage{}, false + case <-w.queueNotify: } } } @@ -1059,11 +1084,12 @@ func (al *AgentLoop) maybeRunAutonomyRound(msg bus.InboundMessage) bool { } delete(al.autonomyBySess, msg.SessionKey) al.autonomyMu.Unlock() + al.cancelSessionWorkerRun(msg.SessionKey) al.controlMetricAdd(&al.controlStats.autonomyStoppedByGuard, 1) al.bus.PublishOutbound(bus.OutboundMessage{ Channel: msg.Channel, ChatID: msg.ChatID, - Content: "Autonomy mode stopped automatically because background rounds stalled repeatedly.", + Content: al.autonomyGuardStopMessage(msg.SessionKey), }) return false } @@ -1083,7 +1109,7 @@ func (al *AgentLoop) maybeRunAutonomyRound(msg bus.InboundMessage) bool { al.bus.PublishOutbound(bus.OutboundMessage{ Channel: msg.Channel, ChatID: msg.ChatID, - Content: "Autonomy mode paused automatically after many unattended rounds. Send a new request to continue.", + Content: al.autonomyGuardPauseMessage(msg.SessionKey), }) return false } @@ -1132,6 +1158,61 @@ func (al *AgentLoop) finishAutonomyRound(sessionKey string) { } } +func (al *AgentLoop) autonomyGuardStopMessage(sessionKey string) string { + return al.renderControlNotice( + sessionKey, + "Explain autonomy mode stopped automatically because background rounds stalled repeatedly, and suggest sending a new request to continue.", + "自主模式已因后台轮次连续停滞而自动停止;如需继续,请发送新的请求。", + ) +} + +func (al *AgentLoop) autonomyGuardPauseMessage(sessionKey string) string { + return al.renderControlNotice( + sessionKey, + "Explain autonomy mode paused automatically after too many unattended rounds, and suggest sending a new request to continue.", + "自主模式已在长时间无用户输入后自动暂停;如需继续,请发送新的请求。", + ) +} + +func (al *AgentLoop) autoLearnGuardStopMessage(sessionKey string) string { + return al.renderControlNotice( + sessionKey, + "Explain auto-learn stopped automatically after reaching the unattended round limit.", + "自动学习已因达到无人值守轮次上限而自动停止。", + ) +} + +func (al *AgentLoop) renderControlNotice(sessionKey, requirement, fallback string) string { + if al == nil { + return fallback + } + + ctx, cancel := context.WithTimeout( + withUserLanguageHint(context.Background(), sessionKey, ""), + 8*time.Second, + ) + defer cancel() + + systemPrompt := al.withBootstrapPolicy(`Generate one short user-facing notification sentence. +Requirements: +- Use natural language only. +- ` + strings.TrimSpace(requirement)) + + resp, err := al.callLLMWithModelFallback(ctx, []providers.Message{ + {Role: "system", Content: systemPrompt}, + }, nil, map[string]interface{}{ + "max_tokens": 80, + "temperature": 0.2, + }) + if err == nil && resp != nil { + text := strings.TrimSpace(resp.Content) + if text != "" { + return text + } + } + return fallback +} + func buildAutonomyFollowUpPrompt(round int, focus string, reportDue bool) string { focus = strings.TrimSpace(focus) if focus == "" && reportDue { @@ -1199,7 +1280,7 @@ func (al *AgentLoop) runAutoLearnerLoop(ctx context.Context, msg bus.InboundMess al.bus.PublishOutbound(bus.OutboundMessage{ Channel: msg.Channel, ChatID: msg.ChatID, - Content: "Auto-learn stopped automatically after reaching the unattended round limit.", + Content: al.autoLearnGuardStopMessage(msg.SessionKey), }) return false } @@ -2005,6 +2086,11 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) }) if isAutonomySyntheticMessage(msg) { + // Autonomy may have been stopped while synthetic messages are still queued. + // Drop stale synthetic rounds so they don't block real user turns. + if !al.isAutonomyEnabled(msg.SessionKey) { + return "", nil + } defer al.finishAutonomyRound(msg.SessionKey) } @@ -4801,9 +4887,6 @@ func isExplicitRunCommand(content string) bool { } func (al *AgentLoop) detectAutonomyIntent(ctx context.Context, content string) (autonomyIntent, intentDetectionOutcome) { - if !shouldAttemptAutonomyIntentInference(content) { - return autonomyIntent{}, intentDetectionOutcome{} - } if intent, confidence, ok := al.inferAutonomyIntent(ctx, content); ok { al.controlMetricAdd(&al.controlStats.intentAutonomyMatched, 1) return intent, intentDetectionOutcome{matched: true, confidence: confidence} @@ -4813,9 +4896,6 @@ func (al *AgentLoop) detectAutonomyIntent(ctx context.Context, content string) ( } func (al *AgentLoop) detectAutoLearnIntent(ctx context.Context, content string) (autoLearnIntent, intentDetectionOutcome) { - if !shouldAttemptAutoLearnIntentInference(content) { - return autoLearnIntent{}, intentDetectionOutcome{} - } if intent, confidence, ok := al.inferAutoLearnIntent(ctx, content); ok { al.controlMetricAdd(&al.controlStats.intentAutoLearnMatched, 1) return intent, intentDetectionOutcome{matched: true, confidence: confidence} @@ -4885,14 +4965,6 @@ Rules: return intent, parsed.Confidence, true } -func shouldAttemptAutonomyIntentInference(content string) bool { - lower := strings.ToLower(strings.TrimSpace(content)) - if lower == "" { - return false - } - return containsAnySubstring(lower, autonomyIntentKeywords...) -} - func extractJSONObject(text string) string { s := strings.TrimSpace(text) if s == "" { @@ -4974,14 +5046,6 @@ Rules: return intent, parsed.Confidence, true } -func shouldAttemptAutoLearnIntentInference(content string) bool { - lower := strings.ToLower(strings.TrimSpace(content)) - if lower == "" { - return false - } - return containsAnySubstring(lower, autoLearnIntentKeywords...) -} - func (al *AgentLoop) inferTaskExecutionDirectives(ctx context.Context, content string) (taskExecutionDirectives, bool) { text := strings.TrimSpace(content) if text == "" || len(text) > 1200 {