mirror of
https://github.com/MatrixSeven/file-transfer-go.git
synced 2026-03-09 09:17:36 +08:00
feat: 更新文件传输功能,添加速度和ETA计算,优化进度回调,增强用户体验
This commit is contained in:
@@ -181,7 +181,7 @@ export const WebRTCFileTransfer: React.FC = () => {
|
||||
fileName: progressInfo.fileName,
|
||||
progress: progressInfo.progress
|
||||
});
|
||||
updateFileProgress(progressInfo.fileId, progressInfo.fileName, progressInfo.progress);
|
||||
updateFileProgress(progressInfo.fileId, progressInfo.fileName, progressInfo.progress, progressInfo.speed, progressInfo.eta);
|
||||
if (progressInfo.progress >= 100 && mode === 'send') {
|
||||
setCurrentTransferFile(null);
|
||||
}
|
||||
|
||||
@@ -25,6 +25,28 @@ const formatFileSize = (bytes: number): string => {
|
||||
return parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + ' ' + sizes[i];
|
||||
};
|
||||
|
||||
const formatSpeed = (bytesPerSecond: number): string => {
|
||||
if (bytesPerSecond <= 0) return '--';
|
||||
const k = 1024;
|
||||
if (bytesPerSecond < k) return `${bytesPerSecond.toFixed(0)} B/s`;
|
||||
if (bytesPerSecond < k * k) return `${(bytesPerSecond / k).toFixed(1)} KB/s`;
|
||||
if (bytesPerSecond < k * k * k) return `${(bytesPerSecond / (k * k)).toFixed(2)} MB/s`;
|
||||
return `${(bytesPerSecond / (k * k * k)).toFixed(2)} GB/s`;
|
||||
};
|
||||
|
||||
const formatETA = (seconds: number): string => {
|
||||
if (seconds <= 0 || !isFinite(seconds)) return '--';
|
||||
if (seconds < 60) return `${Math.ceil(seconds)}秒`;
|
||||
if (seconds < 3600) {
|
||||
const m = Math.floor(seconds / 60);
|
||||
const s = Math.ceil(seconds % 60);
|
||||
return `${m}分${s > 0 ? s + '秒' : ''}`;
|
||||
}
|
||||
const h = Math.floor(seconds / 3600);
|
||||
const m = Math.ceil((seconds % 3600) / 60);
|
||||
return `${h}时${m > 0 ? m + '分' : ''}`;
|
||||
};
|
||||
|
||||
interface WebRTCFileReceiveProps {
|
||||
onJoinRoom: (code: string) => void;
|
||||
files: FileInfo[];
|
||||
@@ -204,6 +226,8 @@ export function WebRTCFileReceive({
|
||||
const isCompleted = file.status === 'completed';
|
||||
const hasDownloadedFile = downloadedFiles?.has(file.id);
|
||||
const currentProgress = file.progress;
|
||||
const currentSpeed = file.speed;
|
||||
const currentEta = file.eta;
|
||||
|
||||
console.log('文件状态:', {
|
||||
fileName: file.name,
|
||||
@@ -226,9 +250,6 @@ export function WebRTCFileReceive({
|
||||
{hasDownloadedFile && (
|
||||
<p className="text-xs text-emerald-600 font-medium">✅ 传输完成,点击保存</p>
|
||||
)}
|
||||
{isDownloading && (
|
||||
<p className="text-xs text-blue-600 font-medium">⏳ 传输中...{currentProgress.toFixed(1)}%</p>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
<Button
|
||||
@@ -249,9 +270,17 @@ export function WebRTCFileReceive({
|
||||
|
||||
{(isDownloading || isCompleted) && currentProgress > 0 && (
|
||||
<div className="mt-3 space-y-2">
|
||||
<div className="flex justify-between text-sm text-slate-600">
|
||||
<div className="flex justify-between items-center text-sm text-slate-600">
|
||||
<span>{hasDownloadedFile ? '传输完成' : '正在传输...'}</span>
|
||||
<span className="font-medium">{currentProgress.toFixed(1)}%</span>
|
||||
<div className="flex items-center space-x-3">
|
||||
{isDownloading && currentSpeed != null && currentSpeed > 0 && (
|
||||
<span className="text-xs text-blue-600 font-medium">{formatSpeed(currentSpeed)}</span>
|
||||
)}
|
||||
{isDownloading && currentEta != null && currentEta > 0 && (
|
||||
<span className="text-xs text-slate-500">剩余 {formatETA(currentEta)}</span>
|
||||
)}
|
||||
<span className="font-medium">{currentProgress.toFixed(1)}%</span>
|
||||
</div>
|
||||
</div>
|
||||
<div className="w-full bg-slate-200 rounded-full h-2">
|
||||
<div
|
||||
|
||||
@@ -23,6 +23,28 @@ const formatFileSize = (bytes: number): string => {
|
||||
return parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + ' ' + sizes[i];
|
||||
};
|
||||
|
||||
const formatSpeed = (bytesPerSecond: number): string => {
|
||||
if (bytesPerSecond <= 0) return '--';
|
||||
const k = 1024;
|
||||
if (bytesPerSecond < k) return `${bytesPerSecond.toFixed(0)} B/s`;
|
||||
if (bytesPerSecond < k * k) return `${(bytesPerSecond / k).toFixed(1)} KB/s`;
|
||||
if (bytesPerSecond < k * k * k) return `${(bytesPerSecond / (k * k)).toFixed(2)} MB/s`;
|
||||
return `${(bytesPerSecond / (k * k * k)).toFixed(2)} GB/s`;
|
||||
};
|
||||
|
||||
const formatETA = (seconds: number): string => {
|
||||
if (seconds <= 0 || !isFinite(seconds)) return '--';
|
||||
if (seconds < 60) return `${Math.ceil(seconds)}秒`;
|
||||
if (seconds < 3600) {
|
||||
const m = Math.floor(seconds / 60);
|
||||
const s = Math.ceil(seconds % 60);
|
||||
return `${m}分${s > 0 ? s + '秒' : ''}`;
|
||||
}
|
||||
const h = Math.floor(seconds / 3600);
|
||||
const m = Math.ceil((seconds % 3600) / 60);
|
||||
return `${h}时${m > 0 ? m + '分' : ''}`;
|
||||
};
|
||||
|
||||
interface WebRTCFileUploadProps {
|
||||
selectedFiles: File[];
|
||||
fileList?: FileInfo[]; // 添加文件列表信息(包含状态和进度)
|
||||
@@ -190,6 +212,8 @@ export function WebRTCFileUpload({
|
||||
const isTransferringThisFile = fileInfo?.status === 'downloading';
|
||||
const currentProgress = fileInfo?.progress || 0;
|
||||
const fileStatus = fileInfo?.status || 'ready';
|
||||
const currentSpeed = fileInfo?.speed;
|
||||
const currentEta = fileInfo?.eta;
|
||||
|
||||
return (
|
||||
<div
|
||||
@@ -235,9 +259,17 @@ export function WebRTCFileUpload({
|
||||
{(fileStatus === 'downloading' || fileStatus === 'completed') && currentProgress > 0 && (
|
||||
<div className="px-3 sm:px-4 pb-3 sm:pb-4">
|
||||
<div className="space-y-2">
|
||||
<div className="flex justify-between text-xs text-slate-600">
|
||||
<div className="flex justify-between items-center text-xs text-slate-600">
|
||||
<span>{fileStatus === 'downloading' ? '正在发送...' : '发送完成'}</span>
|
||||
<span className="font-medium">{currentProgress.toFixed(1)}%</span>
|
||||
<div className="flex items-center space-x-3">
|
||||
{fileStatus === 'downloading' && currentSpeed != null && currentSpeed > 0 && (
|
||||
<span className="text-orange-600 font-medium">{formatSpeed(currentSpeed)}</span>
|
||||
)}
|
||||
{fileStatus === 'downloading' && currentEta != null && currentEta > 0 && (
|
||||
<span className="text-slate-500">剩余 {formatETA(currentEta)}</span>
|
||||
)}
|
||||
<span className="font-medium">{currentProgress.toFixed(1)}%</span>
|
||||
</div>
|
||||
</div>
|
||||
<div className="w-full bg-slate-200 rounded-full h-2">
|
||||
<div
|
||||
|
||||
@@ -33,6 +33,8 @@ export interface WebRTCConnection {
|
||||
|
||||
// 工具方法
|
||||
getChannelState: () => RTCDataChannelState;
|
||||
getBufferedAmount: () => number;
|
||||
waitForBufferDrain: (threshold?: number) => Promise<void>;
|
||||
isConnectedToRoom: (roomCode: string, role: 'sender' | 'receiver') => boolean;
|
||||
|
||||
// 当前房间信息
|
||||
@@ -134,6 +136,8 @@ export function useSharedWebRTCManager(): WebRTCConnection {
|
||||
|
||||
// 工具方法
|
||||
getChannelState: dataChannelManager.getChannelState,
|
||||
getBufferedAmount: dataChannelManager.getBufferedAmount,
|
||||
waitForBufferDrain: dataChannelManager.waitForBufferDrain,
|
||||
isConnectedToRoom: stateManager.isConnectedToRoom,
|
||||
|
||||
// 媒体轨道方法
|
||||
|
||||
@@ -44,6 +44,12 @@ export interface WebRTCDataChannelManager {
|
||||
// 获取数据通道状态(兼容 RTCDataChannelState)
|
||||
getChannelState: () => RTCDataChannelState;
|
||||
|
||||
// 获取当前缓冲区大小
|
||||
getBufferedAmount: () => number;
|
||||
|
||||
// 等待缓冲区排空到指定阈值以下
|
||||
waitForBufferDrain: (threshold?: number) => Promise<void>;
|
||||
|
||||
// 处理数据通道消息 (P2P)
|
||||
handleDataChannelMessage: (event: MessageEvent) => void;
|
||||
}
|
||||
@@ -92,8 +98,7 @@ export function useWebRTCDataChannelManager(
|
||||
// 数据通道处理
|
||||
if (role === 'sender') {
|
||||
const dataChannel = pc.createDataChannel('shared-channel', {
|
||||
ordered: true,
|
||||
maxRetransmits: 3
|
||||
ordered: true
|
||||
});
|
||||
dcRef.current = dataChannel;
|
||||
|
||||
@@ -471,6 +476,62 @@ export function useWebRTCDataChannelManager(
|
||||
return dcRef.current?.readyState || 'closed';
|
||||
}, []);
|
||||
|
||||
// 获取当前缓冲区大小
|
||||
const getBufferedAmount = useCallback((): number => {
|
||||
if (dcRef.current && dcRef.current.readyState === 'open') {
|
||||
return dcRef.current.bufferedAmount;
|
||||
}
|
||||
if (relayWsRef.current && relayWsRef.current.readyState === WebSocket.OPEN) {
|
||||
return relayWsRef.current.bufferedAmount;
|
||||
}
|
||||
return 0;
|
||||
}, []);
|
||||
|
||||
// 等待缓冲区排空到阈值以下
|
||||
const waitForBufferDrain = useCallback((threshold: number = 1 * 1024 * 1024): Promise<void> => {
|
||||
// P2P DataChannel
|
||||
if (dcRef.current && dcRef.current.readyState === 'open') {
|
||||
if (dcRef.current.bufferedAmount <= threshold) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
return new Promise<void>((resolve) => {
|
||||
const dc = dcRef.current!;
|
||||
dc.bufferedAmountLowThreshold = threshold;
|
||||
const onLow = () => {
|
||||
dc.removeEventListener('bufferedamountlow', onLow);
|
||||
resolve();
|
||||
};
|
||||
dc.addEventListener('bufferedamountlow', onLow);
|
||||
// 安全超时,防止死等
|
||||
setTimeout(() => {
|
||||
dc.removeEventListener('bufferedamountlow', onLow);
|
||||
resolve();
|
||||
}, 5000);
|
||||
});
|
||||
}
|
||||
// Relay WebSocket
|
||||
if (relayWsRef.current && relayWsRef.current.readyState === WebSocket.OPEN) {
|
||||
if (relayWsRef.current.bufferedAmount <= threshold) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
return new Promise<void>((resolve) => {
|
||||
const checkInterval = setInterval(() => {
|
||||
if (!relayWsRef.current || relayWsRef.current.readyState !== WebSocket.OPEN ||
|
||||
relayWsRef.current.bufferedAmount <= threshold) {
|
||||
clearInterval(checkInterval);
|
||||
resolve();
|
||||
}
|
||||
}, 50);
|
||||
// 安全超时
|
||||
setTimeout(() => {
|
||||
clearInterval(checkInterval);
|
||||
resolve();
|
||||
}, 5000);
|
||||
});
|
||||
}
|
||||
return Promise.resolve();
|
||||
}, []);
|
||||
|
||||
return {
|
||||
createDataChannel,
|
||||
switchToRelay,
|
||||
@@ -481,6 +542,8 @@ export function useWebRTCDataChannelManager(
|
||||
registerMessageHandler,
|
||||
registerDataHandler,
|
||||
getChannelState,
|
||||
getBufferedAmount,
|
||||
waitForBufferDrain,
|
||||
handleDataChannelMessage,
|
||||
};
|
||||
}
|
||||
@@ -73,12 +73,19 @@ export const useFileStateManager = ({
|
||||
}, []);
|
||||
|
||||
// 更新文件进度
|
||||
const updateFileProgress = useCallback((fileId: string, fileName: string, progress: number) => {
|
||||
const updateFileProgress = useCallback((fileId: string, fileName: string, progress: number, speed?: number, eta?: number) => {
|
||||
const newStatus = progress >= 100 ? 'completed' as const : 'downloading' as const;
|
||||
setFileList(prev => prev.map(item => {
|
||||
if (item.id === fileId || item.name === fileName) {
|
||||
console.log(`更新文件 ${item.name} 进度: ${item.progress} -> ${progress}`);
|
||||
return { ...item, progress, status: newStatus };
|
||||
return {
|
||||
...item,
|
||||
progress,
|
||||
status: newStatus,
|
||||
// 仅在传输中且有新值时更新速度/ETA,完成时清除
|
||||
speed: newStatus === 'completed' ? undefined : (speed !== undefined ? speed : item.speed),
|
||||
eta: newStatus === 'completed' ? undefined : (eta !== undefined ? eta : item.eta)
|
||||
};
|
||||
}
|
||||
return item;
|
||||
}));
|
||||
|
||||
@@ -20,6 +20,15 @@ interface FileReceiveProgress {
|
||||
fileName: string;
|
||||
totalChunks: number;
|
||||
progress: number;
|
||||
fileSize: number; // 文件总大小 bytes
|
||||
startTime: number; // 开始接收时间
|
||||
lastChunkTime: number; // 上一个块接收时间
|
||||
// 滑动窗口测速
|
||||
speedWindowBytes: number; // 窗口内累计字节数
|
||||
speedWindowStart: number; // 窗口开始时间
|
||||
lastReportedSpeed: number; // 上次上报的速度 bytes/s
|
||||
lastReportedEta: number; // 上次上报的 ETA 秒
|
||||
lastSpeedReportTime: number; // 上次上报速度的时间
|
||||
}
|
||||
|
||||
// 文件元数据
|
||||
@@ -57,50 +66,29 @@ interface TransferStatus {
|
||||
lastChunkTime: number;
|
||||
retryCount: Map<number, number>;
|
||||
averageSpeed: number; // KB/s
|
||||
// 滑动窗口测速
|
||||
speedWindowBytes: number; // 窗口内累计字节数
|
||||
speedWindowStart: number; // 窗口开始时间
|
||||
lastReportedSpeed: number; // 上次上报的速度 bytes/s
|
||||
lastReportedEta: number; // 上次上报的 ETA 秒
|
||||
lastSpeedReportTime: number; // 上次上报速度的时间
|
||||
}
|
||||
|
||||
// 回调类型
|
||||
type FileReceivedCallback = (fileData: { id: string; file: File }) => void;
|
||||
type FileRequestedCallback = (fileId: string, fileName: string) => void;
|
||||
type FileProgressCallback = (progressInfo: { fileId: string; fileName: string; progress: number }) => void;
|
||||
type FileProgressCallback = (progressInfo: { fileId: string; fileName: string; progress: number; speed?: number; eta?: number }) => void;
|
||||
type FileListReceivedCallback = (fileList: FileInfo[]) => void;
|
||||
|
||||
const CHANNEL_NAME = 'file-transfer';
|
||||
const CHUNK_SIZE = 256 * 1024; // 256KB
|
||||
const MAX_RETRIES = 5; // 最大重试次数
|
||||
const CHUNK_SIZE = 256 * 1024; // 256KB — WebRTC DataChannel 单次发送上限
|
||||
const MAX_RETRIES = 5; // 最大重试次数(仅用于连接恢复)
|
||||
const RETRY_DELAY = 1000; // 重试延迟(毫秒)
|
||||
const ACK_TIMEOUT = 5000; // 确认超时(毫秒)
|
||||
|
||||
/**
|
||||
* 计算数据的CRC32校验和
|
||||
*/
|
||||
function calculateChecksum(data: ArrayBuffer): string {
|
||||
const buffer = new Uint8Array(data);
|
||||
let crc = 0xFFFFFFFF;
|
||||
|
||||
for (let i = 0; i < buffer.length; i++) {
|
||||
crc ^= buffer[i];
|
||||
for (let j = 0; j < 8; j++) {
|
||||
crc = crc & 1 ? (crc >>> 1) ^ 0xEDB88320 : crc >>> 1;
|
||||
}
|
||||
}
|
||||
|
||||
return (crc ^ 0xFFFFFFFF).toString(16).padStart(8, '0');
|
||||
}
|
||||
|
||||
/**
|
||||
* 生成简单的校验和(备用方案)
|
||||
*/
|
||||
function simpleChecksum(data: ArrayBuffer): string {
|
||||
const buffer = new Uint8Array(data);
|
||||
let sum = 0;
|
||||
|
||||
for (let i = 0; i < Math.min(buffer.length, 1000); i++) {
|
||||
sum += buffer[i];
|
||||
}
|
||||
|
||||
return sum.toString(16);
|
||||
}
|
||||
const ACK_TIMEOUT = 5000; // 完成确认超时(毫秒)
|
||||
const SPEED_WINDOW_MS = 2000; // 速度计算滑动窗口 2 秒
|
||||
const SPEED_REPORT_INTERVAL_MS = 1000; // 速度上报最小间隔 1 秒
|
||||
const BUFFER_HIGH_WATER = 2 * 1024 * 1024; // 2MB — 发送背压阈值
|
||||
const PROGRESS_LOG_INTERVAL = 50; // 每 50 个块打印一次日志
|
||||
|
||||
/**
|
||||
* 文件传输业务层
|
||||
@@ -137,8 +125,6 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
|
||||
|
||||
// 传输状态管理
|
||||
const transferStatus = useRef<Map<string, TransferStatus>>(new Map());
|
||||
const pendingChunks = useRef<Map<string, NodeJS.Timeout>>(new Map());
|
||||
const chunkAckCallbacks = useRef<Map<string, Set<(ack: ChunkAck) => void>>>(new Map());
|
||||
|
||||
// 接收文件进度跟踪
|
||||
const receiveProgress = useRef<Map<string, FileReceiveProgress>>(new Map());
|
||||
@@ -152,7 +138,7 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
|
||||
const handleMessage = useCallback((message: any) => {
|
||||
if (!message.type.startsWith('file-')) return;
|
||||
|
||||
console.log('文件传输收到消息:', message.type, message); switch (message.type) {
|
||||
console.log('文件传输收到消息:', message.type); switch (message.type) {
|
||||
case 'file-metadata':
|
||||
const metadata: FileMetadata = message.payload;
|
||||
console.log('开始接收文件:', metadata.name);
|
||||
@@ -165,11 +151,20 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
|
||||
|
||||
// 初始化接收进度跟踪
|
||||
const totalChunks = Math.ceil(metadata.size / CHUNK_SIZE);
|
||||
const nowInit = Date.now();
|
||||
receiveProgress.current.set(metadata.id, {
|
||||
fileId: metadata.id,
|
||||
fileName: metadata.name,
|
||||
totalChunks,
|
||||
progress: 0
|
||||
progress: 0,
|
||||
fileSize: metadata.size,
|
||||
startTime: nowInit,
|
||||
lastChunkTime: nowInit,
|
||||
speedWindowBytes: 0,
|
||||
speedWindowStart: nowInit,
|
||||
lastReportedSpeed: 0,
|
||||
lastReportedEta: 0,
|
||||
lastSpeedReportTime: 0
|
||||
});
|
||||
|
||||
// 设置当前活跃的接收文件
|
||||
@@ -229,113 +224,87 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
|
||||
console.log('收到块确认:', ack);
|
||||
|
||||
// 清除超时定时器
|
||||
const chunkKey = `${ack.fileId}-${ack.chunkIndex}`;
|
||||
const timeout = pendingChunks.current.get(chunkKey);
|
||||
if (timeout) {
|
||||
clearTimeout(timeout);
|
||||
pendingChunks.current.delete(chunkKey);
|
||||
}
|
||||
|
||||
// 调用确认回调
|
||||
const callbacks = chunkAckCallbacks.current.get(chunkKey);
|
||||
if (callbacks) {
|
||||
callbacks.forEach(cb => cb(ack));
|
||||
chunkAckCallbacks.current.delete(chunkKey);
|
||||
}
|
||||
|
||||
// 更新传输状态
|
||||
const status = transferStatus.current.get(ack.fileId);
|
||||
if (status) {
|
||||
if (ack.success) {
|
||||
status.acknowledgedChunks.add(ack.chunkIndex);
|
||||
status.failedChunks.delete(ack.chunkIndex);
|
||||
} else {
|
||||
status.failedChunks.add(ack.chunkIndex);
|
||||
}
|
||||
}
|
||||
case 'file-chunk-ack':
|
||||
// 保留消息处理以兼容旧版对端,但不再依赖逐块 ACK 做可靠性
|
||||
break;
|
||||
}
|
||||
}, [updateState]);
|
||||
|
||||
// 处理文件块数据
|
||||
// 处理文件块数据 — 流式接收,无 CRC32(SCTP 已保证完整性)
|
||||
const handleData = useCallback((data: ArrayBuffer) => {
|
||||
if (!expectedChunk.current) {
|
||||
console.warn('收到数据但没有对应的块信息');
|
||||
return;
|
||||
}
|
||||
|
||||
const { fileId, chunkIndex, totalChunks, checksum: expectedChecksum } = expectedChunk.current;
|
||||
const { fileId, chunkIndex } = expectedChunk.current;
|
||||
const fileInfo = receivingFiles.current.get(fileId);
|
||||
|
||||
if (fileInfo) {
|
||||
// 验证数据完整性
|
||||
const actualChecksum = calculateChecksum(data);
|
||||
const isValid = !expectedChecksum || actualChecksum === expectedChecksum;
|
||||
|
||||
if (!isValid) {
|
||||
console.warn(`文件块校验失败: 期望 ${expectedChecksum}, 实际 ${actualChecksum}`);
|
||||
|
||||
// 发送失败确认
|
||||
connection.sendMessage({
|
||||
type: 'file-chunk-ack',
|
||||
payload: {
|
||||
fileId,
|
||||
chunkIndex,
|
||||
success: false,
|
||||
checksum: actualChecksum
|
||||
}
|
||||
}, CHANNEL_NAME);
|
||||
|
||||
expectedChunk.current = null;
|
||||
return;
|
||||
}
|
||||
|
||||
// 检查是否已经接收过这个块,避免重复计数
|
||||
const alreadyReceived = fileInfo.chunks[chunkIndex] !== undefined;
|
||||
|
||||
// 数据有效,保存到缓存
|
||||
// 保存到缓存
|
||||
fileInfo.chunks[chunkIndex] = data;
|
||||
|
||||
// 只有在首次接收时才增加计数
|
||||
if (!alreadyReceived) {
|
||||
fileInfo.receivedChunks++;
|
||||
}
|
||||
|
||||
// 更新接收进度跟踪 - 使用 fileInfo 的计数,避免双重计数
|
||||
// 更新接收进度跟踪
|
||||
const progressInfo = receiveProgress.current.get(fileId);
|
||||
if (progressInfo) {
|
||||
progressInfo.progress = progressInfo.totalChunks > 0 ?
|
||||
(fileInfo.receivedChunks / progressInfo.totalChunks) * 100 : 0;
|
||||
|
||||
// 滑动窗口累计字节
|
||||
const now = Date.now();
|
||||
progressInfo.speedWindowBytes += data.byteLength;
|
||||
progressInfo.lastChunkTime = now;
|
||||
|
||||
const windowElapsed = now - progressInfo.speedWindowStart;
|
||||
|
||||
if (windowElapsed >= SPEED_WINDOW_MS && windowElapsed > 0) {
|
||||
const speedBps = (progressInfo.speedWindowBytes / windowElapsed) * 1000;
|
||||
const receivedBytes = fileInfo.receivedChunks * CHUNK_SIZE;
|
||||
const remainingBytes = Math.max(0, progressInfo.fileSize - receivedBytes);
|
||||
const eta = speedBps > 0 ? remainingBytes / speedBps : 0;
|
||||
progressInfo.speedWindowBytes = 0;
|
||||
progressInfo.speedWindowStart = now;
|
||||
progressInfo.lastReportedSpeed = speedBps;
|
||||
progressInfo.lastReportedEta = Math.max(0, eta);
|
||||
}
|
||||
|
||||
// 只有当这个文件是当前活跃文件时才更新全局进度
|
||||
if (activeReceiveFile.current === fileId) {
|
||||
updateState({ progress: progressInfo.progress });
|
||||
}
|
||||
|
||||
// 触发进度回调
|
||||
// 节流上报
|
||||
const timeSinceLastReport = now - progressInfo.lastSpeedReportTime;
|
||||
const reportSpeed = timeSinceLastReport >= SPEED_REPORT_INTERVAL_MS;
|
||||
|
||||
fileProgressCallbacks.current.forEach(cb => cb({
|
||||
fileId: fileId,
|
||||
fileName: progressInfo.fileName,
|
||||
progress: progressInfo.progress
|
||||
progress: progressInfo.progress,
|
||||
speed: reportSpeed ? progressInfo.lastReportedSpeed : undefined,
|
||||
eta: reportSpeed ? progressInfo.lastReportedEta : undefined
|
||||
}));
|
||||
|
||||
console.log(`文件 ${progressInfo.fileName} 接收进度: ${progressInfo.progress.toFixed(1)}%`);
|
||||
}
|
||||
|
||||
// 发送成功确认
|
||||
connection.sendMessage({
|
||||
type: 'file-chunk-ack',
|
||||
payload: {
|
||||
fileId,
|
||||
chunkIndex,
|
||||
success: true,
|
||||
checksum: actualChecksum
|
||||
if (reportSpeed) {
|
||||
progressInfo.lastSpeedReportTime = now;
|
||||
}
|
||||
}, CHANNEL_NAME);
|
||||
|
||||
// 稀疏日志
|
||||
if (fileInfo.receivedChunks % PROGRESS_LOG_INTERVAL === 0) {
|
||||
console.log(`接收 ${progressInfo.fileName}: ${progressInfo.progress.toFixed(1)}%`);
|
||||
}
|
||||
}
|
||||
|
||||
expectedChunk.current = null;
|
||||
}
|
||||
}, [updateState, connection]);
|
||||
}, [updateState]);
|
||||
|
||||
const connectionRef = useRef(connection);
|
||||
useEffect(() => {
|
||||
@@ -369,72 +338,8 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
|
||||
return connection.connect(roomCode, role);
|
||||
}, [connection]);
|
||||
|
||||
// 安全发送单个文件块
|
||||
const sendChunkWithAck = useCallback(async (
|
||||
fileId: string,
|
||||
chunkIndex: number,
|
||||
chunkData: ArrayBuffer,
|
||||
checksum: string,
|
||||
retryCount = 0
|
||||
): Promise<boolean> => {
|
||||
return new Promise((resolve) => {
|
||||
// 主要检查数据通道状态,因为数据通道是文件传输的实际通道
|
||||
const channelState = connection.getChannelState();
|
||||
if (channelState === 'closed') {
|
||||
console.warn(`数据通道已关闭,停止发送文件块 ${chunkIndex}`);
|
||||
resolve(false);
|
||||
return;
|
||||
}
|
||||
|
||||
// 如果连接暂时断开但数据通道可用,仍然可以尝试发送
|
||||
if (!connection.isConnected && channelState === 'connecting') {
|
||||
console.warn(`WebRTC 连接暂时断开,但数据通道正在连接,继续尝试发送文件块 ${chunkIndex}`);
|
||||
}
|
||||
|
||||
const chunkKey = `${fileId}-${chunkIndex}`;
|
||||
|
||||
// 设置确认回调
|
||||
const ackCallback = (ack: ChunkAck) => {
|
||||
if (ack.success) {
|
||||
resolve(true);
|
||||
} else {
|
||||
console.warn(`文件块 ${chunkIndex} 确认失败,准备重试`);
|
||||
resolve(false);
|
||||
}
|
||||
};
|
||||
|
||||
// 注册确认回调
|
||||
if (!chunkAckCallbacks.current.has(chunkKey)) {
|
||||
chunkAckCallbacks.current.set(chunkKey, new Set());
|
||||
}
|
||||
chunkAckCallbacks.current.get(chunkKey)!.add(ackCallback);
|
||||
|
||||
// 设置超时定时器
|
||||
const timeout = setTimeout(() => {
|
||||
console.warn(`文件块 ${chunkIndex} 确认超时`);
|
||||
chunkAckCallbacks.current.get(chunkKey)?.delete(ackCallback);
|
||||
resolve(false);
|
||||
}, ACK_TIMEOUT);
|
||||
|
||||
pendingChunks.current.set(chunkKey, timeout);
|
||||
|
||||
// 发送块信息
|
||||
connection.sendMessage({
|
||||
type: 'file-chunk-info',
|
||||
payload: {
|
||||
fileId,
|
||||
chunkIndex,
|
||||
totalChunks: 0, // 这里不需要,因为已经在元数据中发送
|
||||
checksum
|
||||
}
|
||||
}, CHANNEL_NAME);
|
||||
|
||||
// 发送块数据
|
||||
connection.sendData(chunkData);
|
||||
});
|
||||
}, [connection]);
|
||||
|
||||
// 安全发送文件
|
||||
// 安全发送文件 — 流式传输 + bufferedAmount 背压控制
|
||||
// DataChannel ordered+reliable 模式下 SCTP 保证按序可靠交付,无需逐块 ACK
|
||||
const sendFileSecure = useCallback(async (file: File, fileId?: string) => {
|
||||
if (connection.getChannelState() !== 'open') {
|
||||
updateState({ error: '连接未就绪' });
|
||||
@@ -449,6 +354,7 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
|
||||
updateState({ isTransferring: true, progress: 0, error: null });
|
||||
|
||||
// 初始化传输状态
|
||||
const nowInit = Date.now();
|
||||
const status: TransferStatus = {
|
||||
fileId: actualFileId,
|
||||
fileName: file.name,
|
||||
@@ -456,9 +362,14 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
|
||||
sentChunks: new Set(),
|
||||
acknowledgedChunks: new Set(),
|
||||
failedChunks: new Set(),
|
||||
lastChunkTime: Date.now(),
|
||||
lastChunkTime: nowInit,
|
||||
retryCount: new Map(),
|
||||
averageSpeed: 0
|
||||
averageSpeed: 0,
|
||||
speedWindowBytes: 0,
|
||||
speedWindowStart: nowInit,
|
||||
lastReportedSpeed: 0,
|
||||
lastReportedEta: 0,
|
||||
lastSpeedReportTime: 0
|
||||
};
|
||||
transferStatus.current.set(actualFileId, status);
|
||||
|
||||
@@ -474,91 +385,82 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
|
||||
}
|
||||
}, CHANNEL_NAME);
|
||||
|
||||
// 2. 分块发送文件
|
||||
// 2. 流式分块发送 — 使用 bufferedAmount 背压控制,不等待逐块 ACK
|
||||
for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) {
|
||||
let success = false;
|
||||
let retryCount = 0;
|
||||
|
||||
while (!success && retryCount <= MAX_RETRIES) {
|
||||
// 检查数据通道状态,这是文件传输的实际通道
|
||||
const channelState = connection.getChannelState();
|
||||
if (channelState === 'closed') {
|
||||
console.warn(`数据通道已关闭,停止文件传输`);
|
||||
throw new Error('数据通道已关闭');
|
||||
}
|
||||
|
||||
// 如果连接暂时断开但数据通道可用,仍然可以尝试发送
|
||||
if (!connection.isConnected && channelState === 'connecting') {
|
||||
console.warn(`WebRTC 连接暂时断开,但数据通道正在连接,继续尝试发送文件块 ${chunkIndex}`);
|
||||
}
|
||||
|
||||
const start = chunkIndex * CHUNK_SIZE;
|
||||
const end = Math.min(start + CHUNK_SIZE, file.size);
|
||||
const chunk = file.slice(start, end);
|
||||
const arrayBuffer = await chunk.arrayBuffer();
|
||||
const checksum = calculateChecksum(arrayBuffer);
|
||||
|
||||
console.log(`发送文件块 ${chunkIndex}/${totalChunks}, 重试次数: ${retryCount}`);
|
||||
|
||||
// 发送块并等待确认
|
||||
success = await sendChunkWithAck(actualFileId, chunkIndex, arrayBuffer, checksum, retryCount);
|
||||
|
||||
if (success) {
|
||||
status.sentChunks.add(chunkIndex);
|
||||
status.acknowledgedChunks.add(chunkIndex);
|
||||
status.failedChunks.delete(chunkIndex);
|
||||
|
||||
// 计算传输速度
|
||||
const now = Date.now();
|
||||
const timeDiff = (now - status.lastChunkTime) / 1000; // 秒
|
||||
if (timeDiff > 0) {
|
||||
const speed = (arrayBuffer.byteLength / 1024) / timeDiff; // KB/s
|
||||
status.averageSpeed = status.averageSpeed * 0.7 + speed * 0.3; // 平滑平均
|
||||
}
|
||||
status.lastChunkTime = now;
|
||||
} else {
|
||||
retryCount++;
|
||||
status.retryCount.set(chunkIndex, retryCount);
|
||||
|
||||
if (retryCount > MAX_RETRIES) {
|
||||
status.failedChunks.add(chunkIndex);
|
||||
throw new Error(`文件块 ${chunkIndex} 发送失败,超过最大重试次数`);
|
||||
}
|
||||
|
||||
// 指数退避
|
||||
const delay = Math.min(RETRY_DELAY * Math.pow(2, retryCount - 1), 10000);
|
||||
console.log(`等待 ${delay}ms 后重试文件块 ${chunkIndex}`);
|
||||
await new Promise(resolve => setTimeout(resolve, delay));
|
||||
}
|
||||
// 检查数据通道状态
|
||||
const channelState = connection.getChannelState();
|
||||
if (channelState === 'closed') {
|
||||
throw new Error('数据通道已关闭');
|
||||
}
|
||||
|
||||
// 更新进度 - 基于已发送的块数,这样与接收方的进度更同步
|
||||
// 背压控制:等待缓冲区排空到阈值以下
|
||||
await connection.waitForBufferDrain(BUFFER_HIGH_WATER);
|
||||
|
||||
const start = chunkIndex * CHUNK_SIZE;
|
||||
const end = Math.min(start + CHUNK_SIZE, file.size);
|
||||
const chunk = file.slice(start, end);
|
||||
const arrayBuffer = await chunk.arrayBuffer();
|
||||
|
||||
// 发送块信息(不含校验和 — SCTP 层保证完整性)
|
||||
connection.sendMessage({
|
||||
type: 'file-chunk-info',
|
||||
payload: {
|
||||
fileId: actualFileId,
|
||||
chunkIndex,
|
||||
totalChunks
|
||||
}
|
||||
}, CHANNEL_NAME);
|
||||
|
||||
// 发送块数据
|
||||
connection.sendData(arrayBuffer);
|
||||
|
||||
// 标记已发送
|
||||
status.sentChunks.add(chunkIndex);
|
||||
|
||||
// 滑动窗口测速
|
||||
const now = Date.now();
|
||||
status.speedWindowBytes += arrayBuffer.byteLength;
|
||||
const windowElapsed = now - status.speedWindowStart;
|
||||
|
||||
if (windowElapsed >= SPEED_WINDOW_MS && windowElapsed > 0) {
|
||||
const speedBps = (status.speedWindowBytes / windowElapsed) * 1000;
|
||||
status.averageSpeed = speedBps / 1024;
|
||||
status.lastReportedSpeed = speedBps;
|
||||
const remainingBytes = Math.max(0, file.size - (chunkIndex + 1) * CHUNK_SIZE);
|
||||
status.lastReportedEta = speedBps > 0 ? Math.max(0, remainingBytes / speedBps) : 0;
|
||||
status.speedWindowBytes = 0;
|
||||
status.speedWindowStart = now;
|
||||
}
|
||||
status.lastChunkTime = now;
|
||||
|
||||
// 更新进度
|
||||
const progress = ((chunkIndex + 1) / totalChunks) * 100;
|
||||
updateState({ progress });
|
||||
|
||||
// 节流上报速度/ETA
|
||||
const timeSinceLastReport = now - status.lastSpeedReportTime;
|
||||
const reportSpeed = timeSinceLastReport >= SPEED_REPORT_INTERVAL_MS;
|
||||
|
||||
fileProgressCallbacks.current.forEach(cb => cb({
|
||||
fileId: actualFileId,
|
||||
fileName: file.name,
|
||||
progress
|
||||
progress,
|
||||
speed: reportSpeed ? status.lastReportedSpeed : undefined,
|
||||
eta: reportSpeed ? status.lastReportedEta : undefined
|
||||
}));
|
||||
|
||||
// 自适应流控:根据传输速度调整发送间隔
|
||||
if (status.averageSpeed > 0) {
|
||||
const chunkSize = Math.min(CHUNK_SIZE, file.size - chunkIndex * CHUNK_SIZE);
|
||||
const expectedTime = (chunkSize / 1024) / status.averageSpeed;
|
||||
const actualTime = Date.now() - status.lastChunkTime;
|
||||
const delay = Math.max(0, expectedTime - actualTime);
|
||||
if (reportSpeed) {
|
||||
status.lastSpeedReportTime = now;
|
||||
}
|
||||
|
||||
if (delay > 10) {
|
||||
await new Promise(resolve => setTimeout(resolve, Math.min(delay, 100)));
|
||||
}
|
||||
// 稀疏日志
|
||||
if (chunkIndex % PROGRESS_LOG_INTERVAL === 0 || chunkIndex === totalChunks - 1) {
|
||||
console.log(`发送进度 ${chunkIndex + 1}/${totalChunks} (${progress.toFixed(1)}%)`);
|
||||
}
|
||||
}
|
||||
|
||||
// 3. 验证所有块都已确认
|
||||
if (status.acknowledgedChunks.size !== totalChunks) {
|
||||
throw new Error(`文件传输不完整:${status.acknowledgedChunks.size}/${totalChunks} 块已确认`);
|
||||
}
|
||||
// 3. 等待缓冲区完全排空,确保所有数据已发出
|
||||
await connection.waitForBufferDrain(0);
|
||||
|
||||
// 4. 发送完成信号
|
||||
connection.sendMessage({
|
||||
@@ -567,7 +469,8 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
|
||||
}, CHANNEL_NAME);
|
||||
|
||||
updateState({ isTransferring: false, progress: 100 });
|
||||
console.log('文件安全发送完成:', file.name, `平均速度: ${status.averageSpeed.toFixed(2)} KB/s`);
|
||||
const totalTime = (Date.now() - status.speedWindowStart) / 1000 || 1;
|
||||
console.log(`文件发送完成: ${file.name}, 平均速度: ${(file.size / 1024 / totalTime).toFixed(0)} KB/s`);
|
||||
transferStatus.current.delete(actualFileId);
|
||||
|
||||
} catch (error) {
|
||||
@@ -578,7 +481,7 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
|
||||
});
|
||||
transferStatus.current.delete(actualFileId);
|
||||
}
|
||||
}, [connection, updateState, sendChunkWithAck]);
|
||||
}, [connection, updateState]);
|
||||
|
||||
// 保持原有的 sendFile 方法用于向后兼容
|
||||
const sendFile = useCallback(async (file: File, fileId?: string) => {
|
||||
|
||||
@@ -6,6 +6,8 @@ export interface FileInfo {
|
||||
type: string;
|
||||
status: 'ready' | 'downloading' | 'completed';
|
||||
progress: number;
|
||||
speed?: number; // 传输速度 bytes/s
|
||||
eta?: number; // 预估剩余时间 秒
|
||||
lastModified?: number;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user