From 5e30083ce8ab00e71b508d0211c998bf3c76e10d Mon Sep 17 00:00:00 2001 From: boyce Date: Thu, 29 Oct 2020 19:40:44 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BD=91=E7=BB=9C=E5=B1=82?= =?UTF-8?q?=E7=9A=84GC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- network/conn.go | 1 + network/tcp_conn.go | 29 +++++++++++++++++++++++++---- network/tcp_msg.go | 8 +++++--- rpc/client.go | 2 ++ rpc/server.go | 2 ++ sysservice/tcpservice/tcpservice.go | 1 + 6 files changed, 36 insertions(+), 7 deletions(-) diff --git a/network/conn.go b/network/conn.go index 9f7a2c5..1f7aaf6 100644 --- a/network/conn.go +++ b/network/conn.go @@ -11,4 +11,5 @@ type Conn interface { RemoteAddr() net.Addr Close() Destroy() + ReleaseReadMsg(byteBuff []byte) } diff --git a/network/tcp_conn.go b/network/tcp_conn.go index 668bf92..90da047 100644 --- a/network/tcp_conn.go +++ b/network/tcp_conn.go @@ -1,6 +1,7 @@ package network import ( + "fmt" "github.com/duanhf2012/origin/log" "net" "sync" @@ -16,26 +17,37 @@ type TCPConn struct { msgParser *MsgParser } + +func freeChannel(conn *TCPConn){ + for;len(conn.writeChan)>0;{ + byteBuff := <- conn.writeChan + if byteBuff != nil { + conn.ReleaseReadMsg(byteBuff) + } + } +} + func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser) *TCPConn { tcpConn := new(TCPConn) tcpConn.conn = conn tcpConn.writeChan = make(chan []byte, pendingWriteNum) tcpConn.msgParser = msgParser - go func() { for b := range tcpConn.writeChan { if b == nil { break } - _, err := conn.Write(b) + releaseByteSlice(b) + + if err != nil { break } } - conn.Close() tcpConn.Lock() + freeChannel(tcpConn) tcpConn.closeFlag = true tcpConn.Unlock() }() @@ -77,7 +89,8 @@ func (tcpConn *TCPConn) GetRemoteIp() string { func (tcpConn *TCPConn) doWrite(b []byte) { if len(tcpConn.writeChan) == cap(tcpConn.writeChan) { - log.Debug("close conn: channel full") + tcpConn.ReleaseReadMsg(b) + log.Error("close conn: channel full") tcpConn.doDestroy() return } @@ -90,6 +103,7 @@ func (tcpConn *TCPConn) Write(b []byte) { tcpConn.Lock() defer tcpConn.Unlock() if tcpConn.closeFlag || b == nil { + tcpConn.ReleaseReadMsg(b) return } @@ -112,7 +126,14 @@ func (tcpConn *TCPConn) ReadMsg() ([]byte, error) { return tcpConn.msgParser.Read(tcpConn) } +func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){ + releaseByteSlice(byteBuff) +} + func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error { + if tcpConn.closeFlag == true { + return fmt.Errorf("conn is close") + } return tcpConn.msgParser.Write(tcpConn, args...) } diff --git a/network/tcp_msg.go b/network/tcp_msg.go index 811837e..c153735 100644 --- a/network/tcp_msg.go +++ b/network/tcp_msg.go @@ -99,8 +99,10 @@ func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) { } // data - msgData := make([]byte, msgLen) + //msgData := make([]byte, msgLen) + msgData := makeByteSlice(int(msgLen)) if _, err := io.ReadFull(conn, msgData); err != nil { + releaseByteSlice(msgData) return nil, err } @@ -123,8 +125,8 @@ func (p *MsgParser) Write(conn *TCPConn, args ...[]byte) error { } //msgLen -= 2 - msg := make([]byte, uint32(p.lenMsgLen)+msgLen) - + //msg := make([]byte, uint32(p.lenMsgLen)+msgLen) + msg := makeByteSlice(p.lenMsgLen+int(msgLen)) // write len switch p.lenMsgLen { case 1: diff --git a/rpc/client.go b/rpc/client.go index 9a90c92..54f4319 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -265,6 +265,7 @@ func (slf *Client) Run(){ processor := GetProcessor(uint8(bytes[0])) if processor==nil { + slf.conn.ReleaseReadMsg(bytes) log.Error("rpcClient %s ReadMsg head error:%+v",slf.Addr,err) return } @@ -274,6 +275,7 @@ func (slf *Client) Run(){ respone.RpcResponeData =processor.MakeRpcResponse(0,nil,nil) err = processor.Unmarshal(bytes[1:],respone.RpcResponeData) + slf.conn.ReleaseReadMsg(bytes) if err != nil { processor.ReleaseRpcRespose(respone.RpcResponeData) log.Error("rpcClient Unmarshal head error,error:%+v",err) diff --git a/rpc/server.go b/rpc/server.go index 0f834dd..5d649c1 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -125,6 +125,7 @@ func (agent *RpcAgent) Run() { } processor := GetProcessor(uint8(data[0])) if processor==nil { + agent.conn.ReleaseReadMsg(data) log.Error("remote rpc %s data head error:%+v",agent.conn.RemoteAddr(),err) return } @@ -134,6 +135,7 @@ func (agent *RpcAgent) Run() { req.rpcProcessor = processor req.RpcRequestData = processor.MakeRpcRequest(0,"",false,nil,nil) err = processor.Unmarshal(data[1:],req.RpcRequestData) + agent.conn.ReleaseReadMsg(data) if err != nil { log.Error("rpc Unmarshal request is error: %v", err) if req.RpcRequestData.GetSeq()>0 { diff --git a/sysservice/tcpservice/tcpservice.go b/sysservice/tcpservice/tcpservice.go index f5ebfe9..7caf5ac 100644 --- a/sysservice/tcpservice/tcpservice.go +++ b/sysservice/tcpservice/tcpservice.go @@ -172,6 +172,7 @@ func (slf *Client) Run() { break } data,err:=slf.tcpService.process.Unmarshal(bytes) + slf.tcpConn.ReleaseReadMsg(bytes) if err != nil { slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:&TcpPack{ClientId:slf.id,Type:TPT_UnknownPack,Data:bytes,MsgProcessor:slf.tcpService.process}}) continue