feat: 文件传输ACK确认机制.保证数据完整.

This commit is contained in:
MatrixSeven
2025-08-24 15:24:12 +08:00
parent 720f808ed6
commit 75825e1104
2 changed files with 393 additions and 37 deletions

View File

@@ -13,6 +13,15 @@ interface FileTransferState {
receivedFiles: Array<{ id: string; file: File }>;
}
// 单个文件的接收进度
interface FileReceiveProgress {
fileId: string;
fileName: string;
receivedChunks: number;
totalChunks: number;
progress: number;
}
// 文件信息
interface FileInfo {
id: string;
@@ -36,6 +45,28 @@ interface FileChunk {
fileId: string;
chunkIndex: number;
totalChunks: number;
checksum?: string; // 数据校验和
}
// 块确认信息
interface ChunkAck {
fileId: string;
chunkIndex: number;
success: boolean;
checksum?: string;
}
// 传输状态
interface TransferStatus {
fileId: string;
fileName: string;
totalChunks: number;
sentChunks: Set<number>;
acknowledgedChunks: Set<number>;
failedChunks: Set<number>;
lastChunkTime: number;
retryCount: Map<number, number>;
averageSpeed: number; // KB/s
}
// 回调类型
@@ -46,6 +77,40 @@ type FileListReceivedCallback = (fileList: FileInfo[]) => void;
const CHANNEL_NAME = 'file-transfer';
const CHUNK_SIZE = 256 * 1024; // 256KB
const MAX_RETRIES = 5; // 最大重试次数
const RETRY_DELAY = 1000; // 重试延迟(毫秒)
const ACK_TIMEOUT = 5000; // 确认超时(毫秒)
/**
* 计算数据的CRC32校验和
*/
function calculateChecksum(data: ArrayBuffer): string {
const buffer = new Uint8Array(data);
let crc = 0xFFFFFFFF;
for (let i = 0; i < buffer.length; i++) {
crc ^= buffer[i];
for (let j = 0; j < 8; j++) {
crc = crc & 1 ? (crc >>> 1) ^ 0xEDB88320 : crc >>> 1;
}
}
return (crc ^ 0xFFFFFFFF).toString(16).padStart(8, '0');
}
/**
* 生成简单的校验和(备用方案)
*/
function simpleChecksum(data: ArrayBuffer): string {
const buffer = new Uint8Array(data);
let sum = 0;
for (let i = 0; i < Math.min(buffer.length, 1000); i++) {
sum += buffer[i];
}
return sum.toString(16);
}
/**
* 文件传输业务层
@@ -80,6 +145,15 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
const fileProgressCallbacks = useRef<Set<FileProgressCallback>>(new Set());
const fileListCallbacks = useRef<Set<FileListReceivedCallback>>(new Set());
// 传输状态管理
const transferStatus = useRef<Map<string, TransferStatus>>(new Map());
const pendingChunks = useRef<Map<string, NodeJS.Timeout>>(new Map());
const chunkAckCallbacks = useRef<Map<string, Set<(ack: ChunkAck) => void>>>(new Map());
// 接收文件进度跟踪
const receiveProgress = useRef<Map<string, FileReceiveProgress>>(new Map());
const activeReceiveFile = useRef<string | null>(null);
const updateState = useCallback((updates: Partial<FileTransferState>) => {
setState(prev => ({ ...prev, ...updates }));
}, []);
@@ -98,7 +172,19 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
chunks: [],
receivedChunks: 0,
});
// 初始化接收进度跟踪
const totalChunks = Math.ceil(metadata.size / CHUNK_SIZE);
receiveProgress.current.set(metadata.id, {
fileId: metadata.id,
fileName: metadata.name,
receivedChunks: 0,
totalChunks,
progress: 0
});
// 设置当前活跃的接收文件
activeReceiveFile.current = metadata.id;
updateState({ isTransferring: true, progress: 0 });
break;
@@ -129,6 +215,12 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
fileReceivedCallbacks.current.forEach(cb => cb({ id: fileId, file }));
receivingFiles.current.delete(fileId);
receiveProgress.current.delete(fileId);
// 清除活跃文件
if (activeReceiveFile.current === fileId) {
activeReceiveFile.current = null;
}
}
break;
@@ -142,6 +234,37 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
console.log('收到文件请求:', fileName, requestedFileId);
fileRequestedCallbacks.current.forEach(cb => cb(requestedFileId, fileName));
break;
case 'file-chunk-ack':
const ack: ChunkAck = message.payload;
console.log('收到块确认:', ack);
// 清除超时定时器
const chunkKey = `${ack.fileId}-${ack.chunkIndex}`;
const timeout = pendingChunks.current.get(chunkKey);
if (timeout) {
clearTimeout(timeout);
pendingChunks.current.delete(chunkKey);
}
// 调用确认回调
const callbacks = chunkAckCallbacks.current.get(chunkKey);
if (callbacks) {
callbacks.forEach(cb => cb(ack));
chunkAckCallbacks.current.delete(chunkKey);
}
// 更新传输状态
const status = transferStatus.current.get(ack.fileId);
if (status) {
if (ack.success) {
status.acknowledgedChunks.add(ack.chunkIndex);
status.failedChunks.delete(ack.chunkIndex);
} else {
status.failedChunks.add(ack.chunkIndex);
}
}
break;
}
}, [updateState]);
@@ -152,26 +275,72 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
return;
}
const { fileId, chunkIndex, totalChunks } = expectedChunk.current;
const { fileId, chunkIndex, totalChunks, checksum: expectedChecksum } = expectedChunk.current;
const fileInfo = receivingFiles.current.get(fileId);
if (fileInfo) {
// 验证数据完整性
const actualChecksum = calculateChecksum(data);
const isValid = !expectedChecksum || actualChecksum === expectedChecksum;
if (!isValid) {
console.warn(`文件块校验失败: 期望 ${expectedChecksum}, 实际 ${actualChecksum}`);
// 发送失败确认
connection.sendMessage({
type: 'file-chunk-ack',
payload: {
fileId,
chunkIndex,
success: false,
checksum: actualChecksum
}
}, CHANNEL_NAME);
expectedChunk.current = null;
return;
}
// 数据有效,保存到缓存
fileInfo.chunks[chunkIndex] = data;
fileInfo.receivedChunks++;
const progress = (fileInfo.receivedChunks / totalChunks) * 100;
updateState({ progress });
fileProgressCallbacks.current.forEach(cb => cb({
fileId: fileId,
fileName: fileInfo.metadata.name,
progress
}));
// 更新接收进度跟踪
const progressInfo = receiveProgress.current.get(fileId);
if (progressInfo) {
progressInfo.receivedChunks++;
progressInfo.progress = progressInfo.totalChunks > 0 ?
(progressInfo.receivedChunks / progressInfo.totalChunks) * 100 : 0;
// 只有当这个文件是当前活跃文件时才更新全局进度
if (activeReceiveFile.current === fileId) {
updateState({ progress: progressInfo.progress });
}
// 触发进度回调
fileProgressCallbacks.current.forEach(cb => cb({
fileId: fileId,
fileName: progressInfo.fileName,
progress: progressInfo.progress
}));
console.log(`文件 ${fileInfo.metadata.name} 接收进度: ${progress.toFixed(1)}%`);
console.log(`文件 ${progressInfo.fileName} 接收进度: ${progressInfo.progress.toFixed(1)}%`);
}
// 发送成功确认
connection.sendMessage({
type: 'file-chunk-ack',
payload: {
fileId,
chunkIndex,
success: true,
checksum: actualChecksum
}
}, CHANNEL_NAME);
expectedChunk.current = null;
}
}, [updateState]);
}, [updateState, connection]);
// 设置处理器
useEffect(() => {
@@ -201,8 +370,60 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
return connection.connect(roomCode, role);
}, [connection]);
// 发送文件
const sendFile = useCallback(async (file: File, fileId?: string) => {
// 安全发送单个文件
const sendChunkWithAck = useCallback(async (
fileId: string,
chunkIndex: number,
chunkData: ArrayBuffer,
checksum: string,
retryCount = 0
): Promise<boolean> => {
return new Promise((resolve) => {
const chunkKey = `${fileId}-${chunkIndex}`;
// 设置确认回调
const ackCallback = (ack: ChunkAck) => {
if (ack.success) {
resolve(true);
} else {
console.warn(`文件块 ${chunkIndex} 确认失败,准备重试`);
resolve(false);
}
};
// 注册确认回调
if (!chunkAckCallbacks.current.has(chunkKey)) {
chunkAckCallbacks.current.set(chunkKey, new Set());
}
chunkAckCallbacks.current.get(chunkKey)!.add(ackCallback);
// 设置超时定时器
const timeout = setTimeout(() => {
console.warn(`文件块 ${chunkIndex} 确认超时`);
chunkAckCallbacks.current.get(chunkKey)?.delete(ackCallback);
resolve(false);
}, ACK_TIMEOUT);
pendingChunks.current.set(chunkKey, timeout);
// 发送块信息
connection.sendMessage({
type: 'file-chunk-info',
payload: {
fileId,
chunkIndex,
totalChunks: 0, // 这里不需要,因为已经在元数据中发送
checksum
}
}, CHANNEL_NAME);
// 发送块数据
connection.sendData(chunkData);
});
}, [connection]);
// 安全发送文件
const sendFileSecure = useCallback(async (file: File, fileId?: string) => {
if (connection.getChannelState() !== 'open') {
updateState({ error: '连接未就绪' });
return;
@@ -211,10 +432,24 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
const actualFileId = fileId || `file_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
const totalChunks = Math.ceil(file.size / CHUNK_SIZE);
console.log('开始发送文件:', file.name, '文件ID:', actualFileId, '总块数:', totalChunks);
console.log('开始安全发送文件:', file.name, '文件ID:', actualFileId, '总块数:', totalChunks);
updateState({ isTransferring: true, progress: 0, error: null });
// 初始化传输状态
const status: TransferStatus = {
fileId: actualFileId,
fileName: file.name,
totalChunks,
sentChunks: new Set(),
acknowledgedChunks: new Set(),
failedChunks: new Set(),
lastChunkTime: Date.now(),
retryCount: new Map(),
averageSpeed: 0
};
transferStatus.current.set(actualFileId, status);
try {
// 1. 发送文件元数据
connection.sendMessage({
@@ -229,25 +464,52 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
// 2. 分块发送文件
for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) {
const start = chunkIndex * CHUNK_SIZE;
const end = Math.min(start + CHUNK_SIZE, file.size);
const chunk = file.slice(start, end);
let success = false;
let retryCount = 0;
// 先发送块信息
connection.sendMessage({
type: 'file-chunk-info',
payload: {
fileId: actualFileId,
chunkIndex,
totalChunks
while (!success && retryCount <= MAX_RETRIES) {
const start = chunkIndex * CHUNK_SIZE;
const end = Math.min(start + CHUNK_SIZE, file.size);
const chunk = file.slice(start, end);
const arrayBuffer = await chunk.arrayBuffer();
const checksum = calculateChecksum(arrayBuffer);
console.log(`发送文件块 ${chunkIndex}/${totalChunks}, 重试次数: ${retryCount}`);
// 发送块并等待确认
success = await sendChunkWithAck(actualFileId, chunkIndex, arrayBuffer, checksum, retryCount);
if (success) {
status.sentChunks.add(chunkIndex);
status.acknowledgedChunks.add(chunkIndex);
status.failedChunks.delete(chunkIndex);
// 计算传输速度
const now = Date.now();
const timeDiff = (now - status.lastChunkTime) / 1000; // 秒
if (timeDiff > 0) {
const speed = (arrayBuffer.byteLength / 1024) / timeDiff; // KB/s
status.averageSpeed = status.averageSpeed * 0.7 + speed * 0.3; // 平滑平均
}
status.lastChunkTime = now;
} else {
retryCount++;
status.retryCount.set(chunkIndex, retryCount);
if (retryCount > MAX_RETRIES) {
status.failedChunks.add(chunkIndex);
throw new Error(`文件块 ${chunkIndex} 发送失败,超过最大重试次数`);
}
// 指数退避
const delay = Math.min(RETRY_DELAY * Math.pow(2, retryCount - 1), 10000);
console.log(`等待 ${delay}ms 后重试文件块 ${chunkIndex}`);
await new Promise(resolve => setTimeout(resolve, delay));
}
}, CHANNEL_NAME);
}
// 再发送块数据
const arrayBuffer = await chunk.arrayBuffer();
connection.sendData(arrayBuffer);
const progress = ((chunkIndex + 1) / totalChunks) * 100;
// 更新进度
const progress = (status.acknowledgedChunks.size / totalChunks) * 100;
updateState({ progress });
fileProgressCallbacks.current.forEach(cb => cb({
@@ -256,29 +518,49 @@ export function useFileTransferBusiness(connection: WebRTCConnection) {
progress
}));
// 简单的流控:等待一小段时间让接收方处理
if (chunkIndex % 10 === 0) {
await new Promise(resolve => setTimeout(resolve, 10));
// 自适应流控:根据传输速度调整发送间隔
if (status.averageSpeed > 0) {
const chunkSize = Math.min(CHUNK_SIZE, file.size - chunkIndex * CHUNK_SIZE);
const expectedTime = (chunkSize / 1024) / status.averageSpeed;
const actualTime = Date.now() - status.lastChunkTime;
const delay = Math.max(0, expectedTime - actualTime);
if (delay > 10) {
await new Promise(resolve => setTimeout(resolve, Math.min(delay, 100)));
}
}
}
// 3. 发送完成信号
// 3. 验证所有块都已确认
if (status.acknowledgedChunks.size !== totalChunks) {
throw new Error(`文件传输不完整:${status.acknowledgedChunks.size}/${totalChunks} 块已确认`);
}
// 4. 发送完成信号
connection.sendMessage({
type: 'file-complete',
payload: { fileId: actualFileId }
}, CHANNEL_NAME);
updateState({ isTransferring: false, progress: 100 });
console.log('文件发送完成:', file.name);
console.log('文件安全发送完成:', file.name, `平均速度: ${status.averageSpeed.toFixed(2)} KB/s`);
transferStatus.current.delete(actualFileId);
} catch (error) {
console.error('发送文件失败:', error);
console.error('安全发送文件失败:', error);
updateState({
error: error instanceof Error ? error.message : '发送失败',
isTransferring: false
});
transferStatus.current.delete(actualFileId);
}
}, [connection, updateState]);
}, [connection, updateState, sendChunkWithAck]);
// 保持原有的 sendFile 方法用于向后兼容
const sendFile = useCallback(async (file: File, fileId?: string) => {
// 默认使用新的安全发送方法
return sendFileSecure(file, fileId);
}, [sendFileSecure]);
// 发送文件列表
const sendFileList = useCallback((fileList: FileInfo[]) => {

View File

@@ -0,0 +1,74 @@
/** @type {import('tailwindcss').Config} */
module.exports = {
content: [
'./src/pages/**/*.{js,ts,jsx,tsx,mdx}',
'./src/components/**/*.{js,ts,jsx,tsx,mdx}',
'./src/app/**/*.{js,ts,jsx,tsx,mdx}',
],
theme: {
extend: {
colors: {
border: "hsl(var(--border))",
input: "hsl(var(--input))",
ring: "hsl(var(--ring))",
background: "hsl(var(--background))",
foreground: "hsl(var(--foreground))",
primary: {
DEFAULT: "hsl(var(--primary))",
foreground: "hsl(var(--primary-foreground))",
},
secondary: {
DEFAULT: "hsl(var(--secondary))",
foreground: "hsl(var(--secondary-foreground))",
},
destructive: {
DEFAULT: "hsl(var(--destructive))",
foreground: "hsl(var(--destructive-foreground))",
},
muted: {
DEFAULT: "hsl(var(--muted))",
foreground: "hsl(var(--muted-foreground))",
},
accent: {
DEFAULT: "hsl(var(--accent))",
foreground: "hsl(var(--accent-foreground))",
},
popover: {
DEFAULT: "hsl(var(--popover))",
foreground: "hsl(var(--popover-foreground))",
},
card: {
DEFAULT: "hsl(var(--card))",
foreground: "hsl(var(--card-foreground))",
},
},
borderRadius: {
lg: "var(--radius)",
md: "calc(var(--radius) - 2px)",
sm: "calc(var(--radius) - 4px)",
},
borderWidth: {
'3': '3px',
'4': '4px',
},
boxShadow: {
'cartoon-sm': '2px 2px 0 #000',
'cartoon': '4px 4px 0 #000',
'cartoon-md': '6px 6px 0 #000',
'cartoon-lg': '8px 8px 0 #000',
'cartoon-xl': '10px 10px 0 #000',
},
animation: {
'bounce-in': 'bounce-in 0.8s cubic-bezier(0.68, -0.55, 0.265, 1.55)',
'wiggle': 'wiggle 0.5s ease-in-out infinite',
'float-cartoon': 'float-cartoon 4s ease-in-out infinite',
'rainbow': 'rainbow 5s ease infinite',
'gradient-shift': 'gradientShift 15s ease infinite',
},
fontFamily: {
'cartoon': ['"Comic Sans MS"', '"Chalkboard SE"', '"Comic Neue"', 'cursive'],
},
},
},
plugins: [],
}