Files
file-transfer-go/internal/services/relay_service.go

324 lines
8.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package services
import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
// RelayService 处理 WebSocket 数据中继(当 P2P 失败时的降级方案)
type RelayService struct {
rooms map[string]*RelayRoom
roomsMux sync.RWMutex
upgrader websocket.Upgrader
// 复用 WebRTCService 来验证房间
webrtcService *WebRTCService
}
// RelayRoom 中继房间
type RelayRoom struct {
Code string
Sender *RelayClient
Receiver *RelayClient
CreatedAt time.Time
mu sync.Mutex
}
// RelayClient 中继客户端
type RelayClient struct {
ID string
Role string // "sender" or "receiver"
Connection *websocket.Conn
mu sync.Mutex
}
// RelayMessage 中继消息的包装格式
type RelayMessage struct {
Type string `json:"type"` // "relay-data" | "relay-binary" | "relay-ready" | "relay-ping" | "relay-pong"
Channel string `json:"channel,omitempty"` // 逻辑通道file-transfer, text-transfer
Payload json.RawMessage `json:"payload,omitempty"` // JSON 消息体
}
func NewRelayService(webrtcService *WebRTCService) *RelayService {
return &RelayService{
rooms: make(map[string]*RelayRoom),
roomsMux: sync.RWMutex{},
webrtcService: webrtcService,
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
// 增大消息尺寸限制以支持文件传输10MB
ReadBufferSize: 10 * 1024 * 1024,
WriteBufferSize: 10 * 1024 * 1024,
},
}
}
// HandleRelayWebSocket 处理中继 WebSocket 连接
func (rs *RelayService) HandleRelayWebSocket(w http.ResponseWriter, r *http.Request) {
log.Printf("[Relay] 收到中继 WebSocket 连接请求: %s", r.URL.String())
conn, err := rs.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("[Relay] WebSocket 升级失败: %v", err)
return
}
// 设置最大消息大小为 10MB
conn.SetReadLimit(10 * 1024 * 1024)
defer conn.Close()
// 获取参数
code := r.URL.Query().Get("code")
role := r.URL.Query().Get("role")
log.Printf("[Relay] 连接参数: code=%s, role=%s", code, role)
if code == "" || (role != "sender" && role != "receiver") {
log.Printf("[Relay] 参数无效: code=%s, role=%s", code, role)
conn.WriteJSON(map[string]interface{}{
"type": "error",
"error": "连接参数无效",
})
return
}
// 验证房间是否存在(通过 WebRTC service 验证)
status := rs.webrtcService.GetRoomStatus(code)
exists, _ := status["exists"].(bool)
if !exists {
log.Printf("[Relay] 房间不存在: %s", code)
conn.WriteJSON(map[string]interface{}{
"type": "error",
"error": "房间不存在或已过期",
})
return
}
// 创建或获取中继房间
rs.roomsMux.Lock()
room, ok := rs.rooms[code]
if !ok {
room = &RelayRoom{
Code: code,
CreatedAt: time.Now(),
}
rs.rooms[code] = room
}
rs.roomsMux.Unlock()
// 创建客户端
client := &RelayClient{
ID: rs.webrtcService.generateClientID(),
Role: role,
Connection: conn,
}
// 添加到房间
room.mu.Lock()
if role == "sender" {
// 关闭旧的 sender 连接
if room.Sender != nil {
room.Sender.Connection.Close()
}
room.Sender = client
} else {
// 关闭旧的 receiver 连接
if room.Receiver != nil {
room.Receiver.Connection.Close()
}
room.Receiver = client
}
// 检查对方是否已连接,通知双方 relay 已就绪
peerConnected := false
if role == "sender" && room.Receiver != nil {
peerConnected = true
} else if role == "receiver" && room.Sender != nil {
peerConnected = true
}
room.mu.Unlock()
log.Printf("[Relay] 客户端加入中继房间: ID=%s, Role=%s, Room=%s, 对方是否在线=%v", client.ID, role, code, peerConnected)
// 通知自己已就绪
conn.WriteJSON(map[string]interface{}{
"type": "relay-ready",
"role": role,
"peer_connected": peerConnected,
})
// 如果对方已连接,通知对方
if peerConnected {
room.mu.Lock()
var peer *RelayClient
if role == "sender" {
peer = room.Receiver
} else {
peer = room.Sender
}
room.mu.Unlock()
if peer != nil {
peer.mu.Lock()
peer.Connection.WriteJSON(map[string]interface{}{
"type": "relay-peer-joined",
"peer_role": role,
})
peer.mu.Unlock()
}
}
// 连接关闭时清理
defer func() {
room.mu.Lock()
if role == "sender" && room.Sender != nil && room.Sender.ID == client.ID {
room.Sender = nil
} else if role == "receiver" && room.Receiver != nil && room.Receiver.ID == client.ID {
room.Receiver = nil
}
// 通知对方断开
var peer *RelayClient
if role == "sender" {
peer = room.Receiver
} else {
peer = room.Sender
}
// 如果房间空了,清理
isEmpty := room.Sender == nil && room.Receiver == nil
room.mu.Unlock()
if peer != nil {
peer.mu.Lock()
peer.Connection.WriteJSON(map[string]interface{}{
"type": "relay-peer-left",
"peer_role": role,
})
peer.mu.Unlock()
}
if isEmpty {
rs.roomsMux.Lock()
delete(rs.rooms, code)
rs.roomsMux.Unlock()
log.Printf("[Relay] 清理空的中继房间: %s", code)
}
log.Printf("[Relay] 客户端断开中继: ID=%s, Room=%s", client.ID, code)
}()
// 消息转发循环 - 带统计日志
var textMsgCount, binaryMsgCount int64
var totalTextBytes, totalBinaryBytes int64
startTime := time.Now()
lastLogTime := startTime
log.Printf("[Relay] ▶ 开始消息转发: Room=%s, Role=%s", code, role)
for {
msgType, data, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
log.Printf("[Relay] 读取消息错误: Room=%s, Role=%s, err=%v", code, role, err)
}
break
}
dataLen := int64(len(data))
// 统计消息类型
if msgType == websocket.TextMessage {
textMsgCount++
totalTextBytes += dataLen
// 解析文本消息类型用于日志
var peek struct {
Type string `json:"type"`
Channel string `json:"channel"`
}
if json.Unmarshal(data, &peek) == nil {
log.Printf("[Relay] 📨 转发文本消息: Room=%s, %s→%s, type=%s, channel=%s, size=%d bytes",
code, role, peerRole(role), peek.Type, peek.Channel, dataLen)
} else {
log.Printf("[Relay] 📨 转发文本消息: Room=%s, %s→%s, size=%d bytes",
code, role, peerRole(role), dataLen)
}
} else if msgType == websocket.BinaryMessage {
binaryMsgCount++
totalBinaryBytes += dataLen
// 二进制消息只在每 10 个包或每 5 秒输出一次摘要,避免日志过多
if binaryMsgCount%10 == 1 || time.Since(lastLogTime) > 5*time.Second {
log.Printf("[Relay] 📦 转发二进制数据: Room=%s, %s→%s, size=%d bytes (累计: %d 包, %s)",
code, role, peerRole(role), dataLen, binaryMsgCount, formatBytes(totalBinaryBytes))
lastLogTime = time.Now()
}
}
// 获取对方客户端
room.mu.Lock()
var peer *RelayClient
if role == "sender" {
peer = room.Receiver
} else {
peer = room.Sender
}
room.mu.Unlock()
if peer == nil {
log.Printf("[Relay] ⚠ 对方不在线,丢弃消息: Room=%s, Role=%s, size=%d bytes", code, role, dataLen)
continue
}
// 直接转发消息(文本或二进制)
peer.mu.Lock()
err = peer.Connection.WriteMessage(msgType, data)
peer.mu.Unlock()
if err != nil {
log.Printf("[Relay] ❌ 转发消息失败: Room=%s, %s→%s, err=%v", code, role, peerRole(role), err)
break
}
}
elapsed := time.Since(startTime)
log.Printf("[Relay] ■ 消息转发结束: Room=%s, Role=%s, 持续=%v, 文本消息=%d(%s), 二进制消息=%d(%s)",
code, role, elapsed.Round(time.Second),
textMsgCount, formatBytes(totalTextBytes),
binaryMsgCount, formatBytes(totalBinaryBytes))
}
// peerRole 返回对方角色名
func peerRole(role string) string {
if role == "sender" {
return "receiver"
}
return "sender"
}
// formatBytes 人性化格式化字节数
func formatBytes(b int64) string {
const (
KB = 1024
MB = 1024 * KB
GB = 1024 * MB
)
switch {
case b >= GB:
return fmt.Sprintf("%.2f GB", float64(b)/float64(GB))
case b >= MB:
return fmt.Sprintf("%.2f MB", float64(b)/float64(MB))
case b >= KB:
return fmt.Sprintf("%.2f KB", float64(b)/float64(KB))
default:
return fmt.Sprintf("%d B", b)
}
}