From 7ec4c87779c3e97757b6edef1cfce6f1a5f958ee Mon Sep 17 00:00:00 2001 From: DBT Date: Wed, 25 Feb 2026 01:53:19 +0000 Subject: [PATCH] add node dispatch latency audit and status metrics for child-task workflow --- README.md | 1 + README_EN.md | 1 + cmd/clawgo/cmd_status.go | 58 ++++++++++++++++++++++++++++++++++++++++ pkg/tools/nodes_tool.go | 27 ++++++++++--------- 4 files changed, 75 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index df67b60..74a21fa 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ - 设备 `payload` 规范字段:`media_type` `storage` `url|path|image` `meta` - 支持 `agent_task`:主节点可向具备 `model` 能力的子节点下发任务,子节点返回执行结果 - 节点分发审计写入:`memory/nodes-dispatch-audit.jsonl` +- `/status` 展示节点分发统计(total/ok/fail/avg_ms/top_action) 实现位置: - `pkg/nodes/types.go` diff --git a/README_EN.md b/README_EN.md index 91901fa..fb5c3ab 100644 --- a/README_EN.md +++ b/README_EN.md @@ -52,6 +52,7 @@ A `nodes` tool control-plane PoC is now available: - device `payload` normalized fields: `media_type` `storage` `url|path|image` `meta` - supports `agent_task`: parent node can dispatch tasks to child nodes with `model` capability and receive execution results - node dispatch audit is persisted to `memory/nodes-dispatch-audit.jsonl` +- `/status` shows node dispatch stats (total/ok/fail/avg_ms/top_action) Implementation: - `pkg/nodes/types.go` diff --git a/cmd/clawgo/cmd_status.go b/cmd/clawgo/cmd_status.go index 5805074..f1bf756 100644 --- a/cmd/clawgo/cmd_status.go +++ b/cmd/clawgo/cmd_status.go @@ -176,6 +176,12 @@ func statusCmd() { } fmt.Printf("Nodes: total=%d online=%d\n", len(ns), online) fmt.Printf("Nodes Capabilities: run=%d model=%d camera=%d screen=%d location=%d canvas=%d\n", caps["run"], caps["model"], caps["camera"], caps["screen"], caps["location"], caps["canvas"]) + if total, okCnt, avgMs, actionTop, err := collectNodeDispatchStats(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl")); err == nil && total > 0 { + fmt.Printf("Nodes Dispatch: total=%d ok=%d fail=%d avg_ms=%d\n", total, okCnt, total-okCnt, avgMs) + if actionTop != "" { + fmt.Printf("Nodes Dispatch Top Action: %s\n", actionTop) + } + } } } } @@ -429,6 +435,58 @@ func collectAutonomyTaskSummary(path string) (map[string]int, map[string]int, ma return summary, priorities, reasons, nextRetry, totalDedupe, waitingLocks, len(lockKeySet), nil } +func collectNodeDispatchStats(path string) (int, int, int, string, error) { + data, err := os.ReadFile(path) + if err != nil { + return 0, 0, 0, "", err + } + lines := strings.Split(strings.TrimSpace(string(data)), "\n") + total, okCnt, msSum := 0, 0, 0 + actionCnt := map[string]int{} + for _, line := range lines { + line = strings.TrimSpace(line) + if line == "" { + continue + } + var row struct { + Action string `json:"action"` + OK bool `json:"ok"` + DurationMS int `json:"duration_ms"` + } + if err := json.Unmarshal([]byte(line), &row); err != nil { + continue + } + total++ + if row.OK { + okCnt++ + } + if row.DurationMS > 0 { + msSum += row.DurationMS + } + a := strings.TrimSpace(strings.ToLower(row.Action)) + if a == "" { + a = "unknown" + } + actionCnt[a]++ + } + avg := 0 + if total > 0 { + avg = msSum / total + } + topAction := "" + topN := 0 + for k, v := range actionCnt { + if v > topN { + topN = v + topAction = k + } + } + if topAction != "" { + topAction = fmt.Sprintf("%s(%d)", topAction, topN) + } + return total, okCnt, avg, topAction, nil +} + func collectSkillExecStats(path string) (int, int, int, float64, string, error) { data, err := os.ReadFile(path) if err != nil { diff --git a/pkg/tools/nodes_tool.go b/pkg/tools/nodes_tool.go index 6525c68..8aa0191 100644 --- a/pkg/tools/nodes_tool.go +++ b/pkg/tools/nodes_tool.go @@ -114,32 +114,35 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s } } req := nodes.Request{Action: action, Node: nodeID, Task: strings.TrimSpace(task), Model: strings.TrimSpace(model), Args: reqArgs} + started := time.Now() resp, err := t.router.Dispatch(ctx, req, mode) + durationMs := int(time.Since(started).Milliseconds()) if err != nil { - t.writeAudit(req, nodes.Response{OK: false, Code: "transport_error", Error: err.Error(), Node: nodeID, Action: action}, mode) + t.writeAudit(req, nodes.Response{OK: false, Code: "transport_error", Error: err.Error(), Node: nodeID, Action: action}, mode, durationMs) return "", err } - t.writeAudit(req, resp, mode) + t.writeAudit(req, resp, mode, durationMs) b, _ := json.Marshal(resp) return string(b), nil } } -func (t *NodesTool) writeAudit(req nodes.Request, resp nodes.Response, mode string) { +func (t *NodesTool) writeAudit(req nodes.Request, resp nodes.Response, mode string, durationMs int) { if strings.TrimSpace(t.auditPath) == "" { return } _ = os.MkdirAll(filepath.Dir(t.auditPath), 0755) row := map[string]interface{}{ - "time": time.Now().UTC().Format(time.RFC3339), - "mode": strings.TrimSpace(mode), - "action": req.Action, - "node": req.Node, - "task": req.Task, - "model": req.Model, - "ok": resp.OK, - "code": resp.Code, - "error": resp.Error, + "time": time.Now().UTC().Format(time.RFC3339), + "mode": strings.TrimSpace(mode), + "action": req.Action, + "node": req.Node, + "task": req.Task, + "model": req.Model, + "ok": resp.OK, + "code": resp.Code, + "error": resp.Error, + "duration_ms": durationMs, } b, _ := json.Marshal(row) f, err := os.OpenFile(t.auditPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)