From e038c00ebca38fa0d4d60da7174262b75a4312fd Mon Sep 17 00:00:00 2001 From: DBT Date: Mon, 23 Feb 2026 09:49:49 +0000 Subject: [PATCH] optimize session memory and language routing for migration --- pkg/agent/context.go | 5 +- pkg/agent/language.go | 108 + pkg/agent/loop.go | 4865 +++------------------------------------- pkg/agent/memory.go | 294 +-- pkg/session/manager.go | 973 ++------ pkg/tools/memory.go | 225 +- 6 files changed, 658 insertions(+), 5812 deletions(-) create mode 100644 pkg/agent/language.go diff --git a/pkg/agent/context.go b/pkg/agent/context.go index 0555162..21d79f3 100644 --- a/pkg/agent/context.go +++ b/pkg/agent/context.go @@ -163,7 +163,7 @@ func (cb *ContextBuilder) LoadBootstrapFiles() string { return result } -func (cb *ContextBuilder) BuildMessages(history []providers.Message, summary string, currentMessage string, media []string, channel, chatID string) []providers.Message { +func (cb *ContextBuilder) BuildMessages(history []providers.Message, summary string, currentMessage string, media []string, channel, chatID, responseLanguage string) []providers.Message { messages := []providers.Message{} systemPrompt := cb.BuildSystemPrompt() @@ -173,6 +173,9 @@ func (cb *ContextBuilder) BuildMessages(history []providers.Message, summary str if channel != "" && chatID != "" { systemPrompt += fmt.Sprintf("\n\n## Current Session\nChannel: %s\nChat ID: %s", channel, chatID) } + if responseLanguage != "" { + systemPrompt += fmt.Sprintf("\n\n## Response Language\nReply in %s unless user explicitly asks to switch language. Keep code identifiers and CLI commands unchanged.", responseLanguage) + } // Log system prompt summary for debugging (debug mode only) logger.DebugCF("agent", "System prompt built", diff --git a/pkg/agent/language.go b/pkg/agent/language.go new file mode 100644 index 0000000..e33a8de --- /dev/null +++ b/pkg/agent/language.go @@ -0,0 +1,108 @@ +package agent + +import "strings" + +// DetectResponseLanguage returns a BCP-47 style language tag for reply policy. +// Priority: explicit preference > current user text > last session language > default English. +func DetectResponseLanguage(userText, preferred, last string) string { + if p := normalizeLang(preferred); p != "" { + return p + } + if detected := detectFromText(userText); detected != "" { + return detected + } + if l := normalizeLang(last); l != "" { + return l + } + return "en" +} + +func detectFromText(text string) string { + text = strings.TrimSpace(text) + if text == "" { + return "" + } + + var zh, ja, ko, letters int + for _, r := range text { + switch { + case r >= 0x4E00 && r <= 0x9FFF: + zh++ + case r >= 0x3040 && r <= 0x30FF: + ja++ + case r >= 0xAC00 && r <= 0xD7AF: + ko++ + case (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z'): + letters++ + } + } + + // CJK-first heuristic to match user typing language quickly. + if zh > 0 { + return "zh-CN" + } + if ja > 0 { + return "ja" + } + if ko > 0 { + return "ko" + } + if letters > 0 { + return "en" + } + return "" +} + +func normalizeLang(lang string) string { + lang = strings.TrimSpace(strings.ToLower(lang)) + switch lang { + case "zh", "zh-cn", "zh_hans", "chinese": + return "zh-CN" + case "en", "en-us", "english": + return "en" + case "ja", "jp", "japanese": + return "ja" + case "ko", "kr", "korean": + return "ko" + default: + if lang == "" { + return "" + } + return lang + } +} + +// ExtractLanguagePreference detects explicit user instructions for language switch. +func ExtractLanguagePreference(text string) string { + s := strings.ToLower(strings.TrimSpace(text)) + if s == "" { + return "" + } + + enHints := []string{"speak english", "reply in english", "use english", "以后用英文", "请用英文", "用英文"} + zhHints := []string{"说中文", "用中文", "请用中文", "reply in chinese", "speak chinese"} + jaHints := []string{"日本語", "reply in japanese", "speak japanese"} + koHints := []string{"한국어", "reply in korean", "speak korean"} + + for _, h := range enHints { + if strings.Contains(s, strings.ToLower(h)) { + return "en" + } + } + for _, h := range zhHints { + if strings.Contains(s, strings.ToLower(h)) { + return "zh-CN" + } + } + for _, h := range jaHints { + if strings.Contains(s, strings.ToLower(h)) { + return "ja" + } + } + for _, h := range koHints { + if strings.Contains(s, strings.ToLower(h)) { + return "ko" + } + } + return "" +} diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index e25f2ff..49eb3c2 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -9,22 +9,14 @@ package agent import ( "context" "encoding/json" - "errors" "fmt" "os" "path/filepath" "regexp" - "sort" - "strconv" "strings" - "sync" - "sync/atomic" - "time" - "unicode" "clawgo/pkg/bus" "clawgo/pkg/config" - "clawgo/pkg/configops" "clawgo/pkg/cron" "clawgo/pkg/logger" "clawgo/pkg/providers" @@ -32,461 +24,26 @@ import ( "clawgo/pkg/tools" ) -var errGatewayNotRunningSlash = errors.New("gateway not running") - -const autoLearnDefaultInterval = 10 * time.Minute -const autoLearnMinInterval = 30 * time.Second -const autonomyDefaultIdleInterval = 30 * time.Minute -const autonomyMinIdleInterval = 1 * time.Minute -const autonomyContinuousRunInterval = 20 * time.Second -const autonomyContinuousIdleThreshold = 20 * time.Second -const defaultRunStateTTL = 30 * time.Minute -const defaultRunStateMaxEntries = 500 -const defaultRunWaitTimeout = 60 * time.Second -const minRunWaitTimeout = 5 * time.Second -const maxRunWaitTimeout = 15 * time.Minute -const toolLoopRepeatSignatureThreshold = 2 -const toolLoopAllErrorRoundsThreshold = 2 -const toolLoopMaxCallsPerIteration = 6 -const toolLoopSingleCallTimeout = 20 * time.Second -const toolLoopMaxActDuration = 45 * time.Second -const toolLoopMinCallsPerIteration = 2 -const toolLoopMinSingleCallTimeout = 8 * time.Second -const toolLoopMinActDuration = 18 * time.Second -const toolLoopMaxParallelCalls = 2 -const finalizeDraftMinCharsForPolish = 90 -const finalizeQualityThreshold = 0.72 -const finalizeHeuristicHighThreshold = 0.82 -const finalizeHeuristicLowThreshold = 0.48 -const compactionAttemptTimeout = 8 * time.Second -const compactionRetryPerCandidate = 1 - -type sessionWorker struct { - cancelMu sync.Mutex - cancel context.CancelFunc - queueMu sync.Mutex - queue []bus.InboundMessage - queueNotify chan struct{} -} - -type autoLearner struct { - cancel context.CancelFunc - started time.Time - interval time.Duration - rounds int -} - -type autonomySession struct { - cancel context.CancelFunc - started time.Time - idleInterval time.Duration - rounds int - lastUserAt time.Time - lastNudgeAt time.Time - lastReportAt time.Time - pending bool - pendingSince time.Time - stallCount int - focus string -} - -type controlPolicy struct { - intentMaxInputChars int - autonomyTickInterval time.Duration - autonomyMinRunInterval time.Duration - autonomyIdleThreshold time.Duration - autonomyMaxPendingDuration time.Duration - autonomyMaxConsecutiveStalls int -} - -type runtimeControlStats struct { - runAccepted int64 - runCompleted int64 - runFailed int64 - runCanceled int64 - runControlHandled int64 - intentAutonomyMatched int64 - intentAutonomyRejected int64 - intentAutoLearnMatched int64 - intentAutoLearnRejected int64 - autonomyRounds int64 - autonomyStoppedByGuard int64 - autoLearnRounds int64 -} - -type runStatus string - -const ( - runStatusAccepted runStatus = "accepted" - runStatusRunning runStatus = "running" - runStatusOK runStatus = "ok" - runStatusError runStatus = "error" - runStatusCanceled runStatus = "canceled" -) - -type agentRunLifecycle struct { - runID string - acceptedAt time.Time - sessionKey string - channel string - senderID string - chatID string - synthetic bool - controlEligible bool -} - -type runState struct { - runID string - sessionKey string - channel string - chatID string - senderID string - synthetic bool - controlEligible bool - status runStatus - acceptedAt time.Time - startedAt time.Time - endedAt time.Time - errMessage string - responseLen int - controlHandled bool - done chan struct{} -} - type AgentLoop struct { - bus *bus.MessageBus - provider providers.LLMProvider - providersByProxy map[string]providers.LLMProvider - modelsByProxy map[string][]string - proxy string - proxyFallbacks []string - workspace string - model string - maxIterations int - sessions *session.SessionManager - contextBuilder *ContextBuilder - tools *tools.ToolRegistry - orchestrator *tools.Orchestrator - running atomic.Bool - compactionCfg config.ContextCompactionConfig - llmCallTimeout time.Duration - workersMu sync.Mutex - workers map[string]*sessionWorker - autoLearnMu sync.Mutex - autoLearners map[string]*autoLearner - autonomyMu sync.Mutex - autonomyBySess map[string]*autonomySession - controlPolicy controlPolicy - parallelSafeTools map[string]struct{} - maxParallelCalls int - controlStats runtimeControlStats - runSeq atomic.Int64 - runStateMu sync.Mutex - runStates map[string]*runState - runStateTTL time.Duration - runStateMax int + bus *bus.MessageBus + provider providers.LLMProvider + workspace string + model string + maxIterations int + sessions *session.SessionManager + contextBuilder *ContextBuilder + tools *tools.ToolRegistry + running bool } -type taskExecutionDirectives struct { - task string - stageReport bool -} - -type runControlIntent struct { - runID string - latest bool - wait bool - timeout time.Duration -} - -type autoLearnIntent struct { - enabled *bool - interval *time.Duration -} - -type autonomyIntent struct { - enabled *bool - clearFocus bool - idleInterval *time.Duration - focus string -} - -type intentDetectionOutcome struct { - matched bool - confidence float64 -} - -type autonomyIntentLLMResponse struct { - Matched bool `json:"matched"` - Enabled *bool `json:"enabled"` - ClearFocus bool `json:"clear_focus"` - IdleMinutes int `json:"idle_minutes"` - Focus string `json:"focus"` - Confidence float64 `json:"confidence"` -} - -type autoLearnIntentLLMResponse struct { - Matched bool `json:"matched"` - Enabled *bool `json:"enabled"` - IntervalMinutes int `json:"interval_minutes"` - Confidence float64 `json:"confidence"` -} - -type taskExecutionDirectivesLLMResponse struct { - Task string `json:"task"` - StageReport bool `json:"stage_report"` - Confidence float64 `json:"confidence"` -} - -type runControlIntentLLMResponse struct { - Matched bool `json:"matched"` - RunID string `json:"run_id"` - Latest bool `json:"latest"` - Wait bool `json:"wait"` - TimeoutSeconds int `json:"timeout_seconds"` - Confidence float64 `json:"confidence"` -} - -var defaultParallelSafeToolNames = []string{"read_file", "list_files", "find_files", "grep_files", "memory_search", "web_search", "repo_map", "system_info"} - -type stageReporter struct { - onUpdate func(content string) - localize func(content string) string -} - -type StartupSelfCheckReport struct { - TotalSessions int - CompactedSessions int -} - -type loopPromptTemplates struct { - autonomyFollowUpReportNoFocus string - autonomyFollowUpSilentNoFocus string - autonomyFollowUpReportWithFocus string - autonomyFollowUpSilentWithFocus string - autonomyFocusBootstrap string - autoLearnRound string - autonomyTaskWrapper string - progressStart string - progressAnalysis string - progressExecutionStart string - progressExecutionRound string - progressToolDone string - progressToolFailed string - progressFinalization string - progressDone string -} - -type tokenUsageTotals struct { - input int - output int - total int -} - -type tokenUsageTotalsKey struct{} - -func defaultControlPolicy() controlPolicy { - return controlPolicy{ - intentMaxInputChars: 1200, - autonomyTickInterval: autonomyContinuousRunInterval, - autonomyMinRunInterval: autonomyContinuousRunInterval, - autonomyIdleThreshold: autonomyContinuousIdleThreshold, - autonomyMaxPendingDuration: 3 * time.Minute, - autonomyMaxConsecutiveStalls: 3, - } -} - -func envFloat64(key string, fallback float64) float64 { - v := strings.TrimSpace(os.Getenv(key)) - if v == "" { - return fallback - } - f, err := strconv.ParseFloat(v, 64) - if err != nil { - return fallback - } - return f -} - -func envInt(key string, fallback int) int { - v := strings.TrimSpace(os.Getenv(key)) - if v == "" { - return fallback - } - n, err := strconv.Atoi(v) - if err != nil { - return fallback - } - return n -} - -func envDuration(key string, fallback time.Duration) time.Duration { - v := strings.TrimSpace(os.Getenv(key)) - if v == "" { - return fallback - } - d, err := time.ParseDuration(v) - if err != nil { - return fallback - } - return d -} - -func loadControlPolicyFromConfig(base controlPolicy, rc config.RuntimeControlConfig) controlPolicy { - p := base - if rc.IntentMaxInputChars > 0 { - p.intentMaxInputChars = rc.IntentMaxInputChars - } - if rc.AutonomyTickIntervalSec > 0 { - p.autonomyTickInterval = time.Duration(rc.AutonomyTickIntervalSec) * time.Second - } - if rc.AutonomyMinRunIntervalSec > 0 { - p.autonomyMinRunInterval = time.Duration(rc.AutonomyMinRunIntervalSec) * time.Second - } - if rc.AutonomyIdleThresholdSec > 0 { - p.autonomyIdleThreshold = time.Duration(rc.AutonomyIdleThresholdSec) * time.Second - } - if rc.AutonomyMaxPendingDurationSec > 0 { - p.autonomyMaxPendingDuration = time.Duration(rc.AutonomyMaxPendingDurationSec) * time.Second - } - if rc.AutonomyMaxConsecutiveStalls > 0 { - p.autonomyMaxConsecutiveStalls = rc.AutonomyMaxConsecutiveStalls - } - return p -} - -func loadRunStatePolicyFromConfig(rc config.RuntimeControlConfig) (time.Duration, int) { - ttl := defaultRunStateTTL - if rc.RunStateTTLSeconds > 0 { - ttl = time.Duration(rc.RunStateTTLSeconds) * time.Second - } - maxEntries := defaultRunStateMaxEntries - if rc.RunStateMax > 0 { - maxEntries = rc.RunStateMax - } - return ttl, maxEntries -} - -func normalizeKeywordList(values []string, fallback []string) []string { - if len(values) == 0 { - return append([]string(nil), fallback...) - } - out := make([]string, 0, len(values)) - seen := make(map[string]struct{}, len(values)) - for _, value := range values { - normalized := strings.ToLower(strings.TrimSpace(value)) - if normalized == "" { - continue - } - if _, ok := seen[normalized]; ok { - continue - } - seen[normalized] = struct{}{} - out = append(out, normalized) - } - if len(out) == 0 { - return append([]string(nil), fallback...) - } - return out -} - -func loadToolParallelPolicyFromConfig(rc config.RuntimeControlConfig) (map[string]struct{}, int) { - names := normalizeKeywordList(rc.ToolParallelSafeNames, defaultParallelSafeToolNames) - allowed := make(map[string]struct{}, len(names)) - for _, name := range names { - allowed[name] = struct{}{} - } - maxParallel := rc.ToolMaxParallelCalls - if maxParallel <= 0 { - maxParallel = toolLoopMaxParallelCalls - } - if maxParallel < 1 { - maxParallel = 1 - } - if maxParallel > 8 { - maxParallel = 8 - } - return allowed, maxParallel -} - -// applyLegacyControlPolicyEnvOverrides keeps compatibility with older env names. -func applyLegacyControlPolicyEnvOverrides(base controlPolicy) controlPolicy { - p := base - p.intentMaxInputChars = envInt("CLAWGO_INTENT_MAX_INPUT_CHARS", p.intentMaxInputChars) - p.autonomyTickInterval = envDuration("CLAWGO_AUTONOMY_TICK_INTERVAL", p.autonomyTickInterval) - p.autonomyMinRunInterval = envDuration("CLAWGO_AUTONOMY_MIN_RUN_INTERVAL", p.autonomyMinRunInterval) - p.autonomyIdleThreshold = envDuration("CLAWGO_AUTONOMY_IDLE_THRESHOLD", p.autonomyIdleThreshold) - p.autonomyMaxPendingDuration = envDuration("CLAWGO_AUTONOMY_MAX_PENDING_DURATION", p.autonomyMaxPendingDuration) - p.autonomyMaxConsecutiveStalls = envInt("CLAWGO_AUTONOMY_MAX_STALLS", p.autonomyMaxConsecutiveStalls) - - if p.intentMaxInputChars < 200 { - p.intentMaxInputChars = base.intentMaxInputChars - } - if p.autonomyTickInterval < 5*time.Second { - p.autonomyTickInterval = base.autonomyTickInterval - } - if p.autonomyMinRunInterval < 5*time.Second { - p.autonomyMinRunInterval = base.autonomyMinRunInterval - } - if p.autonomyIdleThreshold < 5*time.Second { - p.autonomyIdleThreshold = base.autonomyIdleThreshold - } - if p.autonomyMaxPendingDuration < 10*time.Second { - p.autonomyMaxPendingDuration = base.autonomyMaxPendingDuration - } - if p.autonomyMaxConsecutiveStalls <= 0 { - p.autonomyMaxConsecutiveStalls = base.autonomyMaxConsecutiveStalls - } - return p -} - -func (sr *stageReporter) Publish(stage int, total int, status string, detail string) { - if sr == nil || sr.onUpdate == nil { - return - } - _ = stage - _ = total - detail = strings.TrimSpace(detail) - if detail != "" { - if sr.localize != nil { - detail = sr.localize(detail) - } - sr.onUpdate(detail) - return - } - status = strings.TrimSpace(status) - if status != "" { - if sr.localize != nil { - status = sr.localize(status) - } - sr.onUpdate(status) - return - } - fallback := "Processing update" - if sr.localize != nil { - fallback = sr.localize(fallback) - } - sr.onUpdate(fallback) -} - -type userLanguageHint struct { - sessionKey string - content string -} - -type userLanguageHintKey struct{} - func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers.LLMProvider, cs *cron.CronService) *AgentLoop { workspace := cfg.WorkspacePath() os.MkdirAll(workspace, 0755) - logger.InfoCF("agent", "Agent workspace initialized", map[string]interface{}{ - "workspace": workspace, - }) toolsRegistry := tools.NewToolRegistry() - toolsRegistry.Register(tools.NewReadFileTool(workspace)) - toolsRegistry.Register(tools.NewWriteFileTool(workspace)) - toolsRegistry.Register(tools.NewListDirTool(workspace)) + toolsRegistry.Register(&tools.ReadFileTool{}) + toolsRegistry.Register(&tools.WriteFileTool{}) + toolsRegistry.Register(&tools.ListDirTool{}) toolsRegistry.Register(tools.NewExecTool(cfg.Tools.Shell, workspace)) if cs != nil { @@ -497,12 +54,12 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers toolsRegistry.Register(tools.NewWebSearchTool(braveAPIKey, cfg.Tools.Web.Search.MaxResults)) webFetchTool := tools.NewWebFetchTool(50000) toolsRegistry.Register(webFetchTool) + toolsRegistry.Register(tools.NewParallelFetchTool(webFetchTool)) // Register message tool messageTool := tools.NewMessageTool() - messageTool.SetSendCallback(func(channel, chatID, content string, buttons [][]bus.Button) error { + messageTool.SetSendCallback(func(channel, chatID, content string) error { msgBus.PublishOutbound(bus.OutboundMessage{ - Buttons: buttons, Channel: channel, ChatID: chatID, Content: content, @@ -512,14 +69,9 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers toolsRegistry.Register(messageTool) // Register spawn tool - orchestrator := tools.NewOrchestrator() - subagentManager := tools.NewSubagentManager(provider, workspace, msgBus, orchestrator) + subagentManager := tools.NewSubagentManager(provider, workspace, msgBus) spawnTool := tools.NewSpawnTool(subagentManager) toolsRegistry.Register(spawnTool) - toolsRegistry.Register(tools.NewPipelineCreateTool(orchestrator)) - toolsRegistry.Register(tools.NewPipelineStatusTool(orchestrator)) - toolsRegistry.Register(tools.NewPipelineStateSetTool(orchestrator)) - toolsRegistry.Register(tools.NewPipelineDispatchTool(orchestrator, subagentManager)) // Register edit file tool editFileTool := tools.NewEditFileTool(workspace) @@ -528,8 +80,9 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers // Register memory search tool memorySearchTool := tools.NewMemorySearchTool(workspace) toolsRegistry.Register(memorySearchTool) - toolsRegistry.Register(tools.NewRepoMapTool(workspace)) - toolsRegistry.Register(tools.NewSkillExecTool(workspace)) + + // Register parallel execution tool (leveraging Go's concurrency) + toolsRegistry.Register(tools.NewParallelTool(toolsRegistry)) // Register browser tool (integrated Chromium support) toolsRegistry.Register(tools.NewBrowserTool()) @@ -541,89 +94,21 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers sessionsManager := session.NewSessionManager(filepath.Join(filepath.Dir(cfg.WorkspacePath()), "sessions")) - providersByProxy, err := providers.CreateProviders(cfg) - if err != nil { - logger.WarnCF("agent", "Create providers map failed, fallback to single provider mode", map[string]interface{}{ - logger.FieldError: err.Error(), - }) - providersByProxy = map[string]providers.LLMProvider{ - "proxy": provider, - } - } - modelsByProxy := map[string][]string{} - for _, name := range providers.ListProviderNames(cfg) { - modelsByProxy[name] = providers.GetProviderModels(cfg, name) - } - - primaryProxy := strings.TrimSpace(cfg.Agents.Defaults.Proxy) - if primaryProxy == "" { - primaryProxy = "proxy" - } - if p, ok := providersByProxy[primaryProxy]; ok { - provider = p - } else if p, ok := providersByProxy["proxy"]; ok { - primaryProxy = "proxy" - provider = p - } - defaultModel := defaultModelFromModels(modelsByProxy[primaryProxy], provider) - policy := loadControlPolicyFromConfig(defaultControlPolicy(), cfg.Agents.Defaults.RuntimeControl) - policy = applyLegacyControlPolicyEnvOverrides(policy) - parallelSafeTools, maxParallelCalls := loadToolParallelPolicyFromConfig(cfg.Agents.Defaults.RuntimeControl) - toolsRegistry.Register(tools.NewParallelTool(toolsRegistry, maxParallelCalls, parallelSafeTools)) - toolsRegistry.Register(tools.NewParallelFetchTool(webFetchTool, maxParallelCalls, parallelSafeTools)) - runStateTTL, runStateMax := loadRunStatePolicyFromConfig(cfg.Agents.Defaults.RuntimeControl) - // Keep compatibility with older env names. - runStateTTL = envDuration("CLAWGO_RUN_STATE_TTL", runStateTTL) - if runStateTTL < 1*time.Minute { - runStateTTL = defaultRunStateTTL - } - runStateMax = envInt("CLAWGO_RUN_STATE_MAX", runStateMax) - if runStateMax <= 0 { - runStateMax = defaultRunStateMaxEntries - } - loop := &AgentLoop{ - bus: msgBus, - provider: provider, - providersByProxy: providersByProxy, - modelsByProxy: modelsByProxy, - proxy: primaryProxy, - proxyFallbacks: parseStringList(cfg.Agents.Defaults.ProxyFallbacks), - workspace: workspace, - model: defaultModel, - maxIterations: cfg.Agents.Defaults.MaxToolIterations, - sessions: sessionsManager, - contextBuilder: NewContextBuilder(workspace, cfg.Memory, cfg.Agents.Defaults.RuntimeControl.SystemSummary, func() []string { return toolsRegistry.GetSummaries() }), - tools: toolsRegistry, - orchestrator: orchestrator, - compactionCfg: cfg.Agents.Defaults.ContextCompaction, - llmCallTimeout: time.Duration(cfg.Providers.Proxy.TimeoutSec) * time.Second, - workers: make(map[string]*sessionWorker), - autoLearners: make(map[string]*autoLearner), - autonomyBySess: make(map[string]*autonomySession), - controlPolicy: policy, - parallelSafeTools: parallelSafeTools, - maxParallelCalls: maxParallelCalls, - runStates: make(map[string]*runState), - runStateTTL: runStateTTL, - runStateMax: runStateMax, + bus: msgBus, + provider: provider, + workspace: workspace, + model: cfg.Agents.Defaults.Model, + maxIterations: cfg.Agents.Defaults.MaxToolIterations, + sessions: sessionsManager, + contextBuilder: NewContextBuilder(workspace, func() []string { return toolsRegistry.GetSummaries() }), + tools: toolsRegistry, + running: false, } - logger.InfoCF("agent", "Control policy initialized", map[string]interface{}{ - "intent_max_input_chars": policy.intentMaxInputChars, - "autonomy_tick_interval": policy.autonomyTickInterval.String(), - "autonomy_min_run_interval": policy.autonomyMinRunInterval.String(), - "autonomy_idle_threshold": policy.autonomyIdleThreshold.String(), - "autonomy_max_pending_duration": policy.autonomyMaxPendingDuration.String(), - "autonomy_max_consecutive_stalls": policy.autonomyMaxConsecutiveStalls, - "parallel_safe_tool_count": len(parallelSafeTools), - "tool_max_parallel_calls": maxParallelCalls, - "run_state_ttl": runStateTTL.String(), - "run_state_max": runStateMax, - }) - // Inject recursive run logic so subagent has full tool-calling capability. + // 注入递归运行逻辑,使 subagent 具备 full tool-calling 能力 subagentManager.SetRunFunc(func(ctx context.Context, task, channel, chatID string) (string, error) { - sessionKey := fmt.Sprintf("subagent:%d", os.Getpid()) // Use PID/random value to avoid session key collisions. + sessionKey := fmt.Sprintf("subagent:%d", os.Getpid()) // 改用 PID 或随机数,避免 sessionKey 冲突 return loop.ProcessDirect(ctx, task, sessionKey) }) @@ -631,30 +116,30 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers } func (al *AgentLoop) Run(ctx context.Context) error { - al.running.Store(true) + al.running = true - for al.running.Load() { + for al.running { select { case <-ctx.Done(): - al.stopAllWorkers() - al.stopAllAutoLearners() - al.stopAllAutonomySessions() return nil default: msg, ok := al.bus.ConsumeInbound(ctx) if !ok { - al.stopAllWorkers() - al.stopAllAutoLearners() - al.stopAllAutonomySessions() - return nil - } - - if isStopCommand(msg.Content) { - al.handleStopCommand(msg) continue } - al.enqueueMessage(ctx, msg) + response, err := al.processMessage(ctx, msg) + if err != nil { + response = fmt.Sprintf("Error processing message: %v", err) + } + + if response != "" { + al.bus.PublishOutbound(bus.OutboundMessage{ + Channel: msg.Channel, + ChatID: msg.ChatID, + Content: response, + }) + } } } @@ -662,1309 +147,7 @@ func (al *AgentLoop) Run(ctx context.Context) error { } func (al *AgentLoop) Stop() { - al.running.Store(false) - al.stopAllWorkers() - al.stopAllAutoLearners() - al.stopAllAutonomySessions() -} - -func isStopCommand(content string) bool { - return strings.EqualFold(strings.TrimSpace(content), "/stop") -} - -func (al *AgentLoop) handleStopCommand(msg bus.InboundMessage) { - al.cancelSessionWorkerRun(msg.SessionKey) -} - -func (al *AgentLoop) cancelSessionWorkerRun(sessionKey string) { - worker := al.getWorker(sessionKey) - if worker == nil { - return - } - - worker.cancelMu.Lock() - cancel := worker.cancel - worker.cancelMu.Unlock() - - if cancel == nil { - return - } - - cancel() -} - -func (al *AgentLoop) enqueueMessage(ctx context.Context, msg bus.InboundMessage) { - _ = ctx - worker := al.getOrCreateWorker(ctx, msg.SessionKey) - worker.enqueue(msg) -} - -func (al *AgentLoop) getWorker(sessionKey string) *sessionWorker { - al.workersMu.Lock() - defer al.workersMu.Unlock() - return al.workers[sessionKey] -} - -func (al *AgentLoop) getOrCreateWorker(ctx context.Context, sessionKey string) *sessionWorker { - al.workersMu.Lock() - defer al.workersMu.Unlock() - - if w, ok := al.workers[sessionKey]; ok { - return w - } - - w := &sessionWorker{ - queue: make([]bus.InboundMessage, 0, 16), - queueNotify: make(chan struct{}, 1), - } - al.workers[sessionKey] = w - - go al.runSessionWorker(ctx, sessionKey, w) - return w -} - -func (al *AgentLoop) runSessionWorker(ctx context.Context, sessionKey string, worker *sessionWorker) { - for { - msg, ok := worker.dequeue(ctx) - if !ok { - al.clearWorkerCancel(worker) - al.removeWorker(sessionKey, worker) - return - } - - func() { - taskCtx, cancel := context.WithCancel(ctx) - taskCtx, tokenTotals := withTokenUsageTotals(taskCtx) - worker.cancelMu.Lock() - worker.cancel = cancel - worker.cancelMu.Unlock() - - defer func() { - cancel() - al.clearWorkerCancel(worker) - if r := recover(); r != nil { - logger.ErrorCF("agent", "Session worker recovered from panic", map[string]interface{}{ - "session_key": sessionKey, - "panic": fmt.Sprintf("%v", r), - }) - } - }() - - response, err := al.processMessage(taskCtx, msg) - if err != nil { - if errors.Is(err, context.Canceled) { - return - } - response = al.formatProcessingErrorMessage(taskCtx, msg, err) - } - - if response != "" && shouldPublishSyntheticResponse(msg) { - if al != nil && al.sessions != nil && tokenTotals != nil { - al.sessions.AddTokenUsage( - msg.SessionKey, - tokenTotals.input, - tokenTotals.output, - tokenTotals.total, - ) - } - response += formatTokenUsageSuffix( - tokenTotals, - ) - al.bus.PublishOutbound(bus.OutboundMessage{ - Buttons: nil, - Channel: msg.Channel, - ChatID: msg.ChatID, - Content: response, - }) - } - }() - } -} - -func (w *sessionWorker) enqueue(msg bus.InboundMessage) { - if w == nil { - return - } - w.queueMu.Lock() - w.queue = append(w.queue, msg) - w.queueMu.Unlock() - select { - case w.queueNotify <- struct{}{}: - default: - } -} - -func (w *sessionWorker) dequeue(ctx context.Context) (bus.InboundMessage, bool) { - if w == nil { - return bus.InboundMessage{}, false - } - for { - w.queueMu.Lock() - if len(w.queue) > 0 { - msg := w.queue[0] - w.queue[0] = bus.InboundMessage{} - w.queue = w.queue[1:] - w.queueMu.Unlock() - return msg, true - } - w.queueMu.Unlock() - - select { - case <-ctx.Done(): - return bus.InboundMessage{}, false - case <-w.queueNotify: - } - } -} - -func (al *AgentLoop) clearWorkerCancel(worker *sessionWorker) { - worker.cancelMu.Lock() - worker.cancel = nil - worker.cancelMu.Unlock() -} - -func (al *AgentLoop) formatProcessingErrorMessage(ctx context.Context, msg bus.InboundMessage, err error) string { - return "" -} - -func (al *AgentLoop) preferChineseUserFacingText(sessionKey, currentContent string) bool { - zhCount, enCount := countLanguageSignals(currentContent) - - if al != nil && al.sessions != nil && strings.TrimSpace(sessionKey) != "" { - history := al.sessions.GetHistory(sessionKey) - seenUserTurns := 0 - for i := len(history) - 1; i >= 0 && seenUserTurns < 6; i-- { - if history[i].Role != "user" { - continue - } - seenUserTurns++ - z, e := countLanguageSignals(history[i].Content) - zhCount += z - enCount += e - } - } - - if zhCount == 0 && enCount == 0 { - return false - } - return zhCount >= enCount -} - -func countLanguageSignals(text string) (zhCount int, enCount int) { - inEnglishWord := false - for _, r := range text { - if unicode.In(r, unicode.Han) { - zhCount++ - inEnglishWord = false - continue - } - if r <= unicode.MaxASCII && unicode.IsLetter(r) { - if !inEnglishWord { - enCount++ - inEnglishWord = true - } - continue - } - inEnglishWord = false - } - return zhCount, enCount -} - -func (al *AgentLoop) removeWorker(sessionKey string, worker *sessionWorker) { - al.workersMu.Lock() - defer al.workersMu.Unlock() - if cur, ok := al.workers[sessionKey]; ok && cur == worker { - delete(al.workers, sessionKey) - } -} - -func (al *AgentLoop) stopAllWorkers() { - al.workersMu.Lock() - workers := make([]*sessionWorker, 0, len(al.workers)) - for _, w := range al.workers { - workers = append(workers, w) - } - al.workersMu.Unlock() - - for _, w := range workers { - w.cancelMu.Lock() - cancel := w.cancel - w.cancelMu.Unlock() - if cancel != nil { - cancel() - } - } -} - -func (al *AgentLoop) stopAllAutoLearners() { - al.autoLearnMu.Lock() - defer al.autoLearnMu.Unlock() - - for sessionKey, learner := range al.autoLearners { - if learner != nil && learner.cancel != nil { - learner.cancel() - } - delete(al.autoLearners, sessionKey) - } -} - -func (al *AgentLoop) stopAllAutonomySessions() { - al.autonomyMu.Lock() - defer al.autonomyMu.Unlock() - - for sessionKey, s := range al.autonomyBySess { - if s != nil && s.cancel != nil { - s.cancel() - } - delete(al.autonomyBySess, sessionKey) - } -} - -func (al *AgentLoop) startAutonomy(ctx context.Context, msg bus.InboundMessage, idleInterval time.Duration, focus string) { - if msg.Channel == "cli" { - return - } - - if idleInterval <= 0 { - idleInterval = autonomyDefaultIdleInterval - } - if idleInterval < autonomyMinIdleInterval { - idleInterval = autonomyMinIdleInterval - } - - al.autonomyMu.Lock() - if old, ok := al.autonomyBySess[msg.SessionKey]; ok { - if old != nil && old.cancel != nil { - old.cancel() - } - delete(al.autonomyBySess, msg.SessionKey) - } - - langCtx := withUserLanguageHint(context.Background(), msg.SessionKey, msg.Content) - sessionCtx, cancel := context.WithCancel(langCtx) - s := &autonomySession{ - cancel: cancel, - started: time.Now(), - idleInterval: idleInterval, - lastUserAt: time.Now(), - lastReportAt: time.Now(), - focus: strings.TrimSpace(focus), - } - al.autonomyBySess[msg.SessionKey] = s - al.autonomyMu.Unlock() - - go al.runAutonomyLoop(sessionCtx, msg) - if s.focus != "" { - al.bus.PublishInbound(bus.InboundMessage{ - Channel: msg.Channel, - SenderID: "autonomy", - ChatID: msg.ChatID, - SessionKey: msg.SessionKey, - Content: al.buildAutonomyFocusPrompt(s.focus), - Metadata: map[string]string{ - "source": "autonomy", - "round": "0", - "mode": "focus_bootstrap", - }, - }) - } - return -} - -func (al *AgentLoop) stopAutonomy(sessionKey string) bool { - al.autonomyMu.Lock() - defer al.autonomyMu.Unlock() - - s, ok := al.autonomyBySess[sessionKey] - if !ok || s == nil { - return false - } - if s.cancel != nil { - s.cancel() - } - delete(al.autonomyBySess, sessionKey) - return true -} - -func (al *AgentLoop) clearAutonomyFocus(sessionKey string) bool { - al.autonomyMu.Lock() - defer al.autonomyMu.Unlock() - - s, ok := al.autonomyBySess[sessionKey] - if !ok || s == nil { - return false - } - s.focus = "" - return true -} - -func (al *AgentLoop) isAutonomyEnabled(sessionKey string) bool { - al.autonomyMu.Lock() - defer al.autonomyMu.Unlock() - _, ok := al.autonomyBySess[sessionKey] - return ok -} - -func (al *AgentLoop) noteAutonomyUserActivity(msg bus.InboundMessage) { - if isSyntheticMessage(msg) { - return - } - al.autonomyMu.Lock() - defer al.autonomyMu.Unlock() - - s, ok := al.autonomyBySess[msg.SessionKey] - if !ok || s == nil { - return - } - s.lastUserAt = time.Now() -} - -func (al *AgentLoop) runAutonomyLoop(ctx context.Context, msg bus.InboundMessage) { - tick := autonomyContinuousRunInterval - if al != nil && al.controlPolicy.autonomyTickInterval > 0 { - tick = al.controlPolicy.autonomyTickInterval - } - ticker := time.NewTicker(tick) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if !al.maybeRunAutonomyRound(msg) { - return - } - } - } -} - -func (al *AgentLoop) maybeRunAutonomyRound(msg bus.InboundMessage) bool { - policy := defaultControlPolicy() - if al != nil { - policy = al.controlPolicy - } - al.autonomyMu.Lock() - s, ok := al.autonomyBySess[msg.SessionKey] - if !ok || s == nil { - al.autonomyMu.Unlock() - return false - } - - now := time.Now() - if s.pending { - if !s.pendingSince.IsZero() && now.Sub(s.pendingSince) > policy.autonomyMaxPendingDuration { - s.pending = false - s.pendingSince = time.Time{} - s.stallCount++ - if s.stallCount >= policy.autonomyMaxConsecutiveStalls { - if s.cancel != nil { - s.cancel() - } - delete(al.autonomyBySess, msg.SessionKey) - al.autonomyMu.Unlock() - al.cancelSessionWorkerRun(msg.SessionKey) - al.controlMetricAdd(&al.controlStats.autonomyStoppedByGuard, 1) - return false - } - } - al.autonomyMu.Unlock() - return true - } - if now.Sub(s.lastUserAt) < policy.autonomyIdleThreshold || - now.Sub(s.lastNudgeAt) < policy.autonomyMinRunInterval { - al.autonomyMu.Unlock() - return true - } - - s.rounds++ - round := s.rounds - s.lastNudgeAt = now - s.pending = true - s.pendingSince = now - reportDue := now.Sub(s.lastReportAt) >= s.idleInterval - if reportDue { - s.lastReportAt = now - } - focus := strings.TrimSpace(s.focus) - al.autonomyMu.Unlock() - al.controlMetricAdd(&al.controlStats.autonomyRounds, 1) - - al.bus.PublishInbound(bus.InboundMessage{ - Channel: msg.Channel, - SenderID: "autonomy", - ChatID: msg.ChatID, - SessionKey: msg.SessionKey, - Content: al.buildAutonomyFollowUpPrompt(round, focus, reportDue), - Metadata: map[string]string{ - "source": "autonomy", - "round": strconv.Itoa(round), - "report_due": strconv.FormatBool(reportDue), - }, - }) - - return true -} - -func (al *AgentLoop) finishAutonomyRound(sessionKey string) { - al.autonomyMu.Lock() - defer al.autonomyMu.Unlock() - if s, ok := al.autonomyBySess[sessionKey]; ok && s != nil { - s.pending = false - s.pendingSince = time.Time{} - s.stallCount = 0 - } -} - -func (al *AgentLoop) buildAutonomyFollowUpPrompt(round int, focus string, reportDue bool) string { - prompts := al.loadLoopPromptTemplates() - focus = strings.TrimSpace(focus) - if focus == "" && reportDue { - return renderLoopPromptTemplate(prompts.autonomyFollowUpReportNoFocus, map[string]string{ - "round": strconv.Itoa(round), - }) - } - if focus == "" && !reportDue { - return renderLoopPromptTemplate(prompts.autonomyFollowUpSilentNoFocus, map[string]string{ - "round": strconv.Itoa(round), - }) - } - if reportDue { - return renderLoopPromptTemplate(prompts.autonomyFollowUpReportWithFocus, map[string]string{ - "round": strconv.Itoa(round), - "focus": focus, - }) - } - return renderLoopPromptTemplate(prompts.autonomyFollowUpSilentWithFocus, map[string]string{ - "round": strconv.Itoa(round), - "focus": focus, - }) -} - -func (al *AgentLoop) buildAutonomyFocusPrompt(focus string) string { - prompts := al.loadLoopPromptTemplates() - focus = strings.TrimSpace(focus) - return renderLoopPromptTemplate(prompts.autonomyFocusBootstrap, map[string]string{ - "focus": focus, - }) -} - -func (al *AgentLoop) startAutoLearner(ctx context.Context, msg bus.InboundMessage, interval time.Duration) { - if msg.Channel == "cli" { - return - } - - if interval <= 0 { - interval = autoLearnDefaultInterval - } - if interval < autoLearnMinInterval { - interval = autoLearnMinInterval - } - - al.autoLearnMu.Lock() - if old, ok := al.autoLearners[msg.SessionKey]; ok { - if old != nil && old.cancel != nil { - old.cancel() - } - delete(al.autoLearners, msg.SessionKey) - } - - langCtx := withUserLanguageHint(context.Background(), msg.SessionKey, msg.Content) - learnerCtx, cancel := context.WithCancel(langCtx) - learner := &autoLearner{ - cancel: cancel, - started: time.Now(), - interval: interval, - } - al.autoLearners[msg.SessionKey] = learner - al.autoLearnMu.Unlock() - - go al.runAutoLearnerLoop(learnerCtx, msg) - return -} - -func (al *AgentLoop) runAutoLearnerLoop(ctx context.Context, msg bus.InboundMessage) { - runOnce := func() bool { - round, ok := al.bumpAutoLearnRound(msg.SessionKey) - if !ok { - return false - } - al.controlMetricAdd(&al.controlStats.autoLearnRounds, 1) - - al.bus.PublishInbound(bus.InboundMessage{ - Channel: msg.Channel, - SenderID: "autolearn", - ChatID: msg.ChatID, - SessionKey: msg.SessionKey, - Content: al.buildAutoLearnPrompt(round), - Metadata: map[string]string{ - "source": "autolearn", - "round": strconv.Itoa(round), - }, - }) - return true - } - - if !runOnce() { - return - } - - ticker := time.NewTicker(al.autoLearnInterval(msg.SessionKey)) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if !runOnce() { - return - } - } - } -} - -func (al *AgentLoop) autoLearnInterval(sessionKey string) time.Duration { - al.autoLearnMu.Lock() - defer al.autoLearnMu.Unlock() - - learner, ok := al.autoLearners[sessionKey] - if !ok || learner == nil || learner.interval <= 0 { - return autoLearnDefaultInterval - } - return learner.interval -} - -func (al *AgentLoop) bumpAutoLearnRound(sessionKey string) (int, bool) { - al.autoLearnMu.Lock() - defer al.autoLearnMu.Unlock() - - learner, ok := al.autoLearners[sessionKey] - if !ok || learner == nil { - return 0, false - } - learner.rounds++ - return learner.rounds, true -} - -func (al *AgentLoop) stopAutoLearner(sessionKey string) bool { - al.autoLearnMu.Lock() - defer al.autoLearnMu.Unlock() - - learner, ok := al.autoLearners[sessionKey] - if !ok || learner == nil { - return false - } - if learner.cancel != nil { - learner.cancel() - } - delete(al.autoLearners, sessionKey) - return true -} - -func (al *AgentLoop) buildAutoLearnPrompt(round int) string { - prompts := al.loadLoopPromptTemplates() - return renderLoopPromptTemplate(prompts.autoLearnRound, map[string]string{ - "round": strconv.Itoa(round), - }) -} - -func (al *AgentLoop) buildAutonomyTaskPrompt(task string) string { - prompts := al.loadLoopPromptTemplates() - return renderLoopPromptTemplate(prompts.autonomyTaskWrapper, map[string]string{ - "task": strings.TrimSpace(task), - }) -} - -func defaultLoopPromptTemplates() loopPromptTemplates { - return loopPromptTemplates{ - autonomyFollowUpReportNoFocus: "Autonomy round {round}: first complete the current active task. After it is complete, you may continue with one closely related next step and report progress.", - autonomyFollowUpSilentNoFocus: "Autonomy round {round}: first complete the current active task. After it is complete, you may continue with one closely related next step. If blocked, stop this round without external reply.", - autonomyFollowUpReportWithFocus: "Autonomy round {round}: first complete focus \"{focus}\". After that focus is complete, you may extend with one closely related next step; avoid unrelated branches. If blocked, report blocker and pause.", - autonomyFollowUpSilentWithFocus: "Autonomy round {round}: first complete focus \"{focus}\". After that focus is complete, you may extend with one closely related next step; avoid unrelated branches. If blocked, stop this round without external reply.", - autonomyFocusBootstrap: "Autonomy mode started. Prioritize focus \"{focus}\" first. Once complete, extension is allowed only to directly related next steps.", - autoLearnRound: "Auto-learn round {round}: choose one small bounded task and complete it. If finished with remaining capacity, you may do one directly related extension step, then stop.", - autonomyTaskWrapper: "Execute the user task below first. After completion, you may continue with directly related improvements. Avoid unrelated side tasks.\n\nUser task: {task}", - progressStart: "I received your task and will clarify the goal and constraints first.", - progressAnalysis: "I am building the context needed for execution.", - progressExecutionStart: "I am starting step-by-step execution.", - progressExecutionRound: "Starting another execution round.", - progressToolDone: "Tool execution completed.", - progressToolFailed: "Tool execution failed.", - progressFinalization: "Final response is ready.", - progressDone: "Task completed.", - } -} - -func (al *AgentLoop) loadLoopPromptTemplates() loopPromptTemplates { - prompts := defaultLoopPromptTemplates() - if al == nil || strings.TrimSpace(al.workspace) == "" { - return prompts - } - - for _, filename := range []string{"AGENTS.md", "USER.md"} { - filePath := filepath.Join(al.workspace, filename) - data, err := os.ReadFile(filePath) - if err != nil { - continue - } - applyLoopPromptOverrides(&prompts, string(data)) - } - return prompts -} - -func applyLoopPromptOverrides(dst *loopPromptTemplates, content string) { - if dst == nil { - return - } - const sectionHeader = "## CLAWGO_LOOP_PROMPTS" - lines := strings.Split(content, "\n") - inSection := false - - for _, raw := range lines { - line := strings.TrimSpace(raw) - if strings.HasPrefix(line, "## ") { - if strings.EqualFold(line, sectionHeader) { - inSection = true - continue - } - if inSection { - break - } - } - if !inSection || line == "" || strings.HasPrefix(line, "