优化网络模块

This commit is contained in:
duanhf2012
2022-12-27 14:20:18 +08:00
parent 66770f07a5
commit 60064cbba6
6 changed files with 59 additions and 91 deletions

View File

@@ -78,7 +78,6 @@ func (pbRawProcessor *PBRawProcessor) SetRawMsgHandler(handle RawMessageHandler)
func (pbRawProcessor *PBRawProcessor) MakeRawMsg(msgType uint16,msg []byte,pbRawPackInfo *PBRawPackInfo) { func (pbRawProcessor *PBRawProcessor) MakeRawMsg(msgType uint16,msg []byte,pbRawPackInfo *PBRawPackInfo) {
pbRawPackInfo.typ = msgType pbRawPackInfo.typ = msgType
pbRawPackInfo.rawMsg = msg pbRawPackInfo.rawMsg = msg
//return &PBRawPackInfo{typ:msgType,rawMsg:msg}
} }
func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){ func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){

View File

@@ -17,17 +17,11 @@ type IProcessor interface {
} }
type IRawProcessor interface { type IRawProcessor interface {
SetByteOrder(littleEndian bool) IProcessor
MsgRoute(clientId uint64,msg interface{}) error
Unmarshal(clientId uint64,data []byte) (interface{}, error)
Marshal(clientId uint64,msg interface{}) ([]byte, error)
SetByteOrder(littleEndian bool)
SetRawMsgHandler(handle RawMessageHandler) SetRawMsgHandler(handle RawMessageHandler)
MakeRawMsg(msgType uint16,msg []byte,pbRawPackInfo *PBRawPackInfo) MakeRawMsg(msgType uint16,msg []byte,pbRawPackInfo *PBRawPackInfo)
UnknownMsgRoute(clientId uint64,msg interface{})
ConnectedRoute(clientId uint64)
DisConnectedRoute(clientId uint64)
SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler) SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler)
SetConnectedHandler(connectHandler RawConnectHandler) SetConnectedHandler(connectHandler RawConnectHandler)
SetDisConnectedHandler(disconnectHandler RawConnectHandler) SetDisConnectedHandler(disconnectHandler RawConnectHandler)

View File

