From e676587b4d3b3066af77829118a59e70303ade1f Mon Sep 17 00:00:00 2001 From: duanhf2012 Date: Tue, 11 Jan 2022 13:57:39 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=8D=8F=E8=AE=AE=E7=9A=84Pr?= =?UTF-8?q?ocessor=E5=87=BD=E6=95=B0=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- network/processor/pbrawprocessor.go | 20 ++++++++++---------- network/processor/processor.go | 24 ++++++++++++------------ network/tcp_conn.go | 24 +++++++++++++++++------- rpc/rpchandler.go | 2 +- sysservice/tcpservice/tcpservice.go | 20 +++++++++++++++----- 5 files changed, 55 insertions(+), 35 deletions(-) diff --git a/network/processor/pbrawprocessor.go b/network/processor/pbrawprocessor.go index 6a67b07..77476a5 100644 --- a/network/processor/pbrawprocessor.go +++ b/network/processor/pbrawprocessor.go @@ -38,14 +38,14 @@ func (pbRawProcessor *PBRawProcessor) SetByteOrder(littleEndian bool) { } // must goroutine safe -func (pbRawProcessor *PBRawProcessor ) MsgRoute(msg interface{},userdata interface{}) error{ +func (pbRawProcessor *PBRawProcessor ) MsgRoute(clientId uint64, msg interface{}) error{ pPackInfo := msg.(*PBRawPackInfo) - pbRawProcessor.msgHandler(userdata.(uint64),pPackInfo.typ,pPackInfo.rawMsg) + pbRawProcessor.msgHandler(clientId,pPackInfo.typ,pPackInfo.rawMsg) return nil } // must goroutine safe -func (pbRawProcessor *PBRawProcessor ) Unmarshal(data []byte) (interface{}, error) { +func (pbRawProcessor *PBRawProcessor ) Unmarshal(clientId uint64,data []byte) (interface{}, error) { var msgType uint16 if pbRawProcessor.LittleEndian == true { msgType = binary.LittleEndian.Uint16(data[:2]) @@ -57,7 +57,7 @@ func (pbRawProcessor *PBRawProcessor ) Unmarshal(data []byte) (interface{}, erro } // must goroutine safe -func (pbRawProcessor *PBRawProcessor ) Marshal(msg interface{}) ([]byte, error){ +func (pbRawProcessor *PBRawProcessor ) Marshal(clientId uint64,msg interface{}) ([]byte, error){ pMsg := msg.(*PBRawPackInfo) buff := make([]byte, 2, len(pMsg.rawMsg)+RawMsgTypeSize) @@ -81,20 +81,20 @@ func (pbRawProcessor *PBRawProcessor) MakeRawMsg(msgType uint16,msg []byte,pbRaw //return &PBRawPackInfo{typ:msgType,rawMsg:msg} } -func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(msg interface{}, userData interface{}){ +func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){ if pbRawProcessor.unknownMessageHandler == nil { return } - pbRawProcessor.unknownMessageHandler(userData.(uint64),msg.([]byte)) + pbRawProcessor.unknownMessageHandler(clientId,msg.([]byte)) } // connect event -func (pbRawProcessor *PBRawProcessor) ConnectedRoute(userData interface{}){ - pbRawProcessor.connectHandler(userData.(uint64)) +func (pbRawProcessor *PBRawProcessor) ConnectedRoute(clientId uint64){ + pbRawProcessor.connectHandler(clientId) } -func (pbRawProcessor *PBRawProcessor) DisConnectedRoute(userData interface{}){ - pbRawProcessor.disconnectHandler(userData.(uint64)) +func (pbRawProcessor *PBRawProcessor) DisConnectedRoute(clientId uint64){ + pbRawProcessor.disconnectHandler(clientId) } func (pbRawProcessor *PBRawProcessor) SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler){ diff --git a/network/processor/processor.go b/network/processor/processor.go index 8818a73..581a3d1 100644 --- a/network/processor/processor.go +++ b/network/processor/processor.go @@ -3,30 +3,30 @@ package processor type IProcessor interface { // must goroutine safe - MsgRoute(msg interface{}, userData interface{}) error + MsgRoute(clientId uint64,msg interface{}) error //must goroutine safe - UnknownMsgRoute(msg interface{}, userData interface{}) + UnknownMsgRoute(clientId uint64,msg interface{}) // connect event - ConnectedRoute(userData interface{}) - DisConnectedRoute(userData interface{}) + ConnectedRoute(clientId uint64) + DisConnectedRoute(clientId uint64) // must goroutine safe - Unmarshal(data []byte) (interface{}, error) + Unmarshal(clientId uint64,data []byte) (interface{}, error) // must goroutine safe - Marshal(msg interface{}) ([]byte, error) + Marshal(clientId uint64,msg interface{}) ([]byte, error) } type IRawProcessor interface { SetByteOrder(littleEndian bool) - MsgRoute(msg interface{},userdata interface{}) error - Unmarshal(data []byte) (interface{}, error) - Marshal(msg interface{}) ([]byte, error) + MsgRoute(clientId uint64,msg interface{}) error + Unmarshal(clientId uint64,data []byte) (interface{}, error) + Marshal(clientId uint64,msg interface{}) ([]byte, error) SetRawMsgHandler(handle RawMessageHandler) MakeRawMsg(msgType uint16,msg []byte,pbRawPackInfo *PBRawPackInfo) - UnknownMsgRoute(msg interface{}, userData interface{}) - ConnectedRoute(userData interface{}) - DisConnectedRoute(userData interface{}) + UnknownMsgRoute(clientId uint64,msg interface{}) + ConnectedRoute(clientId uint64) + DisConnectedRoute(clientId uint64) SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler) SetConnectedHandler(connectHandler RawConnectHandler) diff --git a/network/tcp_conn.go b/network/tcp_conn.go index f9c04b7..4f24f3f 100644 --- a/network/tcp_conn.go +++ b/network/tcp_conn.go @@ -1,11 +1,11 @@ package network import ( - "fmt" "github.com/duanhf2012/origin/log" "net" "sync" "time" + "errors" ) type ConnSet map[net.Conn]struct{} @@ -86,27 +86,28 @@ func (tcpConn *TCPConn) GetRemoteIp() string { return tcpConn.conn.RemoteAddr().String() } -func (tcpConn *TCPConn) doWrite(b []byte) { +func (tcpConn *TCPConn) doWrite(b []byte) error{ if len(tcpConn.writeChan) == cap(tcpConn.writeChan) { tcpConn.ReleaseReadMsg(b) log.SError("close conn: channel full") tcpConn.doDestroy() - return + return errors.New("close conn: channel full") } tcpConn.writeChan <- b + return nil } // b must not be modified by the others goroutines -func (tcpConn *TCPConn) Write(b []byte) { +func (tcpConn *TCPConn) Write(b []byte) error{ tcpConn.Lock() defer tcpConn.Unlock() if tcpConn.closeFlag || b == nil { tcpConn.ReleaseReadMsg(b) - return + return errors.New("conn is close") } - tcpConn.doWrite(b) + return tcpConn.doWrite(b) } func (tcpConn *TCPConn) Read(b []byte) (int, error) { @@ -131,11 +132,20 @@ func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){ func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error { if tcpConn.closeFlag == true { - return fmt.Errorf("conn is close") + return errors.New("conn is close") } return tcpConn.msgParser.Write(tcpConn, args...) } +func (tcpConn *TCPConn) WriteRawMsg(args []byte) error { + if tcpConn.closeFlag == true { + return errors.New("conn is close") + } + + return tcpConn.Write(args) +} + + func (tcpConn *TCPConn) IsConnected() bool { return tcpConn.closeFlag == false } diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 0b532f2..989851c 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -453,7 +453,7 @@ func (handler *RpcHandler) asyncCallRpc(nodeId int,serviceMethod string,args int err = errors.New("cannot find rpcClient from nodeId "+strNodeId+" "+serviceMethod) } fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) - log.SError("Call serviceMethod is error:%+v!",err.Error()) + log.SError("Call serviceMethod is error:",err.Error()) return nil } diff --git a/sysservice/tcpservice/tcpservice.go b/sysservice/tcpservice/tcpservice.go index 8679751..a0fbd01 100644 --- a/sysservice/tcpservice/tcpservice.go +++ b/sysservice/tcpservice/tcpservice.go @@ -10,6 +10,7 @@ import ( "github.com/duanhf2012/origin/service" "sync" "time" + "runtime" ) type TcpService struct { @@ -143,9 +144,9 @@ func (tcpService *TcpService) TcpEventHandler(ev event.IEvent) { case TPT_DisConnected: tcpService.process.DisConnectedRoute(pack.ClientId) case TPT_UnknownPack: - tcpService.process.UnknownMsgRoute(pack.Data,pack.ClientId) + tcpService.process.UnknownMsgRoute(pack.ClientId,pack.Data) case TPT_Pack: - tcpService.process.MsgRoute(pack.Data, pack.ClientId) + tcpService.process.MsgRoute(pack.ClientId,pack.Data) } } @@ -180,6 +181,15 @@ func (slf *Client) GetId() uint64 { } func (slf *Client) Run() { + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + l := runtime.Stack(buf, false) + errString := fmt.Sprint(r) + log.SError("core dump info[",errString,"]\n",string(buf[:l])) + } + }() + slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:TcpPack{ClientId:slf.id,Type:TPT_Connected}}) for{ if slf.tcpConn == nil { @@ -192,7 +202,7 @@ func (slf *Client) Run() { log.SDebug("read client id ",slf.id," is error:",err.Error()) break } - data,err:=slf.tcpService.process.Unmarshal(bytes) + data,err:=slf.tcpService.process.Unmarshal(slf.id,bytes) if err != nil { slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:TcpPack{ClientId:slf.id,Type:TPT_UnknownPack,Data:bytes}}) @@ -218,7 +228,7 @@ func (tcpService *TcpService) SendMsg(clientId uint64,msg interface{}) error{ } tcpService.mapClientLocker.Unlock() - bytes,err := tcpService.process.Marshal(msg) + bytes,err := tcpService.process.Marshal(clientId,msg) if err != nil { return err } @@ -262,7 +272,7 @@ func (tcpService *TcpService) SendRawMsg(clientId uint64,msg []byte) error{ } tcpService.mapClientLocker.Unlock() client.tcpConn.SetWriteDeadline(tcpService.WriteDeadline) - return client.tcpConn.WriteMsg(msg) + return client.tcpConn.WriteRawMsg(msg) } func (tcpService *TcpService) GetConnNum() int {