feat:统一连接层,精简前后端

This commit is contained in:
MatrixSeven
2025-08-06 18:08:02 +08:00
parent 3f3b7d8f18
commit 7cb0d34fb1
42 changed files with 2790 additions and 9426 deletions

View File

@@ -1,61 +0,0 @@
package services
import (
"fmt"
"sync"
"time"
"chuan/internal/models"
)
// 内存存储生产环境应使用Redis
type MemoryStore struct {
files map[string]*models.FileInfo
mutex sync.RWMutex
}
var globalStore = &MemoryStore{
files: make(map[string]*models.FileInfo),
}
// StoreFileInfo 存储文件信息
func (ms *MemoryStore) StoreFileInfo(fileInfo *models.FileInfo) error {
ms.mutex.Lock()
defer ms.mutex.Unlock()
ms.files[fileInfo.Code] = fileInfo
return nil
}
// GetFileInfo 获取文件信息
func (ms *MemoryStore) GetFileInfo(code string) (*models.FileInfo, error) {
ms.mutex.RLock()
defer ms.mutex.RUnlock()
fileInfo, exists := ms.files[code]
if !exists {
return nil, fmt.Errorf("文件不存在或已过期")
}
// 检查是否过期
if time.Now().After(fileInfo.ExpiryTime) {
delete(ms.files, code)
return nil, fmt.Errorf("文件已过期")
}
return fileInfo, nil
}
// DeleteFileInfo 删除文件信息
func (ms *MemoryStore) DeleteFileInfo(code string) error {
ms.mutex.Lock()
defer ms.mutex.Unlock()
delete(ms.files, code)
return nil
}
// GetStore 获取全局存储实例
func GetStore() *MemoryStore {
return globalStore
}

View File

