feat: expand node agent routing and media artifacts

This commit is contained in:
lpf
2026-03-09 01:21:19 +08:00
parent c0fe977bce
commit 2d5a384342
14 changed files with 1291 additions and 81 deletions

View File

@@ -3,19 +3,30 @@ package main
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/fs"
"net/http"
"os"
"os/exec"
"os/signal"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
"clawgo/pkg/agent"
"clawgo/pkg/bus"
"clawgo/pkg/config"
"clawgo/pkg/cron"
"clawgo/pkg/nodes"
"clawgo/pkg/providers"
"clawgo/pkg/runtimecfg"
"clawgo/pkg/tools"
"github.com/gorilla/websocket"
"github.com/pion/webrtc/v4"
)
@@ -32,6 +43,7 @@ type nodeRegisterOptions struct {
Version string
Actions []string
Models []string
Agents []nodes.AgentInfo
Capabilities nodes.Capabilities
Watch bool
HeartbeatSec int
@@ -54,6 +66,26 @@ type nodeWebRTCSession struct {
dc *webrtc.DataChannel
}
type nodeLocalExecutor struct {
configPath string
workspace string
once sync.Once
loop *agent.AgentLoop
err error
}
var (
nodeLocalExecutorMu sync.Mutex
nodeLocalExecutors = map[string]*nodeLocalExecutor{}
nodeProviderFactory = providers.CreateProvider
nodeAgentLoopFactory = agent.NewAgentLoop
nodeLocalExecutorFactory = newNodeLocalExecutor
nodeCameraSnapFunc = captureNodeCameraSnapshot
nodeScreenSnapFunc = captureNodeScreenSnapshot
)
const nodeArtifactInlineLimit = 512 * 1024
func nodeCmd() {
args := os.Args[2:]
if len(args) == 0 {
@@ -159,6 +191,7 @@ func parseNodeRegisterArgs(args []string, cfg *config.Config) (nodeRegisterOptio
HeartbeatSec: 30,
Capabilities: capabilitiesFromCSV("run,invoke,model"),
}
opts.Agents = nodeAgentsFromConfig(cfg)
for i := 0; i < len(args); i++ {
arg := strings.TrimSpace(args[i])
next := func() (string, error) {
@@ -332,9 +365,33 @@ func buildNodeInfo(opts nodeRegisterOptions) nodes.NodeInfo {
Capabilities: opts.Capabilities,
Actions: append([]string(nil), opts.Actions...),
Models: append([]string(nil), opts.Models...),
Agents: append([]nodes.AgentInfo(nil), opts.Agents...),
}
}
func nodeAgentsFromConfig(cfg *config.Config) []nodes.AgentInfo {
if cfg == nil {
return nil
}
items := make([]nodes.AgentInfo, 0, len(cfg.Agents.Subagents))
for agentID, subcfg := range cfg.Agents.Subagents {
id := strings.TrimSpace(agentID)
if id == "" || !subcfg.Enabled {
continue
}
items = append(items, nodes.AgentInfo{
ID: id,
DisplayName: strings.TrimSpace(subcfg.DisplayName),
Role: strings.TrimSpace(subcfg.Role),
Type: strings.TrimSpace(subcfg.Type),
Transport: strings.TrimSpace(subcfg.Transport),
ParentAgentID: strings.TrimSpace(subcfg.ParentAgentID),
})
}
sort.Slice(items, func(i, j int) bool { return items[i].ID < items[j].ID })
return items
}
func runNodeHeartbeatLoop(client *http.Client, opts nodeRegisterOptions, info nodes.NodeInfo) error {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
@@ -654,6 +711,38 @@ func executeNodeRequest(ctx context.Context, client *http.Client, info nodes.Nod
}
next := *req
resp.Action = next.Action
switch strings.ToLower(strings.TrimSpace(next.Action)) {
case "agent_task":
execResp, err := executeNodeAgentTask(ctx, info, next)
if err == nil {
return execResp
}
if strings.TrimSpace(opts.Endpoint) == "" {
resp.Error = err.Error()
resp.Code = "local_runtime_error"
return resp
}
case "camera_snap":
execResp, err := executeNodeCameraSnap(ctx, info, next)
if err == nil {
return execResp
}
if strings.TrimSpace(opts.Endpoint) == "" {
resp.Error = err.Error()
resp.Code = "local_runtime_error"
return resp
}
case "screen_snapshot":
execResp, err := executeNodeScreenSnapshot(ctx, info, next)
if err == nil {
return execResp
}
if strings.TrimSpace(opts.Endpoint) == "" {
resp.Error = err.Error()
resp.Code = "local_runtime_error"
return resp
}
}
if strings.TrimSpace(opts.Endpoint) == "" {
resp.Error = "node endpoint not configured"
resp.Code = "endpoint_missing"
@@ -678,6 +767,444 @@ func executeNodeRequest(ctx context.Context, client *http.Client, info nodes.Nod
return execResp
}
func executeNodeAgentTask(ctx context.Context, info nodes.NodeInfo, req nodes.Request) (nodes.Response, error) {
executor, err := getNodeLocalExecutor()
if err != nil {
return nodes.Response{}, err
}
loop, err := executor.Loop()
if err != nil {
return nodes.Response{}, err
}
remoteAgentID := strings.TrimSpace(stringArg(req.Args, "remote_agent_id"))
if remoteAgentID == "" || strings.EqualFold(remoteAgentID, "main") {
sessionKey := fmt.Sprintf("node:%s:main", info.ID)
result, err := loop.ProcessDirectWithOptions(ctx, strings.TrimSpace(req.Task), sessionKey, "node", info.ID, "main", nil)
if err != nil {
return nodes.Response{}, err
}
artifacts, err := collectNodeArtifacts(executor.workspace, req.Args)
if err != nil {
return nodes.Response{}, err
}
return nodes.Response{
OK: true,
Code: "ok",
Node: info.ID,
Action: req.Action,
Payload: map[string]interface{}{
"transport": "clawgo-local",
"agent_id": "main",
"result": strings.TrimSpace(result),
"artifacts": artifacts,
},
}, nil
}
out, err := loop.HandleSubagentRuntime(ctx, "dispatch_and_wait", map[string]interface{}{
"task": strings.TrimSpace(req.Task),
"agent_id": remoteAgentID,
"channel": "node",
"chat_id": info.ID,
"wait_timeout_sec": float64(120),
})
if err != nil {
return nodes.Response{}, err
}
payload, _ := out.(map[string]interface{})
result := strings.TrimSpace(fmt.Sprint(payload["merged"]))
if result == "" {
if reply, ok := payload["reply"].(*tools.RouterReply); ok {
result = strings.TrimSpace(reply.Result)
}
}
artifacts, err := collectNodeArtifacts(executor.workspace, req.Args)
if err != nil {
return nodes.Response{}, err
}
return nodes.Response{
OK: true,
Code: "ok",
Node: info.ID,
Action: req.Action,
Payload: map[string]interface{}{
"transport": "clawgo-local",
"agent_id": remoteAgentID,
"result": result,
"artifacts": artifacts,
},
}, nil
}
func executeNodeCameraSnap(ctx context.Context, info nodes.NodeInfo, req nodes.Request) (nodes.Response, error) {
executor, err := getNodeLocalExecutor()
if err != nil {
return nodes.Response{}, err
}
outputPath, err := nodeCameraSnapFunc(ctx, executor.workspace, req.Args)
if err != nil {
return nodes.Response{}, err
}
artifact, err := buildNodeArtifact(executor.workspace, outputPath)
if err != nil {
return nodes.Response{}, err
}
return nodes.Response{
OK: true,
Code: "ok",
Node: info.ID,
Action: req.Action,
Payload: map[string]interface{}{
"transport": "clawgo-local",
"media_type": "image",
"storage": artifact["storage"],
"artifacts": []map[string]interface{}{artifact},
"meta": map[string]interface{}{
"facing": stringArg(req.Args, "facing"),
},
},
}, nil
}
func executeNodeScreenSnapshot(ctx context.Context, info nodes.NodeInfo, req nodes.Request) (nodes.Response, error) {
executor, err := getNodeLocalExecutor()
if err != nil {
return nodes.Response{}, err
}
outputPath, err := nodeScreenSnapFunc(ctx, executor.workspace, req.Args)
if err != nil {
return nodes.Response{}, err
}
artifact, err := buildNodeArtifact(executor.workspace, outputPath)
if err != nil {
return nodes.Response{}, err
}
return nodes.Response{
OK: true,
Code: "ok",
Node: info.ID,
Action: req.Action,
Payload: map[string]interface{}{
"transport": "clawgo-local",
"media_type": "image",
"storage": artifact["storage"],
"artifacts": []map[string]interface{}{artifact},
},
}, nil
}
func getNodeLocalExecutor() (*nodeLocalExecutor, error) {
key := strings.TrimSpace(getConfigPath())
if key == "" {
return nil, fmt.Errorf("config path is required")
}
nodeLocalExecutorMu.Lock()
defer nodeLocalExecutorMu.Unlock()
if existing := nodeLocalExecutors[key]; existing != nil {
return existing, nil
}
exec, err := nodeLocalExecutorFactory(key)
if err != nil {
return nil, err
}
nodeLocalExecutors[key] = exec
return exec, nil
}
func newNodeLocalExecutor(configPath string) (*nodeLocalExecutor, error) {
configPath = strings.TrimSpace(configPath)
if configPath == "" {
return nil, fmt.Errorf("config path is required")
}
return &nodeLocalExecutor{configPath: configPath}, nil
}
func (e *nodeLocalExecutor) Loop() (*agent.AgentLoop, error) {
if e == nil {
return nil, fmt.Errorf("node local executor is nil")
}
e.once.Do(func() {
prev := globalConfigPathOverride
globalConfigPathOverride = e.configPath
defer func() { globalConfigPathOverride = prev }()
cfg, err := loadConfig()
if err != nil {
e.err = err
return
}
runtimecfg.Set(cfg)
msgBus := bus.NewMessageBus()
cronStorePath := filepath.Join(filepath.Dir(e.configPath), "cron", "jobs.json")
cronService := cron.NewCronService(cronStorePath, nil)
configureCronServiceRuntime(cronService, cfg)
provider, err := nodeProviderFactory(cfg)
if err != nil {
e.err = err
return
}
e.workspace = cfg.WorkspacePath()
loop := nodeAgentLoopFactory(cfg, msgBus, provider, cronService)
loop.SetConfigPath(e.configPath)
e.loop = loop
})
if e.err != nil {
return nil, e.err
}
if e.loop == nil {
return nil, fmt.Errorf("node local executor unavailable")
}
return e.loop, nil
}
func stringArg(args map[string]interface{}, key string) string {
if len(args) == 0 {
return ""
}
value, ok := args[key]
if !ok || value == nil {
return ""
}
return strings.TrimSpace(fmt.Sprint(value))
}
func collectNodeArtifacts(workspace string, args map[string]interface{}) ([]map[string]interface{}, error) {
paths := stringListArg(args, "artifact_paths")
if len(paths) == 0 {
return []map[string]interface{}{}, nil
}
root := strings.TrimSpace(workspace)
if root == "" {
return nil, fmt.Errorf("workspace path not configured")
}
out := make([]map[string]interface{}, 0, len(paths))
for _, raw := range paths {
artifact, err := buildNodeArtifact(root, raw)
if err != nil {
return nil, err
}
out = append(out, artifact)
}
return out, nil
}
func buildNodeArtifact(workspace, rawPath string) (map[string]interface{}, error) {
rawPath = strings.TrimSpace(rawPath)
if rawPath == "" {
return nil, fmt.Errorf("artifact path is required")
}
clean := filepath.Clean(rawPath)
fullPath := clean
if !filepath.IsAbs(clean) {
fullPath = filepath.Join(workspace, clean)
}
fullPath = filepath.Clean(fullPath)
rel, err := filepath.Rel(workspace, fullPath)
if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(os.PathSeparator)) {
return nil, fmt.Errorf("artifact path escapes workspace: %s", rawPath)
}
info, err := os.Stat(fullPath)
if err != nil {
return nil, err
}
if info.IsDir() {
return nil, fmt.Errorf("artifact path must be file: %s", rawPath)
}
artifact := map[string]interface{}{
"name": filepath.Base(fullPath),
"kind": nodeArtifactKindFromPath(fullPath),
"source_path": filepath.ToSlash(rel),
"size_bytes": info.Size(),
}
if mimeType := mimeTypeForPath(fullPath); mimeType != "" {
artifact["mime_type"] = mimeType
}
data, err := os.ReadFile(fullPath)
if err != nil {
return nil, err
}
if shouldInlineAsText(fullPath, data, info.Mode()) {
artifact["storage"] = "inline"
artifact["content_text"] = string(data)
return artifact, nil
}
artifact["storage"] = "inline"
if len(data) > nodeArtifactInlineLimit {
data = data[:nodeArtifactInlineLimit]
artifact["truncated"] = true
}
artifact["content_base64"] = base64.StdEncoding.EncodeToString(data)
return artifact, nil
}
func stringListArg(args map[string]interface{}, key string) []string {
if len(args) == 0 {
return nil
}
items, ok := args[key].([]interface{})
if !ok {
return nil
}
out := make([]string, 0, len(items))
for _, item := range items {
value := strings.TrimSpace(fmt.Sprint(item))
if value == "" {
continue
}
out = append(out, value)
}
return out
}
func mimeTypeForPath(path string) string {
switch strings.ToLower(filepath.Ext(path)) {
case ".md":
return "text/markdown"
case ".txt", ".log", ".json", ".yaml", ".yml", ".xml", ".csv":
return "text/plain"
case ".png":
return "image/png"
case ".jpg", ".jpeg":
return "image/jpeg"
case ".gif":
return "image/gif"
case ".webp":
return "image/webp"
case ".mp4":
return "video/mp4"
case ".mov":
return "video/quicktime"
case ".pdf":
return "application/pdf"
default:
return ""
}
}
func nodeArtifactKindFromPath(path string) string {
ext := strings.ToLower(filepath.Ext(path))
switch ext {
case ".png", ".jpg", ".jpeg", ".gif", ".webp":
return "image"
case ".mp4", ".mov", ".webm":
return "video"
case ".pdf":
return "document"
default:
return "file"
}
}
func shouldInlineAsText(path string, data []byte, mode fs.FileMode) bool {
if mode&fs.ModeType != 0 {
return false
}
switch strings.ToLower(filepath.Ext(path)) {
case ".md", ".txt", ".log", ".json", ".yaml", ".yml", ".xml", ".csv", ".go", ".ts", ".tsx", ".js", ".jsx", ".css", ".html", ".sh":
return len(data) <= nodeArtifactInlineLimit
default:
return false
}
}
func captureNodeCameraSnapshot(ctx context.Context, workspace string, args map[string]interface{}) (string, error) {
outputPath, err := nodeMediaOutputPath(workspace, "camera", ".jpg", stringArg(args, "filename"))
if err != nil {
return "", err
}
switch runtime.GOOS {
case "linux":
if _, err := os.Stat("/dev/video0"); err != nil {
return "", fmt.Errorf("camera device /dev/video0 not found")
}
if _, err := exec.LookPath("ffmpeg"); err != nil {
return "", fmt.Errorf("ffmpeg not installed")
}
cmd := exec.CommandContext(ctx, "ffmpeg", "-y", "-f", "video4linux2", "-i", "/dev/video0", "-vframes", "1", "-q:v", "2", outputPath)
if out, err := cmd.CombinedOutput(); err != nil {
return "", fmt.Errorf("camera capture failed: %v, output=%s", err, strings.TrimSpace(string(out)))
}
return outputPath, nil
case "darwin":
if _, err := exec.LookPath("imagesnap"); err != nil {
return "", fmt.Errorf("imagesnap not installed")
}
cmd := exec.CommandContext(ctx, "imagesnap", "-q", outputPath)
if out, err := cmd.CombinedOutput(); err != nil {
return "", fmt.Errorf("camera capture failed: %v, output=%s", err, strings.TrimSpace(string(out)))
}
return outputPath, nil
default:
return "", fmt.Errorf("camera_snap not supported on %s", runtime.GOOS)
}
}
func captureNodeScreenSnapshot(ctx context.Context, workspace string, args map[string]interface{}) (string, error) {
outputPath, err := nodeMediaOutputPath(workspace, "screen", ".png", stringArg(args, "filename"))
if err != nil {
return "", err
}
switch runtime.GOOS {
case "darwin":
cmd := exec.CommandContext(ctx, "screencapture", "-x", outputPath)
if out, err := cmd.CombinedOutput(); err != nil {
return "", fmt.Errorf("screen capture failed: %v, output=%s", err, strings.TrimSpace(string(out)))
}
return outputPath, nil
case "linux":
candidates := [][]string{
{"grim", outputPath},
{"gnome-screenshot", "-f", outputPath},
{"scrot", outputPath},
{"import", "-window", "root", outputPath},
}
for _, candidate := range candidates {
if _, err := exec.LookPath(candidate[0]); err != nil {
continue
}
cmd := exec.CommandContext(ctx, candidate[0], candidate[1:]...)
if out, err := cmd.CombinedOutput(); err == nil {
return outputPath, nil
} else if strings.TrimSpace(string(out)) != "" {
continue
}
}
return "", fmt.Errorf("no supported screen capture command found (grim, gnome-screenshot, scrot, import)")
default:
return "", fmt.Errorf("screen_snapshot not supported on %s", runtime.GOOS)
}
}
func nodeMediaOutputPath(workspace, kind, ext, requested string) (string, error) {
root := strings.TrimSpace(workspace)
if root == "" {
return "", fmt.Errorf("workspace path not configured")
}
baseDir := filepath.Join(root, "artifacts", "node")
if err := os.MkdirAll(baseDir, 0755); err != nil {
return "", err
}
filename := strings.TrimSpace(requested)
if filename == "" {
filename = fmt.Sprintf("%s_%d%s", kind, time.Now().UnixNano(), ext)
}
filename = filepath.Clean(filename)
if filepath.IsAbs(filename) {
return "", fmt.Errorf("filename must be relative to workspace")
}
fullPath := filepath.Join(baseDir, filename)
fullPath = filepath.Clean(fullPath)
rel, err := filepath.Rel(root, fullPath)
if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(os.PathSeparator)) {
return "", fmt.Errorf("capture path escapes workspace")
}
if err := os.MkdirAll(filepath.Dir(fullPath), 0755); err != nil {
return "", err
}
return fullPath, nil
}
func structToWirePayload(v interface{}) map[string]interface{} {
b, _ := json.Marshal(v)
var out map[string]interface{}

View File

@@ -5,14 +5,30 @@ import (
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"
"time"
"clawgo/pkg/agent"
"clawgo/pkg/config"
"clawgo/pkg/nodes"
"clawgo/pkg/providers"
)
type stubNodeProvider struct {
content string
}
func (p stubNodeProvider) Chat(ctx context.Context, messages []providers.Message, tools []providers.ToolDefinition, model string, options map[string]interface{}) (*providers.LLMResponse, error) {
return &providers.LLMResponse{Content: p.content, FinishReason: "stop"}, nil
}
func (p stubNodeProvider) GetDefaultModel() string {
return "stub-model"
}
func TestParseNodeRegisterArgsDefaults(t *testing.T) {
t.Parallel()
@@ -106,6 +122,37 @@ func TestPostNodeHeartbeatSendsNodeID(t *testing.T) {
}
}
func TestNodeAgentsFromConfigCollectsEnabledAgents(t *testing.T) {
t.Parallel()
cfg := config.DefaultConfig()
cfg.Agents.Subagents["main"] = config.SubagentConfig{
Enabled: true,
Type: "router",
DisplayName: "Main Agent",
Role: "orchestrator",
}
cfg.Agents.Subagents["coder"] = config.SubagentConfig{
Enabled: true,
Type: "worker",
DisplayName: "Code Agent",
Role: "code",
}
cfg.Agents.Subagents["tester"] = config.SubagentConfig{
Enabled: false,
Type: "worker",
DisplayName: "Test Agent",
Role: "test",
}
items := nodeAgentsFromConfig(cfg)
if len(items) != 2 {
t.Fatalf("expected 2 enabled agents, got %+v", items)
}
if items[0].ID != "coder" || items[1].ID != "main" {
t.Fatalf("unexpected agent export order: %+v", items)
}
}
func TestNodeWebsocketURL(t *testing.T) {
t.Parallel()
@@ -130,3 +177,212 @@ func TestNodeSocketPingInterval(t *testing.T) {
t.Fatalf("expected half heartbeat, got %s", got)
}
}
func TestExecuteNodeRequestRunsLocalMainAgentTask(t *testing.T) {
prevCfg := globalConfigPathOverride
prevProviderFactory := nodeProviderFactory
prevLoopFactory := nodeAgentLoopFactory
prevExecutors := nodeLocalExecutors
globalConfigPathOverride = filepath.Join(t.TempDir(), "config.json")
nodeLocalExecutors = map[string]*nodeLocalExecutor{}
nodeProviderFactory = func(cfg *config.Config) (providers.LLMProvider, error) {
return stubNodeProvider{content: "main-local-ok"}, nil
}
nodeAgentLoopFactory = agent.NewAgentLoop
defer func() {
globalConfigPathOverride = prevCfg
nodeProviderFactory = prevProviderFactory
nodeAgentLoopFactory = prevLoopFactory
nodeLocalExecutors = prevExecutors
}()
cfg := config.DefaultConfig()
cfg.Agents.Defaults.Workspace = filepath.Join(t.TempDir(), "workspace")
cfg.Agents.Subagents["main"] = config.SubagentConfig{
Enabled: true,
Type: "router",
Role: "orchestrator",
}
if err := config.SaveConfig(globalConfigPathOverride, cfg); err != nil {
t.Fatalf("save config: %v", err)
}
info := nodes.NodeInfo{ID: "edge-a", Name: "Edge A"}
resp := executeNodeRequest(context.Background(), &http.Client{Timeout: time.Second}, info, nodeRegisterOptions{}, &nodes.Request{
Action: "agent_task",
Task: "say ok",
})
if !resp.OK {
t.Fatalf("expected ok response, got %+v", resp)
}
if got := strings.TrimSpace(resp.Payload["result"].(string)); got != "main-local-ok" {
t.Fatalf("unexpected result: %+v", resp.Payload)
}
if got := strings.TrimSpace(resp.Payload["agent_id"].(string)); got != "main" {
t.Fatalf("unexpected agent id: %+v", resp.Payload)
}
}
func TestExecuteNodeRequestRunsLocalSubagentTask(t *testing.T) {
prevCfg := globalConfigPathOverride
prevProviderFactory := nodeProviderFactory
prevLoopFactory := nodeAgentLoopFactory
prevExecutors := nodeLocalExecutors
globalConfigPathOverride = filepath.Join(t.TempDir(), "config.json")
nodeLocalExecutors = map[string]*nodeLocalExecutor{}
nodeProviderFactory = func(cfg *config.Config) (providers.LLMProvider, error) {
return stubNodeProvider{content: "coder-local-ok"}, nil
}
nodeAgentLoopFactory = agent.NewAgentLoop
defer func() {
globalConfigPathOverride = prevCfg
nodeProviderFactory = prevProviderFactory
nodeAgentLoopFactory = prevLoopFactory
nodeLocalExecutors = prevExecutors
}()
cfg := config.DefaultConfig()
cfg.Agents.Defaults.Workspace = filepath.Join(t.TempDir(), "workspace")
cfg.Agents.Subagents["main"] = config.SubagentConfig{
Enabled: true,
Type: "router",
Role: "orchestrator",
}
cfg.Agents.Subagents["coder"] = config.SubagentConfig{
Enabled: true,
Type: "worker",
Role: "code",
}
if err := os.MkdirAll(filepath.Join(cfg.Agents.Defaults.Workspace, "out"), 0755); err != nil {
t.Fatalf("mkdir artifact dir: %v", err)
}
if err := os.WriteFile(filepath.Join(cfg.Agents.Defaults.Workspace, "out", "result.txt"), []byte("artifact-body"), 0644); err != nil {
t.Fatalf("write artifact: %v", err)
}
if err := config.SaveConfig(globalConfigPathOverride, cfg); err != nil {
t.Fatalf("save config: %v", err)
}
info := nodes.NodeInfo{ID: "edge-b", Name: "Edge B"}
resp := executeNodeRequest(context.Background(), &http.Client{Timeout: time.Second}, info, nodeRegisterOptions{}, &nodes.Request{
Action: "agent_task",
Task: "write tests",
Args: map[string]interface{}{"remote_agent_id": "coder", "artifact_paths": []interface{}{"out/result.txt"}},
})
if !resp.OK {
t.Fatalf("expected ok response, got %+v", resp)
}
if got := strings.TrimSpace(resp.Payload["result"].(string)); !strings.Contains(got, "coder-local-ok") {
t.Fatalf("unexpected result: %+v", resp.Payload)
}
if got := strings.TrimSpace(resp.Payload["agent_id"].(string)); got != "coder" {
t.Fatalf("unexpected agent id: %+v", resp.Payload)
}
artifacts, ok := resp.Payload["artifacts"].([]map[string]interface{})
if !ok || len(artifacts) != 1 {
t.Fatalf("expected one artifact, got %+v", resp.Payload["artifacts"])
}
if artifacts[0]["content_text"] != "artifact-body" {
t.Fatalf("unexpected artifact payload: %+v", artifacts[0])
}
}
func TestCollectNodeArtifactsRejectsPathEscape(t *testing.T) {
t.Parallel()
_, err := collectNodeArtifacts(t.TempDir(), map[string]interface{}{
"artifact_paths": []interface{}{"../secret.txt"},
})
if err == nil || !strings.Contains(err.Error(), "escapes workspace") {
t.Fatalf("expected workspace escape error, got %v", err)
}
}
func TestExecuteNodeRequestRunsLocalCameraSnap(t *testing.T) {
prevCfg := globalConfigPathOverride
prevExecutors := nodeLocalExecutors
prevCamera := nodeCameraSnapFunc
globalConfigPathOverride = filepath.Join(t.TempDir(), "config.json")
nodeLocalExecutors = map[string]*nodeLocalExecutor{}
defer func() {
globalConfigPathOverride = prevCfg
nodeLocalExecutors = prevExecutors
nodeCameraSnapFunc = prevCamera
}()
cfg := config.DefaultConfig()
cfg.Agents.Defaults.Workspace = filepath.Join(t.TempDir(), "workspace")
if err := config.SaveConfig(globalConfigPathOverride, cfg); err != nil {
t.Fatalf("save config: %v", err)
}
nodeCameraSnapFunc = func(ctx context.Context, workspace string, args map[string]interface{}) (string, error) {
out := filepath.Join(workspace, "artifacts", "node", "camera-test.jpg")
if err := os.MkdirAll(filepath.Dir(out), 0755); err != nil {
return "", err
}
if err := os.WriteFile(out, []byte("camera-bytes"), 0644); err != nil {
return "", err
}
return out, nil
}
info := nodes.NodeInfo{ID: "edge-cam", Name: "Edge Cam"}
resp := executeNodeRequest(context.Background(), &http.Client{Timeout: time.Second}, info, nodeRegisterOptions{}, &nodes.Request{
Action: "camera_snap",
Args: map[string]interface{}{"facing": "front"},
})
if !resp.OK {
t.Fatalf("expected ok response, got %+v", resp)
}
artifacts, ok := resp.Payload["artifacts"].([]map[string]interface{})
if !ok || len(artifacts) != 1 {
t.Fatalf("expected one artifact, got %+v", resp.Payload["artifacts"])
}
if artifacts[0]["name"] != "camera-test.jpg" {
t.Fatalf("unexpected artifact: %+v", artifacts[0])
}
}
func TestExecuteNodeRequestRunsLocalScreenSnapshot(t *testing.T) {
prevCfg := globalConfigPathOverride
prevExecutors := nodeLocalExecutors
prevScreen := nodeScreenSnapFunc
globalConfigPathOverride = filepath.Join(t.TempDir(), "config.json")
nodeLocalExecutors = map[string]*nodeLocalExecutor{}
defer func() {
globalConfigPathOverride = prevCfg
nodeLocalExecutors = prevExecutors
nodeScreenSnapFunc = prevScreen
}()
cfg := config.DefaultConfig()
cfg.Agents.Defaults.Workspace = filepath.Join(t.TempDir(), "workspace")
if err := config.SaveConfig(globalConfigPathOverride, cfg); err != nil {
t.Fatalf("save config: %v", err)
}
nodeScreenSnapFunc = func(ctx context.Context, workspace string, args map[string]interface{}) (string, error) {
out := filepath.Join(workspace, "artifacts", "node", "screen-test.png")
if err := os.MkdirAll(filepath.Dir(out), 0755); err != nil {
return "", err
}
if err := os.WriteFile(out, []byte{0x89, 0x50, 0x4e, 0x47}, 0644); err != nil {
return "", err
}
return out, nil
}
info := nodes.NodeInfo{ID: "edge-screen", Name: "Edge Screen"}
resp := executeNodeRequest(context.Background(), &http.Client{Timeout: time.Second}, info, nodeRegisterOptions{}, &nodes.Request{
Action: "screen_snapshot",
})
if !resp.OK {
t.Fatalf("expected ok response, got %+v", resp)
}
artifacts, ok := resp.Payload["artifacts"].([]map[string]interface{})
if !ok || len(artifacts) != 1 {
t.Fatalf("expected one artifact, got %+v", resp.Payload["artifacts"])
}
if artifacts[0]["name"] != "screen-test.png" {
t.Fatalf("unexpected artifact: %+v", artifacts[0])
}
}

View File

@@ -352,10 +352,15 @@ func (al *AgentLoop) dispatchNodeSubagentTask(ctx context.Context, task *tools.S
return "", fmt.Errorf("node-backed subagent %q missing node_id", task.AgentID)
}
taskInput := loopTaskInputForNode(task)
reqArgs := map[string]interface{}{}
if remoteAgentID := remoteAgentIDForNodeBranch(task.AgentID, nodeID); remoteAgentID != "" {
reqArgs["remote_agent_id"] = remoteAgentID
}
resp, err := al.nodeRouter.Dispatch(ctx, nodes.Request{
Action: "agent_task",
Node: nodeID,
Task: taskInput,
Args: reqArgs,
}, "auto")
if err != nil {
return "", err
@@ -372,6 +377,23 @@ func (al *AgentLoop) dispatchNodeSubagentTask(ctx context.Context, task *tools.S
return fmt.Sprintf("node %s completed agent_task", nodeID), nil
}
func remoteAgentIDForNodeBranch(agentID, nodeID string) string {
agentID = strings.TrimSpace(agentID)
nodeID = strings.TrimSpace(nodeID)
if agentID == "" || nodeID == "" {
return ""
}
prefix := "node." + nodeID + "."
if !strings.HasPrefix(agentID, prefix) {
return ""
}
remote := strings.TrimPrefix(agentID, prefix)
if strings.TrimSpace(remote) == "" {
return ""
}
return remote
}
func loopTaskInputForNode(task *tools.SubagentTask) string {
if task == nil {
return ""

View File

@@ -23,6 +23,9 @@ func TestDispatchNodeSubagentTaskUsesNodeAgentTask(t *testing.T) {
if req.Action != "agent_task" {
t.Fatalf("unexpected action: %s", req.Action)
}
if got, _ := req.Args["remote_agent_id"].(string); got != "coder" {
t.Fatalf("expected remote_agent_id=coder, got %+v", req.Args)
}
if !strings.Contains(req.Task, "Parent Agent: main") {
t.Fatalf("expected parent-agent context in task, got %q", req.Task)
}
@@ -43,7 +46,7 @@ func TestDispatchNodeSubagentTaskUsesNodeAgentTask(t *testing.T) {
}
out, err := loop.dispatchNodeSubagentTask(context.Background(), &tools.SubagentTask{
ID: "subagent-1",
AgentID: "node.edge-dev.main",
AgentID: "node.edge-dev.coder",
Transport: "node",
NodeID: "edge-dev",
ParentAgentID: "main",

View File

@@ -251,7 +251,19 @@ func (m *Manager) SupportsAction(nodeID, action string) bool {
if !ok || !n.Online {
return false
}
action = strings.ToLower(strings.TrimSpace(action))
return nodeSupportsRequest(n, Request{Action: action})
}
func (m *Manager) SupportsRequest(nodeID string, req Request) bool {
n, ok := m.Get(nodeID)
if !ok || !n.Online {
return false
}
return nodeSupportsRequest(n, req)
}
func nodeSupportsRequest(n NodeInfo, req Request) bool {
action := strings.ToLower(strings.TrimSpace(req.Action))
if len(n.Actions) > 0 {
allowed := false
for _, a := range n.Actions {
@@ -283,44 +295,112 @@ func (m *Manager) SupportsAction(nodeID, action string) bool {
}
func (m *Manager) PickFor(action string) (NodeInfo, bool) {
return m.PickRequest(Request{Action: action}, "auto")
}
func (m *Manager) PickRequest(req Request, mode string) (NodeInfo, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
bestScore := -1
bestNode := NodeInfo{}
for _, n := range m.nodes {
if !n.Online {
score, ok := scoreNodeCandidate(n, req, mode, m.senders[strings.TrimSpace(n.ID)] != nil)
if !ok {
continue
}
switch strings.ToLower(strings.TrimSpace(action)) {
case "run":
if n.Capabilities.Run {
return n, true
}
case "agent_task":
if n.Capabilities.Model {
return n, true
}
case "camera_snap", "camera_clip":
if n.Capabilities.Camera {
return n, true
}
case "screen_record", "screen_snapshot":
if n.Capabilities.Screen {
return n, true
}
case "location_get":
if n.Capabilities.Location {
return n, true
}
case "canvas_snapshot", "canvas_action":
if n.Capabilities.Canvas {
return n, true
}
default:
if n.Capabilities.Invoke {
return n, true
}
if score > bestScore || (score == bestScore && bestNode.ID != "" && n.LastSeenAt.After(bestNode.LastSeenAt)) {
bestScore = score
bestNode = n
}
}
return NodeInfo{}, false
if bestScore < 0 || strings.TrimSpace(bestNode.ID) == "" {
return NodeInfo{}, false
}
return bestNode, true
}
func scoreNodeCandidate(n NodeInfo, req Request, mode string, hasWireSender bool) (int, bool) {
if !n.Online {
return 0, false
}
if !nodeSupportsRequest(n, req) {
return 0, false
}
mode = strings.ToLower(strings.TrimSpace(mode))
if mode == "p2p" && !hasWireSender {
return 0, false
}
score := 100
if hasWireSender {
score += 30
}
if prefersRealtimeTransport(req.Action) && hasWireSender {
score += 40
}
if mode == "relay" && hasWireSender {
score -= 10
}
if mode == "p2p" && hasWireSender {
score += 80
}
if strings.EqualFold(strings.TrimSpace(req.Action), "agent_task") {
remoteAgentID := requestedRemoteAgentID(req.Args)
switch {
case remoteAgentID == "", remoteAgentID == "main":
score += 20
case nodeHasAgent(n, remoteAgentID):
score += 80
default:
return 0, false
}
}
if !n.LastSeenAt.IsZero() {
ageSeconds := int(time.Since(n.LastSeenAt).Seconds())
if ageSeconds < 0 {
ageSeconds = 0
}
if ageSeconds < 60 {
score += 20
} else if ageSeconds < 300 {
score += 5
}
}
return score, true
}
func requestedRemoteAgentID(args map[string]interface{}) string {
if len(args) == 0 {
return ""
}
value, ok := args["remote_agent_id"]
if !ok || value == nil {
return ""
}
return strings.ToLower(strings.TrimSpace(fmt.Sprint(value)))
}
func nodeHasAgent(n NodeInfo, agentID string) bool {
agentID = strings.ToLower(strings.TrimSpace(agentID))
if agentID == "" {
return false
}
for _, agent := range n.Agents {
if strings.ToLower(strings.TrimSpace(agent.ID)) == agentID {
return true
}
}
return false
}
func prefersRealtimeTransport(action string) bool {
switch strings.ToLower(strings.TrimSpace(action)) {
case "camera_snap", "camera_clip", "screen_record", "screen_snapshot", "canvas_snapshot", "canvas_action":
return true
default:
return false
}
}
func (m *Manager) reaperLoop() {

76
pkg/nodes/manager_test.go Normal file
View File

@@ -0,0 +1,76 @@
package nodes
import (
"testing"
"time"
)
func TestPickRequestPrefersMatchingRemoteAgent(t *testing.T) {
t.Parallel()
manager := NewManager()
now := time.Now().UTC()
manager.Upsert(NodeInfo{
ID: "node-main-only",
Online: true,
LastSeenAt: now,
Capabilities: Capabilities{
Model: true,
},
Agents: []AgentInfo{{ID: "main"}},
})
manager.Upsert(NodeInfo{
ID: "node-coder",
Online: true,
LastSeenAt: now,
Capabilities: Capabilities{
Model: true,
},
Agents: []AgentInfo{{ID: "main"}, {ID: "coder"}},
})
picked, ok := manager.PickRequest(Request{
Action: "agent_task",
Args: map[string]interface{}{"remote_agent_id": "coder"},
}, "auto")
if !ok {
t.Fatalf("expected node pick")
}
if picked.ID != "node-coder" {
t.Fatalf("expected node-coder, got %+v", picked)
}
}
func TestPickRequestPrefersRealtimeCapableNodeForScreenActions(t *testing.T) {
t.Parallel()
manager := NewManager()
now := time.Now().UTC()
manager.Upsert(NodeInfo{
ID: "relay-only",
Online: true,
LastSeenAt: now.Add(-2 * time.Minute),
Capabilities: Capabilities{
Screen: true,
},
Actions: []string{"screen_snapshot"},
})
manager.Upsert(NodeInfo{
ID: "p2p-ready",
Online: true,
LastSeenAt: now,
Capabilities: Capabilities{
Screen: true,
},
Actions: []string{"screen_snapshot"},
})
manager.RegisterWireSender("p2p-ready", &captureWireSender{})
picked, ok := manager.PickRequest(Request{Action: "screen_snapshot"}, "auto")
if !ok {
t.Fatalf("expected node pick")
}
if picked.ID != "p2p-ready" {
t.Fatalf("expected p2p-ready, got %+v", picked)
}
}

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"path/filepath"
"strings"
"time"
)
@@ -211,5 +212,101 @@ func normalizeDevicePayload(action string, payload map[string]interface{}) map[s
if _, ok := payload["meta"]; !ok {
payload["meta"] = map[string]interface{}{}
}
payload["artifacts"] = normalizeArtifacts(payload, a)
return payload
}
func normalizeArtifacts(payload map[string]interface{}, action string) []map[string]interface{} {
if payload == nil {
return []map[string]interface{}{}
}
if raw, ok := payload["artifacts"]; ok {
items := normalizeArtifactList(raw)
if len(items) > 0 {
return items
}
}
artifact := map[string]interface{}{}
if mediaType, _ := payload["media_type"].(string); strings.TrimSpace(mediaType) != "" {
artifact["kind"] = strings.TrimSpace(mediaType)
}
if mimeType, _ := payload["mime_type"].(string); strings.TrimSpace(mimeType) != "" {
artifact["mime_type"] = strings.TrimSpace(mimeType)
}
if storage, _ := payload["storage"].(string); strings.TrimSpace(storage) != "" {
artifact["storage"] = strings.TrimSpace(storage)
}
if path, _ := payload["path"].(string); strings.TrimSpace(path) != "" {
artifact["path"] = filepath.Clean(strings.TrimSpace(path))
}
if url, _ := payload["url"].(string); strings.TrimSpace(url) != "" {
artifact["url"] = strings.TrimSpace(url)
}
if image, _ := payload["image"].(string); strings.TrimSpace(image) != "" {
artifact["content_base64"] = strings.TrimSpace(image)
}
if text, _ := payload["content_text"].(string); strings.TrimSpace(text) != "" {
artifact["content_text"] = text
}
if name, _ := payload["name"].(string); strings.TrimSpace(name) != "" {
artifact["name"] = strings.TrimSpace(name)
}
if size := int64FromPayload(payload["size_bytes"]); size > 0 {
artifact["size_bytes"] = size
}
if len(artifact) == 0 {
return []map[string]interface{}{}
}
if _, ok := artifact["kind"]; !ok && strings.TrimSpace(action) != "" {
artifact["kind"] = strings.ToLower(strings.TrimSpace(action))
}
return []map[string]interface{}{artifact}
}
func normalizeArtifactList(raw interface{}) []map[string]interface{} {
items, ok := raw.([]interface{})
if !ok {
return []map[string]interface{}{}
}
out := make([]map[string]interface{}, 0, len(items))
for _, item := range items {
row, ok := item.(map[string]interface{})
if !ok || len(row) == 0 {
continue
}
normalized := map[string]interface{}{}
for _, key := range []string{"id", "name", "kind", "mime_type", "storage", "path", "url", "content_text", "content_base64", "source_path"} {
if value, ok := row[key]; ok && strings.TrimSpace(fmt.Sprint(value)) != "" {
normalized[key] = value
}
}
if truncated, ok := row["truncated"].(bool); ok && truncated {
normalized["truncated"] = true
}
if size := int64FromPayload(row["size_bytes"]); size > 0 {
normalized["size_bytes"] = size
}
if len(normalized) == 0 {
continue
}
out = append(out, normalized)
}
return out
}
func int64FromPayload(v interface{}) int64 {
switch value := v.(type) {
case int:
return int64(value)
case int64:
return value
case float64:
return int64(value)
case json.Number:
n, _ := value.Int64()
return n
default:
return 0
}
}

View File

@@ -74,6 +74,24 @@ func TestWebsocketP2PTransportSend(t *testing.T) {
}
}
func TestNormalizeDevicePayloadBuildsArtifacts(t *testing.T) {
t.Parallel()
payload := normalizeDevicePayload("screen_snapshot", map[string]interface{}{
"media_type": "image",
"storage": "path",
"path": "/tmp/screen.png",
"mime_type": "image/png",
})
artifacts, ok := payload["artifacts"].([]map[string]interface{})
if !ok || len(artifacts) != 1 {
t.Fatalf("expected one artifact, got %+v", payload["artifacts"])
}
if artifacts[0]["kind"] != "image" || artifacts[0]["path"] != "/tmp/screen.png" {
t.Fatalf("unexpected artifact payload: %+v", artifacts[0])
}
}
func TestWebRTCTransportSendEndToEnd(t *testing.T) {
t.Parallel()

View File

@@ -13,6 +13,31 @@ type Capabilities struct {
Canvas bool `json:"canvas"`
}
// AgentInfo describes an enabled agent exposed by a remote clawgo node.
type AgentInfo struct {
ID string `json:"id"`
DisplayName string `json:"display_name,omitempty"`
Role string `json:"role,omitempty"`
Type string `json:"type,omitempty"`
Transport string `json:"transport,omitempty"`
ParentAgentID string `json:"parent_agent_id,omitempty"`
}
// Artifact describes a file/media payload returned from a node action.
type Artifact struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Kind string `json:"kind,omitempty"`
MIMEType string `json:"mime_type,omitempty"`
Storage string `json:"storage,omitempty"`
Path string `json:"path,omitempty"`
URL string `json:"url,omitempty"`
ContentText string `json:"content_text,omitempty"`
ContentB64 string `json:"content_base64,omitempty"`
SizeBytes int64 `json:"size_bytes,omitempty"`
SourcePath string `json:"source_path,omitempty"`
}
// NodeInfo is the runtime descriptor for cross-device scheduling.
type NodeInfo struct {
ID string `json:"id"`
@@ -25,6 +50,7 @@ type NodeInfo struct {
Capabilities Capabilities `json:"capabilities"`
Actions []string `json:"actions,omitempty"`
Models []string `json:"models,omitempty"`
Agents []AgentInfo `json:"agents,omitempty"`
RegisteredAt time.Time `json:"registered_at,omitempty"`
LastSeenAt time.Time `json:"last_seen_at"`
Online bool `json:"online"`
@@ -51,15 +77,15 @@ type Response struct {
// WireMessage is the websocket envelope for node lifecycle messages.
type WireMessage struct {
Type string `json:"type"`
ID string `json:"id,omitempty"`
From string `json:"from,omitempty"`
To string `json:"to,omitempty"`
Session string `json:"session,omitempty"`
Node *NodeInfo `json:"node,omitempty"`
Request *Request `json:"request,omitempty"`
Response *Response `json:"response,omitempty"`
Payload map[string]interface{} `json:"payload,omitempty"`
Type string `json:"type"`
ID string `json:"id,omitempty"`
From string `json:"from,omitempty"`
To string `json:"to,omitempty"`
Session string `json:"session,omitempty"`
Node *NodeInfo `json:"node,omitempty"`
Request *Request `json:"request,omitempty"`
Response *Response `json:"response,omitempty"`
Payload map[string]interface{} `json:"payload,omitempty"`
}
// WireAck is the websocket response envelope for node lifecycle messages.

View File

@@ -28,15 +28,16 @@ func (t *NodesTool) Description() string {
}
func (t *NodesTool) Parameters() map[string]interface{} {
return map[string]interface{}{"type": "object", "properties": map[string]interface{}{
"action": map[string]interface{}{"type": "string", "description": "status|describe|run|invoke|agent_task|camera_snap|camera_clip|screen_record|screen_snapshot|location_get|canvas_snapshot|canvas_action"},
"node": map[string]interface{}{"type": "string", "description": "target node id"},
"mode": map[string]interface{}{"type": "string", "description": "auto|p2p|relay (default auto)"},
"args": map[string]interface{}{"type": "object", "description": "action args"},
"task": map[string]interface{}{"type": "string", "description": "agent_task content for child node model"},
"model": map[string]interface{}{"type": "string", "description": "optional model for agent_task"},
"command": map[string]interface{}{"type": "array", "items": map[string]interface{}{"type": "string"}, "description": "run command array shortcut"},
"facing": map[string]interface{}{"type": "string", "description": "camera facing: front|back|both"},
"duration_ms": map[string]interface{}{"type": "integer", "description": "clip/record duration"},
"action": map[string]interface{}{"type": "string", "description": "status|describe|run|invoke|agent_task|camera_snap|camera_clip|screen_record|screen_snapshot|location_get|canvas_snapshot|canvas_action"},
"node": map[string]interface{}{"type": "string", "description": "target node id"},
"mode": map[string]interface{}{"type": "string", "description": "auto|p2p|relay (default auto)"},
"args": map[string]interface{}{"type": "object", "description": "action args"},
"artifact_paths": map[string]interface{}{"type": "array", "items": map[string]interface{}{"type": "string"}, "description": "optional workspace-relative file paths to bring back as artifacts for agent_task"},
"task": map[string]interface{}{"type": "string", "description": "agent_task content for child node model"},
"model": map[string]interface{}{"type": "string", "description": "optional model for agent_task"},
"command": map[string]interface{}{"type": "array", "items": map[string]interface{}{"type": "string"}, "description": "run command array shortcut"},
"facing": map[string]interface{}{"type": "string", "description": "camera facing: front|back|both"},
"duration_ms": map[string]interface{}{"type": "integer", "description": "clip/record duration"},
}, "required": []string{"action"}}
}
@@ -66,26 +67,15 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s
b, _ := json.Marshal(t.manager.List())
return string(b), nil
default:
if nodeID == "" {
if picked, ok := t.manager.PickFor(action); ok {
nodeID = picked.ID
}
}
if nodeID == "" {
return "", fmt.Errorf("no eligible node found for action=%s", action)
}
if !t.manager.SupportsAction(nodeID, action) {
return "", fmt.Errorf("node %s does not support action=%s", nodeID, action)
}
if t.router == nil {
return "", fmt.Errorf("nodes transport router not configured")
}
reqArgs := map[string]interface{}{}
if raw, ok := args["args"].(map[string]interface{}); ok {
for k, v := range raw {
reqArgs[k] = v
}
}
if rawPaths, ok := args["artifact_paths"].([]interface{}); ok && len(rawPaths) > 0 {
reqArgs["artifact_paths"] = rawPaths
}
if cmd, ok := args["command"].([]interface{}); ok && len(cmd) > 0 {
reqArgs["command"] = cmd
}
@@ -113,7 +103,21 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s
return "", fmt.Errorf("invalid_args: canvas_action requires args.action")
}
}
if nodeID == "" {
if picked, ok := t.manager.PickRequest(nodes.Request{Action: action, Task: task, Model: model, Args: reqArgs}, mode); ok {
nodeID = picked.ID
}
}
if nodeID == "" {
return "", fmt.Errorf("no eligible node found for action=%s", action)
}
req := nodes.Request{Action: action, Node: nodeID, Task: task, Model: model, Args: reqArgs}
if !t.manager.SupportsRequest(nodeID, req) {
return "", fmt.Errorf("node %s does not support action=%s", nodeID, action)
}
if t.router == nil {
return "", fmt.Errorf("nodes transport router not configured")
}
started := time.Now()
resp, err := t.router.Dispatch(ctx, req, mode)
durationMs := int(time.Since(started).Milliseconds())
@@ -150,6 +154,12 @@ func (t *NodesTool) writeAudit(req nodes.Request, resp nodes.Response, mode stri
if fallback, _ := resp.Payload["fallback_from"].(string); strings.TrimSpace(fallback) != "" {
row["fallback_from"] = strings.TrimSpace(fallback)
}
if count, kinds := artifactAuditSummary(resp.Payload["artifacts"]); count > 0 {
row["artifact_count"] = count
if len(kinds) > 0 {
row["artifact_kinds"] = kinds
}
}
b, _ := json.Marshal(row)
f, err := os.OpenFile(t.auditPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
@@ -158,3 +168,29 @@ func (t *NodesTool) writeAudit(req nodes.Request, resp nodes.Response, mode stri
defer f.Close()
_, _ = f.Write(append(b, '\n'))
}
func artifactAuditSummary(raw interface{}) (int, []string) {
items, ok := raw.([]interface{})
if !ok {
if typed, ok := raw.([]map[string]interface{}); ok {
items = make([]interface{}, 0, len(typed))
for _, item := range typed {
items = append(items, item)
}
}
}
if len(items) == 0 {
return 0, nil
}
kinds := make([]string, 0, len(items))
for _, item := range items {
row, ok := item.(map[string]interface{})
if !ok {
continue
}
if kind, _ := row["kind"].(string); strings.TrimSpace(kind) != "" {
kinds = append(kinds, strings.TrimSpace(kind))
}
}
return len(items), kinds
}

View File

@@ -387,9 +387,10 @@ func (s *SubagentProfileStore) nodeProfileLocked(agentID string) (SubagentProfil
if isLocalNode(node.ID) {
continue
}
profile := profileFromNode(node, parentAgentID)
if profile.AgentID == id {
return profile, true
for _, profile := range profilesFromNode(node, parentAgentID) {
if profile.AgentID == id {
return profile, true
}
}
}
return SubagentProfile{}, false
@@ -439,20 +440,18 @@ func (s *SubagentProfileStore) nodeProfilesLocked() []SubagentProfile {
if isLocalNode(node.ID) {
continue
}
profile := profileFromNode(node, parentAgentID)
if profile.AgentID == "" {
continue
profiles := profilesFromNode(node, parentAgentID)
for _, profile := range profiles {
if profile.AgentID == "" {
continue
}
out = append(out, profile)
}
out = append(out, profile)
}
return out
}
func profileFromNode(node nodes.NodeInfo, parentAgentID string) SubagentProfile {
agentID := nodeBranchAgentID(node.ID)
if agentID == "" {
return SubagentProfile{}
}
func profilesFromNode(node nodes.NodeInfo, parentAgentID string) []SubagentProfile {
name := strings.TrimSpace(node.Name)
if name == "" {
name = strings.TrimSpace(node.ID)
@@ -461,17 +460,39 @@ func profileFromNode(node nodes.NodeInfo, parentAgentID string) SubagentProfile
if !node.Online {
status = "disabled"
}
return normalizeSubagentProfile(SubagentProfile{
AgentID: agentID,
rootAgentID := nodeBranchAgentID(node.ID)
if rootAgentID == "" {
return nil
}
out := []SubagentProfile{normalizeSubagentProfile(SubagentProfile{
AgentID: rootAgentID,
Name: name + " Main Agent",
Transport: "node",
NodeID: strings.TrimSpace(node.ID),
ParentAgentID: parentAgentID,
Role: "remote_main",
MemoryNamespace: agentID,
MemoryNamespace: rootAgentID,
Status: status,
ManagedBy: "node_registry",
})
})}
for _, agent := range node.Agents {
agentID := normalizeSubagentIdentifier(agent.ID)
if agentID == "" || agentID == "main" {
continue
}
out = append(out, normalizeSubagentProfile(SubagentProfile{
AgentID: nodeChildAgentID(node.ID, agentID),
Name: nodeChildAgentDisplayName(name, agent),
Transport: "node",
NodeID: strings.TrimSpace(node.ID),
ParentAgentID: rootAgentID,
Role: strings.TrimSpace(agent.Role),
MemoryNamespace: nodeChildAgentID(node.ID, agentID),
Status: status,
ManagedBy: "node_registry",
}))
}
return out
}
func nodeBranchAgentID(nodeID string) string {
@@ -482,6 +503,27 @@ func nodeBranchAgentID(nodeID string) string {
return "node." + id + ".main"
}
func nodeChildAgentID(nodeID, agentID string) string {
nodeID = normalizeSubagentIdentifier(nodeID)
agentID = normalizeSubagentIdentifier(agentID)
if nodeID == "" || agentID == "" {
return ""
}
return "node." + nodeID + "." + agentID
}
func nodeChildAgentDisplayName(nodeName string, agent nodes.AgentInfo) string {
base := strings.TrimSpace(agent.DisplayName)
if base == "" {
base = strings.TrimSpace(agent.ID)
}
nodeName = strings.TrimSpace(nodeName)
if nodeName == "" {
return base
}
return nodeName + " / " + base
}
func isLocalNode(nodeID string) bool {
return normalizeSubagentIdentifier(nodeID) == "local"
}

View File

@@ -208,6 +208,10 @@ func TestSubagentProfileStoreIncludesNodeMainBranchProfiles(t *testing.T) {
ID: "edge-dev",
Name: "Edge Dev",
Online: true,
Agents: []nodes.AgentInfo{
{ID: "main", DisplayName: "Main Agent", Role: "orchestrator", Type: "router"},
{ID: "coder", DisplayName: "Code Agent", Role: "code", Type: "worker"},
},
Capabilities: nodes.Capabilities{
Model: true,
},
@@ -227,6 +231,19 @@ func TestSubagentProfileStoreIncludesNodeMainBranchProfiles(t *testing.T) {
if profile.ParentAgentID != "main" {
t.Fatalf("expected main parent agent, got %+v", profile)
}
childProfile, ok, err := store.Get("node.edge-dev.coder")
if err != nil {
t.Fatalf("get child profile failed: %v", err)
}
if !ok {
t.Fatalf("expected child node-backed profile")
}
if childProfile.ManagedBy != "node_registry" || childProfile.Transport != "node" || childProfile.NodeID != "edge-dev" {
t.Fatalf("unexpected child node profile: %+v", childProfile)
}
if childProfile.ParentAgentID != "node.edge-dev.main" {
t.Fatalf("expected child profile to attach to remote main, got %+v", childProfile)
}
if _, err := store.Upsert(SubagentProfile{AgentID: profile.AgentID}); err == nil {
t.Fatalf("expected node-managed upsert to fail")
}

View File

@@ -135,6 +135,7 @@ const resources = {
dashboardNodeDispatchTransport: 'Used Transport',
dashboardNodeDispatchFallback: 'Fallback From',
dashboardNodeDispatchDuration: 'Duration',
dashboardNodeDispatchArtifacts: 'Artifacts',
dashboardNodeDispatchError: 'Error',
configNodeP2P: 'Node P2P',
configNodeP2PHint: 'Configure websocket tunnel or WebRTC transport for remote nodes.',
@@ -687,6 +688,7 @@ const resources = {
dashboardNodeDispatchTransport: '实际传输',
dashboardNodeDispatchFallback: '回退来源',
dashboardNodeDispatchDuration: '耗时',
dashboardNodeDispatchArtifacts: '工件',
dashboardNodeDispatchError: '错误',
configNodeP2P: '节点 P2P',
configNodeP2PHint: '为远端节点配置 websocket tunnel 或 WebRTC 传输。',

View File

@@ -79,6 +79,8 @@ const Dashboard: React.FC = () => {
usedTransport: String(item?.used_transport || '-'),
fallbackFrom: String(item?.fallback_from || '').trim(),
durationMs: Number(item?.duration_ms || 0),
artifactCount: Number(item?.artifact_count || 0),
artifactKinds: Array.isArray(item?.artifact_kinds) ? item.artifact_kinds.map((kind: any) => String(kind || '').trim()).filter(Boolean) : [],
ok: Boolean(item?.ok),
error: String(item?.error || '').trim(),
}));
@@ -287,6 +289,12 @@ const Dashboard: React.FC = () => {
<div className="text-zinc-400">{t('dashboardNodeDispatchDuration')}</div>
<div className="text-zinc-200 mt-1">{`${item.durationMs}ms`}</div>
</div>
<div>
<div className="text-zinc-400">{t('dashboardNodeDispatchArtifacts')}</div>
<div className="text-zinc-200 mt-1">
{item.artifactCount > 0 ? `${item.artifactCount}${item.artifactKinds.length ? ` · ${item.artifactKinds.join(', ')}` : ''}` : '-'}
</div>
</div>
<div>
<div className="text-zinc-400">{t('dashboardNodeDispatchError')}</div>
<div className={`mt-1 break-all ${item.error ? 'text-rose-300' : 'text-zinc-500'}`}>