mirror of
https://github.com/YspCoder/clawgo.git
synced 2026-05-17 19:03:24 +08:00
add node dispatch latency audit and status metrics for child-task workflow
This commit is contained in:
@@ -52,6 +52,7 @@
|
|||||||
- 设备 `payload` 规范字段:`media_type` `storage` `url|path|image` `meta`
|
- 设备 `payload` 规范字段:`media_type` `storage` `url|path|image` `meta`
|
||||||
- 支持 `agent_task`:主节点可向具备 `model` 能力的子节点下发任务,子节点返回执行结果
|
- 支持 `agent_task`:主节点可向具备 `model` 能力的子节点下发任务,子节点返回执行结果
|
||||||
- 节点分发审计写入:`memory/nodes-dispatch-audit.jsonl`
|
- 节点分发审计写入:`memory/nodes-dispatch-audit.jsonl`
|
||||||
|
- `/status` 展示节点分发统计(total/ok/fail/avg_ms/top_action)
|
||||||
|
|
||||||
实现位置:
|
实现位置:
|
||||||
- `pkg/nodes/types.go`
|
- `pkg/nodes/types.go`
|
||||||
|
|||||||
@@ -52,6 +52,7 @@ A `nodes` tool control-plane PoC is now available:
|
|||||||
- device `payload` normalized fields: `media_type` `storage` `url|path|image` `meta`
|
- device `payload` normalized fields: `media_type` `storage` `url|path|image` `meta`
|
||||||
- supports `agent_task`: parent node can dispatch tasks to child nodes with `model` capability and receive execution results
|
- supports `agent_task`: parent node can dispatch tasks to child nodes with `model` capability and receive execution results
|
||||||
- node dispatch audit is persisted to `memory/nodes-dispatch-audit.jsonl`
|
- node dispatch audit is persisted to `memory/nodes-dispatch-audit.jsonl`
|
||||||
|
- `/status` shows node dispatch stats (total/ok/fail/avg_ms/top_action)
|
||||||
|
|
||||||
Implementation:
|
Implementation:
|
||||||
- `pkg/nodes/types.go`
|
- `pkg/nodes/types.go`
|
||||||
|
|||||||
@@ -176,6 +176,12 @@ func statusCmd() {
|
|||||||
}
|
}
|
||||||
fmt.Printf("Nodes: total=%d online=%d\n", len(ns), online)
|
fmt.Printf("Nodes: total=%d online=%d\n", len(ns), online)
|
||||||
fmt.Printf("Nodes Capabilities: run=%d model=%d camera=%d screen=%d location=%d canvas=%d\n", caps["run"], caps["model"], caps["camera"], caps["screen"], caps["location"], caps["canvas"])
|
fmt.Printf("Nodes Capabilities: run=%d model=%d camera=%d screen=%d location=%d canvas=%d\n", caps["run"], caps["model"], caps["camera"], caps["screen"], caps["location"], caps["canvas"])
|
||||||
|
if total, okCnt, avgMs, actionTop, err := collectNodeDispatchStats(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl")); err == nil && total > 0 {
|
||||||
|
fmt.Printf("Nodes Dispatch: total=%d ok=%d fail=%d avg_ms=%d\n", total, okCnt, total-okCnt, avgMs)
|
||||||
|
if actionTop != "" {
|
||||||
|
fmt.Printf("Nodes Dispatch Top Action: %s\n", actionTop)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -429,6 +435,58 @@ func collectAutonomyTaskSummary(path string) (map[string]int, map[string]int, ma
|
|||||||
return summary, priorities, reasons, nextRetry, totalDedupe, waitingLocks, len(lockKeySet), nil
|
return summary, priorities, reasons, nextRetry, totalDedupe, waitingLocks, len(lockKeySet), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func collectNodeDispatchStats(path string) (int, int, int, string, error) {
|
||||||
|
data, err := os.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return 0, 0, 0, "", err
|
||||||
|
}
|
||||||
|
lines := strings.Split(strings.TrimSpace(string(data)), "\n")
|
||||||
|
total, okCnt, msSum := 0, 0, 0
|
||||||
|
actionCnt := map[string]int{}
|
||||||
|
for _, line := range lines {
|
||||||
|
line = strings.TrimSpace(line)
|
||||||
|
if line == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var row struct {
|
||||||
|
Action string `json:"action"`
|
||||||
|
OK bool `json:"ok"`
|
||||||
|
DurationMS int `json:"duration_ms"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal([]byte(line), &row); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
total++
|
||||||
|
if row.OK {
|
||||||
|
okCnt++
|
||||||
|
}
|
||||||
|
if row.DurationMS > 0 {
|
||||||
|
msSum += row.DurationMS
|
||||||
|
}
|
||||||
|
a := strings.TrimSpace(strings.ToLower(row.Action))
|
||||||
|
if a == "" {
|
||||||
|
a = "unknown"
|
||||||
|
}
|
||||||
|
actionCnt[a]++
|
||||||
|
}
|
||||||
|
avg := 0
|
||||||
|
if total > 0 {
|
||||||
|
avg = msSum / total
|
||||||
|
}
|
||||||
|
topAction := ""
|
||||||
|
topN := 0
|
||||||
|
for k, v := range actionCnt {
|
||||||
|
if v > topN {
|
||||||
|
topN = v
|
||||||
|
topAction = k
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if topAction != "" {
|
||||||
|
topAction = fmt.Sprintf("%s(%d)", topAction, topN)
|
||||||
|
}
|
||||||
|
return total, okCnt, avg, topAction, nil
|
||||||
|
}
|
||||||
|
|
||||||
func collectSkillExecStats(path string) (int, int, int, float64, string, error) {
|
func collectSkillExecStats(path string) (int, int, int, float64, string, error) {
|
||||||
data, err := os.ReadFile(path)
|
data, err := os.ReadFile(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -114,32 +114,35 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
req := nodes.Request{Action: action, Node: nodeID, Task: strings.TrimSpace(task), Model: strings.TrimSpace(model), Args: reqArgs}
|
req := nodes.Request{Action: action, Node: nodeID, Task: strings.TrimSpace(task), Model: strings.TrimSpace(model), Args: reqArgs}
|
||||||
|
started := time.Now()
|
||||||
resp, err := t.router.Dispatch(ctx, req, mode)
|
resp, err := t.router.Dispatch(ctx, req, mode)
|
||||||
|
durationMs := int(time.Since(started).Milliseconds())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.writeAudit(req, nodes.Response{OK: false, Code: "transport_error", Error: err.Error(), Node: nodeID, Action: action}, mode)
|
t.writeAudit(req, nodes.Response{OK: false, Code: "transport_error", Error: err.Error(), Node: nodeID, Action: action}, mode, durationMs)
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
t.writeAudit(req, resp, mode)
|
t.writeAudit(req, resp, mode, durationMs)
|
||||||
b, _ := json.Marshal(resp)
|
b, _ := json.Marshal(resp)
|
||||||
return string(b), nil
|
return string(b), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *NodesTool) writeAudit(req nodes.Request, resp nodes.Response, mode string) {
|
func (t *NodesTool) writeAudit(req nodes.Request, resp nodes.Response, mode string, durationMs int) {
|
||||||
if strings.TrimSpace(t.auditPath) == "" {
|
if strings.TrimSpace(t.auditPath) == "" {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_ = os.MkdirAll(filepath.Dir(t.auditPath), 0755)
|
_ = os.MkdirAll(filepath.Dir(t.auditPath), 0755)
|
||||||
row := map[string]interface{}{
|
row := map[string]interface{}{
|
||||||
"time": time.Now().UTC().Format(time.RFC3339),
|
"time": time.Now().UTC().Format(time.RFC3339),
|
||||||
"mode": strings.TrimSpace(mode),
|
"mode": strings.TrimSpace(mode),
|
||||||
"action": req.Action,
|
"action": req.Action,
|
||||||
"node": req.Node,
|
"node": req.Node,
|
||||||
"task": req.Task,
|
"task": req.Task,
|
||||||
"model": req.Model,
|
"model": req.Model,
|
||||||
"ok": resp.OK,
|
"ok": resp.OK,
|
||||||
"code": resp.Code,
|
"code": resp.Code,
|
||||||
"error": resp.Error,
|
"error": resp.Error,
|
||||||
|
"duration_ms": durationMs,
|
||||||
}
|
}
|
||||||
b, _ := json.Marshal(row)
|
b, _ := json.Marshal(row)
|
||||||
f, err := os.OpenFile(t.auditPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
|
f, err := os.OpenFile(t.auditPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
|
||||||
|
|||||||
Reference in New Issue
Block a user