From 93b9c4f89a410b27070024e7e6721b92a141f469 Mon Sep 17 00:00:00 2001 From: boyce Date: Fri, 10 Jul 2020 16:17:56 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96rpc=E8=BF=94=E5=9B=9E?= =?UTF-8?q?=E5=8D=8F=E8=AE=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: boyce --- cluster/cluster.go | 30 +++++++----------------------- cluster/parsecfg.go | 16 +++++++++++----- rpc/client.go | 8 +++++++- rpc/jsonprocessor.go | 2 +- rpc/msgpprocessor.go | 2 +- rpc/pbprocessor.go | 2 +- rpc/rpc.go | 2 +- rpc/rpchandler.go | 18 +++++++++++------- rpc/server.go | 8 ++++++-- 9 files changed, 46 insertions(+), 42 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 04b5108..77fce3e 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -2,7 +2,6 @@ package cluster import ( "fmt" - "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/service" "strings" @@ -108,39 +107,24 @@ func (slf *Cluster) GetRpcClient(nodeid int) *rpc.Client { return c.client } -func GetRpcClient(nodeId int,serviceMethod string) ([]*rpc.Client,error) { - var rpcClientList []*rpc.Client +func GetRpcClient(nodeId int,serviceMethod string,clientList *[]*rpc.Client) error { if nodeId>0 { pClient := GetCluster().GetRpcClient(nodeId) if pClient==nil { - return rpcClientList,fmt.Errorf("cannot find nodeid %d!",nodeId) + return fmt.Errorf("cannot find nodeid %d!",nodeId) } - rpcClientList = append(rpcClientList,pClient) - return rpcClientList,nil + *clientList = append(*clientList,pClient) + return nil } serviceAndMethod := strings.Split(serviceMethod,".") if len(serviceAndMethod)!=2 { - return nil,fmt.Errorf("servicemethod param %s is error!",serviceMethod) + return fmt.Errorf("servicemethod param %s is error!",serviceMethod) } //1.找到对应的rpcnodeid - - nodeidList := GetCluster().GetNodeIdByService(serviceAndMethod[0]) - if len(nodeidList) ==0 { - return rpcClientList,fmt.Errorf("Cannot Find %s nodeid",serviceMethod) - } - - for _,nodeid:= range nodeidList { - pClient := GetCluster().GetRpcClient(nodeid) - if pClient==nil { - log.Error("Cannot connect node id %d",nodeid) - continue - } - rpcClientList = append(rpcClientList,pClient) - } - - return rpcClientList,nil + GetCluster().GetNodeIdByService(serviceAndMethod[0],clientList) + return nil } func GetRpcServer() *rpc.Server{ diff --git a/cluster/parsecfg.go b/cluster/parsecfg.go index 42ee37e..893a3f2 100644 --- a/cluster/parsecfg.go +++ b/cluster/parsecfg.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/rpc" "io/ioutil" "strings" ) @@ -195,18 +196,23 @@ func (slf *Cluster) IsConfigService(servicename string) bool { return ok } -func (slf *Cluster) GetNodeIdByService(servicename string) []int{ - var nodelist []int + + +func (slf *Cluster) GetNodeIdByService(servicename string,rpcClientList *[]*rpc.Client) { nodeInfoList,ok := slf.localSubNetMapService[servicename] if ok == true { for _,node := range nodeInfoList { - nodelist = append(nodelist,node.NodeId) + pClient := GetCluster().GetRpcClient(node.NodeId) + if pClient==nil { + log.Error("Cannot connect node id %d",node.NodeId) + continue + } + *rpcClientList = append(*rpcClientList,pClient) } } - - return nodelist } + func (slf *Cluster) getServiceCfg(servicename string) interface{}{ v,ok := slf.localServiceCfg[servicename] if ok == false { diff --git a/rpc/client.go b/rpc/client.go index 84bf68c..457e2a6 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -105,7 +105,7 @@ func (slf *Client) generateSeq() uint64{ return atomic.AddUint64(&slf.startSeq,1) } -func (slf *Client) AsycGo(rpcHandler IRpcHandler,serviceMethod string,callback reflect.Value, args interface{},replyParam interface{}) error { +func (slf *Client) AsycCall(rpcHandler IRpcHandler,serviceMethod string,callback reflect.Value, args interface{},replyParam interface{}) error { call := MakeCall() call.Reply = replyParam call.callback = &callback @@ -161,21 +161,27 @@ func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply request := &RpcRequest{} call.Arg = args call.Seq = slf.generateSeq() + if noReply == false { + slf.AddPending(call) + } request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,noReply,InParam) bytes,err := processor.Marshal(request.RpcRequestData) processor.ReleaseRpcRequest(request.RpcRequestData) if err != nil { call.Err = err + slf.RemovePending(call.Seq) return call } if slf.conn == nil { call.Err = fmt.Errorf("call %s is fail,rpc client is disconnect.",serviceMethod) + slf.RemovePending(call.Seq) return call } err = slf.conn.WriteMsg(bytes) if err != nil { + slf.RemovePending(call.Seq) call.Err = err } diff --git a/rpc/jsonprocessor.go b/rpc/jsonprocessor.go index 99cf66a..9967454 100644 --- a/rpc/jsonprocessor.go +++ b/rpc/jsonprocessor.go @@ -77,7 +77,7 @@ func (slf *JsonProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){ rpcJsonResponeDataPool.Put(rpcRequestData) } -func (slf *JsonRpcRequestData) IsReply() bool{ +func (slf *JsonRpcRequestData) IsNoReply() bool{ return slf.NoReply } diff --git a/rpc/msgpprocessor.go b/rpc/msgpprocessor.go index 6b36bc5..8a52000 100644 --- a/rpc/msgpprocessor.go +++ b/rpc/msgpprocessor.go @@ -84,7 +84,7 @@ func (slf *MsgpProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){ rpcResponeDataPool.Put(rpcRequestData) } -func (slf *MsgpRpcRequestData) IsReply() bool{ +func (slf *MsgpRpcRequestData) IsNoReply() bool{ return slf.NoReply } diff --git a/rpc/pbprocessor.go b/rpc/pbprocessor.go index d397695..d8b8e79 100644 --- a/rpc/pbprocessor.go +++ b/rpc/pbprocessor.go @@ -69,7 +69,7 @@ func (slf *PBProcessor) ReleaseRpcRespose(rpcRequestData IRpcRequestData){ } -func (slf *PBRpcRequestData) IsReply() bool{ +func (slf *PBRpcRequestData) IsNoReply() bool{ return slf.GetNoReply() } diff --git a/rpc/rpc.go b/rpc/rpc.go index acee113..6b04461 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -22,7 +22,7 @@ type IRpcRequestData interface { GetSeq() uint64 GetServiceMethod() string GetInParam() []byte - IsReply() bool + IsNoReply() bool } type IRpcResponseData interface { diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 10a1406..9cceef6 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -10,7 +10,7 @@ import ( "unicode/utf8" ) -type FuncRpcClient func(nodeid int,serviceMethod string) ([]*Client,error) +type FuncRpcClient func(nodeid int,serviceMethod string,client *[]*Client) error type FuncRpcServer func() (*Server) var NilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()) @@ -279,7 +279,8 @@ func (slf *RpcHandler) CallMethod(ServiceMethod string,param interface{},reply i } func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args interface{}) error { - pClientList,err := slf.funcRpcClient(nodeId,serviceMethod) + var pClientList []*Client + err := slf.funcRpcClient(nodeId,serviceMethod,&pClientList) if err != nil { log.Error("Call serviceMethod is error:%+v!",err) return err @@ -319,7 +320,7 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int } //跨node调用 - pCall := pClient.Go(false,serviceMethod,args,nil) + pCall := pClient.Go(true,serviceMethod,args,nil) if pCall.Err!=nil { err = pCall.Err } @@ -330,7 +331,8 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int } func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},reply interface{}) error { - pClientList,err := slf.funcRpcClient(nodeId,serviceMethod) + var pClientList []*Client + err := slf.funcRpcClient(nodeId,serviceMethod,&pClientList) if err != nil { log.Error("Call serviceMethod is error:%+v!",err) return err @@ -367,6 +369,7 @@ func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{}, //跨node调用 pCall := pClient.Go(false,serviceMethod,args,reply) if pCall.Err != nil { + ReleaseCall(pCall) return pCall.Err } err = pCall.Done().Err @@ -395,14 +398,15 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa } reply := reflect.New(fVal.Type().In(0).Elem()).Interface() - pClientList,err := slf.funcRpcClient(nodeid,serviceMethod) + var pClientList []*Client + err := slf.funcRpcClient(nodeid,serviceMethod,&pClientList) if err != nil { fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) log.Error("Call serviceMethod is error:%+v!",err) return nil } - if len(pClientList) > 1 { + if pClientList== nil || len(pClientList) > 1 { err := fmt.Errorf("Cannot call more then 1 node!") fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) log.Error("Cannot call more then 1 node!") @@ -449,7 +453,7 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa } //跨node调用 - err = pClient.AsycGo(slf,serviceMethod,fVal,args,reply) + err = pClient.AsycCall(slf,serviceMethod,fVal,args,reply) if err != nil { fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) } diff --git a/rpc/server.go b/rpc/server.go index 57f1223..08cccfc 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -140,7 +140,7 @@ func (agent *RpcAgent) Run() { continue } - if req.RpcRequestData.IsReply()== false { + if req.RpcRequestData.IsNoReply()==false { req.requestHandle = func(Returns interface{},Err *RpcError){ agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),Returns,Err) } @@ -149,7 +149,11 @@ func (agent *RpcAgent) Run() { err = rpcHandler.PushRequest(req) if err != nil { rpcError := RpcError(err.Error()) - agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) + + if req.RpcRequestData.IsNoReply() { + agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) + } + processor.ReleaseRpcRequest(req.RpcRequestData) ReleaseRpcRequest(req) }