diff --git a/cmd/clawgo/cmd_gateway.go b/cmd/clawgo/cmd_gateway.go index fd9ea26..7dde504 100644 --- a/cmd/clawgo/cmd_gateway.go +++ b/cmd/clawgo/cmd_gateway.go @@ -15,6 +15,7 @@ import ( "time" "clawgo/pkg/agent" + "clawgo/pkg/autonomy" "clawgo/pkg/bus" "clawgo/pkg/channels" "clawgo/pkg/config" @@ -100,6 +101,7 @@ func gatewayCmd() { }) configureCronServiceRuntime(cronService, cfg) heartbeatService := buildHeartbeatService(cfg, msgBus) + autonomyEngine := buildAutonomyEngine(cfg, msgBus) sentinelService := sentinel.NewService( getConfigPath(), cfg.WorkspacePath(), @@ -152,6 +154,10 @@ func gatewayCmd() { fmt.Printf("Error starting heartbeat service: %v\n", err) } fmt.Println("✓ Heartbeat service started") + autonomyEngine.Start() + if cfg.Agents.Defaults.Autonomy.Enabled { + fmt.Println("✓ Autonomy engine started") + } if cfg.Sentinel.Enabled { sentinelService.Start() fmt.Println("✓ Sentinel service started") @@ -181,10 +187,13 @@ func gatewayCmd() { } configureCronServiceRuntime(cronService, newCfg) heartbeatService.Stop() + autonomyEngine.Stop() heartbeatService = buildHeartbeatService(newCfg, msgBus) + autonomyEngine = buildAutonomyEngine(newCfg, msgBus) if err := heartbeatService.Start(); err != nil { fmt.Printf("Error starting heartbeat service: %v\n", err) } + autonomyEngine.Start() if reflect.DeepEqual(cfg, newCfg) { fmt.Println("✓ Config unchanged, skip reload") @@ -274,6 +283,7 @@ func gatewayCmd() { fmt.Println("\nShutting down...") cancel() heartbeatService.Stop() + autonomyEngine.Stop() sentinelService.Stop() cronService.Stop() agentLoop.Stop() @@ -628,3 +638,18 @@ func buildHeartbeatService(cfg *config.Config, msgBus *bus.MessageBus) *heartbea return "queued", nil }, hbInterval, cfg.Agents.Defaults.Heartbeat.Enabled, cfg.Agents.Defaults.Heartbeat.PromptTemplate) } + +func buildAutonomyEngine(cfg *config.Config, msgBus *bus.MessageBus) *autonomy.Engine { + a := cfg.Agents.Defaults.Autonomy + return autonomy.NewEngine(autonomy.Options{ + Enabled: a.Enabled, + TickIntervalSec: a.TickIntervalSec, + MinRunIntervalSec: a.MinRunIntervalSec, + MaxPendingDurationSec: a.MaxPendingDurationSec, + MaxConsecutiveStalls: a.MaxConsecutiveStalls, + MaxDispatchPerTick: a.MaxDispatchPerTick, + Workspace: cfg.WorkspacePath(), + DefaultNotifyChannel: a.NotifyChannel, + DefaultNotifyChatID: a.NotifyChatID, + }, msgBus) +} diff --git a/config.example.json b/config.example.json index 13d171c..ece649d 100644 --- a/config.example.json +++ b/config.example.json @@ -13,6 +13,16 @@ "ack_max_chars": 64, "prompt_template": "Read HEARTBEAT.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK." }, + "autonomy": { + "enabled": false, + "tick_interval_sec": 30, + "min_run_interval_sec": 20, + "max_pending_duration_sec": 180, + "max_consecutive_stalls": 3, + "max_dispatch_per_tick": 2, + "notify_channel": "", + "notify_chat_id": "" + }, "texts": { "no_response_fallback": "I've completed processing but have no response to give.", "think_only_fallback": "Thinking process completed.", diff --git a/pkg/autonomy/engine.go b/pkg/autonomy/engine.go new file mode 100644 index 0000000..1d89457 --- /dev/null +++ b/pkg/autonomy/engine.go @@ -0,0 +1,249 @@ +package autonomy + +import ( + "context" + "crypto/sha1" + "encoding/hex" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "clawgo/pkg/bus" + "clawgo/pkg/lifecycle" +) + +type Options struct { + Enabled bool + TickIntervalSec int + MinRunIntervalSec int + MaxPendingDurationSec int + MaxConsecutiveStalls int + MaxDispatchPerTick int + Workspace string + DefaultNotifyChannel string + DefaultNotifyChatID string +} + +type taskState struct { + ID string + Content string + Status string // idle|running|waiting|blocked|completed + LastRunAt time.Time + LastAutonomyAt time.Time + ConsecutiveStall int +} + +type Engine struct { + opts Options + bus *bus.MessageBus + runner *lifecycle.LoopRunner + + mu sync.Mutex + state map[string]*taskState +} + +func NewEngine(opts Options, msgBus *bus.MessageBus) *Engine { + if opts.TickIntervalSec <= 0 { + opts.TickIntervalSec = 30 + } + if opts.MinRunIntervalSec <= 0 { + opts.MinRunIntervalSec = 20 + } + if opts.MaxPendingDurationSec <= 0 { + opts.MaxPendingDurationSec = 180 + } + if opts.MaxConsecutiveStalls <= 0 { + opts.MaxConsecutiveStalls = 3 + } + if opts.MaxDispatchPerTick <= 0 { + opts.MaxDispatchPerTick = 2 + } + return &Engine{ + opts: opts, + bus: msgBus, + runner: lifecycle.NewLoopRunner(), + state: map[string]*taskState{}, + } +} + +func (e *Engine) Start() { + if !e.opts.Enabled { + return + } + e.runner.Start(e.runLoop) +} + +func (e *Engine) Stop() { + e.runner.Stop() +} + +func (e *Engine) runLoop(stopCh <-chan struct{}) { + ticker := time.NewTicker(time.Duration(e.opts.TickIntervalSec) * time.Second) + defer ticker.Stop() + for { + select { + case <-stopCh: + return + case <-ticker.C: + e.tick() + } + } +} + +func (e *Engine) tick() { + todos := e.scanTodos() + now := time.Now() + + e.mu.Lock() + defer e.mu.Unlock() + + known := map[string]struct{}{} + for _, t := range todos { + known[t.ID] = struct{}{} + st, ok := e.state[t.ID] + if !ok { + e.state[t.ID] = &taskState{ID: t.ID, Content: t.Content, Status: "idle"} + continue + } + st.Content = t.Content + if st.Status == "completed" { + st.Status = "idle" + } + } + + // completed when removed from todo source + for id, st := range e.state { + if _, ok := known[id]; !ok { + if st.Status != "completed" { + st.Status = "completed" + e.sendCompletionNotification(st) + } + } + } + + dispatched := 0 + for _, st := range e.state { + if dispatched >= e.opts.MaxDispatchPerTick { + break + } + if st.Status == "completed" || st.Status == "blocked" { + continue + } + if !st.LastRunAt.IsZero() && now.Sub(st.LastRunAt) < time.Duration(e.opts.MinRunIntervalSec)*time.Second { + continue + } + if st.Status == "running" && now.Sub(st.LastRunAt) > time.Duration(e.opts.MaxPendingDurationSec)*time.Second { + st.ConsecutiveStall++ + if st.ConsecutiveStall > e.opts.MaxConsecutiveStalls { + st.Status = "blocked" + e.sendFailureNotification(st, "max consecutive stalls reached") + continue + } + } + + e.dispatchTask(st) + st.Status = "running" + st.LastRunAt = now + st.LastAutonomyAt = now + dispatched++ + } +} + +type todoItem struct { + ID string + Content string +} + +func (e *Engine) scanTodos() []todoItem { + var out []todoItem + if strings.TrimSpace(e.opts.Workspace) == "" { + return out + } + paths := []string{ + filepath.Join(e.opts.Workspace, "MEMORY.md"), + filepath.Join(e.opts.Workspace, "memory", time.Now().Format("2006-01-02")+".md"), + } + for _, p := range paths { + data, err := os.ReadFile(p) + if err != nil { + continue + } + for _, line := range strings.Split(string(data), "\n") { + t := strings.TrimSpace(line) + if strings.HasPrefix(t, "- [ ]") { + content := strings.TrimSpace(strings.TrimPrefix(t, "- [ ]")) + if content == "" { + continue + } + out = append(out, todoItem{ID: hashID(content), Content: content}) + continue + } + if strings.HasPrefix(strings.ToLower(t), "todo:") { + content := strings.TrimSpace(t[5:]) + if content == "" { + continue + } + out = append(out, todoItem{ID: hashID(content), Content: content}) + } + } + } + return out +} + +func (e *Engine) dispatchTask(st *taskState) { + content := fmt.Sprintf("Autonomy task (Plan -> Act -> Reflect):\n- Goal: %s\n- Requirements: concise progress update\n- If blocked, explain blocker and next retry hint", st.Content) + e.bus.PublishInbound(bus.InboundMessage{ + Channel: "system", + SenderID: "autonomy", + ChatID: "internal:autonomy", + Content: content, + SessionKey: "autonomy:" + st.ID, + Metadata: map[string]string{ + "trigger": "autonomy", + "task_id": st.ID, + "source": "memory_todo", + }, + }) +} + +func (e *Engine) sendCompletionNotification(st *taskState) { + if strings.TrimSpace(e.opts.DefaultNotifyChannel) == "" || strings.TrimSpace(e.opts.DefaultNotifyChatID) == "" { + return + } + e.bus.PublishOutbound(bus.OutboundMessage{ + Channel: e.opts.DefaultNotifyChannel, + ChatID: e.opts.DefaultNotifyChatID, + Content: fmt.Sprintf("[Autonomy] Task completed: %s", st.Content), + }) +} + +func (e *Engine) sendFailureNotification(st *taskState, reason string) { + if strings.TrimSpace(e.opts.DefaultNotifyChannel) == "" || strings.TrimSpace(e.opts.DefaultNotifyChatID) == "" { + return + } + e.bus.PublishOutbound(bus.OutboundMessage{ + Channel: e.opts.DefaultNotifyChannel, + ChatID: e.opts.DefaultNotifyChatID, + Content: fmt.Sprintf("[Autonomy] Task blocked: %s (%s)", st.Content, reason), + }) +} + +func hashID(s string) string { + sum := sha1.Sum([]byte(strings.ToLower(strings.TrimSpace(s)))) + return hex.EncodeToString(sum[:])[:12] +} + +func RunOnce(ctx context.Context, engine *Engine) { + if engine == nil { + return + } + select { + case <-ctx.Done(): + return + default: + engine.tick() + } +} diff --git a/pkg/config/config.go b/pkg/config/config.go index ce259ce..0a8a388 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -38,11 +38,23 @@ type AgentDefaults struct { Temperature float64 `json:"temperature" env:"CLAWGO_AGENTS_DEFAULTS_TEMPERATURE"` MaxToolIterations int `json:"max_tool_iterations" env:"CLAWGO_AGENTS_DEFAULTS_MAX_TOOL_ITERATIONS"` Heartbeat HeartbeatConfig `json:"heartbeat"` + Autonomy AutonomyConfig `json:"autonomy"` Texts AgentTextConfig `json:"texts"` ContextCompaction ContextCompactionConfig `json:"context_compaction"` RuntimeControl RuntimeControlConfig `json:"runtime_control"` } +type AutonomyConfig struct { + Enabled bool `json:"enabled" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_ENABLED"` + TickIntervalSec int `json:"tick_interval_sec" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_TICK_INTERVAL_SEC"` + MinRunIntervalSec int `json:"min_run_interval_sec" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_MIN_RUN_INTERVAL_SEC"` + MaxPendingDurationSec int `json:"max_pending_duration_sec" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_MAX_PENDING_DURATION_SEC"` + MaxConsecutiveStalls int `json:"max_consecutive_stalls" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_MAX_CONSECUTIVE_STALLS"` + MaxDispatchPerTick int `json:"max_dispatch_per_tick" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_MAX_DISPATCH_PER_TICK"` + NotifyChannel string `json:"notify_channel" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_NOTIFY_CHANNEL"` + NotifyChatID string `json:"notify_chat_id" env:"CLAWGO_AGENTS_DEFAULTS_AUTONOMY_NOTIFY_CHAT_ID"` +} + type AgentTextConfig struct { NoResponseFallback string `json:"no_response_fallback"` ThinkOnlyFallback string `json:"think_only_fallback"` @@ -284,6 +296,16 @@ func DefaultConfig() *Config { AckMaxChars: 64, PromptTemplate: "Read HEARTBEAT.md if it exists (workspace context). Follow it strictly. Do not infer or repeat old tasks from prior chats. If nothing needs attention, reply HEARTBEAT_OK.", }, + Autonomy: AutonomyConfig{ + Enabled: false, + TickIntervalSec: 30, + MinRunIntervalSec: 20, + MaxPendingDurationSec: 180, + MaxConsecutiveStalls: 3, + MaxDispatchPerTick: 2, + NotifyChannel: "", + NotifyChatID: "", + }, Texts: AgentTextConfig{ NoResponseFallback: "I've completed processing but have no response to give.", ThinkOnlyFallback: "Thinking process completed.", diff --git a/pkg/config/validate.go b/pkg/config/validate.go index 2590863..66865e7 100644 --- a/pkg/config/validate.go +++ b/pkg/config/validate.go @@ -82,6 +82,24 @@ func Validate(cfg *Config) []error { errs = append(errs, fmt.Errorf("agents.defaults.heartbeat.ack_max_chars must be > 0 when enabled=true")) } } + aut := cfg.Agents.Defaults.Autonomy + if aut.Enabled { + if aut.TickIntervalSec <= 0 { + errs = append(errs, fmt.Errorf("agents.defaults.autonomy.tick_interval_sec must be > 0 when enabled=true")) + } + if aut.MinRunIntervalSec <= 0 { + errs = append(errs, fmt.Errorf("agents.defaults.autonomy.min_run_interval_sec must be > 0 when enabled=true")) + } + if aut.MaxPendingDurationSec <= 0 { + errs = append(errs, fmt.Errorf("agents.defaults.autonomy.max_pending_duration_sec must be > 0 when enabled=true")) + } + if aut.MaxConsecutiveStalls <= 0 { + errs = append(errs, fmt.Errorf("agents.defaults.autonomy.max_consecutive_stalls must be > 0 when enabled=true")) + } + if aut.MaxDispatchPerTick <= 0 { + errs = append(errs, fmt.Errorf("agents.defaults.autonomy.max_dispatch_per_tick must be > 0 when enabled=true")) + } + } texts := cfg.Agents.Defaults.Texts if strings.TrimSpace(texts.NoResponseFallback) == "" { errs = append(errs, fmt.Errorf("agents.defaults.texts.no_response_fallback must be non-empty"))