From e266c50ed635c5f3ad131289583c3aa92b5376e6 Mon Sep 17 00:00:00 2001 From: boyce <6549168@qq.com> Date: Wed, 25 Sep 2019 15:21:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=AE=A1=E9=81=93=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=EF=BC=8C=E5=8D=B3=E6=9C=AC=E5=9C=B0=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E6=97=B6=EF=BC=8C=E5=8F=91=E7=94=9F=E9=94=99=E8=AF=AF=E4=B8=8D?= =?UTF-8?q?=E6=96=AD=E5=BC=80=E8=BF=9E=E6=8E=A5=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 4 +++- rpc/client.go | 17 +++++++++++------ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index d028e95..f143661 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -7,6 +7,7 @@ import ( "os" "sort" "strings" + "sync" "time" "github.com/duanhf2012/origin/rpc" @@ -33,6 +34,7 @@ type CCluster struct { writer net.Conn LocalRpcClient *rpc.Client + localRpcLocker sync.Mutex innerLocalServiceList map[string]bool } @@ -186,7 +188,7 @@ func (slf *CCluster) ConnService() error { if slf.LocalRpcClient.IsClosed() { slf.ReSetLocalRpcClient() } - time.Sleep(time.Second * 4) + time.Sleep(time.Second * 2) } return nil diff --git a/rpc/client.go b/rpc/client.go index 7865491..4cef3e0 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -51,6 +51,7 @@ type Client struct { pending map[uint64]*Call closing bool // user has called Close shutdown bool // server has told us to stop + bPipe bool } // A ClientCodec implements writing of RPC requests and @@ -151,6 +152,9 @@ func (client *Client) input() { if err != nil { call.Error = errors.New("reading body " + err.Error()) } + if client.bPipe { + err = nil + } call.done() } } @@ -199,18 +203,19 @@ func (call *Call) done() { // so no interlocking is required. However each half may be accessed // concurrently so the implementation of conn should protect against // concurrent reads or concurrent writes. -func NewClient(conn io.ReadWriteCloser) *Client { +func NewClient(conn io.ReadWriteCloser, isPipe bool) *Client { encBuf := bufio.NewWriter(conn) client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf} - return NewClientWithCodec(client) + return NewClientWithCodec(client, isPipe) } // NewClientWithCodec is like NewClient but uses the specified // codec to encode requests and decode responses. -func NewClientWithCodec(codec ClientCodec) *Client { +func NewClientWithCodec(codec ClientCodec, isPipe bool) *Client { client := &Client{ codec: codec, pending: make(map[uint64]*Call), + bPipe: isPipe, } go client.input() return client @@ -269,7 +274,7 @@ func DialHTTPPath(network, address, path string) (*Client, error) { // before switching to RPC protocol. resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"}) if err == nil && resp.Status == connected { - return NewClient(conn), nil + return NewClient(conn, false), nil } if err == nil { err = errors.New("unexpected HTTP response: " + resp.Status) @@ -291,7 +296,7 @@ func Dial(network, address string) (*Client, error) { } tcpconn, _ := conn.(*net.TCPConn) tcpconn.SetNoDelay(true) - return NewClient(conn), nil + return NewClient(conn, false), nil } func DialTimeOut(network, address string, timeout time.Duration) (*Client, error) { @@ -301,7 +306,7 @@ func DialTimeOut(network, address string, timeout time.Duration) (*Client, error } tcpconn, _ := conn.(*net.TCPConn) tcpconn.SetNoDelay(true) - return NewClient(conn), nil + return NewClient(conn, false), nil } // Close calls the underlying codec's Close method. If the connection is already