feat: 重构 WebRTC 轨道管理,支持多监听器注册,优化 SDP Offer 创建流程;更新桌面共享和语音通话业务逻辑,增强连接管理

This commit is contained in:
MatrixSeven
2026-03-05 13:28:29 +08:00
parent da3e4f1067
commit 6d02a9898f
11 changed files with 545 additions and 735 deletions

View File

@@ -5,7 +5,7 @@ import { Mic, MicOff, PhoneCall, PhoneOff } from 'lucide-react';
import { Button } from '@/components/ui/button';
import { VoiceIndicator } from '@/components/VoiceIndicator';
import { useVoiceChatBusiness } from '@/hooks/desktop-share';
import type { WebRTCConnection } from '@/hooks/connection/useSharedWebRTCManager';
import type { WebRTCConnection } from '@/hooks/connection/types';
interface VoiceChatPanelProps {
connection: WebRTCConnection;

View File

@@ -1,4 +1,7 @@
// 连接相关的hooks
// 连接相关的 hooks
export { useRoomConnection } from './useRoomConnection';
export { useSharedWebRTCManager } from './useSharedWebRTCManager';
export { useWebRTCSupport } from './useWebRTCSupport';
// 类型导出
export type { WebRTCConnection, WebRTCMessage, MessageHandler, DataHandler, TrackHandler, Unsubscribe } from './types';

View File

@@ -0,0 +1,74 @@
import type { TransportMode } from '../ui/webRTCStore';
// ────────────────────────────────────────────
// 基础类型
// ────────────────────────────────────────────
/** 清理函数 / 取消订阅 */
export type Unsubscribe = () => void;
/** WebRTC JSON 消息 */
export interface WebRTCMessage {
type: string;
payload: any;
channel?: string;
}
// ────────────────────────────────────────────
// 处理器类型
// ────────────────────────────────────────────
/** JSON 消息处理器 */
export type MessageHandler = (message: WebRTCMessage) => void;
/** 二进制数据处理器 */
export type DataHandler = (data: ArrayBuffer) => void;
/** 媒体轨道事件处理器 */
export type TrackHandler = (event: RTCTrackEvent) => void;
// ────────────────────────────────────────────
// WebRTC 连接统一接口
// 所有业务模块(文件传输 / 文本传输 / 桌面共享 / 语音通话)
// 都通过此接口与底层通信,不直接依赖具体实现
// ────────────────────────────────────────────
export interface WebRTCConnection {
// ── 只读状态 ──
readonly isConnected: boolean;
readonly isConnecting: boolean;
readonly isWebSocketConnected: boolean;
readonly isPeerConnected: boolean;
readonly error: string | null;
readonly canRetry: boolean;
readonly transportMode: TransportMode;
readonly currentRoom: { code: string; role: 'sender' | 'receiver' } | null;
// ── 连接管理 ──
connect(roomCode: string, role: 'sender' | 'receiver'): Promise<void>;
disconnect(): void;
retry(): Promise<void>;
isConnectedToRoom(roomCode: string, role: 'sender' | 'receiver'): boolean;
// ── 数据通道DataChannel / Relay 透明切换)──
sendMessage(message: WebRTCMessage, channel?: string): boolean;
sendData(data: ArrayBuffer): boolean;
registerMessageHandler(channel: string, handler: MessageHandler): Unsubscribe;
registerDataHandler(channel: string, handler: DataHandler): Unsubscribe;
getChannelState(): RTCDataChannelState;
getBufferedAmount(): number;
waitForBufferDrain(threshold?: number): Promise<void>;
// ── 媒体轨道(支持多监听器) ──
addTrack(track: MediaStreamTrack, stream: MediaStream): RTCRtpSender | null;
removeTrack(sender: RTCRtpSender): void;
/**
* 注册轨道事件处理器
* - 支持多个消费者同时注册(桌面共享 + 语音通话)
* - 返回清理函数,组件卸载时调用
* @param key 唯一标识符,如 'desktop-share' / 'voice-chat'
*/
registerTrackHandler(key: string, handler: TrackHandler): Unsubscribe;
getPeerConnection(): RTCPeerConnection | null;
createOfferNow(): Promise<boolean>;
}

View File

@@ -1,60 +1,23 @@
import { useCallback, useMemo } from 'react';
import { useWebRTCStore, type WebRTCStateManager, type TransportMode } from '../ui/webRTCStore';
import { useWebRTCDataChannelManager, WebRTCMessage } from './useWebRTCDataChannelManager';
import { useWebRTCStore, type WebRTCStateManager } from '../ui/webRTCStore';
import { useWebRTCDataChannelManager } from './useWebRTCDataChannelManager';
import { useWebRTCTrackManager } from './useWebRTCTrackManager';
import { useWebRTCConnectionCore } from './useWebRTCConnectionCore';
import type { WebRTCConnection, TrackHandler, Unsubscribe } from './types';
// 消息和数据处理器类型
export type MessageHandler = (message: WebRTCMessage) => void;
export type DataHandler = (data: ArrayBuffer) => void;
// WebRTC 连接接口
export interface WebRTCConnection {
// 状态
isConnected: boolean;
isConnecting: boolean;
isWebSocketConnected: boolean;
isPeerConnected: boolean;
error: string | null;
canRetry: boolean;
// 传输模式
transportMode: TransportMode;
// 操作方法
connect: (roomCode: string, role: 'sender' | 'receiver') => Promise<void>;
disconnect: () => void;
retry: () => Promise<void>;
sendMessage: (message: WebRTCMessage, channel?: string) => boolean;
sendData: (data: ArrayBuffer) => boolean;
// 处理器注册
registerMessageHandler: (channel: string, handler: MessageHandler) => () => void;
registerDataHandler: (channel: string, handler: DataHandler) => () => void;
// 工具方法
getChannelState: () => RTCDataChannelState;
getBufferedAmount: () => number;
waitForBufferDrain: (threshold?: number) => Promise<void>;
isConnectedToRoom: (roomCode: string, role: 'sender' | 'receiver') => boolean;
// 当前房间信息
currentRoom: { code: string; role: 'sender' | 'receiver' } | null;
// 媒体轨道方法
addTrack: (track: MediaStreamTrack, stream: MediaStream) => RTCRtpSender | null;
removeTrack: (sender: RTCRtpSender) => void;
onTrack: (callback: (event: RTCTrackEvent) => void) => void;
getPeerConnection: () => RTCPeerConnection | null;
createOfferNow: () => Promise<boolean>;
}
// Re-export the canonical interface for consumers
export type { WebRTCConnection };
/**
* 共享 WebRTC 连接管理器
* 创建单一的 WebRTC 连接实例,供多个业务模块共享使用
* 整合所有模块,提供统一的接口
*
* 职责:
* 1. 整合 ConnectionCore + DataChannelManager + TrackManager
* 2. 暴露统一的 WebRTCConnection 接口给所有业务模块
* 3. 确保单一 PeerConnection 实例被多个功能共享
*/
export function useSharedWebRTCManager(): WebRTCConnection {
// 直接zustand store 创建状态管理器
// 从 Zustand store 创建状态管理器
const store = useWebRTCStore();
const stateManager: WebRTCStateManager = useMemo(() => ({
getState: () => ({
@@ -81,21 +44,10 @@ export function useSharedWebRTCManager(): WebRTCConnection {
const connectionCore = useWebRTCConnectionCore(
stateManager,
dataChannelManager,
trackManager
trackManager,
);
// 从 store 获取当前状态
const state = {
isConnected: store.isConnected,
isConnecting: store.isConnecting,
isWebSocketConnected: store.isWebSocketConnected,
isPeerConnected: store.isPeerConnected,
error: store.error,
canRetry: store.canRetry,
transportMode: store.transportMode,
};
// 创建 createOfferNow 方法
// ── createOfferNow 桥接 ──
const createOfferNow = useCallback(async () => {
const pc = connectionCore.getPeerConnection();
const ws = connectionCore.getWebSocket();
@@ -103,51 +55,50 @@ export function useSharedWebRTCManager(): WebRTCConnection {
console.error('[SharedWebRTC] PeerConnection 或 WebSocket 不可用');
return false;
}
try {
return await trackManager.createOfferNow(pc, ws);
} catch (error) {
console.error('[SharedWebRTC] 创建 offer 失败:', error);
console.error('[SharedWebRTC] 创建 Offer 失败:', error);
return false;
}
}, [connectionCore, trackManager]);
// 返回统一的接口,保持与当前 API 一致
return {
// 状态
isConnected: state.isConnected,
isConnecting: state.isConnecting,
isWebSocketConnected: state.isWebSocketConnected,
isPeerConnected: state.isPeerConnected,
error: state.error,
canRetry: state.canRetry,
transportMode: state.transportMode,
// ── registerTrackHandler 桥接 ──
const registerTrackHandler = useCallback((key: string, handler: TrackHandler): Unsubscribe => {
return trackManager.registerTrackHandler(key, handler);
}, [trackManager]);
// 操作方法
return {
// 只读状态
isConnected: store.isConnected,
isConnecting: store.isConnecting,
isWebSocketConnected: store.isWebSocketConnected,
isPeerConnected: store.isPeerConnected,
error: store.error,
canRetry: store.canRetry,
transportMode: store.transportMode,
currentRoom: connectionCore.getCurrentRoom(),
// 连接管理
connect: connectionCore.connect,
disconnect: () => connectionCore.disconnect(true),
retry: connectionCore.retry,
isConnectedToRoom: stateManager.isConnectedToRoom,
// 数据通道
sendMessage: dataChannelManager.sendMessage,
sendData: dataChannelManager.sendData,
// 处理器注册
registerMessageHandler: dataChannelManager.registerMessageHandler,
registerDataHandler: dataChannelManager.registerDataHandler,
// 工具方法
getChannelState: dataChannelManager.getChannelState,
getBufferedAmount: dataChannelManager.getBufferedAmount,
waitForBufferDrain: dataChannelManager.waitForBufferDrain,
isConnectedToRoom: stateManager.isConnectedToRoom,
// 媒体轨道方法
// 媒体轨道
addTrack: trackManager.addTrack,
removeTrack: trackManager.removeTrack,
onTrack: trackManager.onTrack,
registerTrackHandler,
getPeerConnection: connectionCore.getPeerConnection,
createOfferNow,
// 当前房间信息
currentRoom: connectionCore.getCurrentRoom(),
};
}

View File

@@ -53,8 +53,11 @@ export function useWebRTCConnectionCore(
const relayWsRef = useRef<WebSocket | null>(null);
const isRelayFallbackInProgress = useRef<boolean>(false);
const p2pFailureTimeout = useRef<NodeJS.Timeout | null>(null);
const iceDisconnectedTimeout = useRef<NodeJS.Timeout | null>(null);
// 标记是否已经发送过 relay-request避免重复发送
const relayRequestSent = useRef<boolean>(false);
// 通过 ref 避免闭包捕获过时的 initiateRelayFallback
const initiateRelayFallbackRef = useRef<() => void>(() => {});
// 清理连接
const cleanup = useCallback((shouldNotifyDisconnect: boolean = false) => {
@@ -70,6 +73,11 @@ export function useWebRTCConnectionCore(
p2pFailureTimeout.current = null;
}
if (iceDisconnectedTimeout.current) {
clearTimeout(iceDisconnectedTimeout.current);
iceDisconnectedTimeout.current = null;
}
if (pcRef.current) {
pcRef.current.close();
pcRef.current = null;
@@ -235,6 +243,7 @@ export function useWebRTCConnectionCore(
relayWs.onerror = (error) => {
console.error('[ConnectionCore] ❌ 中继 WebSocket 错误:', error);
isRelayFallbackInProgress.current = false;
relayRequestSent.current = false; // 重置以允许重试
stateManager.updateState({
error: 'WS 中继连接失败,请重试',
isConnecting: false,
@@ -263,6 +272,7 @@ export function useWebRTCConnectionCore(
} catch (error) {
console.error('[ConnectionCore] 创建中继连接失败:', error);
isRelayFallbackInProgress.current = false;
relayRequestSent.current = false; // 重置以允许重试
stateManager.updateState({
error: '无法建立中继连接,请重试',
isConnecting: false,
@@ -306,6 +316,9 @@ export function useWebRTCConnectionCore(
connectToRelay();
}, [connectToRelay]);
// 保持 ref 同步,避免闭包过时
initiateRelayFallbackRef.current = initiateRelayFallback;
// 创建 PeerConnection 和相关设置
const createPeerConnection = useCallback((ws: WebSocket, role: 'sender' | 'receiver', isReconnect: boolean = false) => {
console.log('[ConnectionCore] 🔧 创建PeerConnection...', { role, isReconnect });
@@ -359,18 +372,34 @@ export function useWebRTCConnectionCore(
case 'connected':
case 'completed':
console.log('[ConnectionCore] ✅ ICE连接成功');
// ICE 连接成功,清除降级定时器
// ICE 连接成功,清除所有降级定时器
if (p2pFailureTimeout.current) {
clearTimeout(p2pFailureTimeout.current);
p2pFailureTimeout.current = null;
}
if (iceDisconnectedTimeout.current) {
clearTimeout(iceDisconnectedTimeout.current);
iceDisconnectedTimeout.current = null;
}
break;
case 'failed':
console.error('[ConnectionCore] ❌ ICE连接失败启动中继降级');
initiateRelayFallback();
initiateRelayFallbackRef.current();
break;
case 'disconnected':
console.log('[ConnectionCore] 🔌 ICE连接断开');
console.log('[ConnectionCore] 🔌 ICE连接断开,设置 8 秒超时保护');
// ICE disconnected 可能是暂时的(网络抖动),也可能永不恢复
// 设置超时保护8 秒后如果仍非 connected/completed 则降级
if (iceDisconnectedTimeout.current) {
clearTimeout(iceDisconnectedTimeout.current);
}
iceDisconnectedTimeout.current = setTimeout(() => {
const iceState = pcRef.current?.iceConnectionState;
if (iceState && iceState !== 'connected' && iceState !== 'completed') {
console.log('[ConnectionCore] ⏰ ICE disconnected 超时8秒启动中继降级');
initiateRelayFallbackRef.current();
}
}, 8000);
break;
case 'closed':
console.log('[ConnectionCore] 🚫 ICE连接已关闭');
@@ -385,24 +414,28 @@ export function useWebRTCConnectionCore(
console.log('[ConnectionCore] 🔄 WebRTC正在连接中...');
stateManager.updateState({ isPeerConnected: false });
// 设置 P2P 连接超时15 秒后如果还没连上就降级
// 设置 P2P 连接超时10 秒后如果还没连上就降级
if (p2pFailureTimeout.current) {
clearTimeout(p2pFailureTimeout.current);
}
p2pFailureTimeout.current = setTimeout(() => {
if (pcRef.current && pcRef.current.connectionState !== 'connected') {
console.log('[ConnectionCore] ⏰ P2P 连接超时15秒),启动中继降级');
initiateRelayFallback();
console.log('[ConnectionCore] ⏰ P2P 连接超时10秒),启动中继降级');
initiateRelayFallbackRef.current();
}
}, 15000);
}, 10000);
break;
case 'connected':
console.log('[ConnectionCore] 🎉 WebRTC P2P连接已完全建立可以进行媒体传输');
// 清除降级定时器
// 清除所有降级定时器
if (p2pFailureTimeout.current) {
clearTimeout(p2pFailureTimeout.current);
p2pFailureTimeout.current = null;
}
if (iceDisconnectedTimeout.current) {
clearTimeout(iceDisconnectedTimeout.current);
iceDisconnectedTimeout.current = null;
}
// 确保所有连接状态都正确更新
stateManager.updateState({
isWebSocketConnected: true,
@@ -434,7 +467,7 @@ export function useWebRTCConnectionCore(
if (!isUserDisconnecting.current) {
console.log('[ConnectionCore] 启动中继降级');
stateManager.updateState({ isPeerConnected: false });
initiateRelayFallback();
initiateRelayFallbackRef.current();
}
break;
case 'disconnected':
@@ -457,7 +490,7 @@ export function useWebRTCConnectionCore(
console.log('[ConnectionCore] ✅ PeerConnection创建完成角色:', role, '是否重新连接:', isReconnect);
return pc;
}, [stateManager, dataChannelManager, initiateRelayFallback]);
}, [stateManager, dataChannelManager]);
// 连接到房间
const connect = useCallback(async (roomCode: string, role: 'sender' | 'receiver') => {

View File

@@ -1,499 +1,389 @@
import { useRef, useCallback } from 'react';
import { WebRTCStateManager } from '../ui/webRTCStore';
import type { WebRTCMessage, MessageHandler, DataHandler, Unsubscribe } from './types';
// 消息类型
export interface WebRTCMessage {
type: string;
payload: any;
channel?: string;
}
// 消息和数据处理器类型
export type MessageHandler = (message: WebRTCMessage) => void;
export type DataHandler = (data: ArrayBuffer) => void;
// Re-export types for backward compatibility
export type { WebRTCMessage, MessageHandler, DataHandler };
/**
* WebRTC 数据通道管理器
* WebRTC 数据通道管理器接口
* 负责数据通道的创建和管理,支持 P2P DataChannel 和 WS Relay 两种传输模式
*/
export interface WebRTCDataChannelManager {
// 创建数据通道 (P2P 模式)
// ── 通道生命周期 ──
/** 创建 P2P 数据通道 */
createDataChannel: (pc: RTCPeerConnection, role: 'sender' | 'receiver', isReconnect?: boolean) => void;
// 切换到 WS 中继模式(仅设置发送引用,不设置事件监听)
/** 切换到 WS 中继模式(仅设置发送引用)*/
switchToRelay: (relayWs: WebSocket) => void;
// 关闭中继连接
/** 关闭中继连接 */
closeRelay: () => void;
// 处理中继收到的数据消息(由 ConnectionCore 的 onmessage 调用)
handleRelayMessage: (event: MessageEvent) => void;
// 发送消息(自动选择可用通道)
// ── 消息收发 ──
/** 发送 JSON 消息自动选择可用通道P2P 优先Relay 降级)*/
sendMessage: (message: WebRTCMessage, channel?: string) => boolean;
// 发送二进制数据(自动选择可用通道)
/** 发送二进制数据(自动选择可用通道)*/
sendData: (data: ArrayBuffer) => boolean;
// 注册消息处理器
registerMessageHandler: (channel: string, handler: MessageHandler) => () => void;
// 注册数据处理器
registerDataHandler: (channel: string, handler: DataHandler) => () => void;
// 获取数据通道状态(兼容 RTCDataChannelState
/** 处理中继收到的数据(由 ConnectionCore 的 onmessage 转发)*/
handleRelayMessage: (event: MessageEvent) => void;
// ── 处理器注册 ──
/** 注册 JSON 消息处理器,返回清理函数 */
registerMessageHandler: (channel: string, handler: MessageHandler) => Unsubscribe;
/** 注册二进制数据处理器,返回清理函数 */
registerDataHandler: (channel: string, handler: DataHandler) => Unsubscribe;
// ── 状态查询 ──
/** 获取通道状态(综合 P2P + Relay*/
getChannelState: () => RTCDataChannelState;
// 获取当前缓冲区大小
/** 获取当前发送缓冲区大小 */
getBufferedAmount: () => number;
// 等待缓冲区排空到指定阈值以下
/** 等待缓冲区排空到指定阈值以下 */
waitForBufferDrain: (threshold?: number) => Promise<void>;
// 处理数据通道消息 (P2P)
handleDataChannelMessage: (event: MessageEvent) => void;
}
/**
* WebRTC 数据通道管理 Hook
* 负责数据通道的创建和管理,处理数据通道消息的发送和接收
* 支持 P2P DataChannel 和 WS Relay 两种传输模式,对上层透明
*
* 核心职责:
* 1. P2P DataChannel 创建sender createDataChannel / receiver ondatachannel
* 2. Relay WebSocket 透明切换
* 3. 统一的消息 / 二进制分发Map<channel, handler>
* 4. 背压控制bufferedAmount + waitForBufferDrain
*/
export function useWebRTCDataChannelManager(
stateManager: WebRTCStateManager
): WebRTCDataChannelManager {
const dcRef = useRef<RTCDataChannel | null>(null);
// WS 中继通道
const relayWsRef = useRef<WebSocket | null>(null);
// 多通道消息处理器
// 处理器注册表
const messageHandlers = useRef<Map<string, MessageHandler>>(new Map());
const dataHandlers = useRef<Map<string, DataHandler>>(new Map());
// 判断当前是否处于中继模式
// ────────────────────────────────────────
// 内部工具
// ────────────────────────────────────────
const isRelayMode = useCallback(() => {
return relayWsRef.current !== null && relayWsRef.current.readyState === WebSocket.OPEN;
}, []);
// 判断 P2P 数据通道是否可用
const isP2PAvailable = useCallback(() => {
return dcRef.current !== null && dcRef.current.readyState === 'open';
}, []);
// 创建数据通道 (P2P 模式)
const createDataChannel = useCallback((
pc: RTCPeerConnection,
role: 'sender' | 'receiver',
isReconnect: boolean = false
// ── 统一消息分发 ──
const dispatchJsonMessage = useCallback((message: WebRTCMessage) => {
if (message.channel) {
const handler = messageHandlers.current.get(message.channel);
if (handler) handler(message);
} else {
// 无 channel 标记则广播给所有处理器
messageHandlers.current.forEach(handler => handler(message));
}
}, []);
const dispatchBinaryData = useCallback((data: ArrayBuffer) => {
// 优先分发给 file-transfer 处理器(最常见的二进制消费者)
const fileHandler = dataHandlers.current.get('file-transfer');
if (fileHandler) {
fileHandler(data);
return;
}
// 降级:分发给第一个注册的处理器
const firstHandler = dataHandlers.current.values().next().value;
if (firstHandler) {
firstHandler(data);
}
}, []);
/**
* 统一入站消息处理P2P 和 Relay 共用)
*/
const dispatchIncoming = useCallback((event: MessageEvent) => {
if (typeof event.data === 'string') {
try {
const message = JSON.parse(event.data) as WebRTCMessage;
dispatchJsonMessage(message);
} catch (error) {
console.error('[DataChannel] 解析消息失败:', error);
}
} else if (event.data instanceof ArrayBuffer) {
dispatchBinaryData(event.data);
} else if (event.data instanceof Blob) {
// WebSocket 某些浏览器 / 环境下返回 Blob 而非 ArrayBuffer
event.data.arrayBuffer().then((buffer: ArrayBuffer) => {
dispatchBinaryData(buffer);
});
}
}, [dispatchJsonMessage, dispatchBinaryData]);
// ────────────────────────────────────────
// DataChannel 生命周期(去重 sender/receiver 共享逻辑)
// ────────────────────────────────────────
/**
* 数据通道打开后的统一处理
*/
const handleChannelOpen = useCallback((
dataChannel: RTCDataChannel,
role: string,
isReconnect: boolean
) => {
console.log('[DataChannelManager] 创建数据通道...', { role, isReconnect });
// 如果已经存在数据通道,先关闭它
console.log(`[DataChannel] 数据通道已打开 (${role})`);
// P2P 恢复时关闭中继
if (relayWsRef.current) {
console.log('[DataChannel] P2P 恢复,关闭中继通道');
relayWsRef.current.close();
relayWsRef.current = null;
stateManager.updateState({ transportMode: 'p2p' });
}
stateManager.updateState({
isWebSocketConnected: true,
isConnected: true,
isPeerConnected: true,
error: null,
isConnecting: false,
canRetry: false,
});
// 重连后请求数据同步
if (isReconnect) {
console.log(`[DataChannel] ${role} 重连,发送数据同步请求`);
setTimeout(() => {
if (dataChannel.readyState === 'open') {
dataChannel.send(JSON.stringify({
type: 'sync-request',
payload: { timestamp: Date.now() },
}));
}
}, 300);
}
}, [stateManager]);
/**
* 数据通道错误的统一处理
*/
const handleChannelError = useCallback((
dataChannel: RTCDataChannel,
pc: RTCPeerConnection | null,
role: string
) => {
let errorMessage = '数据通道连接失败';
let shouldRetry = false;
switch (dataChannel.readyState) {
case 'connecting':
errorMessage = '数据通道正在连接中,请稍候...';
shouldRetry = true;
break;
case 'closing':
errorMessage = '数据通道正在关闭,连接即将断开';
break;
case 'closed':
errorMessage = '数据通道已关闭P2P 连接失败';
shouldRetry = true;
break;
default:
if (pc) {
switch (pc.connectionState) {
case 'failed':
errorMessage = 'P2P 连接失败,可能是网络防火墙阻止了连接';
shouldRetry = true;
break;
case 'disconnected':
errorMessage = 'P2P 连接已断开,网络可能不稳定';
shouldRetry = true;
break;
default:
errorMessage = '数据通道连接失败,可能是网络环境受限';
shouldRetry = true;
}
}
}
console.error(`[DataChannel] 错误 (${role}) - 状态: ${dataChannel.readyState}, ${errorMessage}`);
// 已在中继模式则不更新错误状态
if (!isRelayMode()) {
stateManager.updateState({
error: errorMessage,
isConnecting: false,
isPeerConnected: false,
canRetry: shouldRetry,
});
}
}, [stateManager, isRelayMode]);
/**
* 为 DataChannel 绑定事件处理器sender / receiver 共用)
*/
const setupChannelHandlers = useCallback((
dataChannel: RTCDataChannel,
pc: RTCPeerConnection,
role: string,
isReconnect: boolean
) => {
dataChannel.onopen = () => handleChannelOpen(dataChannel, role, isReconnect);
dataChannel.onmessage = dispatchIncoming;
dataChannel.onerror = () => handleChannelError(dataChannel, pc, role);
}, [handleChannelOpen, dispatchIncoming, handleChannelError]);
// ── 创建数据通道 ──
const createDataChannel = useCallback((
pc: RTCPeerConnection,
role: 'sender' | 'receiver',
isReconnect: boolean = false,
) => {
console.log('[DataChannel] 创建数据通道...', { role, isReconnect });
// 关闭已有通道
if (dcRef.current) {
console.log('[DataChannelManager] 关闭已存在的数据通道');
console.log('[DataChannel] 关闭已存在的数据通道');
dcRef.current.close();
dcRef.current = null;
}
// 数据通道处理
if (role === 'sender') {
const dataChannel = pc.createDataChannel('shared-channel', {
ordered: true
});
dcRef.current = dataChannel;
dataChannel.onopen = () => {
console.log('[DataChannelManager] 数据通道已打开 (发送方)');
// 如果之前在中继模式,切回 P2P
if (relayWsRef.current) {
console.log('[DataChannelManager] P2P 恢复,关闭中继通道');
relayWsRef.current.close();
relayWsRef.current = null;
stateManager.updateState({ transportMode: 'p2p' });
}
// 确保所有连接状态都正确更新
stateManager.updateState({
isWebSocketConnected: true,
isConnected: true,
isPeerConnected: true,
error: null,
isConnecting: false,
canRetry: false
});
// 如果是重新连接,触发数据同步
if (isReconnect) {
console.log('[DataChannelManager] 发送方重新连接,数据通道已打开,准备同步数据');
setTimeout(() => {
if (dataChannel.readyState === 'open') {
dataChannel.send(JSON.stringify({
type: 'sync-request',
payload: { timestamp: Date.now() }
}));
console.log('[DataChannelManager] 发送方发送数据同步请求');
}
}, 300);
}
};
dataChannel.onmessage = handleDataChannelMessage;
dataChannel.onerror = (error) => {
console.error('[DataChannelManager] 数据通道错误:', error);
let errorMessage = '数据通道连接失败';
let shouldRetry = false;
switch (dataChannel.readyState) {
case 'connecting':
errorMessage = '数据通道正在连接中,请稍候...';
shouldRetry = true;
break;
case 'closing':
errorMessage = '数据通道正在关闭,连接即将断开';
break;
case 'closed':
errorMessage = '数据通道已关闭P2P连接失败';
shouldRetry = true;
break;
default:
if (pc) {
switch (pc.connectionState) {
case 'failed':
errorMessage = 'P2P连接失败可能是网络防火墙阻止了连接请尝试切换网络或使用VPN';
shouldRetry = true;
break;
case 'disconnected':
errorMessage = 'P2P连接已断开网络可能不稳定';
shouldRetry = true;
break;
default:
errorMessage = '数据通道连接失败,可能是网络环境受限';
shouldRetry = true;
}
}
}
console.error(`[DataChannelManager] 数据通道详细错误 - 状态: ${dataChannel.readyState}, 消息: ${errorMessage}, 建议重试: ${shouldRetry}`);
// 如果已经在中继模式,不更新错误状态
if (!isRelayMode()) {
stateManager.updateState({
error: errorMessage,
isConnecting: false,
isPeerConnected: false,
canRetry: shouldRetry
});
}
};
const dc = pc.createDataChannel('shared-channel', { ordered: true });
dcRef.current = dc;
setupChannelHandlers(dc, pc, '发送方', isReconnect);
} else {
pc.ondatachannel = (event) => {
const dataChannel = event.channel;
dcRef.current = dataChannel;
dataChannel.onopen = () => {
console.log('[DataChannelManager] 数据通道已打开 (接收方)');
// 如果之前在中继模式,切回 P2P
if (relayWsRef.current) {
console.log('[DataChannelManager] P2P 恢复,关闭中继通道');
relayWsRef.current.close();
relayWsRef.current = null;
stateManager.updateState({ transportMode: 'p2p' });
}
stateManager.updateState({
isWebSocketConnected: true,
isConnected: true,
isPeerConnected: true,
error: null,
isConnecting: false,
canRetry: false
});
if (isReconnect) {
console.log('[DataChannelManager] 接收方重新连接,数据通道已打开,准备同步数据');
setTimeout(() => {
if (dataChannel.readyState === 'open') {
dataChannel.send(JSON.stringify({
type: 'sync-request',
payload: { timestamp: Date.now() }
}));
console.log('[DataChannelManager] 接收方发送数据同步请求');
}
}, 300);
}
};
dataChannel.onmessage = handleDataChannelMessage;
dataChannel.onerror = (error) => {
console.error('[DataChannelManager] 数据通道错误 (接收方):', error);
let errorMessage = '数据通道连接失败';
let shouldRetry = false;
switch (dataChannel.readyState) {
case 'connecting':
errorMessage = '数据通道正在连接中,请稍候...';
shouldRetry = true;
break;
case 'closing':
errorMessage = '数据通道正在关闭,连接即将断开';
break;
case 'closed':
errorMessage = '数据通道已关闭P2P连接失败';
shouldRetry = true;
break;
default:
if (pc) {
switch (pc.connectionState) {
case 'failed':
errorMessage = 'P2P连接失败可能是网络防火墙阻止了连接请尝试切换网络或使用VPN';
shouldRetry = true;
break;
case 'disconnected':
errorMessage = 'P2P连接已断开网络可能不稳定';
shouldRetry = true;
break;
default:
errorMessage = '数据通道连接失败,可能是网络环境受限';
shouldRetry = true;
}
}
}
console.error(`[DataChannelManager] 数据通道详细错误 (接收方) - 状态: ${dataChannel.readyState}, 消息: ${errorMessage}, 建议重试: ${shouldRetry}`);
// 如果已经在中继模式,不更新错误状态
if (!isRelayMode()) {
stateManager.updateState({
error: errorMessage,
isConnecting: false,
isPeerConnected: false,
canRetry: shouldRetry
});
}
};
const dc = event.channel;
dcRef.current = dc;
setupChannelHandlers(dc, pc, '接收方', isReconnect);
};
}
console.log('[DataChannelManager] 数据通道创建完成,角色:', role, '是否重新连接:', isReconnect);
}, [stateManager]);
console.log('[DataChannel] 数据通道创建完成,角色:', role);
}, [setupChannelHandlers]);
// ── Relay 管理 ──
// 切换到 WS 中继模式 - 仅设置发送引用
// 事件监听由 ConnectionCore 的 initiateRelayFallback 统一管理
const switchToRelay = useCallback((relayWs: WebSocket) => {
console.log('[DataChannelManager] 🔄 切换到 WS 中继模式(设置发送引用)');
console.log('[DataChannel] 🔄 切换到 WS 中继模式');
relayWsRef.current = relayWs;
}, []);
// 关闭中继连接
const closeRelay = useCallback(() => {
if (relayWsRef.current) {
console.log('[DataChannelManager] 关闭中继连接');
console.log('[DataChannel] 关闭中继连接');
relayWsRef.current.close();
relayWsRef.current = null;
}
}, []);
// 处理中继收到的数据消息(由 ConnectionCore 分发调用)
const handleRelayMessage = useCallback((event: MessageEvent) => {
if (typeof event.data === 'string') {
try {
const message = JSON.parse(event.data) as WebRTCMessage;
console.log('[DataChannelManager:Relay] 收到中继消息:', message.type, message.channel || 'default');
dispatchIncoming(event);
}, [dispatchIncoming]);
if (message.channel) {
const handler = messageHandlers.current.get(message.channel);
if (handler) {
handler(message);
}
} else {
messageHandlers.current.forEach(handler => handler(message));
}
} catch (error) {
console.error('[DataChannelManager:Relay] 解析中继消息失败:', error);
}
} else if (event.data instanceof ArrayBuffer) {
console.log('[DataChannelManager:Relay] 收到中继二进制数据:', event.data.byteLength, 'bytes');
const fileHandler = dataHandlers.current.get('file-transfer');
if (fileHandler) {
fileHandler(event.data);
} else {
const firstHandler = dataHandlers.current.values().next().value;
if (firstHandler) {
firstHandler(event.data);
}
}
} else if (event.data instanceof Blob) {
// WebSocket 某些情况下收到 Blob
event.data.arrayBuffer().then((buffer: ArrayBuffer) => {
console.log('[DataChannelManager:Relay] 收到中继二进制数据(Blob):', buffer.byteLength, 'bytes');
const fileHandler = dataHandlers.current.get('file-transfer');
if (fileHandler) {
fileHandler(buffer);
} else {
const firstHandler = dataHandlers.current.values().next().value;
if (firstHandler) {
firstHandler(buffer);
}
}
});
}
}, []);
// ────────────────────────────────────────
// 消息发送P2P 优先Relay 降级)
// ────────────────────────────────────────
// 处理数据通道消息 (P2P 模式)
const handleDataChannelMessage = useCallback((event: MessageEvent) => {
if (typeof event.data === 'string') {
try {
const message = JSON.parse(event.data) as WebRTCMessage;
console.log('[DataChannelManager] 收到消息:', message.type, message.channel || 'default');
if (message.channel) {
const handler = messageHandlers.current.get(message.channel);
if (handler) {
handler(message);
}
} else {
messageHandlers.current.forEach(handler => handler(message));
}
} catch (error) {
console.error('[DataChannelManager] 解析消息失败:', error);
}
} else if (event.data instanceof ArrayBuffer) {
console.log('[DataChannelManager] 收到数据:', event.data.byteLength, 'bytes');
const fileHandler = dataHandlers.current.get('file-transfer');
if (fileHandler) {
fileHandler(event.data);
} else {
const firstHandler = dataHandlers.current.values().next().value;
if (firstHandler) {
firstHandler(event.data);
}
}
}
}, []);
// 发送消息 - 自动选择可用通道P2P 优先,否则用中继)
const sendMessage = useCallback((message: WebRTCMessage, channel?: string) => {
const messageWithChannel = channel ? { ...message, channel } : message;
const jsonStr = JSON.stringify(messageWithChannel);
// 优先使用 P2P DataChannel
if (isP2PAvailable()) {
try {
dcRef.current!.send(jsonStr);
console.log('[DataChannelManager:P2P] 发送消息:', message.type, channel || 'default');
return true;
} catch (error) {
console.error('[DataChannelManager:P2P] 发送消息失败:', error);
// P2P 发送失败,尝试中继
console.error('[DataChannel:P2P] 发送消息失败:', error);
}
}
// 回退到 WS 中继
if (isRelayMode()) {
try {
relayWsRef.current!.send(jsonStr);
console.log('[DataChannelManager:Relay] 发送消息:', message.type, channel || 'default');
return true;
} catch (error) {
console.error('[DataChannelManager:Relay] 发送消息失败:', error);
console.error('[DataChannel:Relay] 发送消息失败:', error);
return false;
}
}
console.error('[DataChannelManager] 没有可用的传输通道');
console.error('[DataChannel] 没有可用的传输通道');
return false;
}, [isP2PAvailable, isRelayMode]);
// 发送二进制数据 - 自动选择可用通道
const sendData = useCallback((data: ArrayBuffer) => {
// 优先使用 P2P DataChannel
if (isP2PAvailable()) {
try {
dcRef.current!.send(data);
console.log('[DataChannelManager:P2P] 发送数据:', data.byteLength, 'bytes');
return true;
} catch (error) {
console.error('[DataChannelManager:P2P] 发送数据失败:', error);
console.error('[DataChannel:P2P] 发送数据失败:', error);
}
}
// 回退到 WS 中继
if (isRelayMode()) {
try {
relayWsRef.current!.send(data);
console.log('[DataChannelManager:Relay] 发送数据:', data.byteLength, 'bytes');
return true;
} catch (error) {
console.error('[DataChannelManager:Relay] 发送数据失败:', error);
console.error('[DataChannel:Relay] 发送数据失败:', error);
return false;
}
}
console.error('[DataChannelManager] 没有可用的传输通道');
console.error('[DataChannel] 没有可用的传输通道');
return false;
}, [isP2PAvailable, isRelayMode]);
// 注册消息处理器
const registerMessageHandler = useCallback((channel: string, handler: MessageHandler) => {
console.log('[DataChannelManager] 注册消息处理器:', channel);
messageHandlers.current.set(channel, handler);
// ────────────────────────────────────────
// 处理器注册
// ────────────────────────────────────────
const registerMessageHandler = useCallback((channel: string, handler: MessageHandler): Unsubscribe => {
console.log('[DataChannel] 注册消息处理器:', channel);
messageHandlers.current.set(channel, handler);
return () => {
console.log('[DataChannelManager] 取消注册消息处理器:', channel);
console.log('[DataChannel] 取消注册消息处理器:', channel);
messageHandlers.current.delete(channel);
};
}, []);
// 注册数据处理器
const registerDataHandler = useCallback((channel: string, handler: DataHandler) => {
console.log('[DataChannelManager] 注册数据处理器:', channel);
const registerDataHandler = useCallback((channel: string, handler: DataHandler): Unsubscribe => {
console.log('[DataChannel] 注册数据处理器:', channel);
dataHandlers.current.set(channel, handler);
return () => {
console.log('[DataChannelManager] 取消注册数据处理器:', channel);
console.log('[DataChannel] 取消注册数据处理器:', channel);
dataHandlers.current.delete(channel);
};
}, []);
// 获取数据通道状态 - 综合 P2P 和 Relay 状态
// ────────────────────────────────────────
// 状态查询与背压控制
// ────────────────────────────────────────
const getChannelState = useCallback((): RTCDataChannelState => {
// P2P 通道打开时优先返回
if (dcRef.current?.readyState === 'open') {
return 'open';
}
// 中继模式可用
if (relayWsRef.current?.readyState === WebSocket.OPEN) {
return 'open';
}
// P2P 通道正在连接
if (dcRef.current?.readyState === 'connecting') {
return 'connecting';
}
if (dcRef.current?.readyState === 'open') return 'open';
if (relayWsRef.current?.readyState === WebSocket.OPEN) return 'open';
if (dcRef.current?.readyState === 'connecting') return 'connecting';
return dcRef.current?.readyState || 'closed';
}, []);
// 获取当前缓冲区大小
const getBufferedAmount = useCallback((): number => {
if (dcRef.current && dcRef.current.readyState === 'open') {
return dcRef.current.bufferedAmount;
}
if (relayWsRef.current && relayWsRef.current.readyState === WebSocket.OPEN) {
return relayWsRef.current.bufferedAmount;
}
if (dcRef.current?.readyState === 'open') return dcRef.current.bufferedAmount;
if (relayWsRef.current?.readyState === WebSocket.OPEN) return relayWsRef.current.bufferedAmount;
return 0;
}, []);
// 等待缓冲区排空到阈值以下
const waitForBufferDrain = useCallback((threshold: number = 1 * 1024 * 1024): Promise<void> => {
// P2P DataChannel
if (dcRef.current && dcRef.current.readyState === 'open') {
if (dcRef.current.bufferedAmount <= threshold) {
return Promise.resolve();
}
// P2P DataChannel — 使用 bufferedamountlow 事件
if (dcRef.current?.readyState === 'open') {
if (dcRef.current.bufferedAmount <= threshold) return Promise.resolve();
return new Promise<void>((resolve) => {
const dc = dcRef.current!;
dc.bufferedAmountLowThreshold = threshold;
@@ -502,18 +392,13 @@ export function useWebRTCDataChannelManager(
resolve();
};
dc.addEventListener('bufferedamountlow', onLow);
// 安全超时,防止死等
setTimeout(() => {
dc.removeEventListener('bufferedamountlow', onLow);
resolve();
}, 5000);
setTimeout(() => { dc.removeEventListener('bufferedamountlow', onLow); resolve(); }, 5000);
});
}
// Relay WebSocket
if (relayWsRef.current && relayWsRef.current.readyState === WebSocket.OPEN) {
if (relayWsRef.current.bufferedAmount <= threshold) {
return Promise.resolve();
}
// Relay WebSocket — 轮询 bufferedAmount
if (relayWsRef.current?.readyState === WebSocket.OPEN) {
if (relayWsRef.current.bufferedAmount <= threshold) return Promise.resolve();
return new Promise<void>((resolve) => {
const checkInterval = setInterval(() => {
if (!relayWsRef.current || relayWsRef.current.readyState !== WebSocket.OPEN ||
@@ -522,13 +407,10 @@ export function useWebRTCDataChannelManager(
resolve();
}
}, 50);
// 安全超时
setTimeout(() => {
clearInterval(checkInterval);
resolve();
}, 5000);
setTimeout(() => { clearInterval(checkInterval); resolve(); }, 5000);
});
}
return Promise.resolve();
}, []);
@@ -544,6 +426,5 @@ export function useWebRTCDataChannelManager(
getChannelState,
getBufferedAmount,
waitForBufferDrain,
handleDataChannelMessage,
};
}
}

View File

@@ -1,34 +1,42 @@
import { useCallback, useRef } from 'react';
import { WebRTCStateManager } from '../ui/webRTCStore';
import type { TrackHandler, Unsubscribe } from './types';
/**
* WebRTC 媒体轨道管理器
* 负责媒体轨道的添加和移除
* WebRTC 媒体轨道管理器接口
*/
export interface WebRTCTrackManager {
// 添加媒体轨道
// 添加媒体轨道到 PeerConnection
addTrack: (track: MediaStreamTrack, stream: MediaStream) => RTCRtpSender | null;
// 移除媒体轨道
removeTrack: (sender: RTCRtpSender) => void;
// 设置轨道处理器
onTrack: (handler: (event: RTCTrackEvent) => void) => void;
/**
* 注册轨道事件处理器(多监听器模式)
* - 多个消费者可同时注册(桌面共享处理 video语音通话处理 audio
* - 返回清理函数,组件卸载时务必调用
*/
registerTrackHandler: (key: string, handler: TrackHandler) => Unsubscribe;
// 创建 Offer
// 创建并发送 SDP Offer用于初始连接
createOffer: (pc: RTCPeerConnection, ws: WebSocket) => Promise<void>;
// 立即创建offer用于媒体轨道添加后的重新协商)
// 立即创建 Offer用于媒体轨道变更后的重新协商)
createOfferNow: (pc: RTCPeerConnection, ws: WebSocket) => Promise<boolean>;
// 内部方法,供核心连接管理器调用
// ── 内部方法,仅供 ConnectionCore 调用 ──
setPeerConnection: (pc: RTCPeerConnection | null) => void;
setWebSocket: (ws: WebSocket | null) => void;
}
/**
* WebRTC 媒体轨道管理 Hook
* 负责媒体轨道的添加和移除,处理轨道事件,提供 createOffer 功能
*
* 职责:
* 1. 管理 RTCRtpSender添加 / 移除轨道)
* 2. 复合分发 ontrack 事件给多个消费者
* 3. 创建 SDP Offer 并通过信令 WebSocket 发送
*/
export function useWebRTCTrackManager(
stateManager: WebRTCStateManager
@@ -36,78 +44,82 @@ export function useWebRTCTrackManager(
const pcRef = useRef<RTCPeerConnection | null>(null);
const wsRef = useRef<WebSocket | null>(null);
// 创建 Offer
// 多监听器key → handler如 'desktop-share' → handler, 'voice-chat' → handler
const trackHandlers = useRef<Map<string, TrackHandler>>(new Map());
// ── 复合分发:将 ontrack 事件广播给所有已注册的处理器 ──
const dispatchTrackEvent = useCallback((event: RTCTrackEvent) => {
const handlerCount = trackHandlers.current.size;
if (handlerCount === 0) {
console.warn('[TrackManager] 收到轨道事件但无处理器注册:', event.track.kind, event.track.id);
return;
}
console.log(`[TrackManager] 📡 分发轨道事件 (${event.track.kind}) 给 ${handlerCount} 个处理器`);
trackHandlers.current.forEach((handler, key) => {
try {
handler(event);
} catch (error) {
console.error(`[TrackManager] 轨道处理器 "${key}" 执行出错:`, error);
}
});
}, []);
// ── SDP Offer 创建 ──
const createOffer = useCallback(async (pc: RTCPeerConnection, ws: WebSocket) => {
try {
console.log('[TrackManager] 🎬 开始创建offer当前轨道数:', pc.getSenders().length);
// 确保连接状态稳定
if (pc.connectionState !== 'connecting' && pc.connectionState !== 'new') {
console.warn('[TrackManager] ⚠️ PeerConnection状态异常:', pc.connectionState);
}
console.log('[TrackManager] 🎬 开始创建 Offer当前轨道数:', pc.getSenders().length);
const offer = await pc.createOffer({
offerToReceiveAudio: true, // 改为true以支持音频接收
offerToReceiveVideo: true, // 改为true以支持视频接收
offerToReceiveAudio: true,
offerToReceiveVideo: true,
});
console.log('[TrackManager] 📝 Offer创建成功设置本地描述...');
await pc.setLocalDescription(offer);
console.log('[TrackManager] ✅ 本地描述设置完成');
// 增加超时时间到5秒给ICE候选收集更多时间
// ICE 收集超时保护
const iceTimeout = setTimeout(() => {
console.log('[TrackManager] ⏱️ ICE收集超时发送当前offer');
if (ws.readyState === WebSocket.OPEN && pc.localDescription) {
ws.send(JSON.stringify({ type: 'offer', payload: pc.localDescription }));
console.log('[TrackManager] 📤 发送 offer (超时发送)');
console.log('[TrackManager] 📤 发送 Offer (ICE 收集超时)');
}
}, 5000);
// 如果ICE收集已经完成立即发送
if (pc.iceGatheringState === 'complete') {
clearTimeout(iceTimeout);
if (ws.readyState === WebSocket.OPEN && pc.localDescription) {
ws.send(JSON.stringify({ type: 'offer', payload: pc.localDescription }));
console.log('[TrackManager] 📤 发送 offer (ICE收集完成)');
console.log('[TrackManager] 📤 发送 Offer (ICE完成)');
}
} else {
console.log('[TrackManager] 🧊 等待ICE候选收集...');
// 监听ICE收集状态变化
pc.onicegatheringstatechange = () => {
console.log('[TrackManager] 🧊 ICE收集状态变化:', pc.iceGatheringState);
if (pc.iceGatheringState === 'complete') {
clearTimeout(iceTimeout);
if (ws.readyState === WebSocket.OPEN && pc.localDescription) {
ws.send(JSON.stringify({ type: 'offer', payload: pc.localDescription }));
console.log('[TrackManager] 📤 发送 offer (ICE收集完成)');
console.log('[TrackManager] 📤 发送 Offer (ICE 收集完成)');
}
}
};
// 同时监听ICE候选事件用于调试
pc.onicecandidate = (event) => {
if (event.candidate) {
console.log('[TrackManager] 🧊 收到ICE候选:', event.candidate.candidate.substring(0, 50) + '...');
} else {
console.log('[TrackManager] 🏁 ICE候选收集完成');
console.log('[TrackManager] 🧊 ICE 候选:', event.candidate.candidate.substring(0, 50) + '...');
}
};
}
} catch (error) {
console.error('[TrackManager] ❌ 创建 offer 失败:', error);
console.error('[TrackManager] ❌ 创建 Offer 失败:', error);
stateManager.updateState({ error: '创建连接失败', isConnecting: false, canRetry: true });
}
}, [stateManager]);
// 添加媒体轨道
// ── 轨道操作 ──
const addTrack = useCallback((track: MediaStreamTrack, stream: MediaStream) => {
const pc = pcRef.current;
if (!pc) {
console.error('[TrackManager] PeerConnection 不可用');
return null;
}
try {
return pc.addTrack(track, stream);
} catch (error) {
@@ -116,14 +128,12 @@ export function useWebRTCTrackManager(
}
}, []);
// 移除媒体轨道
const removeTrack = useCallback((sender: RTCRtpSender) => {
const pc = pcRef.current;
if (!pc) {
console.error('[TrackManager] PeerConnection 不可用');
return;
}
try {
pc.removeTrack(sender);
} catch (error) {
@@ -131,88 +141,44 @@ export function useWebRTCTrackManager(
}
}, []);
// 设置轨道处理器
const onTrack = useCallback((handler: (event: RTCTrackEvent) => void) => {
const pc = pcRef.current;
if (!pc) {
console.warn('[TrackManager] PeerConnection 尚未准备就绪将在连接建立后设置onTrack');
// 检查WebSocket连接状态只有连接后才尝试设置
const state = stateManager.getState();
if (!state.isWebSocketConnected) {
console.log('[TrackManager] WebSocket未连接等待连接建立...');
return;
}
// 延迟设置等待PeerConnection准备就绪
let retryCount = 0;
const maxRetries = 50; // 增加重试次数到50次即5秒
const checkAndSetTrackHandler = () => {
const currentPc = pcRef.current;
if (currentPc) {
console.log('[TrackManager] ✅ PeerConnection 已准备就绪设置onTrack处理器');
currentPc.ontrack = handler;
// 如果已经有远程轨道,立即触发处理
const receivers = currentPc.getReceivers();
console.log(`[TrackManager] 📡 当前有 ${receivers.length} 个接收器`);
receivers.forEach(receiver => {
if (receiver.track) {
console.log(`[TrackManager] 🎥 发现现有轨道: ${receiver.track.kind}, ${receiver.track.id}, 状态: ${receiver.track.readyState}`);
}
});
} else {
retryCount++;
if (retryCount < maxRetries) {
// 每5次重试输出一次日志减少日志数量
if (retryCount % 5 === 0) {
console.log(`[TrackManager] ⏳ 等待PeerConnection准备就绪... (尝试: ${retryCount}/${maxRetries})`);
}
setTimeout(checkAndSetTrackHandler, 100);
} else {
console.error('[TrackManager] ❌ PeerConnection 长时间未准备就绪,停止重试');
}
}
};
checkAndSetTrackHandler();
return;
}
console.log('[TrackManager] ✅ 立即设置onTrack处理器');
pc.ontrack = handler;
// 检查是否已有轨道
const receivers = pc.getReceivers();
console.log(`[TrackManager] 📡 当前有 ${receivers.length} 个接收器`);
receivers.forEach(receiver => {
if (receiver.track) {
console.log(`[TrackManager] 🎥 发现现有轨道: ${receiver.track.kind}, ${receiver.track.id}, 状态: ${receiver.track.readyState}`);
}
});
}, [stateManager]);
// ── 多监听器注册 ──
const registerTrackHandler = useCallback((key: string, handler: TrackHandler): Unsubscribe => {
console.log('[TrackManager] 注册轨道处理器:', key);
trackHandlers.current.set(key, handler);
return () => {
console.log('[TrackManager] 取消注册轨道处理器:', key);
trackHandlers.current.delete(key);
};
}, []);
// ── 重新协商 ──
// 立即创建offer用于媒体轨道添加后的重新协商
const createOfferNow = useCallback(async (pc: RTCPeerConnection, ws: WebSocket) => {
if (!pc || !ws) {
console.error('[TrackManager] PeerConnection 或 WebSocket 不可用');
return false;
}
try {
await createOffer(pc, ws);
return true;
} catch (error) {
console.error('[TrackManager] 创建 offer 失败:', error);
console.error('[TrackManager] 创建 Offer 失败:', error);
return false;
}
}, [createOffer]);
// 设置 PeerConnection 引用
// ── 内部引用设置(仅供 ConnectionCore 调用)──
const setPeerConnection = useCallback((pc: RTCPeerConnection | null) => {
pcRef.current = pc;
}, []);
// 新 PeerConnection 创建时,挂载复合轨道分发器
if (pc) {
pc.ontrack = dispatchTrackEvent;
}
}, [dispatchTrackEvent]);
// 设置 WebSocket 引用
const setWebSocket = useCallback((ws: WebSocket | null) => {
wsRef.current = ws;
}, []);
@@ -220,11 +186,10 @@ export function useWebRTCTrackManager(
return {
addTrack,
removeTrack,
onTrack,
registerTrackHandler,
createOffer,
createOfferNow,
// 内部方法,供核心连接管理器调用
setPeerConnection,
setWebSocket,
};
}
}

View File

@@ -1,4 +1,5 @@
import { useState, useRef, useCallback, useEffect } from 'react';
import type { WebRTCConnection } from '../connection/types';
import { useSharedWebRTCManager } from '../connection/useSharedWebRTCManager';
interface DesktopShareState {
@@ -11,8 +12,9 @@ interface DesktopShareState {
isWaitingForPeer: boolean; // 新增:是否等待对方连接
}
export function useDesktopShareBusiness() {
const webRTC = useSharedWebRTCManager();
export function useDesktopShareBusiness(externalConnection?: WebRTCConnection) {
const internalConnection = useSharedWebRTCManager();
const webRTC = externalConnection ?? internalConnection;
const [state, setState] = useState<DesktopShareState>({
isSharing: false,
isViewing: false,
@@ -42,24 +44,22 @@ export function useDesktopShareBusiness() {
}
}, [updateState]);
// 设置远程轨道处理器(始终监听)
// 设置远程轨道处理器(始终监听,支持与语音通话并存
useEffect(() => {
console.log('[DesktopShare] 🎧 设置远程轨道处理器');
webRTC.onTrack((event: RTCTrackEvent) => {
console.log('[DesktopShare] 🎥 收到远程轨道:', event.track.kind, event.track.id, '状态:', event.track.readyState);
console.log('[DesktopShare] 远程流数量:', event.streams.length);
console.log('[DesktopShare] 🎧 注册远程轨道处理器');
const unsubscribe = webRTC.registerTrackHandler('desktop-share', (event: RTCTrackEvent) => {
// 只处理视频轨道,音频由 VoiceChat 处理
if (event.track.kind !== 'video') return;
console.log('[DesktopShare] 🎥 收到远程视频轨道:', event.track.id, '状态:', event.track.readyState);
if (event.streams.length > 0) {
const remoteStream = event.streams[0];
console.log('[DesktopShare] 🎬 设置远程流,轨道数量:', remoteStream.getTracks().length);
remoteStream.getTracks().forEach(track => {
console.log('[DesktopShare] 远程轨道:', track.kind, track.id, '启用:', track.enabled, '状态:', track.readyState);
});
// 确保轨道已启用
remoteStream.getTracks().forEach(track => {
if (!track.enabled) {
console.log('[DesktopShare] 🔓 启用远程轨道:', track.id);
track.enabled = true;
}
});
@@ -67,25 +67,18 @@ export function useDesktopShareBusiness() {
handleRemoteStream(remoteStream);
} else {
console.warn('[DesktopShare] ⚠️ 收到轨道但没有关联的流');
// 尝试从轨道创建流
try {
const newStream = new MediaStream([event.track]);
console.log('[DesktopShare] 🔄 从轨道创建新流:', newStream.id);
// 确保轨道已启用
newStream.getTracks().forEach(track => {
if (!track.enabled) {
console.log('[DesktopShare] 🔓 启用新流中的轨道:', track.id);
track.enabled = true;
}
if (!track.enabled) track.enabled = true;
});
handleRemoteStream(newStream);
} catch (error) {
console.error('[DesktopShare] ❌ 从轨道创建流失败:', error);
}
}
});
return unsubscribe;
}, [webRTC, handleRemoteStream]);
// 获取桌面共享流

View File

@@ -1,5 +1,5 @@
import { useCallback, useEffect, useRef, useState } from 'react';
import { WebRTCConnection } from '../connection/useSharedWebRTCManager';
import type { WebRTCConnection } from '../connection/types';
import { useAudioVisualizer } from './useAudioVisualizer';
interface VoiceChatState {
@@ -96,10 +96,11 @@ export function useVoiceChatBusiness(connection: WebRTCConnection) {
}
};
// feature/ws 的 onTrack 返回 void直接设置 pc.ontrack
connection.onTrack(trackHandler);
// 注册轨道处理器(多监听器模式,与桌面共享并存
const unsubscribe = connection.registerTrackHandler('voice-chat', trackHandler);
return () => {
unsubscribe();
if (currentTrackRef.current) {
currentTrackRef.current.onended = null;
currentTrackRef.current.onmute = null;

View File

@@ -1,5 +1,5 @@
import { useState, useCallback, useRef, useEffect } from 'react';
import type { WebRTCConnection } from '../connection/useSharedWebRTCManager';
import type { WebRTCConnection } from '../connection/types';
import type { FileInfo } from '@/types';
// 文件传输状态
@@ -40,38 +40,26 @@ interface FileMetadata {
}
// 文件块信息
// 文件块元信息
interface FileChunk {
fileId: string;
chunkIndex: number;
totalChunks: number;
checksum?: string; // 数据校验和
}
// 块确认信息
interface ChunkAck {
fileId: string;
chunkIndex: number;
success: boolean;
checksum?: string;
}
// 传输状态
// 发送端传输状态
interface TransferStatus {
fileId: string;
fileName: string;
totalChunks: number;
sentChunks: Set<number>;
acknowledgedChunks: Set<number>;
failedChunks: Set<number>;
lastChunkTime: number;
retryCount: Map<number, number>;
averageSpeed: number; // KB/s
// 滑动窗口测速
speedWindowBytes: number; // 窗口内累计字节数
speedWindowStart: number; // 窗口开始时间
lastReportedSpeed: number; // 上次上报的速度 bytes/s
lastReportedEta: number; // 上次上报的 ETA
lastSpeedReportTime: number; // 上次上报速度的时间
speedWindowBytes: number;
speedWindowStart: number;
lastReportedSpeed: number; // bytes/s
lastReportedEta: number; // 秒
lastSpeedReportTime: number;
}
// 回调类型
@@ -82,9 +70,6 @@ type FileListReceivedCallback = (fileList: FileInfo[]) => void;
const CHANNEL_NAME = 'file-transfer';
const CHUNK_SIZE = 256 * 1024; // 256KB — WebRTC DataChannel 单次发送上限
const MAX_RETRIES = 5; // 最大重试次数(仅用于连接恢复)
const RETRY_DELAY = 1000; // 重试延迟(毫秒)
const ACK_TIMEOUT = 5000; // 完成确认超时(毫秒)
const SPEED_WINDOW_MS = 2000; // 速度计算滑动窗口 2 秒
const SPEED_REPORT_INTERVAL_MS = 1000; // 速度上报最小间隔 1 秒
const BUFFER_HIGH_WATER = 2 * 1024 * 1024; // 2MB — 发送背压阈值
@@ -220,12 +205,7 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
break;
case 'file-chunk-ack':
const ack: ChunkAck = message.payload;
console.log('收到块确认:', ack);
// 清除超时定时器
case 'file-chunk-ack':
// 保留消息处理以兼容旧版对端,但不再依赖逐块 ACK 做可靠性
// 兼容旧版对端,不再依赖逐块 ACKSCTP 保证可靠有序传输)
break;
}
}, [updateState]);
@@ -360,11 +340,7 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
fileName: file.name,
totalChunks,
sentChunks: new Set(),
acknowledgedChunks: new Set(),
failedChunks: new Set(),
lastChunkTime: nowInit,
retryCount: new Map(),
averageSpeed: 0,
speedWindowBytes: 0,
speedWindowStart: nowInit,
lastReportedSpeed: 0,
@@ -424,7 +400,6 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
if (windowElapsed >= SPEED_WINDOW_MS && windowElapsed > 0) {
const speedBps = (status.speedWindowBytes / windowElapsed) * 1000;
status.averageSpeed = speedBps / 1024;
status.lastReportedSpeed = speedBps;
const remainingBytes = Math.max(0, file.size - (chunkIndex + 1) * CHUNK_SIZE);
status.lastReportedEta = speedBps > 0 ? Math.max(0, remainingBytes / speedBps) : 0;

View File

@@ -1,13 +1,9 @@
import { useState, useCallback, useRef, useEffect } from 'react';
import type { WebRTCConnection } from '../connection/useSharedWebRTCManager';
import type { WebRTCConnection } from '../connection/types';
// 文本传输状态
// 文本传输业务状态(仅业务相关字段,连接状态直接从 connection 读取)
interface TextTransferState {
isConnecting: boolean;
isConnected: boolean;
isWebSocketConnected: boolean;
connectionError: string | null;
currentText: string; // 当前文本内容
currentText: string; // 对端同步过来的文本内容
isTyping: boolean; // 对方是否在输入
}
@@ -23,153 +19,91 @@ const CHANNEL_NAME = 'text-transfer';
*/
export function useTextTransferBusiness(connection: WebRTCConnection) {
const [state, setState] = useState<TextTransferState>({
isConnecting: false,
isConnected: false,
isWebSocketConnected: false,
connectionError: null,
currentText: '',
isTyping: false
isTyping: false,
});
// 回调引用
const textSyncCallbackRef = useRef<TextSyncCallback | null>(null);
const typingCallbackRef = useRef<TypingStatusCallback | null>(null);
// 更新状态的辅助函数
const updateState = useCallback((updates: Partial<TextTransferState>) => {
setState(prev => ({ ...prev, ...updates }));
}, []);
// 消息处理器
const handleMessage = useCallback((message: any) => {
if (!message.type.startsWith('text-')) return;
console.log('文本传输收到消息:', message.type, message);
switch (message.type) {
case 'text-sync':
// 实时文本同步 - 接收方看到发送方的实时编辑
if (message.payload && typeof message.payload.text === 'string') {
updateState({ currentText: message.payload.text });
// 触发文本同步回调
if (textSyncCallbackRef.current) {
textSyncCallbackRef.current(message.payload.text);
}
setState(prev => ({ ...prev, currentText: message.payload.text }));
textSyncCallbackRef.current?.(message.payload.text);
}
break;
case 'text-typing':
// 打字状态
if (typeof message.payload?.typing === 'boolean') {
updateState({ isTyping: message.payload.typing });
// 触发打字状态回调
if (typingCallbackRef.current) {
typingCallbackRef.current(message.payload.typing);
}
setState(prev => ({ ...prev, isTyping: message.payload.typing }));
typingCallbackRef.current?.(message.payload.typing);
}
break;
default:
console.warn('未知的文本消息类型:', message.type);
}
}, [updateState]);
}, []);
// 注册消息处理器
useEffect(() => {
const unregister = connection.registerMessageHandler(CHANNEL_NAME, handleMessage);
return unregister;
}, [handleMessage]);
return connection.registerMessageHandler(CHANNEL_NAME, handleMessage);
}, [connection, handleMessage]);
// 监听连接状态变化 (直接使用 connection 的状态)
useEffect(() => {
// 同步连接状态
updateState({
isConnecting: connection.isConnecting,
isConnected: connection.isConnected,
isWebSocketConnected: connection.isWebSocketConnected,
connectionError: connection.error
});
}, [connection.isConnecting, connection.isConnected, connection.isWebSocketConnected, connection.error, updateState]);
// 连接
// 连接管理(透传)
const connect = useCallback((roomCode: string, role: 'sender' | 'receiver') => {
return connection.connect(roomCode, role);
}, [connection]);
// 断开连接
const disconnect = useCallback(() => {
return connection.disconnect();
}, [connection]);
// 发送实时文本同步 (替代原来的 sendMessage)
// 发送实时文本同步
const sendTextSync = useCallback((text: string) => {
if (!connection || !connection.isPeerConnected) return;
const message = {
type: 'text-sync',
payload: { text }
};
const success = connection.sendMessage(message, CHANNEL_NAME);
if (success) {
console.log('发送实时文本同步:', text.length, '字符');
}
if (!connection.isPeerConnected) return;
connection.sendMessage({ type: 'text-sync', payload: { text } }, CHANNEL_NAME);
}, [connection]);
// 发送打字状态
const sendTypingStatus = useCallback((isTyping: boolean) => {
if (!connection || !connection.isPeerConnected) return;
const message = {
type: 'text-typing',
payload: { typing: isTyping }
};
const success = connection.sendMessage(message, CHANNEL_NAME);
if (success) {
console.log('发送打字状态:', isTyping);
}
if (!connection.isPeerConnected) return;
connection.sendMessage({ type: 'text-typing', payload: { typing: isTyping } }, CHANNEL_NAME);
}, [connection]);
// 设置文本同步回调
// 回调注册(返回清理函数)
const onTextSync = useCallback((callback: TextSyncCallback) => {
textSyncCallbackRef.current = callback;
// 返回清理函数
return () => {
textSyncCallbackRef.current = null;
};
return () => { textSyncCallbackRef.current = null; };
}, []);
// 设置打字状态回调
const onTypingStatus = useCallback((callback: TypingStatusCallback) => {
typingCallbackRef.current = callback;
// 返回清理函数
return () => {
typingCallbackRef.current = null;
};
return () => { typingCallbackRef.current = null; };
}, []);
return {
// 状态 - 直接 connection 获取
// 连接状态(直接 connection,不做冗余同步)
isConnecting: connection.isConnecting,
isConnected: connection.isConnected,
isWebSocketConnected: connection.isWebSocketConnected,
connectionError: connection.error,
// 业务状态
currentText: state.currentText,
isTyping: state.isTyping,
// 操作方法
connect,
disconnect,
sendTextSync,
sendTypingStatus,
// 回调设置
onTextSync,
onTypingStatus
onTypingStatus,
};
}