Files
file-transfer-go/internal/services/webrtc_service.go
2025-09-16 16:41:38 +08:00

451 lines
12 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package services
import (
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
type WebRTCService struct {
rooms map[string]*WebRTCRoom
roomsMux sync.RWMutex
upgrader websocket.Upgrader
}
type WebRTCRoom struct {
Code string
Sender *WebRTCClient
Receiver *WebRTCClient
CreatedAt time.Time
ExpiresAt time.Time // 添加过期时间
}
type WebRTCClient struct {
ID string
Role string // "sender" or "receiver"
Connection *websocket.Conn
Room string
}
func NewWebRTCService() *WebRTCService {
service := &WebRTCService{
rooms: make(map[string]*WebRTCRoom),
roomsMux: sync.RWMutex{},
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true // 允许所有来源,生产环境应当限制
},
},
}
// 启动房间清理任务
go service.cleanupExpiredRooms()
return service
}
type WebRTCMessage struct {
Type string `json:"type"`
From string `json:"from"`
To string `json:"to"`
Payload interface{} `json:"payload"`
}
// HandleWebSocket 处理WebRTC信令WebSocket连接
func (ws *WebRTCService) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
log.Printf("收到WebRTC WebSocket连接请求: %s", r.URL.String())
conn, err := ws.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebRTC WebSocket升级失败: %v", err)
return
}
defer conn.Close()
// 获取房间码和角色
code := r.URL.Query().Get("code")
role := r.URL.Query().Get("role")
log.Printf("WebRTC连接参数: code=%s, role=%s", code, role)
if code == "" || (role != "sender" && role != "receiver") {
log.Printf("WebRTC连接参数无效: code=%s, role=%s", code, role)
conn.WriteJSON(map[string]interface{}{
"type": "error",
"message": "连接参数无效",
})
return
}
// 验证房间是否存在
ws.roomsMux.RLock()
room := ws.rooms[code]
ws.roomsMux.RUnlock()
if room == nil {
log.Printf("房间不存在: %s", code)
conn.WriteJSON(map[string]interface{}{
"type": "error",
"message": "房间不存在或已过期",
})
return
}
// 检查房间是否已过期
if time.Now().After(room.ExpiresAt) {
log.Printf("房间已过期: %s", code)
conn.WriteJSON(map[string]interface{}{
"type": "error",
"message": "房间已过期",
})
return
}
// 检查房间是否已满(两个连接都已存在)
ws.roomsMux.RLock()
isRoomFull := room.Sender != nil && room.Receiver != nil
ws.roomsMux.RUnlock()
if isRoomFull {
log.Printf("房间已满,拒绝连接: %s", code)
conn.WriteJSON(map[string]interface{}{
"type": "error",
"message": "当前房间人数已满,正在传输中无法加入",
})
return
}
// 生成客户端ID
clientID := ws.generateClientID()
client := &WebRTCClient{
ID: clientID,
Role: role,
Connection: conn,
Room: code,
}
log.Printf("WebRTC客户端已创建: ID=%s, Role=%s, Room=%s", clientID, role, code)
// 添加客户端到房间
ws.addClientToRoom(code, client)
log.Printf("WebRTC %s连接到房间: %s (客户端ID: %s)", role, code, clientID)
// 连接关闭时清理
defer func() {
ws.removeClientFromRoom(code, clientID)
log.Printf("WebRTC客户端断开连接: %s (房间: %s)", clientID, code)
// 通知房间内其他客户端对方已断开连接
ws.notifyRoomDisconnection(code, clientID, client.Role)
}()
// 处理消息
for {
// 首先读取原始消息类型和数据
messageType, data, err := conn.ReadMessage()
if err != nil {
log.Printf("读取WebRTC WebSocket消息失败: %v", err)
break
}
if messageType == websocket.TextMessage {
// 文本消息尝试解析为JSON
var msg WebRTCMessage
if err := json.Unmarshal(data, &msg); err != nil {
log.Printf("解析WebRTC JSON消息失败: %v", err)
continue
}
msg.From = clientID
log.Printf("收到WebRTC信令: 类型=%s, 来自=%s, 房间=%s", msg.Type, clientID, code)
// 转发信令消息给对方
ws.forwardMessage(code, clientID, &msg)
} else if messageType == websocket.BinaryMessage {
// 二进制消息,直接转发
log.Printf("收到WebRTC二进制数据: 大小=%d bytes, 来自=%s, 房间=%s", len(data), clientID, code)
// 转发二进制数据给对方
ws.forwardBinaryMessage(code, clientID, data)
} else {
log.Printf("收到未知消息类型: %d", messageType)
}
}
}
// 添加客户端到房间
func (ws *WebRTCService) addClientToRoom(code string, client *WebRTCClient) {
ws.roomsMux.Lock()
defer ws.roomsMux.Unlock()
room := ws.rooms[code]
if room == nil {
log.Printf("尝试加入不存在的WebRTC房间: %s", code)
return
}
if client.Role == "sender" {
room.Sender = client
// 如果发送方连接,检查是否有接收方在等待,通知接收方
if room.Receiver != nil {
log.Printf("通知接收方:发送方已连接")
peerJoinedMsg := &WebRTCMessage{
Type: "peer-joined",
From: client.ID,
Payload: map[string]interface{}{
"role": "sender",
},
}
room.Receiver.Connection.WriteJSON(peerJoinedMsg)
}
} else {
room.Receiver = client
// 如果接收方连接通知发送方可以开始建立P2P连接
if room.Sender != nil {
log.Printf("通知发送方接收方已连接可以开始建立P2P连接")
peerJoinedMsg := &WebRTCMessage{
Type: "peer-joined",
From: client.ID,
Payload: map[string]interface{}{
"role": "receiver",
},
}
room.Sender.Connection.WriteJSON(peerJoinedMsg)
}
}
}
// 从房间移除客户端
func (ws *WebRTCService) removeClientFromRoom(code string, clientID string) {
ws.roomsMux.Lock()
defer ws.roomsMux.Unlock()
room := ws.rooms[code]
if room == nil {
return
}
if room.Sender != nil && room.Sender.ID == clientID {
room.Sender = nil
}
if room.Receiver != nil && room.Receiver.ID == clientID {
room.Receiver = nil
}
// 如果房间为空,删除房间
if room.Sender == nil && room.Receiver == nil {
delete(ws.rooms, code)
log.Printf("清理WebRTC房间: %s", code)
}
}
// 转发信令消息
func (ws *WebRTCService) forwardMessage(roomCode string, fromClientID string, msg *WebRTCMessage) {
ws.roomsMux.Lock()
defer ws.roomsMux.Unlock()
room := ws.rooms[roomCode]
if room == nil {
return
}
var targetClient *WebRTCClient
if room.Sender != nil && room.Sender.ID == fromClientID {
// 消息来自sender转发给receiver
targetClient = room.Receiver
} else if room.Receiver != nil && room.Receiver.ID == fromClientID {
// 消息来自receiver转发给sender
targetClient = room.Sender
}
if targetClient != nil && targetClient.Connection != nil {
msg.To = targetClient.ID
err := targetClient.Connection.WriteJSON(msg)
if err != nil {
log.Printf("转发WebRTC信令失败: %v", err)
} else {
log.Printf("转发WebRTC信令: 类型=%s, 从=%s到=%s", msg.Type, fromClientID, targetClient.ID)
}
} else {
log.Printf("目标客户端不在线,消息类型=%s", msg.Type)
}
}
// 转发二进制消息
func (ws *WebRTCService) forwardBinaryMessage(roomCode string, fromClientID string, data []byte) {
ws.roomsMux.Lock()
defer ws.roomsMux.Unlock()
room := ws.rooms[roomCode]
if room == nil {
return
}
var targetClient *WebRTCClient
if room.Sender != nil && room.Sender.ID == fromClientID {
// 消息来自sender转发给receiver
targetClient = room.Receiver
} else if room.Receiver != nil && room.Receiver.ID == fromClientID {
// 消息来自receiver转发给sender
targetClient = room.Sender
}
if targetClient != nil && targetClient.Connection != nil {
err := targetClient.Connection.WriteMessage(websocket.BinaryMessage, data)
if err != nil {
log.Printf("转发WebRTC二进制数据失败: %v", err)
} else {
log.Printf("转发WebRTC二进制数据: 大小=%d bytes, 从=%s到=%s", len(data), fromClientID, targetClient.ID)
}
} else {
log.Printf("目标客户端不在线,无法转发二进制数据")
}
}
// CreateRoom 创建或获取房间
func (ws *WebRTCService) CreateRoom(code string) {
ws.roomsMux.Lock()
defer ws.roomsMux.Unlock()
if _, exists := ws.rooms[code]; !exists {
ws.rooms[code] = &WebRTCRoom{
Code: code,
CreatedAt: time.Now(),
ExpiresAt: time.Now().Add(time.Hour), // 1小时后过期
}
log.Printf("创建WebRTC房间: %s", code)
}
}
// CreateNewRoom 创建新房间并返回房间码 - 确保不重复
func (ws *WebRTCService) CreateNewRoom() string {
var code string
// 生成唯一房间码,确保不重复
for {
code = ws.generatePickupCode()
ws.roomsMux.RLock()
_, exists := ws.rooms[code]
ws.roomsMux.RUnlock()
if !exists {
break // 找到了不重复的代码
}
// 如果重复了,继续生成新的
}
ws.CreateRoom(code)
return code
}
// generatePickupCode 生成6位取件码 - 统一规则只使用大写字母和数字排除0和O避免混淆
func (ws *WebRTCService) generatePickupCode() string {
// 只使用大写字母和数字排除容易混淆的字符数字0和字母O
chars := "123456789ABCDEFGHIJKLMNPQRSTUVWXYZ"
source := rand.NewSource(time.Now().UnixNano())
rng := rand.New(source)
result := make([]byte, 6)
for i := 0; i < 6; i++ {
result[i] = chars[rng.Intn(len(chars))]
}
return string(result)
}
// cleanupExpiredRooms 定期清理过期房间
func (ws *WebRTCService) cleanupExpiredRooms() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
ws.roomsMux.Lock()
now := time.Now()
for code, room := range ws.rooms {
// 房间过期或无客户端连接则删除
if now.After(room.ExpiresAt) || (room.Sender == nil && room.Receiver == nil) {
delete(ws.rooms, code)
log.Printf("清理过期WebRTC房间: %s", code)
}
}
ws.roomsMux.Unlock()
}
}
// generateClientID 生成客户端ID
func (ws *WebRTCService) generateClientID() string {
return fmt.Sprintf("webrtc_client_%d", rand.Int63())
}
// 通知房间内客户端有人断开连接
func (ws *WebRTCService) notifyRoomDisconnection(roomCode string, disconnectedClientID string, disconnectedRole string) {
ws.roomsMux.Lock()
defer ws.roomsMux.Unlock()
room := ws.rooms[roomCode]
if room == nil {
return
}
// 构建断开连接通知消息
disconnectionMsg := &WebRTCMessage{
Type: "disconnection",
From: disconnectedClientID,
Payload: map[string]interface{}{
"role": disconnectedRole,
"message": "对方已停止传输",
},
}
// 通知房间内其他客户端
if room.Sender != nil && room.Sender.ID != disconnectedClientID {
err := room.Sender.Connection.WriteJSON(disconnectionMsg)
if err != nil {
log.Printf("通知发送方断开连接失败: %v", err)
} else {
log.Printf("已通知发送方: 对方已断开连接")
}
}
if room.Receiver != nil && room.Receiver.ID != disconnectedClientID {
err := room.Receiver.Connection.WriteJSON(disconnectionMsg)
if err != nil {
log.Printf("通知接收方断开连接失败: %v", err)
} else {
log.Printf("已通知接收方: 对方已断开连接")
}
}
}
func (ws *WebRTCService) GetRoomStatus(code string) map[string]interface{} {
ws.roomsMux.RLock()
defer ws.roomsMux.RUnlock()
room := ws.rooms[code]
if room == nil {
return map[string]interface{}{
"success": false,
"exists": false,
"message": "房间不存在或已过期",
}
}
// 检查房间是否已满(两个连接都已存在)
isRoomFull := room.Sender != nil && room.Receiver != nil
return map[string]interface{}{
"success": true,
"exists": true,
"sender_online": room.Sender != nil,
"receiver_online": room.Receiver != nil,
"is_room_full": isRoomFull,
"created_at": room.CreatedAt,
}
}