feat: add turn-ready node p2p config

This commit is contained in:
lpf
2026-03-08 23:12:29 +08:00
parent 3db78e0577
commit f441972c56
13 changed files with 384 additions and 29 deletions

View File

@@ -298,10 +298,17 @@ type GatewayNodesConfig struct {
P2P GatewayNodesP2PConfig `json:"p2p,omitempty"`
}
type GatewayICEConfig struct {
URLs []string `json:"urls,omitempty"`
Username string `json:"username,omitempty"`
Credential string `json:"credential,omitempty"`
}
type GatewayNodesP2PConfig struct {
Enabled bool `json:"enabled"`
Transport string `json:"transport,omitempty"`
STUNServers []string `json:"stun_servers,omitempty"`
Enabled bool `json:"enabled"`
Transport string `json:"transport,omitempty"`
STUNServers []string `json:"stun_servers,omitempty"`
ICEServers []GatewayICEConfig `json:"ice_servers,omitempty"`
}
type CronConfig struct {
@@ -550,6 +557,7 @@ func DefaultConfig() *Config {
Enabled: false,
Transport: "websocket_tunnel",
STUNServers: []string{},
ICEServers: []GatewayICEConfig{},
},
},
},

View File

@@ -125,6 +125,26 @@ func Validate(cfg *Config) []error {
errs = append(errs, fmt.Errorf("gateway.nodes.p2p.transport must be one of: websocket_tunnel, webrtc"))
}
errs = append(errs, validateNonEmptyStringList("gateway.nodes.p2p.stun_servers", cfg.Gateway.Nodes.P2P.STUNServers)...)
for i, server := range cfg.Gateway.Nodes.P2P.ICEServers {
prefix := fmt.Sprintf("gateway.nodes.p2p.ice_servers[%d]", i)
errs = append(errs, validateNonEmptyStringList(prefix+".urls", server.URLs)...)
needsAuth := false
for _, raw := range server.URLs {
u := strings.ToLower(strings.TrimSpace(raw))
if strings.HasPrefix(u, "turn:") || strings.HasPrefix(u, "turns:") {
needsAuth = true
break
}
}
if needsAuth {
if strings.TrimSpace(server.Username) == "" {
errs = append(errs, fmt.Errorf("%s.username is required for turn/turns urls", prefix))
}
if strings.TrimSpace(server.Credential) == "" {
errs = append(errs, fmt.Errorf("%s.credential is required for turn/turns urls", prefix))
}
}
}
if cfg.Cron.MinSleepSec <= 0 {
errs = append(errs, fmt.Errorf("cron.min_sleep_sec must be > 0"))
}

View File

@@ -151,3 +151,29 @@ func TestValidateRejectsUnknownGatewayNodeP2PTransport(t *testing.T) {
t.Fatalf("expected validation errors")
}
}
func TestValidateGatewayNodeP2PIceServersAllowsStunOnly(t *testing.T) {
t.Parallel()
cfg := DefaultConfig()
cfg.Gateway.Nodes.P2P.ICEServers = []GatewayICEConfig{
{URLs: []string{"stun:stun.l.google.com:19302"}},
}
if errs := Validate(cfg); len(errs) != 0 {
t.Fatalf("expected config to be valid, got %v", errs)
}
}
func TestValidateGatewayNodeP2PIceServersRequireTurnCredentials(t *testing.T) {
t.Parallel()
cfg := DefaultConfig()
cfg.Gateway.Nodes.P2P.ICEServers = []GatewayICEConfig{
{URLs: []string{"turn:turn.example.com:3478?transport=udp"}},
}
if errs := Validate(cfg); len(errs) == 0 {
t.Fatalf("expected validation errors")
}
}

View File

@@ -92,24 +92,40 @@ func (s *gatewayRTCSession) snapshot() map[string]interface{} {
}
type WebRTCTransport struct {
stunServers []string
iceServers []webrtc.ICEServer
mu sync.Mutex
sessions map[string]*gatewayRTCSession
signal map[string]WireSender
}
func NewWebRTCTransport(stunServers []string) *WebRTCTransport {
out := make([]string, 0, len(stunServers))
func NewWebRTCTransport(stunServers []string, extraICEServers ...webrtc.ICEServer) *WebRTCTransport {
out := make([]webrtc.ICEServer, 0, len(stunServers)+len(extraICEServers))
for _, server := range stunServers {
if v := strings.TrimSpace(server); v != "" {
out = append(out, v)
out = append(out, webrtc.ICEServer{URLs: []string{v}})
}
}
for _, server := range extraICEServers {
urls := make([]string, 0, len(server.URLs))
for _, raw := range server.URLs {
if v := strings.TrimSpace(raw); v != "" {
urls = append(urls, v)
}
}
if len(urls) == 0 {
continue
}
out = append(out, webrtc.ICEServer{
URLs: urls,
Username: strings.TrimSpace(server.Username),
Credential: server.Credential,
})
}
return &WebRTCTransport{
stunServers: out,
sessions: map[string]*gatewayRTCSession{},
signal: map[string]WireSender{},
iceServers: out,
sessions: map[string]*gatewayRTCSession{},
signal: map[string]WireSender{},
}
}
@@ -133,6 +149,7 @@ func (t *WebRTCTransport) Snapshot() map[string]interface{} {
return map[string]interface{}{
"transport": "webrtc",
"active_sessions": active,
"ice_servers": len(t.iceServers),
"nodes": nodes,
}
}
@@ -276,8 +293,8 @@ func (t *WebRTCTransport) ensureSession(nodeID string) (*gatewayRTCSession, erro
}
config := webrtc.Configuration{}
if len(t.stunServers) > 0 {
config.ICEServers = []webrtc.ICEServer{{URLs: append([]string(nil), t.stunServers...)}}
if len(t.iceServers) > 0 {
config.ICEServers = append([]webrtc.ICEServer(nil), t.iceServers...)
}
pc, err := webrtc.NewPeerConnection(config)
if err != nil {