From ea73a9444f108ae652ce48afc127503a7cbcc963 Mon Sep 17 00:00:00 2001 From: MatrixSeven Date: Mon, 2 Mar 2026 21:43:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=20WebRTC=20=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E9=80=9A=E9=81=93=E7=AE=A1=E7=90=86=E5=99=A8=E7=9A=84?= =?UTF-8?q?=20P2P=20=E5=92=8C=20WS=20=E4=B8=AD=E7=BB=A7=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E6=94=AF=E6=8C=81=EF=BC=8C=E6=B7=BB=E5=8A=A0=E4=B8=AD=E7=BB=A7?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E4=BB=A5=E5=A4=84=E7=90=86=20P2P=20=E5=A4=B1?= =?UTF-8?q?=E8=B4=A5=E6=97=B6=E7=9A=84=E9=99=8D=E7=BA=A7=E6=96=B9=E6=A1=88?= =?UTF-8?q?=EF=BC=8C=E6=9B=B4=E6=96=B0=E7=9B=B8=E5=85=B3=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E5=92=8C=E7=8A=B6=E6=80=81=E7=AE=A1=E7=90=86=EF=BC=8C=E5=A2=9E?= =?UTF-8?q?=E5=BC=BA=E9=94=99=E8=AF=AF=E5=A4=84=E7=90=86=E5=92=8C=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E8=BD=AC=E5=8F=91=E5=8A=9F=E8=83=BD=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/components/ConnectionStatus.tsx | 46 ++- .../connection/useSharedWebRTCManager.ts | 7 +- .../connection/useWebRTCConnectionCore.ts | 279 ++++++++++++++- .../connection/useWebRTCDataChannelManager.ts | 264 ++++++++++---- chuan-next/src/hooks/ui/webRTCStore.ts | 6 + chuan-next/src/lib/config.ts | 1 + cmd/router.go | 4 + internal/handlers/handlers.go | 10 +- internal/services/relay_service.go | 323 ++++++++++++++++++ 9 files changed, 843 insertions(+), 97 deletions(-) create mode 100644 internal/services/relay_service.go diff --git a/chuan-next/src/components/ConnectionStatus.tsx b/chuan-next/src/components/ConnectionStatus.tsx index bf3e72d..5839e0d 100644 --- a/chuan-next/src/components/ConnectionStatus.tsx +++ b/chuan-next/src/components/ConnectionStatus.tsx @@ -14,11 +14,12 @@ interface ConnectionStatusProps { } // 连接状态枚举 -const getConnectionStatus = (connection: { isWebSocketConnected?: boolean; isPeerConnected?: boolean; isConnecting?: boolean; error?: string | null }, currentRoom: { code: string; role: 'sender' | 'receiver' } | null) => { +const getConnectionStatus = (connection: { isWebSocketConnected?: boolean; isPeerConnected?: boolean; isConnecting?: boolean; error?: string | null; transportMode?: string }, currentRoom: { code: string; role: 'sender' | 'receiver' } | null) => { const isWebSocketConnected = connection?.isWebSocketConnected || false; const isPeerConnected = connection?.isPeerConnected || false; const isConnecting = connection?.isConnecting || false; const error = connection?.error || null; + const transportMode = connection?.transportMode || 'p2p'; if (error) { return { @@ -32,7 +33,7 @@ const getConnectionStatus = (connection: { isWebSocketConnected?: boolean; isPee return { type: 'connecting' as const, message: '正在连接', - detail: '建立房间连接中...', + detail: transportMode === 'relay' ? '正在建立中继连接...' : '建立房间连接中...', }; } @@ -45,7 +46,6 @@ const getConnectionStatus = (connection: { isWebSocketConnected?: boolean; isPee } // 如果有房间信息但WebSocket未连接,且不是正在连接状态 - // 可能是状态更新的时序问题,显示连接中状态 if (!isWebSocketConnected && !isConnecting) { return { type: 'connecting' as const, @@ -63,6 +63,13 @@ const getConnectionStatus = (connection: { isWebSocketConnected?: boolean; isPee } if (isWebSocketConnected && isPeerConnected) { + if (transportMode === 'relay') { + return { + type: 'connected-relay' as const, + message: '服务器中继连接', + detail: 'P2P不可用,已自动切换到服务器中继传输', + }; + } return { type: 'connected' as const, message: 'P2P连接成功', @@ -82,6 +89,8 @@ const getStatusColor = (type: string) => { switch (type) { case 'connected': return 'text-green-600'; + case 'connected-relay': + return 'text-blue-600'; case 'connecting': case 'room-ready': return 'text-yellow-600'; @@ -101,6 +110,8 @@ const StatusIcon = ({ type, className = 'w-3 h-3' }: { type: string; className?: switch (type) { case 'connected': return
; + case 'connected-relay': + return
; case 'connecting': case 'room-ready': return ( @@ -116,15 +127,17 @@ const StatusIcon = ({ type, className = 'w-3 h-3' }: { type: string; className?: }; // 获取连接状态文字描述 -const getConnectionStatusText = (connection: { isWebSocketConnected?: boolean; isPeerConnected?: boolean; isConnecting?: boolean; error?: string | null }) => { +const getConnectionStatusText = (connection: { isWebSocketConnected?: boolean; isPeerConnected?: boolean; isConnecting?: boolean; error?: string | null; transportMode?: string }) => { const isWebSocketConnected = connection?.isWebSocketConnected || false; const isPeerConnected = connection?.isPeerConnected || false; const isConnecting = connection?.isConnecting || false; const error = connection?.error || null; + const transportMode = connection?.transportMode || 'p2p'; const wsStatus = isWebSocketConnected ? 'WS已连接' : 'WS未连接'; - const rtcStatus = isPeerConnected ? 'RTC已连接' : - isWebSocketConnected ? 'RTC等待连接' : 'RTC未连接'; + const modeLabel = transportMode === 'relay' ? '中继' : 'P2P'; + const rtcStatus = isPeerConnected ? `${modeLabel}已连接` : + isWebSocketConnected ? `${modeLabel}等待连接` : `${modeLabel}未连接`; if (error) { return `${wsStatus} ${rtcStatus} - 连接失败`; @@ -135,6 +148,9 @@ const getConnectionStatusText = (connection: { isWebSocketConnected?: boolean; i } if (isPeerConnected) { + if (transportMode === 'relay') { + return `${wsStatus} ${rtcStatus} - 服务器中继`; + } return `${wsStatus} ${rtcStatus} - P2P连接成功`; } @@ -153,6 +169,7 @@ export function ConnectionStatus(props: ConnectionStatusProps) { isPeerConnected: webrtcState.isPeerConnected, isConnecting: webrtcState.isConnecting, error: webrtcState.error, + transportMode: webrtcState.transportMode, }; const isConnected = webrtcState.isWebSocketConnected && webrtcState.isPeerConnected; @@ -163,6 +180,7 @@ export function ConnectionStatus(props: ConnectionStatusProps) { } const status = getConnectionStatus(connection, currentRoom ?? null); + const isRelay = webrtcState.transportMode === 'relay'; if (compact) { return ( @@ -182,10 +200,10 @@ export function ConnectionStatus(props: ConnectionStatusProps) { |
- RTC + {isRelay ? '中继' : 'RTC'}
@@ -221,15 +239,19 @@ export function ConnectionStatus(props: ConnectionStatusProps) { |
- RTC + {isRelay ? '中继' : 'RTC'} - {connection.isPeerConnected ? '已连接' : '未连接'} + {connection.isPeerConnected + ? (isRelay ? '中继已连接' : '已连接') + : '未连接'}
diff --git a/chuan-next/src/hooks/connection/useSharedWebRTCManager.ts b/chuan-next/src/hooks/connection/useSharedWebRTCManager.ts index a94f0b8..7123775 100644 --- a/chuan-next/src/hooks/connection/useSharedWebRTCManager.ts +++ b/chuan-next/src/hooks/connection/useSharedWebRTCManager.ts @@ -1,5 +1,5 @@ import { useCallback, useMemo } from 'react'; -import { useWebRTCStore, type WebRTCStateManager } from '../ui/webRTCStore'; +import { useWebRTCStore, type WebRTCStateManager, type TransportMode } from '../ui/webRTCStore'; import { useWebRTCDataChannelManager, WebRTCMessage } from './useWebRTCDataChannelManager'; import { useWebRTCTrackManager } from './useWebRTCTrackManager'; import { useWebRTCConnectionCore } from './useWebRTCConnectionCore'; @@ -17,6 +17,8 @@ export interface WebRTCConnection { isPeerConnected: boolean; error: string | null; canRetry: boolean; + // 传输模式 + transportMode: TransportMode; // 操作方法 connect: (roomCode: string, role: 'sender' | 'receiver') => Promise; @@ -61,6 +63,7 @@ export function useSharedWebRTCManager(): WebRTCConnection { error: store.error, canRetry: store.canRetry, currentRoom: store.currentRoom, + transportMode: store.transportMode, }), updateState: store.updateState, setCurrentRoom: store.setCurrentRoom, @@ -87,6 +90,7 @@ export function useSharedWebRTCManager(): WebRTCConnection { isPeerConnected: store.isPeerConnected, error: store.error, canRetry: store.canRetry, + transportMode: store.transportMode, }; // 创建 createOfferNow 方法 @@ -115,6 +119,7 @@ export function useSharedWebRTCManager(): WebRTCConnection { isPeerConnected: state.isPeerConnected, error: state.error, canRetry: state.canRetry, + transportMode: state.transportMode, // 操作方法 connect: connectionCore.connect, diff --git a/chuan-next/src/hooks/connection/useWebRTCConnectionCore.ts b/chuan-next/src/hooks/connection/useWebRTCConnectionCore.ts index ffbc6e3..c433f15 100644 --- a/chuan-next/src/hooks/connection/useWebRTCConnectionCore.ts +++ b/chuan-next/src/hooks/connection/useWebRTCConnectionCore.ts @@ -3,12 +3,12 @@ import { useRef, useCallback } from 'react'; import { getWsUrl } from '@/lib/config'; import { getIceServersConfig } from '../settings/useIceServersConfig'; import { WebRTCStateManager } from '../ui/webRTCStore'; -import { WebRTCDataChannelManager, WebRTCMessage } from './useWebRTCDataChannelManager'; +import { WebRTCDataChannelManager } from './useWebRTCDataChannelManager'; import { WebRTCTrackManager } from './useWebRTCTrackManager'; /** * WebRTC 核心连接管理器 - * 负责基础的 WebRTC 连接管理 + * 负责基础的 WebRTC 连接管理,支持 P2P → WS Relay 自动降级 */ export interface WebRTCConnectionCore { // 连接到房间 @@ -48,6 +48,13 @@ export function useWebRTCConnectionCore( // 用于跟踪是否是用户主动断开连接 const isUserDisconnecting = useRef(false); + + // 中继降级相关 + const relayWsRef = useRef(null); + const isRelayFallbackInProgress = useRef(false); + const p2pFailureTimeout = useRef(null); + // 标记是否已经发送过 relay-request(避免重复发送) + const relayRequestSent = useRef(false); // 清理连接 const cleanup = useCallback((shouldNotifyDisconnect: boolean = false) => { @@ -57,12 +64,26 @@ export function useWebRTCConnectionCore( clearTimeout(timeoutRef.current); timeoutRef.current = null; } + + if (p2pFailureTimeout.current) { + clearTimeout(p2pFailureTimeout.current); + p2pFailureTimeout.current = null; + } if (pcRef.current) { pcRef.current.close(); pcRef.current = null; } + // 关闭中继连接 + dataChannelManager.closeRelay(); + if (relayWsRef.current) { + relayWsRef.current.close(); + relayWsRef.current = null; + } + isRelayFallbackInProgress.current = false; + relayRequestSent.current = false; + // 在清理 WebSocket 之前发送断开通知 if (shouldNotifyDisconnect && wsRef.current && wsRef.current.readyState === WebSocket.OPEN) { try { @@ -83,7 +104,206 @@ export function useWebRTCConnectionCore( currentRoom.current = null; isUserDisconnecting.current = false; // 重置主动断开标志 - }, []); + }, [dataChannelManager]); + + // ===== 连接到中继服务器(实际的 WS 连接逻辑) ===== + const connectToRelay = useCallback(() => { + const room = currentRoom.current; + if (!room) { + console.warn('[ConnectionCore] 没有房间信息,无法连接中继'); + return; + } + + if (isRelayFallbackInProgress.current) { + console.log('[ConnectionCore] ⏭️ 中继连接已在进行中,跳过'); + return; + } + + if (isUserDisconnecting.current) { + console.log('[ConnectionCore] 用户正在主动断开,跳过中继'); + return; + } + + isRelayFallbackInProgress.current = true; + console.log('[ConnectionCore] 🔄 连接到中继服务器...'); + + // 更新状态:正在降级 + stateManager.updateState({ + error: null, + isConnecting: true, + canRetry: false, + }); + + const baseWsUrl = getWsUrl(); + if (!baseWsUrl) { + console.error('[ConnectionCore] 无法获取 WS URL,中继连接失败'); + isRelayFallbackInProgress.current = false; + stateManager.updateState({ + error: '中继连接失败:无法获取服务器地址', + isConnecting: false, + canRetry: true, + }); + return; + } + + const relayUrl = `${baseWsUrl}/api/ws/relay?code=${room.code}&role=${room.role}`; + console.log('[ConnectionCore] 🌐 连接中继服务器:', relayUrl); + + try { + const relayWs = new WebSocket(relayUrl); + relayWsRef.current = relayWs; + + relayWs.binaryType = 'arraybuffer'; + + relayWs.onopen = () => { + console.log('[ConnectionCore] ✅ 中继 WebSocket 连接已建立'); + }; + + // 统一的消息处理器:控制消息在这里处理,数据消息转发给 dataChannelManager + relayWs.onmessage = (event: MessageEvent) => { + // 文本消息:先检查是否是中继控制消息 + if (typeof event.data === 'string') { + try { + const msg = JSON.parse(event.data); + + // 中继服务的控制消息 + if (msg.type === 'relay-ready') { + console.log('[ConnectionCore] 📡 中继已就绪, 对方在线:', msg.peer_connected); + if (msg.peer_connected) { + console.log('[ConnectionCore] 🎉 双方已通过中继连接,切换传输通道'); + dataChannelManager.switchToRelay(relayWs); + isRelayFallbackInProgress.current = false; + stateManager.updateState({ + isConnected: true, + isConnecting: false, + isPeerConnected: true, + error: null, + canRetry: false, + transportMode: 'relay', + }); + } + // peer_connected === false: 等待对方也连接到中继 + return; + } + + if (msg.type === 'relay-peer-joined') { + console.log('[ConnectionCore] 🎉 对方已加入中继房间,切换传输通道'); + dataChannelManager.switchToRelay(relayWs); + isRelayFallbackInProgress.current = false; + stateManager.updateState({ + isConnected: true, + isConnecting: false, + isPeerConnected: true, + error: null, + canRetry: false, + transportMode: 'relay', + }); + return; + } + + if (msg.type === 'relay-peer-left') { + console.log('[ConnectionCore] 🔌 对方离开中继房间'); + stateManager.updateState({ + isPeerConnected: false, + isConnected: false, + error: '对方已离开房间', + canRetry: true, + }); + return; + } + + if (msg.type === 'error') { + console.error('[ConnectionCore] 中继服务错误:', msg.error); + isRelayFallbackInProgress.current = false; + stateManager.updateState({ + error: `中继连接失败: ${msg.error}`, + isConnecting: false, + canRetry: true, + }); + return; + } + } catch { + // 不是合法 JSON 或不是控制消息,当作数据消息处理 + } + } + + // 非控制消息 → 交给 dataChannelManager 分发给业务层 + dataChannelManager.handleRelayMessage(event); + }; + + relayWs.onerror = (error) => { + console.error('[ConnectionCore] ❌ 中继 WebSocket 错误:', error); + isRelayFallbackInProgress.current = false; + stateManager.updateState({ + error: 'WS 中继连接失败,请重试', + isConnecting: false, + canRetry: true, + }); + }; + + relayWs.onclose = (event) => { + console.log('[ConnectionCore] 🔌 中继 WebSocket 关闭:', event.code, event.reason); + if (relayWsRef.current === relayWs) { + relayWsRef.current = null; + } + isRelayFallbackInProgress.current = false; + + // 如果不是用户主动断开,且当前是中继模式 + if (!isUserDisconnecting.current && stateManager.getState().transportMode === 'relay') { + stateManager.updateState({ + isConnected: false, + isPeerConnected: false, + error: '中继连接断开', + canRetry: true, + }); + } + }; + + } catch (error) { + console.error('[ConnectionCore] 创建中继连接失败:', error); + isRelayFallbackInProgress.current = false; + stateManager.updateState({ + error: '无法建立中继连接,请重试', + isConnecting: false, + canRetry: true, + }); + } + }, [stateManager, dataChannelManager]); + + // ===== 发起中继降级(通知对方 + 自己连接) ===== + const initiateRelayFallback = useCallback(() => { + if (relayRequestSent.current || isRelayFallbackInProgress.current) { + console.log('[ConnectionCore] ⏭️ 中继降级已发起/进行中,跳过'); + return; + } + + const room = currentRoom.current; + if (!room) { + console.warn('[ConnectionCore] 没有房间信息,无法降级'); + return; + } + + if (isUserDisconnecting.current) { + console.log('[ConnectionCore] 用户正在主动断开,跳过降级'); + return; + } + + console.log('[ConnectionCore] 🔄 P2P 连接失败,通过信令通知对方切换中继...'); + relayRequestSent.current = true; + + // 通过信令 WS 通知对方也连接到中继 + const ws = wsRef.current; + if (ws && ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify({ + type: 'relay-request', + payload: { reason: 'P2P连接失败' } + })); + console.log('[ConnectionCore] 📤 已通过信令通知对方切换中继'); + } + + // 自己也连接到中继 + connectToRelay(); + }, [connectToRelay]); // 创建 PeerConnection 和相关设置 const createPeerConnection = useCallback((ws: WebSocket, role: 'sender' | 'receiver', isReconnect: boolean = false) => { @@ -138,10 +358,15 @@ export function useWebRTCConnectionCore( case 'connected': case 'completed': console.log('[ConnectionCore] ✅ ICE连接成功'); + // ICE 连接成功,清除降级定时器 + if (p2pFailureTimeout.current) { + clearTimeout(p2pFailureTimeout.current); + p2pFailureTimeout.current = null; + } break; case 'failed': - console.error('[ConnectionCore] ❌ ICE连接失败'); - stateManager.updateState({ error: 'ICE连接失败,可能是网络防火墙阻止了连接', isConnecting: false, canRetry: true }); + console.error('[ConnectionCore] ❌ ICE连接失败,启动中继降级'); + initiateRelayFallback(); break; case 'disconnected': console.log('[ConnectionCore] 🔌 ICE连接断开'); @@ -158,22 +383,38 @@ export function useWebRTCConnectionCore( case 'connecting': console.log('[ConnectionCore] 🔄 WebRTC正在连接中...'); stateManager.updateState({ isPeerConnected: false }); + + // 设置 P2P 连接超时:15 秒后如果还没连上就降级 + if (p2pFailureTimeout.current) { + clearTimeout(p2pFailureTimeout.current); + } + p2pFailureTimeout.current = setTimeout(() => { + if (pcRef.current && pcRef.current.connectionState !== 'connected') { + console.log('[ConnectionCore] ⏰ P2P 连接超时(15秒),启动中继降级'); + initiateRelayFallback(); + } + }, 15000); break; case 'connected': console.log('[ConnectionCore] 🎉 WebRTC P2P连接已完全建立,可以进行媒体传输'); + // 清除降级定时器 + if (p2pFailureTimeout.current) { + clearTimeout(p2pFailureTimeout.current); + p2pFailureTimeout.current = null; + } // 确保所有连接状态都正确更新 stateManager.updateState({ isWebSocketConnected: true, isConnected: true, isPeerConnected: true, error: null, - canRetry: false + canRetry: false, + transportMode: 'p2p', }); // 如果是重新连接,触发数据同步 if (isReconnect) { console.log('[ConnectionCore] 🔄 检测到重新连接,触发数据同步'); - // 发送同步请求消息 setTimeout(() => { const dc = pcRef.current?.createDataChannel('sync-channel'); if (dc && dc.readyState === 'open') { @@ -184,12 +425,14 @@ export function useWebRTCConnectionCore( console.log('[ConnectionCore] 📤 发送数据同步请求'); dc.close(); } - }, 500); // 等待数据通道完全稳定 + }, 500); } break; case 'failed': - console.error('[ConnectionCore] ❌ WebRTC连接失败'); - stateManager.updateState({ error: 'WebRTC连接失败,请检查网络设置或重试', isPeerConnected: false, canRetry: true }); + console.error('[ConnectionCore] ❌ WebRTC连接失败,启动中继降级'); + stateManager.updateState({ isPeerConnected: false }); + // P2P 连接失败,自动降级到中继 + initiateRelayFallback(); break; case 'disconnected': console.log('[ConnectionCore] 🔌 WebRTC连接已断开'); @@ -207,7 +450,7 @@ export function useWebRTCConnectionCore( console.log('[ConnectionCore] ✅ PeerConnection创建完成,角色:', role, '是否重新连接:', isReconnect); return pc; - }, [stateManager, dataChannelManager]); + }, [stateManager, dataChannelManager, initiateRelayFallback]); // 连接到房间 const connect = useCallback(async (roomCode: string, role: 'sender' | 'receiver') => { @@ -278,11 +521,10 @@ export function useWebRTCConnectionCore( console.log('[ConnectionCore] 👥 对方已加入房间,角色:', message.payload?.role); if (role === 'sender' && message.payload?.role === 'receiver') { console.log('[ConnectionCore] 🚀 接收方已连接,发送方开始建立P2P连接'); - // 确保WebSocket连接状态正确更新 + // 标记对方已加入,但 isPeerConnected 在 P2P/Relay 真正连通后才设为 true stateManager.updateState({ isWebSocketConnected: true, isConnected: true, - isPeerConnected: true // 标记对方已加入,可以开始P2P }); // 如果是重新连接,先清理旧的PeerConnection @@ -308,11 +550,10 @@ export function useWebRTCConnectionCore( } } else if (role === 'receiver' && message.payload?.role === 'sender') { console.log('[ConnectionCore] 🚀 发送方已连接,接收方准备接收P2P连接'); - // 确保WebSocket连接状态正确更新 + // 标记对方已加入,但 isPeerConnected 在 P2P/Relay 真正连通后才设为 true stateManager.updateState({ isWebSocketConnected: true, isConnected: true, - isPeerConnected: true // 标记对方已加入 }); // 如果是重新连接,先清理旧的PeerConnection @@ -454,6 +695,12 @@ export function useWebRTCConnectionCore( stateManager.updateState({ error: message.error, isConnecting: false, canRetry: true }); break; + case 'relay-request': + // 对方的 P2P 失败,请求双方都切换到中继模式 + console.log('[ConnectionCore] 📨 收到对方的中继降级请求'); + connectToRelay(); + break; + case 'disconnection': console.log('[ConnectionCore] 🔌 对方主动断开连接'); // 对方断开连接的处理 @@ -509,7 +756,7 @@ export function useWebRTCConnectionCore( canRetry: true }); } - }, [stateManager, cleanup, createPeerConnection]); + }, [stateManager, cleanup, createPeerConnection, connectToRelay]); // 断开连接 const disconnect = useCallback((shouldNotifyDisconnect: boolean = false) => { diff --git a/chuan-next/src/hooks/connection/useWebRTCDataChannelManager.ts b/chuan-next/src/hooks/connection/useWebRTCDataChannelManager.ts index e25f7b8..197214c 100644 --- a/chuan-next/src/hooks/connection/useWebRTCDataChannelManager.ts +++ b/chuan-next/src/hooks/connection/useWebRTCDataChannelManager.ts @@ -14,16 +14,25 @@ export type DataHandler = (data: ArrayBuffer) => void; /** * WebRTC 数据通道管理器 - * 负责数据通道的创建和管理 + * 负责数据通道的创建和管理,支持 P2P DataChannel 和 WS Relay 两种传输模式 */ export interface WebRTCDataChannelManager { - // 创建数据通道 + // 创建数据通道 (P2P 模式) createDataChannel: (pc: RTCPeerConnection, role: 'sender' | 'receiver', isReconnect?: boolean) => void; - // 发送消息 + // 切换到 WS 中继模式(仅设置发送引用,不设置事件监听) + switchToRelay: (relayWs: WebSocket) => void; + + // 关闭中继连接 + closeRelay: () => void; + + // 处理中继收到的数据消息(由 ConnectionCore 的 onmessage 调用) + handleRelayMessage: (event: MessageEvent) => void; + + // 发送消息(自动选择可用通道) sendMessage: (message: WebRTCMessage, channel?: string) => boolean; - // 发送二进制数据 + // 发送二进制数据(自动选择可用通道) sendData: (data: ArrayBuffer) => boolean; // 注册消息处理器 @@ -32,27 +41,40 @@ export interface WebRTCDataChannelManager { // 注册数据处理器 registerDataHandler: (channel: string, handler: DataHandler) => () => void; - // 获取数据通道状态 + // 获取数据通道状态(兼容 RTCDataChannelState) getChannelState: () => RTCDataChannelState; - // 处理数据通道消息 + // 处理数据通道消息 (P2P) handleDataChannelMessage: (event: MessageEvent) => void; } /** * WebRTC 数据通道管理 Hook * 负责数据通道的创建和管理,处理数据通道消息的发送和接收 + * 支持 P2P DataChannel 和 WS Relay 两种传输模式,对上层透明 */ export function useWebRTCDataChannelManager( stateManager: WebRTCStateManager ): WebRTCDataChannelManager { const dcRef = useRef(null); + // WS 中继通道 + const relayWsRef = useRef(null); // 多通道消息处理器 const messageHandlers = useRef>(new Map()); const dataHandlers = useRef>(new Map()); - // 创建数据通道 + // 判断当前是否处于中继模式 + const isRelayMode = useCallback(() => { + return relayWsRef.current !== null && relayWsRef.current.readyState === WebSocket.OPEN; + }, []); + + // 判断 P2P 数据通道是否可用 + const isP2PAvailable = useCallback(() => { + return dcRef.current !== null && dcRef.current.readyState === 'open'; + }, []); + + // 创建数据通道 (P2P 模式) const createDataChannel = useCallback(( pc: RTCPeerConnection, role: 'sender' | 'receiver', @@ -77,6 +99,13 @@ export function useWebRTCDataChannelManager( dataChannel.onopen = () => { console.log('[DataChannelManager] 数据通道已打开 (发送方)'); + // 如果之前在中继模式,切回 P2P + if (relayWsRef.current) { + console.log('[DataChannelManager] P2P 恢复,关闭中继通道'); + relayWsRef.current.close(); + relayWsRef.current = null; + stateManager.updateState({ transportMode: 'p2p' }); + } // 确保所有连接状态都正确更新 stateManager.updateState({ isWebSocketConnected: true, @@ -90,7 +119,6 @@ export function useWebRTCDataChannelManager( // 如果是重新连接,触发数据同步 if (isReconnect) { console.log('[DataChannelManager] 发送方重新连接,数据通道已打开,准备同步数据'); - // 发送同步请求消息 setTimeout(() => { if (dataChannel.readyState === 'open') { dataChannel.send(JSON.stringify({ @@ -99,7 +127,7 @@ export function useWebRTCDataChannelManager( })); console.log('[DataChannelManager] 发送方发送数据同步请求'); } - }, 300); // 等待数据通道完全稳定 + }, 300); } }; @@ -109,11 +137,9 @@ export function useWebRTCDataChannelManager( dataChannel.onerror = (error) => { console.error('[DataChannelManager] 数据通道错误:', error); - // 获取更详细的错误信息 let errorMessage = '数据通道连接失败'; let shouldRetry = false; - // 根据数据通道状态提供更具体的错误信息 switch (dataChannel.readyState) { case 'connecting': errorMessage = '数据通道正在连接中,请稍候...'; @@ -127,7 +153,6 @@ export function useWebRTCDataChannelManager( shouldRetry = true; break; default: - // 检查PeerConnection状态 if (pc) { switch (pc.connectionState) { case 'failed': @@ -147,12 +172,15 @@ export function useWebRTCDataChannelManager( console.error(`[DataChannelManager] 数据通道详细错误 - 状态: ${dataChannel.readyState}, 消息: ${errorMessage}, 建议重试: ${shouldRetry}`); - stateManager.updateState({ - error: errorMessage, - isConnecting: false, - isPeerConnected: false, // 数据通道出错时,P2P连接肯定不可用 - canRetry: shouldRetry // 设置是否可以重试 - }); + // 如果已经在中继模式,不更新错误状态 + if (!isRelayMode()) { + stateManager.updateState({ + error: errorMessage, + isConnecting: false, + isPeerConnected: false, + canRetry: shouldRetry + }); + } }; } else { pc.ondatachannel = (event) => { @@ -161,7 +189,13 @@ export function useWebRTCDataChannelManager( dataChannel.onopen = () => { console.log('[DataChannelManager] 数据通道已打开 (接收方)'); - // 确保所有连接状态都正确更新 + // 如果之前在中继模式,切回 P2P + if (relayWsRef.current) { + console.log('[DataChannelManager] P2P 恢复,关闭中继通道'); + relayWsRef.current.close(); + relayWsRef.current = null; + stateManager.updateState({ transportMode: 'p2p' }); + } stateManager.updateState({ isWebSocketConnected: true, isConnected: true, @@ -171,10 +205,8 @@ export function useWebRTCDataChannelManager( canRetry: false }); - // 如果是重新连接,触发数据同步 if (isReconnect) { console.log('[DataChannelManager] 接收方重新连接,数据通道已打开,准备同步数据'); - // 发送同步请求消息 setTimeout(() => { if (dataChannel.readyState === 'open') { dataChannel.send(JSON.stringify({ @@ -183,7 +215,7 @@ export function useWebRTCDataChannelManager( })); console.log('[DataChannelManager] 接收方发送数据同步请求'); } - }, 300); // 等待数据通道完全稳定 + }, 300); } }; @@ -192,11 +224,9 @@ export function useWebRTCDataChannelManager( dataChannel.onerror = (error) => { console.error('[DataChannelManager] 数据通道错误 (接收方):', error); - // 获取更详细的错误信息 let errorMessage = '数据通道连接失败'; let shouldRetry = false; - // 根据数据通道状态提供更具体的错误信息 switch (dataChannel.readyState) { case 'connecting': errorMessage = '数据通道正在连接中,请稍候...'; @@ -210,7 +240,6 @@ export function useWebRTCDataChannelManager( shouldRetry = true; break; default: - // 检查PeerConnection状态 if (pc) { switch (pc.connectionState) { case 'failed': @@ -230,12 +259,15 @@ export function useWebRTCDataChannelManager( console.error(`[DataChannelManager] 数据通道详细错误 (接收方) - 状态: ${dataChannel.readyState}, 消息: ${errorMessage}, 建议重试: ${shouldRetry}`); - stateManager.updateState({ - error: errorMessage, - isConnecting: false, - isPeerConnected: false, // 数据通道出错时,P2P连接肯定不可用 - canRetry: shouldRetry // 设置是否可以重试 - }); + // 如果已经在中继模式,不更新错误状态 + if (!isRelayMode()) { + stateManager.updateState({ + error: errorMessage, + isConnecting: false, + isPeerConnected: false, + canRetry: shouldRetry + }); + } }; }; } @@ -243,21 +275,81 @@ export function useWebRTCDataChannelManager( console.log('[DataChannelManager] 数据通道创建完成,角色:', role, '是否重新连接:', isReconnect); }, [stateManager]); - // 处理数据通道消息 - const handleDataChannelMessage = useCallback((event: MessageEvent) => { + // 切换到 WS 中继模式 - 仅设置发送引用 + // 事件监听由 ConnectionCore 的 initiateRelayFallback 统一管理 + const switchToRelay = useCallback((relayWs: WebSocket) => { + console.log('[DataChannelManager] 🔄 切换到 WS 中继模式(设置发送引用)'); + relayWsRef.current = relayWs; + }, []); + + // 关闭中继连接 + const closeRelay = useCallback(() => { + if (relayWsRef.current) { + console.log('[DataChannelManager] 关闭中继连接'); + relayWsRef.current.close(); + relayWsRef.current = null; + } + }, []); + + // 处理中继收到的数据消息(由 ConnectionCore 分发调用) + const handleRelayMessage = useCallback((event: MessageEvent) => { if (typeof event.data === 'string') { try { const message = JSON.parse(event.data) as WebRTCMessage; - console.log('[DataChannelManager] 收到消息:', message.type, message.channel || 'default'); + console.log('[DataChannelManager:Relay] 收到中继消息:', message.type, message.channel || 'default'); + + if (message.channel) { + const handler = messageHandlers.current.get(message.channel); + if (handler) { + handler(message); + } + } else { + messageHandlers.current.forEach(handler => handler(message)); + } + } catch (error) { + console.error('[DataChannelManager:Relay] 解析中继消息失败:', error); + } + } else if (event.data instanceof ArrayBuffer) { + console.log('[DataChannelManager:Relay] 收到中继二进制数据:', event.data.byteLength, 'bytes'); + const fileHandler = dataHandlers.current.get('file-transfer'); + if (fileHandler) { + fileHandler(event.data); + } else { + const firstHandler = dataHandlers.current.values().next().value; + if (firstHandler) { + firstHandler(event.data); + } + } + } else if (event.data instanceof Blob) { + // WebSocket 某些情况下收到 Blob + event.data.arrayBuffer().then((buffer: ArrayBuffer) => { + console.log('[DataChannelManager:Relay] 收到中继二进制数据(Blob):', buffer.byteLength, 'bytes'); + const fileHandler = dataHandlers.current.get('file-transfer'); + if (fileHandler) { + fileHandler(buffer); + } else { + const firstHandler = dataHandlers.current.values().next().value; + if (firstHandler) { + firstHandler(buffer); + } + } + }); + } + }, []); + + // 处理数据通道消息 (P2P 模式) + const handleDataChannelMessage = useCallback((event: MessageEvent) => { + if (typeof event.data === 'string') { + try { + const message = JSON.parse(event.data) as WebRTCMessage; + console.log('[DataChannelManager] 收到消息:', message.type, message.channel || 'default'); - // 根据通道分发消息 if (message.channel) { const handler = messageHandlers.current.get(message.channel); if (handler) { handler(message); } } else { - // 兼容旧版本,广播给所有处理器 messageHandlers.current.forEach(handler => handler(message)); } } catch (error) { @@ -266,12 +358,10 @@ export function useWebRTCDataChannelManager( } else if (event.data instanceof ArrayBuffer) { console.log('[DataChannelManager] 收到数据:', event.data.byteLength, 'bytes'); - // 数据优先发给文件传输处理器 const fileHandler = dataHandlers.current.get('file-transfer'); if (fileHandler) { fileHandler(event.data); } else { - // 如果没有文件处理器,发给第一个处理器 const firstHandler = dataHandlers.current.values().next().value; if (firstHandler) { firstHandler(event.data); @@ -280,42 +370,67 @@ export function useWebRTCDataChannelManager( } }, []); - // 发送消息 + // 发送消息 - 自动选择可用通道(P2P 优先,否则用中继) const sendMessage = useCallback((message: WebRTCMessage, channel?: string) => { - const dataChannel = dcRef.current; - if (!dataChannel || dataChannel.readyState !== 'open') { - console.error('[DataChannelManager] 数据通道未准备就绪'); - return false; + const messageWithChannel = channel ? { ...message, channel } : message; + const jsonStr = JSON.stringify(messageWithChannel); + + // 优先使用 P2P DataChannel + if (isP2PAvailable()) { + try { + dcRef.current!.send(jsonStr); + console.log('[DataChannelManager:P2P] 发送消息:', message.type, channel || 'default'); + return true; + } catch (error) { + console.error('[DataChannelManager:P2P] 发送消息失败:', error); + // P2P 发送失败,尝试中继 + } } - try { - const messageWithChannel = channel ? { ...message, channel } : message; - dataChannel.send(JSON.stringify(messageWithChannel)); - console.log('[DataChannelManager] 发送消息:', message.type, channel || 'default'); - return true; - } catch (error) { - console.error('[DataChannelManager] 发送消息失败:', error); - return false; + // 回退到 WS 中继 + if (isRelayMode()) { + try { + relayWsRef.current!.send(jsonStr); + console.log('[DataChannelManager:Relay] 发送消息:', message.type, channel || 'default'); + return true; + } catch (error) { + console.error('[DataChannelManager:Relay] 发送消息失败:', error); + return false; + } } - }, []); - // 发送二进制数据 + console.error('[DataChannelManager] 没有可用的传输通道'); + return false; + }, [isP2PAvailable, isRelayMode]); + + // 发送二进制数据 - 自动选择可用通道 const sendData = useCallback((data: ArrayBuffer) => { - const dataChannel = dcRef.current; - if (!dataChannel || dataChannel.readyState !== 'open') { - console.error('[DataChannelManager] 数据通道未准备就绪'); - return false; + // 优先使用 P2P DataChannel + if (isP2PAvailable()) { + try { + dcRef.current!.send(data); + console.log('[DataChannelManager:P2P] 发送数据:', data.byteLength, 'bytes'); + return true; + } catch (error) { + console.error('[DataChannelManager:P2P] 发送数据失败:', error); + } } - try { - dataChannel.send(data); - console.log('[DataChannelManager] 发送数据:', data.byteLength, 'bytes'); - return true; - } catch (error) { - console.error('[DataChannelManager] 发送数据失败:', error); - return false; + // 回退到 WS 中继 + if (isRelayMode()) { + try { + relayWsRef.current!.send(data); + console.log('[DataChannelManager:Relay] 发送数据:', data.byteLength, 'bytes'); + return true; + } catch (error) { + console.error('[DataChannelManager:Relay] 发送数据失败:', error); + return false; + } } - }, []); + + console.error('[DataChannelManager] 没有可用的传输通道'); + return false; + }, [isP2PAvailable, isRelayMode]); // 注册消息处理器 const registerMessageHandler = useCallback((channel: string, handler: MessageHandler) => { @@ -339,13 +454,28 @@ export function useWebRTCDataChannelManager( }; }, []); - // 获取数据通道状态 - const getChannelState = useCallback(() => { + // 获取数据通道状态 - 综合 P2P 和 Relay 状态 + const getChannelState = useCallback((): RTCDataChannelState => { + // P2P 通道打开时优先返回 + if (dcRef.current?.readyState === 'open') { + return 'open'; + } + // 中继模式可用 + if (relayWsRef.current?.readyState === WebSocket.OPEN) { + return 'open'; + } + // P2P 通道正在连接 + if (dcRef.current?.readyState === 'connecting') { + return 'connecting'; + } return dcRef.current?.readyState || 'closed'; }, []); return { createDataChannel, + switchToRelay, + closeRelay, + handleRelayMessage, sendMessage, sendData, registerMessageHandler, diff --git a/chuan-next/src/hooks/ui/webRTCStore.ts b/chuan-next/src/hooks/ui/webRTCStore.ts index d392133..447eeee 100644 --- a/chuan-next/src/hooks/ui/webRTCStore.ts +++ b/chuan-next/src/hooks/ui/webRTCStore.ts @@ -1,5 +1,8 @@ import { create } from 'zustand'; +// 传输模式 +export type TransportMode = 'p2p' | 'relay'; + export interface WebRTCState { isConnected: boolean; isConnecting: boolean; @@ -8,6 +11,8 @@ export interface WebRTCState { error: string | null; canRetry: boolean; currentRoom: { code: string; role: 'sender' | 'receiver' } | null; + // 传输模式:p2p 直连 | relay 服务器中继 + transportMode: TransportMode; } /** @@ -37,6 +42,7 @@ const initialState: WebRTCState = { error: null, canRetry: false, currentRoom: null, + transportMode: 'p2p', }; export const useWebRTCStore = create((set) => ({ diff --git a/chuan-next/src/lib/config.ts b/chuan-next/src/lib/config.ts index 10d1fdb..5cbc68b 100644 --- a/chuan-next/src/lib/config.ts +++ b/chuan-next/src/lib/config.ts @@ -26,6 +26,7 @@ const getCurrentBaseUrl = () => { // 动态获取 WebSocket URL - 总是在客户端运行时计算 const getCurrentWsUrl = () => { + return `ws://${window.location.hostname}:8080`; if (typeof window !== 'undefined') { // 检查是否是 Next.js 开发服务器(端口 3000 或 3001) const isNextDevServer = window.location.hostname === 'localhost' && diff --git a/cmd/router.go b/cmd/router.go index 00b2bfd..8b132cb 100644 --- a/cmd/router.go +++ b/cmd/router.go @@ -53,6 +53,10 @@ func setupAPIRoutes(r *chi.Mux, h *handlers.Handler) { r.Get("/api/ws/webrtc", h.HandleWebRTCWebSocket) r.Get("/ws/webrtc", h.HandleWebRTCWebSocket) + // WebSocket 数据中继路由(P2P降级方案) + r.Get("/api/ws/relay", h.HandleRelayWebSocket) + r.Get("/ws/relay", h.HandleRelayWebSocket) + // WebRTC房间API r.Post("/api/create-room", h.CreateRoomHandler) r.Get("/api/room-info", h.WebRTCRoomStatusHandler) diff --git a/internal/handlers/handlers.go b/internal/handlers/handlers.go index b2b8faa..343fe70 100644 --- a/internal/handlers/handlers.go +++ b/internal/handlers/handlers.go @@ -10,14 +10,22 @@ import ( type Handler struct { webrtcService *services.WebRTCService + relayService *services.RelayService } func NewHandler() *Handler { + webrtcService := services.NewWebRTCService() return &Handler{ - webrtcService: services.NewWebRTCService(), + webrtcService: webrtcService, + relayService: services.NewRelayService(webrtcService), } } +// HandleRelayWebSocket 处理数据中继WebSocket连接(P2P失败时的降级方案) +func (h *Handler) HandleRelayWebSocket(w http.ResponseWriter, r *http.Request) { + h.relayService.HandleRelayWebSocket(w, r) +} + // HandleWebRTCWebSocket 处理WebRTC信令WebSocket连接 func (h *Handler) HandleWebRTCWebSocket(w http.ResponseWriter, r *http.Request) { h.webrtcService.HandleWebSocket(w, r) diff --git a/internal/services/relay_service.go b/internal/services/relay_service.go new file mode 100644 index 0000000..82cf358 --- /dev/null +++ b/internal/services/relay_service.go @@ -0,0 +1,323 @@ +package services + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +// RelayService 处理 WebSocket 数据中继(当 P2P 失败时的降级方案) +type RelayService struct { + rooms map[string]*RelayRoom + roomsMux sync.RWMutex + upgrader websocket.Upgrader + // 复用 WebRTCService 来验证房间 + webrtcService *WebRTCService +} + +// RelayRoom 中继房间 +type RelayRoom struct { + Code string + Sender *RelayClient + Receiver *RelayClient + CreatedAt time.Time + mu sync.Mutex +} + +// RelayClient 中继客户端 +type RelayClient struct { + ID string + Role string // "sender" or "receiver" + Connection *websocket.Conn + mu sync.Mutex +} + +// RelayMessage 中继消息的包装格式 +type RelayMessage struct { + Type string `json:"type"` // "relay-data" | "relay-binary" | "relay-ready" | "relay-ping" | "relay-pong" + Channel string `json:"channel,omitempty"` // 逻辑通道:file-transfer, text-transfer + Payload json.RawMessage `json:"payload,omitempty"` // JSON 消息体 +} + +func NewRelayService(webrtcService *WebRTCService) *RelayService { + return &RelayService{ + rooms: make(map[string]*RelayRoom), + roomsMux: sync.RWMutex{}, + webrtcService: webrtcService, + upgrader: websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + // 增大消息尺寸限制以支持文件传输(10MB) + ReadBufferSize: 10 * 1024 * 1024, + WriteBufferSize: 10 * 1024 * 1024, + }, + } +} + +// HandleRelayWebSocket 处理中继 WebSocket 连接 +func (rs *RelayService) HandleRelayWebSocket(w http.ResponseWriter, r *http.Request) { + log.Printf("[Relay] 收到中继 WebSocket 连接请求: %s", r.URL.String()) + + conn, err := rs.upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("[Relay] WebSocket 升级失败: %v", err) + return + } + // 设置最大消息大小为 10MB + conn.SetReadLimit(10 * 1024 * 1024) + defer conn.Close() + + // 获取参数 + code := r.URL.Query().Get("code") + role := r.URL.Query().Get("role") + + log.Printf("[Relay] 连接参数: code=%s, role=%s", code, role) + + if code == "" || (role != "sender" && role != "receiver") { + log.Printf("[Relay] 参数无效: code=%s, role=%s", code, role) + conn.WriteJSON(map[string]interface{}{ + "type": "error", + "error": "连接参数无效", + }) + return + } + + // 验证房间是否存在(通过 WebRTC service 验证) + status := rs.webrtcService.GetRoomStatus(code) + exists, _ := status["exists"].(bool) + if !exists { + log.Printf("[Relay] 房间不存在: %s", code) + conn.WriteJSON(map[string]interface{}{ + "type": "error", + "error": "房间不存在或已过期", + }) + return + } + + // 创建或获取中继房间 + rs.roomsMux.Lock() + room, ok := rs.rooms[code] + if !ok { + room = &RelayRoom{ + Code: code, + CreatedAt: time.Now(), + } + rs.rooms[code] = room + } + rs.roomsMux.Unlock() + + // 创建客户端 + client := &RelayClient{ + ID: rs.webrtcService.generateClientID(), + Role: role, + Connection: conn, + } + + // 添加到房间 + room.mu.Lock() + if role == "sender" { + // 关闭旧的 sender 连接 + if room.Sender != nil { + room.Sender.Connection.Close() + } + room.Sender = client + } else { + // 关闭旧的 receiver 连接 + if room.Receiver != nil { + room.Receiver.Connection.Close() + } + room.Receiver = client + } + + // 检查对方是否已连接,通知双方 relay 已就绪 + peerConnected := false + if role == "sender" && room.Receiver != nil { + peerConnected = true + } else if role == "receiver" && room.Sender != nil { + peerConnected = true + } + room.mu.Unlock() + + log.Printf("[Relay] 客户端加入中继房间: ID=%s, Role=%s, Room=%s, 对方是否在线=%v", client.ID, role, code, peerConnected) + + // 通知自己已就绪 + conn.WriteJSON(map[string]interface{}{ + "type": "relay-ready", + "role": role, + "peer_connected": peerConnected, + }) + + // 如果对方已连接,通知对方 + if peerConnected { + room.mu.Lock() + var peer *RelayClient + if role == "sender" { + peer = room.Receiver + } else { + peer = room.Sender + } + room.mu.Unlock() + + if peer != nil { + peer.mu.Lock() + peer.Connection.WriteJSON(map[string]interface{}{ + "type": "relay-peer-joined", + "peer_role": role, + }) + peer.mu.Unlock() + } + } + + // 连接关闭时清理 + defer func() { + room.mu.Lock() + if role == "sender" && room.Sender != nil && room.Sender.ID == client.ID { + room.Sender = nil + } else if role == "receiver" && room.Receiver != nil && room.Receiver.ID == client.ID { + room.Receiver = nil + } + + // 通知对方断开 + var peer *RelayClient + if role == "sender" { + peer = room.Receiver + } else { + peer = room.Sender + } + + // 如果房间空了,清理 + isEmpty := room.Sender == nil && room.Receiver == nil + room.mu.Unlock() + + if peer != nil { + peer.mu.Lock() + peer.Connection.WriteJSON(map[string]interface{}{ + "type": "relay-peer-left", + "peer_role": role, + }) + peer.mu.Unlock() + } + + if isEmpty { + rs.roomsMux.Lock() + delete(rs.rooms, code) + rs.roomsMux.Unlock() + log.Printf("[Relay] 清理空的中继房间: %s", code) + } + + log.Printf("[Relay] 客户端断开中继: ID=%s, Room=%s", client.ID, code) + }() + + // 消息转发循环 - 带统计日志 + var textMsgCount, binaryMsgCount int64 + var totalTextBytes, totalBinaryBytes int64 + startTime := time.Now() + lastLogTime := startTime + + log.Printf("[Relay] ▶ 开始消息转发: Room=%s, Role=%s", code, role) + + for { + msgType, data, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + log.Printf("[Relay] 读取消息错误: Room=%s, Role=%s, err=%v", code, role, err) + } + break + } + + dataLen := int64(len(data)) + + // 统计消息类型 + if msgType == websocket.TextMessage { + textMsgCount++ + totalTextBytes += dataLen + + // 解析文本消息类型用于日志 + var peek struct { + Type string `json:"type"` + Channel string `json:"channel"` + } + if json.Unmarshal(data, &peek) == nil { + log.Printf("[Relay] 📨 转发文本消息: Room=%s, %s→%s, type=%s, channel=%s, size=%d bytes", + code, role, peerRole(role), peek.Type, peek.Channel, dataLen) + } else { + log.Printf("[Relay] 📨 转发文本消息: Room=%s, %s→%s, size=%d bytes", + code, role, peerRole(role), dataLen) + } + } else if msgType == websocket.BinaryMessage { + binaryMsgCount++ + totalBinaryBytes += dataLen + + // 二进制消息只在每 10 个包或每 5 秒输出一次摘要,避免日志过多 + if binaryMsgCount%10 == 1 || time.Since(lastLogTime) > 5*time.Second { + log.Printf("[Relay] 📦 转发二进制数据: Room=%s, %s→%s, size=%d bytes (累计: %d 包, %s)", + code, role, peerRole(role), dataLen, binaryMsgCount, formatBytes(totalBinaryBytes)) + lastLogTime = time.Now() + } + } + + // 获取对方客户端 + room.mu.Lock() + var peer *RelayClient + if role == "sender" { + peer = room.Receiver + } else { + peer = room.Sender + } + room.mu.Unlock() + + if peer == nil { + log.Printf("[Relay] ⚠ 对方不在线,丢弃消息: Room=%s, Role=%s, size=%d bytes", code, role, dataLen) + continue + } + + // 直接转发消息(文本或二进制) + peer.mu.Lock() + err = peer.Connection.WriteMessage(msgType, data) + peer.mu.Unlock() + + if err != nil { + log.Printf("[Relay] ❌ 转发消息失败: Room=%s, %s→%s, err=%v", code, role, peerRole(role), err) + break + } + } + + elapsed := time.Since(startTime) + log.Printf("[Relay] ■ 消息转发结束: Room=%s, Role=%s, 持续=%v, 文本消息=%d(%s), 二进制消息=%d(%s)", + code, role, elapsed.Round(time.Second), + textMsgCount, formatBytes(totalTextBytes), + binaryMsgCount, formatBytes(totalBinaryBytes)) +} + +// peerRole 返回对方角色名 +func peerRole(role string) string { + if role == "sender" { + return "receiver" + } + return "sender" +} + +// formatBytes 人性化格式化字节数 +func formatBytes(b int64) string { + const ( + KB = 1024 + MB = 1024 * KB + GB = 1024 * MB + ) + switch { + case b >= GB: + return fmt.Sprintf("%.2f GB", float64(b)/float64(GB)) + case b >= MB: + return fmt.Sprintf("%.2f MB", float64(b)/float64(MB)) + case b >= KB: + return fmt.Sprintf("%.2f KB", float64(b)/float64(KB)) + default: + return fmt.Sprintf("%d B", b) + } +}