// Mirror of https://github.com/duanhf2012/origin.git
// Synced 2026-02-04 06:54:45 +08:00 · 235 lines · 6.7 KiB · Go
package tcpmodule
|
||
|
||
import (
|
||
"fmt"
|
||
"github.com/duanhf2012/origin/v2/event"
|
||
"github.com/duanhf2012/origin/v2/log"
|
||
"github.com/duanhf2012/origin/v2/network"
|
||
"github.com/duanhf2012/origin/v2/network/processor"
|
||
"github.com/duanhf2012/origin/v2/service"
|
||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
type TcpModule struct {
|
||
tcpServer network.TCPServer
|
||
service.Module
|
||
|
||
mapClientLocker sync.RWMutex
|
||
mapClient map[string]*Client
|
||
process processor.IRawProcessor
|
||
tcpCfg *TcpCfg
|
||
newClientIdHandler func() string
|
||
}
|
||
|
||
// TcpPackType identifies the kind of event carried by a TcpPack.
type TcpPackType int8
|
||
|
||
const (
|
||
TPTConnected TcpPackType = 0
|
||
TPTDisConnected TcpPackType = 1
|
||
TPTPack TcpPackType = 2
|
||
TPTUnknownPack TcpPackType = 3
|
||
)
|
||
|
||
type TcpPack struct {
|
||
Type TcpPackType //0表示连接 1表示断开 2表示数据
|
||
ClientId string
|
||
Data interface{}
|
||
rawData []byte
|
||
RecyclerReaderBytes func(data []byte)
|
||
}
|
||
|
||
type Client struct {
|
||
id string
|
||
tcpConn *network.NetConn
|
||
tcpModule *TcpModule
|
||
}
|
||
|
||
// TcpCfg is the configuration that OnInit copies onto the TCP server.
type TcpCfg struct {
	// ListenAddr is the listen address.
	ListenAddr string
	// MaxConnNum is the maximum number of connections.
	MaxConnNum int
	// PendingWriteNum is the maximum number of messages in the write channel.
	PendingWriteNum int
	// LittleEndian tells whether little-endian byte order is used.
	LittleEndian bool
	// LenMsgLen is the number of bytes taken by the message header; it can
	// only be 1, 2 or 4 bytes. With 4 bytes a message can be as large as
	// math.MaxUint32 (4GB).
	LenMsgLen int
	// MinMsgLen is the minimum message length.
	MinMsgLen uint32
	// MaxReadMsgLen is the maximum message length; anything longer is judged
	// invalid and the connection is dropped.
	MaxReadMsgLen uint32
	// MaxWriteMsgLen is the maximum length of a written message.
	MaxWriteMsgLen uint32
	// ReadDeadlineSecond is the read timeout; OnInit multiplies it by time.Second.
	ReadDeadlineSecond time.Duration
	// WriteDeadlineSecond is the write timeout; OnInit multiplies it by time.Second.
	WriteDeadlineSecond time.Duration
}
|
||
|
||
func (tm *TcpModule) OnInit() error {
|
||
if tm.tcpCfg == nil || tm.process == nil {
|
||
return fmt.Errorf("please call the Init function correctly")
|
||
}
|
||
|
||
//2.初始化网络模块
|
||
tm.tcpServer.Addr = tm.tcpCfg.ListenAddr
|
||
tm.tcpServer.MaxConnNum = tm.tcpCfg.MaxConnNum
|
||
tm.tcpServer.PendingWriteNum = tm.tcpCfg.PendingWriteNum
|
||
tm.tcpServer.LittleEndian = tm.tcpCfg.LittleEndian
|
||
tm.tcpServer.LenMsgLen = tm.tcpCfg.LenMsgLen
|
||
tm.tcpServer.MinMsgLen = tm.tcpCfg.MinMsgLen
|
||
tm.tcpServer.MaxReadMsgLen = tm.tcpCfg.MaxReadMsgLen
|
||
tm.tcpServer.MaxWriteMsgLen = tm.tcpCfg.MaxWriteMsgLen
|
||
tm.tcpServer.ReadDeadline = tm.tcpCfg.ReadDeadlineSecond * time.Second
|
||
tm.tcpServer.WriteDeadline = tm.tcpCfg.WriteDeadlineSecond * time.Second
|
||
tm.mapClient = make(map[string]*Client, tm.tcpServer.MaxConnNum)
|
||
tm.tcpServer.NewAgent = tm.NewClient
|
||
if tm.newClientIdHandler == nil {
|
||
tm.newClientIdHandler = func()string{
|
||
return primitive.NewObjectID().Hex()
|
||
}
|
||
}
|
||
|
||
//3.设置解析处理器
|
||
tm.process.SetByteOrder(tm.tcpCfg.LittleEndian)
|
||
|
||
//4.设置网络事件处理
|
||
tm.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_Tcp, tm.GetEventHandler(), tm.tcpEventHandler)
|
||
return nil
|
||
}
|
||
|
||
func (tm *TcpModule) Init(tcpCfg *TcpCfg, process processor.IRawProcessor) {
|
||
tm.tcpCfg = tcpCfg
|
||
tm.process = process
|
||
}
|
||
|
||
func (tm *TcpModule) SetNewClientIdHandler(newClientIdHandler func() string){
|
||
tm.newClientIdHandler = newClientIdHandler
|
||
}
|
||
|
||
func (tm *TcpModule) Start() error {
|
||
return tm.tcpServer.Start()
|
||
}
|
||
|
||
func (tm *TcpModule) tcpEventHandler(ev event.IEvent) {
|
||
pack := ev.(*event.Event).Data.(TcpPack)
|
||
switch pack.Type {
|
||
case TPTConnected:
|
||
tm.process.ConnectedRoute(pack.ClientId)
|
||
case TPTDisConnected:
|
||
tm.process.DisConnectedRoute(pack.ClientId)
|
||
case TPTUnknownPack:
|
||
tm.process.UnknownMsgRoute(pack.ClientId, pack.Data)
|
||
pack.RecyclerReaderBytes(pack.rawData)
|
||
case TPTPack:
|
||
tm.process.MsgRoute(pack.ClientId, pack.Data)
|
||
pack.RecyclerReaderBytes(pack.rawData)
|
||
}
|
||
}
|
||
|
||
func (tm *TcpModule) NewClient(conn network.Conn) network.Agent {
|
||
tm.mapClientLocker.Lock()
|
||
defer tm.mapClientLocker.Unlock()
|
||
|
||
clientId := tm.newClientIdHandler()
|
||
pClient := &Client{tcpConn: conn.(*network.NetConn), id: clientId}
|
||
pClient.tcpModule = tm
|
||
tm.mapClient[clientId] = pClient
|
||
|
||
return pClient
|
||
}
|
||
|
||
func (slf *Client) GetId() string {
|
||
return slf.id
|
||
}
|
||
|
||
func (slf *Client) Run() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.StackError(fmt.Sprint(r))
|
||
}
|
||
}()
|
||
|
||
slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTConnected}})
|
||
for slf.tcpConn != nil {
|
||
slf.tcpConn.SetReadDeadline(slf.tcpModule.tcpServer.ReadDeadline)
|
||
bytes, err := slf.tcpConn.ReadMsg()
|
||
if err != nil {
|
||
log.Debug("read client failed", log.ErrorField("error", err), log.String("clientId", slf.id))
|
||
break
|
||
}
|
||
data, err := slf.tcpModule.process.Unmarshal(slf.id, bytes)
|
||
if err != nil {
|
||
slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTUnknownPack, Data: data,rawData: bytes,RecyclerReaderBytes: slf.tcpConn.GetRecyclerReaderBytes()}})
|
||
continue
|
||
}
|
||
slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTPack, Data: data,rawData: bytes, RecyclerReaderBytes: slf.tcpConn.GetRecyclerReaderBytes()}})
|
||
}
|
||
}
|
||
|
||
func (slf *Client) OnClose() {
|
||
slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTDisConnected}})
|
||
slf.tcpModule.mapClientLocker.Lock()
|
||
defer slf.tcpModule.mapClientLocker.Unlock()
|
||
delete(slf.tcpModule.mapClient, slf.GetId())
|
||
}
|
||
|
||
func (tm *TcpModule) SendMsg(clientId string, msg interface{}) error {
|
||
tm.mapClientLocker.Lock()
|
||
client, ok := tm.mapClient[clientId]
|
||
if ok == false {
|
||
tm.mapClientLocker.Unlock()
|
||
return fmt.Errorf("client %s is disconnect", clientId)
|
||
}
|
||
|
||
tm.mapClientLocker.Unlock()
|
||
bytes, err := tm.process.Marshal(clientId, msg)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return client.tcpConn.WriteMsg(bytes)
|
||
}
|
||
|
||
func (tm *TcpModule) Close(clientId string) {
|
||
tm.mapClientLocker.Lock()
|
||
defer tm.mapClientLocker.Unlock()
|
||
|
||
client, ok := tm.mapClient[clientId]
|
||
if ok == false {
|
||
return
|
||
}
|
||
|
||
if client.tcpConn != nil {
|
||
client.tcpConn.Close()
|
||
}
|
||
|
||
log.SWarn("close client:", clientId)
|
||
return
|
||
}
|
||
|
||
func (tm *TcpModule) GetClientIp(clientId string) string {
|
||
tm.mapClientLocker.Lock()
|
||
defer tm.mapClientLocker.Unlock()
|
||
pClient, ok := tm.mapClient[clientId]
|
||
if ok == false {
|
||
return ""
|
||
}
|
||
|
||
return pClient.tcpConn.GetRemoteIp()
|
||
}
|
||
|
||
func (tm *TcpModule) SendRawMsg(clientId string, msg []byte) error {
|
||
tm.mapClientLocker.Lock()
|
||
client, ok := tm.mapClient[clientId]
|
||
if ok == false {
|
||
tm.mapClientLocker.Unlock()
|
||
return fmt.Errorf("client %s is disconnect", clientId)
|
||
}
|
||
tm.mapClientLocker.Unlock()
|
||
return client.tcpConn.WriteMsg(msg)
|
||
}
|
||
|
||
func (tm *TcpModule) GetConnNum() int {
|
||
tm.mapClientLocker.Lock()
|
||
connNum := len(tm.mapClient)
|
||
tm.mapClientLocker.Unlock()
|
||
return connNum
|
||
}
|
||
|
||
func (tm *TcpModule) GetProcessor() processor.IRawProcessor {
|
||
return tm.process
|
||
}
|