优化协议的Processor函数参数

This commit is contained in:
duanhf2012
2022-01-11 13:57:39 +08:00
parent 313cc032a1
commit e676587b4d
5 changed files with 55 additions and 35 deletions

View File

@@ -38,14 +38,14 @@ func (pbRawProcessor *PBRawProcessor) SetByteOrder(littleEndian bool) {
} }
// must goroutine safe // must goroutine safe
func (pbRawProcessor *PBRawProcessor ) MsgRoute(msg interface{},userdata interface{}) error{ func (pbRawProcessor *PBRawProcessor ) MsgRoute(clientId uint64, msg interface{}) error{
pPackInfo := msg.(*PBRawPackInfo) pPackInfo := msg.(*PBRawPackInfo)
pbRawProcessor.msgHandler(userdata.(uint64),pPackInfo.typ,pPackInfo.rawMsg) pbRawProcessor.msgHandler(clientId,pPackInfo.typ,pPackInfo.rawMsg)
return nil return nil
} }
// must goroutine safe // must goroutine safe
func (pbRawProcessor *PBRawProcessor ) Unmarshal(data []byte) (interface{}, error) { func (pbRawProcessor *PBRawProcessor ) Unmarshal(clientId uint64,data []byte) (interface{}, error) {
var msgType uint16 var msgType uint16
if pbRawProcessor.LittleEndian == true { if pbRawProcessor.LittleEndian == true {
msgType = binary.LittleEndian.Uint16(data[:2]) msgType = binary.LittleEndian.Uint16(data[:2])
@@ -57,7 +57,7 @@ func (pbRawProcessor *PBRawProcessor ) Unmarshal(data []byte) (interface{}, erro
} }
// must goroutine safe // must goroutine safe
func (pbRawProcessor *PBRawProcessor ) Marshal(msg interface{}) ([]byte, error){ func (pbRawProcessor *PBRawProcessor ) Marshal(clientId uint64,msg interface{}) ([]byte, error){
pMsg := msg.(*PBRawPackInfo) pMsg := msg.(*PBRawPackInfo)
buff := make([]byte, 2, len(pMsg.rawMsg)+RawMsgTypeSize) buff := make([]byte, 2, len(pMsg.rawMsg)+RawMsgTypeSize)
@@ -81,20 +81,20 @@ func (pbRawProcessor *PBRawProcessor) MakeRawMsg(msgType uint16,msg []byte,pbRaw
//return &PBRawPackInfo{typ:msgType,rawMsg:msg} //return &PBRawPackInfo{typ:msgType,rawMsg:msg}
} }
func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(msg interface{}, userData interface{}){ func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){
if pbRawProcessor.unknownMessageHandler == nil { if pbRawProcessor.unknownMessageHandler == nil {
return return
} }
pbRawProcessor.unknownMessageHandler(userData.(uint64),msg.([]byte)) pbRawProcessor.unknownMessageHandler(clientId,msg.([]byte))
} }
// connect event // connect event
func (pbRawProcessor *PBRawProcessor) ConnectedRoute(userData interface{}){ func (pbRawProcessor *PBRawProcessor) ConnectedRoute(clientId uint64){
pbRawProcessor.connectHandler(userData.(uint64)) pbRawProcessor.connectHandler(clientId)
} }
func (pbRawProcessor *PBRawProcessor) DisConnectedRoute(userData interface{}){ func (pbRawProcessor *PBRawProcessor) DisConnectedRoute(clientId uint64){
pbRawProcessor.disconnectHandler(userData.(uint64)) pbRawProcessor.disconnectHandler(clientId)
} }
func (pbRawProcessor *PBRawProcessor) SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler){ func (pbRawProcessor *PBRawProcessor) SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler){

View File

@@ -3,30 +3,30 @@ package processor
type IProcessor interface { type IProcessor interface {
// must goroutine safe // must goroutine safe
MsgRoute(msg interface{}, userData interface{}) error MsgRoute(clientId uint64,msg interface{}) error
//must goroutine safe //must goroutine safe
UnknownMsgRoute(msg interface{}, userData interface{}) UnknownMsgRoute(clientId uint64,msg interface{})
// connect event // connect event
ConnectedRoute(userData interface{}) ConnectedRoute(clientId uint64)
DisConnectedRoute(userData interface{}) DisConnectedRoute(clientId uint64)
// must goroutine safe // must goroutine safe
Unmarshal(data []byte) (interface{}, error) Unmarshal(clientId uint64,data []byte) (interface{}, error)
// must goroutine safe // must goroutine safe
Marshal(msg interface{}) ([]byte, error) Marshal(clientId uint64,msg interface{}) ([]byte, error)
} }
type IRawProcessor interface { type IRawProcessor interface {
SetByteOrder(littleEndian bool) SetByteOrder(littleEndian bool)
MsgRoute(msg interface{},userdata interface{}) error MsgRoute(clientId uint64,msg interface{}) error
Unmarshal(data []byte) (interface{}, error) Unmarshal(clientId uint64,data []byte) (interface{}, error)
Marshal(msg interface{}) ([]byte, error) Marshal(clientId uint64,msg interface{}) ([]byte, error)
SetRawMsgHandler(handle RawMessageHandler) SetRawMsgHandler(handle RawMessageHandler)
MakeRawMsg(msgType uint16,msg []byte,pbRawPackInfo *PBRawPackInfo) MakeRawMsg(msgType uint16,msg []byte,pbRawPackInfo *PBRawPackInfo)
UnknownMsgRoute(msg interface{}, userData interface{}) UnknownMsgRoute(clientId uint64,msg interface{})
ConnectedRoute(userData interface{}) ConnectedRoute(clientId uint64)
DisConnectedRoute(userData interface{}) DisConnectedRoute(clientId uint64)
SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler) SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler)
SetConnectedHandler(connectHandler RawConnectHandler) SetConnectedHandler(connectHandler RawConnectHandler)

View File

@@ -1,11 +1,11 @@
package network package network
import ( import (
"fmt"
"github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/log"
"net" "net"
"sync" "sync"
"time" "time"
"errors"
) )
type ConnSet map[net.Conn]struct{} type ConnSet map[net.Conn]struct{}
@@ -86,27 +86,28 @@ func (tcpConn *TCPConn) GetRemoteIp() string {
return tcpConn.conn.RemoteAddr().String() return tcpConn.conn.RemoteAddr().String()
} }
func (tcpConn *TCPConn) doWrite(b []byte) { func (tcpConn *TCPConn) doWrite(b []byte) error{
if len(tcpConn.writeChan) == cap(tcpConn.writeChan) { if len(tcpConn.writeChan) == cap(tcpConn.writeChan) {
tcpConn.ReleaseReadMsg(b) tcpConn.ReleaseReadMsg(b)
log.SError("close conn: channel full") log.SError("close conn: channel full")
tcpConn.doDestroy() tcpConn.doDestroy()
return return errors.New("close conn: channel full")
} }
tcpConn.writeChan <- b tcpConn.writeChan <- b
return nil
} }
// b must not be modified by the others goroutines // b must not be modified by the others goroutines
func (tcpConn *TCPConn) Write(b []byte) { func (tcpConn *TCPConn) Write(b []byte) error{
tcpConn.Lock() tcpConn.Lock()
defer tcpConn.Unlock() defer tcpConn.Unlock()
if tcpConn.closeFlag || b == nil { if tcpConn.closeFlag || b == nil {
tcpConn.ReleaseReadMsg(b) tcpConn.ReleaseReadMsg(b)
return return errors.New("conn is close")
} }
tcpConn.doWrite(b) return tcpConn.doWrite(b)
} }
func (tcpConn *TCPConn) Read(b []byte) (int, error) { func (tcpConn *TCPConn) Read(b []byte) (int, error) {
@@ -131,11 +132,20 @@ func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){
func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error { func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {
if tcpConn.closeFlag == true { if tcpConn.closeFlag == true {
return fmt.Errorf("conn is close") return errors.New("conn is close")
} }
return tcpConn.msgParser.Write(tcpConn, args...) return tcpConn.msgParser.Write(tcpConn, args...)
} }
func (tcpConn *TCPConn) WriteRawMsg(args []byte) error {
if tcpConn.closeFlag == true {
return errors.New("conn is close")
}
return tcpConn.Write(args)
}
func (tcpConn *TCPConn) IsConnected() bool { func (tcpConn *TCPConn) IsConnected() bool {
return tcpConn.closeFlag == false return tcpConn.closeFlag == false
} }

View File

@@ -453,7 +453,7 @@ func (handler *RpcHandler) asyncCallRpc(nodeId int,serviceMethod string,args int
err = errors.New("cannot find rpcClient from nodeId "+strNodeId+" "+serviceMethod) err = errors.New("cannot find rpcClient from nodeId "+strNodeId+" "+serviceMethod)
} }
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
log.SError("Call serviceMethod is error:%+v!",err.Error()) log.SError("Call serviceMethod is error:",err.Error())
return nil return nil
} }

View File

@@ -10,6 +10,7 @@ import (
"github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/service"
"sync" "sync"
"time" "time"
"runtime"
) )
type TcpService struct { type TcpService struct {
@@ -143,9 +144,9 @@ func (tcpService *TcpService) TcpEventHandler(ev event.IEvent) {
case TPT_DisConnected: case TPT_DisConnected:
tcpService.process.DisConnectedRoute(pack.ClientId) tcpService.process.DisConnectedRoute(pack.ClientId)
case TPT_UnknownPack: case TPT_UnknownPack:
tcpService.process.UnknownMsgRoute(pack.Data,pack.ClientId) tcpService.process.UnknownMsgRoute(pack.ClientId,pack.Data)
case TPT_Pack: case TPT_Pack:
tcpService.process.MsgRoute(pack.Data, pack.ClientId) tcpService.process.MsgRoute(pack.ClientId,pack.Data)
} }
} }
@@ -180,6 +181,15 @@ func (slf *Client) GetId() uint64 {
} }
func (slf *Client) Run() { func (slf *Client) Run() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[",errString,"]\n",string(buf[:l]))
}
}()
slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:TcpPack{ClientId:slf.id,Type:TPT_Connected}}) slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:TcpPack{ClientId:slf.id,Type:TPT_Connected}})
for{ for{
if slf.tcpConn == nil { if slf.tcpConn == nil {
@@ -192,7 +202,7 @@ func (slf *Client) Run() {
log.SDebug("read client id ",slf.id," is error:",err.Error()) log.SDebug("read client id ",slf.id," is error:",err.Error())
break break
} }
data,err:=slf.tcpService.process.Unmarshal(bytes) data,err:=slf.tcpService.process.Unmarshal(slf.id,bytes)
if err != nil { if err != nil {
slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:TcpPack{ClientId:slf.id,Type:TPT_UnknownPack,Data:bytes}}) slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:TcpPack{ClientId:slf.id,Type:TPT_UnknownPack,Data:bytes}})
@@ -218,7 +228,7 @@ func (tcpService *TcpService) SendMsg(clientId uint64,msg interface{}) error{
} }
tcpService.mapClientLocker.Unlock() tcpService.mapClientLocker.Unlock()
bytes,err := tcpService.process.Marshal(msg) bytes,err := tcpService.process.Marshal(clientId,msg)
if err != nil { if err != nil {
return err return err
} }
@@ -262,7 +272,7 @@ func (tcpService *TcpService) SendRawMsg(clientId uint64,msg []byte) error{
} }
tcpService.mapClientLocker.Unlock() tcpService.mapClientLocker.Unlock()
client.tcpConn.SetWriteDeadline(tcpService.WriteDeadline) client.tcpConn.SetWriteDeadline(tcpService.WriteDeadline)
return client.tcpConn.WriteMsg(msg) return client.tcpConn.WriteRawMsg(msg)
} }
func (tcpService *TcpService) GetConnNum() int { func (tcpService *TcpService) GetConnNum() int {