diff --git a/README.md b/README.md index feac4aa..d5ad05c 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,8 @@ 已新增 `nodes` 工具控制平面(PoC): - `action=status|describe`:查看已配对节点状态与能力矩阵 -- `action=run|invoke|camera_snap|screen_record|location_get`:已接入路由框架(下一阶段接数据平面传输) +- `action=run|invoke|camera_snap|screen_record|location_get`:已接入路由框架 +- `mode=auto|p2p|relay`:默认 `auto`(优先 p2p,失败回退 relay) 实现位置: - `pkg/nodes/types.go` diff --git a/README_EN.md b/README_EN.md index e0893fb..7d5e354 100644 --- a/README_EN.md +++ b/README_EN.md @@ -40,7 +40,8 @@ These changes improve stability, observability, and maintainability under concur A `nodes` tool control-plane PoC is now available: - `action=status|describe`: inspect paired node status and capability matrix -- `action=run|invoke|camera_snap|screen_record|location_get`: routing framework is in place (data-plane bridge lands in next phase) +- `action=run|invoke|camera_snap|screen_record|location_get`: routing framework is in place +- `mode=auto|p2p|relay`: default `auto` (prefer p2p, fallback to relay) Implementation: - `pkg/nodes/types.go` diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 0e3c3de..9d083c0 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -78,7 +78,8 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers toolsRegistry.Register(tools.NewExecTool(cfg.Tools.Shell, workspace, processManager)) toolsRegistry.Register(tools.NewProcessTool(processManager)) nodesManager := nodes.NewManager() - toolsRegistry.Register(tools.NewNodesTool(nodesManager)) + nodesRouter := &nodes.Router{P2P: &nodes.StubP2PTransport{}, Relay: &nodes.StubRelayTransport{}} + toolsRegistry.Register(tools.NewNodesTool(nodesManager, nodesRouter)) if cs != nil { toolsRegistry.Register(tools.NewRemindTool(cs)) diff --git a/pkg/nodes/transport.go b/pkg/nodes/transport.go new file mode 100644 index 0000000..bc6a6e5 --- /dev/null +++ b/pkg/nodes/transport.go @@ -0,0 +1,66 @@ +package nodes + +import ( + "context" + "fmt" + "strings" +) + +// Transport abstracts node data-plane delivery. +type Transport interface { + Name() string + Send(ctx context.Context, req Request) (Response, error) +} + +// Router prefers p2p transport and falls back to relay. +type Router struct { + P2P Transport + Relay Transport +} + +func (r *Router) Dispatch(ctx context.Context, req Request, mode string) (Response, error) { + m := strings.ToLower(strings.TrimSpace(mode)) + if m == "" { + m = "auto" + } + switch m { + case "p2p": + if r.P2P == nil { + return Response{OK: false, Node: req.Node, Action: req.Action, Error: "p2p transport unavailable"}, nil + } + return r.P2P.Send(ctx, req) + case "relay": + if r.Relay == nil { + return Response{OK: false, Node: req.Node, Action: req.Action, Error: "relay transport unavailable"}, nil + } + return r.Relay.Send(ctx, req) + default: // auto + if r.P2P != nil { + if resp, err := r.P2P.Send(ctx, req); err == nil && resp.OK { + return resp, nil + } + } + if r.Relay != nil { + return r.Relay.Send(ctx, req) + } + return Response{}, fmt.Errorf("no transport available") + } +} + +// StubP2PTransport provides phase-2 negotiation scaffold. +type StubP2PTransport struct{} + +func (s *StubP2PTransport) Name() string { return "p2p" } +func (s *StubP2PTransport) Send(ctx context.Context, req Request) (Response, error) { + _ = ctx + return Response{OK: false, Node: req.Node, Action: req.Action, Error: "p2p session not established yet"}, nil +} + +// StubRelayTransport provides executable placeholder until real bridge lands. +type StubRelayTransport struct{} + +func (s *StubRelayTransport) Name() string { return "relay" } +func (s *StubRelayTransport) Send(ctx context.Context, req Request) (Response, error) { + _ = ctx + return Response{OK: false, Node: req.Node, Action: req.Action, Error: "relay bridge not implemented yet"}, nil +} diff --git a/pkg/tools/nodes_tool.go b/pkg/tools/nodes_tool.go index 7695bce..05b2898 100644 --- a/pkg/tools/nodes_tool.go +++ b/pkg/tools/nodes_tool.go @@ -12,9 +12,10 @@ import ( // NodesTool provides an OpenClaw-style control surface for paired nodes. type NodesTool struct { manager *nodes.Manager + router *nodes.Router } -func NewNodesTool(m *nodes.Manager) *NodesTool { return &NodesTool{manager: m} } +func NewNodesTool(m *nodes.Manager, r *nodes.Router) *NodesTool { return &NodesTool{manager: m, router: r} } func (t *NodesTool) Name() string { return "nodes" } func (t *NodesTool) Description() string { return "Manage paired nodes (status/describe/run/invoke/camera/screen/location)." @@ -23,6 +24,7 @@ func (t *NodesTool) Parameters() map[string]interface{} { return map[string]interface{}{"type": "object", "properties": map[string]interface{}{ "action": map[string]interface{}{"type": "string", "description": "status|describe|run|invoke|camera_snap|screen_record|location_get"}, "node": map[string]interface{}{"type": "string", "description": "target node id"}, + "mode": map[string]interface{}{"type": "string", "description": "auto|p2p|relay (default auto)"}, "args": map[string]interface{}{"type": "object", "description": "action args"}, }, "required": []string{"action"}} } @@ -35,6 +37,7 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s return "", fmt.Errorf("action is required") } nodeID, _ := args["node"].(string) + mode, _ := args["mode"].(string) if t.manager == nil { return "", fmt.Errorf("nodes manager not configured") } @@ -52,7 +55,6 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s b, _ := json.Marshal(t.manager.List()) return string(b), nil default: - // Phase-1: control-plane exists, data-plane RPC bridge lands in next step. if nodeID == "" { if picked, ok := t.manager.PickFor(action); ok { nodeID = picked.ID @@ -61,7 +63,17 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s if nodeID == "" { return "", fmt.Errorf("no eligible node found for action=%s", action) } - resp := nodes.Response{OK: false, Node: nodeID, Action: action, Error: "node transport bridge not implemented yet"} + if t.router == nil { + return "", fmt.Errorf("nodes transport router not configured") + } + var reqArgs map[string]interface{} + if raw, ok := args["args"].(map[string]interface{}); ok { + reqArgs = raw + } + resp, err := t.router.Dispatch(ctx, nodes.Request{Action: action, Node: nodeID, Args: reqArgs}, mode) + if err != nil { + return "", err + } b, _ := json.Marshal(resp) return string(b), nil }