From 92433a6e212f6e700a53d6c5919357909f01b341 Mon Sep 17 00:00:00 2001 From: DBT Date: Tue, 24 Feb 2026 15:51:24 +0000 Subject: [PATCH] wire nodes relay transport to HTTP bridge with endpoint token support --- README.md | 3 ++- README_EN.md | 3 ++- pkg/agent/loop.go | 2 +- pkg/nodes/transport.go | 56 +++++++++++++++++++++++++++++++++++++----- pkg/nodes/types.go | 2 ++ 5 files changed, 57 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index e4a3047..84f50be 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,8 @@ - `action=status|describe`:查看已配对节点状态与能力矩阵 - `action=run|invoke|camera_snap|screen_record|location_get`:已接入路由框架 - `mode=auto|p2p|relay`:默认 `auto`(优先 p2p,失败回退 relay) -- relay 已接入本地 handler 调用链,便于逐步替换为真实跨节点传输 +- relay 已支持 HTTP 节点桥接:当节点配置 `endpoint` 后,调用 `POST {endpoint}/invoke` 并透传 `Request` +- 可在 `NodeInfo` 中配置 `token`,relay 会自动附加 `Authorization: Bearer ` 实现位置: - `pkg/nodes/types.go` diff --git a/README_EN.md b/README_EN.md index 7ca15a2..aa3dc38 100644 --- a/README_EN.md +++ b/README_EN.md @@ -42,7 +42,8 @@ A `nodes` tool control-plane PoC is now available: - `action=status|describe`: inspect paired node status and capability matrix - `action=run|invoke|camera_snap|screen_record|location_get`: routing framework is in place - `mode=auto|p2p|relay`: default `auto` (prefer p2p, fallback to relay) -- relay now uses local handler invocation path, ready for real cross-node transport replacement +- relay now supports HTTP node bridging: with node `endpoint` configured, it calls `POST {endpoint}/invoke` +- `NodeInfo.token` is supported; relay automatically sets `Authorization: Bearer ` Implementation: - `pkg/nodes/types.go` diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 179838b..e2c3ec4 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -82,7 +82,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers nodesManager.RegisterHandler("local", func(req nodes.Request) nodes.Response { return nodes.Response{OK: true, Node: "local", Action: req.Action, Payload: map[string]interface{}{"echo": req.Args, "transport": "relay-local"}} }) - nodesRouter := &nodes.Router{P2P: &nodes.StubP2PTransport{}, Relay: &nodes.StubRelayTransport{Manager: nodesManager}} + nodesRouter := &nodes.Router{P2P: &nodes.StubP2PTransport{}, Relay: &nodes.HTTPRelayTransport{Manager: nodesManager}} toolsRegistry.Register(tools.NewNodesTool(nodesManager, nodesRouter)) if cs != nil { diff --git a/pkg/nodes/transport.go b/pkg/nodes/transport.go index d75c994..23fd766 100644 --- a/pkg/nodes/transport.go +++ b/pkg/nodes/transport.go @@ -1,9 +1,14 @@ package nodes import ( + "bytes" "context" + "encoding/json" "fmt" + "io" + "net/http" "strings" + "time" ) // Transport abstracts node data-plane delivery. @@ -56,17 +61,56 @@ func (s *StubP2PTransport) Send(ctx context.Context, req Request) (Response, err return Response{OK: false, Node: req.Node, Action: req.Action, Error: "p2p session not established yet"}, nil } -// StubRelayTransport provides executable placeholder until real bridge lands. -type StubRelayTransport struct{ Manager *Manager } +// HTTPRelayTransport dispatches requests to node-agent endpoints over HTTP. +type HTTPRelayTransport struct { + Manager *Manager + Client *http.Client +} -func (s *StubRelayTransport) Name() string { return "relay" } -func (s *StubRelayTransport) Send(ctx context.Context, req Request) (Response, error) { - _ = ctx +func (s *HTTPRelayTransport) Name() string { return "relay" } +func (s *HTTPRelayTransport) Send(ctx context.Context, req Request) (Response, error) { if s.Manager == nil { return Response{OK: false, Node: req.Node, Action: req.Action, Error: "relay manager not configured"}, nil } if resp, ok := s.Manager.Invoke(req); ok { return resp, nil } - return Response{OK: false, Node: req.Node, Action: req.Action, Error: "relay handler not found for node"}, nil + n, ok := s.Manager.Get(req.Node) + if !ok { + return Response{OK: false, Node: req.Node, Action: req.Action, Error: "node not found"}, nil + } + endpoint := strings.TrimRight(strings.TrimSpace(n.Endpoint), "/") + if endpoint == "" { + return Response{OK: false, Node: req.Node, Action: req.Action, Error: "node endpoint not configured"}, nil + } + client := s.Client + if client == nil { + client = &http.Client{Timeout: 20 * time.Second} + } + body, _ := json.Marshal(req) + hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint+"/invoke", bytes.NewReader(body)) + if err != nil { + return Response{}, err + } + hreq.Header.Set("Content-Type", "application/json") + if tok := strings.TrimSpace(n.Token); tok != "" { + hreq.Header.Set("Authorization", "Bearer "+tok) + } + hresp, err := client.Do(hreq) + if err != nil { + return Response{OK: false, Node: req.Node, Action: req.Action, Error: err.Error()}, nil + } + defer hresp.Body.Close() + payload, _ := io.ReadAll(io.LimitReader(hresp.Body, 1<<20)) + var resp Response + if err := json.Unmarshal(payload, &resp); err != nil { + return Response{OK: false, Node: req.Node, Action: req.Action, Error: fmt.Sprintf("invalid node response: %s", strings.TrimSpace(string(payload)))}, nil + } + if strings.TrimSpace(resp.Node) == "" { + resp.Node = req.Node + } + if strings.TrimSpace(resp.Action) == "" { + resp.Action = req.Action + } + return resp, nil } diff --git a/pkg/nodes/types.go b/pkg/nodes/types.go index 474a821..0a9ba24 100644 --- a/pkg/nodes/types.go +++ b/pkg/nodes/types.go @@ -19,6 +19,8 @@ type NodeInfo struct { OS string `json:"os,omitempty"` Arch string `json:"arch,omitempty"` Version string `json:"version,omitempty"` + Endpoint string `json:"endpoint,omitempty"` + Token string `json:"token,omitempty"` Capabilities Capabilities `json:"capabilities"` LastSeenAt time.Time `json:"last_seen_at"` Online bool `json:"online"`