From 6d02a9898f3801c2552442ee58f180f8dd31fc3c Mon Sep 17 00:00:00 2001 From: MatrixSeven Date: Thu, 5 Mar 2026 13:28:29 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=87=8D=E6=9E=84=20WebRTC=20=E8=BD=A8?= =?UTF-8?q?=E9=81=93=E7=AE=A1=E7=90=86=EF=BC=8C=E6=94=AF=E6=8C=81=E5=A4=9A?= =?UTF-8?q?=E7=9B=91=E5=90=AC=E5=99=A8=E6=B3=A8=E5=86=8C=EF=BC=8C=E4=BC=98?= =?UTF-8?q?=E5=8C=96=20SDP=20Offer=20=E5=88=9B=E5=BB=BA=E6=B5=81=E7=A8=8B?= =?UTF-8?q?=EF=BC=9B=E6=9B=B4=E6=96=B0=E6=A1=8C=E9=9D=A2=E5=85=B1=E4=BA=AB?= =?UTF-8?q?=E5=92=8C=E8=AF=AD=E9=9F=B3=E9=80=9A=E8=AF=9D=E4=B8=9A=E5=8A=A1?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=8C=E5=A2=9E=E5=BC=BA=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- chuan-next/src/components/VoiceChatPanel.tsx | 2 +- chuan-next/src/hooks/connection/index.ts | 5 +- chuan-next/src/hooks/connection/types.ts | 74 ++ .../connection/useSharedWebRTCManager.ts | 119 +--- .../connection/useWebRTCConnectionCore.ts | 53 +- .../connection/useWebRTCDataChannelManager.ts | 647 +++++++----------- .../hooks/connection/useWebRTCTrackManager.ts | 179 ++--- .../desktop-share/useDesktopShareBusiness.ts | 33 +- .../desktop-share/useVoiceChatBusiness.ts | 7 +- .../file-transfer/useFileTransferBusiness.ts | 43 +- .../text-transfer/useTextTransferBusiness.ts | 118 +--- 11 files changed, 545 insertions(+), 735 deletions(-) create mode 100644 chuan-next/src/hooks/connection/types.ts diff --git a/chuan-next/src/components/VoiceChatPanel.tsx b/chuan-next/src/components/VoiceChatPanel.tsx index a81e6a2..adf54cc 100644 --- a/chuan-next/src/components/VoiceChatPanel.tsx +++ b/chuan-next/src/components/VoiceChatPanel.tsx @@ -5,7 +5,7 @@ import { Mic, MicOff, PhoneCall, PhoneOff } from 'lucide-react'; import { Button } from '@/components/ui/button'; import { VoiceIndicator } from '@/components/VoiceIndicator'; import { useVoiceChatBusiness } from '@/hooks/desktop-share'; -import type { WebRTCConnection } from '@/hooks/connection/useSharedWebRTCManager'; +import type { WebRTCConnection } from '@/hooks/connection/types'; interface VoiceChatPanelProps { connection: WebRTCConnection; diff --git a/chuan-next/src/hooks/connection/index.ts b/chuan-next/src/hooks/connection/index.ts index 4d7127f..cf53162 100644 --- a/chuan-next/src/hooks/connection/index.ts +++ b/chuan-next/src/hooks/connection/index.ts @@ -1,4 +1,7 @@ -// 连接相关的hooks +// 连接相关的 hooks export { useRoomConnection } from './useRoomConnection'; export { useSharedWebRTCManager } from './useSharedWebRTCManager'; export { useWebRTCSupport } from './useWebRTCSupport'; + +// 类型导出 +export type { WebRTCConnection, WebRTCMessage, MessageHandler, DataHandler, TrackHandler, Unsubscribe } from './types'; diff --git a/chuan-next/src/hooks/connection/types.ts b/chuan-next/src/hooks/connection/types.ts new file mode 100644 index 0000000..9c1b69b --- /dev/null +++ b/chuan-next/src/hooks/connection/types.ts @@ -0,0 +1,74 @@ +import type { TransportMode } from '../ui/webRTCStore'; + +// ──────────────────────────────────────────── +// 基础类型 +// ──────────────────────────────────────────── + +/** 清理函数 / 取消订阅 */ +export type Unsubscribe = () => void; + +/** WebRTC JSON 消息 */ +export interface WebRTCMessage { + type: string; + payload: any; + channel?: string; +} + +// ──────────────────────────────────────────── +// 处理器类型 +// ──────────────────────────────────────────── + +/** JSON 消息处理器 */ +export type MessageHandler = (message: WebRTCMessage) => void; + +/** 二进制数据处理器 */ +export type DataHandler = (data: ArrayBuffer) => void; + +/** 媒体轨道事件处理器 */ +export type TrackHandler = (event: RTCTrackEvent) => void; + +// ──────────────────────────────────────────── +// WebRTC 连接统一接口 +// 所有业务模块(文件传输 / 文本传输 / 桌面共享 / 语音通话) +// 都通过此接口与底层通信,不直接依赖具体实现 +// ──────────────────────────────────────────── + +export interface WebRTCConnection { + // ── 只读状态 ── + readonly isConnected: boolean; + readonly isConnecting: boolean; + readonly isWebSocketConnected: boolean; + readonly isPeerConnected: boolean; + readonly error: string | null; + readonly canRetry: boolean; + readonly transportMode: TransportMode; + readonly currentRoom: { code: string; role: 'sender' | 'receiver' } | null; + + // ── 连接管理 ── + connect(roomCode: string, role: 'sender' | 'receiver'): Promise; + disconnect(): void; + retry(): Promise; + isConnectedToRoom(roomCode: string, role: 'sender' | 'receiver'): boolean; + + // ── 数据通道(DataChannel / Relay 透明切换)── + sendMessage(message: WebRTCMessage, channel?: string): boolean; + sendData(data: ArrayBuffer): boolean; + registerMessageHandler(channel: string, handler: MessageHandler): Unsubscribe; + registerDataHandler(channel: string, handler: DataHandler): Unsubscribe; + getChannelState(): RTCDataChannelState; + getBufferedAmount(): number; + waitForBufferDrain(threshold?: number): Promise; + + // ── 媒体轨道(支持多监听器) ── + addTrack(track: MediaStreamTrack, stream: MediaStream): RTCRtpSender | null; + removeTrack(sender: RTCRtpSender): void; + /** + * 注册轨道事件处理器 + * - 支持多个消费者同时注册(桌面共享 + 语音通话) + * - 返回清理函数,组件卸载时调用 + * @param key 唯一标识符,如 'desktop-share' / 'voice-chat' + */ + registerTrackHandler(key: string, handler: TrackHandler): Unsubscribe; + getPeerConnection(): RTCPeerConnection | null; + createOfferNow(): Promise; +} diff --git a/chuan-next/src/hooks/connection/useSharedWebRTCManager.ts b/chuan-next/src/hooks/connection/useSharedWebRTCManager.ts index 4b91853..26dbfbd 100644 --- a/chuan-next/src/hooks/connection/useSharedWebRTCManager.ts +++ b/chuan-next/src/hooks/connection/useSharedWebRTCManager.ts @@ -1,60 +1,23 @@ import { useCallback, useMemo } from 'react'; -import { useWebRTCStore, type WebRTCStateManager, type TransportMode } from '../ui/webRTCStore'; -import { useWebRTCDataChannelManager, WebRTCMessage } from './useWebRTCDataChannelManager'; +import { useWebRTCStore, type WebRTCStateManager } from '../ui/webRTCStore'; +import { useWebRTCDataChannelManager } from './useWebRTCDataChannelManager'; import { useWebRTCTrackManager } from './useWebRTCTrackManager'; import { useWebRTCConnectionCore } from './useWebRTCConnectionCore'; +import type { WebRTCConnection, TrackHandler, Unsubscribe } from './types'; -// 消息和数据处理器类型 -export type MessageHandler = (message: WebRTCMessage) => void; -export type DataHandler = (data: ArrayBuffer) => void; - -// WebRTC 连接接口 -export interface WebRTCConnection { - // 状态 - isConnected: boolean; - isConnecting: boolean; - isWebSocketConnected: boolean; - isPeerConnected: boolean; - error: string | null; - canRetry: boolean; - // 传输模式 - transportMode: TransportMode; - - // 操作方法 - connect: (roomCode: string, role: 'sender' | 'receiver') => Promise; - disconnect: () => void; - retry: () => Promise; - sendMessage: (message: WebRTCMessage, channel?: string) => boolean; - sendData: (data: ArrayBuffer) => boolean; - - // 处理器注册 - registerMessageHandler: (channel: string, handler: MessageHandler) => () => void; - registerDataHandler: (channel: string, handler: DataHandler) => () => void; - - // 工具方法 - getChannelState: () => RTCDataChannelState; - getBufferedAmount: () => number; - waitForBufferDrain: (threshold?: number) => Promise; - isConnectedToRoom: (roomCode: string, role: 'sender' | 'receiver') => boolean; - - // 当前房间信息 - currentRoom: { code: string; role: 'sender' | 'receiver' } | null; - - // 媒体轨道方法 - addTrack: (track: MediaStreamTrack, stream: MediaStream) => RTCRtpSender | null; - removeTrack: (sender: RTCRtpSender) => void; - onTrack: (callback: (event: RTCTrackEvent) => void) => void; - getPeerConnection: () => RTCPeerConnection | null; - createOfferNow: () => Promise; -} +// Re-export the canonical interface for consumers +export type { WebRTCConnection }; /** * 共享 WebRTC 连接管理器 - * 创建单一的 WebRTC 连接实例,供多个业务模块共享使用 - * 整合所有模块,提供统一的接口 + * + * 职责: + * 1. 整合 ConnectionCore + DataChannelManager + TrackManager + * 2. 暴露统一的 WebRTCConnection 接口给所有业务模块 + * 3. 确保单一 PeerConnection 实例被多个功能共享 */ export function useSharedWebRTCManager(): WebRTCConnection { - // 直接从 zustand store 创建状态管理器 + // 从 Zustand store 创建状态管理器 const store = useWebRTCStore(); const stateManager: WebRTCStateManager = useMemo(() => ({ getState: () => ({ @@ -81,21 +44,10 @@ export function useSharedWebRTCManager(): WebRTCConnection { const connectionCore = useWebRTCConnectionCore( stateManager, dataChannelManager, - trackManager + trackManager, ); - // 从 store 获取当前状态 - const state = { - isConnected: store.isConnected, - isConnecting: store.isConnecting, - isWebSocketConnected: store.isWebSocketConnected, - isPeerConnected: store.isPeerConnected, - error: store.error, - canRetry: store.canRetry, - transportMode: store.transportMode, - }; - - // 创建 createOfferNow 方法 + // ── createOfferNow 桥接 ── const createOfferNow = useCallback(async () => { const pc = connectionCore.getPeerConnection(); const ws = connectionCore.getWebSocket(); @@ -103,51 +55,50 @@ export function useSharedWebRTCManager(): WebRTCConnection { console.error('[SharedWebRTC] PeerConnection 或 WebSocket 不可用'); return false; } - try { return await trackManager.createOfferNow(pc, ws); } catch (error) { - console.error('[SharedWebRTC] 创建 offer 失败:', error); + console.error('[SharedWebRTC] 创建 Offer 失败:', error); return false; } }, [connectionCore, trackManager]); - // 返回统一的接口,保持与当前 API 一致 - return { - // 状态 - isConnected: state.isConnected, - isConnecting: state.isConnecting, - isWebSocketConnected: state.isWebSocketConnected, - isPeerConnected: state.isPeerConnected, - error: state.error, - canRetry: state.canRetry, - transportMode: state.transportMode, + // ── registerTrackHandler 桥接 ── + const registerTrackHandler = useCallback((key: string, handler: TrackHandler): Unsubscribe => { + return trackManager.registerTrackHandler(key, handler); + }, [trackManager]); - // 操作方法 + return { + // 只读状态 + isConnected: store.isConnected, + isConnecting: store.isConnecting, + isWebSocketConnected: store.isWebSocketConnected, + isPeerConnected: store.isPeerConnected, + error: store.error, + canRetry: store.canRetry, + transportMode: store.transportMode, + currentRoom: connectionCore.getCurrentRoom(), + + // 连接管理 connect: connectionCore.connect, disconnect: () => connectionCore.disconnect(true), retry: connectionCore.retry, + isConnectedToRoom: stateManager.isConnectedToRoom, + + // 数据通道 sendMessage: dataChannelManager.sendMessage, sendData: dataChannelManager.sendData, - - // 处理器注册 registerMessageHandler: dataChannelManager.registerMessageHandler, registerDataHandler: dataChannelManager.registerDataHandler, - - // 工具方法 getChannelState: dataChannelManager.getChannelState, getBufferedAmount: dataChannelManager.getBufferedAmount, waitForBufferDrain: dataChannelManager.waitForBufferDrain, - isConnectedToRoom: stateManager.isConnectedToRoom, - // 媒体轨道方法 + // 媒体轨道 addTrack: trackManager.addTrack, removeTrack: trackManager.removeTrack, - onTrack: trackManager.onTrack, + registerTrackHandler, getPeerConnection: connectionCore.getPeerConnection, createOfferNow, - - // 当前房间信息 - currentRoom: connectionCore.getCurrentRoom(), }; } diff --git a/chuan-next/src/hooks/connection/useWebRTCConnectionCore.ts b/chuan-next/src/hooks/connection/useWebRTCConnectionCore.ts index 96d9221..a47046b 100644 --- a/chuan-next/src/hooks/connection/useWebRTCConnectionCore.ts +++ b/chuan-next/src/hooks/connection/useWebRTCConnectionCore.ts @@ -53,8 +53,11 @@ export function useWebRTCConnectionCore( const relayWsRef = useRef(null); const isRelayFallbackInProgress = useRef(false); const p2pFailureTimeout = useRef(null); + const iceDisconnectedTimeout = useRef(null); // 标记是否已经发送过 relay-request(避免重复发送) const relayRequestSent = useRef(false); + // 通过 ref 避免闭包捕获过时的 initiateRelayFallback + const initiateRelayFallbackRef = useRef<() => void>(() => {}); // 清理连接 const cleanup = useCallback((shouldNotifyDisconnect: boolean = false) => { @@ -70,6 +73,11 @@ export function useWebRTCConnectionCore( p2pFailureTimeout.current = null; } + if (iceDisconnectedTimeout.current) { + clearTimeout(iceDisconnectedTimeout.current); + iceDisconnectedTimeout.current = null; + } + if (pcRef.current) { pcRef.current.close(); pcRef.current = null; @@ -235,6 +243,7 @@ export function useWebRTCConnectionCore( relayWs.onerror = (error) => { console.error('[ConnectionCore] ❌ 中继 WebSocket 错误:', error); isRelayFallbackInProgress.current = false; + relayRequestSent.current = false; // 重置以允许重试 stateManager.updateState({ error: 'WS 中继连接失败,请重试', isConnecting: false, @@ -263,6 +272,7 @@ export function useWebRTCConnectionCore( } catch (error) { console.error('[ConnectionCore] 创建中继连接失败:', error); isRelayFallbackInProgress.current = false; + relayRequestSent.current = false; // 重置以允许重试 stateManager.updateState({ error: '无法建立中继连接,请重试', isConnecting: false, @@ -306,6 +316,9 @@ export function useWebRTCConnectionCore( connectToRelay(); }, [connectToRelay]); + // 保持 ref 同步,避免闭包过时 + initiateRelayFallbackRef.current = initiateRelayFallback; + // 创建 PeerConnection 和相关设置 const createPeerConnection = useCallback((ws: WebSocket, role: 'sender' | 'receiver', isReconnect: boolean = false) => { console.log('[ConnectionCore] 🔧 创建PeerConnection...', { role, isReconnect }); @@ -359,18 +372,34 @@ export function useWebRTCConnectionCore( case 'connected': case 'completed': console.log('[ConnectionCore] ✅ ICE连接成功'); - // ICE 连接成功,清除降级定时器 + // ICE 连接成功,清除所有降级定时器 if (p2pFailureTimeout.current) { clearTimeout(p2pFailureTimeout.current); p2pFailureTimeout.current = null; } + if (iceDisconnectedTimeout.current) { + clearTimeout(iceDisconnectedTimeout.current); + iceDisconnectedTimeout.current = null; + } break; case 'failed': console.error('[ConnectionCore] ❌ ICE连接失败,启动中继降级'); - initiateRelayFallback(); + initiateRelayFallbackRef.current(); break; case 'disconnected': - console.log('[ConnectionCore] 🔌 ICE连接断开'); + console.log('[ConnectionCore] 🔌 ICE连接断开,设置 8 秒超时保护'); + // ICE disconnected 可能是暂时的(网络抖动),也可能永不恢复 + // 设置超时保护:8 秒后如果仍非 connected/completed 则降级 + if (iceDisconnectedTimeout.current) { + clearTimeout(iceDisconnectedTimeout.current); + } + iceDisconnectedTimeout.current = setTimeout(() => { + const iceState = pcRef.current?.iceConnectionState; + if (iceState && iceState !== 'connected' && iceState !== 'completed') { + console.log('[ConnectionCore] ⏰ ICE disconnected 超时(8秒),启动中继降级'); + initiateRelayFallbackRef.current(); + } + }, 8000); break; case 'closed': console.log('[ConnectionCore] 🚫 ICE连接已关闭'); @@ -385,24 +414,28 @@ export function useWebRTCConnectionCore( console.log('[ConnectionCore] 🔄 WebRTC正在连接中...'); stateManager.updateState({ isPeerConnected: false }); - // 设置 P2P 连接超时:15 秒后如果还没连上就降级 + // 设置 P2P 连接超时:10 秒后如果还没连上就降级 if (p2pFailureTimeout.current) { clearTimeout(p2pFailureTimeout.current); } p2pFailureTimeout.current = setTimeout(() => { if (pcRef.current && pcRef.current.connectionState !== 'connected') { - console.log('[ConnectionCore] ⏰ P2P 连接超时(15秒),启动中继降级'); - initiateRelayFallback(); + console.log('[ConnectionCore] ⏰ P2P 连接超时(10秒),启动中继降级'); + initiateRelayFallbackRef.current(); } - }, 15000); + }, 10000); break; case 'connected': console.log('[ConnectionCore] 🎉 WebRTC P2P连接已完全建立,可以进行媒体传输'); - // 清除降级定时器 + // 清除所有降级定时器 if (p2pFailureTimeout.current) { clearTimeout(p2pFailureTimeout.current); p2pFailureTimeout.current = null; } + if (iceDisconnectedTimeout.current) { + clearTimeout(iceDisconnectedTimeout.current); + iceDisconnectedTimeout.current = null; + } // 确保所有连接状态都正确更新 stateManager.updateState({ isWebSocketConnected: true, @@ -434,7 +467,7 @@ export function useWebRTCConnectionCore( if (!isUserDisconnecting.current) { console.log('[ConnectionCore] 启动中继降级'); stateManager.updateState({ isPeerConnected: false }); - initiateRelayFallback(); + initiateRelayFallbackRef.current(); } break; case 'disconnected': @@ -457,7 +490,7 @@ export function useWebRTCConnectionCore( console.log('[ConnectionCore] ✅ PeerConnection创建完成,角色:', role, '是否重新连接:', isReconnect); return pc; - }, [stateManager, dataChannelManager, initiateRelayFallback]); + }, [stateManager, dataChannelManager]); // 连接到房间 const connect = useCallback(async (roomCode: string, role: 'sender' | 'receiver') => { diff --git a/chuan-next/src/hooks/connection/useWebRTCDataChannelManager.ts b/chuan-next/src/hooks/connection/useWebRTCDataChannelManager.ts index 9514bf5..573cda5 100644 --- a/chuan-next/src/hooks/connection/useWebRTCDataChannelManager.ts +++ b/chuan-next/src/hooks/connection/useWebRTCDataChannelManager.ts @@ -1,499 +1,389 @@ import { useRef, useCallback } from 'react'; import { WebRTCStateManager } from '../ui/webRTCStore'; +import type { WebRTCMessage, MessageHandler, DataHandler, Unsubscribe } from './types'; -// 消息类型 -export interface WebRTCMessage { - type: string; - payload: any; - channel?: string; -} - -// 消息和数据处理器类型 -export type MessageHandler = (message: WebRTCMessage) => void; -export type DataHandler = (data: ArrayBuffer) => void; +// Re-export types for backward compatibility +export type { WebRTCMessage, MessageHandler, DataHandler }; /** - * WebRTC 数据通道管理器 + * WebRTC 数据通道管理器接口 * 负责数据通道的创建和管理,支持 P2P DataChannel 和 WS Relay 两种传输模式 */ export interface WebRTCDataChannelManager { - // 创建数据通道 (P2P 模式) + // ── 通道生命周期 ── + + /** 创建 P2P 数据通道 */ createDataChannel: (pc: RTCPeerConnection, role: 'sender' | 'receiver', isReconnect?: boolean) => void; - - // 切换到 WS 中继模式(仅设置发送引用,不设置事件监听) + /** 切换到 WS 中继模式(仅设置发送引用)*/ switchToRelay: (relayWs: WebSocket) => void; - - // 关闭中继连接 + /** 关闭中继连接 */ closeRelay: () => void; - - // 处理中继收到的数据消息(由 ConnectionCore 的 onmessage 调用) - handleRelayMessage: (event: MessageEvent) => void; - - // 发送消息(自动选择可用通道) + + // ── 消息收发 ── + + /** 发送 JSON 消息(自动选择可用通道:P2P 优先,Relay 降级)*/ sendMessage: (message: WebRTCMessage, channel?: string) => boolean; - - // 发送二进制数据(自动选择可用通道) + /** 发送二进制数据(自动选择可用通道)*/ sendData: (data: ArrayBuffer) => boolean; - - // 注册消息处理器 - registerMessageHandler: (channel: string, handler: MessageHandler) => () => void; - - // 注册数据处理器 - registerDataHandler: (channel: string, handler: DataHandler) => () => void; - - // 获取数据通道状态(兼容 RTCDataChannelState) + /** 处理中继收到的数据(由 ConnectionCore 的 onmessage 转发)*/ + handleRelayMessage: (event: MessageEvent) => void; + + // ── 处理器注册 ── + + /** 注册 JSON 消息处理器,返回清理函数 */ + registerMessageHandler: (channel: string, handler: MessageHandler) => Unsubscribe; + /** 注册二进制数据处理器,返回清理函数 */ + registerDataHandler: (channel: string, handler: DataHandler) => Unsubscribe; + + // ── 状态查询 ── + + /** 获取通道状态(综合 P2P + Relay)*/ getChannelState: () => RTCDataChannelState; - - // 获取当前缓冲区大小 + /** 获取当前发送缓冲区大小 */ getBufferedAmount: () => number; - - // 等待缓冲区排空到指定阈值以下 + /** 等待缓冲区排空到指定阈值以下 */ waitForBufferDrain: (threshold?: number) => Promise; - - // 处理数据通道消息 (P2P) - handleDataChannelMessage: (event: MessageEvent) => void; } /** * WebRTC 数据通道管理 Hook - * 负责数据通道的创建和管理,处理数据通道消息的发送和接收 - * 支持 P2P DataChannel 和 WS Relay 两种传输模式,对上层透明 + * + * 核心职责: + * 1. P2P DataChannel 创建(sender createDataChannel / receiver ondatachannel) + * 2. Relay WebSocket 透明切换 + * 3. 统一的消息 / 二进制分发(Map) + * 4. 背压控制(bufferedAmount + waitForBufferDrain) */ export function useWebRTCDataChannelManager( stateManager: WebRTCStateManager ): WebRTCDataChannelManager { const dcRef = useRef(null); - // WS 中继通道 const relayWsRef = useRef(null); - - // 多通道消息处理器 + + // 处理器注册表 const messageHandlers = useRef>(new Map()); const dataHandlers = useRef>(new Map()); - // 判断当前是否处于中继模式 + // ──────────────────────────────────────── + // 内部工具 + // ──────────────────────────────────────── + const isRelayMode = useCallback(() => { return relayWsRef.current !== null && relayWsRef.current.readyState === WebSocket.OPEN; }, []); - // 判断 P2P 数据通道是否可用 const isP2PAvailable = useCallback(() => { return dcRef.current !== null && dcRef.current.readyState === 'open'; }, []); - // 创建数据通道 (P2P 模式) - const createDataChannel = useCallback(( - pc: RTCPeerConnection, - role: 'sender' | 'receiver', - isReconnect: boolean = false + // ── 统一消息分发 ── + + const dispatchJsonMessage = useCallback((message: WebRTCMessage) => { + if (message.channel) { + const handler = messageHandlers.current.get(message.channel); + if (handler) handler(message); + } else { + // 无 channel 标记则广播给所有处理器 + messageHandlers.current.forEach(handler => handler(message)); + } + }, []); + + const dispatchBinaryData = useCallback((data: ArrayBuffer) => { + // 优先分发给 file-transfer 处理器(最常见的二进制消费者) + const fileHandler = dataHandlers.current.get('file-transfer'); + if (fileHandler) { + fileHandler(data); + return; + } + // 降级:分发给第一个注册的处理器 + const firstHandler = dataHandlers.current.values().next().value; + if (firstHandler) { + firstHandler(data); + } + }, []); + + /** + * 统一入站消息处理(P2P 和 Relay 共用) + */ + const dispatchIncoming = useCallback((event: MessageEvent) => { + if (typeof event.data === 'string') { + try { + const message = JSON.parse(event.data) as WebRTCMessage; + dispatchJsonMessage(message); + } catch (error) { + console.error('[DataChannel] 解析消息失败:', error); + } + } else if (event.data instanceof ArrayBuffer) { + dispatchBinaryData(event.data); + } else if (event.data instanceof Blob) { + // WebSocket 某些浏览器 / 环境下返回 Blob 而非 ArrayBuffer + event.data.arrayBuffer().then((buffer: ArrayBuffer) => { + dispatchBinaryData(buffer); + }); + } + }, [dispatchJsonMessage, dispatchBinaryData]); + + // ──────────────────────────────────────── + // DataChannel 生命周期(去重 sender/receiver 共享逻辑) + // ──────────────────────────────────────── + + /** + * 数据通道打开后的统一处理 + */ + const handleChannelOpen = useCallback(( + dataChannel: RTCDataChannel, + role: string, + isReconnect: boolean ) => { - console.log('[DataChannelManager] 创建数据通道...', { role, isReconnect }); - - // 如果已经存在数据通道,先关闭它 + console.log(`[DataChannel] 数据通道已打开 (${role})`); + + // P2P 恢复时关闭中继 + if (relayWsRef.current) { + console.log('[DataChannel] P2P 恢复,关闭中继通道'); + relayWsRef.current.close(); + relayWsRef.current = null; + stateManager.updateState({ transportMode: 'p2p' }); + } + + stateManager.updateState({ + isWebSocketConnected: true, + isConnected: true, + isPeerConnected: true, + error: null, + isConnecting: false, + canRetry: false, + }); + + // 重连后请求数据同步 + if (isReconnect) { + console.log(`[DataChannel] ${role} 重连,发送数据同步请求`); + setTimeout(() => { + if (dataChannel.readyState === 'open') { + dataChannel.send(JSON.stringify({ + type: 'sync-request', + payload: { timestamp: Date.now() }, + })); + } + }, 300); + } + }, [stateManager]); + + /** + * 数据通道错误的统一处理 + */ + const handleChannelError = useCallback(( + dataChannel: RTCDataChannel, + pc: RTCPeerConnection | null, + role: string + ) => { + let errorMessage = '数据通道连接失败'; + let shouldRetry = false; + + switch (dataChannel.readyState) { + case 'connecting': + errorMessage = '数据通道正在连接中,请稍候...'; + shouldRetry = true; + break; + case 'closing': + errorMessage = '数据通道正在关闭,连接即将断开'; + break; + case 'closed': + errorMessage = '数据通道已关闭,P2P 连接失败'; + shouldRetry = true; + break; + default: + if (pc) { + switch (pc.connectionState) { + case 'failed': + errorMessage = 'P2P 连接失败,可能是网络防火墙阻止了连接'; + shouldRetry = true; + break; + case 'disconnected': + errorMessage = 'P2P 连接已断开,网络可能不稳定'; + shouldRetry = true; + break; + default: + errorMessage = '数据通道连接失败,可能是网络环境受限'; + shouldRetry = true; + } + } + } + + console.error(`[DataChannel] 错误 (${role}) - 状态: ${dataChannel.readyState}, ${errorMessage}`); + + // 已在中继模式则不更新错误状态 + if (!isRelayMode()) { + stateManager.updateState({ + error: errorMessage, + isConnecting: false, + isPeerConnected: false, + canRetry: shouldRetry, + }); + } + }, [stateManager, isRelayMode]); + + /** + * 为 DataChannel 绑定事件处理器(sender / receiver 共用) + */ + const setupChannelHandlers = useCallback(( + dataChannel: RTCDataChannel, + pc: RTCPeerConnection, + role: string, + isReconnect: boolean + ) => { + dataChannel.onopen = () => handleChannelOpen(dataChannel, role, isReconnect); + dataChannel.onmessage = dispatchIncoming; + dataChannel.onerror = () => handleChannelError(dataChannel, pc, role); + }, [handleChannelOpen, dispatchIncoming, handleChannelError]); + + // ── 创建数据通道 ── + + const createDataChannel = useCallback(( + pc: RTCPeerConnection, + role: 'sender' | 'receiver', + isReconnect: boolean = false, + ) => { + console.log('[DataChannel] 创建数据通道...', { role, isReconnect }); + + // 关闭已有通道 if (dcRef.current) { - console.log('[DataChannelManager] 关闭已存在的数据通道'); + console.log('[DataChannel] 关闭已存在的数据通道'); dcRef.current.close(); dcRef.current = null; } - // 数据通道处理 if (role === 'sender') { - const dataChannel = pc.createDataChannel('shared-channel', { - ordered: true - }); - dcRef.current = dataChannel; - - dataChannel.onopen = () => { - console.log('[DataChannelManager] 数据通道已打开 (发送方)'); - // 如果之前在中继模式,切回 P2P - if (relayWsRef.current) { - console.log('[DataChannelManager] P2P 恢复,关闭中继通道'); - relayWsRef.current.close(); - relayWsRef.current = null; - stateManager.updateState({ transportMode: 'p2p' }); - } - // 确保所有连接状态都正确更新 - stateManager.updateState({ - isWebSocketConnected: true, - isConnected: true, - isPeerConnected: true, - error: null, - isConnecting: false, - canRetry: false - }); - - // 如果是重新连接,触发数据同步 - if (isReconnect) { - console.log('[DataChannelManager] 发送方重新连接,数据通道已打开,准备同步数据'); - setTimeout(() => { - if (dataChannel.readyState === 'open') { - dataChannel.send(JSON.stringify({ - type: 'sync-request', - payload: { timestamp: Date.now() } - })); - console.log('[DataChannelManager] 发送方发送数据同步请求'); - } - }, 300); - } - }; - - - dataChannel.onmessage = handleDataChannelMessage; - - dataChannel.onerror = (error) => { - console.error('[DataChannelManager] 数据通道错误:', error); - - let errorMessage = '数据通道连接失败'; - let shouldRetry = false; - - switch (dataChannel.readyState) { - case 'connecting': - errorMessage = '数据通道正在连接中,请稍候...'; - shouldRetry = true; - break; - case 'closing': - errorMessage = '数据通道正在关闭,连接即将断开'; - break; - case 'closed': - errorMessage = '数据通道已关闭,P2P连接失败'; - shouldRetry = true; - break; - default: - if (pc) { - switch (pc.connectionState) { - case 'failed': - errorMessage = 'P2P连接失败,可能是网络防火墙阻止了连接,请尝试切换网络或使用VPN'; - shouldRetry = true; - break; - case 'disconnected': - errorMessage = 'P2P连接已断开,网络可能不稳定'; - shouldRetry = true; - break; - default: - errorMessage = '数据通道连接失败,可能是网络环境受限'; - shouldRetry = true; - } - } - } - - console.error(`[DataChannelManager] 数据通道详细错误 - 状态: ${dataChannel.readyState}, 消息: ${errorMessage}, 建议重试: ${shouldRetry}`); - - // 如果已经在中继模式,不更新错误状态 - if (!isRelayMode()) { - stateManager.updateState({ - error: errorMessage, - isConnecting: false, - isPeerConnected: false, - canRetry: shouldRetry - }); - } - }; + const dc = pc.createDataChannel('shared-channel', { ordered: true }); + dcRef.current = dc; + setupChannelHandlers(dc, pc, '发送方', isReconnect); } else { pc.ondatachannel = (event) => { - const dataChannel = event.channel; - dcRef.current = dataChannel; - - dataChannel.onopen = () => { - console.log('[DataChannelManager] 数据通道已打开 (接收方)'); - // 如果之前在中继模式,切回 P2P - if (relayWsRef.current) { - console.log('[DataChannelManager] P2P 恢复,关闭中继通道'); - relayWsRef.current.close(); - relayWsRef.current = null; - stateManager.updateState({ transportMode: 'p2p' }); - } - stateManager.updateState({ - isWebSocketConnected: true, - isConnected: true, - isPeerConnected: true, - error: null, - isConnecting: false, - canRetry: false - }); - - if (isReconnect) { - console.log('[DataChannelManager] 接收方重新连接,数据通道已打开,准备同步数据'); - setTimeout(() => { - if (dataChannel.readyState === 'open') { - dataChannel.send(JSON.stringify({ - type: 'sync-request', - payload: { timestamp: Date.now() } - })); - console.log('[DataChannelManager] 接收方发送数据同步请求'); - } - }, 300); - } - }; - - dataChannel.onmessage = handleDataChannelMessage; - - dataChannel.onerror = (error) => { - console.error('[DataChannelManager] 数据通道错误 (接收方):', error); - - let errorMessage = '数据通道连接失败'; - let shouldRetry = false; - - switch (dataChannel.readyState) { - case 'connecting': - errorMessage = '数据通道正在连接中,请稍候...'; - shouldRetry = true; - break; - case 'closing': - errorMessage = '数据通道正在关闭,连接即将断开'; - break; - case 'closed': - errorMessage = '数据通道已关闭,P2P连接失败'; - shouldRetry = true; - break; - default: - if (pc) { - switch (pc.connectionState) { - case 'failed': - errorMessage = 'P2P连接失败,可能是网络防火墙阻止了连接,请尝试切换网络或使用VPN'; - shouldRetry = true; - break; - case 'disconnected': - errorMessage = 'P2P连接已断开,网络可能不稳定'; - shouldRetry = true; - break; - default: - errorMessage = '数据通道连接失败,可能是网络环境受限'; - shouldRetry = true; - } - } - } - - console.error(`[DataChannelManager] 数据通道详细错误 (接收方) - 状态: ${dataChannel.readyState}, 消息: ${errorMessage}, 建议重试: ${shouldRetry}`); - - // 如果已经在中继模式,不更新错误状态 - if (!isRelayMode()) { - stateManager.updateState({ - error: errorMessage, - isConnecting: false, - isPeerConnected: false, - canRetry: shouldRetry - }); - } - }; + const dc = event.channel; + dcRef.current = dc; + setupChannelHandlers(dc, pc, '接收方', isReconnect); }; } - console.log('[DataChannelManager] 数据通道创建完成,角色:', role, '是否重新连接:', isReconnect); - }, [stateManager]); + console.log('[DataChannel] 数据通道创建完成,角色:', role); + }, [setupChannelHandlers]); + + // ── Relay 管理 ── - // 切换到 WS 中继模式 - 仅设置发送引用 - // 事件监听由 ConnectionCore 的 initiateRelayFallback 统一管理 const switchToRelay = useCallback((relayWs: WebSocket) => { - console.log('[DataChannelManager] 🔄 切换到 WS 中继模式(设置发送引用)'); + console.log('[DataChannel] 🔄 切换到 WS 中继模式'); relayWsRef.current = relayWs; }, []); - // 关闭中继连接 const closeRelay = useCallback(() => { if (relayWsRef.current) { - console.log('[DataChannelManager] 关闭中继连接'); + console.log('[DataChannel] 关闭中继连接'); relayWsRef.current.close(); relayWsRef.current = null; } }, []); - // 处理中继收到的数据消息(由 ConnectionCore 分发调用) const handleRelayMessage = useCallback((event: MessageEvent) => { - if (typeof event.data === 'string') { - try { - const message = JSON.parse(event.data) as WebRTCMessage; - console.log('[DataChannelManager:Relay] 收到中继消息:', message.type, message.channel || 'default'); + dispatchIncoming(event); + }, [dispatchIncoming]); - if (message.channel) { - const handler = messageHandlers.current.get(message.channel); - if (handler) { - handler(message); - } - } else { - messageHandlers.current.forEach(handler => handler(message)); - } - } catch (error) { - console.error('[DataChannelManager:Relay] 解析中继消息失败:', error); - } - } else if (event.data instanceof ArrayBuffer) { - console.log('[DataChannelManager:Relay] 收到中继二进制数据:', event.data.byteLength, 'bytes'); - const fileHandler = dataHandlers.current.get('file-transfer'); - if (fileHandler) { - fileHandler(event.data); - } else { - const firstHandler = dataHandlers.current.values().next().value; - if (firstHandler) { - firstHandler(event.data); - } - } - } else if (event.data instanceof Blob) { - // WebSocket 某些情况下收到 Blob - event.data.arrayBuffer().then((buffer: ArrayBuffer) => { - console.log('[DataChannelManager:Relay] 收到中继二进制数据(Blob):', buffer.byteLength, 'bytes'); - const fileHandler = dataHandlers.current.get('file-transfer'); - if (fileHandler) { - fileHandler(buffer); - } else { - const firstHandler = dataHandlers.current.values().next().value; - if (firstHandler) { - firstHandler(buffer); - } - } - }); - } - }, []); + // ──────────────────────────────────────── + // 消息发送(P2P 优先,Relay 降级) + // ──────────────────────────────────────── - // 处理数据通道消息 (P2P 模式) - const handleDataChannelMessage = useCallback((event: MessageEvent) => { - if (typeof event.data === 'string') { - try { - const message = JSON.parse(event.data) as WebRTCMessage; - console.log('[DataChannelManager] 收到消息:', message.type, message.channel || 'default'); - - if (message.channel) { - const handler = messageHandlers.current.get(message.channel); - if (handler) { - handler(message); - } - } else { - messageHandlers.current.forEach(handler => handler(message)); - } - } catch (error) { - console.error('[DataChannelManager] 解析消息失败:', error); - } - } else if (event.data instanceof ArrayBuffer) { - console.log('[DataChannelManager] 收到数据:', event.data.byteLength, 'bytes'); - - const fileHandler = dataHandlers.current.get('file-transfer'); - if (fileHandler) { - fileHandler(event.data); - } else { - const firstHandler = dataHandlers.current.values().next().value; - if (firstHandler) { - firstHandler(event.data); - } - } - } - }, []); - - // 发送消息 - 自动选择可用通道(P2P 优先,否则用中继) const sendMessage = useCallback((message: WebRTCMessage, channel?: string) => { const messageWithChannel = channel ? { ...message, channel } : message; const jsonStr = JSON.stringify(messageWithChannel); - // 优先使用 P2P DataChannel if (isP2PAvailable()) { try { dcRef.current!.send(jsonStr); - console.log('[DataChannelManager:P2P] 发送消息:', message.type, channel || 'default'); return true; } catch (error) { - console.error('[DataChannelManager:P2P] 发送消息失败:', error); - // P2P 发送失败,尝试中继 + console.error('[DataChannel:P2P] 发送消息失败:', error); } } - // 回退到 WS 中继 if (isRelayMode()) { try { relayWsRef.current!.send(jsonStr); - console.log('[DataChannelManager:Relay] 发送消息:', message.type, channel || 'default'); return true; } catch (error) { - console.error('[DataChannelManager:Relay] 发送消息失败:', error); + console.error('[DataChannel:Relay] 发送消息失败:', error); return false; } } - console.error('[DataChannelManager] 没有可用的传输通道'); + console.error('[DataChannel] 没有可用的传输通道'); return false; }, [isP2PAvailable, isRelayMode]); - // 发送二进制数据 - 自动选择可用通道 const sendData = useCallback((data: ArrayBuffer) => { - // 优先使用 P2P DataChannel if (isP2PAvailable()) { try { dcRef.current!.send(data); - console.log('[DataChannelManager:P2P] 发送数据:', data.byteLength, 'bytes'); return true; } catch (error) { - console.error('[DataChannelManager:P2P] 发送数据失败:', error); + console.error('[DataChannel:P2P] 发送数据失败:', error); } } - // 回退到 WS 中继 if (isRelayMode()) { try { relayWsRef.current!.send(data); - console.log('[DataChannelManager:Relay] 发送数据:', data.byteLength, 'bytes'); return true; } catch (error) { - console.error('[DataChannelManager:Relay] 发送数据失败:', error); + console.error('[DataChannel:Relay] 发送数据失败:', error); return false; } } - console.error('[DataChannelManager] 没有可用的传输通道'); + console.error('[DataChannel] 没有可用的传输通道'); return false; }, [isP2PAvailable, isRelayMode]); - // 注册消息处理器 - const registerMessageHandler = useCallback((channel: string, handler: MessageHandler) => { - console.log('[DataChannelManager] 注册消息处理器:', channel); - messageHandlers.current.set(channel, handler); + // ──────────────────────────────────────── + // 处理器注册 + // ──────────────────────────────────────── + const registerMessageHandler = useCallback((channel: string, handler: MessageHandler): Unsubscribe => { + console.log('[DataChannel] 注册消息处理器:', channel); + messageHandlers.current.set(channel, handler); return () => { - console.log('[DataChannelManager] 取消注册消息处理器:', channel); + console.log('[DataChannel] 取消注册消息处理器:', channel); messageHandlers.current.delete(channel); }; }, []); - // 注册数据处理器 - const registerDataHandler = useCallback((channel: string, handler: DataHandler) => { - console.log('[DataChannelManager] 注册数据处理器:', channel); + const registerDataHandler = useCallback((channel: string, handler: DataHandler): Unsubscribe => { + console.log('[DataChannel] 注册数据处理器:', channel); dataHandlers.current.set(channel, handler); - return () => { - console.log('[DataChannelManager] 取消注册数据处理器:', channel); + console.log('[DataChannel] 取消注册数据处理器:', channel); dataHandlers.current.delete(channel); }; }, []); - // 获取数据通道状态 - 综合 P2P 和 Relay 状态 + // ──────────────────────────────────────── + // 状态查询与背压控制 + // ──────────────────────────────────────── + const getChannelState = useCallback((): RTCDataChannelState => { - // P2P 通道打开时优先返回 - if (dcRef.current?.readyState === 'open') { - return 'open'; - } - // 中继模式可用 - if (relayWsRef.current?.readyState === WebSocket.OPEN) { - return 'open'; - } - // P2P 通道正在连接 - if (dcRef.current?.readyState === 'connecting') { - return 'connecting'; - } + if (dcRef.current?.readyState === 'open') return 'open'; + if (relayWsRef.current?.readyState === WebSocket.OPEN) return 'open'; + if (dcRef.current?.readyState === 'connecting') return 'connecting'; return dcRef.current?.readyState || 'closed'; }, []); - // 获取当前缓冲区大小 const getBufferedAmount = useCallback((): number => { - if (dcRef.current && dcRef.current.readyState === 'open') { - return dcRef.current.bufferedAmount; - } - if (relayWsRef.current && relayWsRef.current.readyState === WebSocket.OPEN) { - return relayWsRef.current.bufferedAmount; - } + if (dcRef.current?.readyState === 'open') return dcRef.current.bufferedAmount; + if (relayWsRef.current?.readyState === WebSocket.OPEN) return relayWsRef.current.bufferedAmount; return 0; }, []); - // 等待缓冲区排空到阈值以下 const waitForBufferDrain = useCallback((threshold: number = 1 * 1024 * 1024): Promise => { - // P2P DataChannel - if (dcRef.current && dcRef.current.readyState === 'open') { - if (dcRef.current.bufferedAmount <= threshold) { - return Promise.resolve(); - } + // P2P DataChannel — 使用 bufferedamountlow 事件 + if (dcRef.current?.readyState === 'open') { + if (dcRef.current.bufferedAmount <= threshold) return Promise.resolve(); return new Promise((resolve) => { const dc = dcRef.current!; dc.bufferedAmountLowThreshold = threshold; @@ -502,18 +392,13 @@ export function useWebRTCDataChannelManager( resolve(); }; dc.addEventListener('bufferedamountlow', onLow); - // 安全超时,防止死等 - setTimeout(() => { - dc.removeEventListener('bufferedamountlow', onLow); - resolve(); - }, 5000); + setTimeout(() => { dc.removeEventListener('bufferedamountlow', onLow); resolve(); }, 5000); }); } - // Relay WebSocket - if (relayWsRef.current && relayWsRef.current.readyState === WebSocket.OPEN) { - if (relayWsRef.current.bufferedAmount <= threshold) { - return Promise.resolve(); - } + + // Relay WebSocket — 轮询 bufferedAmount + if (relayWsRef.current?.readyState === WebSocket.OPEN) { + if (relayWsRef.current.bufferedAmount <= threshold) return Promise.resolve(); return new Promise((resolve) => { const checkInterval = setInterval(() => { if (!relayWsRef.current || relayWsRef.current.readyState !== WebSocket.OPEN || @@ -522,13 +407,10 @@ export function useWebRTCDataChannelManager( resolve(); } }, 50); - // 安全超时 - setTimeout(() => { - clearInterval(checkInterval); - resolve(); - }, 5000); + setTimeout(() => { clearInterval(checkInterval); resolve(); }, 5000); }); } + return Promise.resolve(); }, []); @@ -544,6 +426,5 @@ export function useWebRTCDataChannelManager( getChannelState, getBufferedAmount, waitForBufferDrain, - handleDataChannelMessage, }; -} \ No newline at end of file +} diff --git a/chuan-next/src/hooks/connection/useWebRTCTrackManager.ts b/chuan-next/src/hooks/connection/useWebRTCTrackManager.ts index a9e6a33..cca82e0 100644 --- a/chuan-next/src/hooks/connection/useWebRTCTrackManager.ts +++ b/chuan-next/src/hooks/connection/useWebRTCTrackManager.ts @@ -1,34 +1,42 @@ import { useCallback, useRef } from 'react'; import { WebRTCStateManager } from '../ui/webRTCStore'; +import type { TrackHandler, Unsubscribe } from './types'; /** - * WebRTC 媒体轨道管理器 - * 负责媒体轨道的添加和移除 + * WebRTC 媒体轨道管理器接口 */ export interface WebRTCTrackManager { - // 添加媒体轨道 + // 添加媒体轨道到 PeerConnection addTrack: (track: MediaStreamTrack, stream: MediaStream) => RTCRtpSender | null; // 移除媒体轨道 removeTrack: (sender: RTCRtpSender) => void; - // 设置轨道处理器 - onTrack: (handler: (event: RTCTrackEvent) => void) => void; + /** + * 注册轨道事件处理器(多监听器模式) + * - 多个消费者可同时注册(桌面共享处理 video,语音通话处理 audio) + * - 返回清理函数,组件卸载时务必调用 + */ + registerTrackHandler: (key: string, handler: TrackHandler) => Unsubscribe; - // 创建 Offer + // 创建并发送 SDP Offer(用于初始连接) createOffer: (pc: RTCPeerConnection, ws: WebSocket) => Promise; - // 立即创建offer(用于媒体轨道添加后的重新协商) + // 立即创建 Offer(用于媒体轨道变更后的重新协商) createOfferNow: (pc: RTCPeerConnection, ws: WebSocket) => Promise; - // 内部方法,供核心连接管理器调用 + // ── 内部方法,仅供 ConnectionCore 调用 ── setPeerConnection: (pc: RTCPeerConnection | null) => void; setWebSocket: (ws: WebSocket | null) => void; } /** * WebRTC 媒体轨道管理 Hook - * 负责媒体轨道的添加和移除,处理轨道事件,提供 createOffer 功能 + * + * 职责: + * 1. 管理 RTCRtpSender(添加 / 移除轨道) + * 2. 复合分发 ontrack 事件给多个消费者 + * 3. 创建 SDP Offer 并通过信令 WebSocket 发送 */ export function useWebRTCTrackManager( stateManager: WebRTCStateManager @@ -36,78 +44,82 @@ export function useWebRTCTrackManager( const pcRef = useRef(null); const wsRef = useRef(null); - // 创建 Offer + // 多监听器:key → handler(如 'desktop-share' → handler, 'voice-chat' → handler) + const trackHandlers = useRef>(new Map()); + + // ── 复合分发:将 ontrack 事件广播给所有已注册的处理器 ── + const dispatchTrackEvent = useCallback((event: RTCTrackEvent) => { + const handlerCount = trackHandlers.current.size; + if (handlerCount === 0) { + console.warn('[TrackManager] 收到轨道事件但无处理器注册:', event.track.kind, event.track.id); + return; + } + console.log(`[TrackManager] 📡 分发轨道事件 (${event.track.kind}) 给 ${handlerCount} 个处理器`); + trackHandlers.current.forEach((handler, key) => { + try { + handler(event); + } catch (error) { + console.error(`[TrackManager] 轨道处理器 "${key}" 执行出错:`, error); + } + }); + }, []); + + // ── SDP Offer 创建 ── const createOffer = useCallback(async (pc: RTCPeerConnection, ws: WebSocket) => { try { - console.log('[TrackManager] 🎬 开始创建offer,当前轨道数量:', pc.getSenders().length); - - // 确保连接状态稳定 - if (pc.connectionState !== 'connecting' && pc.connectionState !== 'new') { - console.warn('[TrackManager] ⚠️ PeerConnection状态异常:', pc.connectionState); - } + console.log('[TrackManager] 🎬 开始创建 Offer,当前轨道数:', pc.getSenders().length); const offer = await pc.createOffer({ - offerToReceiveAudio: true, // 改为true以支持音频接收 - offerToReceiveVideo: true, // 改为true以支持视频接收 + offerToReceiveAudio: true, + offerToReceiveVideo: true, }); - - console.log('[TrackManager] 📝 Offer创建成功,设置本地描述...'); await pc.setLocalDescription(offer); console.log('[TrackManager] ✅ 本地描述设置完成'); - // 增加超时时间到5秒,给ICE候选收集更多时间 + // ICE 收集超时保护 const iceTimeout = setTimeout(() => { - console.log('[TrackManager] ⏱️ ICE收集超时,发送当前offer'); if (ws.readyState === WebSocket.OPEN && pc.localDescription) { ws.send(JSON.stringify({ type: 'offer', payload: pc.localDescription })); - console.log('[TrackManager] 📤 发送 offer (超时发送)'); + console.log('[TrackManager] 📤 发送 Offer (ICE 收集超时)'); } }, 5000); - // 如果ICE收集已经完成,立即发送 if (pc.iceGatheringState === 'complete') { clearTimeout(iceTimeout); if (ws.readyState === WebSocket.OPEN && pc.localDescription) { ws.send(JSON.stringify({ type: 'offer', payload: pc.localDescription })); - console.log('[TrackManager] 📤 发送 offer (ICE收集完成)'); + console.log('[TrackManager] 📤 发送 Offer (ICE 已完成)'); } } else { - console.log('[TrackManager] 🧊 等待ICE候选收集...'); - // 监听ICE收集状态变化 pc.onicegatheringstatechange = () => { - console.log('[TrackManager] 🧊 ICE收集状态变化:', pc.iceGatheringState); if (pc.iceGatheringState === 'complete') { clearTimeout(iceTimeout); if (ws.readyState === WebSocket.OPEN && pc.localDescription) { ws.send(JSON.stringify({ type: 'offer', payload: pc.localDescription })); - console.log('[TrackManager] 📤 发送 offer (ICE收集完成)'); + console.log('[TrackManager] 📤 发送 Offer (ICE 收集完成)'); } } }; - - // 同时监听ICE候选事件,用于调试 pc.onicecandidate = (event) => { if (event.candidate) { - console.log('[TrackManager] 🧊 收到ICE候选:', event.candidate.candidate.substring(0, 50) + '...'); - } else { - console.log('[TrackManager] 🏁 ICE候选收集完成'); + console.log('[TrackManager] 🧊 ICE 候选:', event.candidate.candidate.substring(0, 50) + '...'); } }; } } catch (error) { - console.error('[TrackManager] ❌ 创建 offer 失败:', error); + console.error('[TrackManager] ❌ 创建 Offer 失败:', error); stateManager.updateState({ error: '创建连接失败', isConnecting: false, canRetry: true }); } }, [stateManager]); - // 添加媒体轨道 + // ── 轨道操作 ── + const addTrack = useCallback((track: MediaStreamTrack, stream: MediaStream) => { const pc = pcRef.current; if (!pc) { console.error('[TrackManager] PeerConnection 不可用'); return null; } - try { return pc.addTrack(track, stream); } catch (error) { @@ -116,14 +128,12 @@ export function useWebRTCTrackManager( } }, []); - // 移除媒体轨道 const removeTrack = useCallback((sender: RTCRtpSender) => { const pc = pcRef.current; if (!pc) { console.error('[TrackManager] PeerConnection 不可用'); return; } - try { pc.removeTrack(sender); } catch (error) { @@ -131,88 +141,44 @@ export function useWebRTCTrackManager( } }, []); - // 设置轨道处理器 - const onTrack = useCallback((handler: (event: RTCTrackEvent) => void) => { - const pc = pcRef.current; - if (!pc) { - console.warn('[TrackManager] PeerConnection 尚未准备就绪,将在连接建立后设置onTrack'); - // 检查WebSocket连接状态,只有连接后才尝试设置 - const state = stateManager.getState(); - if (!state.isWebSocketConnected) { - console.log('[TrackManager] WebSocket未连接,等待连接建立...'); - return; - } - - // 延迟设置,等待PeerConnection准备就绪 - let retryCount = 0; - const maxRetries = 50; // 增加重试次数到50次,即5秒 - - const checkAndSetTrackHandler = () => { - const currentPc = pcRef.current; - if (currentPc) { - console.log('[TrackManager] ✅ PeerConnection 已准备就绪,设置onTrack处理器'); - currentPc.ontrack = handler; - - // 如果已经有远程轨道,立即触发处理 - const receivers = currentPc.getReceivers(); - console.log(`[TrackManager] 📡 当前有 ${receivers.length} 个接收器`); - receivers.forEach(receiver => { - if (receiver.track) { - console.log(`[TrackManager] 🎥 发现现有轨道: ${receiver.track.kind}, ${receiver.track.id}, 状态: ${receiver.track.readyState}`); - } - }); - } else { - retryCount++; - if (retryCount < maxRetries) { - // 每5次重试输出一次日志,减少日志数量 - if (retryCount % 5 === 0) { - console.log(`[TrackManager] ⏳ 等待PeerConnection准备就绪... (尝试: ${retryCount}/${maxRetries})`); - } - setTimeout(checkAndSetTrackHandler, 100); - } else { - console.error('[TrackManager] ❌ PeerConnection 长时间未准备就绪,停止重试'); - } - } - }; - checkAndSetTrackHandler(); - return; - } - - console.log('[TrackManager] ✅ 立即设置onTrack处理器'); - pc.ontrack = handler; - - // 检查是否已有轨道 - const receivers = pc.getReceivers(); - console.log(`[TrackManager] 📡 当前有 ${receivers.length} 个接收器`); - receivers.forEach(receiver => { - if (receiver.track) { - console.log(`[TrackManager] 🎥 发现现有轨道: ${receiver.track.kind}, ${receiver.track.id}, 状态: ${receiver.track.readyState}`); - } - }); - }, [stateManager]); + // ── 多监听器注册 ── + + const registerTrackHandler = useCallback((key: string, handler: TrackHandler): Unsubscribe => { + console.log('[TrackManager] 注册轨道处理器:', key); + trackHandlers.current.set(key, handler); + + return () => { + console.log('[TrackManager] 取消注册轨道处理器:', key); + trackHandlers.current.delete(key); + }; + }, []); + + // ── 重新协商 ── - // 立即创建offer(用于媒体轨道添加后的重新协商) const createOfferNow = useCallback(async (pc: RTCPeerConnection, ws: WebSocket) => { if (!pc || !ws) { console.error('[TrackManager] PeerConnection 或 WebSocket 不可用'); return false; } - try { await createOffer(pc, ws); return true; } catch (error) { - console.error('[TrackManager] 创建 offer 失败:', error); + console.error('[TrackManager] 创建 Offer 失败:', error); return false; } }, [createOffer]); - // 设置 PeerConnection 引用 + // ── 内部引用设置(仅供 ConnectionCore 调用)── + const setPeerConnection = useCallback((pc: RTCPeerConnection | null) => { pcRef.current = pc; - }, []); + // 新 PeerConnection 创建时,挂载复合轨道分发器 + if (pc) { + pc.ontrack = dispatchTrackEvent; + } + }, [dispatchTrackEvent]); - // 设置 WebSocket 引用 const setWebSocket = useCallback((ws: WebSocket | null) => { wsRef.current = ws; }, []); @@ -220,11 +186,10 @@ export function useWebRTCTrackManager( return { addTrack, removeTrack, - onTrack, + registerTrackHandler, createOffer, createOfferNow, - // 内部方法,供核心连接管理器调用 setPeerConnection, setWebSocket, }; -} \ No newline at end of file +} diff --git a/chuan-next/src/hooks/desktop-share/useDesktopShareBusiness.ts b/chuan-next/src/hooks/desktop-share/useDesktopShareBusiness.ts index 013e4a7..2f1ae7d 100644 --- a/chuan-next/src/hooks/desktop-share/useDesktopShareBusiness.ts +++ b/chuan-next/src/hooks/desktop-share/useDesktopShareBusiness.ts @@ -1,4 +1,5 @@ import { useState, useRef, useCallback, useEffect } from 'react'; +import type { WebRTCConnection } from '../connection/types'; import { useSharedWebRTCManager } from '../connection/useSharedWebRTCManager'; interface DesktopShareState { @@ -11,8 +12,9 @@ interface DesktopShareState { isWaitingForPeer: boolean; // 新增:是否等待对方连接 } -export function useDesktopShareBusiness() { - const webRTC = useSharedWebRTCManager(); +export function useDesktopShareBusiness(externalConnection?: WebRTCConnection) { + const internalConnection = useSharedWebRTCManager(); + const webRTC = externalConnection ?? internalConnection; const [state, setState] = useState({ isSharing: false, isViewing: false, @@ -42,24 +44,22 @@ export function useDesktopShareBusiness() { } }, [updateState]); - // 设置远程轨道处理器(始终监听) + // 设置远程轨道处理器(始终监听,支持与语音通话并存) useEffect(() => { - console.log('[DesktopShare] 🎧 设置远程轨道处理器'); - webRTC.onTrack((event: RTCTrackEvent) => { - console.log('[DesktopShare] 🎥 收到远程轨道:', event.track.kind, event.track.id, '状态:', event.track.readyState); - console.log('[DesktopShare] 远程流数量:', event.streams.length); + console.log('[DesktopShare] 🎧 注册远程轨道处理器'); + const unsubscribe = webRTC.registerTrackHandler('desktop-share', (event: RTCTrackEvent) => { + // 只处理视频轨道,音频由 VoiceChat 处理 + if (event.track.kind !== 'video') return; + + console.log('[DesktopShare] 🎥 收到远程视频轨道:', event.track.id, '状态:', event.track.readyState); if (event.streams.length > 0) { const remoteStream = event.streams[0]; console.log('[DesktopShare] 🎬 设置远程流,轨道数量:', remoteStream.getTracks().length); - remoteStream.getTracks().forEach(track => { - console.log('[DesktopShare] 远程轨道:', track.kind, track.id, '启用:', track.enabled, '状态:', track.readyState); - }); // 确保轨道已启用 remoteStream.getTracks().forEach(track => { if (!track.enabled) { - console.log('[DesktopShare] 🔓 启用远程轨道:', track.id); track.enabled = true; } }); @@ -67,25 +67,18 @@ export function useDesktopShareBusiness() { handleRemoteStream(remoteStream); } else { console.warn('[DesktopShare] ⚠️ 收到轨道但没有关联的流'); - // 尝试从轨道创建流 try { const newStream = new MediaStream([event.track]); - console.log('[DesktopShare] 🔄 从轨道创建新流:', newStream.id); - - // 确保轨道已启用 newStream.getTracks().forEach(track => { - if (!track.enabled) { - console.log('[DesktopShare] 🔓 启用新流中的轨道:', track.id); - track.enabled = true; - } + if (!track.enabled) track.enabled = true; }); - handleRemoteStream(newStream); } catch (error) { console.error('[DesktopShare] ❌ 从轨道创建流失败:', error); } } }); + return unsubscribe; }, [webRTC, handleRemoteStream]); // 获取桌面共享流 diff --git a/chuan-next/src/hooks/desktop-share/useVoiceChatBusiness.ts b/chuan-next/src/hooks/desktop-share/useVoiceChatBusiness.ts index d49d6e7..59d1b0b 100644 --- a/chuan-next/src/hooks/desktop-share/useVoiceChatBusiness.ts +++ b/chuan-next/src/hooks/desktop-share/useVoiceChatBusiness.ts @@ -1,5 +1,5 @@ import { useCallback, useEffect, useRef, useState } from 'react'; -import { WebRTCConnection } from '../connection/useSharedWebRTCManager'; +import type { WebRTCConnection } from '../connection/types'; import { useAudioVisualizer } from './useAudioVisualizer'; interface VoiceChatState { @@ -96,10 +96,11 @@ export function useVoiceChatBusiness(connection: WebRTCConnection) { } }; - // feature/ws 的 onTrack 返回 void(直接设置 pc.ontrack) - connection.onTrack(trackHandler); + // 注册轨道处理器(多监听器模式,与桌面共享并存) + const unsubscribe = connection.registerTrackHandler('voice-chat', trackHandler); return () => { + unsubscribe(); if (currentTrackRef.current) { currentTrackRef.current.onended = null; currentTrackRef.current.onmute = null; diff --git a/chuan-next/src/hooks/file-transfer/useFileTransferBusiness.ts b/chuan-next/src/hooks/file-transfer/useFileTransferBusiness.ts index 1dbf910..4b6eedd 100644 --- a/chuan-next/src/hooks/file-transfer/useFileTransferBusiness.ts +++ b/chuan-next/src/hooks/file-transfer/useFileTransferBusiness.ts @@ -1,5 +1,5 @@ import { useState, useCallback, useRef, useEffect } from 'react'; -import type { WebRTCConnection } from '../connection/useSharedWebRTCManager'; +import type { WebRTCConnection } from '../connection/types'; import type { FileInfo } from '@/types'; // 文件传输状态 @@ -40,38 +40,26 @@ interface FileMetadata { } // 文件块信息 +// 文件块元信息 interface FileChunk { fileId: string; chunkIndex: number; totalChunks: number; - checksum?: string; // 数据校验和 } -// 块确认信息 -interface ChunkAck { - fileId: string; - chunkIndex: number; - success: boolean; - checksum?: string; -} - -// 传输状态 +// 发送端传输状态 interface TransferStatus { fileId: string; fileName: string; totalChunks: number; sentChunks: Set; - acknowledgedChunks: Set; - failedChunks: Set; lastChunkTime: number; - retryCount: Map; - averageSpeed: number; // KB/s // 滑动窗口测速 - speedWindowBytes: number; // 窗口内累计字节数 - speedWindowStart: number; // 窗口开始时间 - lastReportedSpeed: number; // 上次上报的速度 bytes/s - lastReportedEta: number; // 上次上报的 ETA 秒 - lastSpeedReportTime: number; // 上次上报速度的时间 + speedWindowBytes: number; + speedWindowStart: number; + lastReportedSpeed: number; // bytes/s + lastReportedEta: number; // 秒 + lastSpeedReportTime: number; } // 回调类型 @@ -82,9 +70,6 @@ type FileListReceivedCallback = (fileList: FileInfo[]) => void; const CHANNEL_NAME = 'file-transfer'; const CHUNK_SIZE = 256 * 1024; // 256KB — WebRTC DataChannel 单次发送上限 -const MAX_RETRIES = 5; // 最大重试次数(仅用于连接恢复) -const RETRY_DELAY = 1000; // 重试延迟(毫秒) -const ACK_TIMEOUT = 5000; // 完成确认超时(毫秒) const SPEED_WINDOW_MS = 2000; // 速度计算滑动窗口 2 秒 const SPEED_REPORT_INTERVAL_MS = 1000; // 速度上报最小间隔 1 秒 const BUFFER_HIGH_WATER = 2 * 1024 * 1024; // 2MB — 发送背压阈值 @@ -220,12 +205,7 @@ export function useFileTransferBusiness(connection: WebRTCConnection) { break; case 'file-chunk-ack': - const ack: ChunkAck = message.payload; - console.log('收到块确认:', ack); - - // 清除超时定时器 - case 'file-chunk-ack': - // 保留消息处理以兼容旧版对端,但不再依赖逐块 ACK 做可靠性 + // 兼容旧版对端,不再依赖逐块 ACK(SCTP 保证可靠有序传输) break; } }, [updateState]); @@ -360,11 +340,7 @@ export function useFileTransferBusiness(connection: WebRTCConnection) { fileName: file.name, totalChunks, sentChunks: new Set(), - acknowledgedChunks: new Set(), - failedChunks: new Set(), lastChunkTime: nowInit, - retryCount: new Map(), - averageSpeed: 0, speedWindowBytes: 0, speedWindowStart: nowInit, lastReportedSpeed: 0, @@ -424,7 +400,6 @@ export function useFileTransferBusiness(connection: WebRTCConnection) { if (windowElapsed >= SPEED_WINDOW_MS && windowElapsed > 0) { const speedBps = (status.speedWindowBytes / windowElapsed) * 1000; - status.averageSpeed = speedBps / 1024; status.lastReportedSpeed = speedBps; const remainingBytes = Math.max(0, file.size - (chunkIndex + 1) * CHUNK_SIZE); status.lastReportedEta = speedBps > 0 ? Math.max(0, remainingBytes / speedBps) : 0; diff --git a/chuan-next/src/hooks/text-transfer/useTextTransferBusiness.ts b/chuan-next/src/hooks/text-transfer/useTextTransferBusiness.ts index b9b473b..11546c1 100644 --- a/chuan-next/src/hooks/text-transfer/useTextTransferBusiness.ts +++ b/chuan-next/src/hooks/text-transfer/useTextTransferBusiness.ts @@ -1,13 +1,9 @@ import { useState, useCallback, useRef, useEffect } from 'react'; -import type { WebRTCConnection } from '../connection/useSharedWebRTCManager'; +import type { WebRTCConnection } from '../connection/types'; -// 文本传输状态 +// 文本传输业务状态(仅业务相关字段,连接状态直接从 connection 读取) interface TextTransferState { - isConnecting: boolean; - isConnected: boolean; - isWebSocketConnected: boolean; - connectionError: string | null; - currentText: string; // 当前文本内容 + currentText: string; // 对端同步过来的文本内容 isTyping: boolean; // 对方是否在输入 } @@ -23,153 +19,91 @@ const CHANNEL_NAME = 'text-transfer'; */ export function useTextTransferBusiness(connection: WebRTCConnection) { const [state, setState] = useState({ - isConnecting: false, - isConnected: false, - isWebSocketConnected: false, - connectionError: null, currentText: '', - isTyping: false + isTyping: false, }); // 回调引用 const textSyncCallbackRef = useRef(null); const typingCallbackRef = useRef(null); - // 更新状态的辅助函数 - const updateState = useCallback((updates: Partial) => { - setState(prev => ({ ...prev, ...updates })); - }, []); - // 消息处理器 const handleMessage = useCallback((message: any) => { if (!message.type.startsWith('text-')) return; - - console.log('文本传输收到消息:', message.type, message); switch (message.type) { case 'text-sync': - // 实时文本同步 - 接收方看到发送方的实时编辑 if (message.payload && typeof message.payload.text === 'string') { - updateState({ currentText: message.payload.text }); - - // 触发文本同步回调 - if (textSyncCallbackRef.current) { - textSyncCallbackRef.current(message.payload.text); - } + setState(prev => ({ ...prev, currentText: message.payload.text })); + textSyncCallbackRef.current?.(message.payload.text); } break; case 'text-typing': - // 打字状态 if (typeof message.payload?.typing === 'boolean') { - updateState({ isTyping: message.payload.typing }); - - // 触发打字状态回调 - if (typingCallbackRef.current) { - typingCallbackRef.current(message.payload.typing); - } + setState(prev => ({ ...prev, isTyping: message.payload.typing })); + typingCallbackRef.current?.(message.payload.typing); } break; - - default: - console.warn('未知的文本消息类型:', message.type); } - }, [updateState]); + }, []); // 注册消息处理器 useEffect(() => { - const unregister = connection.registerMessageHandler(CHANNEL_NAME, handleMessage); - return unregister; - }, [handleMessage]); + return connection.registerMessageHandler(CHANNEL_NAME, handleMessage); + }, [connection, handleMessage]); - // 监听连接状态变化 (直接使用 connection 的状态) - useEffect(() => { - // 同步连接状态 - updateState({ - isConnecting: connection.isConnecting, - isConnected: connection.isConnected, - isWebSocketConnected: connection.isWebSocketConnected, - connectionError: connection.error - }); - }, [connection.isConnecting, connection.isConnected, connection.isWebSocketConnected, connection.error, updateState]); - - // 连接 + // 连接管理(透传) const connect = useCallback((roomCode: string, role: 'sender' | 'receiver') => { return connection.connect(roomCode, role); }, [connection]); - // 断开连接 const disconnect = useCallback(() => { return connection.disconnect(); }, [connection]); - // 发送实时文本同步 (替代原来的 sendMessage) + // 发送实时文本同步 const sendTextSync = useCallback((text: string) => { - if (!connection || !connection.isPeerConnected) return; - - const message = { - type: 'text-sync', - payload: { text } - }; - - const success = connection.sendMessage(message, CHANNEL_NAME); - if (success) { - console.log('发送实时文本同步:', text.length, '字符'); - } + if (!connection.isPeerConnected) return; + connection.sendMessage({ type: 'text-sync', payload: { text } }, CHANNEL_NAME); }, [connection]); // 发送打字状态 const sendTypingStatus = useCallback((isTyping: boolean) => { - if (!connection || !connection.isPeerConnected) return; - - const message = { - type: 'text-typing', - payload: { typing: isTyping } - }; - - const success = connection.sendMessage(message, CHANNEL_NAME); - if (success) { - console.log('发送打字状态:', isTyping); - } + if (!connection.isPeerConnected) return; + connection.sendMessage({ type: 'text-typing', payload: { typing: isTyping } }, CHANNEL_NAME); }, [connection]); - // 设置文本同步回调 + // 回调注册(返回清理函数) const onTextSync = useCallback((callback: TextSyncCallback) => { textSyncCallbackRef.current = callback; - - // 返回清理函数 - return () => { - textSyncCallbackRef.current = null; - }; + return () => { textSyncCallbackRef.current = null; }; }, []); - // 设置打字状态回调 const onTypingStatus = useCallback((callback: TypingStatusCallback) => { typingCallbackRef.current = callback; - - // 返回清理函数 - return () => { - typingCallbackRef.current = null; - }; + return () => { typingCallbackRef.current = null; }; }, []); return { - // 状态 - 直接从 connection 获取 + // 连接状态(直接读 connection,不做冗余同步) isConnecting: connection.isConnecting, isConnected: connection.isConnected, isWebSocketConnected: connection.isWebSocketConnected, connectionError: connection.error, + + // 业务状态 currentText: state.currentText, isTyping: state.isTyping, - + // 操作方法 connect, disconnect, sendTextSync, sendTypingStatus, - + // 回调设置 onTextSync, - onTypingStatus + onTypingStatus, }; }