diff --git a/cmd/clawgo/cmd_gateway.go b/cmd/clawgo/cmd_gateway.go index 89cd5ec..ec12e12 100644 --- a/cmd/clawgo/cmd_gateway.go +++ b/cmd/clawgo/cmd_gateway.go @@ -217,6 +217,9 @@ func gatewayCmd() { registryServer.SetSubagentHandler(func(cctx context.Context, action string, args map[string]interface{}) (interface{}, error) { return agentLoop.HandleSubagentRuntime(cctx, action, args) }) + registryServer.SetNodeDispatchHandler(func(cctx context.Context, req nodes.Request, mode string) (nodes.Response, error) { + return agentLoop.DispatchNodeRequest(cctx, req, mode) + }) registryServer.SetToolsCatalogHandler(func() interface{} { return agentLoop.GetToolCatalog() }) diff --git a/cmd/clawgo/cmd_node.go b/cmd/clawgo/cmd_node.go index 0656916..d7ab616 100644 --- a/cmd/clawgo/cmd_node.go +++ b/cmd/clawgo/cmd_node.go @@ -43,6 +43,7 @@ type nodeRegisterOptions struct { Version string Actions []string Models []string + Tags []string Agents []nodes.AgentInfo Capabilities nodes.Capabilities Watch bool @@ -124,6 +125,7 @@ func printNodeHelp() { fmt.Println(" --version Reported node version (default: current clawgo version)") fmt.Println(" --actions Supported actions, e.g. run,agent_task") fmt.Println(" --models Supported models, e.g. gpt-4o-mini") + fmt.Println(" --tags Node tags for dispatch policy, e.g. gpu,vision,build") fmt.Println(" --capabilities Capability flags: run,invoke,model,camera,screen,location,canvas") fmt.Println(" --watch Keep a websocket connection open and send heartbeats") fmt.Println(" --heartbeat-sec Heartbeat interval in seconds when --watch is set (default: 30)") @@ -270,6 +272,12 @@ func parseNodeRegisterArgs(args []string, cfg *config.Config) (nodeRegisterOptio return opts, err } opts.Models = splitCSV(v) + case "--tags": + v, err := next() + if err != nil { + return opts, err + } + opts.Tags = splitCSV(v) case "--capabilities": v, err := next() if err != nil { @@ -359,6 +367,7 @@ func buildNodeInfo(opts nodeRegisterOptions) nodes.NodeInfo { return nodes.NodeInfo{ ID: strings.TrimSpace(opts.ID), Name: strings.TrimSpace(opts.Name), + Tags: append([]string(nil), opts.Tags...), OS: strings.TrimSpace(opts.OS), Arch: strings.TrimSpace(opts.Arch), Version: strings.TrimSpace(opts.Version), diff --git a/cmd/clawgo/cmd_node_test.go b/cmd/clawgo/cmd_node_test.go index df9a2f0..8274f77 100644 --- a/cmd/clawgo/cmd_node_test.go +++ b/cmd/clawgo/cmd_node_test.go @@ -55,6 +55,19 @@ func TestParseNodeRegisterArgsDefaults(t *testing.T) { } } +func TestParseNodeRegisterArgsTags(t *testing.T) { + t.Parallel() + + cfg := config.DefaultConfig() + opts, err := parseNodeRegisterArgs([]string{"--id", "edge-dev", "--tags", "vision,gpu"}, cfg) + if err != nil { + t.Fatalf("parseNodeRegisterArgs failed: %v", err) + } + if len(opts.Tags) != 2 || opts.Tags[0] != "vision" || opts.Tags[1] != "gpu" { + t.Fatalf("unexpected tags: %+v", opts.Tags) + } +} + func TestPostNodeRegisterSendsNodeInfo(t *testing.T) { t.Parallel() diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 5df8eb0..80f6cd1 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -86,6 +86,31 @@ func (al *AgentLoop) SetNodeP2PTransport(t nodes.Transport) { al.nodeRouter.P2P = t } +func (al *AgentLoop) DispatchNodeRequest(ctx context.Context, req nodes.Request, mode string) (nodes.Response, error) { + if al == nil || al.tools == nil { + return nodes.Response{}, fmt.Errorf("agent loop not ready") + } + args := map[string]interface{}{ + "action": req.Action, + "node": req.Node, + "mode": mode, + "task": req.Task, + "model": req.Model, + } + if len(req.Args) > 0 { + args["args"] = req.Args + } + out, err := al.tools.Execute(ctx, "nodes", args) + if err != nil { + return nodes.Response{}, err + } + var resp nodes.Response + if err := json.Unmarshal([]byte(out), &resp); err != nil { + return nodes.Response{}, err + } + return resp, nil +} + // StartupCompactionReport provides startup memory/session maintenance stats. type StartupCompactionReport struct { TotalSessions int `json:"total_sessions"` @@ -149,6 +174,18 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers return nodes.Response{OK: false, Code: "unsupported_action", Node: "local", Action: req.Action, Error: "unsupported local simulated action"} } }) + nodeDispatchPolicy := nodes.DispatchPolicy{ + PreferLocal: cfg.Gateway.Nodes.Dispatch.PreferLocal, + PreferP2P: cfg.Gateway.Nodes.Dispatch.PreferP2P, + AllowRelayFallback: cfg.Gateway.Nodes.Dispatch.AllowRelayFallback, + ActionTags: cfg.Gateway.Nodes.Dispatch.ActionTags, + AgentTags: cfg.Gateway.Nodes.Dispatch.AgentTags, + AllowActions: cfg.Gateway.Nodes.Dispatch.AllowActions, + DenyActions: cfg.Gateway.Nodes.Dispatch.DenyActions, + AllowAgents: cfg.Gateway.Nodes.Dispatch.AllowAgents, + DenyAgents: cfg.Gateway.Nodes.Dispatch.DenyAgents, + } + nodesManager.SetDispatchPolicy(nodeDispatchPolicy) var nodeP2P nodes.Transport if cfg.Gateway.Nodes.P2P.Enabled { switch strings.ToLower(strings.TrimSpace(cfg.Gateway.Nodes.P2P.Transport)) { @@ -159,7 +196,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers nodeP2P = &nodes.WebsocketP2PTransport{Manager: nodesManager} } } - nodesRouter := &nodes.Router{P2P: nodeP2P, Relay: &nodes.HTTPRelayTransport{Manager: nodesManager}} + nodesRouter := &nodes.Router{P2P: nodeP2P, Relay: &nodes.HTTPRelayTransport{Manager: nodesManager}, Policy: nodesManager.DispatchPolicy()} toolsRegistry.Register(tools.NewNodesTool(nodesManager, nodesRouter, filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl"))) if cs != nil { diff --git a/pkg/api/server.go b/pkg/api/server.go index 4353448..a92e80e 100644 --- a/pkg/api/server.go +++ b/pkg/api/server.go @@ -7,6 +7,8 @@ import ( "bytes" "compress/gzip" "context" + "crypto/sha1" + "encoding/base64" "encoding/json" "errors" "fmt" @@ -33,32 +35,35 @@ import ( ) type Server struct { - addr string - token string - mgr *nodes.Manager - server *http.Server - nodeConnMu sync.Mutex - nodeConnIDs map[string]string - nodeSockets map[string]*nodeSocketConn - nodeWebRTC *nodes.WebRTCTransport - nodeP2PStatus func() map[string]interface{} - gatewayVersion string - webuiVersion string - configPath string - workspacePath string - logFilePath string - onChat func(ctx context.Context, sessionKey, content string) (string, error) - onChatHistory func(sessionKey string) []map[string]interface{} - onConfigAfter func() - onCron func(action string, args map[string]interface{}) (interface{}, error) - onSubagents func(ctx context.Context, action string, args map[string]interface{}) (interface{}, error) - onToolsCatalog func() interface{} - webUIDir string - ekgCacheMu sync.Mutex - ekgCachePath string - ekgCacheStamp time.Time - ekgCacheSize int64 - ekgCacheRows []map[string]interface{} + addr string + token string + mgr *nodes.Manager + server *http.Server + nodeConnMu sync.Mutex + nodeConnIDs map[string]string + nodeSockets map[string]*nodeSocketConn + nodeWebRTC *nodes.WebRTCTransport + nodeP2PStatus func() map[string]interface{} + artifactStatsMu sync.Mutex + artifactStats map[string]interface{} + gatewayVersion string + webuiVersion string + configPath string + workspacePath string + logFilePath string + onChat func(ctx context.Context, sessionKey, content string) (string, error) + onChatHistory func(sessionKey string) []map[string]interface{} + onConfigAfter func() + onCron func(action string, args map[string]interface{}) (interface{}, error) + onSubagents func(ctx context.Context, action string, args map[string]interface{}) (interface{}, error) + onNodeDispatch func(ctx context.Context, req nodes.Request, mode string) (nodes.Response, error) + onToolsCatalog func() interface{} + webUIDir string + ekgCacheMu sync.Mutex + ekgCachePath string + ekgCacheStamp time.Time + ekgCacheSize int64 + ekgCacheRows []map[string]interface{} } var nodesWebsocketUpgrader = websocket.Upgrader{ @@ -74,11 +79,12 @@ func NewServer(host string, port int, token string, mgr *nodes.Manager) *Server port = 7788 } return &Server{ - addr: fmt.Sprintf("%s:%d", addr, port), - token: strings.TrimSpace(token), - mgr: mgr, - nodeConnIDs: map[string]string{}, - nodeSockets: map[string]*nodeSocketConn{}, + addr: fmt.Sprintf("%s:%d", addr, port), + token: strings.TrimSpace(token), + mgr: mgr, + nodeConnIDs: map[string]string{}, + nodeSockets: map[string]*nodeSocketConn{}, + artifactStats: map[string]interface{}{}, } } @@ -114,6 +120,9 @@ func (s *Server) SetCronHandler(fn func(action string, args map[string]interface func (s *Server) SetSubagentHandler(fn func(ctx context.Context, action string, args map[string]interface{}) (interface{}, error)) { s.onSubagents = fn } +func (s *Server) SetNodeDispatchHandler(fn func(ctx context.Context, req nodes.Request, mode string) (nodes.Response, error)) { + s.onNodeDispatch = fn +} func (s *Server) SetToolsCatalogHandler(fn func() interface{}) { s.onToolsCatalog = fn } func (s *Server) SetWebUIDir(dir string) { s.webUIDir = strings.TrimSpace(dir) } func (s *Server) SetGatewayVersion(v string) { s.gatewayVersion = strings.TrimSpace(v) } @@ -227,6 +236,12 @@ func (s *Server) Start(ctx context.Context) error { mux.HandleFunc("/webui/api/upload", s.handleWebUIUpload) mux.HandleFunc("/webui/api/nodes", s.handleWebUINodes) mux.HandleFunc("/webui/api/node_dispatches", s.handleWebUINodeDispatches) + mux.HandleFunc("/webui/api/node_dispatches/replay", s.handleWebUINodeDispatchReplay) + mux.HandleFunc("/webui/api/node_artifacts", s.handleWebUINodeArtifacts) + mux.HandleFunc("/webui/api/node_artifacts/export", s.handleWebUINodeArtifactsExport) + mux.HandleFunc("/webui/api/node_artifacts/download", s.handleWebUINodeArtifactDownload) + mux.HandleFunc("/webui/api/node_artifacts/delete", s.handleWebUINodeArtifactDelete) + mux.HandleFunc("/webui/api/node_artifacts/prune", s.handleWebUINodeArtifactPrune) mux.HandleFunc("/webui/api/cron", s.handleWebUICron) mux.HandleFunc("/webui/api/skills", s.handleWebUISkills) mux.HandleFunc("/webui/api/sessions", s.handleWebUISessions) @@ -1075,14 +1090,121 @@ func (s *Server) webUINodesPayload(ctx context.Context) map[string]interface{} { if s.nodeP2PStatus != nil { p2p = s.nodeP2PStatus() } + dispatches := s.webUINodesDispatchPayload(12) return map[string]interface{}{ - "nodes": list, - "trees": s.buildNodeAgentTrees(ctx, list), - "p2p": p2p, - "dispatches": s.webUINodesDispatchPayload(12), + "nodes": list, + "trees": s.buildNodeAgentTrees(ctx, list), + "p2p": p2p, + "dispatches": dispatches, + "alerts": s.webUINodeAlertsPayload(list, p2p, dispatches), + "artifact_retention": s.artifactStatsSnapshot(), } } +func (s *Server) webUINodeAlertsPayload(nodeList []nodes.NodeInfo, p2p map[string]interface{}, dispatches []map[string]interface{}) []map[string]interface{} { + alerts := make([]map[string]interface{}, 0) + for _, node := range nodeList { + nodeID := strings.TrimSpace(node.ID) + if nodeID == "" || nodeID == "local" { + continue + } + if !node.Online { + alerts = append(alerts, map[string]interface{}{ + "severity": "critical", + "kind": "node_offline", + "node": nodeID, + "title": "Node offline", + "detail": fmt.Sprintf("node %s is offline", nodeID), + }) + } + } + if sessions, ok := p2p["nodes"].([]map[string]interface{}); ok { + for _, session := range sessions { + appendNodeSessionAlert(&alerts, session) + } + } else if sessions, ok := p2p["nodes"].([]interface{}); ok { + for _, raw := range sessions { + if session, ok := raw.(map[string]interface{}); ok { + appendNodeSessionAlert(&alerts, session) + } + } + } + failuresByNode := map[string]int{} + for _, row := range dispatches { + nodeID := strings.TrimSpace(fmt.Sprint(row["node"])) + if nodeID == "" { + continue + } + if ok, _ := row["ok"].(bool); ok { + continue + } + failuresByNode[nodeID]++ + } + for nodeID, count := range failuresByNode { + if count < 2 { + continue + } + alerts = append(alerts, map[string]interface{}{ + "severity": "warning", + "kind": "dispatch_failures", + "node": nodeID, + "title": "Repeated dispatch failures", + "detail": fmt.Sprintf("node %s has %d recent failed dispatches", nodeID, count), + "count": count, + }) + } + return alerts +} + +func appendNodeSessionAlert(alerts *[]map[string]interface{}, session map[string]interface{}) { + nodeID := strings.TrimSpace(fmt.Sprint(session["node"])) + if nodeID == "" { + return + } + status := strings.ToLower(strings.TrimSpace(fmt.Sprint(session["status"]))) + retryCount := int(int64Value(session["retry_count"])) + lastError := strings.TrimSpace(fmt.Sprint(session["last_error"])) + switch { + case status == "failed" || status == "closed": + *alerts = append(*alerts, map[string]interface{}{ + "severity": "critical", + "kind": "p2p_session_down", + "node": nodeID, + "title": "P2P session down", + "detail": firstNonEmptyString(lastError, fmt.Sprintf("node %s p2p session is %s", nodeID, status)), + }) + case retryCount >= 3 || (status == "connecting" && retryCount >= 2): + *alerts = append(*alerts, map[string]interface{}{ + "severity": "warning", + "kind": "p2p_session_unstable", + "node": nodeID, + "title": "P2P session unstable", + "detail": firstNonEmptyString(lastError, fmt.Sprintf("node %s p2p session retry_count=%d", nodeID, retryCount)), + "count": retryCount, + }) + } +} + +func int64Value(v interface{}) int64 { + switch value := v.(type) { + case int: + return int64(value) + case int32: + return int64(value) + case int64: + return value + case float32: + return int64(value) + case float64: + return int64(value) + case json.Number: + if n, err := value.Int64(); err == nil { + return n + } + } + return 0 +} + func (s *Server) webUINodesDispatchPayload(limit int) []map[string]interface{} { workspace := strings.TrimSpace(s.workspacePath) if workspace == "" { @@ -1115,6 +1237,381 @@ func (s *Server) webUINodesDispatchPayload(limit int) []map[string]interface{} { return out } +func (s *Server) webUINodeArtifactsPayload(limit int) []map[string]interface{} { + return s.webUINodeArtifactsPayloadFiltered("", "", "", limit) +} + +func (s *Server) webUINodeArtifactsPayloadFiltered(nodeFilter, actionFilter, kindFilter string, limit int) []map[string]interface{} { + nodeFilter = strings.TrimSpace(nodeFilter) + actionFilter = strings.TrimSpace(actionFilter) + kindFilter = strings.TrimSpace(kindFilter) + rows, _ := s.readNodeDispatchAuditRows() + if len(rows) == 0 { + return []map[string]interface{}{} + } + out := make([]map[string]interface{}, 0, limit) + for rowIndex := len(rows) - 1; rowIndex >= 0; rowIndex-- { + row := rows[rowIndex] + artifacts, _ := row["artifacts"].([]interface{}) + for artifactIndex, raw := range artifacts { + artifact, ok := raw.(map[string]interface{}) + if !ok { + continue + } + item := map[string]interface{}{ + "id": buildNodeArtifactID(row, artifact, artifactIndex), + "time": row["time"], + "node": row["node"], + "action": row["action"], + "used_transport": row["used_transport"], + "ok": row["ok"], + "error": row["error"], + } + for _, key := range []string{"name", "kind", "mime_type", "storage", "path", "url", "content_text", "content_base64", "source_path", "size_bytes"} { + if value, ok := artifact[key]; ok { + item[key] = value + } + } + if nodeFilter != "" && !strings.EqualFold(strings.TrimSpace(fmt.Sprint(item["node"])), nodeFilter) { + continue + } + if actionFilter != "" && !strings.EqualFold(strings.TrimSpace(fmt.Sprint(item["action"])), actionFilter) { + continue + } + if kindFilter != "" && !strings.EqualFold(strings.TrimSpace(fmt.Sprint(item["kind"])), kindFilter) { + continue + } + out = append(out, item) + if limit > 0 && len(out) >= limit { + return out + } + } + } + return out +} + +func (s *Server) readNodeDispatchAuditRows() ([]map[string]interface{}, string) { + workspace := strings.TrimSpace(s.workspacePath) + if workspace == "" { + return nil, "" + } + path := filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl") + data, err := os.ReadFile(path) + if err != nil { + return nil, path + } + lines := strings.Split(strings.TrimSpace(string(data)), "\n") + rows := make([]map[string]interface{}, 0, len(lines)) + for _, line := range lines { + line = strings.TrimSpace(line) + if line == "" { + continue + } + row := map[string]interface{}{} + if err := json.Unmarshal([]byte(line), &row); err != nil { + continue + } + rows = append(rows, row) + } + return rows, path +} + +func buildNodeArtifactID(row, artifact map[string]interface{}, artifactIndex int) string { + seed := fmt.Sprintf("%v|%v|%v|%d|%v|%v|%v", + row["time"], row["node"], row["action"], artifactIndex, + artifact["name"], artifact["source_path"], artifact["path"], + ) + sum := sha1.Sum([]byte(seed)) + return fmt.Sprintf("%x", sum[:8]) +} + +func sanitizeZipEntryName(name string) string { + name = strings.TrimSpace(name) + if name == "" { + return "artifact.bin" + } + name = strings.ReplaceAll(name, "\\", "/") + name = filepath.Base(name) + name = strings.Map(func(r rune) rune { + switch { + case r >= 'a' && r <= 'z': + return r + case r >= 'A' && r <= 'Z': + return r + case r >= '0' && r <= '9': + return r + case r == '.', r == '-', r == '_': + return r + default: + return '_' + } + }, name) + if strings.Trim(name, "._") == "" { + return "artifact.bin" + } + return name +} + +func (s *Server) findNodeArtifactByID(id string) (map[string]interface{}, bool) { + for _, item := range s.webUINodeArtifactsPayload(10000) { + if strings.TrimSpace(fmt.Sprint(item["id"])) == id { + return item, true + } + } + return nil, false +} + +func resolveArtifactPath(workspace, raw string) string { + raw = strings.TrimSpace(raw) + if raw == "" { + return "" + } + if filepath.IsAbs(raw) { + clean := filepath.Clean(raw) + if info, err := os.Stat(clean); err == nil && !info.IsDir() { + return clean + } + return "" + } + root := strings.TrimSpace(workspace) + if root == "" { + return "" + } + clean := filepath.Clean(filepath.Join(root, raw)) + if rel, err := filepath.Rel(root, clean); err != nil || strings.HasPrefix(rel, "..") { + return "" + } + if info, err := os.Stat(clean); err == nil && !info.IsDir() { + return clean + } + return "" +} + +func readArtifactBytes(workspace string, item map[string]interface{}) ([]byte, string, error) { + if content := strings.TrimSpace(fmt.Sprint(item["content_base64"])); content != "" { + raw, err := base64.StdEncoding.DecodeString(content) + if err != nil { + return nil, "", err + } + return raw, strings.TrimSpace(fmt.Sprint(item["mime_type"])), nil + } + for _, rawPath := range []string{fmt.Sprint(item["source_path"]), fmt.Sprint(item["path"])} { + if path := resolveArtifactPath(workspace, rawPath); path != "" { + b, err := os.ReadFile(path) + if err != nil { + return nil, "", err + } + return b, strings.TrimSpace(fmt.Sprint(item["mime_type"])), nil + } + } + if contentText := fmt.Sprint(item["content_text"]); strings.TrimSpace(contentText) != "" { + return []byte(contentText), "text/plain; charset=utf-8", nil + } + return nil, "", fmt.Errorf("artifact content unavailable") +} + +func (s *Server) filteredNodeDispatches(nodeFilter, actionFilter string, limit int) []map[string]interface{} { + items := s.webUINodesDispatchPayload(limit) + if nodeFilter == "" && actionFilter == "" { + return items + } + out := make([]map[string]interface{}, 0, len(items)) + for _, item := range items { + if nodeFilter != "" && !strings.EqualFold(strings.TrimSpace(fmt.Sprint(item["node"])), nodeFilter) { + continue + } + if actionFilter != "" && !strings.EqualFold(strings.TrimSpace(fmt.Sprint(item["action"])), actionFilter) { + continue + } + out = append(out, item) + } + return out +} + +func filteredNodeAlerts(alerts []map[string]interface{}, nodeFilter string) []map[string]interface{} { + if nodeFilter == "" { + return alerts + } + out := make([]map[string]interface{}, 0, len(alerts)) + for _, item := range alerts { + if strings.EqualFold(strings.TrimSpace(fmt.Sprint(item["node"])), nodeFilter) { + out = append(out, item) + } + } + return out +} + +func (s *Server) setArtifactStats(summary map[string]interface{}) { + s.artifactStatsMu.Lock() + defer s.artifactStatsMu.Unlock() + if summary == nil { + s.artifactStats = map[string]interface{}{} + return + } + copySummary := make(map[string]interface{}, len(summary)) + for k, v := range summary { + copySummary[k] = v + } + s.artifactStats = copySummary +} + +func (s *Server) artifactStatsSnapshot() map[string]interface{} { + s.artifactStatsMu.Lock() + defer s.artifactStatsMu.Unlock() + out := make(map[string]interface{}, len(s.artifactStats)) + for k, v := range s.artifactStats { + out[k] = v + } + return out +} + +func (s *Server) nodeArtifactRetentionConfig() cfgpkg.GatewayNodesArtifactsConfig { + cfg := cfgpkg.DefaultConfig() + if strings.TrimSpace(s.configPath) != "" { + if loaded, err := cfgpkg.LoadConfig(s.configPath); err == nil && loaded != nil { + cfg = loaded + } + } + return cfg.Gateway.Nodes.Artifacts +} + +func (s *Server) applyNodeArtifactRetention() map[string]interface{} { + retention := s.nodeArtifactRetentionConfig() + if !retention.Enabled || !retention.PruneOnRead || retention.KeepLatest <= 0 { + summary := map[string]interface{}{ + "enabled": retention.Enabled, + "keep_latest": retention.KeepLatest, + "retain_days": retention.RetainDays, + "prune_on_read": retention.PruneOnRead, + "pruned": 0, + "last_run_at": time.Now().UTC().Format(time.RFC3339), + } + s.setArtifactStats(summary) + return summary + } + items := s.webUINodeArtifactsPayload(0) + cutoff := time.Time{} + if retention.RetainDays > 0 { + cutoff = time.Now().UTC().Add(-time.Duration(retention.RetainDays) * 24 * time.Hour) + } + pruned := 0 + prunedByAge := 0 + prunedByCount := 0 + for index, item := range items { + drop := false + dropByAge := false + if !cutoff.IsZero() { + if tm, err := time.Parse(time.RFC3339, strings.TrimSpace(fmt.Sprint(item["time"]))); err == nil && tm.Before(cutoff) { + drop = true + dropByAge = true + } + } + if !drop && index >= retention.KeepLatest { + drop = true + } + if !drop { + continue + } + _, deletedAudit, _ := s.deleteNodeArtifact(strings.TrimSpace(fmt.Sprint(item["id"]))) + if deletedAudit { + pruned++ + if dropByAge { + prunedByAge++ + } else { + prunedByCount++ + } + } + } + summary := map[string]interface{}{ + "enabled": true, + "keep_latest": retention.KeepLatest, + "retain_days": retention.RetainDays, + "prune_on_read": retention.PruneOnRead, + "pruned": pruned, + "pruned_by_age": prunedByAge, + "pruned_by_count": prunedByCount, + "remaining": len(s.webUINodeArtifactsPayload(0)), + "last_run_at": time.Now().UTC().Format(time.RFC3339), + } + s.setArtifactStats(summary) + return summary +} + +func (s *Server) deleteNodeArtifact(id string) (bool, bool, error) { + id = strings.TrimSpace(id) + if id == "" { + return false, false, fmt.Errorf("id is required") + } + rows, auditPath := s.readNodeDispatchAuditRows() + if len(rows) == 0 || auditPath == "" { + return false, false, fmt.Errorf("artifact audit is empty") + } + deletedFile := false + deletedAudit := false + for rowIndex, row := range rows { + artifacts, _ := row["artifacts"].([]interface{}) + if len(artifacts) == 0 { + continue + } + nextArtifacts := make([]interface{}, 0, len(artifacts)) + for artifactIndex, raw := range artifacts { + artifact, ok := raw.(map[string]interface{}) + if !ok { + nextArtifacts = append(nextArtifacts, raw) + continue + } + if buildNodeArtifactID(row, artifact, artifactIndex) != id { + nextArtifacts = append(nextArtifacts, artifact) + continue + } + for _, rawPath := range []string{fmt.Sprint(artifact["source_path"]), fmt.Sprint(artifact["path"])} { + if path := resolveArtifactPath(s.workspacePath, rawPath); path != "" { + if err := os.Remove(path); err == nil { + deletedFile = true + break + } + } + } + deletedAudit = true + } + if deletedAudit { + row["artifacts"] = nextArtifacts + row["artifact_count"] = len(nextArtifacts) + kinds := make([]string, 0, len(nextArtifacts)) + for _, raw := range nextArtifacts { + if artifact, ok := raw.(map[string]interface{}); ok { + if kind := strings.TrimSpace(fmt.Sprint(artifact["kind"])); kind != "" { + kinds = append(kinds, kind) + } + } + } + if len(kinds) > 0 { + row["artifact_kinds"] = kinds + } else { + delete(row, "artifact_kinds") + } + rows[rowIndex] = row + break + } + } + if !deletedAudit { + return false, false, fmt.Errorf("artifact not found") + } + var buf bytes.Buffer + for _, row := range rows { + encoded, err := json.Marshal(row) + if err != nil { + continue + } + buf.Write(encoded) + buf.WriteByte('\n') + } + if err := os.WriteFile(auditPath, buf.Bytes(), 0644); err != nil { + return deletedFile, false, err + } + return deletedFile, true, nil +} + func (s *Server) webUISessionsPayload() map[string]interface{} { sessionsDir := filepath.Join(filepath.Dir(s.workspacePath), "agents", "main", "sessions") _ = os.MkdirAll(sessionsDir, 0755) @@ -1607,6 +2104,325 @@ func (s *Server) handleWebUINodeDispatches(w http.ResponseWriter, r *http.Reques }) } +func (s *Server) handleWebUINodeDispatchReplay(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + if s.onNodeDispatch == nil { + http.Error(w, "node dispatch handler not configured", http.StatusServiceUnavailable) + return + } + var body struct { + Node string `json:"node"` + Action string `json:"action"` + Mode string `json:"mode"` + Task string `json:"task"` + Model string `json:"model"` + Args map[string]interface{} `json:"args"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, "invalid json", http.StatusBadRequest) + return + } + req := nodes.Request{ + Node: strings.TrimSpace(body.Node), + Action: strings.TrimSpace(body.Action), + Task: body.Task, + Model: body.Model, + Args: body.Args, + } + if req.Node == "" || req.Action == "" { + http.Error(w, "node and action are required", http.StatusBadRequest) + return + } + resp, err := s.onNodeDispatch(r.Context(), req, strings.TrimSpace(body.Mode)) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "ok": true, + "result": resp, + }) +} + +func (s *Server) handleWebUINodeArtifacts(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + limit := 200 + if raw := strings.TrimSpace(r.URL.Query().Get("limit")); raw != "" { + if n, err := strconv.Atoi(raw); err == nil && n > 0 { + if n > 1000 { + n = 1000 + } + limit = n + } + } + retentionSummary := s.applyNodeArtifactRetention() + nodeFilter := strings.TrimSpace(r.URL.Query().Get("node")) + actionFilter := strings.TrimSpace(r.URL.Query().Get("action")) + kindFilter := strings.TrimSpace(r.URL.Query().Get("kind")) + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "ok": true, + "items": s.webUINodeArtifactsPayloadFiltered(nodeFilter, actionFilter, kindFilter, limit), + "artifact_retention": retentionSummary, + }) +} + +func (s *Server) handleWebUINodeArtifactsExport(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + retentionSummary := s.applyNodeArtifactRetention() + limit := 200 + if raw := strings.TrimSpace(r.URL.Query().Get("limit")); raw != "" { + if n, err := strconv.Atoi(raw); err == nil && n > 0 { + if n > 1000 { + n = 1000 + } + limit = n + } + } + nodeFilter := strings.TrimSpace(r.URL.Query().Get("node")) + actionFilter := strings.TrimSpace(r.URL.Query().Get("action")) + kindFilter := strings.TrimSpace(r.URL.Query().Get("kind")) + artifacts := s.webUINodeArtifactsPayloadFiltered(nodeFilter, actionFilter, kindFilter, limit) + dispatches := s.filteredNodeDispatches(nodeFilter, actionFilter, limit) + payload := s.webUINodesPayload(r.Context()) + nodeList, _ := payload["nodes"].([]nodes.NodeInfo) + p2p, _ := payload["p2p"].(map[string]interface{}) + alerts := filteredNodeAlerts(s.webUINodeAlertsPayload(nodeList, p2p, dispatches), nodeFilter) + + var archive bytes.Buffer + zw := zip.NewWriter(&archive) + writeJSON := func(name string, value interface{}) error { + entry, err := zw.Create(name) + if err != nil { + return err + } + enc := json.NewEncoder(entry) + enc.SetIndent("", " ") + return enc.Encode(value) + } + manifest := map[string]interface{}{ + "generated_at": time.Now().UTC().Format(time.RFC3339), + "filters": map[string]interface{}{ + "node": nodeFilter, + "action": actionFilter, + "kind": kindFilter, + "limit": limit, + }, + "artifact_count": len(artifacts), + "dispatch_count": len(dispatches), + "alert_count": len(alerts), + "retention": retentionSummary, + } + if err := writeJSON("manifest.json", manifest); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if err := writeJSON("dispatches.json", dispatches); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if err := writeJSON("alerts.json", alerts); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if err := writeJSON("artifacts.json", artifacts); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + for _, item := range artifacts { + name := sanitizeZipEntryName(firstNonEmptyString( + fmt.Sprint(item["name"]), + fmt.Sprint(item["source_path"]), + fmt.Sprint(item["path"]), + fmt.Sprintf("%s.bin", fmt.Sprint(item["id"])), + )) + raw, _, err := readArtifactBytes(s.workspacePath, item) + entryName := filepath.ToSlash(filepath.Join("files", fmt.Sprintf("%s-%s", fmt.Sprint(item["id"]), name))) + if err != nil || len(raw) == 0 { + entryName = filepath.ToSlash(filepath.Join("files", fmt.Sprintf("%s-metadata.json", fmt.Sprint(item["id"])))) + raw, err = json.MarshalIndent(item, "", " ") + if err != nil { + continue + } + } + entry, err := zw.Create(entryName) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if _, err := entry.Write(raw); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } + if err := zw.Close(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + filename := "node-artifacts-export.zip" + if nodeFilter != "" { + filename = fmt.Sprintf("node-artifacts-%s.zip", sanitizeZipEntryName(nodeFilter)) + } + w.Header().Set("Content-Type", "application/zip") + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filename)) + w.WriteHeader(http.StatusOK) + _, _ = w.Write(archive.Bytes()) +} + +func (s *Server) handleWebUINodeArtifactDownload(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + id := strings.TrimSpace(r.URL.Query().Get("id")) + if id == "" { + http.Error(w, "id is required", http.StatusBadRequest) + return + } + item, ok := s.findNodeArtifactByID(id) + if !ok { + http.Error(w, "artifact not found", http.StatusNotFound) + return + } + name := strings.TrimSpace(fmt.Sprint(item["name"])) + if name == "" { + name = "artifact" + } + mimeType := strings.TrimSpace(fmt.Sprint(item["mime_type"])) + if mimeType == "" { + mimeType = "application/octet-stream" + } + if contentB64 := strings.TrimSpace(fmt.Sprint(item["content_base64"])); contentB64 != "" { + payload, err := base64.StdEncoding.DecodeString(contentB64) + if err != nil { + http.Error(w, "invalid inline artifact payload", http.StatusBadRequest) + return + } + w.Header().Set("Content-Type", mimeType) + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", name)) + _, _ = w.Write(payload) + return + } + for _, rawPath := range []string{fmt.Sprint(item["source_path"]), fmt.Sprint(item["path"])} { + if path := resolveArtifactPath(s.workspacePath, rawPath); path != "" { + http.ServeFile(w, r, path) + return + } + } + if contentText := fmt.Sprint(item["content_text"]); strings.TrimSpace(contentText) != "" { + w.Header().Set("Content-Type", mimeType) + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", name)) + _, _ = w.Write([]byte(contentText)) + return + } + http.Error(w, "artifact content unavailable", http.StatusNotFound) +} + +func (s *Server) handleWebUINodeArtifactDelete(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + var body struct { + ID string `json:"id"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, "invalid json", http.StatusBadRequest) + return + } + deletedFile, deletedAudit, err := s.deleteNodeArtifact(strings.TrimSpace(body.ID)) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "ok": true, + "id": strings.TrimSpace(body.ID), + "deleted_file": deletedFile, + "deleted_audit": deletedAudit, + }) +} + +func (s *Server) handleWebUINodeArtifactPrune(w http.ResponseWriter, r *http.Request) { + if !s.checkAuth(r) { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + var body struct { + Node string `json:"node"` + Action string `json:"action"` + Kind string `json:"kind"` + KeepLatest int `json:"keep_latest"` + Limit int `json:"limit"` + } + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + http.Error(w, "invalid json", http.StatusBadRequest) + return + } + limit := body.Limit + if limit <= 0 || limit > 5000 { + limit = 5000 + } + keepLatest := body.KeepLatest + if keepLatest < 0 { + keepLatest = 0 + } + items := s.webUINodeArtifactsPayloadFiltered(strings.TrimSpace(body.Node), strings.TrimSpace(body.Action), strings.TrimSpace(body.Kind), limit) + pruned := 0 + deletedFiles := 0 + for index, item := range items { + if index < keepLatest { + continue + } + deletedFile, deletedAudit, err := s.deleteNodeArtifact(strings.TrimSpace(fmt.Sprint(item["id"]))) + if err != nil || !deletedAudit { + continue + } + pruned++ + if deletedFile { + deletedFiles++ + } + } + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "ok": true, + "pruned": pruned, + "deleted_files": deletedFiles, + "kept": keepLatest, + }) +} + func (s *Server) buildNodeAgentTrees(ctx context.Context, nodeList []nodes.NodeInfo) []map[string]interface{} { trees := make([]map[string]interface{}, 0, len(nodeList)) localRegistry := s.fetchRegistryItems(ctx) diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go index e01f80a..6bbfa62 100644 --- a/pkg/api/server_test.go +++ b/pkg/api/server_test.go @@ -1,10 +1,12 @@ package api import ( + "archive/zip" "bytes" "context" "encoding/json" "fmt" + "io" "net/http" "net/http/httptest" "os" @@ -432,7 +434,10 @@ func TestHandleWebUILogsLive(t *testing.T) { func TestHandleWebUINodesIncludesP2PSummary(t *testing.T) { t.Parallel() - srv := NewServer("127.0.0.1", 0, "", nodes.NewManager()) + mgr := nodes.NewManager() + mgr.Upsert(nodes.NodeInfo{ID: "edge-b", Name: "Edge B"}) + mgr.MarkOffline("edge-b") + srv := NewServer("127.0.0.1", 0, "", mgr) workspace := t.TempDir() srv.SetWorkspacePath(workspace) if err := os.MkdirAll(filepath.Join(workspace, "memory"), 0755); err != nil { @@ -446,6 +451,9 @@ func TestHandleWebUINodesIncludesP2PSummary(t *testing.T) { "enabled": true, "transport": "webrtc", "active_sessions": 2, + "nodes": []map[string]interface{}{ + {"node": "edge-b", "status": "connecting", "retry_count": 3, "last_error": "signal timeout"}, + }, } }) @@ -463,6 +471,10 @@ func TestHandleWebUINodesIncludesP2PSummary(t *testing.T) { if p2p == nil || p2p["transport"] != "webrtc" { t.Fatalf("expected p2p summary, got %+v", body) } + alerts, _ := body["alerts"].([]interface{}) + if len(alerts) == 0 { + t.Fatalf("expected node alerts, got %+v", body) + } dispatches, _ := body["dispatches"].([]interface{}) if len(dispatches) != 1 { t.Fatalf("expected dispatch audit rows, got %+v", body["dispatches"]) @@ -473,3 +485,273 @@ func TestHandleWebUINodesIncludesP2PSummary(t *testing.T) { t.Fatalf("expected artifact previews in dispatch row, got %+v", first) } } + +func TestHandleWebUINodeDispatchReplay(t *testing.T) { + t.Parallel() + + srv := NewServer("127.0.0.1", 0, "", nodes.NewManager()) + srv.SetNodeDispatchHandler(func(ctx context.Context, req nodes.Request, mode string) (nodes.Response, error) { + if req.Node != "edge-a" || req.Action != "screen_snapshot" || mode != "auto" { + t.Fatalf("unexpected replay request: %+v mode=%s", req, mode) + } + if fmt.Sprint(req.Args["quality"]) != "high" { + t.Fatalf("unexpected args: %+v", req.Args) + } + return nodes.Response{ + OK: true, + Node: req.Node, + Action: req.Action, + Payload: map[string]interface{}{ + "used_transport": "webrtc", + }, + }, nil + }) + + body := `{"node":"edge-a","action":"screen_snapshot","mode":"auto","args":{"quality":"high"}}` + req := httptest.NewRequest(http.MethodPost, "/webui/api/node_dispatches/replay", strings.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + + srv.handleWebUINodeDispatchReplay(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String()) + } + if !strings.Contains(rec.Body.String(), `"used_transport":"webrtc"`) { + t.Fatalf("expected replay result body, got: %s", rec.Body.String()) + } +} + +func TestHandleWebUINodeArtifactsListAndDelete(t *testing.T) { + t.Parallel() + + srv := NewServer("127.0.0.1", 0, "", nodes.NewManager()) + workspace := t.TempDir() + srv.SetWorkspacePath(workspace) + if err := os.MkdirAll(filepath.Join(workspace, "memory"), 0o755); err != nil { + t.Fatalf("mkdir memory: %v", err) + } + artifactPath := filepath.Join(workspace, "artifact.txt") + if err := os.WriteFile(artifactPath, []byte("artifact-body"), 0o644); err != nil { + t.Fatalf("write artifact: %v", err) + } + auditLine := fmt.Sprintf("{\"time\":\"2026-03-09T00:00:00Z\",\"node\":\"edge-a\",\"action\":\"run\",\"artifacts\":[{\"name\":\"artifact.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"source_path\":\"%s\",\"size_bytes\":13}]}\n", artifactPath) + if err := os.WriteFile(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl"), []byte(auditLine), 0o644); err != nil { + t.Fatalf("write audit: %v", err) + } + + listReq := httptest.NewRequest(http.MethodGet, "/webui/api/node_artifacts", nil) + listRec := httptest.NewRecorder() + srv.handleWebUINodeArtifacts(listRec, listReq) + if listRec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", listRec.Code) + } + var listBody map[string]interface{} + if err := json.Unmarshal(listRec.Body.Bytes(), &listBody); err != nil { + t.Fatalf("decode list body: %v", err) + } + items, _ := listBody["items"].([]interface{}) + if len(items) != 1 { + t.Fatalf("expected 1 artifact, got %+v", listBody) + } + item, _ := items[0].(map[string]interface{}) + artifactID := strings.TrimSpace(fmt.Sprint(item["id"])) + if artifactID == "" { + t.Fatalf("expected artifact id, got %+v", item) + } + + deleteReq := httptest.NewRequest(http.MethodPost, "/webui/api/node_artifacts/delete", strings.NewReader(fmt.Sprintf(`{"id":"%s"}`, artifactID))) + deleteReq.Header.Set("Content-Type", "application/json") + deleteRec := httptest.NewRecorder() + srv.handleWebUINodeArtifactDelete(deleteRec, deleteReq) + if deleteRec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", deleteRec.Code, deleteRec.Body.String()) + } + if _, err := os.Stat(artifactPath); !os.IsNotExist(err) { + t.Fatalf("expected artifact file removed, stat err=%v", err) + } +} + +func TestHandleWebUINodeArtifactsExport(t *testing.T) { + t.Parallel() + + srv := NewServer("127.0.0.1", 0, "", nodes.NewManager()) + workspace := t.TempDir() + srv.SetWorkspacePath(workspace) + if err := os.MkdirAll(filepath.Join(workspace, "memory"), 0o755); err != nil { + t.Fatalf("mkdir memory: %v", err) + } + auditLine := "{\"time\":\"2026-03-09T00:00:00Z\",\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"shot.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"Y2FwdHVyZQ==\",\"size_bytes\":7}]}\n" + if err := os.WriteFile(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl"), []byte(auditLine), 0o644); err != nil { + t.Fatalf("write audit: %v", err) + } + srv.mgr.Upsert(nodes.NodeInfo{ID: "edge-a", Name: "Edge A", Online: true}) + + req := httptest.NewRequest(http.MethodGet, "/webui/api/node_artifacts/export?node=edge-a&action=screen_snapshot&kind=text", nil) + rec := httptest.NewRecorder() + srv.handleWebUINodeArtifactsExport(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String()) + } + if got := rec.Header().Get("Content-Type"); !strings.Contains(got, "application/zip") { + t.Fatalf("expected zip response, got %q", got) + } + zr, err := zip.NewReader(bytes.NewReader(rec.Body.Bytes()), int64(rec.Body.Len())) + if err != nil { + t.Fatalf("open zip: %v", err) + } + seen := map[string]bool{} + for _, file := range zr.File { + seen[file.Name] = true + } + for _, required := range []string{"manifest.json", "dispatches.json", "alerts.json", "artifacts.json"} { + if !seen[required] { + t.Fatalf("missing zip entry %q in %+v", required, seen) + } + } + foundFile := false + for _, file := range zr.File { + if !strings.HasPrefix(file.Name, "files/") { + continue + } + foundFile = true + rc, err := file.Open() + if err != nil { + t.Fatalf("open artifact file: %v", err) + } + body, _ := io.ReadAll(rc) + _ = rc.Close() + if string(body) != "capture" { + t.Fatalf("unexpected artifact payload %q", string(body)) + } + } + if !foundFile { + t.Fatalf("expected exported artifact file in zip") + } +} + +func TestHandleWebUINodeArtifactsPrune(t *testing.T) { + t.Parallel() + + srv := NewServer("127.0.0.1", 0, "", nodes.NewManager()) + workspace := t.TempDir() + srv.SetWorkspacePath(workspace) + if err := os.MkdirAll(filepath.Join(workspace, "memory"), 0o755); err != nil { + t.Fatalf("mkdir memory: %v", err) + } + auditLines := strings.Join([]string{ + "{\"time\":\"2026-03-09T00:00:00Z\",\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"one.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"b25l\"}]}", + "{\"time\":\"2026-03-09T00:01:00Z\",\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"two.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"dHdv\"}]}", + "{\"time\":\"2026-03-09T00:02:00Z\",\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"three.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"dGhyZWU=\"}]}", + }, "\n") + "\n" + if err := os.WriteFile(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl"), []byte(auditLines), 0o644); err != nil { + t.Fatalf("write audit: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/webui/api/node_artifacts/prune", strings.NewReader(`{"node":"edge-a","action":"screen_snapshot","kind":"text","keep_latest":1}`)) + req.Header.Set("Content-Type", "application/json") + rec := httptest.NewRecorder() + srv.handleWebUINodeArtifactPrune(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String()) + } + + items := srv.webUINodeArtifactsPayloadFiltered("edge-a", "screen_snapshot", "text", 10) + if len(items) != 1 { + t.Fatalf("expected 1 remaining artifact, got %d", len(items)) + } + if got := fmt.Sprint(items[0]["name"]); got != "three.txt" { + t.Fatalf("expected newest artifact to remain, got %q", got) + } +} + +func TestHandleWebUINodeArtifactsAppliesRetentionConfig(t *testing.T) { + t.Parallel() + + srv := NewServer("127.0.0.1", 0, "", nodes.NewManager()) + workspace := t.TempDir() + srv.SetWorkspacePath(workspace) + if err := os.MkdirAll(filepath.Join(workspace, "memory"), 0o755); err != nil { + t.Fatalf("mkdir memory: %v", err) + } + cfg := cfgpkg.DefaultConfig() + cfg.Gateway.Nodes.Artifacts.Enabled = true + cfg.Gateway.Nodes.Artifacts.KeepLatest = 1 + cfg.Gateway.Nodes.Artifacts.PruneOnRead = true + cfgPath := filepath.Join(workspace, "config.json") + if err := cfgpkg.SaveConfig(cfgPath, cfg); err != nil { + t.Fatalf("save config: %v", err) + } + srv.SetConfigPath(cfgPath) + auditLines := strings.Join([]string{ + "{\"time\":\"2026-03-09T00:00:00Z\",\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"one.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"b25l\"}]}", + "{\"time\":\"2026-03-09T00:01:00Z\",\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"two.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"dHdv\"}]}", + }, "\n") + "\n" + if err := os.WriteFile(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl"), []byte(auditLines), 0o644); err != nil { + t.Fatalf("write audit: %v", err) + } + + req := httptest.NewRequest(http.MethodGet, "/webui/api/node_artifacts", nil) + rec := httptest.NewRecorder() + srv.handleWebUINodeArtifacts(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String()) + } + items := srv.webUINodeArtifactsPayload(10) + if len(items) != 1 { + t.Fatalf("expected retention to keep 1 artifact, got %d", len(items)) + } + if got := fmt.Sprint(items[0]["name"]); got != "two.txt" { + t.Fatalf("expected newest artifact to remain, got %q", got) + } + stats := srv.artifactStatsSnapshot() + if fmt.Sprint(stats["pruned"]) == "" || fmt.Sprint(stats["pruned"]) == "0" { + t.Fatalf("expected retention stats to record pruned artifacts, got %+v", stats) + } + if fmt.Sprint(stats["keep_latest"]) != "1" { + t.Fatalf("expected keep_latest in stats, got %+v", stats) + } +} + +func TestHandleWebUINodeArtifactsAppliesRetentionDays(t *testing.T) { + t.Parallel() + + srv := NewServer("127.0.0.1", 0, "", nodes.NewManager()) + workspace := t.TempDir() + srv.SetWorkspacePath(workspace) + if err := os.MkdirAll(filepath.Join(workspace, "memory"), 0o755); err != nil { + t.Fatalf("mkdir memory: %v", err) + } + cfg := cfgpkg.DefaultConfig() + cfg.Gateway.Nodes.Artifacts.Enabled = true + cfg.Gateway.Nodes.Artifacts.KeepLatest = 10 + cfg.Gateway.Nodes.Artifacts.RetainDays = 1 + cfg.Gateway.Nodes.Artifacts.PruneOnRead = true + cfgPath := filepath.Join(workspace, "config.json") + if err := cfgpkg.SaveConfig(cfgPath, cfg); err != nil { + t.Fatalf("save config: %v", err) + } + srv.SetConfigPath(cfgPath) + oldTime := time.Now().UTC().Add(-48 * time.Hour).Format(time.RFC3339) + newTime := time.Now().UTC().Add(-2 * time.Hour).Format(time.RFC3339) + auditLines := strings.Join([]string{ + fmt.Sprintf("{\"time\":%q,\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"old.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"b2xk\"}]}", oldTime), + fmt.Sprintf("{\"time\":%q,\"node\":\"edge-a\",\"action\":\"screen_snapshot\",\"ok\":true,\"artifacts\":[{\"name\":\"fresh.txt\",\"kind\":\"text\",\"mime_type\":\"text/plain\",\"content_base64\":\"ZnJlc2g=\"}]}", newTime), + }, "\n") + "\n" + if err := os.WriteFile(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl"), []byte(auditLines), 0o644); err != nil { + t.Fatalf("write audit: %v", err) + } + + req := httptest.NewRequest(http.MethodGet, "/webui/api/node_artifacts", nil) + rec := httptest.NewRecorder() + srv.handleWebUINodeArtifacts(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d: %s", rec.Code, rec.Body.String()) + } + items := srv.webUINodeArtifactsPayload(10) + if len(items) != 1 { + t.Fatalf("expected retention days to keep 1 artifact, got %d", len(items)) + } + if got := fmt.Sprint(items[0]["name"]); got != "fresh.txt" { + t.Fatalf("expected fresh artifact to remain, got %q", got) + } +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 1d17ff4..0fc50a7 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -295,7 +295,9 @@ type GatewayConfig struct { } type GatewayNodesConfig struct { - P2P GatewayNodesP2PConfig `json:"p2p,omitempty"` + P2P GatewayNodesP2PConfig `json:"p2p,omitempty"` + Dispatch GatewayNodesDispatchConfig `json:"dispatch,omitempty"` + Artifacts GatewayNodesArtifactsConfig `json:"artifacts,omitempty"` } type GatewayICEConfig struct { @@ -311,6 +313,25 @@ type GatewayNodesP2PConfig struct { ICEServers []GatewayICEConfig `json:"ice_servers,omitempty"` } +type GatewayNodesDispatchConfig struct { + PreferLocal bool `json:"prefer_local,omitempty"` + PreferP2P bool `json:"prefer_p2p,omitempty"` + AllowRelayFallback bool `json:"allow_relay_fallback,omitempty"` + ActionTags map[string][]string `json:"action_tags,omitempty"` + AgentTags map[string][]string `json:"agent_tags,omitempty"` + AllowActions map[string][]string `json:"allow_actions,omitempty"` + DenyActions map[string][]string `json:"deny_actions,omitempty"` + AllowAgents map[string][]string `json:"allow_agents,omitempty"` + DenyAgents map[string][]string `json:"deny_agents,omitempty"` +} + +type GatewayNodesArtifactsConfig struct { + Enabled bool `json:"enabled,omitempty"` + KeepLatest int `json:"keep_latest,omitempty"` + RetainDays int `json:"retain_days,omitempty"` + PruneOnRead bool `json:"prune_on_read,omitempty"` +} + type CronConfig struct { MinSleepSec int `json:"min_sleep_sec" env:"CLAWGO_CRON_MIN_SLEEP_SEC"` MaxSleepSec int `json:"max_sleep_sec" env:"CLAWGO_CRON_MAX_SLEEP_SEC"` @@ -559,6 +580,23 @@ func DefaultConfig() *Config { STUNServers: []string{}, ICEServers: []GatewayICEConfig{}, }, + Dispatch: GatewayNodesDispatchConfig{ + PreferLocal: false, + PreferP2P: true, + AllowRelayFallback: true, + ActionTags: map[string][]string{}, + AgentTags: map[string][]string{}, + AllowActions: map[string][]string{}, + DenyActions: map[string][]string{}, + AllowAgents: map[string][]string{}, + DenyAgents: map[string][]string{}, + }, + Artifacts: GatewayNodesArtifactsConfig{ + Enabled: false, + KeepLatest: 500, + RetainDays: 7, + PruneOnRead: true, + }, }, }, Cron: CronConfig{ diff --git a/pkg/config/validate.go b/pkg/config/validate.go index 62987e2..527f289 100644 --- a/pkg/config/validate.go +++ b/pkg/config/validate.go @@ -145,6 +145,21 @@ func Validate(cfg *Config) []error { } } } + errs = append(errs, validateDispatchTagMap("gateway.nodes.dispatch.action_tags", cfg.Gateway.Nodes.Dispatch.ActionTags)...) + errs = append(errs, validateDispatchTagMap("gateway.nodes.dispatch.agent_tags", cfg.Gateway.Nodes.Dispatch.AgentTags)...) + errs = append(errs, validateDispatchTagMap("gateway.nodes.dispatch.allow_actions", cfg.Gateway.Nodes.Dispatch.AllowActions)...) + errs = append(errs, validateDispatchTagMap("gateway.nodes.dispatch.deny_actions", cfg.Gateway.Nodes.Dispatch.DenyActions)...) + errs = append(errs, validateDispatchTagMap("gateway.nodes.dispatch.allow_agents", cfg.Gateway.Nodes.Dispatch.AllowAgents)...) + errs = append(errs, validateDispatchTagMap("gateway.nodes.dispatch.deny_agents", cfg.Gateway.Nodes.Dispatch.DenyAgents)...) + if cfg.Gateway.Nodes.Artifacts.Enabled && cfg.Gateway.Nodes.Artifacts.KeepLatest <= 0 { + errs = append(errs, fmt.Errorf("gateway.nodes.artifacts.keep_latest must be > 0 when enabled=true")) + } + if cfg.Gateway.Nodes.Artifacts.KeepLatest < 0 { + errs = append(errs, fmt.Errorf("gateway.nodes.artifacts.keep_latest must be >= 0")) + } + if cfg.Gateway.Nodes.Artifacts.RetainDays < 0 { + errs = append(errs, fmt.Errorf("gateway.nodes.artifacts.retain_days must be >= 0")) + } if cfg.Cron.MinSleepSec <= 0 { errs = append(errs, fmt.Errorf("cron.min_sleep_sec must be > 0")) } @@ -239,6 +254,21 @@ func Validate(cfg *Config) []error { return errs } +func validateDispatchTagMap(prefix string, mapping map[string][]string) []error { + if len(mapping) == 0 { + return nil + } + errs := make([]error, 0) + for key, tags := range mapping { + if strings.TrimSpace(key) == "" { + errs = append(errs, fmt.Errorf("%s contains empty key", prefix)) + continue + } + errs = append(errs, validateNonEmptyStringList(fmt.Sprintf("%s.%s", prefix, key), tags)...) + } + return errs +} + func validateMCPTools(cfg *Config) []error { var errs []error mcp := cfg.Tools.MCP diff --git a/pkg/config/validate_test.go b/pkg/config/validate_test.go index c07b592..bd8582a 100644 --- a/pkg/config/validate_test.go +++ b/pkg/config/validate_test.go @@ -177,3 +177,70 @@ func TestValidateGatewayNodeP2PIceServersRequireTurnCredentials(t *testing.T) { t.Fatalf("expected validation errors") } } + +func TestValidateGatewayNodeDispatchRejectsEmptyTagKey(t *testing.T) { + t.Parallel() + + cfg := DefaultConfig() + cfg.Gateway.Nodes.Dispatch.ActionTags = map[string][]string{ + "": {"vision"}, + } + + if errs := Validate(cfg); len(errs) == 0 { + t.Fatalf("expected validation errors") + } +} + +func TestValidateGatewayNodeDispatchRejectsEmptyAllowNodeKey(t *testing.T) { + t.Parallel() + + cfg := DefaultConfig() + cfg.Gateway.Nodes.Dispatch.AllowActions = map[string][]string{ + "": {"screen_snapshot"}, + } + + if errs := Validate(cfg); len(errs) == 0 { + t.Fatalf("expected validation errors") + } +} + +func TestDefaultConfigSetsNodeArtifactRetentionDefaults(t *testing.T) { + t.Parallel() + + cfg := DefaultConfig() + if cfg.Gateway.Nodes.Artifacts.Enabled { + t.Fatalf("expected node artifact retention disabled by default") + } + if cfg.Gateway.Nodes.Artifacts.KeepLatest != 500 { + t.Fatalf("unexpected default keep_latest: %d", cfg.Gateway.Nodes.Artifacts.KeepLatest) + } + if cfg.Gateway.Nodes.Artifacts.RetainDays != 7 { + t.Fatalf("unexpected default retain_days: %d", cfg.Gateway.Nodes.Artifacts.RetainDays) + } + if !cfg.Gateway.Nodes.Artifacts.PruneOnRead { + t.Fatalf("expected prune_on_read enabled by default") + } +} + +func TestValidateNodeArtifactRetentionRequiresPositiveKeepLatestWhenEnabled(t *testing.T) { + t.Parallel() + + cfg := DefaultConfig() + cfg.Gateway.Nodes.Artifacts.Enabled = true + cfg.Gateway.Nodes.Artifacts.KeepLatest = 0 + + if errs := Validate(cfg); len(errs) == 0 { + t.Fatalf("expected validation errors") + } +} + +func TestValidateNodeArtifactRetentionRejectsNegativeRetainDays(t *testing.T) { + t.Parallel() + + cfg := DefaultConfig() + cfg.Gateway.Nodes.Artifacts.RetainDays = -1 + + if errs := Validate(cfg); len(errs) == 0 { + t.Fatalf("expected validation errors") + } +} diff --git a/pkg/nodes/manager.go b/pkg/nodes/manager.go index 64ae250..11ff990 100644 --- a/pkg/nodes/manager.go +++ b/pkg/nodes/manager.go @@ -30,6 +30,19 @@ type Manager struct { ttl time.Duration auditPath string statePath string + policy DispatchPolicy +} + +type DispatchPolicy struct { + PreferLocal bool + PreferP2P bool + AllowRelayFallback bool + ActionTags map[string][]string + AgentTags map[string][]string + AllowActions map[string][]string + DenyActions map[string][]string + AllowAgents map[string][]string + DenyAgents map[string][]string } var defaultManager = NewManager() @@ -43,6 +56,16 @@ func NewManager() *Manager { senders: map[string]WireSender{}, pending: map[string]chan WireMessage{}, ttl: defaultNodeTTL, + policy: DispatchPolicy{ + PreferP2P: true, + AllowRelayFallback: true, + ActionTags: map[string][]string{}, + AgentTags: map[string][]string{}, + AllowActions: map[string][]string{}, + DenyActions: map[string][]string{}, + AllowAgents: map[string][]string{}, + DenyAgents: map[string][]string{}, + }, } go m.reaperLoop() return m @@ -61,6 +84,18 @@ func (m *Manager) SetStatePath(path string) { m.loadState() } +func (m *Manager) SetDispatchPolicy(policy DispatchPolicy) { + m.mu.Lock() + defer m.mu.Unlock() + m.policy = normalizeDispatchPolicy(policy) +} + +func (m *Manager) DispatchPolicy() DispatchPolicy { + m.mu.RLock() + defer m.mu.RUnlock() + return cloneDispatchPolicy(m.policy) +} + func (m *Manager) Upsert(info NodeInfo) { m.mu.Lock() now := time.Now().UTC() @@ -71,6 +106,9 @@ func (m *Manager) Upsert(info NodeInfo) { if info.RegisteredAt.IsZero() { info.RegisteredAt = old.RegisteredAt } + if len(info.Tags) == 0 && len(old.Tags) > 0 { + info.Tags = append([]string(nil), old.Tags...) + } if strings.TrimSpace(info.Endpoint) == "" { info.Endpoint = old.Endpoint } @@ -303,8 +341,9 @@ func (m *Manager) PickRequest(req Request, mode string) (NodeInfo, bool) { defer m.mu.RUnlock() bestScore := -1 bestNode := NodeInfo{} + policy := normalizeDispatchPolicy(m.policy) for _, n := range m.nodes { - score, ok := scoreNodeCandidate(n, req, mode, m.senders[strings.TrimSpace(n.ID)] != nil) + score, ok := scoreNodeCandidate(n, req, mode, m.senders[strings.TrimSpace(n.ID)] != nil, policy) if !ok { continue } @@ -319,23 +358,39 @@ func (m *Manager) PickRequest(req Request, mode string) (NodeInfo, bool) { return bestNode, true } -func scoreNodeCandidate(n NodeInfo, req Request, mode string, hasWireSender bool) (int, bool) { +func scoreNodeCandidate(n NodeInfo, req Request, mode string, hasWireSender bool, policy DispatchPolicy) (int, bool) { if !n.Online { return 0, false } if !nodeSupportsRequest(n, req) { return 0, false } + if !matchesDispatchPolicy(n, req, policy) { + return 0, false + } mode = strings.ToLower(strings.TrimSpace(mode)) if mode == "p2p" && !hasWireSender { return 0, false } + if !policy.AllowRelayFallback && strings.TrimSpace(n.ID) != "local" && !hasWireSender { + return 0, false + } score := 100 if hasWireSender { score += 30 } + if policy.PreferP2P { + if hasWireSender { + score += 35 + } else { + score -= 10 + } + } + if policy.PreferLocal && strings.EqualFold(strings.TrimSpace(n.ID), "local") { + score += 60 + } if prefersRealtimeTransport(req.Action) && hasWireSender { score += 40 } @@ -370,6 +425,151 @@ func scoreNodeCandidate(n NodeInfo, req Request, mode string, hasWireSender bool return score, true } +func normalizeDispatchPolicy(policy DispatchPolicy) DispatchPolicy { + normalized := DispatchPolicy{ + PreferLocal: policy.PreferLocal, + PreferP2P: policy.PreferP2P, + AllowRelayFallback: policy.AllowRelayFallback, + ActionTags: map[string][]string{}, + AgentTags: map[string][]string{}, + AllowActions: map[string][]string{}, + DenyActions: map[string][]string{}, + AllowAgents: map[string][]string{}, + DenyAgents: map[string][]string{}, + } + for key, tags := range policy.ActionTags { + trimmed := normalizeStringList(tags) + if len(trimmed) > 0 { + normalized.ActionTags[strings.ToLower(strings.TrimSpace(key))] = trimmed + } + } + for key, tags := range policy.AgentTags { + trimmed := normalizeStringList(tags) + if len(trimmed) > 0 { + normalized.AgentTags[strings.ToLower(strings.TrimSpace(key))] = trimmed + } + } + for key, tags := range policy.AllowActions { + trimmed := normalizeStringList(tags) + if len(trimmed) > 0 { + normalized.AllowActions[strings.ToLower(strings.TrimSpace(key))] = trimmed + } + } + for key, tags := range policy.DenyActions { + trimmed := normalizeStringList(tags) + if len(trimmed) > 0 { + normalized.DenyActions[strings.ToLower(strings.TrimSpace(key))] = trimmed + } + } + for key, tags := range policy.AllowAgents { + trimmed := normalizeStringList(tags) + if len(trimmed) > 0 { + normalized.AllowAgents[strings.ToLower(strings.TrimSpace(key))] = trimmed + } + } + for key, tags := range policy.DenyAgents { + trimmed := normalizeStringList(tags) + if len(trimmed) > 0 { + normalized.DenyAgents[strings.ToLower(strings.TrimSpace(key))] = trimmed + } + } + return normalized +} + +func cloneDispatchPolicy(policy DispatchPolicy) DispatchPolicy { + return normalizeDispatchPolicy(policy) +} + +func normalizeStringList(values []string) []string { + if len(values) == 0 { + return nil + } + seen := map[string]struct{}{} + out := make([]string, 0, len(values)) + for _, raw := range values { + trimmed := strings.ToLower(strings.TrimSpace(raw)) + if trimmed == "" { + continue + } + if _, ok := seen[trimmed]; ok { + continue + } + seen[trimmed] = struct{}{} + out = append(out, trimmed) + } + return out +} + +func matchesDispatchPolicy(n NodeInfo, req Request, policy DispatchPolicy) bool { + if !isNodePermittedByPolicy(n, req, policy) { + return false + } + if tags := policy.ActionTags[strings.ToLower(strings.TrimSpace(req.Action))]; len(tags) > 0 && !nodeMatchesAnyTag(n, tags) { + return false + } + remoteAgentID := requestedRemoteAgentID(req.Args) + if remoteAgentID != "" { + if tags := policy.AgentTags[remoteAgentID]; len(tags) > 0 && !nodeMatchesAnyTag(n, tags) { + return false + } + } + return true +} + +func isNodePermittedByPolicy(n NodeInfo, req Request, policy DispatchPolicy) bool { + nodeID := strings.ToLower(strings.TrimSpace(n.ID)) + action := strings.ToLower(strings.TrimSpace(req.Action)) + remoteAgentID := requestedRemoteAgentID(req.Args) + if remoteAgentID == "" && action == "agent_task" { + remoteAgentID = "main" + } + if deny := policy.DenyActions[nodeID]; len(deny) > 0 && containsNormalized(deny, action) { + return false + } + if allow := policy.AllowActions[nodeID]; len(allow) > 0 && !containsNormalized(allow, action) { + return false + } + if remoteAgentID != "" { + if deny := policy.DenyAgents[nodeID]; len(deny) > 0 && containsNormalized(deny, remoteAgentID) { + return false + } + if allow := policy.AllowAgents[nodeID]; len(allow) > 0 && !containsNormalized(allow, remoteAgentID) { + return false + } + } + return true +} + +func containsNormalized(items []string, target string) bool { + target = strings.ToLower(strings.TrimSpace(target)) + for _, item := range items { + if strings.ToLower(strings.TrimSpace(item)) == target { + return true + } + } + return false +} + +func nodeMatchesAnyTag(n NodeInfo, tags []string) bool { + if len(tags) == 0 { + return true + } + nodeTags := normalizeStringList(n.Tags) + if len(nodeTags) == 0 { + return false + } + seen := map[string]struct{}{} + for _, tag := range nodeTags { + seen[tag] = struct{}{} + } + for _, tag := range tags { + if _, ok := seen[strings.ToLower(strings.TrimSpace(tag))]; ok { + return true + } + } + return false +} + func requestedRemoteAgentID(args map[string]interface{}) string { if len(args) == 0 { return "" diff --git a/pkg/nodes/manager_test.go b/pkg/nodes/manager_test.go index 771ad75..13ce35c 100644 --- a/pkg/nodes/manager_test.go +++ b/pkg/nodes/manager_test.go @@ -74,3 +74,139 @@ func TestPickRequestPrefersRealtimeCapableNodeForScreenActions(t *testing.T) { t.Fatalf("expected p2p-ready, got %+v", picked) } } + +func TestPickRequestHonorsActionTagsPolicy(t *testing.T) { + t.Parallel() + + manager := NewManager() + manager.SetDispatchPolicy(DispatchPolicy{ + PreferP2P: true, + AllowRelayFallback: true, + ActionTags: map[string][]string{ + "screen_snapshot": {"vision"}, + }, + }) + now := time.Now().UTC() + manager.Upsert(NodeInfo{ + ID: "build-node", + Tags: []string{"build"}, + Online: true, + LastSeenAt: now, + Capabilities: Capabilities{ + Screen: true, + }, + Actions: []string{"screen_snapshot"}, + }) + manager.Upsert(NodeInfo{ + ID: "vision-node", + Tags: []string{"vision"}, + Online: true, + LastSeenAt: now, + Capabilities: Capabilities{ + Screen: true, + }, + Actions: []string{"screen_snapshot"}, + }) + + picked, ok := manager.PickRequest(Request{Action: "screen_snapshot"}, "auto") + if !ok { + t.Fatalf("expected node pick") + } + if picked.ID != "vision-node" { + t.Fatalf("expected vision-node, got %+v", picked) + } +} + +func TestPickRequestHonorsPreferLocalPolicy(t *testing.T) { + t.Parallel() + + manager := NewManager() + manager.SetDispatchPolicy(DispatchPolicy{ + PreferLocal: true, + PreferP2P: false, + AllowRelayFallback: true, + }) + now := time.Now().UTC() + manager.Upsert(NodeInfo{ + ID: "local", + Online: true, + LastSeenAt: now.Add(-1 * time.Minute), + Capabilities: Capabilities{ + Run: true, + }, + Actions: []string{"run"}, + }) + manager.Upsert(NodeInfo{ + ID: "remote", + Online: true, + LastSeenAt: now, + Capabilities: Capabilities{ + Run: true, + }, + Actions: []string{"run"}, + }) + + picked, ok := manager.PickRequest(Request{Action: "run"}, "auto") + if !ok { + t.Fatalf("expected node pick") + } + if picked.ID != "local" { + t.Fatalf("expected local, got %+v", picked) + } +} + +func TestPickRequestHonorsNodeAllowActionsPolicy(t *testing.T) { + t.Parallel() + + manager := NewManager() + manager.SetDispatchPolicy(DispatchPolicy{ + AllowRelayFallback: true, + AllowActions: map[string][]string{ + "camera-node": {"camera_snap"}, + }, + }) + now := time.Now().UTC() + manager.Upsert(NodeInfo{ + ID: "camera-node", + Online: true, + LastSeenAt: now, + Capabilities: Capabilities{ + Camera: true, + Screen: true, + }, + Actions: []string{"camera_snap", "screen_snapshot"}, + }) + + if _, ok := manager.PickRequest(Request{Action: "screen_snapshot"}, "auto"); ok { + t.Fatalf("expected screen_snapshot to be blocked by allow_actions") + } + if _, ok := manager.PickRequest(Request{Action: "camera_snap"}, "auto"); !ok { + t.Fatalf("expected camera_snap to remain allowed") + } +} + +func TestPickRequestHonorsNodeDenyAgentsPolicy(t *testing.T) { + t.Parallel() + + manager := NewManager() + manager.SetDispatchPolicy(DispatchPolicy{ + AllowRelayFallback: true, + DenyAgents: map[string][]string{ + "edge-a": {"coder"}, + }, + }) + now := time.Now().UTC() + manager.Upsert(NodeInfo{ + ID: "edge-a", + Online: true, + LastSeenAt: now, + Capabilities: Capabilities{ + Model: true, + }, + Agents: []AgentInfo{{ID: "main"}, {ID: "coder"}}, + }) + + if _, ok := manager.PickRequest(Request{Action: "agent_task", Args: map[string]interface{}{"remote_agent_id": "coder"}}, "auto"); ok { + t.Fatalf("expected coder agent_task to be denied by policy") + } +} diff --git a/pkg/nodes/transport.go b/pkg/nodes/transport.go index daa4bea..3cd6b60 100644 --- a/pkg/nodes/transport.go +++ b/pkg/nodes/transport.go @@ -20,8 +20,9 @@ type Transport interface { // Router prefers p2p transport and falls back to relay. type Router struct { - P2P Transport - Relay Transport + P2P Transport + Relay Transport + Policy DispatchPolicy } func (r *Router) Dispatch(ctx context.Context, req Request, mode string) (Response, error) { @@ -43,14 +44,25 @@ func (r *Router) Dispatch(ctx context.Context, req Request, mode string) (Respon resp, err := r.Relay.Send(ctx, req) return annotateTransport(resp, "relay", r.Relay.Name(), ""), err default: // auto - if r.P2P != nil { + preferP2P := r.Policy.PreferP2P || r.Relay == nil + if preferP2P && r.P2P != nil { if resp, err := r.P2P.Send(ctx, req); err == nil && resp.OK { return annotateTransport(resp, "auto", r.P2P.Name(), ""), nil + } else if !r.Policy.AllowRelayFallback { + return annotateTransport(resp, "auto", r.P2P.Name(), ""), err } } if r.Relay != nil { resp, err := r.Relay.Send(ctx, req) - return annotateTransport(resp, "auto", r.Relay.Name(), "p2p"), err + fallback := "" + if preferP2P && r.P2P != nil { + fallback = "p2p" + } + return annotateTransport(resp, "auto", r.Relay.Name(), fallback), err + } + if !preferP2P && r.P2P != nil { + resp, err := r.P2P.Send(ctx, req) + return annotateTransport(resp, "auto", r.P2P.Name(), "relay"), err } return Response{}, fmt.Errorf("no transport available") } diff --git a/pkg/nodes/types.go b/pkg/nodes/types.go index e38ce8a..1448bbc 100644 --- a/pkg/nodes/types.go +++ b/pkg/nodes/types.go @@ -42,6 +42,7 @@ type Artifact struct { type NodeInfo struct { ID string `json:"id"` Name string `json:"name,omitempty"` + Tags []string `json:"tags,omitempty"` OS string `json:"os,omitempty"` Arch string `json:"arch,omitempty"` Version string `json:"version,omitempty"` diff --git a/pkg/tools/nodes_tool.go b/pkg/tools/nodes_tool.go index b1504ba..fcc4900 100644 --- a/pkg/tools/nodes_tool.go +++ b/pkg/tools/nodes_tool.go @@ -150,6 +150,9 @@ func (t *NodesTool) writeAudit(req nodes.Request, resp nodes.Response, mode stri "error": resp.Error, "duration_ms": durationMs, } + if len(req.Args) > 0 { + row["request_args"] = req.Args + } if used, _ := resp.Payload["used_transport"].(string); strings.TrimSpace(used) != "" { row["used_transport"] = strings.TrimSpace(used) } diff --git a/webui/src/App.tsx b/webui/src/App.tsx index fb32185..8df044e 100644 --- a/webui/src/App.tsx +++ b/webui/src/App.tsx @@ -13,6 +13,7 @@ const Skills = lazy(() => import('./pages/Skills')); const MCP = lazy(() => import('./pages/MCP')); const Memory = lazy(() => import('./pages/Memory')); const Nodes = lazy(() => import('./pages/Nodes')); +const NodeArtifacts = lazy(() => import('./pages/NodeArtifacts')); const TaskAudit = lazy(() => import('./pages/TaskAudit')); const EKG = lazy(() => import('./pages/EKG')); const LogCodes = lazy(() => import('./pages/LogCodes')); @@ -43,6 +44,7 @@ export default function App() { } /> } /> } /> + } /> } /> } /> } /> diff --git a/webui/src/components/Sidebar.tsx b/webui/src/components/Sidebar.tsx index fd8d97c..fa2e64c 100644 --- a/webui/src/components/Sidebar.tsx +++ b/webui/src/components/Sidebar.tsx @@ -21,6 +21,7 @@ const Sidebar: React.FC = () => { title: t('sidebarRuntime'), items: [ { icon: , label: t('nodes'), to: '/nodes' }, + { icon: , label: t('nodeArtifacts'), to: '/node-artifacts' }, { icon: , label: t('taskAudit'), to: '/task-audit' }, { icon: , label: t('logs'), to: '/logs' }, { icon: , label: t('ekg'), to: '/ekg' }, diff --git a/webui/src/context/AppContext.tsx b/webui/src/context/AppContext.tsx index 30e5753..493b2de 100644 --- a/webui/src/context/AppContext.tsx +++ b/webui/src/context/AppContext.tsx @@ -11,6 +11,8 @@ type RuntimeSnapshot = { trees?: any[]; p2p?: Record; dispatches?: any[]; + alerts?: any[]; + artifact_retention?: Record; }; sessions?: { sessions?: Array<{ key: string; title?: string; channel?: string }>; @@ -49,6 +51,10 @@ interface AppContextType { setNodeP2P: React.Dispatch>>; nodeDispatchItems: any[]; setNodeDispatchItems: React.Dispatch>; + nodeAlerts: any[]; + setNodeAlerts: React.Dispatch>; + nodeArtifactRetention: Record; + setNodeArtifactRetention: React.Dispatch>>; cron: CronJob[]; setCron: (cron: CronJob[]) => void; skills: Skill[]; @@ -111,6 +117,8 @@ export const AppProvider: React.FC<{ children: React.ReactNode }> = ({ children const [nodeTrees, setNodeTrees] = useState('[]'); const [nodeP2P, setNodeP2P] = useState>({}); const [nodeDispatchItems, setNodeDispatchItems] = useState([]); + const [nodeAlerts, setNodeAlerts] = useState([]); + const [nodeArtifactRetention, setNodeArtifactRetention] = useState>({}); const [cron, setCron] = useState([]); const [skills, setSkills] = useState([]); const [clawhubInstalled, setClawhubInstalled] = useState(false); @@ -171,6 +179,8 @@ export const AppProvider: React.FC<{ children: React.ReactNode }> = ({ children setNodeTrees(JSON.stringify(j.trees || [], null, 2)); setNodeP2P(j.p2p || {}); setNodeDispatchItems(Array.isArray(j.dispatches) ? j.dispatches : []); + setNodeAlerts(Array.isArray(j.alerts) ? j.alerts : []); + setNodeArtifactRetention(j.artifact_retention || {}); setIsGatewayOnline(true); } catch (e) { setIsGatewayOnline(false); @@ -277,6 +287,8 @@ export const AppProvider: React.FC<{ children: React.ReactNode }> = ({ children setNodeTrees(JSON.stringify(Array.isArray(snapshot.nodes.trees) ? snapshot.nodes.trees : [], null, 2)); setNodeP2P(snapshot.nodes.p2p || {}); setNodeDispatchItems(Array.isArray(snapshot.nodes.dispatches) ? snapshot.nodes.dispatches : []); + setNodeAlerts(Array.isArray(snapshot.nodes.alerts) ? snapshot.nodes.alerts : []); + setNodeArtifactRetention(snapshot.nodes.artifact_retention || {}); } if (snapshot.sessions) { const arr = Array.isArray(snapshot.sessions.sessions) ? snapshot.sessions.sessions : []; @@ -355,7 +367,7 @@ export const AppProvider: React.FC<{ children: React.ReactNode }> = ({ children return ( = {}; + for (const line of String(raw || '').split('\n')) { + const trimmed = line.trim(); + if (!trimmed) continue; + const idx = trimmed.indexOf('='); + if (idx <= 0) continue; + const key = trimmed.slice(0, idx).trim(); + const tags = trimmed.slice(idx + 1).split(',').map((item) => item.trim()).filter(Boolean); + if (!key || tags.length === 0) continue; + out[key] = tags; + } + return out; +} + +function formatTagRuleText(value: unknown) { + if (!value || typeof value !== 'object' || Array.isArray(value)) return ''; + return Object.entries(value as Record) + .map(([key, tags]) => `${key}=${Array.isArray(tags) ? tags.join(',') : ''}`) + .filter((line) => line !== '=') + .join('\n'); +} + const Config: React.FC = () => { const { t } = useTranslation(); const ui = useUI(); @@ -111,6 +134,14 @@ const Config: React.FC = () => { setCfg((v) => setPath(v, `gateway.nodes.p2p.${field}`, value)); } + function updateGatewayDispatchField(field: string, value: any) { + setCfg((v) => setPath(v, `gateway.nodes.dispatch.${field}`, value)); + } + + function updateGatewayArtifactsField(field: string, value: any) { + setCfg((v) => setPath(v, `gateway.nodes.artifacts.${field}`, value)); + } + function updateGatewayIceServer(index: number, field: string, value: any) { setCfg((v) => { const next = JSON.parse(JSON.stringify(v || {})); @@ -402,6 +433,144 @@ const Config: React.FC = () => {
{t('configNodeP2PIceServersEmpty')}
)} +
+
+
{t('configNodeDispatch')}
+
{t('configNodeDispatchHint')}
+
+
+ + + +
+
+