diff --git a/cmd/clawgo/cmd_status.go b/cmd/clawgo/cmd_status.go index 99a2fad..52c129b 100644 --- a/cmd/clawgo/cmd_status.go +++ b/cmd/clawgo/cmd_status.go @@ -10,6 +10,7 @@ import ( "time" "clawgo/pkg/config" + "clawgo/pkg/nodes" "clawgo/pkg/providers" ) func statusCmd() { @@ -158,6 +159,23 @@ func statusCmd() { } fmt.Printf("Autonomy Control: %s\n", autonomyControlState(workspace)) } + ns := nodes.DefaultManager().List() + if len(ns) > 0 { + online := 0 + caps := map[string]int{"run": 0, "camera": 0, "screen": 0, "location": 0, "canvas": 0} + for _, n := range ns { + if n.Online { + online++ + } + if n.Capabilities.Run { caps["run"]++ } + if n.Capabilities.Camera { caps["camera"]++ } + if n.Capabilities.Screen { caps["screen"]++ } + if n.Capabilities.Location { caps["location"]++ } + if n.Capabilities.Canvas { caps["canvas"]++ } + } + fmt.Printf("Nodes: total=%d online=%d\n", len(ns), online) + fmt.Printf("Nodes Capabilities: run=%d camera=%d screen=%d location=%d canvas=%d\n", caps["run"], caps["camera"], caps["screen"], caps["location"], caps["canvas"]) + } } } diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 322626f..4416337 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -78,6 +78,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers toolsRegistry.Register(tools.NewExecTool(cfg.Tools.Shell, workspace, processManager)) toolsRegistry.Register(tools.NewProcessTool(processManager)) nodesManager := nodes.DefaultManager() + nodesManager.SetAuditPath(filepath.Join(workspace, "memory", "nodes-audit.jsonl")) nodesManager.Upsert(nodes.NodeInfo{ID: "local", Name: "local", Capabilities: nodes.Capabilities{Run: true, Invoke: true, Camera: true, Screen: true, Location: true, Canvas: true}, Online: true}) nodesManager.RegisterHandler("local", func(req nodes.Request) nodes.Response { switch req.Action { diff --git a/pkg/nodes/manager.go b/pkg/nodes/manager.go index 04e044f..d7128b8 100644 --- a/pkg/nodes/manager.go +++ b/pkg/nodes/manager.go @@ -1,6 +1,9 @@ package nodes import ( + "encoding/json" + "os" + "path/filepath" "sort" "strings" "sync" @@ -13,10 +16,11 @@ const defaultNodeTTL = 60 * time.Second type Handler func(req Request) Response type Manager struct { - mu sync.RWMutex - nodes map[string]NodeInfo - handlers map[string]Handler - ttl time.Duration + mu sync.RWMutex + nodes map[string]NodeInfo + handlers map[string]Handler + ttl time.Duration + auditPath string } var defaultManager = NewManager() @@ -29,12 +33,34 @@ func NewManager() *Manager { return m } +func (m *Manager) SetAuditPath(path string) { + m.mu.Lock() + defer m.mu.Unlock() + m.auditPath = strings.TrimSpace(path) +} + func (m *Manager) Upsert(info NodeInfo) { m.mu.Lock() defer m.mu.Unlock() - info.LastSeenAt = time.Now().UTC() + now := time.Now().UTC() + old, existed := m.nodes[info.ID] + info.LastSeenAt = now info.Online = true + if existed { + if info.RegisteredAt.IsZero() { + info.RegisteredAt = old.RegisteredAt + } + if strings.TrimSpace(info.Endpoint) == "" { + info.Endpoint = old.Endpoint + } + if strings.TrimSpace(info.Token) == "" { + info.Token = old.Token + } + } else if info.RegisteredAt.IsZero() { + info.RegisteredAt = now + } m.nodes[info.ID] = info + m.appendAudit("upsert", info.ID, map[string]interface{}{"existed": existed, "endpoint": info.Endpoint, "version": info.Version}) } func (m *Manager) MarkOffline(id string) { @@ -95,7 +121,20 @@ func (m *Manager) SupportsAction(nodeID, action string) bool { if !ok || !n.Online { return false } - switch strings.ToLower(strings.TrimSpace(action)) { + action = strings.ToLower(strings.TrimSpace(action)) + if len(n.Actions) > 0 { + allowed := false + for _, a := range n.Actions { + if strings.ToLower(strings.TrimSpace(a)) == action { + allowed = true + break + } + } + if !allowed { + return false + } + } + switch action { case "run": return n.Capabilities.Run case "camera_snap", "camera_clip": @@ -154,12 +193,38 @@ func (m *Manager) reaperLoop() { for range t.C { cutoff := time.Now().UTC().Add(-m.ttl) m.mu.Lock() + offlined := make([]string, 0) for id, n := range m.nodes { if n.Online && !n.LastSeenAt.IsZero() && n.LastSeenAt.Before(cutoff) { n.Online = false m.nodes[id] = n + offlined = append(offlined, id) } } m.mu.Unlock() + for _, id := range offlined { + m.appendAudit("offline_ttl", id, nil) + } } } + +func (m *Manager) appendAudit(event, nodeID string, data map[string]interface{}) { + m.mu.RLock() + path := m.auditPath + m.mu.RUnlock() + if strings.TrimSpace(path) == "" { + return + } + _ = os.MkdirAll(filepath.Dir(path), 0755) + row := map[string]interface{}{"time": time.Now().UTC().Format(time.RFC3339), "event": event, "node": nodeID} + for k, v := range data { + row[k] = v + } + b, _ := json.Marshal(row) + f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return + } + defer f.Close() + _, _ = f.Write(append(b, '\n')) +} diff --git a/pkg/nodes/types.go b/pkg/nodes/types.go index 20611e2..9205c86 100644 --- a/pkg/nodes/types.go +++ b/pkg/nodes/types.go @@ -22,6 +22,8 @@ type NodeInfo struct { Endpoint string `json:"endpoint,omitempty"` Token string `json:"token,omitempty"` Capabilities Capabilities `json:"capabilities"` + Actions []string `json:"actions,omitempty"` + RegisteredAt time.Time `json:"registered_at,omitempty"` LastSeenAt time.Time `json:"last_seen_at"` Online bool `json:"online"` } diff --git a/pkg/tools/nodes_tool.go b/pkg/tools/nodes_tool.go index 7acadbf..a35595b 100644 --- a/pkg/tools/nodes_tool.go +++ b/pkg/tools/nodes_tool.go @@ -82,10 +82,18 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s reqArgs["command"] = cmd } if facing, _ := args["facing"].(string); strings.TrimSpace(facing) != "" { - reqArgs["facing"] = strings.TrimSpace(facing) + f := strings.ToLower(strings.TrimSpace(facing)) + if f != "front" && f != "back" && f != "both" { + return "", fmt.Errorf("invalid_args: facing must be front|back|both") + } + reqArgs["facing"] = f } - if d, ok := args["duration_ms"].(float64); ok && d > 0 { - reqArgs["duration_ms"] = int(d) + if d, ok := args["duration_ms"].(float64); ok { + di := int(d) + if di <= 0 || di > 300000 { + return "", fmt.Errorf("invalid_args: duration_ms must be in 1..300000") + } + reqArgs["duration_ms"] = di } resp, err := t.router.Dispatch(ctx, nodes.Request{Action: action, Node: nodeID, Args: reqArgs}, mode) if err != nil {