From 89328aa3dac1d9591abcd4550537a4ae9161badc Mon Sep 17 00:00:00 2001 From: boyce <6549168@qq.com> Date: Mon, 28 Oct 2019 16:22:46 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=96=B0=E5=A2=9ECallEx=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cluster/cluster.go | 95 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 92 insertions(+), 3 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index f143661..8ef7f6b 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -1,13 +1,13 @@ package cluster import ( + "errors" "fmt" "math/rand" "net" "os" "sort" "strings" - "sync" "time" "github.com/duanhf2012/origin/rpc" @@ -34,7 +34,6 @@ type CCluster struct { writer net.Conn LocalRpcClient *rpc.Client - localRpcLocker sync.Mutex innerLocalServiceList map[string]bool } @@ -188,7 +187,7 @@ func (slf *CCluster) ConnService() error { if slf.LocalRpcClient.IsClosed() { slf.ReSetLocalRpcClient() } - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 4) } return nil @@ -372,6 +371,8 @@ func (slf *CCluster) Go(bCast bool, NodeServiceMethod string, args interface{}, return slf.goImpl(bCast, NodeServiceMethod, args, queueModle, true) } + + func (slf *CCluster) goImpl(bCast bool, NodeServiceMethod string, args interface{}, queueModle bool, log bool) error { var callServiceName string var serviceName string @@ -521,6 +522,94 @@ func Call(NodeServiceMethod string, args interface{}, reply interface{}) error { return InstanceClusterMgr().Call(NodeServiceMethod, args, reply) } +func (slf *CCluster) CallEx(NodeServiceMethod string, args interface{}, reply interface{}) *RpcCallResult { + return slf.rawcall(NodeServiceMethod, args, reply, false) +} + +type RpcCallResult struct { + chanRet chan *rpc.Call + err error + rets *rpc.Call +} + +func (slf *RpcCallResult) Make() { + slf.chanRet = make(chan *rpc.Call, 1) + slf.rets = nil +} + +func (slf *RpcCallResult) WaitReturn(waittm time.Duration) error { + if slf.chanRet == nil { + return errors.New("cannot make rpccallresult") + } + + if waittm <= 0 { + select { + case ret := <-slf.chanRet: + return ret.Error + } + } else { + // + select { + case ret := <-slf.chanRet: + return ret.Error + case <-time.After(waittm): + return errors.New("is time out") + } + } + + return errors.New("unknow error.") +} + +func (slf *CCluster) rawcall(NodeServiceMethod string, args interface{}, reply interface{}, queueModle bool) *RpcCallResult { + var rpcRet RpcCallResult + rpcRet.Make() + + var callServiceName string + var serviceName string + nodeidList := slf.GetNodeList(NodeServiceMethod, &callServiceName, &serviceName) + if len(nodeidList) > 1 || len(nodeidList) < 1 { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s) find nodes count %d is error.", NodeServiceMethod, len(nodeidList)) + rpcRet.err = fmt.Errorf("CCluster.Call(%s) find nodes count %d is error.", NodeServiceMethod, len(nodeidList)) + return &rpcRet + } + + nodeid := nodeidList[0] + if nodeid == GetNodeId() { + //判断服务是否已经完成初始化 + iService := service.InstanceServiceMgr().FindService(serviceName) + if iService == nil { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s): NodeId %d cannot find service.", NodeServiceMethod, nodeid) + rpcRet.err = fmt.Errorf("CCluster.Call(%s): NodeId %d cannot find service..", NodeServiceMethod, nodeid) + return &rpcRet + } + + if iService.IsInit() == false { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid) + rpcRet.err = fmt.Errorf("CCluster.Call(%s): NodeId %d is not init.", NodeServiceMethod, nodeid) + return &rpcRet + } + + rpcRet.rets = slf.LocalRpcClient.Go(callServiceName, args, reply, rpcRet.chanRet, queueModle) + return &rpcRet + } + + pclient := slf.GetClusterClient(nodeid) + if pclient == nil { + service.GetLogger().Printf(sysmodule.LEVER_ERROR, "CCluster.Call(%s): NodeId %d is not find.", NodeServiceMethod, nodeid) + rpcRet.err = fmt.Errorf("CCluster.Call(%s): NodeId %d is not find.", NodeServiceMethod, nodeid) + return &rpcRet + } + + rpcRet.rets = pclient.Go(callServiceName, args, reply, rpcRet.chanRet, queueModle) + + return &rpcRet + +} + +func CallEx(NodeServiceMethod string, args interface{}, reply interface{}) *RpcCallResult { + return InstanceClusterMgr().rawcall(NodeServiceMethod, args, reply, false) +} + func CallNode(NodeId int, servicemethod string, args interface{}, reply interface{}) error { return InstanceClusterMgr().CallNode(NodeId, servicemethod, args, reply) } From 5cb659e073b12817817df7c331c5c4dc3c278aa3 Mon Sep 17 00:00:00 2001 From: boyce Date: Tue, 29 Oct 2019 10:54:16 +0800 Subject: [PATCH 2/2] =?UTF-8?q?Revert=20"=E5=A2=9E=E5=8A=A0=E7=AE=A1?= =?UTF-8?q?=E9=81=93=E6=A8=A1=E5=BC=8F=EF=BC=8C=E5=8D=B3=E6=9C=AC=E5=9C=B0?= =?UTF-8?q?=E8=B0=83=E7=94=A8=E6=97=B6=EF=BC=8C=E5=8F=91=E7=94=9F=E9=94=99?= =?UTF-8?q?=E8=AF=AF=E4=B8=8D=E6=96=AD=E5=BC=80=E8=BF=9E=E6=8E=A5=E7=8A=B6?= =?UTF-8?q?=E6=80=81"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit e266c50ed635c5f3ad131289583c3aa92b5376e6. --- rpc/client.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index 4cef3e0..7865491 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -51,7 +51,6 @@ type Client struct { pending map[uint64]*Call closing bool // user has called Close shutdown bool // server has told us to stop - bPipe bool } // A ClientCodec implements writing of RPC requests and @@ -152,9 +151,6 @@ func (client *Client) input() { if err != nil { call.Error = errors.New("reading body " + err.Error()) } - if client.bPipe { - err = nil - } call.done() } } @@ -203,19 +199,18 @@ func (call *Call) done() { // so no interlocking is required. However each half may be accessed // concurrently so the implementation of conn should protect against // concurrent reads or concurrent writes. -func NewClient(conn io.ReadWriteCloser, isPipe bool) *Client { +func NewClient(conn io.ReadWriteCloser) *Client { encBuf := bufio.NewWriter(conn) client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf} - return NewClientWithCodec(client, isPipe) + return NewClientWithCodec(client) } // NewClientWithCodec is like NewClient but uses the specified // codec to encode requests and decode responses. -func NewClientWithCodec(codec ClientCodec, isPipe bool) *Client { +func NewClientWithCodec(codec ClientCodec) *Client { client := &Client{ codec: codec, pending: make(map[uint64]*Call), - bPipe: isPipe, } go client.input() return client @@ -274,7 +269,7 @@ func DialHTTPPath(network, address, path string) (*Client, error) { // before switching to RPC protocol. resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"}) if err == nil && resp.Status == connected { - return NewClient(conn, false), nil + return NewClient(conn), nil } if err == nil { err = errors.New("unexpected HTTP response: " + resp.Status) @@ -296,7 +291,7 @@ func Dial(network, address string) (*Client, error) { } tcpconn, _ := conn.(*net.TCPConn) tcpconn.SetNoDelay(true) - return NewClient(conn, false), nil + return NewClient(conn), nil } func DialTimeOut(network, address string, timeout time.Duration) (*Client, error) { @@ -306,7 +301,7 @@ func DialTimeOut(network, address string, timeout time.Duration) (*Client, error } tcpconn, _ := conn.(*net.TCPConn) tcpconn.SetNoDelay(true) - return NewClient(conn, false), nil + return NewClient(conn), nil } // Close calls the underlying codec's Close method. If the connection is already