From 29729d7c70193814c4d3e5b1152d49119a26e1c7 Mon Sep 17 00:00:00 2001 From: lpf Date: Sun, 8 Mar 2026 22:53:03 +0800 Subject: [PATCH] feat: surface node p2p runtime visibility --- cmd/clawgo/cmd_gateway.go | 14 ++++++++++ cmd/clawgo/cmd_status.go | 44 ++++++++++++++++++++++++++------ pkg/api/server.go | 15 ++++++++++- pkg/api/server_test.go | 28 ++++++++++++++++++++ pkg/nodes/transport.go | 27 +++++++++++++++++--- pkg/nodes/webrtc.go | 23 +++++++++++++++++ pkg/tools/nodes_tool.go | 24 ++++++++++------- webui/src/context/AppContext.tsx | 8 +++++- webui/src/i18n/index.ts | 2 ++ webui/src/pages/Dashboard.tsx | 7 ++++- 10 files changed, 168 insertions(+), 24 deletions(-) diff --git a/cmd/clawgo/cmd_gateway.go b/cmd/clawgo/cmd_gateway.go index 9a9f47c..12bac7b 100644 --- a/cmd/clawgo/cmd_gateway.go +++ b/cmd/clawgo/cmd_gateway.go @@ -133,11 +133,25 @@ func gatewayCmd() { if loop == nil || server == nil || runtimeCfg == nil { return } + server.SetNodeP2PStatusHandler(func() map[string]interface{} { + return map[string]interface{}{ + "enabled": runtimeCfg.Gateway.Nodes.P2P.Enabled, + "transport": strings.TrimSpace(runtimeCfg.Gateway.Nodes.P2P.Transport), + "configured_stun": append([]string(nil), runtimeCfg.Gateway.Nodes.P2P.STUNServers...), + } + }) switch { case runtimeCfg.Gateway.Nodes.P2P.Enabled && strings.EqualFold(strings.TrimSpace(runtimeCfg.Gateway.Nodes.P2P.Transport), "webrtc"): webrtcTransport := nodes.NewWebRTCTransport(runtimeCfg.Gateway.Nodes.P2P.STUNServers) loop.SetNodeP2PTransport(webrtcTransport) server.SetNodeWebRTCTransport(webrtcTransport) + server.SetNodeP2PStatusHandler(func() map[string]interface{} { + snapshot := webrtcTransport.Snapshot() + snapshot["enabled"] = true + snapshot["transport"] = "webrtc" + snapshot["configured_stun"] = append([]string(nil), runtimeCfg.Gateway.Nodes.P2P.STUNServers...) + return snapshot + }) default: server.SetNodeWebRTCTransport(nil) } diff --git a/cmd/clawgo/cmd_status.go b/cmd/clawgo/cmd_status.go index 53df9b0..5cfd9ba 100644 --- a/cmd/clawgo/cmd_status.go +++ b/cmd/clawgo/cmd_status.go @@ -163,11 +163,18 @@ func statusCmd() { } fmt.Printf("Nodes: total=%d online=%d\n", len(ns), online) fmt.Printf("Nodes Capabilities: run=%d model=%d camera=%d screen=%d location=%d canvas=%d\n", caps["run"], caps["model"], caps["camera"], caps["screen"], caps["location"], caps["canvas"]) - if total, okCnt, avgMs, actionTop, err := collectNodeDispatchStats(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl")); err == nil && total > 0 { + fmt.Printf("Nodes P2P: enabled=%t transport=%s\n", cfg.Gateway.Nodes.P2P.Enabled, strings.TrimSpace(cfg.Gateway.Nodes.P2P.Transport)) + if total, okCnt, avgMs, actionTop, transportTop, fallbackCnt, err := collectNodeDispatchStats(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl")); err == nil && total > 0 { fmt.Printf("Nodes Dispatch: total=%d ok=%d fail=%d avg_ms=%d\n", total, okCnt, total-okCnt, avgMs) if actionTop != "" { fmt.Printf("Nodes Dispatch Top Action: %s\n", actionTop) } + if transportTop != "" { + fmt.Printf("Nodes Dispatch Top Transport: %s\n", transportTop) + } + if fallbackCnt > 0 { + fmt.Printf("Nodes Dispatch Fallbacks: %d\n", fallbackCnt) + } } } } @@ -261,23 +268,26 @@ func collectTriggerErrorCounts(path string) (map[string]int, error) { return counts, nil } -func collectNodeDispatchStats(path string) (int, int, int, string, error) { +func collectNodeDispatchStats(path string) (int, int, int, string, string, int, error) { data, err := os.ReadFile(path) if err != nil { - return 0, 0, 0, "", err + return 0, 0, 0, "", "", 0, err } lines := strings.Split(strings.TrimSpace(string(data)), "\n") - total, okCnt, msSum := 0, 0, 0 + total, okCnt, msSum, fallbackCnt := 0, 0, 0, 0 actionCnt := map[string]int{} + transportCnt := map[string]int{} for _, line := range lines { line = strings.TrimSpace(line) if line == "" { continue } var row struct { - Action string `json:"action"` - OK bool `json:"ok"` - DurationMS int `json:"duration_ms"` + Action string `json:"action"` + UsedTransport string `json:"used_transport"` + FallbackFrom string `json:"fallback_from"` + OK bool `json:"ok"` + DurationMS int `json:"duration_ms"` } if err := json.Unmarshal([]byte(line), &row); err != nil { continue @@ -294,6 +304,13 @@ func collectNodeDispatchStats(path string) (int, int, int, string, error) { a = "unknown" } actionCnt[a]++ + used := strings.TrimSpace(strings.ToLower(row.UsedTransport)) + if used != "" { + transportCnt[used]++ + } + if strings.TrimSpace(row.FallbackFrom) != "" { + fallbackCnt++ + } } avg := 0 if total > 0 { @@ -310,7 +327,18 @@ func collectNodeDispatchStats(path string) (int, int, int, string, error) { if topAction != "" { topAction = fmt.Sprintf("%s(%d)", topAction, topN) } - return total, okCnt, avg, topAction, nil + topTransport := "" + topTN := 0 + for k, v := range transportCnt { + if v > topTN { + topTN = v + topTransport = k + } + } + if topTransport != "" { + topTransport = fmt.Sprintf("%s(%d)", topTransport, topTN) + } + return total, okCnt, avg, topAction, topTransport, fallbackCnt, nil } func collectSkillExecStats(path string) (int, int, int, float64, string, error) { diff --git a/pkg/api/server.go b/pkg/api/server.go index 28a2451..b5112a8 100644 --- a/pkg/api/server.go +++ b/pkg/api/server.go @@ -41,6 +41,7 @@ type Server struct { nodeConnIDs map[string]string nodeSockets map[string]*nodeSocketConn nodeWebRTC *nodes.WebRTCTransport + nodeP2PStatus func() map[string]interface{} gatewayVersion string webuiVersion string configPath string @@ -120,6 +121,9 @@ func (s *Server) SetWebUIVersion(v string) { s.webuiVersion func (s *Server) SetNodeWebRTCTransport(t *nodes.WebRTCTransport) { s.nodeWebRTC = t } +func (s *Server) SetNodeP2PStatusHandler(fn func() map[string]interface{}) { + s.nodeP2PStatus = fn +} func (s *Server) rememberNodeConnection(nodeID, connID string) { nodeID = strings.TrimSpace(nodeID) @@ -1066,9 +1070,14 @@ func (s *Server) webUINodesPayload(ctx context.Context) map[string]interface{} { if !matched { list = append([]nodes.NodeInfo{local}, list...) } + p2p := map[string]interface{}{} + if s.nodeP2PStatus != nil { + p2p = s.nodeP2PStatus() + } return map[string]interface{}{ "nodes": list, "trees": s.buildNodeAgentTrees(ctx, list), + "p2p": p2p, } } @@ -1548,7 +1557,11 @@ func (s *Server) handleWebUINodes(w http.ResponseWriter, r *http.Request) { list = append([]nodes.NodeInfo{local}, list...) } trees := s.buildNodeAgentTrees(r.Context(), list) - _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "nodes": list, "trees": trees}) + p2p := map[string]interface{}{} + if s.nodeP2PStatus != nil { + p2p = s.nodeP2PStatus() + } + _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "nodes": list, "trees": trees, "p2p": p2p}) case http.MethodPost: var body struct { Action string `json:"action"` diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go index d106878..11314ce 100644 --- a/pkg/api/server_test.go +++ b/pkg/api/server_test.go @@ -428,3 +428,31 @@ func TestHandleWebUILogsLive(t *testing.T) { t.Fatalf("expected tail-ok entry, got: %+v", entry) } } + +func TestHandleWebUINodesIncludesP2PSummary(t *testing.T) { + t.Parallel() + + srv := NewServer("127.0.0.1", 0, "", nodes.NewManager()) + srv.SetNodeP2PStatusHandler(func() map[string]interface{} { + return map[string]interface{}{ + "enabled": true, + "transport": "webrtc", + "active_sessions": 2, + } + }) + + req := httptest.NewRequest(http.MethodGet, "/webui/api/nodes", nil) + rec := httptest.NewRecorder() + srv.handleWebUINodes(rec, req) + if rec.Code != http.StatusOK { + t.Fatalf("expected 200, got %d", rec.Code) + } + var body map[string]interface{} + if err := json.Unmarshal(rec.Body.Bytes(), &body); err != nil { + t.Fatalf("decode body: %v", err) + } + p2p, _ := body["p2p"].(map[string]interface{}) + if p2p == nil || p2p["transport"] != "webrtc" { + t.Fatalf("expected p2p summary, got %+v", body) + } +} diff --git a/pkg/nodes/transport.go b/pkg/nodes/transport.go index 6e0fd3d..d77503d 100644 --- a/pkg/nodes/transport.go +++ b/pkg/nodes/transport.go @@ -33,25 +33,44 @@ func (r *Router) Dispatch(ctx context.Context, req Request, mode string) (Respon if r.P2P == nil { return Response{OK: false, Node: req.Node, Action: req.Action, Error: "p2p transport unavailable"}, nil } - return r.P2P.Send(ctx, req) + resp, err := r.P2P.Send(ctx, req) + return annotateTransport(resp, "p2p", r.P2P.Name(), ""), err case "relay": if r.Relay == nil { return Response{OK: false, Node: req.Node, Action: req.Action, Error: "relay transport unavailable"}, nil } - return r.Relay.Send(ctx, req) + resp, err := r.Relay.Send(ctx, req) + return annotateTransport(resp, "relay", r.Relay.Name(), ""), err default: // auto if r.P2P != nil { if resp, err := r.P2P.Send(ctx, req); err == nil && resp.OK { - return resp, nil + return annotateTransport(resp, "auto", r.P2P.Name(), ""), nil } } if r.Relay != nil { - return r.Relay.Send(ctx, req) + resp, err := r.Relay.Send(ctx, req) + return annotateTransport(resp, "auto", r.Relay.Name(), "p2p"), err } return Response{}, fmt.Errorf("no transport available") } } +func annotateTransport(resp Response, mode, usedTransport, fallbackFrom string) Response { + if resp.Payload == nil { + resp.Payload = map[string]interface{}{} + } + if strings.TrimSpace(mode) != "" { + resp.Payload["dispatch_mode"] = strings.TrimSpace(mode) + } + if strings.TrimSpace(usedTransport) != "" { + resp.Payload["used_transport"] = strings.TrimSpace(usedTransport) + } + if strings.TrimSpace(fallbackFrom) != "" { + resp.Payload["fallback_from"] = strings.TrimSpace(fallbackFrom) + } + return resp +} + // WebsocketP2PTransport uses the persistent node websocket as a request/response tunnel // while the project evolves toward a true peer data channel. type WebsocketP2PTransport struct { diff --git a/pkg/nodes/webrtc.go b/pkg/nodes/webrtc.go index 548c4df..a02b855 100644 --- a/pkg/nodes/webrtc.go +++ b/pkg/nodes/webrtc.go @@ -71,6 +71,29 @@ func NewWebRTCTransport(stunServers []string) *WebRTCTransport { func (t *WebRTCTransport) Name() string { return "p2p-webrtc" } +func (t *WebRTCTransport) Snapshot() map[string]interface{} { + t.mu.Lock() + defer t.mu.Unlock() + nodes := make([]map[string]interface{}, 0, len(t.sessions)) + active := 0 + for nodeID, session := range t.sessions { + status := "connecting" + if session != nil && session.dc != nil && session.dc.ReadyState() == webrtc.DataChannelStateOpen { + status = "open" + active++ + } + nodes = append(nodes, map[string]interface{}{ + "node": nodeID, + "status": status, + }) + } + return map[string]interface{}{ + "transport": "webrtc", + "active_sessions": active, + "nodes": nodes, + } +} + func (t *WebRTCTransport) BindSignaler(nodeID string, sender WireSender) { nodeID = strings.TrimSpace(nodeID) if nodeID == "" { diff --git a/pkg/tools/nodes_tool.go b/pkg/tools/nodes_tool.go index f6e578f..2ba385e 100644 --- a/pkg/tools/nodes_tool.go +++ b/pkg/tools/nodes_tool.go @@ -22,20 +22,20 @@ type NodesTool struct { func NewNodesTool(m *nodes.Manager, r *nodes.Router, auditPath string) *NodesTool { return &NodesTool{manager: m, router: r, auditPath: strings.TrimSpace(auditPath)} } -func (t *NodesTool) Name() string { return "nodes" } +func (t *NodesTool) Name() string { return "nodes" } func (t *NodesTool) Description() string { return "Manage paired nodes (status/describe/run/invoke/camera/screen/location/canvas)." } func (t *NodesTool) Parameters() map[string]interface{} { return map[string]interface{}{"type": "object", "properties": map[string]interface{}{ - "action": map[string]interface{}{"type": "string", "description": "status|describe|run|invoke|agent_task|camera_snap|camera_clip|screen_record|screen_snapshot|location_get|canvas_snapshot|canvas_action"}, - "node": map[string]interface{}{"type": "string", "description": "target node id"}, - "mode": map[string]interface{}{"type": "string", "description": "auto|p2p|relay (default auto)"}, - "args": map[string]interface{}{"type": "object", "description": "action args"}, - "task": map[string]interface{}{"type": "string", "description": "agent_task content for child node model"}, - "model": map[string]interface{}{"type": "string", "description": "optional model for agent_task"}, - "command": map[string]interface{}{"type": "array", "items": map[string]interface{}{"type": "string"}, "description": "run command array shortcut"}, - "facing": map[string]interface{}{"type": "string", "description": "camera facing: front|back|both"}, + "action": map[string]interface{}{"type": "string", "description": "status|describe|run|invoke|agent_task|camera_snap|camera_clip|screen_record|screen_snapshot|location_get|canvas_snapshot|canvas_action"}, + "node": map[string]interface{}{"type": "string", "description": "target node id"}, + "mode": map[string]interface{}{"type": "string", "description": "auto|p2p|relay (default auto)"}, + "args": map[string]interface{}{"type": "object", "description": "action args"}, + "task": map[string]interface{}{"type": "string", "description": "agent_task content for child node model"}, + "model": map[string]interface{}{"type": "string", "description": "optional model for agent_task"}, + "command": map[string]interface{}{"type": "array", "items": map[string]interface{}{"type": "string"}, "description": "run command array shortcut"}, + "facing": map[string]interface{}{"type": "string", "description": "camera facing: front|back|both"}, "duration_ms": map[string]interface{}{"type": "integer", "description": "clip/record duration"}, }, "required": []string{"action"}} } @@ -144,6 +144,12 @@ func (t *NodesTool) writeAudit(req nodes.Request, resp nodes.Response, mode stri "error": resp.Error, "duration_ms": durationMs, } + if used, _ := resp.Payload["used_transport"].(string); strings.TrimSpace(used) != "" { + row["used_transport"] = strings.TrimSpace(used) + } + if fallback, _ := resp.Payload["fallback_from"].(string); strings.TrimSpace(fallback) != "" { + row["fallback_from"] = strings.TrimSpace(fallback) + } b, _ := json.Marshal(row) f, err := os.OpenFile(t.auditPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) if err != nil { diff --git a/webui/src/context/AppContext.tsx b/webui/src/context/AppContext.tsx index 645a695..777d484 100644 --- a/webui/src/context/AppContext.tsx +++ b/webui/src/context/AppContext.tsx @@ -9,6 +9,7 @@ type RuntimeSnapshot = { nodes?: { nodes?: any[]; trees?: any[]; + p2p?: Record; }; sessions?: { sessions?: Array<{ key: string; title?: string; channel?: string }>; @@ -43,6 +44,8 @@ interface AppContextType { setNodes: (nodes: string) => void; nodeTrees: string; setNodeTrees: (trees: string) => void; + nodeP2P: Record; + setNodeP2P: React.Dispatch>>; cron: CronJob[]; setCron: (cron: CronJob[]) => void; skills: Skill[]; @@ -103,6 +106,7 @@ export const AppProvider: React.FC<{ children: React.ReactNode }> = ({ children const [configEditing, setConfigEditing] = useState(false); const [nodes, setNodes] = useState('[]'); const [nodeTrees, setNodeTrees] = useState('[]'); + const [nodeP2P, setNodeP2P] = useState>({}); const [cron, setCron] = useState([]); const [skills, setSkills] = useState([]); const [clawhubInstalled, setClawhubInstalled] = useState(false); @@ -161,6 +165,7 @@ export const AppProvider: React.FC<{ children: React.ReactNode }> = ({ children const j = await r.json(); setNodes(JSON.stringify(j.nodes || [], null, 2)); setNodeTrees(JSON.stringify(j.trees || [], null, 2)); + setNodeP2P(j.p2p || {}); setIsGatewayOnline(true); } catch (e) { setIsGatewayOnline(false); @@ -265,6 +270,7 @@ export const AppProvider: React.FC<{ children: React.ReactNode }> = ({ children if (snapshot.nodes) { setNodes(JSON.stringify(Array.isArray(snapshot.nodes.nodes) ? snapshot.nodes.nodes : [], null, 2)); setNodeTrees(JSON.stringify(Array.isArray(snapshot.nodes.trees) ? snapshot.nodes.trees : [], null, 2)); + setNodeP2P(snapshot.nodes.p2p || {}); } if (snapshot.sessions) { const arr = Array.isArray(snapshot.sessions.sessions) ? snapshot.sessions.sessions : []; @@ -343,7 +349,7 @@ export const AppProvider: React.FC<{ children: React.ReactNode }> = ({ children return ( { webuiVersion, skills, cfg, + nodeP2P, taskQueueItems, ekgSummary, } = useAppContext(); @@ -36,6 +37,9 @@ const Dashboard: React.FC = () => { const ekgEscalationCount = Number(ekgSummary?.escalation_count || 0); const ekgTopProvider = (Array.isArray(ekgSummary?.provider_top_workload) ? ekgSummary.provider_top_workload[0]?.key : '') || '-'; const ekgTopErrSig = (Array.isArray(ekgSummary?.errsig_top_workload) ? ekgSummary.errsig_top_workload[0]?.key : '') || '-'; + const p2pEnabled = Boolean(nodeP2P?.enabled); + const p2pTransport = String(nodeP2P?.transport || (p2pEnabled ? 'enabled' : 'disabled')); + const p2pSessions = Number(nodeP2P?.active_sessions || 0); return (
@@ -53,12 +57,13 @@ const Dashboard: React.FC = () => {
-
+
} /> } /> } /> } /> } /> + } />