From ad2e732f567b1cbe027c9b8bcbdf6d6c94577d65 Mon Sep 17 00:00:00 2001 From: lpf Date: Sun, 8 Mar 2026 22:38:00 +0800 Subject: [PATCH] fix: gate node p2p behind explicit config --- config.example.json | 8 +++++++- pkg/agent/loop.go | 12 +++++++++++- pkg/agent/loop_nodes_p2p_test.go | 32 ++++++++++++++++++++++++++++++++ pkg/config/config.go | 22 +++++++++++++++++++--- pkg/config/validate.go | 5 +++++ pkg/config/validate_test.go | 23 +++++++++++++++++++++++ 6 files changed, 97 insertions(+), 5 deletions(-) create mode 100644 pkg/agent/loop_nodes_p2p_test.go diff --git a/config.example.json b/config.example.json index 8f57eeb..bdceee0 100644 --- a/config.example.json +++ b/config.example.json @@ -275,7 +275,13 @@ "gateway": { "host": "0.0.0.0", "port": 18790, - "token": "" + "token": "", + "nodes": { + "p2p": { + "enabled": false, + "transport": "websocket_tunnel" + } + } }, "cron": { "min_sleep_sec": 1, diff --git a/pkg/agent/loop.go b/pkg/agent/loop.go index 31e7eae..71cd171 100644 --- a/pkg/agent/loop.go +++ b/pkg/agent/loop.go @@ -142,7 +142,17 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers return nodes.Response{OK: false, Code: "unsupported_action", Node: "local", Action: req.Action, Error: "unsupported local simulated action"} } }) - nodesRouter := &nodes.Router{P2P: &nodes.WebsocketP2PTransport{Manager: nodesManager}, Relay: &nodes.HTTPRelayTransport{Manager: nodesManager}} + var nodeP2P nodes.Transport + if cfg.Gateway.Nodes.P2P.Enabled { + switch strings.ToLower(strings.TrimSpace(cfg.Gateway.Nodes.P2P.Transport)) { + case "", "websocket_tunnel": + nodeP2P = &nodes.WebsocketP2PTransport{Manager: nodesManager} + case "webrtc": + // Keep the mode explicit but non-default until a direct data channel is production-ready. + nodeP2P = &nodes.WebsocketP2PTransport{Manager: nodesManager} + } + } + nodesRouter := &nodes.Router{P2P: nodeP2P, Relay: &nodes.HTTPRelayTransport{Manager: nodesManager}} toolsRegistry.Register(tools.NewNodesTool(nodesManager, nodesRouter, filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl"))) if cs != nil { diff --git a/pkg/agent/loop_nodes_p2p_test.go b/pkg/agent/loop_nodes_p2p_test.go new file mode 100644 index 0000000..d026c85 --- /dev/null +++ b/pkg/agent/loop_nodes_p2p_test.go @@ -0,0 +1,32 @@ +package agent + +import ( + "testing" + + "clawgo/pkg/bus" + "clawgo/pkg/config" +) + +func TestNewAgentLoopDisablesNodeP2PByDefault(t *testing.T) { + cfg := config.DefaultConfig() + cfg.Agents.Defaults.Workspace = t.TempDir() + + loop := NewAgentLoop(cfg, bus.NewMessageBus(), stubLLMProvider{}, nil) + if loop.nodeRouter == nil { + t.Fatalf("expected node router to be configured") + } + if loop.nodeRouter.P2P != nil { + t.Fatalf("expected node p2p transport to be disabled by default") + } +} + +func TestNewAgentLoopEnablesNodeP2PWhenConfigured(t *testing.T) { + cfg := config.DefaultConfig() + cfg.Agents.Defaults.Workspace = t.TempDir() + cfg.Gateway.Nodes.P2P.Enabled = true + + loop := NewAgentLoop(cfg, bus.NewMessageBus(), stubLLMProvider{}, nil) + if loop.nodeRouter == nil || loop.nodeRouter.P2P == nil { + t.Fatalf("expected node p2p transport to be enabled") + } +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 3193f31..800d1fa 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -288,9 +288,19 @@ type ProviderResponsesConfig struct { } type GatewayConfig struct { - Host string `json:"host" env:"CLAWGO_GATEWAY_HOST"` - Port int `json:"port" env:"CLAWGO_GATEWAY_PORT"` - Token string `json:"token" env:"CLAWGO_GATEWAY_TOKEN"` + Host string `json:"host" env:"CLAWGO_GATEWAY_HOST"` + Port int `json:"port" env:"CLAWGO_GATEWAY_PORT"` + Token string `json:"token" env:"CLAWGO_GATEWAY_TOKEN"` + Nodes GatewayNodesConfig `json:"nodes,omitempty"` +} + +type GatewayNodesConfig struct { + P2P GatewayNodesP2PConfig `json:"p2p,omitempty"` +} + +type GatewayNodesP2PConfig struct { + Enabled bool `json:"enabled"` + Transport string `json:"transport,omitempty"` } type CronConfig struct { @@ -534,6 +544,12 @@ func DefaultConfig() *Config { Host: "0.0.0.0", Port: 18790, Token: generateGatewayToken(), + Nodes: GatewayNodesConfig{ + P2P: GatewayNodesP2PConfig{ + Enabled: false, + Transport: "websocket_tunnel", + }, + }, }, Cron: CronConfig{ MinSleepSec: 1, diff --git a/pkg/config/validate.go b/pkg/config/validate.go index c5632f3..f002d89 100644 --- a/pkg/config/validate.go +++ b/pkg/config/validate.go @@ -119,6 +119,11 @@ func Validate(cfg *Config) []error { if cfg.Gateway.Port <= 0 || cfg.Gateway.Port > 65535 { errs = append(errs, fmt.Errorf("gateway.port must be in 1..65535")) } + switch strings.ToLower(strings.TrimSpace(cfg.Gateway.Nodes.P2P.Transport)) { + case "", "websocket_tunnel", "webrtc": + default: + errs = append(errs, fmt.Errorf("gateway.nodes.p2p.transport must be one of: websocket_tunnel, webrtc")) + } if cfg.Cron.MinSleepSec <= 0 { errs = append(errs, fmt.Errorf("cron.min_sleep_sec must be > 0")) } diff --git a/pkg/config/validate_test.go b/pkg/config/validate_test.go index 5902a6b..5a53cd6 100644 --- a/pkg/config/validate_test.go +++ b/pkg/config/validate_test.go @@ -128,3 +128,26 @@ func TestValidateSubagentsRejectsInvalidNotifyMainPolicy(t *testing.T) { t.Fatalf("expected validation errors") } } + +func TestDefaultConfigDisablesGatewayNodeP2P(t *testing.T) { + t.Parallel() + + cfg := DefaultConfig() + if cfg.Gateway.Nodes.P2P.Enabled { + t.Fatalf("expected gateway node p2p to be disabled by default") + } + if cfg.Gateway.Nodes.P2P.Transport != "websocket_tunnel" { + t.Fatalf("unexpected default gateway node p2p transport: %s", cfg.Gateway.Nodes.P2P.Transport) + } +} + +func TestValidateRejectsUnknownGatewayNodeP2PTransport(t *testing.T) { + t.Parallel() + + cfg := DefaultConfig() + cfg.Gateway.Nodes.P2P.Transport = "udp" + + if errs := Validate(cfg); len(errs) == 0 { + t.Fatalf("expected validation errors") + } +}