wire nodes relay transport to HTTP bridge with endpoint token support

This commit is contained in:
DBT
2026-02-24 15:51:24 +00:00
parent 2486bbd80c
commit 92433a6e21
5 changed files with 57 additions and 9 deletions

View File

@@ -82,7 +82,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
nodesManager.RegisterHandler("local", func(req nodes.Request) nodes.Response {
return nodes.Response{OK: true, Node: "local", Action: req.Action, Payload: map[string]interface{}{"echo": req.Args, "transport": "relay-local"}}
})
nodesRouter := &nodes.Router{P2P: &nodes.StubP2PTransport{}, Relay: &nodes.StubRelayTransport{Manager: nodesManager}}
nodesRouter := &nodes.Router{P2P: &nodes.StubP2PTransport{}, Relay: &nodes.HTTPRelayTransport{Manager: nodesManager}}
toolsRegistry.Register(tools.NewNodesTool(nodesManager, nodesRouter))
if cs != nil {

View File

@@ -1,9 +1,14 @@
package nodes
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
)
// Transport abstracts node data-plane delivery.
@@ -56,17 +61,56 @@ func (s *StubP2PTransport) Send(ctx context.Context, req Request) (Response, err
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "p2p session not established yet"}, nil
}
// StubRelayTransport provides executable placeholder until real bridge lands.
type StubRelayTransport struct{ Manager *Manager }
// HTTPRelayTransport dispatches requests to node-agent endpoints over HTTP.
type HTTPRelayTransport struct {
Manager *Manager
Client *http.Client
}
func (s *StubRelayTransport) Name() string { return "relay" }
func (s *StubRelayTransport) Send(ctx context.Context, req Request) (Response, error) {
_ = ctx
func (s *HTTPRelayTransport) Name() string { return "relay" }
func (s *HTTPRelayTransport) Send(ctx context.Context, req Request) (Response, error) {
if s.Manager == nil {
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "relay manager not configured"}, nil
}
if resp, ok := s.Manager.Invoke(req); ok {
return resp, nil
}
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "relay handler not found for node"}, nil
n, ok := s.Manager.Get(req.Node)
if !ok {
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "node not found"}, nil
}
endpoint := strings.TrimRight(strings.TrimSpace(n.Endpoint), "/")
if endpoint == "" {
return Response{OK: false, Node: req.Node, Action: req.Action, Error: "node endpoint not configured"}, nil
}
client := s.Client
if client == nil {
client = &http.Client{Timeout: 20 * time.Second}
}
body, _ := json.Marshal(req)
hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint+"/invoke", bytes.NewReader(body))
if err != nil {
return Response{}, err
}
hreq.Header.Set("Content-Type", "application/json")
if tok := strings.TrimSpace(n.Token); tok != "" {
hreq.Header.Set("Authorization", "Bearer "+tok)
}
hresp, err := client.Do(hreq)
if err != nil {
return Response{OK: false, Node: req.Node, Action: req.Action, Error: err.Error()}, nil
}
defer hresp.Body.Close()
payload, _ := io.ReadAll(io.LimitReader(hresp.Body, 1<<20))
var resp Response
if err := json.Unmarshal(payload, &resp); err != nil {
return Response{OK: false, Node: req.Node, Action: req.Action, Error: fmt.Sprintf("invalid node response: %s", strings.TrimSpace(string(payload)))}, nil
}
if strings.TrimSpace(resp.Node) == "" {
resp.Node = req.Node
}
if strings.TrimSpace(resp.Action) == "" {
resp.Action = req.Action
}
return resp, nil
}

View File

@@ -19,6 +19,8 @@ type NodeInfo struct {
OS string `json:"os,omitempty"`
Arch string `json:"arch,omitempty"`
Version string `json:"version,omitempty"`
Endpoint string `json:"endpoint,omitempty"`
Token string `json:"token,omitempty"`
Capabilities Capabilities `json:"capabilities"`
LastSeenAt time.Time `json:"last_seen_at"`
Online bool `json:"online"`