add p2p-relay transport router scaffold for nodes tool

This commit is contained in:
DBT
2026-02-24 15:06:54 +00:00
parent 7fa0e629e8
commit 9635d48e67
5 changed files with 87 additions and 6 deletions

View File

@@ -40,7 +40,8 @@
已新增 `nodes` 工具控制平面PoC
- `action=status|describe`:查看已配对节点状态与能力矩阵
- `action=run|invoke|camera_snap|screen_record|location_get`:已接入路由框架(下一阶段接数据平面传输)
- `action=run|invoke|camera_snap|screen_record|location_get`:已接入路由框架
- `mode=auto|p2p|relay`:默认 `auto`(优先 p2p失败回退 relay
实现位置:
- `pkg/nodes/types.go`

View File

@@ -40,7 +40,8 @@ These changes improve stability, observability, and maintainability under concur
A `nodes` tool control-plane PoC is now available:
- `action=status|describe`: inspect paired node status and capability matrix
- `action=run|invoke|camera_snap|screen_record|location_get`: routing framework is in place (data-plane bridge lands in next phase)
- `action=run|invoke|camera_snap|screen_record|location_get`: routing framework is in place
- `mode=auto|p2p|relay`: default `auto` (prefer p2p, fallback to relay)
Implementation:
- `pkg/nodes/types.go`

View File

@@ -78,7 +78,8 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
toolsRegistry.Register(tools.NewExecTool(cfg.Tools.Shell, workspace, processManager))
toolsRegistry.Register(tools.NewProcessTool(processManager))
nodesManager := nodes.NewManager()
toolsRegistry.Register(tools.NewNodesTool(nodesManager))
nodesRouter := &nodes.Router{P2P: &nodes.StubP2PTransport{}, Relay: &nodes.StubRelayTransport{}}
toolsRegistry.Register(tools.NewNodesTool(nodesManager, nodesRouter))
if cs != nil {
toolsRegistry.Register(tools.NewRemindTool(cs))

66
pkg/nodes/transport.go Normal file
View File

@@ -0,0 +1,66 @@
package nodes
import (
"context"
"fmt"
"strings"
)
// Transport abstracts node data-plane delivery.
type Transport interface {
Name() string
Send(ctx context.Context, req Request) (Response, error)
}
// Router prefers p2p transport and falls back to relay.
type Router struct {
P2P Transport
Relay Transport
}
func (r *Router) Dispatch(ctx context.Context, req Request, mode string) (Response, error) {
m := strings.ToLower(strings.TrimSpace(mode))
if m == "" {
m = "auto"
}
switch m {
case "p2p":
if r.P2P == nil {
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "p2p transport unavailable"}, nil
}
return r.P2P.Send(ctx, req)
case "relay":
if r.Relay == nil {
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "relay transport unavailable"}, nil
}
return r.Relay.Send(ctx, req)
default: // auto
if r.P2P != nil {
if resp, err := r.P2P.Send(ctx, req); err == nil && resp.OK {
return resp, nil
}
}
if r.Relay != nil {
return r.Relay.Send(ctx, req)
}
return Response{}, fmt.Errorf("no transport available")
}
}
// StubP2PTransport provides phase-2 negotiation scaffold.
type StubP2PTransport struct{}
func (s *StubP2PTransport) Name() string { return "p2p" }
func (s *StubP2PTransport) Send(ctx context.Context, req Request) (Response, error) {
_ = ctx
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "p2p session not established yet"}, nil
}
// StubRelayTransport provides executable placeholder until real bridge lands.
type StubRelayTransport struct{}
func (s *StubRelayTransport) Name() string { return "relay" }
func (s *StubRelayTransport) Send(ctx context.Context, req Request) (Response, error) {
_ = ctx
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "relay bridge not implemented yet"}, nil
}

View File

@@ -12,9 +12,10 @@ import (
// NodesTool provides an OpenClaw-style control surface for paired nodes.
type NodesTool struct {
manager *nodes.Manager
router *nodes.Router
}
func NewNodesTool(m *nodes.Manager) *NodesTool { return &NodesTool{manager: m} }
func NewNodesTool(m *nodes.Manager, r *nodes.Router) *NodesTool { return &NodesTool{manager: m, router: r} }
func (t *NodesTool) Name() string { return "nodes" }
func (t *NodesTool) Description() string {
return "Manage paired nodes (status/describe/run/invoke/camera/screen/location)."
@@ -23,6 +24,7 @@ func (t *NodesTool) Parameters() map[string]interface{} {
return map[string]interface{}{"type": "object", "properties": map[string]interface{}{
"action": map[string]interface{}{"type": "string", "description": "status|describe|run|invoke|camera_snap|screen_record|location_get"},
"node": map[string]interface{}{"type": "string", "description": "target node id"},
"mode": map[string]interface{}{"type": "string", "description": "auto|p2p|relay (default auto)"},
"args": map[string]interface{}{"type": "object", "description": "action args"},
}, "required": []string{"action"}}
}
@@ -35,6 +37,7 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s
return "", fmt.Errorf("action is required")
}
nodeID, _ := args["node"].(string)
mode, _ := args["mode"].(string)
if t.manager == nil {
return "", fmt.Errorf("nodes manager not configured")
}
@@ -52,7 +55,6 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s
b, _ := json.Marshal(t.manager.List())
return string(b), nil
default:
// Phase-1: control-plane exists, data-plane RPC bridge lands in next step.
if nodeID == "" {
if picked, ok := t.manager.PickFor(action); ok {
nodeID = picked.ID
@@ -61,7 +63,17 @@ func (t *NodesTool) Execute(ctx context.Context, args map[string]interface{}) (s
if nodeID == "" {
return "", fmt.Errorf("no eligible node found for action=%s", action)
}
resp := nodes.Response{OK: false, Node: nodeID, Action: action, Error: "node transport bridge not implemented yet"}
if t.router == nil {
return "", fmt.Errorf("nodes transport router not configured")
}
var reqArgs map[string]interface{}
if raw, ok := args["args"].(map[string]interface{}); ok {
reqArgs = raw
}
resp, err := t.router.Dispatch(ctx, nodes.Request{Action: action, Node: nodeID, Args: reqArgs}, mode)
if err != nil {
return "", err
}
b, _ := json.Marshal(resp)
return string(b), nil
}