@@ -1,676 +0,0 @@
package services
import (
"crypto/rand"
"fmt"
"log"
mathrand "math/rand"
"net/http"
"strconv"
"sync"
"time"
"chuan/internal/models"
"github.com/gorilla/websocket"
)
type FileTransferRoom struct {
ID string
Code string // 取件码
Files []models.FileTransferInfo // 待传输文件信息
Clients map[string]*models.ClientInfo // 所有连接的客户端 (客户端ID -> ClientInfo)
CreatedAt time.Time // 创建时间
TextContent string // 文字内容
IsTextRoom bool // 是否是文字传输房间
mutex sync.RWMutex
}
type P2PService struct {
rooms map[string]*FileTransferRoom // 使用取件码作为key
roomsMux sync.RWMutex
upgrader websocket.Upgrader
}
func NewP2PService() *P2PService {
service := &P2PService{
rooms: make(map[string]*FileTransferRoom),
roomsMux: sync.RWMutex{},
upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true // 允许所有来源,生产环境应当限制
},
},
}
// 启动房间清理任务
go service.cleanupExpiredRooms()
return service
}
// CreateRoom 创建新房间并返回取件码
func (p *P2PService) CreateRoom(files []models.FileTransferInfo) string {
code := generatePickupCode()
p.roomsMux.Lock()
defer p.roomsMux.Unlock()
room := &FileTransferRoom{
ID: "room_" + code,
Code: code,
Files: files,
Clients: make(map[string]*models.ClientInfo),
CreatedAt: time.Now(),
}
p.rooms[code] = room
log.Printf("创建房间,取件码: %s文件数量: %d", code, len(files))
return code
}
// GetRoomByCode 根据取件码获取房间
func (p *P2PService) GetRoomByCode(code string) (*FileTransferRoom, bool) {
p.roomsMux.RLock()
defer p.roomsMux.RUnlock()
room, exists := p.rooms[code]
return room, exists
}
// CreateTextRoom 创建文字传输房间并返回取件码
func (p *P2PService) CreateTextRoom(text string) string {
code := generatePickupCode()
p.roomsMux.Lock()
defer p.roomsMux.Unlock()
room := &FileTransferRoom{
ID: "text_room_" + code,
Code: code,
Files: []models.FileTransferInfo{}, // 文字房间不需要文件
Clients: make(map[string]*models.ClientInfo),
CreatedAt: time.Now(),
TextContent: text,
IsTextRoom: true,
}
p.rooms[code] = room
log.Printf("创建文字传输房间,取件码: %s文字长度: %d", code, len(text))
return code
}
// GetTextContent 根据取件码获取文字内容
func (p *P2PService) GetTextContent(code string) (string, bool) {
p.roomsMux.RLock()
defer p.roomsMux.RUnlock()
room, exists := p.rooms[code]
if !exists || !room.IsTextRoom {
return "", false
}
return room.TextContent, true
}
// generateClientID 生成客户端唯一标识
func generateClientID() string {
b := make([]byte, 8)
rand.Read(b)
return fmt.Sprintf("client_%x", b)
}
// HandleWebSocket 处理WebSocket连接
func (p *P2PService) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := p.upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebSocket升级失败: %v", err)
return
}
defer conn.Close()
// 获取取件码和角色
code := r.URL.Query().Get("code")
role := r.URL.Query().Get("role") // "sender" or "receiver"
if code == "" || (role != "sender" && role != "receiver") {
log.Printf("缺少取件码或角色参数")
return
}
// 获取房间
room, exists := p.GetRoomByCode(code)
if !exists {
log.Printf("房间不存在: %s", code)
return
}
// 生成客户端ID并创建客户端信息
clientID := generateClientID()
client := &models.ClientInfo{
ID: clientID,
Role: role,
Connection: conn,
JoinedAt: time.Now(),
UserAgent: r.Header.Get("User-Agent"),
}
// 将客户端加入房间
room.mutex.Lock()
room.Clients[clientID] = client
log.Printf("%s连接到房间: %s (客户端ID: %s)", role, code, clientID)
// 如果是接收方,发送相应内容
if role == "receiver" {
// 如果是文字房间,发送文字内容
if room.IsTextRoom {
textMsg := models.VideoMessage{
Type: "text-content",
Payload: map[string]interface{}{
"text": room.TextContent,
"is_text_room": true,
},
}
if err := conn.WriteJSON(textMsg); err != nil {
log.Printf("发送文字内容失败: %v", err)
}
} else {
// 如果是文件房间,发送文件列表
filesMsg := models.VideoMessage{
Type: "file-list",
Payload: map[string]interface{}{"files": room.Files},
}
if err := conn.WriteJSON(filesMsg); err != nil {
log.Printf("发送文件列表失败: %v", err)
}
}
// 通知所有发送方有新的接收方加入
p.notifyClients(room, "sender", models.VideoMessage{
Type: "new-receiver",
Payload: map[string]interface{}{
"client_id": clientID,
"joined_at": client.JoinedAt,
},
})
} else if role == "sender" {
// 如果是文字房间且发送方连接,也发送当前文字内容用于同步
if room.IsTextRoom {
textMsg := models.VideoMessage{
Type: "text-content",
Payload: map[string]interface{}{
"text": room.TextContent,
"is_text_room": true,
},
}
if err := conn.WriteJSON(textMsg); err != nil {
log.Printf("发送文字内容失败: %v", err)
}
}
// 通知所有接收方有新的发送方加入
p.notifyClients(room, "receiver", models.VideoMessage{
Type: "new-sender",
Payload: map[string]interface{}{
"client_id": clientID,
"joined_at": client.JoinedAt,
},
})
}
// 发送房间状态给所有客户端
p.broadcastRoomStatus(room)
room.mutex.Unlock()
// 连接关闭时清理
defer func() {
room.mutex.Lock()
delete(room.Clients, clientID)
log.Printf("客户端断开连接: %s (房间: %s)", clientID, code)
// 通知其他客户端有人离开
p.notifyClients(room, "", models.VideoMessage{
Type: "client-left",
Payload: map[string]interface{}{
"client_id": clientID,
"role": role,
},
})
// 发送更新后的房间状态
p.broadcastRoomStatus(room)
room.mutex.Unlock()
// 如果房间没有客户端了,清理房间
p.cleanupRoom(code)
}()
// 处理消息
for {
var msg models.VideoMessage
err := conn.ReadJSON(&msg)
if err != nil {
log.Printf("读取WebSocket消息失败: %v", err)
break
}
log.Printf("收到WebSocket消息: 类型=%s, 来自=%s, 房间=%s", msg.Type, clientID, code)
// 处理特殊消息类型
switch msg.Type {
case "update-file-list":
// 处理文件列表更新
p.handleFileListUpdate(room, clientID, msg)
case "file-request":
// 处理文件请求
p.handleFileRequest(room, clientID, msg)
case "file-info", "file-chunk", "file-complete":
// 处理文件传输相关消息,直接转发给接收方
p.forwardMessage(room, clientID, msg)
case "text-update":
// 处理实时文字更新
p.handleTextUpdate(room, clientID, msg)
case "text-send":
// 处理文字发送
p.handleTextSend(room, clientID, msg)
case "image-send":
// 处理图片发送
p.handleImageSend(room, clientID, msg)
default:
// 转发消息到对应的客户端
p.forwardMessage(room, clientID, msg)
}
}
}
// notifyClients 通知指定角色的客户端
func (p *P2PService) notifyClients(room *FileTransferRoom, role string, msg models.VideoMessage) {
for _, client := range room.Clients {
if role == "" || client.Role == role {
if err := client.Connection.WriteJSON(msg); err != nil {
log.Printf("发送消息到客户端失败 %s: %v", client.ID, err)
}
}
}
}
// broadcastRoomStatus 广播房间状态给所有客户端
func (p *P2PService) broadcastRoomStatus(room *FileTransferRoom) {
status := p.getRoomStatus(room)
statusMsg := models.VideoMessage{
Type: "room-status",
Payload: status,
}
for _, client := range room.Clients {
if err := client.Connection.WriteJSON(statusMsg); err != nil {
log.Printf("发送房间状态失败 %s: %v", client.ID, err)
}
}
}
// getRoomStatus 获取房间状态
func (p *P2PService) getRoomStatus(room *FileTransferRoom) models.RoomStatus {
senderCount := 0
receiverCount := 0
clients := make([]models.ClientInfo, 0, len(room.Clients))
for _, client := range room.Clients {
// 创建不包含连接的客户端信息副本
clientCopy := models.ClientInfo{
ID: client.ID,
Role: client.Role,
JoinedAt: client.JoinedAt,
UserAgent: client.UserAgent,
}
clients = append(clients, clientCopy)
if client.Role == "sender" {
senderCount++
} else if client.Role == "receiver" {
receiverCount++
}
}
return models.RoomStatus{
Code: room.Code,
FileCount: len(room.Files),
SenderCount: senderCount,
ReceiverCount: receiverCount,
Clients: clients,
CreatedAt: room.CreatedAt,
}
}
// handleFileListUpdate 处理文件列表更新
func (p *P2PService) handleFileListUpdate(room *FileTransferRoom, clientID string, msg models.VideoMessage) {
// 获取文件列表
payload, ok := msg.Payload.(map[string]interface{})
if !ok {
log.Printf("无效的文件列表更新消息格式")
return
}
filesData, ok := payload["files"].([]interface{})
if !ok {
log.Printf("缺少文件列表数据")
return
}
// 转换文件列表格式
var files []models.FileTransferInfo
for _, fileData := range filesData {
if fileMap, ok := fileData.(map[string]interface{}); ok {
file := models.FileTransferInfo{
ID: getString(fileMap, "id"),
Name: getString(fileMap, "name"),
Size: getInt64(fileMap, "size"),
Type: getString(fileMap, "type"),
LastModified: getInt64(fileMap, "lastModified"),
}
files = append(files, file)
}
}
log.Printf("收到文件列表更新请求,共 %d 个文件", len(files))
// 更新房间文件列表
room.mutex.Lock()
room.Files = files
room.mutex.Unlock()
log.Printf("房间 %s 文件列表已更新,共 %d 个文件", room.Code, len(files))
// 通知所有接收方客户端文件列表已更新
room.mutex.RLock()
for _, client := range room.Clients {
if client.Role == "receiver" {
message := models.VideoMessage{
Type: "file-list-updated",
Payload: map[string]interface{}{
"files": files,
},
}
if err := client.Connection.WriteJSON(message); err != nil {
log.Printf("发送文件列表更新消息失败: %v", err)
} else {
log.Printf("已向接收方 %s 发送文件列表更新消息", client.ID)
}
}
}
room.mutex.RUnlock()
}
// 辅助函数从map中获取字符串值
func getString(m map[string]interface{}, key string) string {
if val, ok := m[key].(string); ok {
return val
}
return ""
}
// 辅助函数从map中获取int64值
func getInt64(m map[string]interface{}, key string) int64 {
if val, ok := m[key].(float64); ok {
return int64(val)
}
if val, ok := m[key].(int64); ok {
return val
}
if val, ok := m[key].(int); ok {
return int64(val)
}
return 0
}
// handleFileRequest 处理文件请求
func (p *P2PService) handleFileRequest(room *FileTransferRoom, clientID string, msg models.VideoMessage) {
// 获取请求的文件ID
payload, ok := msg.Payload.(map[string]interface{})
if !ok {
log.Printf("无效的文件请求消息格式")
return
}
fileID, ok := payload["file_id"].(string)
if !ok {
log.Printf("缺少文件ID")
return
}
// 转发文件请求给所有发送方
requestMsg := models.VideoMessage{
Type: "file-request",
Payload: map[string]interface{}{
"file_id": fileID,
"requester": clientID,
"request_id": payload["request_id"],
},
}
p.notifyClients(room, "sender", requestMsg)
}
// forwardMessage 转发消息到指定客户端或所有对应角色的客户端
func (p *P2PService) forwardMessage(room *FileTransferRoom, senderClientID string, msg models.VideoMessage) {
room.mutex.RLock()
defer room.mutex.RUnlock()
senderClient, exists := room.Clients[senderClientID]
if !exists {
log.Printf("发送方客户端不存在: %s", senderClientID)
return
}
// 检查消息是否指定了目标客户端
if payload, ok := msg.Payload.(map[string]interface{}); ok {
if targetID, hasTarget := payload["target_client"].(string); hasTarget {
// 发送给指定客户端
if targetClient, exists := room.Clients[targetID]; exists {
log.Printf("转发消息: 类型=%s, 从%s到%s", msg.Type, senderClientID, targetID)
if err := targetClient.Connection.WriteJSON(msg); err != nil {
log.Printf("转发消息失败: %v", err)
}
return
}
}
}
// 否则根据角色转发给对应的客户端
targetRole := ""
if senderClient.Role == "sender" {
targetRole = "receiver"
} else if senderClient.Role == "receiver" {
targetRole = "sender"
}
if targetRole != "" {
log.Printf("广播消息: 类型=%s, 从%s到所有%s", msg.Type, senderClient.Role, targetRole)
p.notifyClients(room, targetRole, msg)
}
}
// cleanupRoom 清理房间
func (p *P2PService) cleanupRoom(code string) {
p.roomsMux.Lock()
defer p.roomsMux.Unlock()
if room, exists := p.rooms[code]; exists {
room.mutex.RLock()
noClients := len(room.Clients) == 0
room.mutex.RUnlock()
if noClients {
delete(p.rooms, code)
log.Printf("清理房间: %s", code)
}
}
}
// cleanupExpiredRooms 定期清理过期房间
func (p *P2PService) cleanupExpiredRooms() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
p.roomsMux.Lock()
now := time.Now()
for code, room := range p.rooms {
// 房间存在超过1小时则删除
if now.Sub(room.CreatedAt) > time.Hour {
delete(p.rooms, code)
log.Printf("清理过期房间: %s", code)
}
}
p.roomsMux.Unlock()
}
}
// generatePickupCode 生成6位取件码
func generatePickupCode() string {
mathrand.Seed(time.Now().UnixNano())
code := mathrand.Intn(900000) + 100000
return strconv.Itoa(code)
}
// GetRoomStatusByCode 根据取件码获取房间状态
func (p *P2PService) GetRoomStatusByCode(code string) (models.RoomStatus, bool) {
p.roomsMux.RLock()
defer p.roomsMux.RUnlock()
room, exists := p.rooms[code]
if !exists {
return models.RoomStatus{}, false
}
room.mutex.RLock()
status := p.getRoomStatus(room)
room.mutex.RUnlock()
return status, true
}
func (p *P2PService) GetRoomStats() map[string]interface{} {
p.roomsMux.RLock()
defer p.roomsMux.RUnlock()
stats := map[string]interface{}{
"total_rooms": len(p.rooms),
"rooms": make([]map[string]interface{}, 0),
}
for code, room := range p.rooms {
room.mutex.RLock()
status := p.getRoomStatus(room)
roomInfo := map[string]interface{}{
"code": code,
"file_count": len(room.Files),
"sender_count": status.SenderCount,
"receiver_count": status.ReceiverCount,
"total_clients": len(room.Clients),
"created_at": room.CreatedAt,
}
room.mutex.RUnlock()
stats["rooms"] = append(stats["rooms"].([]map[string]interface{}), roomInfo)
}
return stats
}
// UpdateRoomFiles 更新房间文件列表
func (p *P2PService) UpdateRoomFiles(code string, files []models.FileTransferInfo) bool {
p.roomsMux.RLock()
room, exists := p.rooms[code]
p.roomsMux.RUnlock()
if !exists {
return false
}
room.mutex.Lock()
room.Files = files
room.mutex.Unlock()
log.Printf("房间 %s 文件列表已更新,共 %d 个文件", code, len(files))
// 通知所有连接的客户端文件列表已更新
room.mutex.RLock()
for _, client := range room.Clients {
if client.Role == "receiver" {
message := models.VideoMessage{
Type: "file-list-updated",
Payload: map[string]interface{}{
"files": files,
},
}
if err := client.Connection.WriteJSON(message); err != nil {
log.Printf("发送文件列表更新消息失败: %v", err)
}
}
}
room.mutex.RUnlock()
return true
}
// handleTextUpdate 处理实时文字更新
func (p *P2PService) handleTextUpdate(room *FileTransferRoom, senderID string, msg models.VideoMessage) {
log.Printf("处理文字更新: 来自客户端 %s", senderID)
// 更新房间的文字内容
if payload, ok := msg.Payload.(map[string]interface{}); ok {
if textContent, exists := payload["text"].(string); exists {
room.mutex.Lock()
room.TextContent = textContent
room.mutex.Unlock()
log.Printf("房间 %s 文字内容已更新", room.Code)
}
}
// 转发文字更新给房间内其他所有客户端
room.mutex.RLock()
defer room.mutex.RUnlock()
for clientID, client := range room.Clients {
if clientID != senderID { // 不发送给发送者自己
if err := client.Connection.WriteJSON(msg); err != nil {
log.Printf("转发文字更新失败 %s: %v", clientID, err)
}
}
}
}
// handleTextSend 处理文字发送
func (p *P2PService) handleTextSend(room *FileTransferRoom, senderID string, msg models.VideoMessage) {
log.Printf("处理文字发送: 来自客户端 %s", senderID)
// 转发文字发送给房间内所有客户端
room.mutex.RLock()
defer room.mutex.RUnlock()
for _, client := range room.Clients {
if err := client.Connection.WriteJSON(msg); err != nil {
log.Printf("转发文字发送失败 %s: %v", client.ID, err)
}
}
}
// handleImageSend 处理图片发送
func (p *P2PService) handleImageSend(room *FileTransferRoom, senderID string, msg models.VideoMessage) {
log.Printf("处理图片发送: 来自客户端 %s", senderID)
// 转发图片发送给房间内其他客户端(不包括发送者)
room.mutex.RLock()
defer room.mutex.RUnlock()
for clientID, client := range room.Clients {
if clientID != senderID { // 不发送给发送者自己
if err := client.Connection.WriteJSON(msg); err != nil {
log.Printf("转发图片发送失败 %s: %v", clientID, err)
}
}
}
}

