From 64fb9368bf93802ca43260ceaaf2946dc717930d Mon Sep 17 00:00:00 2001 From: orgin Date: Thu, 13 Oct 2022 10:39:29 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BD=91=E7=BB=9C=E5=BA=95?= =?UTF-8?q?=E5=B1=82=E5=86=99=E8=B6=85=E6=97=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- network/tcp_client.go | 14 +++++++++-- network/tcp_conn.go | 4 ++- network/tcp_server.go | 38 ++++++++++++++++++++++++++--- rpc/server.go | 3 ++- sysservice/tcpservice/tcpservice.go | 28 +++------------------ 5 files changed, 55 insertions(+), 32 deletions(-) diff --git a/network/tcp_client.go b/network/tcp_client.go index 212976f..c74d8ce 100644 --- a/network/tcp_client.go +++ b/network/tcp_client.go @@ -13,6 +13,8 @@ type TCPClient struct { ConnNum int ConnectInterval time.Duration PendingWriteNum int + ReadDeadline time.Duration + WriteDeadline time.Duration AutoReconnect bool NewAgent func(*TCPConn) Agent cons ConnSet @@ -52,6 +54,14 @@ func (client *TCPClient) init() { client.PendingWriteNum = 1000 log.SRelease("invalid PendingWriteNum, reset to ", client.PendingWriteNum) } + if client.ReadDeadline == 0 { + client.ReadDeadline = 15*time.Second + log.SRelease("invalid ReadDeadline, reset to ", client.ReadDeadline,"s") + } + if client.WriteDeadline == 0 { + client.WriteDeadline = 15*time.Second + log.SRelease("invalid WriteDeadline, reset to ", client.WriteDeadline,"s") + } if client.NewAgent == nil { log.SFatal("NewAgent must not be nil") } @@ -93,7 +103,7 @@ reconnect: if conn == nil { return } - + client.Lock() if client.closeFlag { client.Unlock() @@ -103,7 +113,7 @@ reconnect: client.cons[conn] = struct{}{} client.Unlock() - tcpConn := newTCPConn(conn, client.PendingWriteNum, client.msgParser) + tcpConn := newTCPConn(conn, client.PendingWriteNum, client.msgParser,client.WriteDeadline) agent := client.NewAgent(tcpConn) agent.Run() diff --git a/network/tcp_conn.go b/network/tcp_conn.go index 4f24f3f..0ebb9b7 100644 --- a/network/tcp_conn.go +++ b/network/tcp_conn.go @@ -27,7 +27,7 @@ func freeChannel(conn *TCPConn){ } } -func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser) *TCPConn { +func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser,writeDeadline time.Duration) *TCPConn { tcpConn := new(TCPConn) tcpConn.conn = conn tcpConn.writeChan = make(chan []byte, pendingWriteNum) @@ -37,6 +37,8 @@ func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser) *TCPCo if b == nil { break } + + conn.SetWriteDeadline(time.Now().Add(writeDeadline)) _, err := conn.Write(b) tcpConn.msgParser.ReleaseByteSlice(b) diff --git a/network/tcp_server.go b/network/tcp_server.go index 834d1a4..c12a570 100644 --- a/network/tcp_server.go +++ b/network/tcp_server.go @@ -7,10 +7,21 @@ import ( "time" ) +const Default_ReadDeadline = 30 //30s +const Default_WriteDeadline = 30 //30s +const Default_MaxConnNum = 3000 +const Default_PendingWriteNum = 10000 +const Default_LittleEndian = false +const Default_MinMsgLen = 2 +const Default_MaxMsgLen = 65535 + + type TCPServer struct { Addr string MaxConnNum int PendingWriteNum int + ReadDeadline time.Duration + WriteDeadline time.Duration NewAgent func(*TCPConn) Agent ln net.Listener conns ConnSet @@ -18,6 +29,7 @@ type TCPServer struct { wgLn sync.WaitGroup wgConns sync.WaitGroup + // msg parser LenMsgLen int MinMsgLen uint32 @@ -39,13 +51,33 @@ func (server *TCPServer) init() { } if server.MaxConnNum <= 0 { - server.MaxConnNum = 100 + server.MaxConnNum = Default_MaxConnNum log.SRelease("invalid MaxConnNum, reset to ", server.MaxConnNum) } if server.PendingWriteNum <= 0 { - server.PendingWriteNum = 100 + server.PendingWriteNum = Default_PendingWriteNum log.SRelease("invalid PendingWriteNum, reset to ", server.PendingWriteNum) } + + if server.MinMsgLen <= 0 { + server.MinMsgLen = Default_MinMsgLen + log.SRelease("invalid MinMsgLen, reset to ", server.MinMsgLen) + } + + if server.MaxMsgLen <= 0 { + server.MaxMsgLen = Default_MaxMsgLen + log.SRelease("invalid MaxMsgLen, reset to ", server.MaxMsgLen) + } + + if server.WriteDeadline == 0 { + server.WriteDeadline = time.Second*Default_WriteDeadline + log.SRelease("invalid WriteDeadline, reset to ", server.WriteDeadline.Seconds(),"s") + } + if server.ReadDeadline == 0 { + server.ReadDeadline = time.Second*Default_ReadDeadline + log.SRelease("invalid ReadDeadline, reset to ", server.ReadDeadline.Seconds(),"s") + } + if server.NewAgent == nil { log.SFatal("NewAgent must not be nil") } @@ -110,7 +142,7 @@ func (server *TCPServer) run() { server.wgConns.Add(1) - tcpConn := newTCPConn(conn, server.PendingWriteNum, server.msgParser) + tcpConn := newTCPConn(conn, server.PendingWriteNum, server.msgParser,server.WriteDeadline) agent := server.NewAgent(tcpConn) go func() { agent.Run() diff --git a/rpc/server.go b/rpc/server.go index df56baa..8c89a74 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -62,6 +62,7 @@ func (server *Server) Init(rpcHandleFinder RpcHandleFinder) { server.rpcServer = &network.TCPServer{} } + func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) { splitAddr := strings.Split(listenAddr, ":") if len(splitAddr) != 2 { @@ -77,7 +78,7 @@ func (server *Server) Start(listenAddr string, maxRpcParamLen uint32) { server.rpcServer.MaxMsgLen = math.MaxUint32 } - server.rpcServer.MaxConnNum = 10000 + server.rpcServer.MaxConnNum = 100000 server.rpcServer.PendingWriteNum = 2000000 server.rpcServer.NewAgent = server.NewAgent server.rpcServer.LittleEndian = LittleEndian diff --git a/sysservice/tcpservice/tcpservice.go b/sysservice/tcpservice/tcpservice.go index d6aee8a..3678204 100644 --- a/sysservice/tcpservice/tcpservice.go +++ b/sysservice/tcpservice/tcpservice.go @@ -21,9 +21,6 @@ type TcpService struct { mapClientLocker sync.RWMutex mapClient map[uint64] *Client process processor.IProcessor - - ReadDeadline time.Duration - WriteDeadline time.Duration } type TcpPackType int8 @@ -34,14 +31,6 @@ const( TPT_UnknownPack TcpPackType = 3 ) -const Default_MaxConnNum = 3000 -const Default_PendingWriteNum = 10000 -const Default_LittleEndian = false -const Default_MinMsgLen = 2 -const Default_MaxMsgLen = 65535 -const Default_ReadDeadline = 180 //30s -const Default_WriteDeadline = 180 //30s - const ( MaxNodeId = 1<<14 - 1 //最大值 16383 MaxSeed = 1<<19 - 1 //最大值 524287 @@ -89,14 +78,6 @@ func (tcpService *TcpService) OnInit() error{ } tcpService.tcpServer.Addr = addr.(string) - tcpService.tcpServer.MaxConnNum = Default_MaxConnNum - tcpService.tcpServer.PendingWriteNum = Default_PendingWriteNum - tcpService.tcpServer.LittleEndian = Default_LittleEndian - tcpService.tcpServer.MinMsgLen = Default_MinMsgLen - tcpService.tcpServer.MaxMsgLen = Default_MaxMsgLen - tcpService.ReadDeadline = Default_ReadDeadline - tcpService.WriteDeadline = Default_WriteDeadline - MaxConnNum,ok := tcpCfg["MaxConnNum"] if ok == true { tcpService.tcpServer.MaxConnNum = int(MaxConnNum.(float64)) @@ -120,12 +101,12 @@ func (tcpService *TcpService) OnInit() error{ readDeadline,ok := tcpCfg["ReadDeadline"] if ok == true { - tcpService.ReadDeadline = time.Second*time.Duration(readDeadline.(float64)) + tcpService.tcpServer.ReadDeadline = time.Second*time.Duration(readDeadline.(float64)) } writeDeadline,ok := tcpCfg["WriteDeadline"] if ok == true { - tcpService.WriteDeadline = time.Second*time.Duration(writeDeadline.(float64)) + tcpService.tcpServer.WriteDeadline = time.Second*time.Duration(writeDeadline.(float64)) } tcpService.mapClient = make( map[uint64] *Client, tcpService.tcpServer.MaxConnNum) @@ -195,7 +176,7 @@ func (slf *Client) Run() { break } - slf.tcpConn.SetReadDeadline(slf.tcpService.ReadDeadline) + slf.tcpConn.SetReadDeadline(slf.tcpService.tcpServer.ReadDeadline) bytes,err := slf.tcpConn.ReadMsg() if err != nil { log.SDebug("read client id ",slf.id," is error:",err.Error()) @@ -231,7 +212,6 @@ func (tcpService *TcpService) SendMsg(clientId uint64,msg interface{}) error{ if err != nil { return err } - client.tcpConn.SetWriteDeadline(tcpService.WriteDeadline) return client.tcpConn.WriteMsg(bytes) } @@ -271,7 +251,6 @@ func (tcpService *TcpService) SendRawMsg(clientId uint64,msg []byte) error{ return fmt.Errorf("client %d is disconnect!",clientId) } tcpService.mapClientLocker.Unlock() - client.tcpConn.SetWriteDeadline(tcpService.WriteDeadline) return client.tcpConn.WriteMsg(msg) } @@ -283,7 +262,6 @@ func (tcpService *TcpService) SendRawData(clientId uint64,data []byte) error{ return fmt.Errorf("client %d is disconnect!",clientId) } tcpService.mapClientLocker.Unlock() - client.tcpConn.SetWriteDeadline(tcpService.WriteDeadline) return client.tcpConn.WriteRawMsg(data) }