回滚版本

This commit is contained in:
boyce
2019-10-29 14:33:00 +08:00
parent d59f42fb59
commit 37bc5f2256

View File

@@ -51,6 +51,7 @@ type Client struct {
pending map[uint64]*Call pending map[uint64]*Call
closing bool // user has called Close closing bool // user has called Close
shutdown bool // server has told us to stop shutdown bool // server has told us to stop
bPipe bool
} }
// A ClientCodec implements writing of RPC requests and // A ClientCodec implements writing of RPC requests and
@@ -151,6 +152,9 @@ func (client *Client) input() {
if err != nil { if err != nil {
call.Error = errors.New("reading body " + err.Error()) call.Error = errors.New("reading body " + err.Error())
} }
if client.bPipe {
err = nil
}
call.done() call.done()
} }
} }
@@ -199,18 +203,19 @@ func (call *Call) done() {
// so no interlocking is required. However each half may be accessed // so no interlocking is required. However each half may be accessed
// concurrently so the implementation of conn should protect against // concurrently so the implementation of conn should protect against
// concurrent reads or concurrent writes. // concurrent reads or concurrent writes.
func NewClient(conn io.ReadWriteCloser) *Client { func NewClient(conn io.ReadWriteCloser, isPipe bool) *Client {
encBuf := bufio.NewWriter(conn) encBuf := bufio.NewWriter(conn)
client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf} client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
return NewClientWithCodec(client) return NewClientWithCodec(client, isPipe)
} }
// NewClientWithCodec is like NewClient but uses the specified // NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses. // codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client { func NewClientWithCodec(codec ClientCodec, isPipe bool) *Client {
client := &Client{ client := &Client{
codec: codec, codec: codec,
pending: make(map[uint64]*Call), pending: make(map[uint64]*Call),
bPipe: isPipe,
} }
go client.input() go client.input()
return client return client
@@ -269,7 +274,7 @@ func DialHTTPPath(network, address, path string) (*Client, error) {
// before switching to RPC protocol. // before switching to RPC protocol.
resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"}) resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
if err == nil && resp.Status == connected { if err == nil && resp.Status == connected {
return NewClient(conn), nil return NewClient(conn, false), nil
} }
if err == nil { if err == nil {
err = errors.New("unexpected HTTP response: " + resp.Status) err = errors.New("unexpected HTTP response: " + resp.Status)
@@ -291,7 +296,7 @@ func Dial(network, address string) (*Client, error) {
} }
tcpconn, _ := conn.(*net.TCPConn) tcpconn, _ := conn.(*net.TCPConn)
tcpconn.SetNoDelay(true) tcpconn.SetNoDelay(true)
return NewClient(conn), nil return NewClient(conn, false), nil
} }
func DialTimeOut(network, address string, timeout time.Duration) (*Client, error) { func DialTimeOut(network, address string, timeout time.Duration) (*Client, error) {
@@ -301,7 +306,7 @@ func DialTimeOut(network, address string, timeout time.Duration) (*Client, error
} }
tcpconn, _ := conn.(*net.TCPConn) tcpconn, _ := conn.(*net.TCPConn)
tcpconn.SetNoDelay(true) tcpconn.SetNoDelay(true)
return NewClient(conn), nil return NewClient(conn, false), nil
} }
// Close calls the underlying codec's Close method. If the connection is already // Close calls the underlying codec's Close method. If the connection is already