package network

import (
	"net"
	"sync"

	"github.com/duanhf2012/origin/log"
)

// ConnSet is a set of connections keyed by the net.Conn value.
type ConnSet map[net.Conn]struct{}

// TCPConn wraps a net.Conn with a buffered queue of outgoing byte slices.
// A dedicated goroutine started by newTCPConn drains the queue and writes
// each slice to the connection. The embedded mutex guards closeFlag and
// every send on, or close of, writeChan.
type TCPConn struct {
	sync.Mutex
	conn      net.Conn
	writeChan chan []byte // pending outgoing data; a nil element tells the writer goroutine to stop
	closeFlag bool        // true once the connection is closing or closed; guarded by the mutex
	msgParser *MsgParser
}

// newTCPConn wraps conn and starts the goroutine that writes queued data.
//
// pendingWriteNum is the capacity of the outgoing queue; msgParser is used
// by ReadMsg and WriteMsg. The writer goroutine stops when it receives a nil
// element, when writeChan is closed, or when a write to conn fails. It then
// closes conn and marks the TCPConn as closed.
func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser) *TCPConn {
	tcpConn := &TCPConn{
		conn:      conn,
		writeChan: make(chan []byte, pendingWriteNum),
		msgParser: msgParser,
	}

	go func() {
		for b := range tcpConn.writeChan {
			// A nil element is the graceful-close marker queued by Close.
			if b == nil {
				break
			}

			if _, err := conn.Write(b); err != nil {
				break
			}
		}

		// Best effort: the connection is being abandoned either way.
		_ = conn.Close()

		tcpConn.Lock()
		tcpConn.closeFlag = true
		tcpConn.Unlock()
	}()

	return tcpConn
}

// doDestroy closes the underlying connection immediately and, if the
// connection was not already marked closed, closes writeChan so the writer
// goroutine terminates. The caller must hold the mutex.
func (tcpConn *TCPConn) doDestroy() {
	// Only real TCP connections support SO_LINGER. Any other net.Conn
	// implementation is simply closed instead of panicking on the assertion.
	if tc, ok := tcpConn.conn.(*net.TCPConn); ok {
		// Best effort: a failure here must not prevent the close below.
		_ = tc.SetLinger(0)
	}
	_ = tcpConn.conn.Close()

	if !tcpConn.closeFlag {
		close(tcpConn.writeChan)
		tcpConn.closeFlag = true
	}
}

// Destroy closes the connection immediately without waiting for queued data
// to be written.
func (tcpConn *TCPConn) Destroy() {
	tcpConn.Lock()
	defer tcpConn.Unlock()

	tcpConn.doDestroy()
}

// Close requests a graceful shutdown: it queues a nil marker behind any
// pending data so the writer goroutine closes the connection after reaching
// it. Further Write calls are ignored. It is a no-op when the connection is
// already marked closed.
func (tcpConn *TCPConn) Close() {
	tcpConn.Lock()
	defer tcpConn.Unlock()

	if tcpConn.closeFlag {
		return
	}

	tcpConn.doWrite(nil)
	tcpConn.closeFlag = true
}

// doWrite queues b for the writer goroutine. If the queue is already full
// the connection is destroyed instead of blocking. The caller must hold the
// mutex.
func (tcpConn *TCPConn) doWrite(b []byte) {
	if len(tcpConn.writeChan) == cap(tcpConn.writeChan) {
		log.Debug("close conn: channel full")
		tcpConn.doDestroy()
		return
	}

	tcpConn.writeChan <- b
}

// Write queues b to be sent by the writer goroutine. It is a no-op when the
// connection is marked closed or b is nil.
//
// b must not be modified by other goroutines after it has been passed in.
func (tcpConn *TCPConn) Write(b []byte) {
	tcpConn.Lock()
	defer tcpConn.Unlock()

	if tcpConn.closeFlag || b == nil {
		return
	}

	tcpConn.doWrite(b)
}

// Read reads from the underlying connection into b.
func (tcpConn *TCPConn) Read(b []byte) (int, error) {
	return tcpConn.conn.Read(b)
}

// LocalAddr returns the local address of the underlying connection.
func (tcpConn *TCPConn) LocalAddr() net.Addr {
	return tcpConn.conn.LocalAddr()
}

// RemoteAddr returns the remote address of the underlying connection.
func (tcpConn *TCPConn) RemoteAddr() net.Addr {
	return tcpConn.conn.RemoteAddr()
}

// ReadMsg reads one message from the connection using the configured
// message parser.
func (tcpConn *TCPConn) ReadMsg() ([]byte, error) {
	return tcpConn.msgParser.Read(tcpConn)
}

// WriteMsg hands args to the configured message parser to be written to
// this connection.
func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error {
	return tcpConn.msgParser.Write(tcpConn, args...)
}

// IsConnected reports whether the connection has not yet been marked closed.
// It takes the mutex because closeFlag is modified concurrently by the
// writer goroutine; callers must not already hold the mutex.
func (tcpConn *TCPConn) IsConnected() bool {
	tcpConn.Lock()
	defer tcpConn.Unlock()

	return !tcpConn.closeFlag
}