From 94cd67b487d136cd38cf37cd77446939c2fa219f Mon Sep 17 00:00:00 2001 From: lpf Date: Mon, 9 Mar 2026 01:26:49 +0800 Subject: [PATCH] feat: preview node media artifacts in dashboard --- cmd/clawgo/cmd_node.go | 185 ++++++++++++++++++++++++++++++++++ cmd/clawgo/cmd_node_test.go | 96 ++++++++++++++++++ pkg/api/server_test.go | 7 +- pkg/tools/nodes_tool.go | 58 +++++++++++ webui/src/i18n/index.ts | 2 + webui/src/pages/Dashboard.tsx | 55 ++++++++++ 6 files changed, 402 insertions(+), 1 deletion(-) diff --git a/cmd/clawgo/cmd_node.go b/cmd/clawgo/cmd_node.go index 90ed4e1..0656916 100644 --- a/cmd/clawgo/cmd_node.go +++ b/cmd/clawgo/cmd_node.go @@ -81,7 +81,9 @@ var ( nodeAgentLoopFactory = agent.NewAgentLoop nodeLocalExecutorFactory = newNodeLocalExecutor nodeCameraSnapFunc = captureNodeCameraSnapshot + nodeCameraClipFunc = captureNodeCameraClip nodeScreenSnapFunc = captureNodeScreenSnapshot + nodeScreenRecordFunc = captureNodeScreenRecord ) const nodeArtifactInlineLimit = 512 * 1024 @@ -742,6 +744,26 @@ func executeNodeRequest(ctx context.Context, client *http.Client, info nodes.Nod resp.Code = "local_runtime_error" return resp } + case "camera_clip": + execResp, err := executeNodeCameraClip(ctx, info, next) + if err == nil { + return execResp + } + if strings.TrimSpace(opts.Endpoint) == "" { + resp.Error = err.Error() + resp.Code = "local_runtime_error" + return resp + } + case "screen_record": + execResp, err := executeNodeScreenRecord(ctx, info, next) + if err == nil { + return execResp + } + if strings.TrimSpace(opts.Endpoint) == "" { + resp.Error = err.Error() + resp.Code = "local_runtime_error" + return resp + } } if strings.TrimSpace(opts.Endpoint) == "" { resp.Error = "node endpoint not configured" @@ -894,6 +916,67 @@ func executeNodeScreenSnapshot(ctx context.Context, info nodes.NodeInfo, req nod }, nil } +func executeNodeCameraClip(ctx context.Context, info nodes.NodeInfo, req nodes.Request) (nodes.Response, error) { + executor, err := getNodeLocalExecutor() + if err != nil { + return nodes.Response{}, err + } + durationMs := durationArg(req.Args, "duration_ms", 3000) + outputPath, err := nodeCameraClipFunc(ctx, executor.workspace, req.Args) + if err != nil { + return nodes.Response{}, err + } + artifact, err := buildNodeArtifact(executor.workspace, outputPath) + if err != nil { + return nodes.Response{}, err + } + return nodes.Response{ + OK: true, + Code: "ok", + Node: info.ID, + Action: req.Action, + Payload: map[string]interface{}{ + "transport": "clawgo-local", + "media_type": "video", + "storage": artifact["storage"], + "duration_ms": durationMs, + "artifacts": []map[string]interface{}{artifact}, + "meta": map[string]interface{}{ + "facing": stringArg(req.Args, "facing"), + }, + }, + }, nil +} + +func executeNodeScreenRecord(ctx context.Context, info nodes.NodeInfo, req nodes.Request) (nodes.Response, error) { + executor, err := getNodeLocalExecutor() + if err != nil { + return nodes.Response{}, err + } + durationMs := durationArg(req.Args, "duration_ms", 3000) + outputPath, err := nodeScreenRecordFunc(ctx, executor.workspace, req.Args) + if err != nil { + return nodes.Response{}, err + } + artifact, err := buildNodeArtifact(executor.workspace, outputPath) + if err != nil { + return nodes.Response{}, err + } + return nodes.Response{ + OK: true, + Code: "ok", + Node: info.ID, + Action: req.Action, + Payload: map[string]interface{}{ + "transport": "clawgo-local", + "media_type": "video", + "storage": artifact["storage"], + "duration_ms": durationMs, + "artifacts": []map[string]interface{}{artifact}, + }, + }, nil +} + func getNodeLocalExecutor() (*nodeLocalExecutor, error) { key := strings.TrimSpace(getConfigPath()) if key == "" { @@ -1176,6 +1259,83 @@ func captureNodeScreenSnapshot(ctx context.Context, workspace string, args map[s } } +func captureNodeCameraClip(ctx context.Context, workspace string, args map[string]interface{}) (string, error) { + outputPath, err := nodeMediaOutputPath(workspace, "camera", ".mp4", stringArg(args, "filename")) + if err != nil { + return "", err + } + durationSec := fmt.Sprintf("%.3f", float64(durationArg(args, "duration_ms", 3000))/1000.0) + switch runtime.GOOS { + case "linux": + if _, err := os.Stat("/dev/video0"); err != nil { + return "", fmt.Errorf("camera device /dev/video0 not found") + } + if _, err := exec.LookPath("ffmpeg"); err != nil { + return "", fmt.Errorf("ffmpeg not installed") + } + cmd := exec.CommandContext(ctx, "ffmpeg", "-y", "-f", "video4linux2", "-t", durationSec, "-i", "/dev/video0", "-pix_fmt", "yuv420p", outputPath) + if out, err := cmd.CombinedOutput(); err != nil { + return "", fmt.Errorf("camera clip failed: %v, output=%s", err, strings.TrimSpace(string(out))) + } + return outputPath, nil + case "darwin": + if _, err := exec.LookPath("ffmpeg"); err != nil { + return "", fmt.Errorf("ffmpeg not installed") + } + cmd := exec.CommandContext(ctx, "ffmpeg", "-y", "-f", "avfoundation", "-t", durationSec, "-i", "0:none", "-pix_fmt", "yuv420p", outputPath) + if out, err := cmd.CombinedOutput(); err != nil { + return "", fmt.Errorf("camera clip failed: %v, output=%s", err, strings.TrimSpace(string(out))) + } + return outputPath, nil + default: + return "", fmt.Errorf("camera_clip not supported on %s", runtime.GOOS) + } +} + +func captureNodeScreenRecord(ctx context.Context, workspace string, args map[string]interface{}) (string, error) { + outputPath, err := nodeMediaOutputPath(workspace, "screen", ".mp4", stringArg(args, "filename")) + if err != nil { + return "", err + } + durationMs := durationArg(args, "duration_ms", 3000) + durationSec := fmt.Sprintf("%.3f", float64(durationMs)/1000.0) + durationWholeSec := strconv.Itoa((durationMs + 999) / 1000) + switch runtime.GOOS { + case "darwin": + if _, err := exec.LookPath("ffmpeg"); err == nil { + cmd := exec.CommandContext(ctx, "ffmpeg", "-y", "-f", "avfoundation", "-t", durationSec, "-i", "1:none", "-pix_fmt", "yuv420p", outputPath) + if out, err := cmd.CombinedOutput(); err == nil { + return outputPath, nil + } else if strings.TrimSpace(string(out)) != "" { + return "", fmt.Errorf("screen record failed: %v, output=%s", err, strings.TrimSpace(string(out))) + } + } + return "", fmt.Errorf("ffmpeg not installed") + case "linux": + candidates := [][]string{ + {"ffmpeg", "-y", "-f", "x11grab", "-t", durationSec, "-i", os.Getenv("DISPLAY"), "-pix_fmt", "yuv420p", outputPath}, + {"wf-recorder", "-f", outputPath, "-d", durationWholeSec}, + } + for _, candidate := range candidates { + if candidate[0] == "ffmpeg" && strings.TrimSpace(os.Getenv("DISPLAY")) == "" { + continue + } + if _, err := exec.LookPath(candidate[0]); err != nil { + continue + } + cmd := exec.CommandContext(ctx, candidate[0], candidate[1:]...) + if out, err := cmd.CombinedOutput(); err == nil { + return outputPath, nil + } else if strings.TrimSpace(string(out)) != "" && candidate[0] == "ffmpeg" { + continue + } + } + return "", fmt.Errorf("no supported screen recorder found (ffmpeg x11grab or wf-recorder)") + default: + return "", fmt.Errorf("screen_record not supported on %s", runtime.GOOS) + } +} + func nodeMediaOutputPath(workspace, kind, ext, requested string) (string, error) { root := strings.TrimSpace(workspace) if root == "" { @@ -1205,6 +1365,31 @@ func nodeMediaOutputPath(workspace, kind, ext, requested string) (string, error) return fullPath, nil } +func durationArg(args map[string]interface{}, key string, fallback int) int { + if len(args) == 0 { + return fallback + } + switch v := args[key].(type) { + case int: + if v > 0 { + return v + } + case int64: + if v > 0 { + return int(v) + } + case float64: + if v > 0 { + return int(v) + } + case json.Number: + if n, err := v.Int64(); err == nil && n > 0 { + return int(n) + } + } + return fallback +} + func structToWirePayload(v interface{}) map[string]interface{} { b, _ := json.Marshal(v) var out map[string]interface{} diff --git a/cmd/clawgo/cmd_node_test.go b/cmd/clawgo/cmd_node_test.go index af05b77..df9a2f0 100644 --- a/cmd/clawgo/cmd_node_test.go +++ b/cmd/clawgo/cmd_node_test.go @@ -386,3 +386,99 @@ func TestExecuteNodeRequestRunsLocalScreenSnapshot(t *testing.T) { t.Fatalf("unexpected artifact: %+v", artifacts[0]) } } + +func TestExecuteNodeRequestRunsLocalCameraClip(t *testing.T) { + prevCfg := globalConfigPathOverride + prevExecutors := nodeLocalExecutors + prevClip := nodeCameraClipFunc + globalConfigPathOverride = filepath.Join(t.TempDir(), "config.json") + nodeLocalExecutors = map[string]*nodeLocalExecutor{} + defer func() { + globalConfigPathOverride = prevCfg + nodeLocalExecutors = prevExecutors + nodeCameraClipFunc = prevClip + }() + + cfg := config.DefaultConfig() + cfg.Agents.Defaults.Workspace = filepath.Join(t.TempDir(), "workspace") + if err := config.SaveConfig(globalConfigPathOverride, cfg); err != nil { + t.Fatalf("save config: %v", err) + } + nodeCameraClipFunc = func(ctx context.Context, workspace string, args map[string]interface{}) (string, error) { + out := filepath.Join(workspace, "artifacts", "node", "camera-test.mp4") + if err := os.MkdirAll(filepath.Dir(out), 0755); err != nil { + return "", err + } + if err := os.WriteFile(out, []byte("video-bytes"), 0644); err != nil { + return "", err + } + return out, nil + } + + info := nodes.NodeInfo{ID: "edge-clip", Name: "Edge Clip"} + resp := executeNodeRequest(context.Background(), &http.Client{Timeout: time.Second}, info, nodeRegisterOptions{}, &nodes.Request{ + Action: "camera_clip", + Args: map[string]interface{}{"duration_ms": 2500}, + }) + if !resp.OK { + t.Fatalf("expected ok response, got %+v", resp) + } + if got, _ := resp.Payload["duration_ms"].(int); got != 2500 { + t.Fatalf("unexpected duration payload: %+v", resp.Payload) + } + artifacts, ok := resp.Payload["artifacts"].([]map[string]interface{}) + if !ok || len(artifacts) != 1 { + t.Fatalf("expected one artifact, got %+v", resp.Payload["artifacts"]) + } + if artifacts[0]["name"] != "camera-test.mp4" { + t.Fatalf("unexpected artifact: %+v", artifacts[0]) + } +} + +func TestExecuteNodeRequestRunsLocalScreenRecord(t *testing.T) { + prevCfg := globalConfigPathOverride + prevExecutors := nodeLocalExecutors + prevRecord := nodeScreenRecordFunc + globalConfigPathOverride = filepath.Join(t.TempDir(), "config.json") + nodeLocalExecutors = map[string]*nodeLocalExecutor{} + defer func() { + globalConfigPathOverride = prevCfg + nodeLocalExecutors = prevExecutors + nodeScreenRecordFunc = prevRecord + }() + + cfg := config.DefaultConfig() + cfg.Agents.Defaults.Workspace = filepath.Join(t.TempDir(), "workspace") + if err := config.SaveConfig(globalConfigPathOverride, cfg); err != nil { + t.Fatalf("save config: %v", err) + } + nodeScreenRecordFunc = func(ctx context.Context, workspace string, args map[string]interface{}) (string, error) { + out := filepath.Join(workspace, "artifacts", "node", "screen-test.mp4") + if err := os.MkdirAll(filepath.Dir(out), 0755); err != nil { + return "", err + } + if err := os.WriteFile(out, []byte("screen-video"), 0644); err != nil { + return "", err + } + return out, nil + } + + info := nodes.NodeInfo{ID: "edge-record", Name: "Edge Record"} + resp := executeNodeRequest(context.Background(), &http.Client{Timeout: time.Second}, info, nodeRegisterOptions{}, &nodes.Request{ + Action: "screen_record", + Args: map[string]interface{}{"duration_ms": 1800}, + }) + if !resp.OK { + t.Fatalf("expected ok response, got %+v", resp) + } + if got, _ := resp.Payload["duration_ms"].(int); got != 1800 { + t.Fatalf("unexpected duration payload: %+v", resp.Payload) + } + artifacts, ok := resp.Payload["artifacts"].([]map[string]interface{}) + if !ok || len(artifacts) != 1 { + t.Fatalf("expected one artifact, got %+v", resp.Payload["artifacts"]) + } + if artifacts[0]["name"] != "screen-test.mp4" { + t.Fatalf("unexpected artifact: %+v", artifacts[0]) + } +} diff --git a/pkg/api/server_test.go b/pkg/api/server_test.go index ee10df4..e01f80a 100644 --- a/pkg/api/server_test.go +++ b/pkg/api/server_test.go @@ -438,7 +438,7 @@ func TestHandleWebUINodesIncludesP2PSummary(t *testing.T) { if err := os.MkdirAll(filepath.Join(workspace, "memory"), 0755); err != nil { t.Fatalf("mkdir memory: %v", err) } - if err := os.WriteFile(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl"), []byte("{\"node\":\"edge-b\",\"used_transport\":\"webrtc\",\"fallback_from\":\"\",\"duration_ms\":12}\n"), 0644); err != nil { + if err := os.WriteFile(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl"), []byte("{\"node\":\"edge-b\",\"used_transport\":\"webrtc\",\"fallback_from\":\"\",\"duration_ms\":12,\"artifacts\":[{\"name\":\"snap.png\",\"kind\":\"image\",\"mime_type\":\"image/png\",\"storage\":\"inline\",\"content_base64\":\"iVBORw0KGgo=\"}]}\n"), 0644); err != nil { t.Fatalf("write audit: %v", err) } srv.SetNodeP2PStatusHandler(func() map[string]interface{} { @@ -467,4 +467,9 @@ func TestHandleWebUINodesIncludesP2PSummary(t *testing.T) { if len(dispatches) != 1 { t.Fatalf("expected dispatch audit rows, got %+v", body["dispatches"]) } + first, _ := dispatches[0].(map[string]interface{}) + artifacts, _ := first["artifacts"].([]interface{}) + if len(artifacts) != 1 { + t.Fatalf("expected artifact previews in dispatch row, got %+v", first) + } } diff --git a/pkg/tools/nodes_tool.go b/pkg/tools/nodes_tool.go index cdc277d..b1504ba 100644 --- a/pkg/tools/nodes_tool.go +++ b/pkg/tools/nodes_tool.go @@ -19,6 +19,8 @@ type NodesTool struct { auditPath string } +const nodeAuditArtifactPreviewLimit = 32768 + func NewNodesTool(m *nodes.Manager, r *nodes.Router, auditPath string) *NodesTool { return &NodesTool{manager: m, router: r, auditPath: strings.TrimSpace(auditPath)} } @@ -159,6 +161,9 @@ func (t *NodesTool) writeAudit(req nodes.Request, resp nodes.Response, mode stri if len(kinds) > 0 { row["artifact_kinds"] = kinds } + if previews := artifactAuditPreviews(resp.Payload["artifacts"]); len(previews) > 0 { + row["artifacts"] = previews + } } b, _ := json.Marshal(row) f, err := os.OpenFile(t.auditPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) @@ -194,3 +199,56 @@ func artifactAuditSummary(raw interface{}) (int, []string) { } return len(items), kinds } + +func artifactAuditPreviews(raw interface{}) []map[string]interface{} { + items, ok := raw.([]interface{}) + if !ok { + if typed, ok := raw.([]map[string]interface{}); ok { + items = make([]interface{}, 0, len(typed)) + for _, item := range typed { + items = append(items, item) + } + } + } + if len(items) == 0 { + return nil + } + out := make([]map[string]interface{}, 0, len(items)) + for _, item := range items { + row, ok := item.(map[string]interface{}) + if !ok || len(row) == 0 { + continue + } + entry := map[string]interface{}{} + for _, key := range []string{"name", "kind", "mime_type", "storage", "path", "url", "source_path"} { + if value, ok := row[key]; ok && strings.TrimSpace(fmt.Sprint(value)) != "" { + entry[key] = value + } + } + if size, ok := row["size_bytes"]; ok { + entry["size_bytes"] = size + } + if text, _ := row["content_text"].(string); strings.TrimSpace(text) != "" { + entry["content_text"] = trimAuditContent(text) + } + if b64, _ := row["content_base64"].(string); strings.TrimSpace(b64) != "" { + entry["content_base64"] = trimAuditContent(b64) + entry["content_base64_truncated"] = len(b64) > nodeAuditArtifactPreviewLimit + } + if truncated, ok := row["truncated"].(bool); ok && truncated { + entry["truncated"] = true + } + if len(entry) > 0 { + out = append(out, entry) + } + } + return out +} + +func trimAuditContent(raw string) string { + raw = strings.TrimSpace(raw) + if len(raw) <= nodeAuditArtifactPreviewLimit { + return raw + } + return raw[:nodeAuditArtifactPreviewLimit] +} diff --git a/webui/src/i18n/index.ts b/webui/src/i18n/index.ts index 55dfd4b..1245ed2 100644 --- a/webui/src/i18n/index.ts +++ b/webui/src/i18n/index.ts @@ -136,6 +136,7 @@ const resources = { dashboardNodeDispatchFallback: 'Fallback From', dashboardNodeDispatchDuration: 'Duration', dashboardNodeDispatchArtifacts: 'Artifacts', + dashboardNodeDispatchArtifactPreview: 'Artifact Preview', dashboardNodeDispatchError: 'Error', configNodeP2P: 'Node P2P', configNodeP2PHint: 'Configure websocket tunnel or WebRTC transport for remote nodes.', @@ -689,6 +690,7 @@ const resources = { dashboardNodeDispatchFallback: '回退来源', dashboardNodeDispatchDuration: '耗时', dashboardNodeDispatchArtifacts: '工件', + dashboardNodeDispatchArtifactPreview: '工件预览', dashboardNodeDispatchError: '错误', configNodeP2P: '节点 P2P', configNodeP2PHint: '为远端节点配置 websocket tunnel 或 WebRTC 传输。', diff --git a/webui/src/pages/Dashboard.tsx b/webui/src/pages/Dashboard.tsx index 2c92a83..7537d44 100644 --- a/webui/src/pages/Dashboard.tsx +++ b/webui/src/pages/Dashboard.tsx @@ -12,6 +12,21 @@ function formatRuntimeTime(value: unknown) { return new Date(ts).toLocaleString(); } +function dataUrlForArtifact(artifact: any) { + const mime = String(artifact?.mime_type || '').trim() || 'application/octet-stream'; + const content = String(artifact?.content_base64 || '').trim(); + if (!content) return ''; + return `data:${mime};base64,${content}`; +} + +function formatBytes(value: unknown) { + const size = Number(value || 0); + if (!Number.isFinite(size) || size <= 0) return '-'; + if (size < 1024) return `${size} B`; + if (size < 1024 * 1024) return `${(size / 1024).toFixed(1)} KB`; + return `${(size / (1024 * 1024)).toFixed(1)} MB`; +} + const Dashboard: React.FC = () => { const { t } = useTranslation(); const { @@ -81,6 +96,7 @@ const Dashboard: React.FC = () => { durationMs: Number(item?.duration_ms || 0), artifactCount: Number(item?.artifact_count || 0), artifactKinds: Array.isArray(item?.artifact_kinds) ? item.artifact_kinds.map((kind: any) => String(kind || '').trim()).filter(Boolean) : [], + artifacts: Array.isArray(item?.artifacts) ? item.artifacts : [], ok: Boolean(item?.ok), error: String(item?.error || '').trim(), })); @@ -302,6 +318,45 @@ const Dashboard: React.FC = () => { + {item.artifacts.length > 0 && ( +
+
{t('dashboardNodeDispatchArtifactPreview')}
+ {item.artifacts.slice(0, 2).map((artifact: any, artifactIndex: number) => { + const kind = String(artifact?.kind || '').trim().toLowerCase(); + const mime = String(artifact?.mime_type || '').trim().toLowerCase(); + const isImage = kind === 'image' || mime.startsWith('image/'); + const isVideo = kind === 'video' || mime.startsWith('video/'); + const dataUrl = dataUrlForArtifact(artifact); + return ( +
+
+
+
{String(artifact?.name || artifact?.source_path || `artifact-${artifactIndex + 1}`)}
+
+ {[artifact?.kind, artifact?.mime_type, formatBytes(artifact?.size_bytes)].filter(Boolean).join(' · ')} +
+
+
{String(artifact?.storage || '-')}
+
+ {isImage && dataUrl && ( + {String(artifact?.name + )} + {isVideo && dataUrl && ( +
+ ); + })} +
+ )} ))}