From c0fe977bcebc8ab1331850ba8ce7f8f1c4445ed8 Mon Sep 17 00:00:00 2001 From: lpf Date: Mon, 9 Mar 2026 00:56:32 +0800 Subject: [PATCH] feat: document and surface node p2p telemetry --- README.md | 1 + README_EN.md | 1 + docs/node-p2p-e2e.md | 287 +++++++++++++++++++++++++++++++ pkg/api/server.go | 84 +++++---- pkg/api/server_test.go | 12 ++ webui/src/context/AppContext.tsx | 8 +- webui/src/i18n/index.ts | 28 +++ webui/src/pages/Dashboard.tsx | 151 ++++++++++++++++ webui/src/pages/Subagents.tsx | 49 +++++- 9 files changed, 571 insertions(+), 50 deletions(-) create mode 100644 docs/node-p2p-e2e.md diff --git a/README.md b/README.md index db3a785..45b035f 100644 --- a/README.md +++ b/README.md @@ -245,6 +245,7 @@ user -> main -> worker -> main -> user - `webrtc` 建连失败时,调度层仍会回退到现有 relay / tunnel 路径 - Dashboard、`status`、`/webui/api/nodes` 会显示当前 Node P2P 状态和会话摘要 +- 两台公网机器的实网验证流程见 [docs/node-p2p-e2e.md](docs/node-p2p-e2e.md) ## MCP 服务支持 diff --git a/README_EN.md b/README_EN.md index 4a07500..58a80e4 100644 --- a/README_EN.md +++ b/README_EN.md @@ -245,6 +245,7 @@ Notes: - when `webrtc` session setup fails, dispatch still falls back to the existing relay / tunnel path - Dashboard, `status`, and `/webui/api/nodes` expose the current Node P2P runtime summary +- a reusable public-network validation flow is documented in [docs/node-p2p-e2e.md](docs/node-p2p-e2e.md) ## MCP Server Support diff --git a/docs/node-p2p-e2e.md b/docs/node-p2p-e2e.md new file mode 100644 index 0000000..b59408a --- /dev/null +++ b/docs/node-p2p-e2e.md @@ -0,0 +1,287 @@ +# Node P2P E2E + +这份文档用于验证 `gateway.nodes.p2p` 的两条真实数据面: + +- `websocket_tunnel` +- `webrtc` + +目标不是单元测试,而是两台公网机器上的真实联通性验证。 + +## 验证目标 + +验证通过需要同时满足: + +1. 两台远端 node 都能成功注册到同一个 gateway +2. `websocket_tunnel` 模式下,远端 node 任务可成功完成 +3. `webrtc` 模式下,远端 node 任务可成功完成 +4. `webrtc` 模式下,`/webui/api/nodes` 的 `p2p.active_sessions` 大于 `0` +5. 
`Dashboard` / `Subagents` 能看到 node P2P 会话状态和最近调度路径 + +## 前置条件 + +- 一台 gateway 机器 +- 两台远端 node 机器 +- 三台机器都能运行 `clawgo` +- 远端机器有 `python3` +- gateway 机器对外开放 WebUI / node registry 端口 + +推荐: + +- 先验证 `websocket_tunnel` +- 再切到 `webrtc` +- `webrtc` 至少配置一个可用的 `stun_servers` + +## 测试思路 + +为了排除 HTTP relay 误判,建议让目标 node 的 `endpoint` 故意写成只对目标 node 本机有效的地址,例如: + +```text +http://127.0.0.1:<port> +``` + +这样如果任务仍能完成,就说明请求不是靠 gateway 直接 HTTP relay 打过去的,而是走了 node P2P 通道。 + +## 建议配置 + +### 1. websocket_tunnel + +```json +{ + "gateway": { + "host": "0.0.0.0", + "port": 18790, + "token": "YOUR_GATEWAY_TOKEN", + "nodes": { + "p2p": { + "enabled": true, + "transport": "websocket_tunnel", + "stun_servers": [], + "ice_servers": [] + } + } + } +} +``` + +### 2. webrtc + +```json +{ + "gateway": { + "host": "0.0.0.0", + "port": 18790, + "token": "YOUR_GATEWAY_TOKEN", + "nodes": { + "p2p": { + "enabled": true, + "transport": "webrtc", + "stun_servers": ["stun:stun.l.google.com:19302"], + "ice_servers": [] + } + } + } +} +``` + +## 最小 node endpoint + +在每台远端 node 上启动一个最小 HTTP 服务,用于返回固定结果: + +```python +#!/usr/bin/env python3 +import json +import os +import socket +from http.server import BaseHTTPRequestHandler, HTTPServer + +PORT = int(os.environ.get("PORT", "19081")) +LABEL = os.environ.get("NODE_LABEL", socket.gethostname()) + +class H(BaseHTTPRequestHandler): + def log_message(self, fmt, *args): + pass + + def do_POST(self): + length = int(self.headers.get("Content-Length", "0") or 0) + raw = self.rfile.read(length) if length else b"{}" + try: + req = json.loads(raw.decode("utf-8") or "{}") + except Exception: + req = {} + action = req.get("action") or self.path.strip("/") + payload = { + "handler": LABEL, + "hostname": socket.gethostname(), + "path": self.path, + "echo": req, + } + if action == "agent_task": + payload["result"] = f"agent_task from {LABEL}" + else: + payload["result"] = f"{action} from {LABEL}" + body = json.dumps({ + "ok": True, + "code": "ok", + "node": LABEL, + "action": 
action, + "payload": payload, + }).encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + +HTTPServer(("0.0.0.0", PORT), H).serve_forever() +``` + +## 注册远端 node + +在每台 node 上执行: + +```bash +clawgo node register \ + --gateway http://<gateway-host>:18790 \ + --token YOUR_GATEWAY_TOKEN \ + --id <node-id> \ + --name <node-name> \ + --endpoint http://127.0.0.1:<port> \ + --actions run,agent_task \ + --models gpt-4o-mini \ + --capabilities run,invoke,model \ + --watch \ + --heartbeat-sec 10 +``` + +验证注册成功: + +```bash +curl -s -H 'Authorization: Bearer YOUR_GATEWAY_TOKEN' \ + http://<gateway-host>:18790/webui/api/nodes +``` + +预期: + +- 远端 node 出现在 `nodes` +- `online = true` +- 主拓扑中出现 `node.<node-id>.main` + +## 建议的任务验证方式 + +不要通过普通聊天 prompt 让模型“自己决定是否调用 nodes 工具”作为主判据。 +更稳定的方式是直接调用 subagent runtime,把任务派给远端 node branch: + +```bash +curl -s \ + -H 'Authorization: Bearer YOUR_GATEWAY_TOKEN' \ + -H 'Content-Type: application/json' \ + http://<gateway-host>:18790/webui/api/subagents_runtime \ + -d '{ + "action": "dispatch_and_wait", + "agent_id": "node.<node-id>.main", + "task": "Return exactly the string NODE_P2P_OK", + "wait_timeout_sec": 30 + }' +``` + +预期: + +- `ok = true` +- `result.reply.status = completed` +- `result.reply.result` 含远端 endpoint 返回内容 + +## websocket_tunnel 判定 + +在 `websocket_tunnel` 模式下,上面的任务应能成功完成。 + +如果目标 node 的 `endpoint` 配成了 `127.0.0.1:<port>`,且任务仍成功,则说明: + +- 不是 gateway 直接 HTTP relay 到远端公网地址 +- 实际请求已经通过 node websocket 隧道送达目标 node + +## webrtc 判定 + +切到 `webrtc` 配置后,重复同样的 `dispatch_and_wait`。 + +随后查看: + +```bash +curl -s -H 'Authorization: Bearer YOUR_GATEWAY_TOKEN' \ + http://<gateway-host>:18790/webui/api/nodes +``` + +预期 `p2p` 段包含: + +- `transport = "webrtc"` +- `active_sessions > 0` +- `nodes[].status = "open"` +- `nodes[].last_ready_at` 非空 + +这表示 WebRTC DataChannel 已经真正建立,而不只是 signaling 被触发。 + +## WebUI 判定 + +验证页面: + +- `Dashboard` + - 能看到 Node P2P 会话明细 + - 能看到最近节点调度记录,包括 `used_transport` 和 `fallback_from` +- 
`Subagents` + - 远端 node branch 的卡片/tooltip 能显示: + - P2P transport + - session status + - retry count + - last ready + - last error + +## 常见问题 + +### 1. gateway 端口上已经有旧实例 + +现象: + +- 新配置明明改了,但 `/webui/api/version` 或 `/webui/api/nodes` 仍表现出旧行为 + +处理: + +- 先确认端口上实际监听的是哪一个 `clawgo` 进程 +- 再启动测试实例 + +### 2. chat 路由干扰 node 工具验证 + +现象: + +- 普通聊天请求被 router 或 skill 行为分流 +- 没有真正命中 `nodes` 数据面 + +处理: + +- 直接用 `/webui/api/subagents_runtime` 的 `dispatch_and_wait` +- 让任务明确走 `node..main` + +### 3. webrtc 一直停在 connecting + +优先检查: + +- `stun_servers` 是否可达 +- 两端机器是否允许 UDP 出站 +- 是否需要 `turn:` / `turns:` 服务器 + +### 4. 任务成功但 UI 没显示会话 + +优先检查: + +- 是否真的运行在 `webrtc` 配置下 +- `/webui/api/nodes` 返回的 `p2p` 是否含 `active_sessions` +- 前端是否已经更新到包含 node P2P runtime 展示的版本 + +## 回归建议 + +每次改动以下模块后,至少回归一次本流程: + +- `pkg/nodes/webrtc.go` +- `pkg/nodes/transport.go` +- `pkg/agent/loop.go` +- `pkg/api/server.go` +- `cmd/clawgo/cmd_node.go` +- `cmd/clawgo/cmd_gateway.go` diff --git a/pkg/api/server.go b/pkg/api/server.go index b5112a8..f7dd418 100644 --- a/pkg/api/server.go +++ b/pkg/api/server.go @@ -1075,12 +1075,45 @@ func (s *Server) webUINodesPayload(ctx context.Context) map[string]interface{} { p2p = s.nodeP2PStatus() } return map[string]interface{}{ - "nodes": list, - "trees": s.buildNodeAgentTrees(ctx, list), - "p2p": p2p, + "nodes": list, + "trees": s.buildNodeAgentTrees(ctx, list), + "p2p": p2p, + "dispatches": s.webUINodesDispatchPayload(12), } } +func (s *Server) webUINodesDispatchPayload(limit int) []map[string]interface{} { + workspace := strings.TrimSpace(s.workspacePath) + if workspace == "" { + return []map[string]interface{}{} + } + path := filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl") + data, err := os.ReadFile(path) + if err != nil { + return []map[string]interface{}{} + } + lines := strings.Split(strings.TrimSpace(string(data)), "\n") + if len(lines) == 1 && strings.TrimSpace(lines[0]) == "" { + return []map[string]interface{}{} + } + out := 
make([]map[string]interface{}, 0, limit) + for i := len(lines) - 1; i >= 0; i-- { + line := strings.TrimSpace(lines[i]) + if line == "" { + continue + } + row := map[string]interface{}{} + if err := json.Unmarshal([]byte(line), &row); err != nil { + continue + } + out = append(out, row) + if limit > 0 && len(out) >= limit { + break + } + } + return out +} + func (s *Server) webUISessionsPayload() map[string]interface{} { sessionsDir := filepath.Join(filepath.Dir(s.workspacePath), "agents", "main", "sessions") _ = os.MkdirAll(sessionsDir, 0755) @@ -1520,48 +1553,9 @@ func (s *Server) handleWebUINodes(w http.ResponseWriter, r *http.Request) { } switch r.Method { case http.MethodGet: - list := []nodes.NodeInfo{} - if s.mgr != nil { - list = s.mgr.List() - } - host, _ := os.Hostname() - local := nodes.NodeInfo{ID: "local", Name: "local", Endpoint: "gateway", Version: gatewayBuildVersion(), LastSeenAt: time.Now(), Online: true} - if strings.TrimSpace(host) != "" { - local.Name = host - } - if ip := detectLocalIP(); ip != "" { - local.Endpoint = ip - } - hostLower := strings.ToLower(strings.TrimSpace(host)) - matched := false - for i := range list { - id := strings.ToLower(strings.TrimSpace(list[i].ID)) - name := strings.ToLower(strings.TrimSpace(list[i].Name)) - if id == "local" || name == "local" || (hostLower != "" && name == hostLower) { - // Always keep local node green/alive with latest ip+version - list[i].ID = "local" - list[i].Online = true - list[i].Version = local.Version - if strings.TrimSpace(local.Endpoint) != "" { - list[i].Endpoint = local.Endpoint - } - if strings.TrimSpace(local.Name) != "" { - list[i].Name = local.Name - } - list[i].LastSeenAt = time.Now() - matched = true - break - } - } - if !matched { - list = append([]nodes.NodeInfo{local}, list...) 
- } - trees := s.buildNodeAgentTrees(r.Context(), list) - p2p := map[string]interface{}{} - if s.nodeP2PStatus != nil { - p2p = s.nodeP2PStatus() - } - _ = json.NewEncoder(w).Encode(map[string]interface{}{"ok": true, "nodes": list, "trees": trees, "p2p": p2p}) + payload := s.webUINodesPayload(r.Context()) + payload["ok"] = true + _ = json.NewEncoder(w).Encode(payload) case http.MethodPost: var body struct { Action string `json:"action"` diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go index 11314ce..ee10df4 100644 --- a/pkg/api/server_test.go +++ b/pkg/api/server_test.go @@ -433,6 +433,14 @@ func TestHandleWebUINodesIncludesP2PSummary(t *testing.T) { t.Parallel() srv := NewServer("127.0.0.1", 0, "", nodes.NewManager()) + workspace := t.TempDir() + srv.SetWorkspacePath(workspace) + if err := os.MkdirAll(filepath.Join(workspace, "memory"), 0755); err != nil { + t.Fatalf("mkdir memory: %v", err) + } + if err := os.WriteFile(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl"), []byte("{\"node\":\"edge-b\",\"used_transport\":\"webrtc\",\"fallback_from\":\"\",\"duration_ms\":12}\n"), 0644); err != nil { + t.Fatalf("write audit: %v", err) + } srv.SetNodeP2PStatusHandler(func() map[string]interface{} { return map[string]interface{}{ "enabled": true, @@ -455,4 +463,8 @@ func TestHandleWebUINodesIncludesP2PSummary(t *testing.T) { if p2p == nil || p2p["transport"] != "webrtc" { t.Fatalf("expected p2p summary, got %+v", body) } + dispatches, _ := body["dispatches"].([]interface{}) + if len(dispatches) != 1 { + t.Fatalf("expected dispatch audit rows, got %+v", body["dispatches"]) + } } diff --git a/webui/src/context/AppContext.tsx b/webui/src/context/AppContext.tsx index 777d484..30e5753 100644 --- a/webui/src/context/AppContext.tsx +++ b/webui/src/context/AppContext.tsx @@ -10,6 +10,7 @@ type RuntimeSnapshot = { nodes?: any[]; trees?: any[]; p2p?: Record; + dispatches?: any[]; }; sessions?: { sessions?: Array<{ key: string; title?: string; channel?: 
string }>; @@ -46,6 +47,8 @@ interface AppContextType { setNodeTrees: (trees: string) => void; nodeP2P: Record; setNodeP2P: React.Dispatch>>; + nodeDispatchItems: any[]; + setNodeDispatchItems: React.Dispatch>; cron: CronJob[]; setCron: (cron: CronJob[]) => void; skills: Skill[]; @@ -107,6 +110,7 @@ export const AppProvider: React.FC<{ children: React.ReactNode }> = ({ children const [nodes, setNodes] = useState('[]'); const [nodeTrees, setNodeTrees] = useState('[]'); const [nodeP2P, setNodeP2P] = useState>({}); + const [nodeDispatchItems, setNodeDispatchItems] = useState([]); const [cron, setCron] = useState([]); const [skills, setSkills] = useState([]); const [clawhubInstalled, setClawhubInstalled] = useState(false); @@ -166,6 +170,7 @@ export const AppProvider: React.FC<{ children: React.ReactNode }> = ({ children setNodes(JSON.stringify(j.nodes || [], null, 2)); setNodeTrees(JSON.stringify(j.trees || [], null, 2)); setNodeP2P(j.p2p || {}); + setNodeDispatchItems(Array.isArray(j.dispatches) ? j.dispatches : []); setIsGatewayOnline(true); } catch (e) { setIsGatewayOnline(false); @@ -271,6 +276,7 @@ export const AppProvider: React.FC<{ children: React.ReactNode }> = ({ children setNodes(JSON.stringify(Array.isArray(snapshot.nodes.nodes) ? snapshot.nodes.nodes : [], null, 2)); setNodeTrees(JSON.stringify(Array.isArray(snapshot.nodes.trees) ? snapshot.nodes.trees : [], null, 2)); setNodeP2P(snapshot.nodes.p2p || {}); + setNodeDispatchItems(Array.isArray(snapshot.nodes.dispatches) ? snapshot.nodes.dispatches : []); } if (snapshot.sessions) { const arr = Array.isArray(snapshot.sessions.sessions) ? 
snapshot.sessions.sessions : []; @@ -349,7 +355,7 @@ export const AppProvider: React.FC<{ children: React.ReactNode }> = ({ children return ( { const { t } = useTranslation(); const { @@ -15,6 +23,7 @@ const Dashboard: React.FC = () => { skills, cfg, nodeP2P, + nodeDispatchItems, taskQueueItems, ekgSummary, } = useAppContext(); @@ -45,6 +54,35 @@ const Dashboard: React.FC = () => { const p2pRetryCount = Array.isArray(nodeP2P?.nodes) ? nodeP2P.nodes.reduce((sum: number, session: any) => sum + Number(session?.retry_count || 0), 0) : 0; + const p2pNodeSessions = useMemo(() => { + if (!Array.isArray(nodeP2P?.nodes)) return []; + return [...nodeP2P.nodes] + .map((session: any) => ({ + node: String(session?.node || '-'), + status: String(session?.status || 'unknown'), + retryCount: Number(session?.retry_count || 0), + lastError: String(session?.last_error || '').trim(), + lastReadyAt: formatRuntimeTime(session?.last_ready_at), + lastAttempt: formatRuntimeTime(session?.last_attempt), + createdAt: formatRuntimeTime(session?.created_at), + })) + .sort((a, b) => a.node.localeCompare(b.node)); + }, [nodeP2P]); + const recentNodeDispatches = useMemo(() => { + return [...nodeDispatchItems] + .slice(0, 8) + .map((item: any, index: number) => ({ + id: `${item?.time || 'dispatch'}-${index}`, + time: formatRuntimeTime(item?.time), + node: String(item?.node || '-'), + action: String(item?.action || '-'), + usedTransport: String(item?.used_transport || '-'), + fallbackFrom: String(item?.fallback_from || '').trim(), + durationMs: Number(item?.duration_ms || 0), + ok: Boolean(item?.ok), + error: String(item?.error || '').trim(), + })); + }, [nodeDispatchItems]); return (
@@ -148,6 +186,119 @@ const Dashboard: React.FC = () => {
+ +
+
+
+
+ +

{t('dashboardNodeP2PSessions')}

+
+
+ {t('dashboardNodeP2PDetail', { transport: p2pTransport, sessions: p2pSessions, retries: p2pRetryCount })} +
+
+
+ {`${p2pConfiguredIce} ICE · ${p2pConfiguredStun} STUN`} +
+
+ {p2pNodeSessions.length === 0 ? ( +
{t('dashboardNodeP2PSessionsEmpty')}
+ ) : ( +
+ {p2pNodeSessions.map((session) => { + const isOpen = session.status.toLowerCase() === 'open'; + const isConnecting = session.status.toLowerCase() === 'connecting'; + return ( +
+
+
+
{session.node}
+
+ {t('dashboardNodeP2PSessionCreated')}: {session.createdAt} +
+
+
+ {session.status} +
+
+
+
+
{t('dashboardNodeP2PSessionRetries')}
+
{session.retryCount}
+
+
+
{t('dashboardNodeP2PSessionReady')}
+
{session.lastReadyAt}
+
+
+
{t('dashboardNodeP2PSessionAttempt')}
+
{session.lastAttempt}
+
+
+
{t('dashboardNodeP2PSessionError')}
+
+ {session.lastError || '-'} +
+
+
+
+ ); + })} +
+ )} +
+ +
+
+
+
+ +

{t('dashboardNodeDispatches')}

+
+
{t('dashboardNodeDispatchesHint')}
+
+
+ {recentNodeDispatches.length === 0 ? ( +
{t('dashboardNodeDispatchesEmpty')}
+ ) : ( +
+ {recentNodeDispatches.map((item) => ( +
+
+
+
{`${item.node} · ${item.action}`}
+
{item.time}
+
+
+ {item.ok ? 'ok' : 'error'} +
+
+
+
+
{t('dashboardNodeDispatchTransport')}
+
{item.usedTransport}
+
+
+
{t('dashboardNodeDispatchFallback')}
+
{item.fallbackFrom || '-'}
+
+
+
{t('dashboardNodeDispatchDuration')}
+
{`${item.durationMs}ms`}
+
+
+
{t('dashboardNodeDispatchError')}
+
+ {item.error || '-'} +
+
+
+
+ ))} +
+ )} +
); }; diff --git a/webui/src/pages/Subagents.tsx b/webui/src/pages/Subagents.tsx index 1fa7f07..6354ad3 100644 --- a/webui/src/pages/Subagents.tsx +++ b/webui/src/pages/Subagents.tsx @@ -227,6 +227,14 @@ function formatStreamTime(ts?: number): string { return new Date(ts).toLocaleTimeString([], { hour12: false }); } +function formatRuntimeTimestamp(value?: string): string { + const raw = `${value || ''}`.trim(); + if (!raw || raw === '0001-01-01T00:00:00Z') return '-'; + const ts = Date.parse(raw); + if (Number.isNaN(ts)) return raw; + return new Date(ts).toLocaleString(); +} + function summarizePreviewText(value?: string, limit = 180): string { const compact = `${value || ''}`.replace(/\s+/g, ' ').trim(); if (!compact) return '(empty)'; @@ -360,7 +368,7 @@ function GraphCard({ const Subagents: React.FC = () => { const { t } = useTranslation(); - const { q, nodeTrees, subagentRuntimeItems, subagentRegistryItems } = useAppContext(); + const { q, nodeTrees, nodeP2P, nodeDispatchItems, subagentRuntimeItems, subagentRegistryItems } = useAppContext(); const ui = useUI(); const [items, setItems] = useState([]); @@ -510,6 +518,26 @@ const Subagents: React.FC = () => { return acc; }, {}); }, [items]); + const p2pSessionByNode = useMemo(() => { + const out: Record = {}; + const sessions = Array.isArray(nodeP2P?.nodes) ? nodeP2P.nodes : []; + sessions.forEach((session: any) => { + const nodeID = normalizeTitle(session?.node, ''); + if (!nodeID) return; + out[nodeID] = session; + }); + return out; + }, [nodeP2P]); + const recentDispatchByNode = useMemo(() => { + const out: Record = {}; + const rows = Array.isArray(nodeDispatchItems) ? 
nodeDispatchItems : []; + rows.forEach((row: any) => { + const nodeID = normalizeTitle(row?.node, ''); + if (!nodeID || out[nodeID]) return; + out[nodeID] = row; + }); + return out; + }, [nodeDispatchItems]); const topologyGraph = useMemo(() => { const scale = topologyZoom; const originX = 56; @@ -645,6 +673,9 @@ const Subagents: React.FC = () => { remoteClusters.forEach((cluster, treeIndex) => { const { tree, root: treeRoot, children } = cluster; const branch = `node:${normalizeTitle(tree.node_id, `remote-${treeIndex}`)}`; + const nodeID = normalizeTitle(tree.node_id, ''); + const p2pSession = p2pSessionByNode[nodeID]; + const recentDispatch = recentDispatchByNode[nodeID]; const rootX = remoteOffsetX + Math.max(0, (cluster.width - cardWidth) / 2); if (!treeRoot) return; const rootCard: GraphCardSpec = { @@ -662,10 +693,15 @@ const Subagents: React.FC = () => { meta: [ `status=${tree.online ? t('online') : t('offline')}`, `transport=${normalizeTitle(treeRoot.transport, 'node')} type=${normalizeTitle(treeRoot.type, 'router')}`, + `p2p=${normalizeTitle(nodeP2P?.transport, 'disabled')} session=${normalizeTitle(p2pSession?.status, 'unknown')}`, + `last_transport=${normalizeTitle(recentDispatch?.used_transport, '-')}${recentDispatch?.fallback_from ? ` fallback=${normalizeTitle(recentDispatch?.fallback_from, '-')}` : ''}`, + `last_ready=${formatRuntimeTimestamp(p2pSession?.last_ready_at)}`, + `retry=${Number(p2pSession?.retry_count || 0)}`, + `${t('error')}=${normalizeTitle(p2pSession?.last_error, '-')}`, `source=${normalizeTitle(treeRoot.managed_by, tree.source || '-')}`, t('remoteTasksUnavailable'), ], - accent: tree.online ? 'bg-fuchsia-400' : 'bg-zinc-500', + accent: !tree.online ? 'bg-zinc-500' : normalizeTitle(p2pSession?.status, '').toLowerCase() === 'open' ? 'bg-emerald-400' : normalizeTitle(p2pSession?.status, '').toLowerCase() === 'connecting' ? 
'bg-amber-400' : 'bg-fuchsia-400', clickable: true, scale, onClick: () => { @@ -695,10 +731,15 @@ const Subagents: React.FC = () => { subtitle: `${normalizeTitle(child.agent_id, '-')} · ${normalizeTitle(child.role, '-')}`, meta: [ `transport=${normalizeTitle(child.transport, 'node')} type=${normalizeTitle(child.type, 'worker')}`, + `p2p=${normalizeTitle(nodeP2P?.transport, 'disabled')} session=${normalizeTitle(p2pSession?.status, 'unknown')}`, + `last_transport=${normalizeTitle(recentDispatch?.used_transport, '-')}${recentDispatch?.fallback_from ? ` fallback=${normalizeTitle(recentDispatch?.fallback_from, '-')}` : ''}`, + `last_ready=${formatRuntimeTimestamp(p2pSession?.last_ready_at)}`, + `retry=${Number(p2pSession?.retry_count || 0)}`, + `${t('error')}=${normalizeTitle(p2pSession?.last_error, '-')}`, `source=${normalizeTitle(child.managed_by, 'remote_webui')}`, t('remoteTasksUnavailable'), ], - accent: 'bg-violet-400', + accent: normalizeTitle(p2pSession?.status, '').toLowerCase() === 'open' ? 'bg-emerald-400' : normalizeTitle(p2pSession?.status, '').toLowerCase() === 'connecting' ? 'bg-amber-400' : 'bg-violet-400', clickable: true, scale, onClick: () => { @@ -787,7 +828,7 @@ const Subagents: React.FC = () => { })); return { width, height, cards: decoratedCards, lines: decoratedLines }; - }, [parsedNodeTrees, registryItems, taskStats, recentTaskByAgent, selectedTopologyBranch, topologyFilter, t, topologyZoom, nodeOverrides]); + }, [parsedNodeTrees, registryItems, taskStats, recentTaskByAgent, selectedTopologyBranch, topologyFilter, t, topologyZoom, nodeOverrides, nodeP2P, p2pSessionByNode, recentDispatchByNode]); const fitView = () => { const viewport = topologyViewportRef.current;