@@ -34,7 +34,6 @@ func (areaPool *memAreaPool) makePool() {
for i := 0; i < poolLen; i++ { for i := 0; i < poolLen; i++ {
memSize := (areaPool.minAreaValue - 1) + (i+1)*areaPool.growthValue memSize := (areaPool.minAreaValue - 1) + (i+1)*areaPool.growthValue
areaPool.pool[i] = sync.Pool{New: func() interface{} { areaPool.pool[i] = sync.Pool{New: func() interface{} {
//fmt.Println("make memsize:",memSize)
return make([]byte, memSize) return make([]byte, memSize)
}} }}
} }

View File

@@ -22,11 +22,7 @@ type TCPClient struct {
closeFlag bool closeFlag bool
// msg parser // msg parser
LenMsgLen int MsgParser
MinMsgLen uint32
MaxMsgLen uint32
LittleEndian bool
msgParser *MsgParser
} }
func (client *TCPClient) Start() { func (client *TCPClient) Start() {
@@ -68,15 +64,21 @@ func (client *TCPClient) init() {
if client.cons != nil { if client.cons != nil {
log.SFatal("client is running") log.SFatal("client is running")
} }
if client.LenMsgLen == 0 {
client.LenMsgLen = Default_LenMsgLen
}
if client.MinMsgLen == 0 {
client.MinMsgLen = Default_MinMsgLen
}
if client.MaxMsgLen == 0 {
client.MaxMsgLen = Default_MaxMsgLen
}
client.cons = make(ConnSet) client.cons = make(ConnSet)
client.closeFlag = false client.closeFlag = false
// msg parser // msg parser
msgParser := NewMsgParser() client.MsgParser.init()
msgParser.SetMsgLen(client.LenMsgLen, client.MinMsgLen, client.MaxMsgLen)
msgParser.SetByteOrder(client.LittleEndian)
client.msgParser = msgParser
} }
func (client *TCPClient) GetCloseFlag() bool{ func (client *TCPClient) GetCloseFlag() bool{
@@ -120,7 +122,7 @@ reconnect:
client.cons[conn] = struct{}{} client.cons[conn] = struct{}{}
client.Unlock() client.Unlock()
tcpConn := newTCPConn(conn, client.PendingWriteNum, client.msgParser,client.WriteDeadline) tcpConn := newTCPConn(conn, client.PendingWriteNum, &client.MsgParser,client.WriteDeadline)
agent := client.NewAgent(tcpConn) agent := client.NewAgent(tcpConn)
agent.Run() agent.Run()

View File

@@ -11,62 +11,42 @@ import (
// | len | data | // | len | data |
// -------------- // --------------
type MsgParser struct { type MsgParser struct {
lenMsgLen int LenMsgLen int
minMsgLen uint32 MinMsgLen uint32
maxMsgLen uint32 MaxMsgLen uint32
littleEndian bool LittleEndian bool
INetMempool INetMempool
} }
func NewMsgParser() *MsgParser {
p := new(MsgParser)
p.lenMsgLen = 2
p.minMsgLen = 1
p.maxMsgLen = 4096
p.littleEndian = false
p.INetMempool = NewMemAreaPool()
return p
}
// It's dangerous to call the method on reading or writing
func (p *MsgParser) SetMsgLen(lenMsgLen int, minMsgLen uint32, maxMsgLen uint32) {
if lenMsgLen == 1 || lenMsgLen == 2 || lenMsgLen == 4 {
p.lenMsgLen = lenMsgLen
}
if minMsgLen != 0 {
p.minMsgLen = minMsgLen
}
if maxMsgLen != 0 {
p.maxMsgLen = maxMsgLen
}
func (p *MsgParser) init(){
var max uint32 var max uint32
switch p.lenMsgLen { switch p.LenMsgLen {
case 1: case 1:
max = math.MaxUint8 max = math.MaxUint8
case 2: case 2:
max = math.MaxUint16 max = math.MaxUint16
case 4: case 4:
max = math.MaxUint32 max = math.MaxUint32
default:
panic("LenMsgLen value must be 1 or 2 or 4")
} }
if p.minMsgLen > max {
p.minMsgLen = max if p.MinMsgLen > max {
p.MinMsgLen = max
} }
if p.maxMsgLen > max {
p.maxMsgLen = max if p.MaxMsgLen > max {
p.MaxMsgLen = max
} }
} }
// It's dangerous to call the method on reading or writing
func (p *MsgParser) SetByteOrder(littleEndian bool) {
p.littleEndian = littleEndian
}
// goroutine safe // goroutine safe
func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) { func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) {
var b [4]byte var b [4]byte
bufMsgLen := b[:p.lenMsgLen] bufMsgLen := b[:p.LenMsgLen]
// read len // read len
if _, err := io.ReadFull(conn, bufMsgLen); err != nil { if _, err := io.ReadFull(conn, bufMsgLen); err != nil {
@@ -75,17 +55,17 @@ func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) {
// parse len // parse len
var msgLen uint32 var msgLen uint32
switch p.lenMsgLen { switch p.LenMsgLen {
case 1: case 1:
msgLen = uint32(bufMsgLen[0]) msgLen = uint32(bufMsgLen[0])
case 2: case 2:
if p.littleEndian { if p.LittleEndian {
msgLen = uint32(binary.LittleEndian.Uint16(bufMsgLen)) msgLen = uint32(binary.LittleEndian.Uint16(bufMsgLen))
} else { } else {
msgLen = uint32(binary.BigEndian.Uint16(bufMsgLen)) msgLen = uint32(binary.BigEndian.Uint16(bufMsgLen))
} }
case 4: case 4:
if p.littleEndian { if p.LittleEndian {
msgLen = binary.LittleEndian.Uint32(bufMsgLen) msgLen = binary.LittleEndian.Uint32(bufMsgLen)
} else { } else {
msgLen = binary.BigEndian.Uint32(bufMsgLen) msgLen = binary.BigEndian.Uint32(bufMsgLen)
@@ -93,9 +73,9 @@ func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) {
} }
// check len // check len
if msgLen > p.maxMsgLen { if msgLen > p.MaxMsgLen {
return nil, errors.New("message too long") return nil, errors.New("message too long")
} else if msgLen < p.minMsgLen { } else if msgLen < p.MinMsgLen {
return nil, errors.New("message too short") return nil, errors.New("message too short")
} }
@@ -118,26 +98,26 @@ func (p *MsgParser) Write(conn *TCPConn, args ...[]byte) error {
} }
// check len // check len
if msgLen > p.maxMsgLen { if msgLen > p.MaxMsgLen {
return errors.New("message too long") return errors.New("message too long")
} else if msgLen < p.minMsgLen { } else if msgLen < p.MinMsgLen {
return errors.New("message too short") return errors.New("message too short")
} }
//msg := make([]byte, uint32(p.lenMsgLen)+msgLen) //msg := make([]byte, uint32(p.lenMsgLen)+msgLen)
msg := p.MakeByteSlice(p.lenMsgLen+int(msgLen)) msg := p.MakeByteSlice(p.LenMsgLen+int(msgLen))
// write len // write len
switch p.lenMsgLen { switch p.LenMsgLen {
case 1: case 1:
msg[0] = byte(msgLen) msg[0] = byte(msgLen)
case 2: case 2:
if p.littleEndian { if p.LittleEndian {
binary.LittleEndian.PutUint16(msg, uint16(msgLen)) binary.LittleEndian.PutUint16(msg, uint16(msgLen))
} else { } else {
binary.BigEndian.PutUint16(msg, uint16(msgLen)) binary.BigEndian.PutUint16(msg, uint16(msgLen))
} }
case 4: case 4:
if p.littleEndian { if p.LittleEndian {
binary.LittleEndian.PutUint32(msg, msgLen) binary.LittleEndian.PutUint32(msg, msgLen)
} else { } else {
binary.BigEndian.PutUint32(msg, msgLen) binary.BigEndian.PutUint32(msg, msgLen)
@@ -145,7 +125,7 @@ func (p *MsgParser) Write(conn *TCPConn, args ...[]byte) error {
} }
// write data // write data
l := p.lenMsgLen l := p.LenMsgLen
for i := 0; i < len(args); i++ { for i := 0; i < len(args); i++ {
copy(msg[l:], args[i]) copy(msg[l:], args[i])
l += len(args[i]) l += len(args[i])

View File

@@ -9,12 +9,12 @@ import (
const Default_ReadDeadline = time.Second*30 //30s const Default_ReadDeadline = time.Second*30 //30s
const Default_WriteDeadline = time.Second*30 //30s const Default_WriteDeadline = time.Second*30 //30s
const Default_MaxConnNum = 3000 const Default_MaxConnNum = 9000
const Default_PendingWriteNum = 10000 const Default_PendingWriteNum = 10000
const Default_LittleEndian = false const Default_LittleEndian = false
const Default_MinMsgLen = 2 const Default_MinMsgLen = 2
const Default_MaxMsgLen = 65535 const Default_MaxMsgLen = 65535
const Default_LenMsgLen = 2
type TCPServer struct { type TCPServer struct {
Addr string Addr string
@@ -22,21 +22,16 @@ type TCPServer struct {
PendingWriteNum int PendingWriteNum int
ReadDeadline time.Duration ReadDeadline time.Duration
WriteDeadline time.Duration WriteDeadline time.Duration
NewAgent func(*TCPConn) Agent NewAgent func(*TCPConn) Agent
ln net.Listener ln net.Listener
conns ConnSet conns ConnSet
mutexConns sync.Mutex mutexConns sync.Mutex
wgLn sync.WaitGroup wgLn sync.WaitGroup
wgConns sync.WaitGroup wgConns sync.WaitGroup
// msg parser // msg parser
LenMsgLen int MsgParser
MinMsgLen uint32
MaxMsgLen uint32
LittleEndian bool
msgParser *MsgParser
netMemPool INetMempool
} }
func (server *TCPServer) Start() { func (server *TCPServer) Start() {
@@ -73,35 +68,33 @@ func (server *TCPServer) init() {
server.WriteDeadline = Default_WriteDeadline server.WriteDeadline = Default_WriteDeadline
log.SRelease("invalid WriteDeadline, reset to ", server.WriteDeadline.Seconds(),"s") log.SRelease("invalid WriteDeadline, reset to ", server.WriteDeadline.Seconds(),"s")
} }
if server.ReadDeadline == 0 { if server.ReadDeadline == 0 {
server.ReadDeadline = Default_ReadDeadline server.ReadDeadline = Default_ReadDeadline
log.SRelease("invalid ReadDeadline, reset to ", server.ReadDeadline.Seconds(),"s") log.SRelease("invalid ReadDeadline, reset to ", server.ReadDeadline.Seconds(),"s")
} }
if server.LenMsgLen == 0 {
server.LenMsgLen = Default_LenMsgLen
}
if server.NewAgent == nil { if server.NewAgent == nil {
log.SFatal("NewAgent must not be nil") log.SFatal("NewAgent must not be nil")
} }
server.ln = ln server.ln = ln
server.conns = make(ConnSet) server.conns = make(ConnSet)
server.INetMempool = NewMemAreaPool()
// msg parser server.MsgParser.init()
msgParser := NewMsgParser()
if msgParser.INetMempool == nil {
msgParser.INetMempool = NewMemAreaPool()
}
msgParser.SetMsgLen(server.LenMsgLen, server.MinMsgLen, server.MaxMsgLen)
msgParser.SetByteOrder(server.LittleEndian)
server.msgParser = msgParser
} }
func (server *TCPServer) SetNetMempool(mempool INetMempool){ func (server *TCPServer) SetNetMempool(mempool INetMempool){
server.msgParser.INetMempool = mempool server.INetMempool = mempool
} }
func (server *TCPServer) GetNetMempool() INetMempool{ func (server *TCPServer) GetNetMempool() INetMempool{
return server.msgParser.INetMempool return server.INetMempool
} }
func (server *TCPServer) run() { func (server *TCPServer) run() {
@@ -127,6 +120,7 @@ func (server *TCPServer) run() {
} }
return return
} }
conn.(*net.TCPConn).SetNoDelay(true) conn.(*net.TCPConn).SetNoDelay(true)
tempDelay = 0 tempDelay = 0
@@ -137,16 +131,16 @@ func (server *TCPServer) run() {
log.SWarning("too many connections") log.SWarning("too many connections")
continue continue
} }
server.conns[conn] = struct{}{} server.conns[conn] = struct{}{}
server.mutexConns.Unlock() server.mutexConns.Unlock()
server.wgConns.Add(1) server.wgConns.Add(1)
tcpConn := newTCPConn(conn, server.PendingWriteNum, server.msgParser,server.WriteDeadline) tcpConn := newTCPConn(conn, server.PendingWriteNum, &server.MsgParser,server.WriteDeadline)
agent := server.NewAgent(tcpConn) agent := server.NewAgent(tcpConn)
go func() { go func() {
agent.Run() agent.Run()
// cleanup // cleanup
tcpConn.Close() tcpConn.Close()
server.mutexConns.Lock() server.mutexConns.Lock()