diff --git a/network/processor/pbrawprocessor.go b/network/processor/pbrawprocessor.go index 77476a5..bb12508 100644 --- a/network/processor/pbrawprocessor.go +++ b/network/processor/pbrawprocessor.go @@ -78,7 +78,6 @@ func (pbRawProcessor *PBRawProcessor) SetRawMsgHandler(handle RawMessageHandler) func (pbRawProcessor *PBRawProcessor) MakeRawMsg(msgType uint16,msg []byte,pbRawPackInfo *PBRawPackInfo) { pbRawPackInfo.typ = msgType pbRawPackInfo.rawMsg = msg - //return &PBRawPackInfo{typ:msgType,rawMsg:msg} } func (pbRawProcessor *PBRawProcessor) UnknownMsgRoute(clientId uint64,msg interface{}){ diff --git a/network/processor/processor.go b/network/processor/processor.go index 581a3d1..19806f8 100644 --- a/network/processor/processor.go +++ b/network/processor/processor.go @@ -17,17 +17,11 @@ type IProcessor interface { } type IRawProcessor interface { - SetByteOrder(littleEndian bool) - MsgRoute(clientId uint64,msg interface{}) error - Unmarshal(clientId uint64,data []byte) (interface{}, error) - Marshal(clientId uint64,msg interface{}) ([]byte, error) + IProcessor + SetByteOrder(littleEndian bool) SetRawMsgHandler(handle RawMessageHandler) MakeRawMsg(msgType uint16,msg []byte,pbRawPackInfo *PBRawPackInfo) - UnknownMsgRoute(clientId uint64,msg interface{}) - ConnectedRoute(clientId uint64) - DisConnectedRoute(clientId uint64) - SetUnknownMsgHandler(unknownMessageHandler UnknownRawMessageHandler) SetConnectedHandler(connectHandler RawConnectHandler) SetDisConnectedHandler(disconnectHandler RawConnectHandler) diff --git a/network/slicepool.go b/network/slicepool.go index 9de5946..13e22fe 100644 --- a/network/slicepool.go +++ b/network/slicepool.go @@ -34,7 +34,6 @@ func (areaPool *memAreaPool) makePool() { for i := 0; i < poolLen; i++ { memSize := (areaPool.minAreaValue - 1) + (i+1)*areaPool.growthValue areaPool.pool[i] = sync.Pool{New: func() interface{} { - //fmt.Println("make memsize:",memSize) return make([]byte, memSize) }} } diff --git a/network/tcp_client.go b/network/tcp_client.go index 93271ed..94e8624 100644 --- a/network/tcp_client.go +++ b/network/tcp_client.go @@ -22,11 +22,7 @@ type TCPClient struct { closeFlag bool // msg parser - LenMsgLen int - MinMsgLen uint32 - MaxMsgLen uint32 - LittleEndian bool - msgParser *MsgParser + MsgParser } func (client *TCPClient) Start() { @@ -68,15 +64,21 @@ func (client *TCPClient) init() { if client.cons != nil { log.SFatal("client is running") } + if client.LenMsgLen == 0 { + client.LenMsgLen = Default_LenMsgLen + } + if client.MinMsgLen == 0 { + client.MinMsgLen = Default_MinMsgLen + } + if client.MaxMsgLen == 0 { + client.MaxMsgLen = Default_MaxMsgLen + } client.cons = make(ConnSet) client.closeFlag = false // msg parser - msgParser := NewMsgParser() - msgParser.SetMsgLen(client.LenMsgLen, client.MinMsgLen, client.MaxMsgLen) - msgParser.SetByteOrder(client.LittleEndian) - client.msgParser = msgParser + client.MsgParser.init() } func (client *TCPClient) GetCloseFlag() bool{ @@ -120,7 +122,7 @@ reconnect: client.cons[conn] = struct{}{} client.Unlock() - tcpConn := newTCPConn(conn, client.PendingWriteNum, client.msgParser,client.WriteDeadline) + tcpConn := newTCPConn(conn, client.PendingWriteNum, &client.MsgParser,client.WriteDeadline) agent := client.NewAgent(tcpConn) agent.Run() diff --git a/network/tcp_msg.go b/network/tcp_msg.go index 8542fcf..4f128fd 100644 --- a/network/tcp_msg.go +++ b/network/tcp_msg.go @@ -11,62 +11,42 @@ import ( // | len | data | // -------------- type MsgParser struct { - lenMsgLen int - minMsgLen uint32 - maxMsgLen uint32 - littleEndian bool + LenMsgLen int + MinMsgLen uint32 + MaxMsgLen uint32 + LittleEndian bool INetMempool } -func NewMsgParser() *MsgParser { - p := new(MsgParser) - p.lenMsgLen = 2 - p.minMsgLen = 1 - p.maxMsgLen = 4096 - p.littleEndian = false - p.INetMempool = NewMemAreaPool() - return p -} - -// It's dangerous to call the method on reading or writing -func (p *MsgParser) SetMsgLen(lenMsgLen int, minMsgLen uint32, maxMsgLen uint32) { - if lenMsgLen == 1 || lenMsgLen == 2 || lenMsgLen == 4 { - p.lenMsgLen = lenMsgLen - } - if minMsgLen != 0 { - p.minMsgLen = minMsgLen - } - if maxMsgLen != 0 { - p.maxMsgLen = maxMsgLen - } +func (p *MsgParser) init(){ var max uint32 - switch p.lenMsgLen { + switch p.LenMsgLen { case 1: max = math.MaxUint8 case 2: max = math.MaxUint16 case 4: max = math.MaxUint32 + default: + panic("LenMsgLen value must be 1 or 2 or 4") } - if p.minMsgLen > max { - p.minMsgLen = max + + if p.MinMsgLen > max { + p.MinMsgLen = max } - if p.maxMsgLen > max { - p.maxMsgLen = max + + if p.MaxMsgLen > max { + p.MaxMsgLen = max } } -// It's dangerous to call the method on reading or writing -func (p *MsgParser) SetByteOrder(littleEndian bool) { - p.littleEndian = littleEndian -} // goroutine safe func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) { var b [4]byte - bufMsgLen := b[:p.lenMsgLen] + bufMsgLen := b[:p.LenMsgLen] // read len if _, err := io.ReadFull(conn, bufMsgLen); err != nil { @@ -75,17 +55,17 @@ func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) { // parse len var msgLen uint32 - switch p.lenMsgLen { + switch p.LenMsgLen { case 1: msgLen = uint32(bufMsgLen[0]) case 2: - if p.littleEndian { + if p.LittleEndian { msgLen = uint32(binary.LittleEndian.Uint16(bufMsgLen)) } else { msgLen = uint32(binary.BigEndian.Uint16(bufMsgLen)) } case 4: - if p.littleEndian { + if p.LittleEndian { msgLen = binary.LittleEndian.Uint32(bufMsgLen) } else { msgLen = binary.BigEndian.Uint32(bufMsgLen) @@ -93,9 +73,9 @@ func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) { } // check len - if msgLen > p.maxMsgLen { + if msgLen > p.MaxMsgLen { return nil, errors.New("message too long") - } else if msgLen < p.minMsgLen { + } else if msgLen < p.MinMsgLen { return nil, errors.New("message too short") } @@ -118,26 +98,26 @@ func (p *MsgParser) Write(conn *TCPConn, args ...[]byte) error { } // check len - if msgLen > p.maxMsgLen { + if msgLen > p.MaxMsgLen { return errors.New("message too long") - } else if msgLen < p.minMsgLen { + } else if msgLen < p.MinMsgLen { return errors.New("message too short") } //msg := make([]byte, uint32(p.lenMsgLen)+msgLen) - msg := p.MakeByteSlice(p.lenMsgLen+int(msgLen)) + msg := p.MakeByteSlice(p.LenMsgLen+int(msgLen)) // write len - switch p.lenMsgLen { + switch p.LenMsgLen { case 1: msg[0] = byte(msgLen) case 2: - if p.littleEndian { + if p.LittleEndian { binary.LittleEndian.PutUint16(msg, uint16(msgLen)) } else { binary.BigEndian.PutUint16(msg, uint16(msgLen)) } case 4: - if p.littleEndian { + if p.LittleEndian { binary.LittleEndian.PutUint32(msg, msgLen) } else { binary.BigEndian.PutUint32(msg, msgLen) @@ -145,7 +125,7 @@ func (p *MsgParser) Write(conn *TCPConn, args ...[]byte) error { } // write data - l := p.lenMsgLen + l := p.LenMsgLen for i := 0; i < len(args); i++ { copy(msg[l:], args[i]) l += len(args[i]) diff --git a/network/tcp_server.go b/network/tcp_server.go index 4cf479f..9a66263 100644 --- a/network/tcp_server.go +++ b/network/tcp_server.go @@ -9,12 +9,12 @@ import ( const Default_ReadDeadline = time.Second*30 //30s const Default_WriteDeadline = time.Second*30 //30s -const Default_MaxConnNum = 3000 +const Default_MaxConnNum = 9000 const Default_PendingWriteNum = 10000 const Default_LittleEndian = false const Default_MinMsgLen = 2 const Default_MaxMsgLen = 65535 - +const Default_LenMsgLen = 2 type TCPServer struct { Addr string @@ -22,21 +22,16 @@ type TCPServer struct { PendingWriteNum int ReadDeadline time.Duration WriteDeadline time.Duration + NewAgent func(*TCPConn) Agent ln net.Listener conns ConnSet mutexConns sync.Mutex wgLn sync.WaitGroup wgConns sync.WaitGroup - - + // msg parser - LenMsgLen int - MinMsgLen uint32 - MaxMsgLen uint32 - LittleEndian bool - msgParser *MsgParser - netMemPool INetMempool + MsgParser } func (server *TCPServer) Start() { @@ -73,35 +68,33 @@ func (server *TCPServer) init() { server.WriteDeadline = Default_WriteDeadline log.SRelease("invalid WriteDeadline, reset to ", server.WriteDeadline.Seconds(),"s") } + if server.ReadDeadline == 0 { server.ReadDeadline = Default_ReadDeadline log.SRelease("invalid ReadDeadline, reset to ", server.ReadDeadline.Seconds(),"s") } + if server.LenMsgLen == 0 { + server.LenMsgLen = Default_LenMsgLen + } + if server.NewAgent == nil { log.SFatal("NewAgent must not be nil") } server.ln = ln server.conns = make(ConnSet) + server.INetMempool = NewMemAreaPool() - // msg parser - msgParser := NewMsgParser() - if msgParser.INetMempool == nil { - msgParser.INetMempool = NewMemAreaPool() - } - - msgParser.SetMsgLen(server.LenMsgLen, server.MinMsgLen, server.MaxMsgLen) - msgParser.SetByteOrder(server.LittleEndian) - server.msgParser = msgParser + server.MsgParser.init() } func (server *TCPServer) SetNetMempool(mempool INetMempool){ - server.msgParser.INetMempool = mempool + server.INetMempool = mempool } func (server *TCPServer) GetNetMempool() INetMempool{ - return server.msgParser.INetMempool + return server.INetMempool } func (server *TCPServer) run() { @@ -127,6 +120,7 @@ func (server *TCPServer) run() { } return } + conn.(*net.TCPConn).SetNoDelay(true) tempDelay = 0 @@ -137,16 +131,16 @@ func (server *TCPServer) run() { log.SWarning("too many connections") continue } + server.conns[conn] = struct{}{} server.mutexConns.Unlock() - server.wgConns.Add(1) - tcpConn := newTCPConn(conn, server.PendingWriteNum, server.msgParser,server.WriteDeadline) + tcpConn := newTCPConn(conn, server.PendingWriteNum, &server.MsgParser,server.WriteDeadline) agent := server.NewAgent(tcpConn) + go func() { agent.Run() - // cleanup tcpConn.Close() server.mutexConns.Lock()