harden node lifecycle with ttl heartbeat audits, action ACL checks, and status panel

This commit is contained in:
DBT
2026-02-25 01:16:43 +00:00
parent 44acfb8c24
commit 68145f8185
5 changed files with 103 additions and 9 deletions

View File

@@ -10,6 +10,7 @@ import (
"time"
"clawgo/pkg/config"
"clawgo/pkg/nodes"
"clawgo/pkg/providers"
)
func statusCmd() {
@@ -158,6 +159,23 @@ func statusCmd() {
}
fmt.Printf("Autonomy Control: %s\n", autonomyControlState(workspace))
}
ns := nodes.DefaultManager().List()
if len(ns) > 0 {
online := 0
caps := map[string]int{"run": 0, "camera": 0, "screen": 0, "location": 0, "canvas": 0}
for _, n := range ns {
if n.Online {
online++
}
if n.Capabilities.Run { caps["run"]++ }
if n.Capabilities.Camera { caps["camera"]++ }
if n.Capabilities.Screen { caps["screen"]++ }
if n.Capabilities.Location { caps["location"]++ }
if n.Capabilities.Canvas { caps["canvas"]++ }
}
fmt.Printf("Nodes: total=%d online=%d\n", len(ns), online)
fmt.Printf("Nodes Capabilities: run=%d camera=%d screen=%d location=%d canvas=%d\n", caps["run"], caps["camera"], caps["screen"], caps["location"], caps["canvas"])
}
}
}

View File

@@ -78,6 +78,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
toolsRegistry.Register(tools.NewExecTool(cfg.Tools.Shell, workspace, processManager))
toolsRegistry.Register(tools.NewProcessTool(processManager))
nodesManager := nodes.DefaultManager()
nodesManager.SetAuditPath(filepath.Join(workspace, "memory", "nodes-audit.jsonl"))
nodesManager.Upsert(nodes.NodeInfo{ID: "local", Name: "local", Capabilities: nodes.Capabilities{Run: true, Invoke: true, Camera: true, Screen: true, Location: true, Canvas: true}, Online: true})
nodesManager.RegisterHandler("local", func(req nodes.Request) nodes.Response {
switch req.Action {

View File

@@ -1,6 +1,9 @@
package nodes
import (
"encoding/json"
"os"
"path/filepath"
"sort"
"strings"
"sync"
@@ -13,10 +16,11 @@ const defaultNodeTTL = 60 * time.Second
type Handler func(req Request) Response
type Manager struct {
mu sync.RWMutex
nodes map[string]NodeInfo
handlers map[string]Handler
ttl time.Duration
mu sync.RWMutex
nodes map[string]NodeInfo
handlers map[string]Handler
ttl time.Duration
auditPath string
}
var defaultManager = NewManager()
@@ -29,12 +33,34 @@ func NewManager() *Manager {
return m
}
func (m *Manager) SetAuditPath(path string) {
m.mu.Lock()
defer m.mu.Unlock()
m.auditPath = strings.TrimSpace(path)
}
func (m *Manager) Upsert(info NodeInfo) {
m.mu.Lock()
defer m.mu.Unlock()
info.LastSeenAt = time.Now().UTC()
now := time.Now().UTC()
old, existed := m.nodes[info.ID]
info.LastSeenAt = now
info.Online = true
if existed {
if info.RegisteredAt.IsZero() {
info.RegisteredAt = old.RegisteredAt
}
if strings.TrimSpace(info.Endpoint) == "" {
info.Endpoint = old.Endpoint
}
if strings.TrimSpace(info.Token) == "" {
info.Token = old.Token
}
} else if info.RegisteredAt.IsZero() {
info.RegisteredAt = now
}
m.nodes[info.ID] = info
m.appendAudit("upsert", info.ID, map[string]interface{}{"existed": existed, "endpoint": info.Endpoint, "version": info.Version})
}
func (m *Manager) MarkOffline(id string) {
@@ -95,7 +121,20 @@ func (m *Manager) SupportsAction(nodeID, action string) bool {
if !ok || !n.Online {
return false
}
switch strings.ToLower(strings.TrimSpace(action)) {
action = strings.ToLower(strings.TrimSpace(action))
if len(n.Actions) > 0 {
allowed := false
for _, a := range n.Actions {
if strings.ToLower(strings.TrimSpace(a)) == action {
allowed = true
break
}
}
if !allowed {
return false
}
}
switch action {
case "run":
return n.Capabilities.Run
case "camera_snap", "camera_clip":
@@ -154,12 +193,38 @@ func (m *Manager) reaperLoop() {
for range t.C {
cutoff := time.Now().UTC().Add(-m.ttl)
m.mu.Lock()
offlined := make([]string, 0)
for id, n := range m.nodes {
if n.Online && !n.LastSeenAt.IsZero() && n.LastSeenAt.Before(cutoff) {
n.Online = false
m.nodes[id] = n
offlined = append(offlined, id)
}
}
m.mu.Unlock()
for _, id := range offlined {
m.appendAudit("offline_ttl", id, nil)
}
}
}
func (m *Manager) appendAudit(event, nodeID string, data map[string]interface{}) {
m.mu.RLock()
path := m.auditPath
m.mu.RUnlock()
if strings.TrimSpace(path) == "" {
return
}
_ = os.MkdirAll(filepath.Dir(path), 0755)
row := map[string]interface{}{"time": time.Now().UTC().Format(time.RFC3339), "event": event, "node": nodeID}
for k, v := range data {
row[k] = v
}
b, _ := json.Marshal(row)
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return
}
defer f.Close()
_, _ = f.Write(append(b, '\n'))
}

View File

@@ -22,6 +22,8 @@ type NodeInfo struct {
Endpoint string `json:"endpoint,omitempty"`
Token string `json:"token,omitempty"`
Capabilities Capabilities `json:"capabilities"`
Actions []string `json:"actions,omitempty"`
RegisteredAt time.Time `json:"registered_at,omitempty"`
LastSeenAt time.Time `json:"last_seen_at"`
Online bool `json:"online"`
}

View File

@@ -82,10 +82,18 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s
reqArgs["command"] = cmd
}
if facing, _ := args["facing"].(string); strings.TrimSpace(facing) != "" {
reqArgs["facing"] = strings.TrimSpace(facing)
f := strings.ToLower(strings.TrimSpace(facing))
if f != "front" && f != "back" && f != "both" {
return "", fmt.Errorf("invalid_args: facing must be front|back|both")
}
reqArgs["facing"] = f
}
if d, ok := args["duration_ms"].(float64); ok && d > 0 {
reqArgs["duration_ms"] = int(d)
if d, ok := args["duration_ms"].(float64); ok {
di := int(d)
if di <= 0 || di > 300000 {
return "", fmt.Errorf("invalid_args: duration_ms must be in 1..300000")
}
reqArgs["duration_ms"] = di
}
resp, err := t.router.Dispatch(ctx, nodes.Request{Action: action, Node: nodeID, Args: reqArgs}, mode)
if err != nil {