View File

@@ -22,6 +22,7 @@ type WebRTCRoom struct {
Sender *WebRTCClient
Receiver *WebRTCClient
CreatedAt time.Time
ExpiresAt time.Time // 添加过期时间
LastOffer *WebRTCMessage // 保存最后的offer消息
}
@@ -33,7 +34,7 @@ type WebRTCClient struct {
}
func NewWebRTCService() *WebRTCService {
return &WebRTCService{
service := &WebRTCService{
rooms: make(map[string]*WebRTCRoom),
roomsMux: sync.RWMutex{},
upgrader: websocket.Upgrader{
@@ -42,6 +43,11 @@ func NewWebRTCService() *WebRTCService {
},
},
}
// 启动房间清理任务
go service.cleanupExpiredRooms()
return service
}
type WebRTCMessage struct {
@@ -124,8 +130,10 @@ func (ws *WebRTCService) addClientToRoom(code string, client *WebRTCClient) {
room = &WebRTCRoom{
Code: code,
CreatedAt: time.Now(),
ExpiresAt: time.Now().Add(time.Hour), // 1小时后过期
}
ws.rooms[code] = room
log.Printf("自动创建WebRTC房间: %s", code)
}
if client.Role == "sender" {
@@ -205,7 +213,55 @@ func (ws *WebRTCService) forwardMessage(roomCode string, fromClientID string, ms
}
}
// 生成客户端ID
// CreateRoom 创建或获取房间
func (ws *WebRTCService) CreateRoom(code string) {
ws.roomsMux.Lock()
defer ws.roomsMux.Unlock()
if _, exists := ws.rooms[code]; !exists {
ws.rooms[code] = &WebRTCRoom{
Code: code,
CreatedAt: time.Now(),
ExpiresAt: time.Now().Add(time.Hour), // 1小时后过期
}
log.Printf("创建WebRTC房间: %s", code)
}
}
// CreateNewRoom 创建新房间并返回房间码
func (ws *WebRTCService) CreateNewRoom() string {
code := ws.generatePickupCode()
ws.CreateRoom(code)
return code
}
// generatePickupCode 生成6位取件码
func (ws *WebRTCService) generatePickupCode() string {
rand.Seed(time.Now().UnixNano())
code := rand.Intn(900000) + 100000
return fmt.Sprintf("%d", code)
}
// cleanupExpiredRooms 定期清理过期房间
func (ws *WebRTCService) cleanupExpiredRooms() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
ws.roomsMux.Lock()
now := time.Now()
for code, room := range ws.rooms {
// 房间过期或无客户端连接则删除
if now.After(room.ExpiresAt) || (room.Sender == nil && room.Receiver == nil) {
delete(ws.rooms, code)
log.Printf("清理过期WebRTC房间: %s", code)
}
}
ws.roomsMux.Unlock()
}
}
// generateClientID 生成客户端ID
func (ws *WebRTCService) generateClientID() string {
return fmt.Sprintf("webrtc_client_%d", rand.Int63())
}