Files
file-transfer-go/internal/services/p2p_service.go
2025-08-01 18:38:28 +08:00

636 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package services
import (
"crypto/rand"
"fmt"
"log"
mathrand "math/rand"
"net/http"
"strconv"
"sync"
"time"
"chuan/internal/models"
"github.com/gorilla/websocket"
)
type FileTransferRoom struct {
ID string
Code string // 取件码
Files []models.FileTransferInfo // 待传输文件信息
Clients map[string]*models.ClientInfo // 所有连接的客户端 (客户端ID -> ClientInfo)
CreatedAt time.Time // 创建时间
TextContent string // 文字内容
IsTextRoom bool // 是否是文字传输房间
mutex sync.RWMutex
}
type P2PService struct {
rooms map[string]*FileTransferRoom // 使用取件码作为key
roomsMux sync.RWMutex
upgrader websocket.Upgrader
}
func NewP2PService() *P2PService {
service := &P2PService{
rooms: make(map[string]*FileTransferRoom),
roomsMux: sync.RWMutex{},
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true // 允许所有来源,生产环境应当限制
},
},
}
// 启动房间清理任务
go service.cleanupExpiredRooms()
return service
}
// CreateRoom 创建新房间并返回取件码
func (p *P2PService) CreateRoom(files []models.FileTransferInfo) string {
code := generatePickupCode()
p.roomsMux.Lock()
defer p.roomsMux.Unlock()
room := &FileTransferRoom{
ID: "room_" + code,
Code: code,
Files: files,
Clients: make(map[string]*models.ClientInfo),
CreatedAt: time.Now(),
}
p.rooms[code] = room
log.Printf("创建房间,取件码: %s文件数量: %d", code, len(files))
return code
}
// GetRoomByCode 根据取件码获取房间
func (p *P2PService) GetRoomByCode(code string) (*FileTransferRoom, bool) {
p.roomsMux.RLock()
defer p.roomsMux.RUnlock()
room, exists := p.rooms[code]
return room, exists
}
// CreateTextRoom 创建文字传输房间并返回取件码
func (p *P2PService) CreateTextRoom(text string) string {
code := generatePickupCode()
p.roomsMux.Lock()
defer p.roomsMux.Unlock()
room := &FileTransferRoom{
ID: "text_room_" + code,
Code: code,
Files: []models.FileTransferInfo{}, // 文字房间不需要文件
Clients: make(map[string]*models.ClientInfo),
CreatedAt: time.Now(),
TextContent: text,
IsTextRoom: true,
}
p.rooms[code] = room
log.Printf("创建文字传输房间,取件码: %s文字长度: %d", code, len(text))
return code
}
// GetTextContent 根据取件码获取文字内容
func (p *P2PService) GetTextContent(code string) (string, bool) {
p.roomsMux.RLock()
defer p.roomsMux.RUnlock()
room, exists := p.rooms[code]
if !exists || !room.IsTextRoom {
return "", false
}
return room.TextContent, true
}
// generateClientID 生成客户端唯一标识
func generateClientID() string {
b := make([]byte, 8)
rand.Read(b)
return fmt.Sprintf("client_%x", b)
}
// HandleWebSocket 处理WebSocket连接
func (p *P2PService) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := p.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebSocket升级失败: %v", err)
return
}
defer conn.Close()
// 获取取件码和角色
code := r.URL.Query().Get("code")
role := r.URL.Query().Get("role") // "sender" or "receiver"
if code == "" || (role != "sender" && role != "receiver") {
log.Printf("缺少取件码或角色参数")
return
}
// 获取房间
room, exists := p.GetRoomByCode(code)
if !exists {
log.Printf("房间不存在: %s", code)
return
}
// 生成客户端ID并创建客户端信息
clientID := generateClientID()
client := &models.ClientInfo{
ID: clientID,
Role: role,
Connection: conn,
JoinedAt: time.Now(),
UserAgent: r.Header.Get("User-Agent"),
}
// 将客户端加入房间
room.mutex.Lock()
room.Clients[clientID] = client
log.Printf("%s连接到房间: %s (客户端ID: %s)", role, code, clientID)
// 如果是接收方,发送文件列表
if role == "receiver" {
filesMsg := models.VideoMessage{
Type: "file-list",
Payload: map[string]interface{}{"files": room.Files},
}
if err := conn.WriteJSON(filesMsg); err != nil {
log.Printf("发送文件列表失败: %v", err)
}
// 通知所有发送方有新的接收方加入
p.notifyClients(room, "sender", models.VideoMessage{
Type: "new-receiver",
Payload: map[string]interface{}{
"client_id": clientID,
"joined_at": client.JoinedAt,
},
})
} else if role == "sender" {
// 通知所有接收方有新的发送方加入
p.notifyClients(room, "receiver", models.VideoMessage{
Type: "new-sender",
Payload: map[string]interface{}{
"client_id": clientID,
"joined_at": client.JoinedAt,
},
})
}
// 发送房间状态给所有客户端
p.broadcastRoomStatus(room)
room.mutex.Unlock()
// 连接关闭时清理
defer func() {
room.mutex.Lock()
delete(room.Clients, clientID)
log.Printf("客户端断开连接: %s (房间: %s)", clientID, code)
// 通知其他客户端有人离开
p.notifyClients(room, "", models.VideoMessage{
Type: "client-left",
Payload: map[string]interface{}{
"client_id": clientID,
"role": role,
},
})
// 发送更新后的房间状态
p.broadcastRoomStatus(room)
room.mutex.Unlock()
// 如果房间没有客户端了,清理房间
p.cleanupRoom(code)
}()
// 处理消息
for {
var msg models.VideoMessage
err := conn.ReadJSON(&msg)
if err != nil {
log.Printf("读取WebSocket消息失败: %v", err)
break
}
log.Printf("收到WebSocket消息: 类型=%s, 来自=%s, 房间=%s", msg.Type, clientID, code)
// 处理特殊消息类型
switch msg.Type {
case "update-file-list":
// 处理文件列表更新
p.handleFileListUpdate(room, clientID, msg)
case "file-request":
// 处理文件请求
p.handleFileRequest(room, clientID, msg)
case "file-info", "file-chunk", "file-complete":
// 处理文件传输相关消息,直接转发给接收方
p.forwardMessage(room, clientID, msg)
case "text-update":
// 处理实时文字更新
p.handleTextUpdate(room, clientID, msg)
case "text-send":
// 处理文字发送
p.handleTextSend(room, clientID, msg)
case "image-send":
// 处理图片发送
p.handleImageSend(room, clientID, msg)
default:
// 转发消息到对应的客户端
p.forwardMessage(room, clientID, msg)
}
}
}
// notifyClients 通知指定角色的客户端
func (p *P2PService) notifyClients(room *FileTransferRoom, role string, msg models.VideoMessage) {
for _, client := range room.Clients {
if role == "" || client.Role == role {
if err := client.Connection.WriteJSON(msg); err != nil {
log.Printf("发送消息到客户端失败 %s: %v", client.ID, err)
}
}
}
}
// broadcastRoomStatus 广播房间状态给所有客户端
func (p *P2PService) broadcastRoomStatus(room *FileTransferRoom) {
status := p.getRoomStatus(room)
statusMsg := models.VideoMessage{
Type: "room-status",
Payload: status,
}
for _, client := range room.Clients {
if err := client.Connection.WriteJSON(statusMsg); err != nil {
log.Printf("发送房间状态失败 %s: %v", client.ID, err)
}
}
}
// getRoomStatus 获取房间状态
func (p *P2PService) getRoomStatus(room *FileTransferRoom) models.RoomStatus {
senderCount := 0
receiverCount := 0
clients := make([]models.ClientInfo, 0, len(room.Clients))
for _, client := range room.Clients {
// 创建不包含连接的客户端信息副本
clientCopy := models.ClientInfo{
ID: client.ID,
Role: client.Role,
JoinedAt: client.JoinedAt,
UserAgent: client.UserAgent,
}
clients = append(clients, clientCopy)
if client.Role == "sender" {
senderCount++
} else if client.Role == "receiver" {
receiverCount++
}
}
return models.RoomStatus{
Code: room.Code,
FileCount: len(room.Files),
SenderCount: senderCount,
ReceiverCount: receiverCount,
Clients: clients,
CreatedAt: room.CreatedAt,
}
}
// handleFileListUpdate 处理文件列表更新
func (p *P2PService) handleFileListUpdate(room *FileTransferRoom, clientID string, msg models.VideoMessage) {
// 获取文件列表
payload, ok := msg.Payload.(map[string]interface{})
if !ok {
log.Printf("无效的文件列表更新消息格式")
return
}
filesData, ok := payload["files"].([]interface{})
if !ok {
log.Printf("缺少文件列表数据")
return
}
// 转换文件列表格式
var files []models.FileTransferInfo
for _, fileData := range filesData {
if fileMap, ok := fileData.(map[string]interface{}); ok {
file := models.FileTransferInfo{
ID: getString(fileMap, "id"),
Name: getString(fileMap, "name"),
Size: getInt64(fileMap, "size"),
Type: getString(fileMap, "type"),
LastModified: getInt64(fileMap, "lastModified"),
}
files = append(files, file)
}
}
log.Printf("收到文件列表更新请求,共 %d 个文件", len(files))
// 更新房间文件列表
room.mutex.Lock()
room.Files = files
room.mutex.Unlock()
log.Printf("房间 %s 文件列表已更新,共 %d 个文件", room.Code, len(files))
// 通知所有接收方客户端文件列表已更新
room.mutex.RLock()
for _, client := range room.Clients {
if client.Role == "receiver" {
message := models.VideoMessage{
Type: "file-list-updated",
Payload: map[string]interface{}{
"files": files,
},
}
if err := client.Connection.WriteJSON(message); err != nil {
log.Printf("发送文件列表更新消息失败: %v", err)
} else {
log.Printf("已向接收方 %s 发送文件列表更新消息", client.ID)
}
}
}
room.mutex.RUnlock()
}
// 辅助函数从map中获取字符串值
func getString(m map[string]interface{}, key string) string {
if val, ok := m[key].(string); ok {
return val
}
return ""
}
// 辅助函数从map中获取int64值
func getInt64(m map[string]interface{}, key string) int64 {
if val, ok := m[key].(float64); ok {
return int64(val)
}
if val, ok := m[key].(int64); ok {
return val
}
if val, ok := m[key].(int); ok {
return int64(val)
}
return 0
}
// handleFileRequest 处理文件请求
func (p *P2PService) handleFileRequest(room *FileTransferRoom, clientID string, msg models.VideoMessage) {
// 获取请求的文件ID
payload, ok := msg.Payload.(map[string]interface{})
if !ok {
log.Printf("无效的文件请求消息格式")
return
}
fileID, ok := payload["file_id"].(string)
if !ok {
log.Printf("缺少文件ID")
return
}
// 转发文件请求给所有发送方
requestMsg := models.VideoMessage{
Type: "file-request",
Payload: map[string]interface{}{
"file_id": fileID,
"requester": clientID,
"request_id": payload["request_id"],
},
}
p.notifyClients(room, "sender", requestMsg)
}
// forwardMessage 转发消息到指定客户端或所有对应角色的客户端
func (p *P2PService) forwardMessage(room *FileTransferRoom, senderClientID string, msg models.VideoMessage) {
room.mutex.RLock()
defer room.mutex.RUnlock()
senderClient, exists := room.Clients[senderClientID]
if !exists {
log.Printf("发送方客户端不存在: %s", senderClientID)
return
}
// 检查消息是否指定了目标客户端
if payload, ok := msg.Payload.(map[string]interface{}); ok {
if targetID, hasTarget := payload["target_client"].(string); hasTarget {
// 发送给指定客户端
if targetClient, exists := room.Clients[targetID]; exists {
log.Printf("转发消息: 类型=%s, 从%s到%s", msg.Type, senderClientID, targetID)
if err := targetClient.Connection.WriteJSON(msg); err != nil {
log.Printf("转发消息失败: %v", err)
}
return
}
}
}
// 否则根据角色转发给对应的客户端
targetRole := ""
if senderClient.Role == "sender" {
targetRole = "receiver"
} else if senderClient.Role == "receiver" {
targetRole = "sender"
}
if targetRole != "" {
log.Printf("广播消息: 类型=%s, 从%s到所有%s", msg.Type, senderClient.Role, targetRole)
p.notifyClients(room, targetRole, msg)
}
}
// cleanupRoom 清理房间
func (p *P2PService) cleanupRoom(code string) {
p.roomsMux.Lock()
defer p.roomsMux.Unlock()
if room, exists := p.rooms[code]; exists {
room.mutex.RLock()
noClients := len(room.Clients) == 0
room.mutex.RUnlock()
if noClients {
delete(p.rooms, code)
log.Printf("清理房间: %s", code)
}
}
}
// cleanupExpiredRooms 定期清理过期房间
func (p *P2PService) cleanupExpiredRooms() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
p.roomsMux.Lock()
now := time.Now()
for code, room := range p.rooms {
// 房间存在超过1小时则删除
if now.Sub(room.CreatedAt) > time.Hour {
delete(p.rooms, code)
log.Printf("清理过期房间: %s", code)
}
}
p.roomsMux.Unlock()
}
}
// generatePickupCode 生成6位取件码
func generatePickupCode() string {
mathrand.Seed(time.Now().UnixNano())
code := mathrand.Intn(900000) + 100000
return strconv.Itoa(code)
}
// GetRoomStatusByCode 根据取件码获取房间状态
func (p *P2PService) GetRoomStatusByCode(code string) (models.RoomStatus, bool) {
p.roomsMux.RLock()
defer p.roomsMux.RUnlock()
room, exists := p.rooms[code]
if !exists {
return models.RoomStatus{}, false
}
room.mutex.RLock()
status := p.getRoomStatus(room)
room.mutex.RUnlock()
return status, true
}
func (p *P2PService) GetRoomStats() map[string]interface{} {
p.roomsMux.RLock()
defer p.roomsMux.RUnlock()
stats := map[string]interface{}{
"total_rooms": len(p.rooms),
"rooms": make([]map[string]interface{}, 0),
}
for code, room := range p.rooms {
room.mutex.RLock()
status := p.getRoomStatus(room)
roomInfo := map[string]interface{}{
"code": code,
"file_count": len(room.Files),
"sender_count": status.SenderCount,
"receiver_count": status.ReceiverCount,
"total_clients": len(room.Clients),
"created_at": room.CreatedAt,
}
room.mutex.RUnlock()
stats["rooms"] = append(stats["rooms"].([]map[string]interface{}), roomInfo)
}
return stats
}
// UpdateRoomFiles 更新房间文件列表
func (p *P2PService) UpdateRoomFiles(code string, files []models.FileTransferInfo) bool {
p.roomsMux.RLock()
room, exists := p.rooms[code]
p.roomsMux.RUnlock()
if !exists {
return false
}
room.mutex.Lock()
room.Files = files
room.mutex.Unlock()
log.Printf("房间 %s 文件列表已更新,共 %d 个文件", code, len(files))
// 通知所有连接的客户端文件列表已更新
room.mutex.RLock()
for _, client := range room.Clients {
if client.Role == "receiver" {
message := models.VideoMessage{
Type: "file-list-updated",
Payload: map[string]interface{}{
"files": files,
},
}
if err := client.Connection.WriteJSON(message); err != nil {
log.Printf("发送文件列表更新消息失败: %v", err)
}
}
}
room.mutex.RUnlock()
return true
}
// handleTextUpdate 处理实时文字更新
func (p *P2PService) handleTextUpdate(room *FileTransferRoom, senderID string, msg models.VideoMessage) {
log.Printf("处理文字更新: 来自客户端 %s", senderID)
// 转发文字更新给房间内其他所有客户端
room.mutex.RLock()
defer room.mutex.RUnlock()
for clientID, client := range room.Clients {
if clientID != senderID { // 不发送给发送者自己
if err := client.Connection.WriteJSON(msg); err != nil {
log.Printf("转发文字更新失败 %s: %v", clientID, err)
}
}
}
}
// handleTextSend 处理文字发送
func (p *P2PService) handleTextSend(room *FileTransferRoom, senderID string, msg models.VideoMessage) {
log.Printf("处理文字发送: 来自客户端 %s", senderID)
// 转发文字发送给房间内所有客户端
room.mutex.RLock()
defer room.mutex.RUnlock()
for _, client := range room.Clients {
if err := client.Connection.WriteJSON(msg); err != nil {
log.Printf("转发文字发送失败 %s: %v", client.ID, err)
}
}
}
// handleImageSend 处理图片发送
func (p *P2PService) handleImageSend(room *FileTransferRoom, senderID string, msg models.VideoMessage) {
log.Printf("处理图片发送: 来自客户端 %s", senderID)
// 转发图片发送给房间内所有客户端
room.mutex.RLock()
defer room.mutex.RUnlock()
for _, client := range room.Clients {
if err := client.Connection.WriteJSON(msg); err != nil {
log.Printf("转发图片发送失败 %s: %v", client.ID, err)
}
}
}