From 1014bc54e4e3934db98a03b737ab2f79098eead0 Mon Sep 17 00:00:00 2001 From: boyce Date: Fri, 14 Jun 2024 16:21:52 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BD=91=E7=BB=9C=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- network/processor/jsonprocessor.go | 10 ++++++---- network/processor/pbprocessor.go | 10 ++++++---- network/processor/pbrawprocessor.go | 7 +++++-- network/processor/processor.go | 4 ++-- network/tcp_conn.go | 7 +++++++ sysservice/tcpservice/tcpservice.go | 8 ++++++-- sysservice/wsservice/wsservice.go | 6 ++++-- 7 files changed, 36 insertions(+), 16 deletions(-) diff --git a/network/processor/jsonprocessor.go b/network/processor/jsonprocessor.go index 2b4f7a3..fd12ef4 100644 --- a/network/processor/jsonprocessor.go +++ b/network/processor/jsonprocessor.go @@ -45,8 +45,10 @@ func (jsonProcessor *JsonProcessor) SetByteOrder(littleEndian bool) { } // must goroutine safe -func (jsonProcessor *JsonProcessor ) MsgRoute(clientId string,msg interface{}) error{ +func (jsonProcessor *JsonProcessor ) MsgRoute(clientId string,msg interface{},recyclerReaderBytes func(data []byte)) error{ pPackInfo := msg.(*JsonPackInfo) + defer recyclerReaderBytes(pPackInfo.rawMsg) + v,ok := jsonProcessor.mapMsg[pPackInfo.typ] if ok == false { return fmt.Errorf("cannot find msgtype %d is register!",pPackInfo.typ) @@ -58,7 +60,6 @@ func (jsonProcessor *JsonProcessor ) MsgRoute(clientId string,msg interface{}) e func (jsonProcessor *JsonProcessor) Unmarshal(clientId string,data []byte) (interface{}, error) { typeStruct := struct {Type int `json:"typ"`}{} - defer jsonProcessor.ReleaseBytes(data) err := json.Unmarshal(data, &typeStruct) if err != nil { return nil, err @@ -76,7 +77,7 @@ func (jsonProcessor *JsonProcessor) Unmarshal(clientId string,data []byte) (inte return nil,err } - return &JsonPackInfo{typ:msgType,msg:msgData},nil + return &JsonPackInfo{typ:msgType,msg:msgData,rawMsg: data},nil } func (jsonProcessor *JsonProcessor) Marshal(clientId string,msg interface{}) ([]byte, error) { @@ -104,7 +105,8 @@ func (jsonProcessor *JsonProcessor) MakeRawMsg(msgType uint16,msg []byte) *JsonP return &JsonPackInfo{typ:msgType,rawMsg:msg} } -func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId string,msg interface{}){ +func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId string,msg interface{},recyclerReaderBytes func(data []byte)){ + defer recyclerReaderBytes(msg.([]byte)) if jsonProcessor.unknownMessageHandler==nil { log.Debug("Unknown message",log.String("clientId",clientId)) return diff --git a/network/processor/pbprocessor.go b/network/processor/pbprocessor.go index 44bc439..8fbfcff 100644 --- a/network/processor/pbprocessor.go +++ b/network/processor/pbprocessor.go @@ -54,8 +54,10 @@ func (slf *PBPackInfo) GetMsg() proto.Message { } // must goroutine safe -func (pbProcessor *PBProcessor) MsgRoute(clientId string, msg interface{}) error { +func (pbProcessor *PBProcessor) MsgRoute(clientId string, msg interface{},recyclerReaderBytes func(data []byte)) error { pPackInfo := msg.(*PBPackInfo) + defer recyclerReaderBytes(pPackInfo.rawMsg) + v, ok := pbProcessor.mapMsg[pPackInfo.typ] if ok == false { return fmt.Errorf("Cannot find msgtype %d is register!", pPackInfo.typ) @@ -67,7 +69,6 @@ func (pbProcessor *PBProcessor) MsgRoute(clientId string, msg interface{}) error // must goroutine safe func (pbProcessor *PBProcessor) Unmarshal(clientId string, data []byte) (interface{}, error) { - defer pbProcessor.ReleaseBytes(data) return pbProcessor.UnmarshalWithOutRelease(clientId, data) } @@ -91,7 +92,7 @@ func (pbProcessor *PBProcessor) UnmarshalWithOutRelease(clientId string, data [] return nil, err } - return &PBPackInfo{typ: msgType, msg: protoMsg}, nil + return &PBPackInfo{typ: msgType, msg: protoMsg,rawMsg:data}, nil } // must goroutine safe @@ -133,8 +134,9 @@ func (pbProcessor *PBProcessor) MakeRawMsg(msgType uint16, msg []byte) *PBPackIn return &PBPackInfo{typ: msgType, rawMsg: msg} } -func (pbProcessor *PBProcessor) UnknownMsgRoute(clientId string, msg interface{}) { +func (pbProcessor *PBProcessor) UnknownMsgRoute(clientId string, msg interface{},recyclerReaderBytes func(data []byte)) { pbProcessor.unknownMessageHandler(clientId, msg.([]byte)) + recyclerReaderBytes(msg.([]byte)) } // connect event diff --git a/network/processor/pbrawprocessor.go b/network/processor/pbrawprocessor.go index c6b8fe3..de74ef6 100644 --- a/network/processor/pbrawprocessor.go +++ b/network/processor/pbrawprocessor.go @@ -38,9 +38,11 @@ func (pbRawProcessor *PBRawProcessor) SetByteOrder(littleEndian bool) { } // must goroutine safe -func (pbRawProcessor *PBRawProcessor ) MsgRoute(clientId string, msg interface{}) error{ +func (pbRawProcessor *PBRawProcessor ) MsgRoute(clientId string, msg interface{},recyclerReaderBytes func(data []byte)) error{ pPackInfo := msg.(*PBRawPackInfo) pbRawProcessor.msgHandler(clientId,pPackInfo.typ,pPackInfo.rawMsg) + recyclerReaderBytes(pPackInfo.rawMsg) + return nil } @@ -80,7 +82,8 @@ func (pbRawProcessor *PBRawProcessor) MakeRawMsg(msgType uint16,msg []byte,pbRaw pbRawPackInfo.rawMsg = msg } -func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId string,msg interface{}){ +func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId string,msg interface{},recyclerReaderBytes func(data []byte)){ + defer recyclerReaderBytes(msg.([]byte)) if pbRawProcessor.unknownMessageHandler == nil { return } diff --git a/network/processor/processor.go b/network/processor/processor.go index b28279a..b24530f 100644 --- a/network/processor/processor.go +++ b/network/processor/processor.go @@ -3,9 +3,9 @@ package processor type IProcessor interface { // must goroutine safe - MsgRoute(clientId string,msg interface{}) error + MsgRoute(clientId string,msg interface{},recyclerReaderBytes func(data []byte)) error //must goroutine safe - UnknownMsgRoute(clientId string,msg interface{}) + UnknownMsgRoute(clientId string,msg interface{},recyclerReaderBytes func(data []byte)) // connect event ConnectedRoute(clientId string) DisConnectedRoute(clientId string) diff --git a/network/tcp_conn.go b/network/tcp_conn.go index 6e1e02f..f02ce2a 100644 --- a/network/tcp_conn.go +++ b/network/tcp_conn.go @@ -129,6 +129,13 @@ func (tcpConn *TCPConn) ReadMsg() ([]byte, error) { return tcpConn.msgParser.Read(tcpConn) } +func (tcpConn *TCPConn) GetRecyclerReaderBytes() func (data []byte) { + bytePool := tcpConn.msgParser.IBytesMempool + return func(data []byte) { + bytePool.ReleaseBytes(data) + } +} + func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){ tcpConn.msgParser.ReleaseBytes(byteBuff) } diff --git a/sysservice/tcpservice/tcpservice.go b/sysservice/tcpservice/tcpservice.go index f998165..7c9db6f 100644 --- a/sysservice/tcpservice/tcpservice.go +++ b/sysservice/tcpservice/tcpservice.go @@ -107,12 +107,16 @@ func (tcpService *TcpService) TcpEventHandler(ev event.IEvent) { case TPT_DisConnected: tcpService.process.DisConnectedRoute(pack.ClientId) case TPT_UnknownPack: - tcpService.process.UnknownMsgRoute(pack.ClientId,pack.Data) + tcpService.process.UnknownMsgRoute(pack.ClientId,pack.Data,tcpService.recyclerReaderBytes) case TPT_Pack: - tcpService.process.MsgRoute(pack.ClientId,pack.Data) + tcpService.process.MsgRoute(pack.ClientId,pack.Data,tcpService.recyclerReaderBytes) } } +func (tcpService *TcpService) recyclerReaderBytes(data []byte) { +} + + func (tcpService *TcpService) SetProcessor(process processor.IProcessor,handler event.IEventHandler){ tcpService.process = process tcpService.RegEventReceiverFunc(event.Sys_Event_Tcp,handler, tcpService.TcpEventHandler) diff --git a/sysservice/wsservice/wsservice.go b/sysservice/wsservice/wsservice.go index 621d0b2..54e031d 100644 --- a/sysservice/wsservice/wsservice.go +++ b/sysservice/wsservice/wsservice.go @@ -95,9 +95,9 @@ func (ws *WSService) WSEventHandler(ev event.IEvent) { case WPT_DisConnected: pack.MsgProcessor.DisConnectedRoute(pack.ClientId) case WPT_UnknownPack: - pack.MsgProcessor.UnknownMsgRoute(pack.ClientId,pack.Data) + pack.MsgProcessor.UnknownMsgRoute(pack.ClientId,pack.Data,ws.recyclerReaderBytes) case WPT_Pack: - pack.MsgProcessor.MsgRoute(pack.ClientId,pack.Data) + pack.MsgProcessor.MsgRoute(pack.ClientId,pack.Data,ws.recyclerReaderBytes) } } @@ -180,3 +180,5 @@ func (ws *WSService) Close(clientid string) { return } +func (ws *WSService) recyclerReaderBytes(data []byte) { +}