diff --git a/cmd/clawgo/cmd_node.go b/cmd/clawgo/cmd_node.go index 9a56e6d..90ed4e1 100644 --- a/cmd/clawgo/cmd_node.go +++ b/cmd/clawgo/cmd_node.go @@ -3,19 +3,30 @@ package main import ( "bytes" "context" + "encoding/base64" "encoding/json" "fmt" + "io/fs" "net/http" "os" + "os/exec" "os/signal" + "path/filepath" "runtime" + "sort" "strconv" "strings" "sync" "time" + "clawgo/pkg/agent" + "clawgo/pkg/bus" "clawgo/pkg/config" + "clawgo/pkg/cron" "clawgo/pkg/nodes" + "clawgo/pkg/providers" + "clawgo/pkg/runtimecfg" + "clawgo/pkg/tools" "github.com/gorilla/websocket" "github.com/pion/webrtc/v4" ) @@ -32,6 +43,7 @@ type nodeRegisterOptions struct { Version string Actions []string Models []string + Agents []nodes.AgentInfo Capabilities nodes.Capabilities Watch bool HeartbeatSec int @@ -54,6 +66,26 @@ type nodeWebRTCSession struct { dc *webrtc.DataChannel } +type nodeLocalExecutor struct { + configPath string + workspace string + once sync.Once + loop *agent.AgentLoop + err error +} + +var ( + nodeLocalExecutorMu sync.Mutex + nodeLocalExecutors = map[string]*nodeLocalExecutor{} + nodeProviderFactory = providers.CreateProvider + nodeAgentLoopFactory = agent.NewAgentLoop + nodeLocalExecutorFactory = newNodeLocalExecutor + nodeCameraSnapFunc = captureNodeCameraSnapshot + nodeScreenSnapFunc = captureNodeScreenSnapshot +) + +const nodeArtifactInlineLimit = 512 * 1024 + func nodeCmd() { args := os.Args[2:] if len(args) == 0 { @@ -159,6 +191,7 @@ func parseNodeRegisterArgs(args []string, cfg *config.Config) (nodeRegisterOptio HeartbeatSec: 30, Capabilities: capabilitiesFromCSV("run,invoke,model"), } + opts.Agents = nodeAgentsFromConfig(cfg) for i := 0; i < len(args); i++ { arg := strings.TrimSpace(args[i]) next := func() (string, error) { @@ -332,9 +365,33 @@ func buildNodeInfo(opts nodeRegisterOptions) nodes.NodeInfo { Capabilities: opts.Capabilities, Actions: append([]string(nil), opts.Actions...), Models: append([]string(nil), opts.Models...), + Agents: append([]nodes.AgentInfo(nil), opts.Agents...), } } +func nodeAgentsFromConfig(cfg *config.Config) []nodes.AgentInfo { + if cfg == nil { + return nil + } + items := make([]nodes.AgentInfo, 0, len(cfg.Agents.Subagents)) + for agentID, subcfg := range cfg.Agents.Subagents { + id := strings.TrimSpace(agentID) + if id == "" || !subcfg.Enabled { + continue + } + items = append(items, nodes.AgentInfo{ + ID: id, + DisplayName: strings.TrimSpace(subcfg.DisplayName), + Role: strings.TrimSpace(subcfg.Role), + Type: strings.TrimSpace(subcfg.Type), + Transport: strings.TrimSpace(subcfg.Transport), + ParentAgentID: strings.TrimSpace(subcfg.ParentAgentID), + }) + } + sort.Slice(items, func(i, j int) bool { return items[i].ID < items[j].ID }) + return items +} + func runNodeHeartbeatLoop(client *http.Client, opts nodeRegisterOptions, info nodes.NodeInfo) error { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt) defer stop() @@ -654,6 +711,38 @@ func executeNodeRequest(ctx context.Context, client *http.Client, info nodes.Nod } next := *req resp.Action = next.Action + switch strings.ToLower(strings.TrimSpace(next.Action)) { + case "agent_task": + execResp, err := executeNodeAgentTask(ctx, info, next) + if err == nil { + return execResp + } + if strings.TrimSpace(opts.Endpoint) == "" { + resp.Error = err.Error() + resp.Code = "local_runtime_error" + return resp + } + case "camera_snap": + execResp, err := executeNodeCameraSnap(ctx, info, next) + if err == nil { + return execResp + } + if strings.TrimSpace(opts.Endpoint) == "" { + resp.Error = err.Error() + resp.Code = "local_runtime_error" + return resp + } + case "screen_snapshot": + execResp, err := executeNodeScreenSnapshot(ctx, info, next) + if err == nil { + return execResp + } + if strings.TrimSpace(opts.Endpoint) == "" { + resp.Error = err.Error() + resp.Code = "local_runtime_error" + return resp + } + } if strings.TrimSpace(opts.Endpoint) == "" { resp.Error = "node endpoint not configured" resp.Code = "endpoint_missing" @@ -678,6 +767,444 @@ func executeNodeRequest(ctx context.Context, client *http.Client, info nodes.Nod return execResp } +func executeNodeAgentTask(ctx context.Context, info nodes.NodeInfo, req nodes.Request) (nodes.Response, error) { + executor, err := getNodeLocalExecutor() + if err != nil { + return nodes.Response{}, err + } + loop, err := executor.Loop() + if err != nil { + return nodes.Response{}, err + } + + remoteAgentID := strings.TrimSpace(stringArg(req.Args, "remote_agent_id")) + if remoteAgentID == "" || strings.EqualFold(remoteAgentID, "main") { + sessionKey := fmt.Sprintf("node:%s:main", info.ID) + result, err := loop.ProcessDirectWithOptions(ctx, strings.TrimSpace(req.Task), sessionKey, "node", info.ID, "main", nil) + if err != nil { + return nodes.Response{}, err + } + artifacts, err := collectNodeArtifacts(executor.workspace, req.Args) + if err != nil { + return nodes.Response{}, err + } + return nodes.Response{ + OK: true, + Code: "ok", + Node: info.ID, + Action: req.Action, + Payload: map[string]interface{}{ + "transport": "clawgo-local", + "agent_id": "main", + "result": strings.TrimSpace(result), + "artifacts": artifacts, + }, + }, nil + } + + out, err := loop.HandleSubagentRuntime(ctx, "dispatch_and_wait", map[string]interface{}{ + "task": strings.TrimSpace(req.Task), + "agent_id": remoteAgentID, + "channel": "node", + "chat_id": info.ID, + "wait_timeout_sec": float64(120), + }) + if err != nil { + return nodes.Response{}, err + } + payload, _ := out.(map[string]interface{}) + result := strings.TrimSpace(fmt.Sprint(payload["merged"])) + if result == "" { + if reply, ok := payload["reply"].(*tools.RouterReply); ok { + result = strings.TrimSpace(reply.Result) + } + } + artifacts, err := collectNodeArtifacts(executor.workspace, req.Args) + if err != nil { + return nodes.Response{}, err + } + return nodes.Response{ + OK: true, + Code: "ok", + Node: info.ID, + Action: req.Action, + Payload: map[string]interface{}{ + "transport": "clawgo-local", + "agent_id": remoteAgentID, + "result": result, + "artifacts": artifacts, + }, + }, nil +} + +func executeNodeCameraSnap(ctx context.Context, info nodes.NodeInfo, req nodes.Request) (nodes.Response, error) { + executor, err := getNodeLocalExecutor() + if err != nil { + return nodes.Response{}, err + } + outputPath, err := nodeCameraSnapFunc(ctx, executor.workspace, req.Args) + if err != nil { + return nodes.Response{}, err + } + artifact, err := buildNodeArtifact(executor.workspace, outputPath) + if err != nil { + return nodes.Response{}, err + } + return nodes.Response{ + OK: true, + Code: "ok", + Node: info.ID, + Action: req.Action, + Payload: map[string]interface{}{ + "transport": "clawgo-local", + "media_type": "image", + "storage": artifact["storage"], + "artifacts": []map[string]interface{}{artifact}, + "meta": map[string]interface{}{ + "facing": stringArg(req.Args, "facing"), + }, + }, + }, nil +} + +func executeNodeScreenSnapshot(ctx context.Context, info nodes.NodeInfo, req nodes.Request) (nodes.Response, error) { + executor, err := getNodeLocalExecutor() + if err != nil { + return nodes.Response{}, err + } + outputPath, err := nodeScreenSnapFunc(ctx, executor.workspace, req.Args) + if err != nil { + return nodes.Response{}, err + } + artifact, err := buildNodeArtifact(executor.workspace, outputPath) + if err != nil { + return nodes.Response{}, err + } + return nodes.Response{ + OK: true, + Code: "ok", + Node: info.ID, + Action: req.Action, + Payload: map[string]interface{}{ + "transport": "clawgo-local", + "media_type": "image", + "storage": artifact["storage"], + "artifacts": []map[string]interface{}{artifact}, + }, + }, nil +} + +func getNodeLocalExecutor() (*nodeLocalExecutor, error) { + key := strings.TrimSpace(getConfigPath()) + if key == "" { + return nil, fmt.Errorf("config path is required") + } + nodeLocalExecutorMu.Lock() + defer nodeLocalExecutorMu.Unlock() + if existing := nodeLocalExecutors[key]; existing != nil { + return existing, nil + } + exec, err := nodeLocalExecutorFactory(key) + if err != nil { + return nil, err + } + nodeLocalExecutors[key] = exec + return exec, nil +} + +func newNodeLocalExecutor(configPath string) (*nodeLocalExecutor, error) { + configPath = strings.TrimSpace(configPath) + if configPath == "" { + return nil, fmt.Errorf("config path is required") + } + return &nodeLocalExecutor{configPath: configPath}, nil +} + +func (e *nodeLocalExecutor) Loop() (*agent.AgentLoop, error) { + if e == nil { + return nil, fmt.Errorf("node local executor is nil") + } + e.once.Do(func() { + prev := globalConfigPathOverride + globalConfigPathOverride = e.configPath + defer func() { globalConfigPathOverride = prev }() + + cfg, err := loadConfig() + if err != nil { + e.err = err + return + } + runtimecfg.Set(cfg) + msgBus := bus.NewMessageBus() + cronStorePath := filepath.Join(filepath.Dir(e.configPath), "cron", "jobs.json") + cronService := cron.NewCronService(cronStorePath, nil) + configureCronServiceRuntime(cronService, cfg) + provider, err := nodeProviderFactory(cfg) + if err != nil { + e.err = err + return + } + e.workspace = cfg.WorkspacePath() + loop := nodeAgentLoopFactory(cfg, msgBus, provider, cronService) + loop.SetConfigPath(e.configPath) + e.loop = loop + }) + if e.err != nil { + return nil, e.err + } + if e.loop == nil { + return nil, fmt.Errorf("node local executor unavailable") + } + return e.loop, nil +} + +func stringArg(args map[string]interface{}, key string) string { + if len(args) == 0 { + return "" + } + value, ok := args[key] + if !ok || value == nil { + return "" + } + return strings.TrimSpace(fmt.Sprint(value)) +} + +func collectNodeArtifacts(workspace string, args map[string]interface{}) ([]map[string]interface{}, error) { + paths := stringListArg(args, "artifact_paths") + if len(paths) == 0 { + return []map[string]interface{}{}, nil + } + root := strings.TrimSpace(workspace) + if root == "" { + return nil, fmt.Errorf("workspace path not configured") + } + out := make([]map[string]interface{}, 0, len(paths)) + for _, raw := range paths { + artifact, err := buildNodeArtifact(root, raw) + if err != nil { + return nil, err + } + out = append(out, artifact) + } + return out, nil +} + +func buildNodeArtifact(workspace, rawPath string) (map[string]interface{}, error) { + rawPath = strings.TrimSpace(rawPath) + if rawPath == "" { + return nil, fmt.Errorf("artifact path is required") + } + clean := filepath.Clean(rawPath) + fullPath := clean + if !filepath.IsAbs(clean) { + fullPath = filepath.Join(workspace, clean) + } + fullPath = filepath.Clean(fullPath) + rel, err := filepath.Rel(workspace, fullPath) + if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(os.PathSeparator)) { + return nil, fmt.Errorf("artifact path escapes workspace: %s", rawPath) + } + info, err := os.Stat(fullPath) + if err != nil { + return nil, err + } + if info.IsDir() { + return nil, fmt.Errorf("artifact path must be file: %s", rawPath) + } + artifact := map[string]interface{}{ + "name": filepath.Base(fullPath), + "kind": nodeArtifactKindFromPath(fullPath), + "source_path": filepath.ToSlash(rel), + "size_bytes": info.Size(), + } + if mimeType := mimeTypeForPath(fullPath); mimeType != "" { + artifact["mime_type"] = mimeType + } + data, err := os.ReadFile(fullPath) + if err != nil { + return nil, err + } + if shouldInlineAsText(fullPath, data, info.Mode()) { + artifact["storage"] = "inline" + artifact["content_text"] = string(data) + return artifact, nil + } + artifact["storage"] = "inline" + if len(data) > nodeArtifactInlineLimit { + data = data[:nodeArtifactInlineLimit] + artifact["truncated"] = true + } + artifact["content_base64"] = base64.StdEncoding.EncodeToString(data) + return artifact, nil +} + +func stringListArg(args map[string]interface{}, key string) []string { + if len(args) == 0 { + return nil + } + items, ok := args[key].([]interface{}) + if !ok { + return nil + } + out := make([]string, 0, len(items)) + for _, item := range items { + value := strings.TrimSpace(fmt.Sprint(item)) + if value == "" { + continue + } + out = append(out, value) + } + return out +} + +func mimeTypeForPath(path string) string { + switch strings.ToLower(filepath.Ext(path)) { + case ".md": + return "text/markdown" + case ".txt", ".log", ".json", ".yaml", ".yml", ".xml", ".csv": + return "text/plain" + case ".png": + return "image/png" + case ".jpg", ".jpeg": + return "image/jpeg" + case ".gif": + return "image/gif" + case ".webp": + return "image/webp" + case ".mp4": + return "video/mp4" + case ".mov": + return "video/quicktime" + case ".pdf": + return "application/pdf" + default: + return "" + } +} + +func nodeArtifactKindFromPath(path string) string { + ext := strings.ToLower(filepath.Ext(path)) + switch ext { + case ".png", ".jpg", ".jpeg", ".gif", ".webp": + return "image" + case ".mp4", ".mov", ".webm": + return "video" + case ".pdf": + return "document" + default: + return "file" + } +} + +func shouldInlineAsText(path string, data []byte, mode fs.FileMode) bool { + if mode&fs.ModeType != 0 { + return false + } + switch strings.ToLower(filepath.Ext(path)) { + case ".md", ".txt", ".log", ".json", ".yaml", ".yml", ".xml", ".csv", ".go", ".ts", ".tsx", ".js", ".jsx", ".css", ".html", ".sh": + return len(data) <= nodeArtifactInlineLimit + default: + return false + } +} + +func captureNodeCameraSnapshot(ctx context.Context, workspace string, args map[string]interface{}) (string, error) { + outputPath, err := nodeMediaOutputPath(workspace, "camera", ".jpg", stringArg(args, "filename")) + if err != nil { + return "", err + } + switch runtime.GOOS { + case "linux": + if _, err := os.Stat("/dev/video0"); err != nil { + return "", fmt.Errorf("camera device /dev/video0 not found") + } + if _, err := exec.LookPath("ffmpeg"); err != nil { + return "", fmt.Errorf("ffmpeg not installed") + } + cmd := exec.CommandContext(ctx, "ffmpeg", "-y", "-f", "video4linux2", "-i", "/dev/video0", "-vframes", "1", "-q:v", "2", outputPath) + if out, err := cmd.CombinedOutput(); err != nil { + return "", fmt.Errorf("camera capture failed: %v, output=%s", err, strings.TrimSpace(string(out))) + } + return outputPath, nil + case "darwin": + if _, err := exec.LookPath("imagesnap"); err != nil { + return "", fmt.Errorf("imagesnap not installed") + } + cmd := exec.CommandContext(ctx, "imagesnap", "-q", outputPath) + if out, err := cmd.CombinedOutput(); err != nil { + return "", fmt.Errorf("camera capture failed: %v, output=%s", err, strings.TrimSpace(string(out))) + } + return outputPath, nil + default: + return "", fmt.Errorf("camera_snap not supported on %s", runtime.GOOS) + } +} + +func captureNodeScreenSnapshot(ctx context.Context, workspace string, args map[string]interface{}) (string, error) { + outputPath, err := nodeMediaOutputPath(workspace, "screen", ".png", stringArg(args, "filename")) + if err != nil { + return "", err + } + switch runtime.GOOS { + case "darwin": + cmd := exec.CommandContext(ctx, "screencapture", "-x", outputPath) + if out, err := cmd.CombinedOutput(); err != nil { + return "", fmt.Errorf("screen capture failed: %v, output=%s", err, strings.TrimSpace(string(out))) + } + return outputPath, nil + case "linux": + candidates := [][]string{ + {"grim", outputPath}, + {"gnome-screenshot", "-f", outputPath}, + {"scrot", outputPath}, + {"import", "-window", "root", outputPath}, + } + for _, candidate := range candidates { + if _, err := exec.LookPath(candidate[0]); err != nil { + continue + } + cmd := exec.CommandContext(ctx, candidate[0], candidate[1:]...) + if out, err := cmd.CombinedOutput(); err == nil { + return outputPath, nil + } else if strings.TrimSpace(string(out)) != "" { + continue + } + } + return "", fmt.Errorf("no supported screen capture command found (grim, gnome-screenshot, scrot, import)") + default: + return "", fmt.Errorf("screen_snapshot not supported on %s", runtime.GOOS) + } +} + +func nodeMediaOutputPath(workspace, kind, ext, requested string) (string, error) { + root := strings.TrimSpace(workspace) + if root == "" { + return "", fmt.Errorf("workspace path not configured") + } + baseDir := filepath.Join(root, "artifacts", "node") + if err := os.MkdirAll(baseDir, 0755); err != nil { + return "", err + } + filename := strings.TrimSpace(requested) + if filename == "" { + filename = fmt.Sprintf("%s_%d%s", kind, time.Now().UnixNano(), ext) + } + filename = filepath.Clean(filename) + if filepath.IsAbs(filename) { + return "", fmt.Errorf("filename must be relative to workspace") + } + fullPath := filepath.Join(baseDir, filename) + fullPath = filepath.Clean(fullPath) + rel, err := filepath.Rel(root, fullPath) + if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(os.PathSeparator)) { + return "", fmt.Errorf("capture path escapes workspace") + } + if err := os.MkdirAll(filepath.Dir(fullPath), 0755); err != nil { + return "", err + } + return fullPath, nil +} + func structToWirePayload(v interface{}) map[string]interface{} { b, _ := json.Marshal(v) var out map[string]interface{} diff --git a/cmd/clawgo/cmd_node_test.go b/cmd/clawgo/cmd_node_test.go index 6db0a2c..af05b77 100644 --- a/cmd/clawgo/cmd_node_test.go +++ b/cmd/clawgo/cmd_node_test.go @@ -5,14 +5,30 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "os" + "path/filepath" "strings" "testing" "time" + "clawgo/pkg/agent" "clawgo/pkg/config" "clawgo/pkg/nodes" + "clawgo/pkg/providers" ) +type stubNodeProvider struct { + content string +} + +func (p stubNodeProvider) Chat(ctx context.Context, messages []providers.Message, tools []providers.ToolDefinition, model string, options map[string]interface{}) (*providers.LLMResponse, error) { + return &providers.LLMResponse{Content: p.content, FinishReason: "stop"}, nil +} + +func (p stubNodeProvider) GetDefaultModel() string { + return "stub-model" +} + func TestParseNodeRegisterArgsDefaults(t *testing.T) { t.Parallel() @@ -106,6 +122,37 @@ func TestPostNodeHeartbeatSendsNodeID(t *testing.T) { } } +func TestNodeAgentsFromConfigCollectsEnabledAgents(t *testing.T) { + t.Parallel() + + cfg := config.DefaultConfig() + cfg.Agents.Subagents["main"] = config.SubagentConfig{ + Enabled: true, + Type: "router", + DisplayName: "Main Agent", + Role: "orchestrator", + } + cfg.Agents.Subagents["coder"] = config.SubagentConfig{ + Enabled: true, + Type: "worker", + DisplayName: "Code Agent", + Role: "code", + } + cfg.Agents.Subagents["tester"] = config.SubagentConfig{ + Enabled: false, + Type: "worker", + DisplayName: "Test Agent", + Role: "test", + } + items := nodeAgentsFromConfig(cfg) + if len(items) != 2 { + t.Fatalf("expected 2 enabled agents, got %+v", items) + } + if items[0].ID != "coder" || items[1].ID != "main" { + t.Fatalf("unexpected agent export order: %+v", items) + } +} + func TestNodeWebsocketURL(t *testing.T) { t.Parallel() @@ -130,3 +177,212 @@ func TestNodeSocketPingInterval(t *testing.T) { t.Fatalf("expected half heartbeat, got %s", got) } } + +func TestExecuteNodeRequestRunsLocalMainAgentTask(t *testing.T) { + prevCfg := globalConfigPathOverride + prevProviderFactory := nodeProviderFactory + prevLoopFactory := nodeAgentLoopFactory + prevExecutors := nodeLocalExecutors + globalConfigPathOverride = filepath.Join(t.TempDir(), "config.json") + nodeLocalExecutors = map[string]*nodeLocalExecutor{} + nodeProviderFactory = func(cfg *config.Config) (providers.LLMProvider, error) { + return stubNodeProvider{content: "main-local-ok"}, nil + } + nodeAgentLoopFactory = agent.NewAgentLoop + defer func() { + globalConfigPathOverride = prevCfg + nodeProviderFactory = prevProviderFactory + nodeAgentLoopFactory = prevLoopFactory + nodeLocalExecutors = prevExecutors + }() + + cfg := config.DefaultConfig() + cfg.Agents.Defaults.Workspace = filepath.Join(t.TempDir(), "workspace") + cfg.Agents.Subagents["main"] = config.SubagentConfig{ + Enabled: true, + Type: "router", + Role: "orchestrator", + } + if err := config.SaveConfig(globalConfigPathOverride, cfg); err != nil { + t.Fatalf("save config: %v", err) + } + + info := nodes.NodeInfo{ID: "edge-a", Name: "Edge A"} + resp := executeNodeRequest(context.Background(), &http.Client{Timeout: time.Second}, info, nodeRegisterOptions{}, &nodes.Request{ + Action: "agent_task", + Task: "say ok", + }) + if !resp.OK { + t.Fatalf("expected ok response, got %+v", resp) + } + if got := strings.TrimSpace(resp.Payload["result"].(string)); got != "main-local-ok" { + t.Fatalf("unexpected result: %+v", resp.Payload) + } + if got := strings.TrimSpace(resp.Payload["agent_id"].(string)); got != "main" { + t.Fatalf("unexpected agent id: %+v", resp.Payload) + } +} + +func TestExecuteNodeRequestRunsLocalSubagentTask(t *testing.T) { + prevCfg := globalConfigPathOverride + prevProviderFactory := nodeProviderFactory + prevLoopFactory := nodeAgentLoopFactory + prevExecutors := nodeLocalExecutors + globalConfigPathOverride = filepath.Join(t.TempDir(), "config.json") + nodeLocalExecutors = map[string]*nodeLocalExecutor{} + nodeProviderFactory = func(cfg *config.Config) (providers.LLMProvider, error) { + return stubNodeProvider{content: "coder-local-ok"}, nil + } + nodeAgentLoopFactory = agent.NewAgentLoop + defer func() { + globalConfigPathOverride = prevCfg + nodeProviderFactory = prevProviderFactory + nodeAgentLoopFactory = prevLoopFactory + nodeLocalExecutors = prevExecutors + }() + + cfg := config.DefaultConfig() + cfg.Agents.Defaults.Workspace = filepath.Join(t.TempDir(), "workspace") + cfg.Agents.Subagents["main"] = config.SubagentConfig{ + Enabled: true, + Type: "router", + Role: "orchestrator", + } + cfg.Agents.Subagents["coder"] = config.SubagentConfig{ + Enabled: true, + Type: "worker", + Role: "code", + } + if err := os.MkdirAll(filepath.Join(cfg.Agents.Defaults.Workspace, "out"), 0755); err != nil { + t.Fatalf("mkdir artifact dir: %v", err) + } + if err := os.WriteFile(filepath.Join(cfg.Agents.Defaults.Workspace, "out", "result.txt"), []byte("artifact-body"), 0644); err != nil { + t.Fatalf("write artifact: %v", err) + } + if err := config.SaveConfig(globalConfigPathOverride, cfg); err != nil { + t.Fatalf("save config: %v", err) + } + + info := nodes.NodeInfo{ID: "edge-b", Name: "Edge B"} + resp := executeNodeRequest(context.Background(), &http.Client{Timeout: time.Second}, info, nodeRegisterOptions{}, &nodes.Request{ + Action: "agent_task", + Task: "write tests", + Args: map[string]interface{}{"remote_agent_id": "coder", "artifact_paths": []interface{}{"out/result.txt"}}, + }) + if !resp.OK { + t.Fatalf("expected ok response, got %+v", resp) + } + if got := strings.TrimSpace(resp.Payload["result"].(string)); !strings.Contains(got, "coder-local-ok") { + t.Fatalf("unexpected result: %+v", resp.Payload) + } + if got := strings.TrimSpace(resp.Payload["agent_id"].(string)); got != "coder" { + t.Fatalf("unexpected agent id: %+v", resp.Payload) + } + artifacts, ok := resp.Payload["artifacts"].([]map[string]interface{}) + if !ok || len(artifacts) != 1 { + t.Fatalf("expected one artifact, got %+v", resp.Payload["artifacts"]) + } + if artifacts[0]["content_text"] != "artifact-body" { + t.Fatalf("unexpected artifact payload: %+v", artifacts[0]) + } +} + +func TestCollectNodeArtifactsRejectsPathEscape(t *testing.T) { + t.Parallel() + + _, err := collectNodeArtifacts(t.TempDir(), map[string]interface{}{ + "artifact_paths": []interface{}{"../secret.txt"}, + }) + if err == nil || !strings.Contains(err.Error(), "escapes workspace") { + t.Fatalf("expected workspace escape error, got %v", err) + } +} + +func TestExecuteNodeRequestRunsLocalCameraSnap(t *testing.T) { + prevCfg := globalConfigPathOverride + prevExecutors := nodeLocalExecutors + prevCamera := nodeCameraSnapFunc + globalConfigPathOverride = filepath.Join(t.TempDir(), "config.json") + nodeLocalExecutors = map[string]*nodeLocalExecutor{} + defer func() { + globalConfigPathOverride = prevCfg + nodeLocalExecutors = prevExecutors + nodeCameraSnapFunc = prevCamera + }() + + cfg := config.DefaultConfig() + cfg.Agents.Defaults.Workspace = filepath.Join(t.TempDir(), "workspace") + if err := config.SaveConfig(globalConfigPathOverride, cfg); err != nil { + t.Fatalf("save config: %v", err) + } + nodeCameraSnapFunc = func(ctx context.Context, workspace string, args map[string]interface{}) (string, error) { + out := filepath.Join(workspace, "artifacts", "node", "camera-test.jpg") + if err := os.MkdirAll(filepath.Dir(out), 0755); err != nil { + return "", err + } + if err := os.WriteFile(out, []byte("camera-bytes"), 0644); err != nil { + return "", err + } + return out, nil + } + + info := nodes.NodeInfo{ID: "edge-cam", Name: "Edge Cam"} + resp := executeNodeRequest(context.Background(), &http.Client{Timeout: time.Second}, info, nodeRegisterOptions{}, &nodes.Request{ + Action: "camera_snap", + Args: map[string]interface{}{"facing": "front"}, + }) + if !resp.OK { + t.Fatalf("expected ok response, got %+v", resp) + } + artifacts, ok := resp.Payload["artifacts"].([]map[string]interface{}) + if !ok || len(artifacts) != 1 { + t.Fatalf("expected one artifact, got %+v", resp.Payload["artifacts"]) + } + if artifacts[0]["name"] != "camera-test.jpg" { + t.Fatalf("unexpected artifact: %+v", artifacts[0]) + } +} + +func TestExecuteNodeRequestRunsLocalScreenSnapshot(t *testing.T) { + prevCfg := globalConfigPathOverride + prevExecutors := nodeLocalExecutors + prevScreen := nodeScreenSnapFunc + globalConfigPathOverride = filepath.Join(t.TempDir(), "config.json") + nodeLocalExecutors = map[string]*nodeLocalExecutor{} + defer func() { + globalConfigPathOverride = prevCfg + nodeLocalExecutors = prevExecutors + nodeScreenSnapFunc = prevScreen + }() + + cfg := config.DefaultConfig() + cfg.Agents.Defaults.Workspace = filepath.Join(t.TempDir(), "workspace") + if err := config.SaveConfig(globalConfigPathOverride, cfg); err != nil { + t.Fatalf("save config: %v", err) + } + nodeScreenSnapFunc = func(ctx context.Context, workspace string, args map[string]interface{}) (string, error) { + out := filepath.Join(workspace, "artifacts", "node", "screen-test.png") + if err := os.MkdirAll(filepath.Dir(out), 0755); err != nil { + return "", err + } + if err := os.WriteFile(out, []byte{0x89, 0x50, 0x4e, 0x47}, 0644); err != nil { + return "", err + } + return out, nil + } + + info := nodes.NodeInfo{ID: "edge-screen", Name: "Edge Screen"} + resp := executeNodeRequest(context.Background(), &http.Client{Timeout: time.Second}, info, nodeRegisterOptions{}, &nodes.Request{ + Action: "screen_snapshot", + }) + if !resp.OK { + t.Fatalf("expected ok response, got %+v", resp) + } + artifacts, ok := resp.Payload["artifacts"].([]map[string]interface{}) + if !ok || len(artifacts) != 1 { + t.Fatalf("expected one artifact, got %+v", resp.Payload["artifacts"]) + } + if artifacts[0]["name"] != "screen-test.png" { + t.Fatalf("unexpected artifact: %+v", artifacts[0]) + } +} diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 6e5f180..5df8eb0 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -352,10 +352,15 @@ func (al *AgentLoop) dispatchNodeSubagentTask(ctx context.Context, task *tools.S return "", fmt.Errorf("node-backed subagent %q missing node_id", task.AgentID) } taskInput := loopTaskInputForNode(task) + reqArgs := map[string]interface{}{} + if remoteAgentID := remoteAgentIDForNodeBranch(task.AgentID, nodeID); remoteAgentID != "" { + reqArgs["remote_agent_id"] = remoteAgentID + } resp, err := al.nodeRouter.Dispatch(ctx, nodes.Request{ Action: "agent_task", Node: nodeID, Task: taskInput, + Args: reqArgs, }, "auto") if err != nil { return "", err @@ -372,6 +377,23 @@ func (al *AgentLoop) dispatchNodeSubagentTask(ctx context.Context, task *tools.S return fmt.Sprintf("node %s completed agent_task", nodeID), nil } +func remoteAgentIDForNodeBranch(agentID, nodeID string) string { + agentID = strings.TrimSpace(agentID) + nodeID = strings.TrimSpace(nodeID) + if agentID == "" || nodeID == "" { + return "" + } + prefix := "node." + nodeID + "." + if !strings.HasPrefix(agentID, prefix) { + return "" + } + remote := strings.TrimPrefix(agentID, prefix) + if strings.TrimSpace(remote) == "" { + return "" + } + return remote +} + func loopTaskInputForNode(task *tools.SubagentTask) string { if task == nil { return "" diff --git a/pkg/agent/subagent_node_test.go b/pkg/agent/subagent_node_test.go index f99e24b..04cd82c 100644 --- a/pkg/agent/subagent_node_test.go +++ b/pkg/agent/subagent_node_test.go @@ -23,6 +23,9 @@ func TestDispatchNodeSubagentTaskUsesNodeAgentTask(t *testing.T) { if req.Action != "agent_task" { t.Fatalf("unexpected action: %s", req.Action) } + if got, _ := req.Args["remote_agent_id"].(string); got != "coder" { + t.Fatalf("expected remote_agent_id=coder, got %+v", req.Args) + } if !strings.Contains(req.Task, "Parent Agent: main") { t.Fatalf("expected parent-agent context in task, got %q", req.Task) } @@ -43,7 +46,7 @@ func TestDispatchNodeSubagentTaskUsesNodeAgentTask(t *testing.T) { } out, err := loop.dispatchNodeSubagentTask(context.Background(), &tools.SubagentTask{ ID: "subagent-1", - AgentID: "node.edge-dev.main", + AgentID: "node.edge-dev.coder", Transport: "node", NodeID: "edge-dev", ParentAgentID: "main", diff --git a/pkg/nodes/manager.go b/pkg/nodes/manager.go index 17607ec..64ae250 100644 --- a/pkg/nodes/manager.go +++ b/pkg/nodes/manager.go @@ -251,7 +251,19 @@ func (m *Manager) SupportsAction(nodeID, action string) bool { if !ok || !n.Online { return false } - action = strings.ToLower(strings.TrimSpace(action)) + return nodeSupportsRequest(n, Request{Action: action}) +} + +func (m *Manager) SupportsRequest(nodeID string, req Request) bool { + n, ok := m.Get(nodeID) + if !ok || !n.Online { + return false + } + return nodeSupportsRequest(n, req) +} + +func nodeSupportsRequest(n NodeInfo, req Request) bool { + action := strings.ToLower(strings.TrimSpace(req.Action)) if len(n.Actions) > 0 { allowed := false for _, a := range n.Actions { @@ -283,44 +295,112 @@ func (m *Manager) SupportsAction(nodeID, action string) bool { } func (m *Manager) PickFor(action string) (NodeInfo, bool) { + return m.PickRequest(Request{Action: action}, "auto") +} + +func (m *Manager) PickRequest(req Request, mode string) (NodeInfo, bool) { m.mu.RLock() defer m.mu.RUnlock() + bestScore := -1 + bestNode := NodeInfo{} for _, n := range m.nodes { - if !n.Online { + score, ok := scoreNodeCandidate(n, req, mode, m.senders[strings.TrimSpace(n.ID)] != nil) + if !ok { continue } - switch strings.ToLower(strings.TrimSpace(action)) { - case "run": - if n.Capabilities.Run { - return n, true - } - case "agent_task": - if n.Capabilities.Model { - return n, true - } - case "camera_snap", "camera_clip": - if n.Capabilities.Camera { - return n, true - } - case "screen_record", "screen_snapshot": - if n.Capabilities.Screen { - return n, true - } - case "location_get": - if n.Capabilities.Location { - return n, true - } - case "canvas_snapshot", "canvas_action": - if n.Capabilities.Canvas { - return n, true - } - default: - if n.Capabilities.Invoke { - return n, true - } + if score > bestScore || (score == bestScore && bestNode.ID != "" && n.LastSeenAt.After(bestNode.LastSeenAt)) { + bestScore = score + bestNode = n } } - return NodeInfo{}, false + if bestScore < 0 || strings.TrimSpace(bestNode.ID) == "" { + return NodeInfo{}, false + } + return bestNode, true +} + +func scoreNodeCandidate(n NodeInfo, req Request, mode string, hasWireSender bool) (int, bool) { + if !n.Online { + return 0, false + } + if !nodeSupportsRequest(n, req) { + return 0, false + } + + mode = strings.ToLower(strings.TrimSpace(mode)) + if mode == "p2p" && !hasWireSender { + return 0, false + } + + score := 100 + if hasWireSender { + score += 30 + } + if prefersRealtimeTransport(req.Action) && hasWireSender { + score += 40 + } + if mode == "relay" && hasWireSender { + score -= 10 + } + if mode == "p2p" && hasWireSender { + score += 80 + } + if strings.EqualFold(strings.TrimSpace(req.Action), "agent_task") { + remoteAgentID := requestedRemoteAgentID(req.Args) + switch { + case remoteAgentID == "", remoteAgentID == "main": + score += 20 + case nodeHasAgent(n, remoteAgentID): + score += 80 + default: + return 0, false + } + } + if !n.LastSeenAt.IsZero() { + ageSeconds := int(time.Since(n.LastSeenAt).Seconds()) + if ageSeconds < 0 { + ageSeconds = 0 + } + if ageSeconds < 60 { + score += 20 + } else if ageSeconds < 300 { + score += 5 + } + } + return score, true +} + +func requestedRemoteAgentID(args map[string]interface{}) string { + if len(args) == 0 { + return "" + } + value, ok := args["remote_agent_id"] + if !ok || value == nil { + return "" + } + return strings.ToLower(strings.TrimSpace(fmt.Sprint(value))) +} + +func nodeHasAgent(n NodeInfo, agentID string) bool { + agentID = strings.ToLower(strings.TrimSpace(agentID)) + if agentID == "" { + return false + } + for _, agent := range n.Agents { + if strings.ToLower(strings.TrimSpace(agent.ID)) == agentID { + return true + } + } + return false +} + +func prefersRealtimeTransport(action string) bool { + switch strings.ToLower(strings.TrimSpace(action)) { + case "camera_snap", "camera_clip", "screen_record", "screen_snapshot", "canvas_snapshot", "canvas_action": + return true + default: + return false + } } func (m *Manager) reaperLoop() { diff --git a/pkg/nodes/manager_test.go b/pkg/nodes/manager_test.go new file mode 100644 index 0000000..771ad75 --- /dev/null +++ b/pkg/nodes/manager_test.go @@ -0,0 +1,76 @@ +package nodes + +import ( + "testing" + "time" +) + +func TestPickRequestPrefersMatchingRemoteAgent(t *testing.T) { + t.Parallel() + + manager := NewManager() + now := time.Now().UTC() + manager.Upsert(NodeInfo{ + ID: "node-main-only", + Online: true, + LastSeenAt: now, + Capabilities: Capabilities{ + Model: true, + }, + Agents: []AgentInfo{{ID: "main"}}, + }) + manager.Upsert(NodeInfo{ + ID: "node-coder", + Online: true, + LastSeenAt: now, + Capabilities: Capabilities{ + Model: true, + }, + Agents: []AgentInfo{{ID: "main"}, {ID: "coder"}}, + }) + + picked, ok := manager.PickRequest(Request{ + Action: "agent_task", + Args: map[string]interface{}{"remote_agent_id": "coder"}, + }, "auto") + if !ok { + t.Fatalf("expected node pick") + } + if picked.ID != "node-coder" { + t.Fatalf("expected node-coder, got %+v", picked) + } +} + +func TestPickRequestPrefersRealtimeCapableNodeForScreenActions(t *testing.T) { + t.Parallel() + + manager := NewManager() + now := time.Now().UTC() + manager.Upsert(NodeInfo{ + ID: "relay-only", + Online: true, + LastSeenAt: now.Add(-2 * time.Minute), + Capabilities: Capabilities{ + Screen: true, + }, + Actions: []string{"screen_snapshot"}, + }) + manager.Upsert(NodeInfo{ + ID: "p2p-ready", + Online: true, + LastSeenAt: now, + Capabilities: Capabilities{ + Screen: true, + }, + Actions: []string{"screen_snapshot"}, + }) + manager.RegisterWireSender("p2p-ready", &captureWireSender{}) + + picked, ok := manager.PickRequest(Request{Action: "screen_snapshot"}, "auto") + if !ok { + t.Fatalf("expected node pick") + } + if picked.ID != "p2p-ready" { + t.Fatalf("expected p2p-ready, got %+v", picked) + } +} diff --git a/pkg/nodes/transport.go b/pkg/nodes/transport.go index d77503d..daa4bea 100644 --- a/pkg/nodes/transport.go +++ b/pkg/nodes/transport.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "path/filepath" "strings" "time" ) @@ -211,5 +212,101 @@ func normalizeDevicePayload(action string, payload map[string]interface{}) map[s if _, ok := payload["meta"]; !ok { payload["meta"] = map[string]interface{}{} } + payload["artifacts"] = normalizeArtifacts(payload, a) return payload } + +func normalizeArtifacts(payload map[string]interface{}, action string) []map[string]interface{} { + if payload == nil { + return []map[string]interface{}{} + } + if raw, ok := payload["artifacts"]; ok { + items := normalizeArtifactList(raw) + if len(items) > 0 { + return items + } + } + + artifact := map[string]interface{}{} + if mediaType, _ := payload["media_type"].(string); strings.TrimSpace(mediaType) != "" { + artifact["kind"] = strings.TrimSpace(mediaType) + } + if mimeType, _ := payload["mime_type"].(string); strings.TrimSpace(mimeType) != "" { + artifact["mime_type"] = strings.TrimSpace(mimeType) + } + if storage, _ := payload["storage"].(string); strings.TrimSpace(storage) != "" { + artifact["storage"] = strings.TrimSpace(storage) + } + if path, _ := payload["path"].(string); strings.TrimSpace(path) != "" { + artifact["path"] = filepath.Clean(strings.TrimSpace(path)) + } + if url, _ := payload["url"].(string); strings.TrimSpace(url) != "" { + artifact["url"] = strings.TrimSpace(url) + } + if image, _ := payload["image"].(string); strings.TrimSpace(image) != "" { + artifact["content_base64"] = strings.TrimSpace(image) + } + if text, _ := payload["content_text"].(string); strings.TrimSpace(text) != "" { + artifact["content_text"] = text + } + if name, _ := payload["name"].(string); strings.TrimSpace(name) != "" { + artifact["name"] = strings.TrimSpace(name) + } + if size := int64FromPayload(payload["size_bytes"]); size > 0 { + artifact["size_bytes"] = size + } + if len(artifact) == 0 { + return []map[string]interface{}{} + } + if _, ok := artifact["kind"]; !ok && strings.TrimSpace(action) != "" { + artifact["kind"] = strings.ToLower(strings.TrimSpace(action)) + } + return []map[string]interface{}{artifact} +} + +func normalizeArtifactList(raw interface{}) []map[string]interface{} { + items, ok := raw.([]interface{}) + if !ok { + return []map[string]interface{}{} + } + out := make([]map[string]interface{}, 0, len(items)) + for _, item := range items { + row, ok := item.(map[string]interface{}) + if !ok || len(row) == 0 { + continue + } + normalized := map[string]interface{}{} + for _, key := range []string{"id", "name", "kind", "mime_type", "storage", "path", "url", "content_text", "content_base64", "source_path"} { + if value, ok := row[key]; ok && strings.TrimSpace(fmt.Sprint(value)) != "" { + normalized[key] = value + } + } + if truncated, ok := row["truncated"].(bool); ok && truncated { + normalized["truncated"] = true + } + if size := int64FromPayload(row["size_bytes"]); size > 0 { + normalized["size_bytes"] = size + } + if len(normalized) == 0 { + continue + } + out = append(out, normalized) + } + return out +} + +func int64FromPayload(v interface{}) int64 { + switch value := v.(type) { + case int: + return int64(value) + case int64: + return value + case float64: + return int64(value) + case json.Number: + n, _ := value.Int64() + return n + default: + return 0 + } +} diff --git a/pkg/nodes/transport_test.go b/pkg/nodes/transport_test.go index 9cd3cc6..d662996 100644 --- a/pkg/nodes/transport_test.go +++ b/pkg/nodes/transport_test.go @@ -74,6 +74,24 @@ func TestWebsocketP2PTransportSend(t *testing.T) { } } +func TestNormalizeDevicePayloadBuildsArtifacts(t *testing.T) { + t.Parallel() + + payload := normalizeDevicePayload("screen_snapshot", map[string]interface{}{ + "media_type": "image", + "storage": "path", + "path": "/tmp/screen.png", + "mime_type": "image/png", + }) + artifacts, ok := payload["artifacts"].([]map[string]interface{}) + if !ok || len(artifacts) != 1 { + t.Fatalf("expected one artifact, got %+v", payload["artifacts"]) + } + if artifacts[0]["kind"] != "image" || artifacts[0]["path"] != "/tmp/screen.png" { + t.Fatalf("unexpected artifact payload: %+v", artifacts[0]) + } +} + func TestWebRTCTransportSendEndToEnd(t *testing.T) { t.Parallel() diff --git a/pkg/nodes/types.go b/pkg/nodes/types.go index bf116f2..e38ce8a 100644 --- a/pkg/nodes/types.go +++ b/pkg/nodes/types.go @@ -13,6 +13,31 @@ type Capabilities struct { Canvas bool `json:"canvas"` } +// AgentInfo describes an enabled agent exposed by a remote clawgo node. +type AgentInfo struct { + ID string `json:"id"` + DisplayName string `json:"display_name,omitempty"` + Role string `json:"role,omitempty"` + Type string `json:"type,omitempty"` + Transport string `json:"transport,omitempty"` + ParentAgentID string `json:"parent_agent_id,omitempty"` +} + +// Artifact describes a file/media payload returned from a node action. +type Artifact struct { + ID string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Kind string `json:"kind,omitempty"` + MIMEType string `json:"mime_type,omitempty"` + Storage string `json:"storage,omitempty"` + Path string `json:"path,omitempty"` + URL string `json:"url,omitempty"` + ContentText string `json:"content_text,omitempty"` + ContentB64 string `json:"content_base64,omitempty"` + SizeBytes int64 `json:"size_bytes,omitempty"` + SourcePath string `json:"source_path,omitempty"` +} + // NodeInfo is the runtime descriptor for cross-device scheduling. type NodeInfo struct { ID string `json:"id"` @@ -25,6 +50,7 @@ type NodeInfo struct { Capabilities Capabilities `json:"capabilities"` Actions []string `json:"actions,omitempty"` Models []string `json:"models,omitempty"` + Agents []AgentInfo `json:"agents,omitempty"` RegisteredAt time.Time `json:"registered_at,omitempty"` LastSeenAt time.Time `json:"last_seen_at"` Online bool `json:"online"` @@ -51,15 +77,15 @@ type Response struct { // WireMessage is the websocket envelope for node lifecycle messages. type WireMessage struct { - Type string `json:"type"` - ID string `json:"id,omitempty"` - From string `json:"from,omitempty"` - To string `json:"to,omitempty"` - Session string `json:"session,omitempty"` - Node *NodeInfo `json:"node,omitempty"` - Request *Request `json:"request,omitempty"` - Response *Response `json:"response,omitempty"` - Payload map[string]interface{} `json:"payload,omitempty"` + Type string `json:"type"` + ID string `json:"id,omitempty"` + From string `json:"from,omitempty"` + To string `json:"to,omitempty"` + Session string `json:"session,omitempty"` + Node *NodeInfo `json:"node,omitempty"` + Request *Request `json:"request,omitempty"` + Response *Response `json:"response,omitempty"` + Payload map[string]interface{} `json:"payload,omitempty"` } // WireAck is the websocket response envelope for node lifecycle messages. diff --git a/pkg/tools/nodes_tool.go b/pkg/tools/nodes_tool.go index 2ba385e..cdc277d 100644 --- a/pkg/tools/nodes_tool.go +++ b/pkg/tools/nodes_tool.go @@ -28,15 +28,16 @@ func (t *NodesTool) Description() string { } func (t *NodesTool) Parameters() map[string]interface{} { return map[string]interface{}{"type": "object", "properties": map[string]interface{}{ - "action": map[string]interface{}{"type": "string", "description": "status|describe|run|invoke|agent_task|camera_snap|camera_clip|screen_record|screen_snapshot|location_get|canvas_snapshot|canvas_action"}, - "node": map[string]interface{}{"type": "string", "description": "target node id"}, - "mode": map[string]interface{}{"type": "string", "description": "auto|p2p|relay (default auto)"}, - "args": map[string]interface{}{"type": "object", "description": "action args"}, - "task": map[string]interface{}{"type": "string", "description": "agent_task content for child node model"}, - "model": map[string]interface{}{"type": "string", "description": "optional model for agent_task"}, - "command": map[string]interface{}{"type": "array", "items": map[string]interface{}{"type": "string"}, "description": "run command array shortcut"}, - "facing": map[string]interface{}{"type": "string", "description": "camera facing: front|back|both"}, - "duration_ms": map[string]interface{}{"type": "integer", "description": "clip/record duration"}, + "action": map[string]interface{}{"type": "string", "description": "status|describe|run|invoke|agent_task|camera_snap|camera_clip|screen_record|screen_snapshot|location_get|canvas_snapshot|canvas_action"}, + "node": map[string]interface{}{"type": "string", "description": "target node id"}, + "mode": map[string]interface{}{"type": "string", "description": "auto|p2p|relay (default auto)"}, + "args": map[string]interface{}{"type": "object", "description": "action args"}, + "artifact_paths": map[string]interface{}{"type": "array", "items": map[string]interface{}{"type": "string"}, "description": "optional workspace-relative file paths to bring back as artifacts for agent_task"}, + "task": map[string]interface{}{"type": "string", "description": "agent_task content for child node model"}, + "model": map[string]interface{}{"type": "string", "description": "optional model for agent_task"}, + "command": map[string]interface{}{"type": "array", "items": map[string]interface{}{"type": "string"}, "description": "run command array shortcut"}, + "facing": map[string]interface{}{"type": "string", "description": "camera facing: front|back|both"}, + "duration_ms": map[string]interface{}{"type": "integer", "description": "clip/record duration"}, }, "required": []string{"action"}} } @@ -66,26 +67,15 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s b, _ := json.Marshal(t.manager.List()) return string(b), nil default: - if nodeID == "" { - if picked, ok := t.manager.PickFor(action); ok { - nodeID = picked.ID - } - } - if nodeID == "" { - return "", fmt.Errorf("no eligible node found for action=%s", action) - } - if !t.manager.SupportsAction(nodeID, action) { - return "", fmt.Errorf("node %s does not support action=%s", nodeID, action) - } - if t.router == nil { - return "", fmt.Errorf("nodes transport router not configured") - } reqArgs := map[string]interface{}{} if raw, ok := args["args"].(map[string]interface{}); ok { for k, v := range raw { reqArgs[k] = v } } + if rawPaths, ok := args["artifact_paths"].([]interface{}); ok && len(rawPaths) > 0 { + reqArgs["artifact_paths"] = rawPaths + } if cmd, ok := args["command"].([]interface{}); ok && len(cmd) > 0 { reqArgs["command"] = cmd } @@ -113,7 +103,21 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s return "", fmt.Errorf("invalid_args: canvas_action requires args.action") } } + if nodeID == "" { + if picked, ok := t.manager.PickRequest(nodes.Request{Action: action, Task: task, Model: model, Args: reqArgs}, mode); ok { + nodeID = picked.ID + } + } + if nodeID == "" { + return "", fmt.Errorf("no eligible node found for action=%s", action) + } req := nodes.Request{Action: action, Node: nodeID, Task: task, Model: model, Args: reqArgs} + if !t.manager.SupportsRequest(nodeID, req) { + return "", fmt.Errorf("node %s does not support action=%s", nodeID, action) + } + if t.router == nil { + return "", fmt.Errorf("nodes transport router not configured") + } started := time.Now() resp, err := t.router.Dispatch(ctx, req, mode) durationMs := int(time.Since(started).Milliseconds()) @@ -150,6 +154,12 @@ func (t *NodesTool) writeAudit(req nodes.Request, resp nodes.Response, mode stri if fallback, _ := resp.Payload["fallback_from"].(string); strings.TrimSpace(fallback) != "" { row["fallback_from"] = strings.TrimSpace(fallback) } + if count, kinds := artifactAuditSummary(resp.Payload["artifacts"]); count > 0 { + row["artifact_count"] = count + if len(kinds) > 0 { + row["artifact_kinds"] = kinds + } + } b, _ := json.Marshal(row) f, err := os.OpenFile(t.auditPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) if err != nil { @@ -158,3 +168,29 @@ func (t *NodesTool) writeAudit(req nodes.Request, resp nodes.Response, mode stri defer f.Close() _, _ = f.Write(append(b, '\n')) } + +func artifactAuditSummary(raw interface{}) (int, []string) { + items, ok := raw.([]interface{}) + if !ok { + if typed, ok := raw.([]map[string]interface{}); ok { + items = make([]interface{}, 0, len(typed)) + for _, item := range typed { + items = append(items, item) + } + } + } + if len(items) == 0 { + return 0, nil + } + kinds := make([]string, 0, len(items)) + for _, item := range items { + row, ok := item.(map[string]interface{}) + if !ok { + continue + } + if kind, _ := row["kind"].(string); strings.TrimSpace(kind) != "" { + kinds = append(kinds, strings.TrimSpace(kind)) + } + } + return len(items), kinds +} diff --git a/pkg/tools/subagent_profile.go b/pkg/tools/subagent_profile.go index 07d3ad3..bd1610c 100644 --- a/pkg/tools/subagent_profile.go +++ b/pkg/tools/subagent_profile.go @@ -387,9 +387,10 @@ func (s *SubagentProfileStore) nodeProfileLocked(agentID string) (SubagentProfil if isLocalNode(node.ID) { continue } - profile := profileFromNode(node, parentAgentID) - if profile.AgentID == id { - return profile, true + for _, profile := range profilesFromNode(node, parentAgentID) { + if profile.AgentID == id { + return profile, true + } } } return SubagentProfile{}, false @@ -439,20 +440,18 @@ func (s *SubagentProfileStore) nodeProfilesLocked() []SubagentProfile { if isLocalNode(node.ID) { continue } - profile := profileFromNode(node, parentAgentID) - if profile.AgentID == "" { - continue + profiles := profilesFromNode(node, parentAgentID) + for _, profile := range profiles { + if profile.AgentID == "" { + continue + } + out = append(out, profile) } - out = append(out, profile) } return out } -func profileFromNode(node nodes.NodeInfo, parentAgentID string) SubagentProfile { - agentID := nodeBranchAgentID(node.ID) - if agentID == "" { - return SubagentProfile{} - } +func profilesFromNode(node nodes.NodeInfo, parentAgentID string) []SubagentProfile { name := strings.TrimSpace(node.Name) if name == "" { name = strings.TrimSpace(node.ID) @@ -461,17 +460,39 @@ func profileFromNode(node nodes.NodeInfo, parentAgentID string) SubagentProfile if !node.Online { status = "disabled" } - return normalizeSubagentProfile(SubagentProfile{ - AgentID: agentID, + rootAgentID := nodeBranchAgentID(node.ID) + if rootAgentID == "" { + return nil + } + out := []SubagentProfile{normalizeSubagentProfile(SubagentProfile{ + AgentID: rootAgentID, Name: name + " Main Agent", Transport: "node", NodeID: strings.TrimSpace(node.ID), ParentAgentID: parentAgentID, Role: "remote_main", - MemoryNamespace: agentID, + MemoryNamespace: rootAgentID, Status: status, ManagedBy: "node_registry", - }) + })} + for _, agent := range node.Agents { + agentID := normalizeSubagentIdentifier(agent.ID) + if agentID == "" || agentID == "main" { + continue + } + out = append(out, normalizeSubagentProfile(SubagentProfile{ + AgentID: nodeChildAgentID(node.ID, agentID), + Name: nodeChildAgentDisplayName(name, agent), + Transport: "node", + NodeID: strings.TrimSpace(node.ID), + ParentAgentID: rootAgentID, + Role: strings.TrimSpace(agent.Role), + MemoryNamespace: nodeChildAgentID(node.ID, agentID), + Status: status, + ManagedBy: "node_registry", + })) + } + return out } func nodeBranchAgentID(nodeID string) string { @@ -482,6 +503,27 @@ func nodeBranchAgentID(nodeID string) string { return "node." + id + ".main" } +func nodeChildAgentID(nodeID, agentID string) string { + nodeID = normalizeSubagentIdentifier(nodeID) + agentID = normalizeSubagentIdentifier(agentID) + if nodeID == "" || agentID == "" { + return "" + } + return "node." + nodeID + "." + agentID +} + +func nodeChildAgentDisplayName(nodeName string, agent nodes.AgentInfo) string { + base := strings.TrimSpace(agent.DisplayName) + if base == "" { + base = strings.TrimSpace(agent.ID) + } + nodeName = strings.TrimSpace(nodeName) + if nodeName == "" { + return base + } + return nodeName + " / " + base +} + func isLocalNode(nodeID string) bool { return normalizeSubagentIdentifier(nodeID) == "local" } diff --git a/pkg/tools/subagent_profile_test.go b/pkg/tools/subagent_profile_test.go index e45cfa1..1597575 100644 --- a/pkg/tools/subagent_profile_test.go +++ b/pkg/tools/subagent_profile_test.go @@ -208,6 +208,10 @@ func TestSubagentProfileStoreIncludesNodeMainBranchProfiles(t *testing.T) { ID: "edge-dev", Name: "Edge Dev", Online: true, + Agents: []nodes.AgentInfo{ + {ID: "main", DisplayName: "Main Agent", Role: "orchestrator", Type: "router"}, + {ID: "coder", DisplayName: "Code Agent", Role: "code", Type: "worker"}, + }, Capabilities: nodes.Capabilities{ Model: true, }, @@ -227,6 +231,19 @@ func TestSubagentProfileStoreIncludesNodeMainBranchProfiles(t *testing.T) { if profile.ParentAgentID != "main" { t.Fatalf("expected main parent agent, got %+v", profile) } + childProfile, ok, err := store.Get("node.edge-dev.coder") + if err != nil { + t.Fatalf("get child profile failed: %v", err) + } + if !ok { + t.Fatalf("expected child node-backed profile") + } + if childProfile.ManagedBy != "node_registry" || childProfile.Transport != "node" || childProfile.NodeID != "edge-dev" { + t.Fatalf("unexpected child node profile: %+v", childProfile) + } + if childProfile.ParentAgentID != "node.edge-dev.main" { + t.Fatalf("expected child profile to attach to remote main, got %+v", childProfile) + } if _, err := store.Upsert(SubagentProfile{AgentID: profile.AgentID}); err == nil { t.Fatalf("expected node-managed upsert to fail") } diff --git a/webui/src/i18n/index.ts b/webui/src/i18n/index.ts index fa8bf59..55dfd4b 100644 --- a/webui/src/i18n/index.ts +++ b/webui/src/i18n/index.ts @@ -135,6 +135,7 @@ const resources = { dashboardNodeDispatchTransport: 'Used Transport', dashboardNodeDispatchFallback: 'Fallback From', dashboardNodeDispatchDuration: 'Duration', + dashboardNodeDispatchArtifacts: 'Artifacts', dashboardNodeDispatchError: 'Error', configNodeP2P: 'Node P2P', configNodeP2PHint: 'Configure websocket tunnel or WebRTC transport for remote nodes.', @@ -687,6 +688,7 @@ const resources = { dashboardNodeDispatchTransport: '实际传输', dashboardNodeDispatchFallback: '回退来源', dashboardNodeDispatchDuration: '耗时', + dashboardNodeDispatchArtifacts: '工件', dashboardNodeDispatchError: '错误', configNodeP2P: '节点 P2P', configNodeP2PHint: '为远端节点配置 websocket tunnel 或 WebRTC 传输。', diff --git a/webui/src/pages/Dashboard.tsx b/webui/src/pages/Dashboard.tsx index d13e758..2c92a83 100644 --- a/webui/src/pages/Dashboard.tsx +++ b/webui/src/pages/Dashboard.tsx @@ -79,6 +79,8 @@ const Dashboard: React.FC = () => { usedTransport: String(item?.used_transport || '-'), fallbackFrom: String(item?.fallback_from || '').trim(), durationMs: Number(item?.duration_ms || 0), + artifactCount: Number(item?.artifact_count || 0), + artifactKinds: Array.isArray(item?.artifact_kinds) ? item.artifact_kinds.map((kind: any) => String(kind || '').trim()).filter(Boolean) : [], ok: Boolean(item?.ok), error: String(item?.error || '').trim(), })); @@ -287,6 +289,12 @@ const Dashboard: React.FC = () => {
{t('dashboardNodeDispatchDuration')}
{`${item.durationMs}ms`}
+
+
{t('dashboardNodeDispatchArtifacts')}
+
+ {item.artifactCount > 0 ? `${item.artifactCount}${item.artifactKinds.length ? ` · ${item.artifactKinds.join(', ')}` : ''}` : '-'} +
+
{t('dashboardNodeDispatchError')}