优化网络处理器

This commit is contained in:
boyce
2024-06-14 16:21:52 +08:00
parent 9c26c742fe
commit 1014bc54e4
7 changed files with 36 additions and 16 deletions

View File

@@ -45,8 +45,10 @@ func (jsonProcessor *JsonProcessor) SetByteOrder(littleEndian bool) {
} }
// must goroutine safe // must goroutine safe
func (jsonProcessor *JsonProcessor ) MsgRoute(clientId string,msg interface{}) error{ func (jsonProcessor *JsonProcessor ) MsgRoute(clientId string,msg interface{},recyclerReaderBytes func(data []byte)) error{
pPackInfo := msg.(*JsonPackInfo) pPackInfo := msg.(*JsonPackInfo)
defer recyclerReaderBytes(pPackInfo.rawMsg)
v,ok := jsonProcessor.mapMsg[pPackInfo.typ] v,ok := jsonProcessor.mapMsg[pPackInfo.typ]
if ok == false { if ok == false {
return fmt.Errorf("cannot find msgtype %d is register!",pPackInfo.typ) return fmt.Errorf("cannot find msgtype %d is register!",pPackInfo.typ)
@@ -58,7 +60,6 @@ func (jsonProcessor *JsonProcessor ) MsgRoute(clientId string,msg interface{}) e
func (jsonProcessor *JsonProcessor) Unmarshal(clientId string,data []byte) (interface{}, error) { func (jsonProcessor *JsonProcessor) Unmarshal(clientId string,data []byte) (interface{}, error) {
typeStruct := struct {Type int `json:"typ"`}{} typeStruct := struct {Type int `json:"typ"`}{}
defer jsonProcessor.ReleaseBytes(data)
err := json.Unmarshal(data, &typeStruct) err := json.Unmarshal(data, &typeStruct)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -76,7 +77,7 @@ func (jsonProcessor *JsonProcessor) Unmarshal(clientId string,data []byte) (inte
return nil,err return nil,err
} }
return &JsonPackInfo{typ:msgType,msg:msgData},nil return &JsonPackInfo{typ:msgType,msg:msgData,rawMsg: data},nil
} }
func (jsonProcessor *JsonProcessor) Marshal(clientId string,msg interface{}) ([]byte, error) { func (jsonProcessor *JsonProcessor) Marshal(clientId string,msg interface{}) ([]byte, error) {
@@ -104,7 +105,8 @@ func (jsonProcessor *JsonProcessor) MakeRawMsg(msgType uint16,msg []byte) *JsonP
return &JsonPackInfo{typ:msgType,rawMsg:msg} return &JsonPackInfo{typ:msgType,rawMsg:msg}
} }
func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId string,msg interface{}){ func (jsonProcessor *JsonProcessor) UnknownMsgRoute(clientId string,msg interface{},recyclerReaderBytes func(data []byte)){
defer recyclerReaderBytes(msg.([]byte))
if jsonProcessor.unknownMessageHandler==nil { if jsonProcessor.unknownMessageHandler==nil {
log.Debug("Unknown message",log.String("clientId",clientId)) log.Debug("Unknown message",log.String("clientId",clientId))
return return

View File

@@ -54,8 +54,10 @@ func (slf *PBPackInfo) GetMsg() proto.Message {
} }
// must goroutine safe // must goroutine safe
func (pbProcessor *PBProcessor) MsgRoute(clientId string, msg interface{}) error { func (pbProcessor *PBProcessor) MsgRoute(clientId string, msg interface{},recyclerReaderBytes func(data []byte)) error {
pPackInfo := msg.(*PBPackInfo) pPackInfo := msg.(*PBPackInfo)
defer recyclerReaderBytes(pPackInfo.rawMsg)
v, ok := pbProcessor.mapMsg[pPackInfo.typ] v, ok := pbProcessor.mapMsg[pPackInfo.typ]
if ok == false { if ok == false {
return fmt.Errorf("Cannot find msgtype %d is register!", pPackInfo.typ) return fmt.Errorf("Cannot find msgtype %d is register!", pPackInfo.typ)
@@ -67,7 +69,6 @@ func (pbProcessor *PBProcessor) MsgRoute(clientId string, msg interface{}) error
// must goroutine safe // must goroutine safe
func (pbProcessor *PBProcessor) Unmarshal(clientId string, data []byte) (interface{}, error) { func (pbProcessor *PBProcessor) Unmarshal(clientId string, data []byte) (interface{}, error) {
defer pbProcessor.ReleaseBytes(data)
return pbProcessor.UnmarshalWithOutRelease(clientId, data) return pbProcessor.UnmarshalWithOutRelease(clientId, data)
} }
@@ -91,7 +92,7 @@ func (pbProcessor *PBProcessor) UnmarshalWithOutRelease(clientId string, data []
return nil, err return nil, err
} }
return &PBPackInfo{typ: msgType, msg: protoMsg}, nil return &PBPackInfo{typ: msgType, msg: protoMsg,rawMsg:data}, nil
} }
// must goroutine safe // must goroutine safe
@@ -133,8 +134,9 @@ func (pbProcessor *PBProcessor) MakeRawMsg(msgType uint16, msg []byte) *PBPackIn
return &PBPackInfo{typ: msgType, rawMsg: msg} return &PBPackInfo{typ: msgType, rawMsg: msg}
} }
func (pbProcessor *PBProcessor) UnknownMsgRoute(clientId string, msg interface{}) { func (pbProcessor *PBProcessor) UnknownMsgRoute(clientId string, msg interface{},recyclerReaderBytes func(data []byte)) {
pbProcessor.unknownMessageHandler(clientId, msg.([]byte)) pbProcessor.unknownMessageHandler(clientId, msg.([]byte))
recyclerReaderBytes(msg.([]byte))
} }
// connect event // connect event

View File

@@ -38,9 +38,11 @@ func (pbRawProcessor *PBRawProcessor) SetByteOrder(littleEndian bool) {
} }
// must goroutine safe // must goroutine safe
func (pbRawProcessor *PBRawProcessor ) MsgRoute(clientId string, msg interface{}) error{ func (pbRawProcessor *PBRawProcessor ) MsgRoute(clientId string, msg interface{},recyclerReaderBytes func(data []byte)) error{
pPackInfo := msg.(*PBRawPackInfo) pPackInfo := msg.(*PBRawPackInfo)
pbRawProcessor.msgHandler(clientId,pPackInfo.typ,pPackInfo.rawMsg) pbRawProcessor.msgHandler(clientId,pPackInfo.typ,pPackInfo.rawMsg)
recyclerReaderBytes(pPackInfo.rawMsg)
return nil return nil
} }
@@ -80,7 +82,8 @@ func (pbRawProcessor *PBRawProcessor) MakeRawMsg(msgType uint16,msg []byte,pbRaw
pbRawPackInfo.rawMsg = msg pbRawPackInfo.rawMsg = msg
} }
func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId string,msg interface{}){ func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId string,msg interface{},recyclerReaderBytes func(data []byte)){
defer recyclerReaderBytes(msg.([]byte))
if pbRawProcessor.unknownMessageHandler == nil { if pbRawProcessor.unknownMessageHandler == nil {
return return
} }

View File

@@ -3,9 +3,9 @@ package processor
type IProcessor interface { type IProcessor interface {
// must goroutine safe // must goroutine safe
MsgRoute(clientId string,msg interface{}) error MsgRoute(clientId string,msg interface{},recyclerReaderBytes func(data []byte)) error
//must goroutine safe //must goroutine safe
UnknownMsgRoute(clientId string,msg interface{}) UnknownMsgRoute(clientId string,msg interface{},recyclerReaderBytes func(data []byte))
// connect event // connect event
ConnectedRoute(clientId string) ConnectedRoute(clientId string)
DisConnectedRoute(clientId string) DisConnectedRoute(clientId string)

View File

@@ -129,6 +129,13 @@ func (tcpConn *TCPConn) ReadMsg() ([]byte, error) {
return tcpConn.msgParser.Read(tcpConn) return tcpConn.msgParser.Read(tcpConn)
} }
func (tcpConn *TCPConn) GetRecyclerReaderBytes() func (data []byte) {
bytePool := tcpConn.msgParser.IBytesMempool
return func(data []byte) {
bytePool.ReleaseBytes(data)
}
}
func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){ func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){
tcpConn.msgParser.ReleaseBytes(byteBuff) tcpConn.msgParser.ReleaseBytes(byteBuff)
} }

View File

@@ -107,12 +107,16 @@ func (tcpService *TcpService) TcpEventHandler(ev event.IEvent) {
case TPT_DisConnected: case TPT_DisConnected:
tcpService.process.DisConnectedRoute(pack.ClientId) tcpService.process.DisConnectedRoute(pack.ClientId)
case TPT_UnknownPack: case TPT_UnknownPack:
tcpService.process.UnknownMsgRoute(pack.ClientId,pack.Data) tcpService.process.UnknownMsgRoute(pack.ClientId,pack.Data,tcpService.recyclerReaderBytes)
case TPT_Pack: case TPT_Pack:
tcpService.process.MsgRoute(pack.ClientId,pack.Data) tcpService.process.MsgRoute(pack.ClientId,pack.Data,tcpService.recyclerReaderBytes)
} }
} }
func (tcpService *TcpService) recyclerReaderBytes(data []byte) {
}
func (tcpService *TcpService) SetProcessor(process processor.IProcessor,handler event.IEventHandler){ func (tcpService *TcpService) SetProcessor(process processor.IProcessor,handler event.IEventHandler){
tcpService.process = process tcpService.process = process
tcpService.RegEventReceiverFunc(event.Sys_Event_Tcp,handler, tcpService.TcpEventHandler) tcpService.RegEventReceiverFunc(event.Sys_Event_Tcp,handler, tcpService.TcpEventHandler)

View File

@@ -95,9 +95,9 @@ func (ws *WSService) WSEventHandler(ev event.IEvent) {
case WPT_DisConnected: case WPT_DisConnected:
pack.MsgProcessor.DisConnectedRoute(pack.ClientId) pack.MsgProcessor.DisConnectedRoute(pack.ClientId)
case WPT_UnknownPack: case WPT_UnknownPack:
pack.MsgProcessor.UnknownMsgRoute(pack.ClientId,pack.Data) pack.MsgProcessor.UnknownMsgRoute(pack.ClientId,pack.Data,ws.recyclerReaderBytes)
case WPT_Pack: case WPT_Pack:
pack.MsgProcessor.MsgRoute(pack.ClientId,pack.Data) pack.MsgProcessor.MsgRoute(pack.ClientId,pack.Data,ws.recyclerReaderBytes)
} }
} }
@@ -180,3 +180,5 @@ func (ws *WSService) Close(clientid string) {
return return
} }
func (ws *WSService) recyclerReaderBytes(data []byte) {
}