支持多人加入/下载

This commit is contained in:
seven
2025-07-28 18:12:05 +08:00
parent 8031a29037
commit 22cbaae0ab
12 changed files with 2485 additions and 370 deletions

View File

@@ -123,6 +123,69 @@ func (h *Handler) GetRoomInfoHandler(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(response)
}
// GetRoomStatusHandler 获取房间状态API
func (h *Handler) GetRoomStatusHandler(w http.ResponseWriter, r *http.Request) {
code := r.URL.Query().Get("code")
if code == "" {
http.Error(w, "缺少取件码", http.StatusBadRequest)
return
}
status, exists := h.p2pService.GetRoomStatusByCode(code)
if !exists {
response := map[string]interface{}{
"success": false,
"message": "取件码不存在或已过期",
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
return
}
response := map[string]interface{}{
"success": true,
"status": status,
"message": "房间状态获取成功",
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// UpdateRoomFilesHandler 更新房间文件列表API
func (h *Handler) UpdateRoomFilesHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "方法不允许", http.StatusMethodNotAllowed)
return
}
var req struct {
Code string `json:"code"`
Files []models.FileTransferInfo `json:"files"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "解析请求失败", http.StatusBadRequest)
return
}
// 更新房间文件列表
success := h.p2pService.UpdateRoomFiles(req.Code, req.Files)
response := map[string]interface{}{
"success": success,
}
if success {
response["message"] = "文件列表更新成功"
} else {
response["message"] = "房间不存在或更新失败"
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(response)
}
// HandleP2PWebSocket 处理P2P WebSocket连接
func (h *Handler) HandleP2PWebSocket(w http.ResponseWriter, r *http.Request) {
h.p2pService.HandleWebSocket(w, r)

View File

@@ -2,6 +2,8 @@ package models
import (
"time"
"github.com/gorilla/websocket"
)
// FileInfo 文件信息结构
@@ -60,6 +62,25 @@ type FileTransferInfo struct {
LastModified int64 `json:"lastModified"`
}
// ClientInfo 客户端连接信息
type ClientInfo struct {
ID string `json:"id"` // 客户端唯一标识
Role string `json:"role"` // sender 或 receiver
Connection *websocket.Conn `json:"-"` // WebSocket连接不序列化
JoinedAt time.Time `json:"joined_at"` // 加入时间
UserAgent string `json:"user_agent"` // 用户代理
}
// RoomStatus 房间状态信息
type RoomStatus struct {
Code string `json:"code"`
FileCount int `json:"file_count"`
SenderCount int `json:"sender_count"`
ReceiverCount int `json:"receiver_count"`
Clients []ClientInfo `json:"clients"`
CreatedAt time.Time `json:"created_at"`
}
// ErrorResponse 错误响应结构
type ErrorResponse struct {
Success bool `json:"success"`

View File

@@ -1,8 +1,10 @@
package services
import (
"crypto/rand"
"fmt"
"log"
"math/rand"
mathrand "math/rand"
"net/http"
"strconv"
"sync"
@@ -15,11 +17,10 @@ import (
type FileTransferRoom struct {
ID string
Code string // 取件码
Files []models.FileTransferInfo // 待传输文件信息
Sender *websocket.Conn // 发送方连接
Receiver *websocket.Conn // 接收方连接
CreatedAt time.Time // 创建时间
Code string // 取件码
Files []models.FileTransferInfo // 待传输文件信息
Clients map[string]*models.ClientInfo // 所有连接的客户端 (客户端ID -> ClientInfo)
CreatedAt time.Time // 创建时间
mutex sync.RWMutex
}
@@ -57,6 +58,7 @@ func (p *P2PService) CreateRoom(files []models.FileTransferInfo) string {
ID: "room_" + code,
Code: code,
Files: files,
Clients: make(map[string]*models.ClientInfo),
CreatedAt: time.Now(),
}
@@ -75,6 +77,13 @@ func (p *P2PService) GetRoomByCode(code string) (*FileTransferRoom, bool) {
return room, exists
}
// generateClientID 生成客户端唯一标识
func generateClientID() string {
b := make([]byte, 8)
rand.Read(b)
return fmt.Sprintf("client_%x", b)
}
// HandleWebSocket 处理WebSocket连接
func (p *P2PService) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := p.upgrader.Upgrade(w, r, nil)
@@ -100,16 +109,23 @@ func (p *P2PService) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
return
}
// 设置连接
room.mutex.Lock()
if role == "sender" {
room.Sender = conn
log.Printf("发送方连接到房间: %s", code)
} else {
room.Receiver = conn
log.Printf("接收方连接到房间: %s", code)
// 生成客户端ID并创建客户端信息
clientID := generateClientID()
client := &models.ClientInfo{
ID: clientID,
Role: role,
Connection: conn,
JoinedAt: time.Now(),
UserAgent: r.Header.Get("User-Agent"),
}
// 发送文件列表给接收方
// 将客户端加入房间
room.mutex.Lock()
room.Clients[clientID] = client
log.Printf("%s连接到房间: %s (客户端ID: %s)", role, code, clientID)
// 如果是接收方,发送文件列表
if role == "receiver" {
filesMsg := models.VideoMessage{
Type: "file-list",
Payload: map[string]interface{}{"files": room.Files},
@@ -118,28 +134,49 @@ func (p *P2PService) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
log.Printf("发送文件列表失败: %v", err)
}
// 通知发送方接收方已连接
if room.Sender != nil {
readyMsg := models.VideoMessage{
Type: "receiver-ready",
Payload: map[string]interface{}{},
}
if err := room.Sender.WriteJSON(readyMsg); err != nil {
log.Printf("发送接收方就绪消息失败: %v", err)
}
}
// 通知所有发送方有新的接收方加入
p.notifyClients(room, "sender", models.VideoMessage{
Type: "new-receiver",
Payload: map[string]interface{}{
"client_id": clientID,
"joined_at": client.JoinedAt,
},
})
} else if role == "sender" {
// 通知所有接收方有新的发送方加入
p.notifyClients(room, "receiver", models.VideoMessage{
Type: "new-sender",
Payload: map[string]interface{}{
"client_id": clientID,
"joined_at": client.JoinedAt,
},
})
}
room.mutex.Unlock() // 连接关闭时清理
// 发送房间状态给所有客户端
p.broadcastRoomStatus(room)
room.mutex.Unlock()
// 连接关闭时清理
defer func() {
room.mutex.Lock()
if role == "sender" {
room.Sender = nil
} else {
room.Receiver = nil
}
delete(room.Clients, clientID)
log.Printf("客户端断开连接: %s (房间: %s)", clientID, code)
// 通知其他客户端有人离开
p.notifyClients(room, "", models.VideoMessage{
Type: "client-left",
Payload: map[string]interface{}{
"client_id": clientID,
"role": role,
},
})
// 发送更新后的房间状态
p.broadcastRoomStatus(room)
room.mutex.Unlock()
// 如果双方都断开连接,删除房间
// 如果房间没有客户端了,清理房间
p.cleanupRoom(code)
}()
@@ -152,37 +189,146 @@ func (p *P2PService) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
break
}
log.Printf("收到WebSocket消息: 类型=%s, 来自=%s, 房间=%s", msg.Type, role, code)
log.Printf("收到WebSocket消息: 类型=%s, 来自=%s, 房间=%s", msg.Type, clientID, code)
// 转发消息到对方
p.forwardMessage(room, role, msg)
// 处理特殊消息类型
switch msg.Type {
case "file-request":
// 处理文件请求
p.handleFileRequest(room, clientID, msg)
case "file-info", "file-chunk", "file-complete":
// 处理文件传输相关消息,直接转发给接收方
p.forwardMessage(room, clientID, msg)
default:
// 转发消息到对应的客户端
p.forwardMessage(room, clientID, msg)
}
}
}
// forwardMessage 转发消息到对方
func (p *P2PService) forwardMessage(room *FileTransferRoom, senderRole string, msg models.VideoMessage) {
// notifyClients 通知指定角色的客户端
func (p *P2PService) notifyClients(room *FileTransferRoom, role string, msg models.VideoMessage) {
for _, client := range room.Clients {
if role == "" || client.Role == role {
if err := client.Connection.WriteJSON(msg); err != nil {
log.Printf("发送消息到客户端失败 %s: %v", client.ID, err)
}
}
}
}
// broadcastRoomStatus 广播房间状态给所有客户端
func (p *P2PService) broadcastRoomStatus(room *FileTransferRoom) {
status := p.getRoomStatus(room)
statusMsg := models.VideoMessage{
Type: "room-status",
Payload: status,
}
for _, client := range room.Clients {
if err := client.Connection.WriteJSON(statusMsg); err != nil {
log.Printf("发送房间状态失败 %s: %v", client.ID, err)
}
}
}
// getRoomStatus 获取房间状态
func (p *P2PService) getRoomStatus(room *FileTransferRoom) models.RoomStatus {
senderCount := 0
receiverCount := 0
clients := make([]models.ClientInfo, 0, len(room.Clients))
for _, client := range room.Clients {
// 创建不包含连接的客户端信息副本
clientCopy := models.ClientInfo{
ID: client.ID,
Role: client.Role,
JoinedAt: client.JoinedAt,
UserAgent: client.UserAgent,
}
clients = append(clients, clientCopy)
if client.Role == "sender" {
senderCount++
} else if client.Role == "receiver" {
receiverCount++
}
}
return models.RoomStatus{
Code: room.Code,
FileCount: len(room.Files),
SenderCount: senderCount,
ReceiverCount: receiverCount,
Clients: clients,
CreatedAt: room.CreatedAt,
}
}
// handleFileRequest 处理文件请求
func (p *P2PService) handleFileRequest(room *FileTransferRoom, clientID string, msg models.VideoMessage) {
// 获取请求的文件ID
payload, ok := msg.Payload.(map[string]interface{})
if !ok {
log.Printf("无效的文件请求消息格式")
return
}
fileID, ok := payload["file_id"].(string)
if !ok {
log.Printf("缺少文件ID")
return
}
// 转发文件请求给所有发送方
requestMsg := models.VideoMessage{
Type: "file-request",
Payload: map[string]interface{}{
"file_id": fileID,
"requester": clientID,
"request_id": payload["request_id"],
},
}
p.notifyClients(room, "sender", requestMsg)
}
// forwardMessage 转发消息到指定客户端或所有对应角色的客户端
func (p *P2PService) forwardMessage(room *FileTransferRoom, senderClientID string, msg models.VideoMessage) {
room.mutex.RLock()
defer room.mutex.RUnlock()
var targetConn *websocket.Conn
var targetRole string
if senderRole == "sender" && room.Receiver != nil {
targetConn = room.Receiver
senderClient, exists := room.Clients[senderClientID]
if !exists {
log.Printf("发送方客户端不存在: %s", senderClientID)
return
}
// 检查消息是否指定了目标客户端
if payload, ok := msg.Payload.(map[string]interface{}); ok {
if targetID, hasTarget := payload["target_client"].(string); hasTarget {
// 发送给指定客户端
if targetClient, exists := room.Clients[targetID]; exists {
log.Printf("转发消息: 类型=%s, 从%s到%s", msg.Type, senderClientID, targetID)
if err := targetClient.Connection.WriteJSON(msg); err != nil {
log.Printf("转发消息失败: %v", err)
}
return
}
}
}
// 否则根据角色转发给对应的客户端
targetRole := ""
if senderClient.Role == "sender" {
targetRole = "receiver"
} else if senderRole == "receiver" && room.Sender != nil {
targetConn = room.Sender
} else if senderClient.Role == "receiver" {
targetRole = "sender"
}
if targetConn != nil {
log.Printf("转发消息: 类型=%s, 从%s到%s", msg.Type, senderRole, targetRole)
if err := targetConn.WriteJSON(msg); err != nil {
log.Printf("转发消息失败: %v", err)
} else {
log.Printf("消息转发成功: 类型=%s", msg.Type)
}
} else {
log.Printf("无法转发消息: 目标连接不存在, 发送方=%s", senderRole)
if targetRole != "" {
log.Printf("广播消息: 类型=%s, 从%s到所有%s", msg.Type, senderClient.Role, targetRole)
p.notifyClients(room, targetRole, msg)
}
}
@@ -193,10 +339,10 @@ func (p *P2PService) cleanupRoom(code string) {
if room, exists := p.rooms[code]; exists {
room.mutex.RLock()
bothDisconnected := room.Sender == nil && room.Receiver == nil
noClients := len(room.Clients) == 0
room.mutex.RUnlock()
if bothDisconnected {
if noClients {
delete(p.rooms, code)
log.Printf("清理房间: %s", code)
}
@@ -224,12 +370,27 @@ func (p *P2PService) cleanupExpiredRooms() {
// generatePickupCode 生成6位取件码
func generatePickupCode() string {
rand.Seed(time.Now().UnixNano())
code := rand.Intn(900000) + 100000
mathrand.Seed(time.Now().UnixNano())
code := mathrand.Intn(900000) + 100000
return strconv.Itoa(code)
}
// GetRoomStats 获取房间统计信息
// GetRoomStatusByCode 根据取件码获取房间状态
func (p *P2PService) GetRoomStatusByCode(code string) (models.RoomStatus, bool) {
p.roomsMux.RLock()
defer p.roomsMux.RUnlock()
room, exists := p.rooms[code]
if !exists {
return models.RoomStatus{}, false
}
room.mutex.RLock()
status := p.getRoomStatus(room)
room.mutex.RUnlock()
return status, true
}
func (p *P2PService) GetRoomStats() map[string]interface{} {
p.roomsMux.RLock()
defer p.roomsMux.RUnlock()
@@ -241,12 +402,14 @@ func (p *P2PService) GetRoomStats() map[string]interface{} {
for code, room := range p.rooms {
room.mutex.RLock()
status := p.getRoomStatus(room)
roomInfo := map[string]interface{}{
"code": code,
"file_count": len(room.Files),
"has_sender": room.Sender != nil,
"has_receiver": room.Receiver != nil,
"created_at": room.CreatedAt,
"code": code,
"file_count": len(room.Files),
"sender_count": status.SenderCount,
"receiver_count": status.ReceiverCount,
"total_clients": len(room.Clients),
"created_at": room.CreatedAt,
}
room.mutex.RUnlock()
stats["rooms"] = append(stats["rooms"].([]map[string]interface{}), roomInfo)
@@ -254,3 +417,40 @@ func (p *P2PService) GetRoomStats() map[string]interface{} {
return stats
}
// UpdateRoomFiles 更新房间文件列表
func (p *P2PService) UpdateRoomFiles(code string, files []models.FileTransferInfo) bool {
p.roomsMux.RLock()
room, exists := p.rooms[code]
p.roomsMux.RUnlock()
if !exists {
return false
}
room.mutex.Lock()
room.Files = files
room.mutex.Unlock()
log.Printf("房间 %s 文件列表已更新,共 %d 个文件", code, len(files))
// 通知所有连接的客户端文件列表已更新
room.mutex.RLock()
for _, client := range room.Clients {
if client.Role == "receiver" {
message := models.VideoMessage{
Type: "file-list-updated",
Payload: map[string]interface{}{
"files": files,
},
}
if err := client.Connection.WriteJSON(message); err != nil {
log.Printf("发送文件列表更新消息失败: %v", err)
}
}
}
room.mutex.RUnlock()
return true
}