mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-28 10:00:25 +08:00
优化网络层的GC
This commit is contained in:
@@ -11,4 +11,5 @@ type Conn interface {
|
|||||||
RemoteAddr() net.Addr
|
RemoteAddr() net.Addr
|
||||||
Close()
|
Close()
|
||||||
Destroy()
|
Destroy()
|
||||||
|
ReleaseReadMsg(byteBuff []byte)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package network
|
package network
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"github.com/duanhf2012/origin/log"
|
"github.com/duanhf2012/origin/log"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -16,26 +17,37 @@ type TCPConn struct {
|
|||||||
msgParser *MsgParser
|
msgParser *MsgParser
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
func freeChannel(conn *TCPConn){
|
||||||
|
for;len(conn.writeChan)>0;{
|
||||||
|
byteBuff := <- conn.writeChan
|
||||||
|
if byteBuff != nil {
|
||||||
|
conn.ReleaseReadMsg(byteBuff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser) *TCPConn {
|
func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser) *TCPConn {
|
||||||
tcpConn := new(TCPConn)
|
tcpConn := new(TCPConn)
|
||||||
tcpConn.conn = conn
|
tcpConn.conn = conn
|
||||||
tcpConn.writeChan = make(chan []byte, pendingWriteNum)
|
tcpConn.writeChan = make(chan []byte, pendingWriteNum)
|
||||||
tcpConn.msgParser = msgParser
|
tcpConn.msgParser = msgParser
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for b := range tcpConn.writeChan {
|
for b := range tcpConn.writeChan {
|
||||||
if b == nil {
|
if b == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := conn.Write(b)
|
_, err := conn.Write(b)
|
||||||
|
releaseByteSlice(b)
|
||||||
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
conn.Close()
|
conn.Close()
|
||||||
tcpConn.Lock()
|
tcpConn.Lock()
|
||||||
|
freeChannel(tcpConn)
|
||||||
tcpConn.closeFlag = true
|
tcpConn.closeFlag = true
|
||||||
tcpConn.Unlock()
|
tcpConn.Unlock()
|
||||||
}()
|
}()
|
||||||
@@ -77,7 +89,8 @@ func (tcpConn *TCPConn) GetRemoteIp() string {
|
|||||||
|
|
||||||
func (tcpConn *TCPConn) doWrite(b []byte) {
|
func (tcpConn *TCPConn) doWrite(b []byte) {
|
||||||
if len(tcpConn.writeChan) == cap(tcpConn.writeChan) {
|
if len(tcpConn.writeChan) == cap(tcpConn.writeChan) {
|
||||||
log.Debug("close conn: channel full")
|
tcpConn.ReleaseReadMsg(b)
|
||||||
|
log.Error("close conn: channel full")
|
||||||
tcpConn.doDestroy()
|
tcpConn.doDestroy()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -90,6 +103,7 @@ func (tcpConn *TCPConn) Write(b []byte) {
|
|||||||
tcpConn.Lock()
|
tcpConn.Lock()
|
||||||
defer tcpConn.Unlock()
|
defer tcpConn.Unlock()
|
||||||
if tcpConn.closeFlag || b == nil {
|
if tcpConn.closeFlag || b == nil {
|
||||||
|
tcpConn.ReleaseReadMsg(b)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -112,7 +126,14 @@ func (tcpConn *TCPConn) ReadMsg() ([]byte, error) {
|
|||||||
return tcpConn.msgParser.Read(tcpConn)
|
return tcpConn.msgParser.Read(tcpConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){
|
||||||
|
releaseByteSlice(byteBuff)
|
||||||
|
}
|
||||||
|
|
||||||
func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {
|
func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {
|
||||||
|
if tcpConn.closeFlag == true {
|
||||||
|
return fmt.Errorf("conn is close")
|
||||||
|
}
|
||||||
return tcpConn.msgParser.Write(tcpConn, args...)
|
return tcpConn.msgParser.Write(tcpConn, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -99,8 +99,10 @@ func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// data
|
// data
|
||||||
msgData := make([]byte, msgLen)
|
//msgData := make([]byte, msgLen)
|
||||||
|
msgData := makeByteSlice(int(msgLen))
|
||||||
if _, err := io.ReadFull(conn, msgData); err != nil {
|
if _, err := io.ReadFull(conn, msgData); err != nil {
|
||||||
|
releaseByteSlice(msgData)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -123,8 +125,8 @@ func (p *MsgParser) Write(conn *TCPConn, args ...[]byte) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//msgLen -= 2
|
//msgLen -= 2
|
||||||
msg := make([]byte, uint32(p.lenMsgLen)+msgLen)
|
//msg := make([]byte, uint32(p.lenMsgLen)+msgLen)
|
||||||
|
msg := makeByteSlice(p.lenMsgLen+int(msgLen))
|
||||||
// write len
|
// write len
|
||||||
switch p.lenMsgLen {
|
switch p.lenMsgLen {
|
||||||
case 1:
|
case 1:
|
||||||
|
|||||||
@@ -265,6 +265,7 @@ func (slf *Client) Run(){
|
|||||||
|
|
||||||
processor := GetProcessor(uint8(bytes[0]))
|
processor := GetProcessor(uint8(bytes[0]))
|
||||||
if processor==nil {
|
if processor==nil {
|
||||||
|
slf.conn.ReleaseReadMsg(bytes)
|
||||||
log.Error("rpcClient %s ReadMsg head error:%+v",slf.Addr,err)
|
log.Error("rpcClient %s ReadMsg head error:%+v",slf.Addr,err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -274,6 +275,7 @@ func (slf *Client) Run(){
|
|||||||
respone.RpcResponeData =processor.MakeRpcResponse(0,nil,nil)
|
respone.RpcResponeData =processor.MakeRpcResponse(0,nil,nil)
|
||||||
|
|
||||||
err = processor.Unmarshal(bytes[1:],respone.RpcResponeData)
|
err = processor.Unmarshal(bytes[1:],respone.RpcResponeData)
|
||||||
|
slf.conn.ReleaseReadMsg(bytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
processor.ReleaseRpcRespose(respone.RpcResponeData)
|
processor.ReleaseRpcRespose(respone.RpcResponeData)
|
||||||
log.Error("rpcClient Unmarshal head error,error:%+v",err)
|
log.Error("rpcClient Unmarshal head error,error:%+v",err)
|
||||||
|
|||||||
@@ -125,6 +125,7 @@ func (agent *RpcAgent) Run() {
|
|||||||
}
|
}
|
||||||
processor := GetProcessor(uint8(data[0]))
|
processor := GetProcessor(uint8(data[0]))
|
||||||
if processor==nil {
|
if processor==nil {
|
||||||
|
agent.conn.ReleaseReadMsg(data)
|
||||||
log.Error("remote rpc %s data head error:%+v",agent.conn.RemoteAddr(),err)
|
log.Error("remote rpc %s data head error:%+v",agent.conn.RemoteAddr(),err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -134,6 +135,7 @@ func (agent *RpcAgent) Run() {
|
|||||||
req.rpcProcessor = processor
|
req.rpcProcessor = processor
|
||||||
req.RpcRequestData = processor.MakeRpcRequest(0,"",false,nil,nil)
|
req.RpcRequestData = processor.MakeRpcRequest(0,"",false,nil,nil)
|
||||||
err = processor.Unmarshal(data[1:],req.RpcRequestData)
|
err = processor.Unmarshal(data[1:],req.RpcRequestData)
|
||||||
|
agent.conn.ReleaseReadMsg(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("rpc Unmarshal request is error: %v", err)
|
log.Error("rpc Unmarshal request is error: %v", err)
|
||||||
if req.RpcRequestData.GetSeq()>0 {
|
if req.RpcRequestData.GetSeq()>0 {
|
||||||
|
|||||||
@@ -172,6 +172,7 @@ func (slf *Client) Run() {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
data,err:=slf.tcpService.process.Unmarshal(bytes)
|
data,err:=slf.tcpService.process.Unmarshal(bytes)
|
||||||
|
slf.tcpConn.ReleaseReadMsg(bytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:&TcpPack{ClientId:slf.id,Type:TPT_UnknownPack,Data:bytes,MsgProcessor:slf.tcpService.process}})
|
slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:&TcpPack{ClientId:slf.id,Type:TPT_UnknownPack,Data:bytes,MsgProcessor:slf.tcpService.process}})
|
||||||
continue
|
continue
|
||||||
|
|||||||
Reference in New Issue
Block a user