diff --git a/README.md b/README.md index f24b32c..db3a785 100644 --- a/README.md +++ b/README.md @@ -200,6 +200,52 @@ user -> main -> worker -> main -> user 完整示例见 [config.example.json](/Users/lpf/Desktop/project/clawgo/config.example.json)。 +## Node P2P + +远端 node 的调度数据面现在支持: + +- `websocket_tunnel` +- `webrtc` + +默认仍然关闭,只有显式配置 `gateway.nodes.p2p.enabled=true` 才会启用。建议先用 `websocket_tunnel` 验证链路,再切到 `webrtc`。 + +`webrtc` 建议同时理解这两个字段: + +- `stun_servers` + - 兼容旧式 STUN 列表 +- `ice_servers` + - 推荐的新结构 + - 可以配置 `stun:`、`turn:`、`turns:` URL + - `turn:` / `turns:` 必须同时提供 `username` 和 `credential` + +示例: + +```json +{ + "gateway": { + "nodes": { + "p2p": { + "enabled": true, + "transport": "webrtc", + "stun_servers": ["stun:stun.l.google.com:19302"], + "ice_servers": [ + { + "urls": ["turn:turn.example.com:3478"], + "username": "demo", + "credential": "secret" + } + ] + } + } + } +} +``` + +说明: + +- `webrtc` 建连失败时,调度层仍会回退到现有 relay / tunnel 路径 +- Dashboard、`status`、`/webui/api/nodes` 会显示当前 Node P2P 状态和会话摘要 + ## MCP 服务支持 ClawGo 现在支持通过 `tools.mcp` 接入 `stdio`、`http`、`streamable_http`、`sse` 型 MCP server。 diff --git a/README_EN.md b/README_EN.md index 931c8b0..4a07500 100644 --- a/README_EN.md +++ b/README_EN.md @@ -200,6 +200,52 @@ Notes: See [config.example.json](/Users/lpf/Desktop/project/clawgo/config.example.json) for a full example. +## Node P2P + +The remote node data plane supports: + +- `websocket_tunnel` +- `webrtc` + +It remains disabled by default. Node P2P is only enabled when `gateway.nodes.p2p.enabled=true` is set explicitly. In practice, start with `websocket_tunnel`, then switch to `webrtc` after validating connectivity. + +For `webrtc`, these two fields matter: + +- `stun_servers` + - legacy-compatible STUN list +- `ice_servers` + - the preferred structured format + - may include `stun:`, `turn:`, and `turns:` URLs + - `turn:` / `turns:` entries require both `username` and `credential` + +Example: + +```json +{ + "gateway": { + "nodes": { + "p2p": { + "enabled": true, + "transport": "webrtc", + "stun_servers": ["stun:stun.l.google.com:19302"], + "ice_servers": [ + { + "urls": ["turn:turn.example.com:3478"], + "username": "demo", + "credential": "secret" + } + ] + } + } + } +} +``` + +Notes: + +- when `webrtc` session setup fails, dispatch still falls back to the existing relay / tunnel path +- Dashboard, `status`, and `/webui/api/nodes` expose the current Node P2P runtime summary + ## MCP Server Support ClawGo now supports `stdio`, `http`, `streamable_http`, and `sse` MCP servers through `tools.mcp`. diff --git a/cmd/clawgo/cmd_gateway.go b/cmd/clawgo/cmd_gateway.go index 12bac7b..89cd5ec 100644 --- a/cmd/clawgo/cmd_gateway.go +++ b/cmd/clawgo/cmd_gateway.go @@ -24,6 +24,7 @@ import ( "clawgo/pkg/providers" "clawgo/pkg/runtimecfg" "clawgo/pkg/sentinel" + "github.com/pion/webrtc/v4" ) func gatewayCmd() { @@ -133,16 +134,37 @@ func gatewayCmd() { if loop == nil || server == nil || runtimeCfg == nil { return } + buildICEServers := func() []webrtc.ICEServer { + out := make([]webrtc.ICEServer, 0, len(runtimeCfg.Gateway.Nodes.P2P.ICEServers)) + for _, serverCfg := range runtimeCfg.Gateway.Nodes.P2P.ICEServers { + urls := make([]string, 0, len(serverCfg.URLs)) + for _, raw := range serverCfg.URLs { + if v := strings.TrimSpace(raw); v != "" { + urls = append(urls, v) + } + } + if len(urls) == 0 { + continue + } + out = append(out, webrtc.ICEServer{ + URLs: urls, + Username: strings.TrimSpace(serverCfg.Username), + Credential: serverCfg.Credential, + }) + } + return out + } server.SetNodeP2PStatusHandler(func() map[string]interface{} { return map[string]interface{}{ "enabled": runtimeCfg.Gateway.Nodes.P2P.Enabled, "transport": strings.TrimSpace(runtimeCfg.Gateway.Nodes.P2P.Transport), "configured_stun": append([]string(nil), runtimeCfg.Gateway.Nodes.P2P.STUNServers...), + "configured_ice": len(runtimeCfg.Gateway.Nodes.P2P.ICEServers), } }) switch { case runtimeCfg.Gateway.Nodes.P2P.Enabled && strings.EqualFold(strings.TrimSpace(runtimeCfg.Gateway.Nodes.P2P.Transport), "webrtc"): - webrtcTransport := nodes.NewWebRTCTransport(runtimeCfg.Gateway.Nodes.P2P.STUNServers) + webrtcTransport := nodes.NewWebRTCTransport(runtimeCfg.Gateway.Nodes.P2P.STUNServers, buildICEServers()...) loop.SetNodeP2PTransport(webrtcTransport) server.SetNodeWebRTCTransport(webrtcTransport) server.SetNodeP2PStatusHandler(func() map[string]interface{} { @@ -150,6 +172,7 @@ func gatewayCmd() { snapshot["enabled"] = true snapshot["transport"] = "webrtc" snapshot["configured_stun"] = append([]string(nil), runtimeCfg.Gateway.Nodes.P2P.STUNServers...) + snapshot["configured_ice"] = len(runtimeCfg.Gateway.Nodes.P2P.ICEServers) return snapshot }) default: diff --git a/cmd/clawgo/cmd_status.go b/cmd/clawgo/cmd_status.go index 5cfd9ba..c22615e 100644 --- a/cmd/clawgo/cmd_status.go +++ b/cmd/clawgo/cmd_status.go @@ -134,6 +134,8 @@ func statusCmd() { fmt.Printf(" - %s\n", key) } } + fmt.Printf("Nodes P2P: enabled=%t transport=%s\n", cfg.Gateway.Nodes.P2P.Enabled, strings.TrimSpace(cfg.Gateway.Nodes.P2P.Transport)) + fmt.Printf("Nodes P2P ICE: stun=%d ice=%d\n", len(cfg.Gateway.Nodes.P2P.STUNServers), len(cfg.Gateway.Nodes.P2P.ICEServers)) ns := nodes.DefaultManager().List() if len(ns) > 0 { online := 0 @@ -163,7 +165,6 @@ func statusCmd() { } fmt.Printf("Nodes: total=%d online=%d\n", len(ns), online) fmt.Printf("Nodes Capabilities: run=%d model=%d camera=%d screen=%d location=%d canvas=%d\n", caps["run"], caps["model"], caps["camera"], caps["screen"], caps["location"], caps["canvas"]) - fmt.Printf("Nodes P2P: enabled=%t transport=%s\n", cfg.Gateway.Nodes.P2P.Enabled, strings.TrimSpace(cfg.Gateway.Nodes.P2P.Transport)) if total, okCnt, avgMs, actionTop, transportTop, fallbackCnt, err := collectNodeDispatchStats(filepath.Join(workspace, "memory", "nodes-dispatch-audit.jsonl")); err == nil && total > 0 { fmt.Printf("Nodes Dispatch: total=%d ok=%d fail=%d avg_ms=%d\n", total, okCnt, total-okCnt, avgMs) if actionTop != "" { diff --git a/cmd/clawgo/cmd_status_test.go b/cmd/clawgo/cmd_status_test.go index 33669f4..e8fe402 100644 --- a/cmd/clawgo/cmd_status_test.go +++ b/cmd/clawgo/cmd_status_test.go @@ -25,6 +25,12 @@ func TestStatusCmdUsesActiveProviderDetails(t *testing.T) { cfg.Logging.Enabled = false cfg.Agents.Defaults.Workspace = workspace cfg.Agents.Defaults.Proxy = "backup" + cfg.Gateway.Nodes.P2P.Enabled = true + cfg.Gateway.Nodes.P2P.Transport = "webrtc" + cfg.Gateway.Nodes.P2P.STUNServers = []string{"stun:stun.example.net:3478"} + cfg.Gateway.Nodes.P2P.ICEServers = []config.GatewayICEConfig{ + {URLs: []string{"turn:turn.example.net:3478"}, Username: "user", Credential: "secret"}, + } cfg.Providers.Proxy.APIBase = "https://primary.example/v1" cfg.Providers.Proxy.APIKey = "" cfg.Providers.Proxies["backup"] = config.ProviderConfig{ @@ -68,4 +74,10 @@ func TestStatusCmdUsesActiveProviderDetails(t *testing.T) { if !strings.Contains(out, "Provider API Key: ✓") { t.Fatalf("expected active provider api key status in output, got: %s", out) } + if !strings.Contains(out, "Nodes P2P: enabled=true transport=webrtc") { + t.Fatalf("expected nodes p2p status in output, got: %s", out) + } + if !strings.Contains(out, "Nodes P2P ICE: stun=1 ice=1") { + t.Fatalf("expected nodes p2p ice summary in output, got: %s", out) + } } diff --git a/config.example.json b/config.example.json index 379d29a..f653e13 100644 --- a/config.example.json +++ b/config.example.json @@ -280,7 +280,14 @@ "p2p": { "enabled": false, "transport": "websocket_tunnel", - "stun_servers": [] + "stun_servers": [], + "ice_servers": [ + { + "urls": [ + "stun:stun.l.google.com:19302" + ] + } + ] } } }, diff --git a/pkg/config/config.go b/pkg/config/config.go index 4e0edc9..1d17ff4 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -298,10 +298,17 @@ type GatewayNodesConfig struct { P2P GatewayNodesP2PConfig `json:"p2p,omitempty"` } +type GatewayICEConfig struct { + URLs []string `json:"urls,omitempty"` + Username string `json:"username,omitempty"` + Credential string `json:"credential,omitempty"` +} + type GatewayNodesP2PConfig struct { - Enabled bool `json:"enabled"` - Transport string `json:"transport,omitempty"` - STUNServers []string `json:"stun_servers,omitempty"` + Enabled bool `json:"enabled"` + Transport string `json:"transport,omitempty"` + STUNServers []string `json:"stun_servers,omitempty"` + ICEServers []GatewayICEConfig `json:"ice_servers,omitempty"` } type CronConfig struct { @@ -550,6 +557,7 @@ func DefaultConfig() *Config { Enabled: false, Transport: "websocket_tunnel", STUNServers: []string{}, + ICEServers: []GatewayICEConfig{}, }, }, }, diff --git a/pkg/config/validate.go b/pkg/config/validate.go index d625d00..62987e2 100644 --- a/pkg/config/validate.go +++ b/pkg/config/validate.go @@ -125,6 +125,26 @@ func Validate(cfg *Config) []error { errs = append(errs, fmt.Errorf("gateway.nodes.p2p.transport must be one of: websocket_tunnel, webrtc")) } errs = append(errs, validateNonEmptyStringList("gateway.nodes.p2p.stun_servers", cfg.Gateway.Nodes.P2P.STUNServers)...) + for i, server := range cfg.Gateway.Nodes.P2P.ICEServers { + prefix := fmt.Sprintf("gateway.nodes.p2p.ice_servers[%d]", i) + errs = append(errs, validateNonEmptyStringList(prefix+".urls", server.URLs)...) + needsAuth := false + for _, raw := range server.URLs { + u := strings.ToLower(strings.TrimSpace(raw)) + if strings.HasPrefix(u, "turn:") || strings.HasPrefix(u, "turns:") { + needsAuth = true + break + } + } + if needsAuth { + if strings.TrimSpace(server.Username) == "" { + errs = append(errs, fmt.Errorf("%s.username is required for turn/turns urls", prefix)) + } + if strings.TrimSpace(server.Credential) == "" { + errs = append(errs, fmt.Errorf("%s.credential is required for turn/turns urls", prefix)) + } + } + } if cfg.Cron.MinSleepSec <= 0 { errs = append(errs, fmt.Errorf("cron.min_sleep_sec must be > 0")) } diff --git a/pkg/config/validate_test.go b/pkg/config/validate_test.go index 5a53cd6..c07b592 100644 --- a/pkg/config/validate_test.go +++ b/pkg/config/validate_test.go @@ -151,3 +151,29 @@ func TestValidateRejectsUnknownGatewayNodeP2PTransport(t *testing.T) { t.Fatalf("expected validation errors") } } + +func TestValidateGatewayNodeP2PIceServersAllowsStunOnly(t *testing.T) { + t.Parallel() + + cfg := DefaultConfig() + cfg.Gateway.Nodes.P2P.ICEServers = []GatewayICEConfig{ + {URLs: []string{"stun:stun.l.google.com:19302"}}, + } + + if errs := Validate(cfg); len(errs) != 0 { + t.Fatalf("expected config to be valid, got %v", errs) + } +} + +func TestValidateGatewayNodeP2PIceServersRequireTurnCredentials(t *testing.T) { + t.Parallel() + + cfg := DefaultConfig() + cfg.Gateway.Nodes.P2P.ICEServers = []GatewayICEConfig{ + {URLs: []string{"turn:turn.example.com:3478?transport=udp"}}, + } + + if errs := Validate(cfg); len(errs) == 0 { + t.Fatalf("expected validation errors") + } +} diff --git a/pkg/nodes/webrtc.go b/pkg/nodes/webrtc.go index 9049a83..3222276 100644 --- a/pkg/nodes/webrtc.go +++ b/pkg/nodes/webrtc.go @@ -92,24 +92,40 @@ func (s *gatewayRTCSession) snapshot() map[string]interface{} { } type WebRTCTransport struct { - stunServers []string + iceServers []webrtc.ICEServer mu sync.Mutex sessions map[string]*gatewayRTCSession signal map[string]WireSender } -func NewWebRTCTransport(stunServers []string) *WebRTCTransport { - out := make([]string, 0, len(stunServers)) +func NewWebRTCTransport(stunServers []string, extraICEServers ...webrtc.ICEServer) *WebRTCTransport { + out := make([]webrtc.ICEServer, 0, len(stunServers)+len(extraICEServers)) for _, server := range stunServers { if v := strings.TrimSpace(server); v != "" { - out = append(out, v) + out = append(out, webrtc.ICEServer{URLs: []string{v}}) } } + for _, server := range extraICEServers { + urls := make([]string, 0, len(server.URLs)) + for _, raw := range server.URLs { + if v := strings.TrimSpace(raw); v != "" { + urls = append(urls, v) + } + } + if len(urls) == 0 { + continue + } + out = append(out, webrtc.ICEServer{ + URLs: urls, + Username: strings.TrimSpace(server.Username), + Credential: server.Credential, + }) + } return &WebRTCTransport{ - stunServers: out, - sessions: map[string]*gatewayRTCSession{}, - signal: map[string]WireSender{}, + iceServers: out, + sessions: map[string]*gatewayRTCSession{}, + signal: map[string]WireSender{}, } } @@ -133,6 +149,7 @@ func (t *WebRTCTransport) Snapshot() map[string]interface{} { return map[string]interface{}{ "transport": "webrtc", "active_sessions": active, + "ice_servers": len(t.iceServers), "nodes": nodes, } } @@ -276,8 +293,8 @@ func (t *WebRTCTransport) ensureSession(nodeID string) (*gatewayRTCSession, erro } config := webrtc.Configuration{} - if len(t.stunServers) > 0 { - config.ICEServers = []webrtc.ICEServer{{URLs: append([]string(nil), t.stunServers...)}} + if len(t.iceServers) > 0 { + config.ICEServers = append([]webrtc.ICEServer(nil), t.iceServers...) } pc, err := webrtc.NewPeerConnection(config) if err != nil { diff --git a/webui/src/i18n/index.ts b/webui/src/i18n/index.ts index 295f5ca..ff869d8 100644 --- a/webui/src/i18n/index.ts +++ b/webui/src/i18n/index.ts @@ -118,6 +118,18 @@ const resources = { nodesSnapshot: 'Nodes Snapshot', refreshAll: 'Refresh All', refresh: 'Refresh', + dashboardNodeP2PDetail: '{{transport}} · {{sessions}} active · {{retries}} retries', + dashboardNodeP2PTransport: 'Transport', + dashboardNodeP2PIce: 'ICE Config', + dashboardNodeP2PHealth: 'Health', + configNodeP2P: 'Node P2P', + configNodeP2PHint: 'Configure websocket tunnel or WebRTC transport for remote nodes.', + configNodeP2PStunPlaceholder: 'Comma-separated STUN URLs', + configNodeP2PIceServers: 'ICE Servers', + configNodeP2PIceServersEmpty: 'No structured ICE servers configured.', + configNodeP2PIceUrlsPlaceholder: 'Comma-separated ICE URLs', + configNodeP2PIceUsername: 'ICE Username', + configNodeP2PIceCredential: 'ICE Credential', active: 'Active', paused: 'Paused', noCronJobs: 'No cron jobs found', @@ -644,6 +656,18 @@ const resources = { nodesSnapshot: '节点快照', refreshAll: '刷新全部', refresh: '刷新', + dashboardNodeP2PDetail: '{{transport}} · {{sessions}} 个活跃会话 · {{retries}} 次重试', + dashboardNodeP2PTransport: '传输方式', + dashboardNodeP2PIce: 'ICE 配置', + dashboardNodeP2PHealth: '健康状态', + configNodeP2P: '节点 P2P', + configNodeP2PHint: '为远端节点配置 websocket tunnel 或 WebRTC 传输。', + configNodeP2PStunPlaceholder: '逗号分隔的 STUN URL', + configNodeP2PIceServers: 'ICE 服务器', + configNodeP2PIceServersEmpty: '当前没有结构化 ICE 服务器配置。', + configNodeP2PIceUrlsPlaceholder: '逗号分隔的 ICE URL', + configNodeP2PIceUsername: 'ICE 用户名', + configNodeP2PIceCredential: 'ICE 凭证', active: '活跃', paused: '已暂停', noCronJobs: '未找到定时任务', diff --git a/webui/src/pages/Config.tsx b/webui/src/pages/Config.tsx index 6548bc3..6dd665e 100644 --- a/webui/src/pages/Config.tsx +++ b/webui/src/pages/Config.tsx @@ -107,6 +107,48 @@ const Config: React.FC = () => { setCfg((v) => setPath(v, `providers.proxies.${name}.${field}`, value)); } + function updateGatewayP2PField(field: string, value: any) { + setCfg((v) => setPath(v, `gateway.nodes.p2p.${field}`, value)); + } + + function updateGatewayIceServer(index: number, field: string, value: any) { + setCfg((v) => { + const next = JSON.parse(JSON.stringify(v || {})); + if (!next.gateway || typeof next.gateway !== 'object') next.gateway = {}; + if (!next.gateway.nodes || typeof next.gateway.nodes !== 'object') next.gateway.nodes = {}; + if (!next.gateway.nodes.p2p || typeof next.gateway.nodes.p2p !== 'object') next.gateway.nodes.p2p = {}; + if (!Array.isArray(next.gateway.nodes.p2p.ice_servers)) next.gateway.nodes.p2p.ice_servers = []; + if (!next.gateway.nodes.p2p.ice_servers[index] || typeof next.gateway.nodes.p2p.ice_servers[index] !== 'object') { + next.gateway.nodes.p2p.ice_servers[index] = { urls: [], username: '', credential: '' }; + } + next.gateway.nodes.p2p.ice_servers[index][field] = value; + return next; + }); + } + + function addGatewayIceServer() { + setCfg((v) => { + const next = JSON.parse(JSON.stringify(v || {})); + if (!next.gateway || typeof next.gateway !== 'object') next.gateway = {}; + if (!next.gateway.nodes || typeof next.gateway.nodes !== 'object') next.gateway.nodes = {}; + if (!next.gateway.nodes.p2p || typeof next.gateway.nodes.p2p !== 'object') next.gateway.nodes.p2p = {}; + if (!Array.isArray(next.gateway.nodes.p2p.ice_servers)) next.gateway.nodes.p2p.ice_servers = []; + next.gateway.nodes.p2p.ice_servers.push({ urls: [], username: '', credential: '' }); + return next; + }); + } + + function removeGatewayIceServer(index: number) { + setCfg((v) => { + const next = JSON.parse(JSON.stringify(v || {})); + const iceServers = next?.gateway?.nodes?.p2p?.ice_servers; + if (Array.isArray(iceServers)) { + iceServers.splice(index, 1); + } + return next; + }); + } + async function removeProxy(name: string) { const ok = await ui.confirmDialog({ title: t('configDeleteProviderConfirmTitle'), @@ -291,6 +333,77 @@ const Config: React.FC = () => { )} + {activeTop === 'gateway' && !showRaw && ( +