From 79e0a48b743380a560da01ce6879143610c788b7 Mon Sep 17 00:00:00 2001 From: lpf Date: Tue, 14 Apr 2026 14:53:18 +0800 Subject: [PATCH] feat(runtime): add process watch patterns, unified backup/import, pluggable context engine, token usage, and codex device login --- cmd/cli_common.go | 1 + cmd/cmd_backup.go | 354 ++++++++++++++++++++++++++ cmd/cmd_backup_test.go | 71 ++++++ cmd/cmd_config.go | 10 +- cmd/main.go | 4 +- pkg/agent/context_engine.go | 55 ++++ pkg/agent/context_engine_test.go | 48 ++++ pkg/agent/loop.go | 192 +++++++++++--- pkg/agent/loop_usage_test.go | 26 ++ pkg/providers/oauth.go | 169 ++++++++++-- pkg/providers/oauth_test.go | 92 +++++++ pkg/tools/message_process_test.go | 62 +++++ pkg/tools/process_tool.go | 167 +++++++++++- pkg/tools/subagent.go | 6 + pkg/tools/subagent_runtime_context.go | 14 +- workspace/AGENTS.md | 21 +- workspace/SOUL.md | 10 + workspace/USER.md | 19 ++ 18 files changed, 1257 insertions(+), 64 deletions(-) create mode 100644 cmd/cmd_backup.go create mode 100644 cmd/cmd_backup_test.go create mode 100644 pkg/agent/context_engine.go create mode 100644 pkg/agent/context_engine_test.go create mode 100644 pkg/agent/loop_usage_test.go diff --git a/cmd/cli_common.go b/cmd/cli_common.go index b683190..0e108a2 100644 --- a/cmd/cli_common.go +++ b/cmd/cli_common.go @@ -97,6 +97,7 @@ func printHelp() { fmt.Println(" cron Manage scheduled tasks") fmt.Println(" channel Test and manage messaging channels") fmt.Println(" skills Manage skills (install, list, remove)") + fmt.Println(" backup Unified backup/import for config, sessions, memory, skills") if tuiEnabled { fmt.Println(" tui Chat in terminal using the gateway chat API") } diff --git a/cmd/cmd_backup.go b/cmd/cmd_backup.go new file mode 100644 index 0000000..b82da53 --- /dev/null +++ b/cmd/cmd_backup.go @@ -0,0 +1,354 @@ +package main + +import ( + "archive/zip" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "strings" + "time" +) + +type unifiedBackupManifest struct { + Version int `json:"version"` + CreatedAt string `json:"created_at"` + Config string `json:"config"` + Workspace string `json:"workspace"` + Includes []string `json:"includes,omitempty"` +} + +func backupCmd() { + if len(os.Args) < 3 { + backupHelp() + return + } + switch strings.TrimSpace(os.Args[2]) { + case "create": + backupCreateCmd() + case "import": + backupImportCmd() + default: + fmt.Printf("Unknown backup command: %s\n", os.Args[2]) + backupHelp() + } +} + +func backupHelp() { + fmt.Println("\nBackup commands:") + fmt.Println(" create [archive.zip] Create unified backup (config + sessions + memory + skills)") + fmt.Println(" import Restore unified backup and auto-create rollback snapshot") + fmt.Println() + fmt.Println("Examples:") + fmt.Println(" clawgo backup create") + fmt.Println(" clawgo backup create /tmp/clawgo-backup.zip") + fmt.Println(" clawgo backup import /tmp/clawgo-backup.zip") +} + +func backupCreateCmd() { + cfg, err := loadConfig() + if err != nil { + fmt.Printf("Error loading config: %v\n", err) + return + } + out := "" + if len(os.Args) >= 4 { + out = strings.TrimSpace(os.Args[3]) + } + if out == "" { + out = defaultBackupPathForConfig("clawgo-backup", getConfigPath()) + } + count, err := createUnifiedBackup(cfg.WorkspacePath(), getConfigPath(), out) + if err != nil { + fmt.Printf("Backup failed: %v\n", err) + return + } + fmt.Printf("Backup created: %s (%d files)\n", out, count) +} + +func backupImportCmd() { + if len(os.Args) < 4 { + fmt.Println("Usage: clawgo backup import ") + return + } + cfg, err := loadConfig() + if err != nil { + fmt.Printf("Error loading config: %v\n", err) + return + } + archive := strings.TrimSpace(os.Args[3]) + rollbackPath, restored, err := importUnifiedBackup(cfg.WorkspacePath(), getConfigPath(), archive) + if err != nil { + fmt.Printf("Import failed: %v\n", err) + return + } + fmt.Printf("Import completed: %d files restored\n", restored) + fmt.Printf("Rollback snapshot: %s\n", rollbackPath) +} + +func defaultBackupPath(prefix string) string { + return defaultBackupPathForConfig(prefix, getConfigPath()) +} + +func defaultBackupPathForConfig(prefix, configPath string) string { + configPath = strings.TrimSpace(configPath) + if configPath == "" { + configPath = getConfigPath() + } + dir := filepath.Join(filepath.Dir(configPath), "backups") + _ = os.MkdirAll(dir, 0755) + name := fmt.Sprintf("%s-%s.zip", prefix, time.Now().Format("20060102-150405")) + return filepath.Join(dir, name) +} + +func createUnifiedBackup(workspacePath, configPath, archivePath string) (int, error) { + workspacePath = strings.TrimSpace(workspacePath) + configPath = strings.TrimSpace(configPath) + archivePath = strings.TrimSpace(archivePath) + if workspacePath == "" { + return 0, fmt.Errorf("workspace path is empty") + } + if configPath == "" { + return 0, fmt.Errorf("config path is empty") + } + if archivePath == "" { + return 0, fmt.Errorf("archive path is empty") + } + if err := os.MkdirAll(filepath.Dir(archivePath), 0755); err != nil { + return 0, err + } + + f, err := os.Create(archivePath) + if err != nil { + return 0, err + } + defer f.Close() + zw := zip.NewWriter(f) + defer zw.Close() + + agentsRoot := filepath.Join(filepath.Dir(workspacePath), "agents") + includes := []string{ + "config/config.json", + "workspace/MEMORY.md", + "workspace/memory/**", + "workspace/skills/**", + "agents/**", + "workspace/AGENTS.md", + "workspace/USER.md", + "workspace/SOUL.md", + } + + fileCount := 0 + seen := map[string]struct{}{} + addFile := func(src, dst string) error { + src = filepath.Clean(src) + dst = filepath.ToSlash(strings.TrimSpace(dst)) + if src == "" || dst == "" { + return nil + } + if _, ok := seen[dst]; ok { + return nil + } + info, err := os.Stat(src) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + if info.IsDir() { + return nil + } + r, err := os.Open(src) + if err != nil { + return err + } + defer r.Close() + hdr, err := zip.FileInfoHeader(info) + if err != nil { + return err + } + hdr.Name = dst + hdr.Method = zip.Deflate + w, err := zw.CreateHeader(hdr) + if err != nil { + return err + } + if _, err := io.Copy(w, r); err != nil { + return err + } + seen[dst] = struct{}{} + fileCount++ + return nil + } + addTree := func(srcDir, dstDir string) error { + info, err := os.Stat(srcDir) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + if !info.IsDir() { + return addFile(srcDir, filepath.Join(dstDir, filepath.Base(srcDir))) + } + return filepath.Walk(srcDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + rel, err := filepath.Rel(srcDir, path) + if err != nil { + return err + } + return addFile(path, filepath.Join(dstDir, rel)) + }) + } + + if err := addFile(configPath, "config/config.json"); err != nil { + return 0, err + } + if err := addFile(filepath.Join(workspacePath, "MEMORY.md"), "workspace/MEMORY.md"); err != nil { + return 0, err + } + if err := addTree(filepath.Join(workspacePath, "memory"), "workspace/memory"); err != nil { + return 0, err + } + if err := addTree(filepath.Join(workspacePath, "skills"), "workspace/skills"); err != nil { + return 0, err + } + if err := addTree(agentsRoot, "agents"); err != nil { + return 0, err + } + for _, name := range []string{"AGENTS.md", "USER.md", "SOUL.md"} { + if err := addFile(filepath.Join(workspacePath, name), filepath.Join("workspace", name)); err != nil { + return 0, err + } + } + + manifest := unifiedBackupManifest{ + Version: 1, + CreatedAt: time.Now().UTC().Format(time.RFC3339), + Config: filepath.Clean(configPath), + Workspace: filepath.Clean(workspacePath), + Includes: includes, + } + manifestData, _ := json.MarshalIndent(manifest, "", " ") + w, err := zw.Create("manifest.json") + if err != nil { + return 0, err + } + if _, err := w.Write(manifestData); err != nil { + return 0, err + } + return fileCount, nil +} + +func importUnifiedBackup(workspacePath, configPath, archivePath string) (string, int, error) { + workspacePath = strings.TrimSpace(workspacePath) + configPath = strings.TrimSpace(configPath) + archivePath = strings.TrimSpace(archivePath) + if workspacePath == "" || configPath == "" || archivePath == "" { + return "", 0, fmt.Errorf("invalid import paths") + } + r, err := zip.OpenReader(archivePath) + if err != nil { + return "", 0, err + } + defer r.Close() + + rollbackPath := defaultBackupPathForConfig("clawgo-rollback", configPath) + if _, err := createUnifiedBackup(workspacePath, configPath, rollbackPath); err != nil { + return "", 0, fmt.Errorf("create rollback snapshot: %w", err) + } + + tmpDir, err := os.MkdirTemp("", "clawgo-import-*") + if err != nil { + return "", 0, err + } + defer os.RemoveAll(tmpDir) + + for _, zf := range r.File { + target := filepath.Clean(filepath.Join(tmpDir, zf.Name)) + if !strings.HasPrefix(target, tmpDir+string(filepath.Separator)) && target != tmpDir { + return "", 0, fmt.Errorf("invalid zip entry path: %s", zf.Name) + } + if zf.FileInfo().IsDir() { + if err := os.MkdirAll(target, 0755); err != nil { + return "", 0, err + } + continue + } + if err := os.MkdirAll(filepath.Dir(target), 0755); err != nil { + return "", 0, err + } + rc, err := zf.Open() + if err != nil { + return "", 0, err + } + data, readErr := io.ReadAll(rc) + _ = rc.Close() + if readErr != nil { + return "", 0, readErr + } + if err := os.WriteFile(target, data, zf.Mode()); err != nil { + return "", 0, err + } + } + + agentsRoot := filepath.Join(filepath.Dir(workspacePath), "agents") + restoreTasks := []struct { + src string + dst string + }{ + {src: filepath.Join(tmpDir, "workspace"), dst: workspacePath}, + {src: filepath.Join(tmpDir, "agents"), dst: agentsRoot}, + } + sort.SliceStable(restoreTasks, func(i, j int) bool { return restoreTasks[i].src < restoreTasks[j].src }) + restored := 0 + for _, task := range restoreTasks { + info, err := os.Stat(task.src) + if err != nil { + if os.IsNotExist(err) { + continue + } + return rollbackPath, restored, err + } + if !info.IsDir() { + continue + } + if err := os.MkdirAll(task.dst, 0755); err != nil { + return rollbackPath, restored, err + } + if err := copyDirectory(task.src, task.dst); err != nil { + return rollbackPath, restored, err + } + filepath.Walk(task.src, func(path string, info os.FileInfo, err error) error { + if err == nil && !info.IsDir() { + restored++ + } + return nil + }) + } + + importedConfig := filepath.Join(tmpDir, "config", "config.json") + if info, err := os.Stat(importedConfig); err == nil && !info.IsDir() { + data, err := os.ReadFile(importedConfig) + if err != nil { + return rollbackPath, restored, err + } + if err := os.MkdirAll(filepath.Dir(configPath), 0755); err != nil { + return rollbackPath, restored, err + } + if err := os.WriteFile(configPath, data, 0644); err != nil { + return rollbackPath, restored, err + } + restored++ + } + + return rollbackPath, restored, nil +} diff --git a/cmd/cmd_backup_test.go b/cmd/cmd_backup_test.go new file mode 100644 index 0000000..5ea3865 --- /dev/null +++ b/cmd/cmd_backup_test.go @@ -0,0 +1,71 @@ +package main + +import ( + "os" + "path/filepath" + "strings" + "testing" +) + +func TestUnifiedBackupCreateAndImport(t *testing.T) { + t.Parallel() + + root := t.TempDir() + workspace := filepath.Join(root, "workspace") + configPath := filepath.Join(root, "config", "config.json") + agentsDir := filepath.Join(root, "agents", "main", "sessions") + skillsDir := filepath.Join(workspace, "skills", "demo") + memoryDir := filepath.Join(workspace, "memory") + if err := os.MkdirAll(agentsDir, 0755); err != nil { + t.Fatalf("mkdir agents: %v", err) + } + if err := os.MkdirAll(skillsDir, 0755); err != nil { + t.Fatalf("mkdir skills: %v", err) + } + if err := os.MkdirAll(memoryDir, 0755); err != nil { + t.Fatalf("mkdir memory: %v", err) + } + if err := os.MkdirAll(filepath.Dir(configPath), 0755); err != nil { + t.Fatalf("mkdir config: %v", err) + } + _ = os.WriteFile(configPath, []byte(`{"gateway":{"token":"abc"}}`), 0644) + _ = os.WriteFile(filepath.Join(workspace, "MEMORY.md"), []byte("long-term"), 0644) + _ = os.WriteFile(filepath.Join(memoryDir, "2026-04-14.md"), []byte("daily-note"), 0644) + _ = os.WriteFile(filepath.Join(skillsDir, "SKILL.md"), []byte("# demo"), 0644) + _ = os.WriteFile(filepath.Join(agentsDir, "main.active.jsonl"), []byte("{\"type\":\"message\"}\n"), 0644) + + archive := filepath.Join(root, "backup.zip") + files, err := createUnifiedBackup(workspace, configPath, archive) + if err != nil { + t.Fatalf("createUnifiedBackup: %v", err) + } + if files < 4 { + t.Fatalf("expected backup files >= 4, got %d", files) + } + + // Mutate files to ensure import actually restores prior state. + _ = os.WriteFile(configPath, []byte(`{"gateway":{"token":"changed"}}`), 0644) + _ = os.WriteFile(filepath.Join(workspace, "MEMORY.md"), []byte("changed-memory"), 0644) + + rollback, restored, err := importUnifiedBackup(workspace, configPath, archive) + if err != nil { + t.Fatalf("importUnifiedBackup: %v", err) + } + if restored < 4 { + t.Fatalf("expected restored files >= 4, got %d", restored) + } + if strings.TrimSpace(rollback) == "" { + t.Fatalf("expected rollback path") + } + if _, err := os.Stat(rollback); err != nil { + t.Fatalf("rollback snapshot missing: %v", err) + } + cfgData, _ := os.ReadFile(configPath) + if !strings.Contains(string(cfgData), `"abc"`) { + t.Fatalf("config not restored, got %s", string(cfgData)) + } + memData, _ := os.ReadFile(filepath.Join(workspace, "MEMORY.md")) + if strings.TrimSpace(string(memData)) != "long-term" { + t.Fatalf("memory not restored, got %s", string(memData)) + } +} diff --git a/cmd/cmd_config.go b/cmd/cmd_config.go index c76fef4..7e561f0 100644 --- a/cmd/cmd_config.go +++ b/cmd/cmd_config.go @@ -406,6 +406,12 @@ func providerLoginCmd() { fmt.Printf("Provider %s is not configured with auth=oauth/hybrid\n", providerName) os.Exit(1) } + oauthProvider := strings.ToLower(strings.TrimSpace(pc.OAuth.Provider)) + if oauthProvider == "codex" { + // Codex login is device-code only; callback/browser modes are no longer used. + manual = false + noBrowser = true + } if manual { noBrowser = true } @@ -460,8 +466,10 @@ func providerLoginCmd() { fmt.Printf("OAuth login succeeded for provider %s\n", providerName) if manual { fmt.Println("Mode: manual callback URL paste") - } else if noBrowser { + } else if noBrowser && oauthProvider != "codex" { fmt.Println("Mode: local callback listener without auto-opening browser") + } else if oauthProvider == "codex" { + fmt.Println("Mode: device-code") } if session.Email != "" { fmt.Printf("Account: %s\n", session.Email) diff --git a/cmd/main.go b/cmd/main.go index 69c85bc..2966979 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -15,7 +15,7 @@ import ( "github.com/YspCoder/clawgo/pkg/logger" ) -var version = "0.0.2" +var version = "1.2.0" var buildTime = "unknown" const logo = ">" @@ -65,6 +65,8 @@ func main() { channelCmd() case "skills": skillsCmd() + case "backup": + backupCmd() case "tui": tuiCmd() case "version", "--version", "-v": diff --git a/pkg/agent/context_engine.go b/pkg/agent/context_engine.go new file mode 100644 index 0000000..cb5b0b3 --- /dev/null +++ b/pkg/agent/context_engine.go @@ -0,0 +1,55 @@ +package agent + +import "github.com/YspCoder/clawgo/pkg/providers" + +// ContextBuildRequest defines inputs for building a provider message window. +type ContextBuildRequest struct { + History []providers.Message + Summary string + CurrentMessage string + Media []string + Channel string + ChatID string + ResponseLanguage string + MemoryNamespace string +} + +// ContextEngine allows swapping context-assembly behavior without touching AgentLoop flow. +type ContextEngine interface { + BuildMessages(req ContextBuildRequest) []providers.Message + SkillsInfo() map[string]interface{} +} + +type defaultContextEngine struct { + builder *ContextBuilder +} + +func NewDefaultContextEngine(builder *ContextBuilder) ContextEngine { + if builder == nil { + return nil + } + return &defaultContextEngine{builder: builder} +} + +func (e *defaultContextEngine) BuildMessages(req ContextBuildRequest) []providers.Message { + if e == nil || e.builder == nil { + return nil + } + return e.builder.BuildMessagesWithMemoryNamespace( + req.History, + req.Summary, + req.CurrentMessage, + req.Media, + req.Channel, + req.ChatID, + req.ResponseLanguage, + req.MemoryNamespace, + ) +} + +func (e *defaultContextEngine) SkillsInfo() map[string]interface{} { + if e == nil || e.builder == nil { + return map[string]interface{}{} + } + return e.builder.GetSkillsInfo() +} diff --git a/pkg/agent/context_engine_test.go b/pkg/agent/context_engine_test.go new file mode 100644 index 0000000..7b54dbe --- /dev/null +++ b/pkg/agent/context_engine_test.go @@ -0,0 +1,48 @@ +package agent + +import ( + "testing" + + "github.com/YspCoder/clawgo/pkg/bus" + "github.com/YspCoder/clawgo/pkg/providers" + "github.com/YspCoder/clawgo/pkg/session" +) + +type testContextEngine struct { + lastReq ContextBuildRequest + messages []providers.Message +} + +func (e *testContextEngine) BuildMessages(req ContextBuildRequest) []providers.Message { + e.lastReq = req + return append([]providers.Message(nil), e.messages...) +} + +func (e *testContextEngine) SkillsInfo() map[string]interface{} { + return map[string]interface{}{"total": 0} +} + +func TestAgentLoopUsesPluggableContextEngine(t *testing.T) { + t.Parallel() + + engine := &testContextEngine{ + messages: []providers.Message{{Role: "system", Content: "from-test-engine"}}, + } + loop := &AgentLoop{ + sessions: session.NewSessionManager(""), + contextEngine: engine, + } + msg := bus.InboundMessage{ + Channel: "cli", + ChatID: "direct", + SessionKey: "main", + Content: "hello", + } + messages, _ := loop.prepareUserMessageContext(msg, "main") + if len(messages) != 1 || messages[0].Content != "from-test-engine" { + t.Fatalf("expected custom engine output, got %#v", messages) + } + if engine.lastReq.CurrentMessage != "hello" || engine.lastReq.Channel != "cli" { + t.Fatalf("unexpected context request: %#v", engine.lastReq) + } +} diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index a619c5f..51b8ea7 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -46,6 +46,7 @@ type AgentLoop struct { maxIterations int sessions *session.SessionManager contextBuilder *ContextBuilder + contextEngine ContextEngine tools *tools.ToolRegistry compactionEnabled bool compactionTrigger int @@ -117,6 +118,18 @@ func (al *AgentLoop) SetConfigPath(path string) { al.configPath = strings.TrimSpace(path) } +func (al *AgentLoop) SetContextEngine(engine ContextEngine) { + if al == nil { + return + } + al.runMu.Lock() + defer al.runMu.Unlock() + if engine == nil && al.contextBuilder != nil { + engine = NewDefaultContextEngine(al.contextBuilder) + } + al.contextEngine = engine +} + // StartupCompactionReport provides startup memory/session maintenance stats. type StartupCompactionReport struct { TotalSessions int `json:"total_sessions"` @@ -234,6 +247,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers // Register system info tool toolsRegistry.Register(tools.NewSystemInfoTool()) + contextBuilder := NewContextBuilder(workspace, func() []string { return toolsRegistry.GetSummaries() }) loop := &AgentLoop{ bus: msgBus, cfg: cfg, @@ -244,7 +258,8 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers temperature: cfg.Agents.Defaults.Temperature, maxIterations: cfg.Agents.Defaults.MaxToolIterations, sessions: sessionsManager, - contextBuilder: NewContextBuilder(workspace, func() []string { return toolsRegistry.GetSummaries() }), + contextBuilder: contextBuilder, + contextEngine: NewDefaultContextEngine(contextBuilder), tools: toolsRegistry, compactionEnabled: cfg.Agents.Defaults.ContextCompaction.Enabled, compactionTrigger: cfg.Agents.Defaults.ContextCompaction.TriggerMessages, @@ -893,14 +908,17 @@ type llmTurnLoopConfig struct { } type llmTurnLoopResult struct { - messages []providers.Message - pendingPersist []providers.Message - finalContent string - iteration int - attemptCount int - restartCount int - failureCode string - hasToolActivity bool + messages []providers.Message + pendingPersist []providers.Message + finalContent string + iteration int + attemptCount int + restartCount int + promptTokens int + completionTokens int + totalTokens int + failureCode string + hasToolActivity bool } func logLLMTurnRequest(iteration, maxIterations int, providerName, activeModel string, messages []providers.Message, providerToolDefs []providers.ToolDefinition, maxTokens int, temperature float64) { @@ -947,6 +965,63 @@ func logLLMToolCalls(iteration int, toolCalls []providers.ToolCall) { }) } +func mergeUsageTotals(dst *llmTurnLoopResult, usage *providers.UsageInfo) { + if dst == nil || usage == nil { + return + } + prompt := usage.PromptTokens + completion := usage.CompletionTokens + total := usage.TotalTokens + if total <= 0 { + total = prompt + completion + } + dst.promptTokens += prompt + dst.completionTokens += completion + dst.totalTokens += total +} + +func estimateResponseUsage(ctx context.Context, provider providers.LLMProvider, model string, prompt []providers.Message, toolDefs []providers.ToolDefinition, response *providers.LLMResponse) *providers.UsageInfo { + if response == nil { + return nil + } + if response.Usage != nil { + return response.Usage + } + counter, ok := provider.(providers.TokenCounter) + if !ok { + return nil + } + usage, err := counter.CountTokens(ctx, prompt, toolDefs, model, nil) + if err != nil || usage == nil { + return nil + } + promptTokens := usage.TotalTokens + if promptTokens <= 0 { + promptTokens = usage.PromptTokens + } + if promptTokens <= 0 { + return nil + } + completionChars := len(strings.TrimSpace(response.Content)) + for _, tc := range response.ToolCalls { + completionChars += len(strings.TrimSpace(tc.Name)) + if tc.Arguments != nil { + if b, err := json.Marshal(tc.Arguments); err == nil { + completionChars += len(b) + } + } + } + completionTokens := completionChars / 4 + if completionTokens <= 0 && completionChars > 0 { + completionTokens = 1 + } + return &providers.UsageInfo{ + PromptTokens: promptTokens, + CompletionTokens: completionTokens, + TotalTokens: promptTokens + completionTokens, + } +} + func buildAssistantToolCallMessage(response *providers.LLMResponse) providers.Message { assistantMsg := providers.Message{ Role: "assistant", @@ -1195,6 +1270,7 @@ func (al *AgentLoop) runLLMTurnLoop(cfg llmTurnLoopConfig) (llmTurnLoopResult, e }) return result, fmt.Errorf("LLM call failed: %w", err) } + mergeUsageTotals(&result, estimateResponseUsage(cfg.ctx, activeProvider, activeModel, result.messages, providerToolDefs, response)) if len(response.ToolCalls) == 0 { result.finalContent = response.Content @@ -1240,16 +1316,30 @@ func (al *AgentLoop) prepareUserMessageContext(msg bus.InboundMessage, memoryNam } preferredLang, lastLang := al.sessions.GetLanguagePreferences(msg.SessionKey) responseLang := DetectResponseLanguage(msg.Content, preferredLang, lastLang) - messages := al.contextBuilder.BuildMessagesWithMemoryNamespace( - history, - summary, - msg.Content, - nil, - msg.Channel, - msg.ChatID, - responseLang, - memoryNamespace, - ) + messages := []providers.Message(nil) + if al.contextEngine != nil { + messages = al.contextEngine.BuildMessages(ContextBuildRequest{ + History: history, + Summary: summary, + CurrentMessage: msg.Content, + Channel: msg.Channel, + ChatID: msg.ChatID, + ResponseLanguage: responseLang, + MemoryNamespace: memoryNamespace, + }) + } + if len(messages) == 0 && al.contextBuilder != nil { + messages = al.contextBuilder.BuildMessagesWithMemoryNamespace( + history, + summary, + msg.Content, + nil, + msg.Channel, + msg.ChatID, + responseLang, + memoryNamespace, + ) + } return messages, responseLang } @@ -1270,15 +1360,29 @@ func (al *AgentLoop) prepareSystemMessageContext(sessionKey string, msg bus.Inbo summary := al.sessions.GetSummary(sessionKey) preferredLang, lastLang := al.sessions.GetLanguagePreferences(sessionKey) responseLang := DetectResponseLanguage(msg.Content, preferredLang, lastLang) - messages := al.contextBuilder.BuildMessages( - history, - summary, - msg.Content, - nil, - originChannel, - originChatID, - responseLang, - ) + messages := []providers.Message(nil) + if al.contextEngine != nil { + messages = al.contextEngine.BuildMessages(ContextBuildRequest{ + History: history, + Summary: summary, + CurrentMessage: msg.Content, + Channel: originChannel, + ChatID: originChatID, + ResponseLanguage: responseLang, + MemoryNamespace: "main", + }) + } + if len(messages) == 0 && al.contextBuilder != nil { + messages = al.contextBuilder.BuildMessages( + history, + summary, + msg.Content, + nil, + originChannel, + originChatID, + responseLang, + ) + } return messages, responseLang } @@ -1567,18 +1671,24 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) }) if err != nil { tools.RecordSubagentExecutionStats(ctx, tools.SubagentExecutionStats{ - Iterations: loopResult.iteration, - Attempts: loopResult.attemptCount, - Restarts: loopResult.restartCount, - FailureCode: classifyLLMFailureCode(err), + Iterations: loopResult.iteration, + Attempts: loopResult.attemptCount, + Restarts: loopResult.restartCount, + PromptTokens: loopResult.promptTokens, + CompletionTokens: loopResult.completionTokens, + TotalTokens: loopResult.totalTokens, + FailureCode: classifyLLMFailureCode(err), }) al.reopenSpecTaskOnError(specTaskRef, msg, err) return "", err } tools.RecordSubagentExecutionStats(ctx, tools.SubagentExecutionStats{ - Iterations: loopResult.iteration, - Attempts: loopResult.attemptCount, - Restarts: loopResult.restartCount, + Iterations: loopResult.iteration, + Attempts: loopResult.attemptCount, + Restarts: loopResult.restartCount, + PromptTokens: loopResult.promptTokens, + CompletionTokens: loopResult.completionTokens, + TotalTokens: loopResult.totalTokens, }) finalContent, userContent := al.finalizeUserTurnResponse(ctx, msg, responseLang, loopResult) @@ -1590,8 +1700,14 @@ func (al *AgentLoop) processMessage(ctx context.Context, msg bus.InboundMessage) "sender_id": msg.SenderID, "preview": responsePreview, "iterations": loopResult.iteration, + "attempts": loopResult.attemptCount, "final_length": len(finalContent), "user_length": len(userContent), + "token_usage": map[string]int{ + "prompt": loopResult.promptTokens, + "completion": loopResult.completionTokens, + "total": loopResult.totalTokens, + }, }) al.completeSpecTaskOnSuccess(specTaskRef, msg, userContent) @@ -2547,7 +2663,11 @@ func (al *AgentLoop) GetStartupInfo() map[string]interface{} { } // Skills info - info["skills"] = al.contextBuilder.GetSkillsInfo() + if al.contextEngine != nil { + info["skills"] = al.contextEngine.SkillsInfo() + } else if al.contextBuilder != nil { + info["skills"] = al.contextBuilder.GetSkillsInfo() + } return info } diff --git a/pkg/agent/loop_usage_test.go b/pkg/agent/loop_usage_test.go new file mode 100644 index 0000000..3924471 --- /dev/null +++ b/pkg/agent/loop_usage_test.go @@ -0,0 +1,26 @@ +package agent + +import ( + "testing" + + "github.com/YspCoder/clawgo/pkg/providers" +) + +func TestMergeUsageTotals(t *testing.T) { + t.Parallel() + + var result llmTurnLoopResult + mergeUsageTotals(&result, &providers.UsageInfo{PromptTokens: 10, CompletionTokens: 4, TotalTokens: 0}) + mergeUsageTotals(&result, &providers.UsageInfo{PromptTokens: 3, CompletionTokens: 2, TotalTokens: 9}) + + if result.promptTokens != 13 { + t.Fatalf("prompt tokens = %d, want 13", result.promptTokens) + } + if result.completionTokens != 6 { + t.Fatalf("completion tokens = %d, want 6", result.completionTokens) + } + // First merge falls back to prompt+completion (14), second uses explicit total (9). + if result.totalTokens != 23 { + t.Fatalf("total tokens = %d, want 23", result.totalTokens) + } +} diff --git a/pkg/providers/oauth.go b/pkg/providers/oauth.go index 76d860a..22968bb 100644 --- a/pkg/providers/oauth.go +++ b/pkg/providers/oauth.go @@ -33,11 +33,12 @@ const ( oauthStyleJSON = "json" defaultCodexOAuthProvider = "codex" - defaultCodexAuthURL = "https://auth.openai.com/oauth/authorize" + defaultCodexAuthURL = "https://auth.openai.com/codex/device" + defaultCodexDeviceCodeURL = "https://auth.openai.com/api/accounts/deviceauth/usercode" + defaultCodexDeviceTokenPollURL = "https://auth.openai.com/api/accounts/deviceauth/token" + defaultCodexDeviceRedirectURL = "https://auth.openai.com/deviceauth/callback" defaultCodexTokenURL = "https://auth.openai.com/oauth/token" defaultCodexClientID = "app_EMoamEEZ73f0CkXaXp7hrann" - defaultCodexCallbackPort = 1455 - defaultCodexRedirectPath = "/auth/callback" defaultClaudeOAuthProvider = "claude" defaultClaudeAuthURL = "https://claude.ai/oauth/authorize" defaultClaudeTokenURL = "https://api.anthropic.com/v1/oauth/token" @@ -90,14 +91,14 @@ var ( ) var ( - defaultAntigravityClientIDValue = "1071006060591-" + "tmhssin2h21lcre235vtolojh4g403ep.apps.googleusercontent.com" + defaultAntigravityClientIDValue = "1071006060591-" + "tmhssin2h21lcre235vtolojh4g403ep.apps.googleusercontent.com" defaultAntigravityClientSecretValue = "GOCSPX-" + "K58FWR486LdLJ1mLB8sXC4z6qDAf" - defaultGeminiClientIDValue = "681255809395-" + "oo8ft2oprdrnp9e3aqf6av3hmdib135j.apps.googleusercontent.com" - defaultGeminiClientSecretValue = "GOCSPX-" + "4uHgMPm-1o7Sk-geV6Cu5clXFsxl" - defaultAntigravityClientID = firstNonEmpty(strings.TrimSpace(os.Getenv("CLAWGO_ANTIGRAVITY_CLIENT_ID")), defaultAntigravityClientIDValue) - defaultAntigravityClientSecret = firstNonEmpty(strings.TrimSpace(os.Getenv("CLAWGO_ANTIGRAVITY_CLIENT_SECRET")), defaultAntigravityClientSecretValue) - defaultGeminiClientID = firstNonEmpty(strings.TrimSpace(os.Getenv("CLAWGO_GEMINI_CLIENT_ID")), defaultGeminiClientIDValue) - defaultGeminiClientSecret = firstNonEmpty(strings.TrimSpace(os.Getenv("CLAWGO_GEMINI_CLIENT_SECRET")), defaultGeminiClientSecretValue) + defaultGeminiClientIDValue = "681255809395-" + "oo8ft2oprdrnp9e3aqf6av3hmdib135j.apps.googleusercontent.com" + defaultGeminiClientSecretValue = "GOCSPX-" + "4uHgMPm-1o7Sk-geV6Cu5clXFsxl" + defaultAntigravityClientID = firstNonEmpty(strings.TrimSpace(os.Getenv("CLAWGO_ANTIGRAVITY_CLIENT_ID")), defaultAntigravityClientIDValue) + defaultAntigravityClientSecret = firstNonEmpty(strings.TrimSpace(os.Getenv("CLAWGO_ANTIGRAVITY_CLIENT_SECRET")), defaultAntigravityClientSecretValue) + defaultGeminiClientID = firstNonEmpty(strings.TrimSpace(os.Getenv("CLAWGO_GEMINI_CLIENT_ID")), defaultGeminiClientIDValue) + defaultGeminiClientSecret = firstNonEmpty(strings.TrimSpace(os.Getenv("CLAWGO_GEMINI_CLIENT_SECRET")), defaultGeminiClientSecretValue) ) var ( @@ -152,6 +153,7 @@ type oauthConfig struct { AuthURL string TokenURL string DeviceCodeURL string + DeviceTokenURL string UserInfoURL string RedirectURL string RedirectPath string @@ -611,11 +613,20 @@ func resolveOAuthConfig(pc config.ProviderConfig) (oauthConfig, error) { } switch provider { case defaultCodexOAuthProvider: - cfg.CallbackPort = defaultInt(cfg.CallbackPort, defaultCodexCallbackPort) + cfg.FlowKind = oauthFlowDevice cfg.ClientID = firstNonEmpty(cfg.ClientID, defaultCodexClientID) - cfg.AuthURL = firstNonEmpty(cfg.AuthURL, defaultCodexAuthURL) + cfg.AuthURL = firstNonEmpty(defaultCodexAuthURL) cfg.TokenURL = firstNonEmpty(cfg.TokenURL, defaultCodexTokenURL) - cfg.RedirectPath = defaultCodexRedirectPath + deviceURL := strings.TrimSpace(pc.OAuth.AuthURL) + if deviceURL == "" || strings.Contains(strings.ToLower(deviceURL), "/oauth/authorize") { + deviceURL = defaultCodexDeviceCodeURL + } + cfg.DeviceCodeURL = deviceURL + if strings.Contains(strings.ToLower(deviceURL), "/usercode") { + cfg.DeviceTokenURL = strings.Replace(deviceURL, "/usercode", "/token", 1) + } + cfg.DeviceTokenURL = firstNonEmpty(cfg.DeviceTokenURL, defaultCodexDeviceTokenPollURL) + cfg.RedirectURL = firstNonEmpty(strings.TrimSpace(pc.OAuth.RedirectURL), defaultCodexDeviceRedirectURL) if len(cfg.Scopes) == 0 { cfg.Scopes = append([]string(nil), defaultCodexScopes...) } @@ -743,11 +754,15 @@ func (m *oauthManager) login(ctx context.Context, apiBase string, opts OAuthLogi if err != nil { return nil, nil, err } - fmt.Printf("Open this URL to continue OAuth login:\n%s\n", flow.AuthURL) + if m.cfg.Provider == defaultCodexOAuthProvider { + fmt.Printf("To continue Codex login:\n1) Open: %s\n2) Enter code: %s\n", flow.AuthURL, strings.TrimSpace(flow.UserCode)) + } else { + fmt.Printf("Open this URL to continue OAuth login:\n%s\n", flow.AuthURL) + } if strings.TrimSpace(flow.UserCode) != "" { fmt.Printf("User code: %s\n", flow.UserCode) } - if !opts.NoBrowser { + if !opts.NoBrowser && m.cfg.Provider != defaultCodexOAuthProvider { if err := openBrowser(flow.AuthURL); err != nil { fmt.Printf("Automatic browser open failed: %v\n", err) } @@ -2379,6 +2394,32 @@ func (m *oauthManager) startDeviceFlow(ctx context.Context, opts OAuthLoginOptio form := url.Values{} form.Set("client_id", m.cfg.ClientID) switch m.cfg.Provider { + case defaultCodexOAuthProvider: + body, _ := json.Marshal(map[string]any{"client_id": m.cfg.ClientID}) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, m.cfg.DeviceCodeURL, strings.NewReader(string(body))) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + raw, err := m.doJSONRequest(req, "oauth device request", opts.NetworkProxy) + if err != nil { + return nil, err + } + userCode := strings.TrimSpace(asString(raw["user_code"])) + deviceAuthID := strings.TrimSpace(asString(raw["device_auth_id"])) + if userCode == "" || deviceAuthID == "" { + return nil, fmt.Errorf("oauth device flow missing user_code/device_auth_id") + } + return &OAuthPendingFlow{ + Mode: oauthFlowDevice, + AuthURL: firstNonEmpty(strings.TrimSpace(m.cfg.AuthURL), defaultCodexAuthURL), + UserCode: userCode, + DeviceCode: deviceAuthID, + IntervalSec: defaultInt(asInt(raw["interval"]), 5), + ExpiresAt: deviceExpiry(asInt(raw["expires_in"])), + Instructions: "Open the verification URL, enter the user code, and approve the device login.", + }, nil case defaultQwenOAuthProvider: verifier, challenge, err := generatePKCE() if err != nil { @@ -2430,6 +2471,26 @@ func (m *oauthManager) startDeviceFlow(ctx context.Context, opts OAuthLoginOptio } } +func asInt(v any) int { + switch n := v.(type) { + case int: + return n + case int64: + return int(n) + case float64: + return int(n) + case json.Number: + if parsed, err := n.Int64(); err == nil { + return int(parsed) + } + case string: + if parsed, err := strconv.Atoi(strings.TrimSpace(n)); err == nil { + return parsed + } + } + return 0 +} + func (m *oauthManager) doFormDeviceRequest(ctx context.Context, endpoint string, form url.Values, proxyURL string) (map[string]any, error) { req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, strings.NewReader(form.Encode())) if err != nil { @@ -2513,6 +2574,9 @@ func (m *oauthManager) pollDeviceToken(ctx context.Context, flow *OAuthPendingFl if flow == nil || strings.TrimSpace(flow.DeviceCode) == "" { return nil, fmt.Errorf("oauth device flow missing device code") } + if m.cfg.Provider == defaultCodexOAuthProvider { + return m.pollCodexDeviceToken(ctx, flow, proxyURL) + } interval := time.Duration(defaultInt(flow.IntervalSec, 5)) * time.Second deadline := time.Now().Add(m.cfg.DevicePollMax) if expireAt, err := time.Parse(time.RFC3339, strings.TrimSpace(flow.ExpiresAt)); err == nil && expireAt.Before(deadline) { @@ -2560,3 +2624,78 @@ func (m *oauthManager) pollDeviceToken(ctx context.Context, flow *OAuthPendingFl } } } + +func (m *oauthManager) pollCodexDeviceToken(ctx context.Context, flow *OAuthPendingFlow, proxyURL string) (*oauthSession, error) { + interval := time.Duration(defaultInt(flow.IntervalSec, 5)) * time.Second + if interval < 3*time.Second { + interval = 3 * time.Second + } + deadline := time.Now().Add(m.cfg.DevicePollMax) + if expireAt, err := time.Parse(time.RFC3339, strings.TrimSpace(flow.ExpiresAt)); err == nil && expireAt.Before(deadline) { + deadline = expireAt + } + for { + if time.Now().After(deadline) { + return nil, fmt.Errorf("oauth device flow timed out") + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(interval): + } + payload, _ := json.Marshal(map[string]any{ + "device_auth_id": strings.TrimSpace(flow.DeviceCode), + "user_code": strings.TrimSpace(flow.UserCode), + }) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, m.cfg.DeviceTokenURL, strings.NewReader(string(payload))) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + client, err := m.httpClientForProxy(proxyURL) + if err != nil { + return nil, err + } + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("oauth device poll request failed: %w", err) + } + body, readErr := io.ReadAll(resp.Body) + resp.Body.Close() + if readErr != nil { + return nil, fmt.Errorf("oauth device poll read failed: %w", readErr) + } + switch resp.StatusCode { + case http.StatusOK: + var raw map[string]any + if err := json.Unmarshal(body, &raw); err != nil { + return nil, fmt.Errorf("oauth device poll decode failed: %w", err) + } + authCode := strings.TrimSpace(asString(raw["authorization_code"])) + verifier := strings.TrimSpace(asString(raw["code_verifier"])) + if authCode == "" || verifier == "" { + return nil, fmt.Errorf("oauth device auth response missing authorization_code/code_verifier") + } + form := url.Values{} + form.Set("grant_type", "authorization_code") + form.Set("code", authCode) + form.Set("redirect_uri", m.cfg.RedirectURL) + form.Set("client_id", m.cfg.ClientID) + form.Set("code_verifier", verifier) + tokenRaw, err := m.doFormTokenRequest(ctx, form, proxyURL) + if err != nil { + return nil, err + } + session, err := sessionFromTokenPayload(m.cfg.Provider, tokenRaw) + if err != nil { + return nil, err + } + return m.enrichSession(ctx, session) + case http.StatusForbidden, http.StatusNotFound: + continue + default: + return nil, fmt.Errorf("oauth device poll failed: status=%d body=%s", resp.StatusCode, strings.TrimSpace(string(body))) + } + } +} diff --git a/pkg/providers/oauth_test.go b/pkg/providers/oauth_test.go index 9ffc332..b17ed01 100644 --- a/pkg/providers/oauth_test.go +++ b/pkg/providers/oauth_test.go @@ -323,6 +323,7 @@ func TestResolveOAuthConfigSupportsAdditionalProviders(t *testing.T) { want string flow string }{ + {name: "codex", provider: "codex", want: "codex", flow: oauthFlowDevice}, {name: "anthropic-alias", provider: "anthropic", want: "claude", flow: oauthFlowCallback}, {name: "antigravity", provider: "antigravity", want: "antigravity", flow: oauthFlowCallback}, {name: "gemini", provider: "gemini", want: "gemini", flow: oauthFlowCallback}, @@ -951,6 +952,97 @@ func TestOAuthDeviceFlowQwenManualCompletes(t *testing.T) { } } +func TestOAuthDeviceFlowCodexStartAndComplete(t *testing.T) { + t.Parallel() + + var pollAttempts int32 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/accounts/deviceauth/usercode": + if got := r.Header.Get("Content-Type"); !strings.Contains(strings.ToLower(got), "application/json") { + t.Fatalf("expected json content type, got %s", got) + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"user_code":"U-CODE","device_auth_id":"dev-auth-1","interval":0,"expires_in":60}`)) + case "/api/accounts/deviceauth/token": + attempt := atomic.AddInt32(&pollAttempts, 1) + if attempt == 1 { + w.WriteHeader(http.StatusNotFound) + _, _ = w.Write([]byte(`{}`)) + return + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"authorization_code":"auth-code-1","code_verifier":"verifier-1"}`)) + case "/oauth/token": + if err := r.ParseForm(); err != nil { + t.Fatalf("parse form failed: %v", err) + } + if got := r.Form.Get("grant_type"); got != "authorization_code" { + t.Fatalf("unexpected grant_type: %s", got) + } + if got := r.Form.Get("code"); got != "auth-code-1" { + t.Fatalf("unexpected code: %s", got) + } + if got := r.Form.Get("code_verifier"); got != "verifier-1" { + t.Fatalf("unexpected code_verifier: %s", got) + } + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"access_token":"codex-at","refresh_token":"codex-rt","expires_in":3600}`)) + case "/models": + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"data":[{"id":"gpt-5.4"}]}`)) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + dir := t.TempDir() + manager, err := newOAuthManager(config.ProviderConfig{ + APIBase: server.URL, + Auth: "oauth", + OAuth: config.ProviderOAuthConfig{ + Provider: "codex", + AuthURL: server.URL + "/api/accounts/deviceauth/usercode", + TokenURL: server.URL + "/oauth/token", + RedirectURL: server.URL + "/deviceauth/callback", + CredentialFile: filepath.Join(dir, "codex.json"), + }, + }, 5*time.Second) + if err != nil { + t.Fatalf("new oauth manager failed: %v", err) + } + defer manager.bgCancel() + + flow, err := manager.startDeviceFlow(context.Background(), OAuthLoginOptions{}) + if err != nil { + t.Fatalf("start device flow failed: %v", err) + } + if flow.Mode != oauthFlowDevice { + t.Fatalf("unexpected flow mode: %s", flow.Mode) + } + if flow.UserCode != "U-CODE" || flow.DeviceCode != "dev-auth-1" { + t.Fatalf("unexpected flow payload: %#v", flow) + } + if flow.IntervalSec < 1 { + t.Fatalf("expected normalized poll interval >=1, got %d", flow.IntervalSec) + } + + session, models, err := manager.completeDeviceFlow(context.Background(), server.URL, flow, OAuthLoginOptions{}) + if err != nil { + t.Fatalf("complete device flow failed: %v", err) + } + if session.AccessToken != "codex-at" || session.RefreshToken != "codex-rt" { + t.Fatalf("unexpected session tokens: %#v", session) + } + if atomic.LoadInt32(&pollAttempts) < 2 { + t.Fatalf("expected polling retries, got %d", pollAttempts) + } + if len(models) != 1 || models[0] != "gpt-5.4" { + t.Fatalf("unexpected models: %#v", models) + } +} + func TestHTTPProviderHybridFallsBackFromAPIKeyToOAuth(t *testing.T) { t.Parallel() diff --git a/pkg/tools/message_process_test.go b/pkg/tools/message_process_test.go index 6a1a0f0..8d5a38d 100644 --- a/pkg/tools/message_process_test.go +++ b/pkg/tools/message_process_test.go @@ -2,8 +2,10 @@ package tools import ( "context" + "encoding/json" "strings" "testing" + "time" "github.com/YspCoder/clawgo/pkg/bus" ) @@ -66,3 +68,63 @@ func TestProcessToolParsesStringIntegers(t *testing.T) { t.Fatalf("expected json list output, got %s", out) } } + +func TestProcessToolWatchPatternsMatchesLog(t *testing.T) { + t.Parallel() + + pm := NewProcessManager(t.TempDir()) + id, err := pm.Start(context.Background(), "printf 'READY\\n'; sleep 0.05", "") + if err != nil { + t.Fatalf("start failed: %v", err) + } + tool := NewProcessTool(pm) + + out, err := tool.Execute(context.Background(), map[string]interface{}{ + "action": "watch_patterns", + "session_id": id, + "patterns": []interface{}{"ready"}, + "timeout_ms": 2000, + "interval_ms": 50, + }) + if err != nil { + t.Fatalf("execute failed: %v", err) + } + var payload map[string]interface{} + if err := json.Unmarshal([]byte(out), &payload); err != nil { + t.Fatalf("invalid json output: %v (%s)", err, out) + } + if matched, _ := payload["matched"].(bool); !matched { + t.Fatalf("expected matched response, got %v", payload) + } +} + +func TestProcessToolWatchPatternsTimesOut(t *testing.T) { + t.Parallel() + + pm := NewProcessManager(t.TempDir()) + id, err := pm.Start(context.Background(), "sleep 0.3", "") + if err != nil { + t.Fatalf("start failed: %v", err) + } + tool := NewProcessTool(pm) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + out, err := tool.Execute(ctx, map[string]interface{}{ + "action": "watch_patterns", + "session_id": id, + "patterns": "nomatch", + "timeout_ms": "120", + "interval_ms": "30", + }) + if err != nil { + t.Fatalf("execute failed: %v", err) + } + var payload map[string]interface{} + if err := json.Unmarshal([]byte(out), &payload); err != nil { + t.Fatalf("invalid json output: %v (%s)", err, out) + } + if timedOut, _ := payload["timed_out"].(bool); !timedOut { + t.Fatalf("expected timed_out=true, got %v", payload) + } +} diff --git a/pkg/tools/process_tool.go b/pkg/tools/process_tool.go index 59b62b8..92914ef 100644 --- a/pkg/tools/process_tool.go +++ b/pkg/tools/process_tool.go @@ -3,6 +3,8 @@ package tools import ( "context" "encoding/json" + "fmt" + "strings" "time" ) @@ -11,15 +13,19 @@ type ProcessTool struct{ m *ProcessManager } func NewProcessTool(m *ProcessManager) *ProcessTool { return &ProcessTool{m: m} } func (t *ProcessTool) Name() string { return "process" } func (t *ProcessTool) Description() string { - return "Manage background exec sessions: list, poll, log, kill" + return "Manage background exec sessions: list, poll, log, kill, watch_patterns" } func (t *ProcessTool) Parameters() map[string]interface{} { return map[string]interface{}{"type": "object", "properties": map[string]interface{}{ - "action": map[string]interface{}{"type": "string", "description": "list|poll|log|kill"}, - "session_id": map[string]interface{}{"type": "string"}, - "offset": map[string]interface{}{"type": "integer"}, - "limit": map[string]interface{}{"type": "integer"}, - "timeout_ms": map[string]interface{}{"type": "integer"}, + "action": map[string]interface{}{"type": "string", "description": "list|poll|log|kill|watch_patterns"}, + "session_id": map[string]interface{}{"type": "string"}, + "offset": map[string]interface{}{"type": "integer"}, + "limit": map[string]interface{}{"type": "integer"}, + "timeout_ms": map[string]interface{}{"type": "integer"}, + "interval_ms": map[string]interface{}{"type": "integer"}, + "patterns": map[string]interface{}{"type": "array", "items": map[string]interface{}{"type": "string"}}, + "case_sensitive": map[string]interface{}{"type": "boolean"}, + "alert_on_exit": map[string]interface{}{"type": "boolean"}, }, "required": []string{"action"}} } @@ -76,7 +82,156 @@ func (t *ProcessTool) Execute(ctx context.Context, args map[string]interface{}) } b, _ := json.Marshal(resp) return string(b), nil + case "watch_patterns": + patterns := MapStringListArg(args, "patterns") + if len(patterns) == 0 { + return "", fmt.Errorf("patterns is required") + } + timeout := MapIntArg(args, "timeout_ms", 30000) + if timeout < 1 { + timeout = 30000 + } + interval := MapIntArg(args, "interval_ms", 250) + if interval < 50 { + interval = 50 + } + if interval > timeout { + interval = timeout + } + off := MapIntArg(args, "offset", 0) + if off < 0 { + off = 0 + } + caseSensitive := false + if v, ok := MapBoolArg(args, "case_sensitive"); ok { + caseSensitive = v + } + alertOnExit := true + if v, ok := MapBoolArg(args, "alert_on_exit"); ok { + alertOnExit = v + } + return t.watchPatterns(ctx, sid, patterns, off, timeout, interval, caseSensitive, alertOnExit) default: return "", nil } } + +func (t *ProcessTool) watchPatterns(ctx context.Context, sid string, patterns []string, offset, timeoutMs, intervalMs int, caseSensitive, alertOnExit bool) (string, error) { + s, ok := t.m.Get(sid) + if !ok { + return "", fmt.Errorf("session not found: %s", sid) + } + type watchPattern struct { + original string + lookup string + } + normalized := make([]watchPattern, 0, len(patterns)) + for _, p := range patterns { + p = strings.TrimSpace(p) + if p == "" { + continue + } + lookup := p + if !caseSensitive { + lookup = strings.ToLower(p) + } + normalized = append(normalized, watchPattern{original: p, lookup: lookup}) + } + if len(normalized) == 0 { + return "", fmt.Errorf("patterns is required") + } + started := time.Now() + deadline := started.Add(time.Duration(timeoutMs) * time.Millisecond) + scanBuf := "" + nextOffset := offset + for { + chunk, err := t.m.Log(sid, nextOffset, 16*1024) + if err != nil { + return "", err + } + if chunk != "" { + nextOffset += len(chunk) + scanBuf += chunk + if len(scanBuf) > 24*1024 { + scanBuf = scanBuf[len(scanBuf)-24*1024:] + } + haystack := scanBuf + if !caseSensitive { + haystack = strings.ToLower(haystack) + } + for _, pattern := range normalized { + if strings.Contains(haystack, pattern.lookup) { + resp := map[string]interface{}{ + "id": s.ID, + "matched": true, + "pattern": pattern.original, + "running": processSessionRunning(s), + "next_offset": nextOffset, + "elapsed_ms": time.Since(started).Milliseconds(), + } + b, _ := json.Marshal(resp) + return string(b), nil + } + } + } + running, exitCode := processSessionState(s) + if !running { + resp := map[string]interface{}{ + "id": s.ID, + "matched": false, + "running": false, + "exit_code": exitCode, + "next_offset": nextOffset, + "elapsed_ms": time.Since(started).Milliseconds(), + } + if alertOnExit { + resp["event"] = "process_exited" + } + b, _ := json.Marshal(resp) + return string(b), nil + } + now := time.Now() + if now.After(deadline) { + resp := map[string]interface{}{ + "id": s.ID, + "matched": false, + "running": true, + "timed_out": true, + "next_offset": nextOffset, + "elapsed_ms": now.Sub(started).Milliseconds(), + } + b, _ := json.Marshal(resp) + return string(b), nil + } + wait := time.Duration(intervalMs) * time.Millisecond + if remaining := time.Until(deadline); wait > remaining { + wait = remaining + } + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-time.After(wait): + } + } +} + +func processSessionRunning(s *processSession) bool { + if s == nil { + return false + } + s.mu.RLock() + defer s.mu.RUnlock() + return s.ExitCode == nil +} + +func processSessionState(s *processSession) (running bool, exitCode interface{}) { + if s == nil { + return false, nil + } + s.mu.RLock() + defer s.mu.RUnlock() + if s.ExitCode == nil { + return true, nil + } + return false, *s.ExitCode +} diff --git a/pkg/tools/subagent.go b/pkg/tools/subagent.go index 717d97a..1fc6057 100644 --- a/pkg/tools/subagent.go +++ b/pkg/tools/subagent.go @@ -37,6 +37,9 @@ type SubagentRun struct { IterationCount int `json:"iteration_count,omitempty"` AttemptCount int `json:"attempt_count,omitempty"` RestartCount int `json:"restart_count,omitempty"` + PromptTokens int `json:"prompt_tokens,omitempty"` + CompletionTokens int `json:"completion_tokens,omitempty"` + TotalTokens int `json:"total_tokens,omitempty"` LastFailureCode string `json:"last_failure_code,omitempty"` ThreadID string `json:"thread_id,omitempty"` CorrelationID string `json:"correlation_id,omitempty"` @@ -872,6 +875,9 @@ func (sm *SubagentManager) applyExecutionStats(run *SubagentRun, stats *Subagent run.IterationCount += stats.Iterations run.AttemptCount += stats.Attempts run.RestartCount += stats.Restarts + run.PromptTokens += stats.PromptTokens + run.CompletionTokens += stats.CompletionTokens + run.TotalTokens += stats.TotalTokens if strings.TrimSpace(stats.FailureCode) != "" { run.LastFailureCode = strings.TrimSpace(stats.FailureCode) } diff --git a/pkg/tools/subagent_runtime_context.go b/pkg/tools/subagent_runtime_context.go index 08b795b..bc6341d 100644 --- a/pkg/tools/subagent_runtime_context.go +++ b/pkg/tools/subagent_runtime_context.go @@ -3,10 +3,13 @@ package tools import "context" type SubagentExecutionStats struct { - Iterations int - Attempts int - Restarts int - FailureCode string + Iterations int + Attempts int + Restarts int + PromptTokens int + CompletionTokens int + TotalTokens int + FailureCode string } type subagentExecutionStatsKey struct{} @@ -30,6 +33,9 @@ func RecordSubagentExecutionStats(ctx context.Context, delta SubagentExecutionSt stats.Iterations += delta.Iterations stats.Attempts += delta.Attempts stats.Restarts += delta.Restarts + stats.PromptTokens += delta.PromptTokens + stats.CompletionTokens += delta.CompletionTokens + stats.TotalTokens += delta.TotalTokens if delta.FailureCode != "" { stats.FailureCode = delta.FailureCode } diff --git a/workspace/AGENTS.md b/workspace/AGENTS.md index 2ea7903..c87b10a 100644 --- a/workspace/AGENTS.md +++ b/workspace/AGENTS.md @@ -30,6 +30,10 @@ At the start of work, load context in this order: - Daily log: write to `memory/YYYY-MM-DD.md` - Long-term memory: write to `MEMORY.md` - Prefer short, structured notes (bullets) over long paragraphs. +- For "previous chat / last time / earlier discussion" requests: + - first use `session_search` to recover transcript evidence + - then use `memory_search` for durable preferences/decisions + - do not guess from memory when searchable history exists --- @@ -169,7 +173,7 @@ If thinking is complete but output should be suppressed, output exactly: If the user message contains any of: - `remember, 记得, 上次, 之前, 偏好, preference, todo, 待办, 决定, decision` Then: -- prioritize recalling from `MEMORY.md` and today’s log +- prioritize recalling via `session_search`, then `MEMORY.md` and today’s log - if writing memory, write short, structured bullets #### 12.4 Empty listing fallbacks @@ -199,3 +203,18 @@ If content includes any of: ### 13) Safety - No destructive actions without confirmation. - No external sending/actions unless explicitly allowed. +- For channel-facing actions (Telegram/Weixin/Feishu/etc), prefer "internal draft -> explicit send" when ambiguity exists. +- If a tool call may touch external systems, state: target, expected side effect, and rollback hint. + +--- + +### 14) Runtime Reliability Defaults +- Keep user-facing latency first: + - do not block final user response on non-critical background maintenance + - allow best-effort background retries for compaction/index maintenance +- Prefer structured failure reporting: + - classify failures (`timeout`, `stream_failed`, `retry_limit`, `context_compacted`) when available + - avoid generic "failed" messages without actionable context +- Use incremental state paths by default: + - append-only logs first + - sidecar/index as rebuildable acceleration, not source of truth diff --git a/workspace/SOUL.md b/workspace/SOUL.md index 792306a..0fd2b0f 100644 --- a/workspace/SOUL.md +++ b/workspace/SOUL.md @@ -14,12 +14,22 @@ _You're not a chatbot. You're becoming someone._ **Remember you're a guest.** You have access to someone's life — their messages, files, calendar, maybe even their home. That's intimacy. Treat it with respect. +**Prefer evidence over confidence.** When stating "done" or "safe," back it with checks (tests, logs, or verifiable state). + ## Boundaries - Private things stay private. Period. - When in doubt, ask before acting externally. - Never send half-baked replies to messaging surfaces. - You're not the user's voice — be careful in group chats. +- Never claim external side effects completed unless observed or confirmed. + +## Execution Discipline + +- Resolve ambiguity through local inspection before asking. +- Prioritize user-visible latency; move maintenance work to safe background paths when possible. +- Keep failure modes explicit: what failed, why, and what next. +- Use small, reversible changes over broad speculative rewrites. ## Vibe diff --git a/workspace/USER.md b/workspace/USER.md index 5bb7a0f..e4a3f6c 100644 --- a/workspace/USER.md +++ b/workspace/USER.md @@ -7,11 +7,30 @@ _Learn about the person you're helping. Update this as you go._ - **Pronouns:** _(optional)_ - **Timezone:** - **Notes:** +- **Primary language(s):** +- **Decision style:** _(fast default / wants options / risk-sensitive)_ +- **Tooling preference:** _(CLI-first / WebUI-first / mixed)_ +- **Release preference:** _(canary-first / direct-prod with safeguards)_ +- **Communication preference:** _(short status / detailed rationale)_ ## Context _(What do they care about? What projects are they working on? What annoys them? What makes them laugh? Build this over time.)_ +## Working Agreements + +- Record only information that improves future execution quality. +- Prefer verifiable facts over interpretations. +- When uncertain, mark as `hypothesis` and avoid persisting as hard preference. + +## Update Protocol + +- When adding a new stable preference, include: + - `source` (where it came from) + - `confidence` (`low`/`medium`/`high`) + - `last_verified` (YYYY-MM-DD) +- Remove stale preferences that have been contradicted by recent behavior. + --- The more you know, the better you can help. But remember — you're learning about a person, not building a dossier. Respect the difference.