From 6a29ba2c88695812c11b38dec22996ccb28bdfa2 Mon Sep 17 00:00:00 2001 From: duanhf2012 <6549168@qq.com> Date: Wed, 9 Oct 2024 10:50:54 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E7=BD=91=E7=BB=9C=E6=A8=A1?= =?UTF-8?q?=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sysmodule/netmodule/kcpmodule/KcpModule.go | 214 ++++++++++++++++++++ sysmodule/netmodule/tcpmodule/TcpModule.go | 223 +++++++++++++++++++++ sysmodule/netmodule/wsmodule/WSModule.go | 200 ++++++++++++++++++ 3 files changed, 637 insertions(+) create mode 100644 sysmodule/netmodule/kcpmodule/KcpModule.go create mode 100644 sysmodule/netmodule/tcpmodule/TcpModule.go create mode 100644 sysmodule/netmodule/wsmodule/WSModule.go diff --git a/sysmodule/netmodule/kcpmodule/KcpModule.go b/sysmodule/netmodule/kcpmodule/KcpModule.go new file mode 100644 index 0000000..8bb9995 --- /dev/null +++ b/sysmodule/netmodule/kcpmodule/KcpModule.go @@ -0,0 +1,214 @@ +package kcpmodule + +import ( + "fmt" + "github.com/duanhf2012/origin/v2/event" + "github.com/duanhf2012/origin/v2/log" + "github.com/duanhf2012/origin/v2/network" + "github.com/duanhf2012/origin/v2/network/processor" + "github.com/duanhf2012/origin/v2/service" + "github.com/xtaci/kcp-go/v5" + "go.mongodb.org/mongo-driver/bson/primitive" + "runtime" + "sync" +) + +type KcpModule struct { + service.Module + blockCrypt kcp.BlockCrypt + + mapClientLocker sync.RWMutex + mapClient map[string]*Client + process processor.IRawProcessor + + kcpServer network.KCPServer + kcpCfg *network.KcpCfg +} + +type Client struct { + id string + kcpConn *network.NetConn + kcpModule *KcpModule +} + +type KcpPackType int8 + +const ( + KPTConnected KcpPackType = 0 + KPTDisConnected KcpPackType = 1 + KPTPack KcpPackType = 2 + KPTUnknownPack KcpPackType = 3 +) + +type KcpPack struct { + Type KcpPackType //0表示连接 1表示断开 2表示数据 + ClientId string + Data interface{} + RecyclerReaderBytes func(data []byte) +} + +func (km *KcpModule) OnInit() error { + if km.kcpCfg == nil || km.process == nil { + return fmt.Errorf("please call the Init function correctly") + } + + km.mapClient = make(map[string]*Client, km.kcpCfg.MaxConnNum) + km.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_Kcp, km.GetEventHandler(), km.kcpEventHandler) + km.process.SetByteOrder(km.kcpCfg.LittleEndian) + km.kcpServer.Init(km.kcpCfg) + km.kcpServer.NewAgent = km.NewAgent + + return nil +} + +func (km *KcpModule) Init(kcpCfg *network.KcpCfg, process processor.IRawProcessor) { + km.kcpCfg = kcpCfg + km.process = process +} + +func (km *KcpModule) Start() error { + return km.kcpServer.Start() +} + +func (km *KcpModule) kcpEventHandler(ev event.IEvent) { + e := ev.(*event.Event) + switch KcpPackType(e.IntExt[0]) { + case KPTConnected: + km.process.ConnectedRoute(e.StringExt[0]) + case KPTDisConnected: + km.process.DisConnectedRoute(e.StringExt[0]) + case KPTUnknownPack: + km.process.UnknownMsgRoute(e.StringExt[0], e.Data, e.AnyExt[0].(func(data []byte))) + case KPTPack: + km.process.MsgRoute(e.StringExt[0], e.Data, e.AnyExt[0].(func(data []byte))) + } + + event.DeleteEvent(ev) +} + +func (km *KcpModule) SetBlob(blockCrypt kcp.BlockCrypt) { + km.blockCrypt = blockCrypt +} + +func (km *KcpModule) OnConnected(c *Client) { + ev := event.NewEvent() + ev.Type = event.Sys_Event_Kcp + ev.IntExt[0] = int64(KPTConnected) + ev.StringExt[0] = c.id + + km.NotifyEvent(ev) +} + +func (km *KcpModule) OnClose(c *Client) { + ev := event.NewEvent() + ev.Type = event.Sys_Event_Kcp + ev.IntExt[0] = int64(KPTDisConnected) + ev.StringExt[0] = c.id + + km.NotifyEvent(ev) +} + +func (km *KcpModule) newClient(conn network.Conn) *Client { + km.mapClientLocker.Lock() + defer km.mapClientLocker.Unlock() + + pClient := &Client{kcpConn: conn.(*network.NetConn), id: primitive.NewObjectID().Hex()} + pClient.kcpModule = km + km.mapClient[pClient.id] = pClient + + return pClient +} + +func (km *KcpModule) GetProcessor() processor.IRawProcessor { + return km.process +} + +func (km *KcpModule) SendRawMsg(clientId string, data []byte) error { + km.mapClientLocker.Lock() + client, ok := km.mapClient[clientId] + if ok == false { + km.mapClientLocker.Unlock() + return fmt.Errorf("client %s is disconnect", clientId) + } + km.mapClientLocker.Unlock() + return client.kcpConn.WriteMsg(data) +} + +func (km *KcpModule) Close(clientId string) { + km.mapClientLocker.Lock() + client, ok := km.mapClient[clientId] + if ok == false { + km.mapClientLocker.Unlock() + return + } + km.mapClientLocker.Unlock() + client.kcpConn.Close() +} + +func (km *KcpModule) GetClientIp(clientId string) string { + km.mapClientLocker.Lock() + defer km.mapClientLocker.Unlock() + client, ok := km.mapClient[clientId] + if ok == false { + return "" + } + removeAddr := client.kcpConn.RemoteAddr() + if removeAddr != nil { + return removeAddr.String() + } + return "" +} + +func (km *KcpModule) NewAgent(conn network.Conn) network.Agent { + c := km.newClient(conn) + return c +} + +func (c *Client) Run() { + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + l := runtime.Stack(buf, false) + errString := fmt.Sprint(r) + log.Dump(string(buf[:l]), log.String("error", errString)) + } + }() + + c.kcpModule.OnConnected(c) + for c.kcpConn != nil { + c.kcpConn.SetReadDeadline(*c.kcpModule.kcpCfg.ReadDeadlineMill) + msgBuff, err := c.kcpConn.ReadMsg() + if err != nil { + log.Debug("read client failed", log.ErrorAttr("error", err), log.String("clientId", c.id)) + break + } + + data, err := c.kcpModule.process.Unmarshal(c.id, msgBuff) + if err != nil { + ev := event.NewEvent() + ev.Type = event.Sys_Event_Kcp + ev.IntExt[0] = int64(KPTUnknownPack) + ev.StringExt[0] = c.id + ev.Data = msgBuff + ev.AnyExt[0] = c.kcpConn.GetRecyclerReaderBytes() + c.kcpModule.NotifyEvent(ev) + continue + } + + ev := event.NewEvent() + ev.Type = event.Sys_Event_Kcp + ev.IntExt[0] = int64(KPTPack) + ev.StringExt[0] = c.id + ev.Data = data + ev.AnyExt[0] = c.kcpConn.GetRecyclerReaderBytes() + c.kcpModule.NotifyEvent(ev) + } +} + +func (c *Client) OnClose() { + c.kcpModule.OnClose(c) + + c.kcpModule.mapClientLocker.Lock() + delete(c.kcpModule.mapClient, c.id) + c.kcpModule.mapClientLocker.Unlock() +} diff --git a/sysmodule/netmodule/tcpmodule/TcpModule.go b/sysmodule/netmodule/tcpmodule/TcpModule.go new file mode 100644 index 0000000..726b75f --- /dev/null +++ b/sysmodule/netmodule/tcpmodule/TcpModule.go @@ -0,0 +1,223 @@ +package tcpmodule + +import ( + "fmt" + "github.com/duanhf2012/origin/v2/event" + "github.com/duanhf2012/origin/v2/log" + "github.com/duanhf2012/origin/v2/network" + "github.com/duanhf2012/origin/v2/network/processor" + "github.com/duanhf2012/origin/v2/service" + "go.mongodb.org/mongo-driver/bson/primitive" + "runtime" + "sync" + "time" +) + +type TcpModule struct { + tcpServer network.TCPServer + service.Module + + mapClientLocker sync.RWMutex + mapClient map[string]*Client + process processor.IRawProcessor + tcpCfg *TcpCfg +} + +type TcpPackType int8 + +const ( + TPTConnected TcpPackType = 0 + TPTDisConnected TcpPackType = 1 + TPTPack TcpPackType = 2 + TPTUnknownPack TcpPackType = 3 +) + +type TcpPack struct { + Type TcpPackType //0表示连接 1表示断开 2表示数据 + ClientId string + Data interface{} + RecyclerReaderBytes func(data []byte) +} + +type Client struct { + id string + tcpConn *network.NetConn + tcpModule *TcpModule +} + +type TcpCfg struct { + ListenAddr string //监听地址 + MaxConnNum int //最大连接数 + PendingWriteNum int //写channel最大消息数量 + LittleEndian bool //是否小端序 + LenMsgLen int //消息头占用byte数量,只能是1byte,2byte,4byte。如果是4byte,意味着消息最大可以是math.MaxUint32(4GB) + MinMsgLen uint32 //最小消息长度 + MaxMsgLen uint32 //最大消息长度,超过判定不合法,断开连接 + ReadDeadlineSecond time.Duration //读超时 + WriteDeadlineSecond time.Duration //写超时 +} + +func (tm *TcpModule) OnInit() error { + if tm.tcpCfg == nil || tm.process == nil { + return fmt.Errorf("please call the Init function correctly") + } + + //2.初始化网络模块 + tm.tcpServer.Addr = tm.tcpCfg.ListenAddr + tm.tcpServer.MaxConnNum = tm.tcpCfg.MaxConnNum + tm.tcpServer.PendingWriteNum = tm.tcpCfg.PendingWriteNum + tm.tcpServer.LittleEndian = tm.tcpCfg.LittleEndian + tm.tcpServer.LenMsgLen = tm.tcpCfg.LenMsgLen + tm.tcpServer.MinMsgLen = tm.tcpCfg.MinMsgLen + tm.tcpServer.MaxMsgLen = tm.tcpCfg.MaxMsgLen + tm.tcpServer.ReadDeadline = tm.tcpCfg.ReadDeadlineSecond * time.Second + tm.tcpServer.WriteDeadline = tm.tcpCfg.WriteDeadlineSecond * time.Second + tm.mapClient = make(map[string]*Client, tm.tcpServer.MaxConnNum) + tm.tcpServer.NewAgent = tm.NewClient + + //3.设置解析处理器 + tm.process.SetByteOrder(tm.tcpCfg.LittleEndian) + + //4.设置网络事件处理 + tm.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_Tcp, tm.GetEventHandler(), tm.tcpEventHandler) + return nil +} + +func (tm *TcpModule) Init(tcpCfg *TcpCfg, process processor.IRawProcessor) { + tm.tcpCfg = tcpCfg + tm.process = process +} + +func (tm *TcpModule) Start() error { + return tm.tcpServer.Start() +} + +func (tm *TcpModule) tcpEventHandler(ev event.IEvent) { + pack := ev.(*event.Event).Data.(TcpPack) + switch pack.Type { + case TPTConnected: + tm.process.ConnectedRoute(pack.ClientId) + case TPTDisConnected: + tm.process.DisConnectedRoute(pack.ClientId) + case TPTUnknownPack: + tm.process.UnknownMsgRoute(pack.ClientId, pack.Data, pack.RecyclerReaderBytes) + case TPTPack: + tm.process.MsgRoute(pack.ClientId, pack.Data, pack.RecyclerReaderBytes) + } +} + +func (tm *TcpModule) NewClient(conn network.Conn) network.Agent { + tm.mapClientLocker.Lock() + defer tm.mapClientLocker.Unlock() + + clientId := primitive.NewObjectID().Hex() + pClient := &Client{tcpConn: conn.(*network.NetConn), id: clientId} + pClient.tcpModule = tm + tm.mapClient[clientId] = pClient + + return pClient +} + +func (slf *Client) GetId() string { + return slf.id +} + +func (slf *Client) Run() { + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + l := runtime.Stack(buf, false) + errString := fmt.Sprint(r) + log.Dump(string(buf[:l]), log.String("error", errString)) + } + }() + + slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTConnected}}) + for slf.tcpConn != nil { + slf.tcpConn.SetReadDeadline(slf.tcpModule.tcpServer.ReadDeadline) + bytes, err := slf.tcpConn.ReadMsg() + if err != nil { + log.Debug("read client failed", log.ErrorAttr("error", err), log.String("clientId", slf.id)) + break + } + data, err := slf.tcpModule.process.Unmarshal(slf.id, bytes) + if err != nil { + slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTUnknownPack, Data: bytes, RecyclerReaderBytes: slf.tcpConn.GetRecyclerReaderBytes()}}) + continue + } + slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTPack, Data: data, RecyclerReaderBytes: slf.tcpConn.GetRecyclerReaderBytes()}}) + } +} + +func (slf *Client) OnClose() { + slf.tcpModule.NotifyEvent(&event.Event{Type: event.Sys_Event_Tcp, Data: TcpPack{ClientId: slf.id, Type: TPTDisConnected}}) + slf.tcpModule.mapClientLocker.Lock() + defer slf.tcpModule.mapClientLocker.Unlock() + delete(slf.tcpModule.mapClient, slf.GetId()) +} + +func (tm *TcpModule) SendMsg(clientId string, msg interface{}) error { + tm.mapClientLocker.Lock() + client, ok := tm.mapClient[clientId] + if ok == false { + tm.mapClientLocker.Unlock() + return fmt.Errorf("client %d is disconnect!", clientId) + } + + tm.mapClientLocker.Unlock() + bytes, err := tm.process.Marshal(clientId, msg) + if err != nil { + return err + } + return client.tcpConn.WriteMsg(bytes) +} + +func (tm *TcpModule) Close(clientId string) { + tm.mapClientLocker.Lock() + defer tm.mapClientLocker.Unlock() + + client, ok := tm.mapClient[clientId] + if ok == false { + return + } + + if client.tcpConn != nil { + client.tcpConn.Close() + } + + log.SWarning("close client:", clientId) + return +} + +func (tm *TcpModule) GetClientIp(clientId string) string { + tm.mapClientLocker.Lock() + defer tm.mapClientLocker.Unlock() + pClient, ok := tm.mapClient[clientId] + if ok == false { + return "" + } + + return pClient.tcpConn.GetRemoteIp() +} + +func (tm *TcpModule) SendRawMsg(clientId string, msg []byte) error { + tm.mapClientLocker.Lock() + client, ok := tm.mapClient[clientId] + if ok == false { + tm.mapClientLocker.Unlock() + return fmt.Errorf("client %s is disconnect", clientId) + } + tm.mapClientLocker.Unlock() + return client.tcpConn.WriteMsg(msg) +} + +func (tm *TcpModule) GetConnNum() int { + tm.mapClientLocker.Lock() + connNum := len(tm.mapClient) + tm.mapClientLocker.Unlock() + return connNum +} + +func (tm *TcpModule) GetProcessor() processor.IRawProcessor { + return tm.process +} diff --git a/sysmodule/netmodule/wsmodule/WSModule.go b/sysmodule/netmodule/wsmodule/WSModule.go new file mode 100644 index 0000000..93d8c15 --- /dev/null +++ b/sysmodule/netmodule/wsmodule/WSModule.go @@ -0,0 +1,200 @@ +package wsmodule + +import ( + "fmt" + "github.com/duanhf2012/origin/v2/event" + "github.com/duanhf2012/origin/v2/log" + "github.com/duanhf2012/origin/v2/network" + "github.com/duanhf2012/origin/v2/network/processor" + "github.com/duanhf2012/origin/v2/service" + "go.mongodb.org/mongo-driver/bson/primitive" + "sync" +) + +type WSModule struct { + service.Module + + wsServer network.WSServer + + mapClientLocker sync.RWMutex + mapClient map[string]*WSClient + process processor.IRawProcessor + wsCfg *WSCfg +} + +type WSClient struct { + id string + wsConn *network.WSConn + wsModule *WSModule +} + +type WSCfg struct { + ListenAddr string + MaxConnNum int + PendingWriteNum int + MaxMsgLen uint32 + LittleEndian bool //是否小端序 +} + +type WSPackType int8 + +const ( + WPTConnected WSPackType = 0 + WPTDisConnected WSPackType = 1 + WPTPack WSPackType = 2 + WPTUnknownPack WSPackType = 3 +) + +type WSPack struct { + Type WSPackType //0表示连接 1表示断开 2表示数据 + MsgProcessor processor.IRawProcessor + ClientId string + Data any +} + +func (ws *WSModule) OnInit() error { + if ws.wsCfg == nil || ws.process == nil { + return fmt.Errorf("please call the Init function correctly") + } + + ws.wsServer.MaxConnNum = ws.wsCfg.MaxConnNum + ws.wsServer.PendingWriteNum = ws.wsCfg.PendingWriteNum + ws.wsServer.MaxMsgLen = ws.wsCfg.MaxMsgLen + ws.wsServer.Addr = ws.wsCfg.ListenAddr + + //3.设置解析处理器 + ws.process.SetByteOrder(ws.wsCfg.LittleEndian) + + ws.mapClient = make(map[string]*WSClient, ws.wsServer.MaxConnNum) + ws.wsServer.NewAgent = ws.NewWSClient + + //4.设置网络事件处理 + ws.GetEventProcessor().RegEventReceiverFunc(event.Sys_Event_WebSocket, ws.GetEventHandler(), ws.wsEventHandler) + + return nil +} + +func (ws *WSModule) Init(wsCfg *WSCfg, process processor.IRawProcessor) { + ws.wsCfg = wsCfg + ws.process = process +} + +func (ws *WSModule) Start() error { + ws.wsServer.Start() + return nil +} + +func (ws *WSModule) wsEventHandler(ev event.IEvent) { + pack := ev.(*event.Event).Data.(*WSPack) + switch pack.Type { + case WPTConnected: + ws.process.ConnectedRoute(pack.ClientId) + case WPTDisConnected: + ws.process.DisConnectedRoute(pack.ClientId) + case WPTUnknownPack: + ws.process.UnknownMsgRoute(pack.ClientId, pack.Data, ws.recyclerReaderBytes) + case WPTPack: + ws.process.MsgRoute(pack.ClientId, pack.Data, ws.recyclerReaderBytes) + } +} + +func (ws *WSModule) recyclerReaderBytes(data []byte) { +} + +func (ws *WSModule) NewWSClient(conn *network.WSConn) network.Agent { + ws.mapClientLocker.Lock() + defer ws.mapClientLocker.Unlock() + + pClient := &WSClient{wsConn: conn, id: primitive.NewObjectID().Hex()} + pClient.wsModule = ws + ws.mapClient[pClient.id] = pClient + + return pClient +} + +func (wc *WSClient) GetId() string { + return wc.id +} + +func (wc *WSClient) Run() { + wc.wsModule.NotifyEvent(&event.Event{Type: event.Sys_Event_WebSocket, Data: &WSPack{ClientId: wc.id, Type: WPTConnected}}) + for { + bytes, err := wc.wsConn.ReadMsg() + if err != nil { + log.Debug("read client is error", log.String("clientId", wc.id), log.ErrorAttr("err", err)) + break + } + data, err := wc.wsModule.process.Unmarshal(wc.id, bytes) + if err != nil { + wc.wsModule.NotifyEvent(&event.Event{Type: event.Sys_Event_WebSocket, Data: &WSPack{ClientId: wc.id, Type: WPTUnknownPack, Data: bytes}}) + continue + } + wc.wsModule.NotifyEvent(&event.Event{Type: event.Sys_Event_WebSocket, Data: &WSPack{ClientId: wc.id, Type: WPTPack, Data: data}}) + } +} + +func (wc *WSClient) OnClose() { + wc.wsModule.NotifyEvent(&event.Event{Type: event.Sys_Event_WebSocket, Data: &WSPack{ClientId: wc.id, Type: WPTDisConnected}}) + wc.wsModule.mapClientLocker.Lock() + defer wc.wsModule.mapClientLocker.Unlock() + delete(wc.wsModule.mapClient, wc.GetId()) +} + +func (ws *WSModule) GetProcessor() processor.IRawProcessor { + return ws.process +} + +func (ws *WSModule) GetClientIp(clientId string) string { + ws.mapClientLocker.Lock() + defer ws.mapClientLocker.Unlock() + + pClient, ok := ws.mapClient[clientId] + if ok == false { + return "" + } + + return pClient.wsConn.RemoteAddr().String() +} + +func (ws *WSModule) Close(clientId string) { + ws.mapClientLocker.Lock() + defer ws.mapClientLocker.Unlock() + + client, ok := ws.mapClient[clientId] + if ok == false { + return + } + + if client.wsConn != nil { + client.wsConn.Close() + } + + return +} + +func (ws *WSModule) SendMsg(clientId string, msg interface{}) error { + ws.mapClientLocker.Lock() + client, ok := ws.mapClient[clientId] + if ok == false { + ws.mapClientLocker.Unlock() + return fmt.Errorf("client %s is disconnect!", clientId) + } + + ws.mapClientLocker.Unlock() + bytes, err := ws.process.Marshal(clientId, msg) + if err != nil { + return err + } + return client.wsConn.WriteMsg(bytes) +} + +func (ws *WSModule) SendRawMsg(clientId string, msg []byte) error { + ws.mapClientLocker.Lock() + client, ok := ws.mapClient[clientId] + if ok == false { + ws.mapClientLocker.Unlock() + return fmt.Errorf("client %s is disconnect", clientId) + } + ws.mapClientLocker.Unlock() + return client.wsConn.WriteMsg(msg) +}