优化异步调用,错误通过回调返回

This commit is contained in:
duanhf2012
2020-03-31 19:28:16 +08:00
parent 5344da1276
commit 9c9f72d559
2 changed files with 31 additions and 13 deletions

View File

@@ -33,7 +33,7 @@ type IRpcHandler interface {
GetName() string GetName() string
InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer) InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer)
GetRpcHandler() IRpcHandler GetRpcHandler() IRpcHandler
PushRequest(callinfo *RpcRequest) PushRequest(callinfo *RpcRequest) error
HandlerRpcRequest(request *RpcRequest) HandlerRpcRequest(request *RpcRequest)
HandlerRpcResponeCB(call *Call) HandlerRpcResponeCB(call *Call)
@@ -131,8 +131,13 @@ func (slf *RpcHandler) RegisterRpc(rpcHandler IRpcHandler) error {
return nil return nil
} }
func (slf *RpcHandler) PushRequest(req *RpcRequest) { func (slf *RpcHandler) PushRequest(req *RpcRequest) error{
if len(slf.callRequest) >= cap(slf.callRequest){
return fmt.Errorf("RpcHandler %s Rpc Channel is full.",slf.GetName())
}
slf.callRequest <- req slf.callRequest <- req
return nil
} }
func (slf *RpcHandler) GetRpcRequestChan() (chan *RpcRequest) { func (slf *RpcHandler) GetRpcRequestChan() (chan *RpcRequest) {
@@ -181,11 +186,8 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
var oParam reflect.Value var oParam reflect.Value
paramList = append(paramList,reflect.ValueOf(slf.GetRpcHandler())) //接受者 paramList = append(paramList,reflect.ValueOf(slf.GetRpcHandler())) //接受者
if request.localReply!=nil { oParam = reflect.New(v.oParam.Type().Elem())
oParam = reflect.ValueOf(request.localReply)
}else{
oParam = reflect.New(v.oParam.Type().Elem())
}
paramList = append(paramList,reflect.ValueOf(v.iparam)) paramList = append(paramList,reflect.ValueOf(v.iparam))
paramList = append(paramList,oParam) //输出参数 paramList = append(paramList,oParam) //输出参数
@@ -324,12 +326,16 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,mutiCoroutin
reply := reflect.New(fVal.Type().In(0).Elem()).Interface() reply := reflect.New(fVal.Type().In(0).Elem()).Interface()
pClientList,err := slf.funcRpcClient(nodeid,serviceMethod) pClientList,err := slf.funcRpcClient(nodeid,serviceMethod)
if err != nil { if err != nil {
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
log.Error("Call serviceMethod is error:%+v!",err) log.Error("Call serviceMethod is error:%+v!",err)
return err return nil
} }
if len(pClientList) > 1 { if len(pClientList) > 1 {
err := fmt.Errorf("Cannot call more then 1 node!")
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
log.Error("Cannot call more then 1 node!") log.Error("Cannot call more then 1 node!")
return fmt.Errorf("Cannot call more then 1 node!") return nil
} }
//2.rpcclient调用 //2.rpcclient调用
@@ -341,8 +347,9 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,mutiCoroutin
sMethod := strings.Split(serviceMethod,".") sMethod := strings.Split(serviceMethod,".")
if len(sMethod)!=2 { if len(sMethod)!=2 {
err := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod) err := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod)
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
log.Error("%+v",err) log.Error("%+v",err)
return err return nil
} }
//调用自己rpcHandler处理器 //调用自己rpcHandler处理器
if sMethod[0] == slf.rpcHandler.GetName() { //自己服务调用 if sMethod[0] == slf.rpcHandler.GetName() { //自己服务调用
@@ -357,7 +364,11 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,mutiCoroutin
//其他的rpcHandler的处理器 //其他的rpcHandler的处理器
if callback!=nil { if callback!=nil {
return pLocalRpcServer.rpcHandlerAsyncGo(slf,false,mutiCoroutine,sMethod[0],sMethod[1],args,reply,fVal) err = pLocalRpcServer.rpcHandlerAsyncGo(slf,false,mutiCoroutine,sMethod[0],sMethod[1],args,reply,fVal)
if err != nil {
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
}
return nil
} }
pCall := pLocalRpcServer.rpcHandlerGo(false,mutiCoroutine,sMethod[0],sMethod[1],args,reply) pCall := pLocalRpcServer.rpcHandlerGo(false,mutiCoroutine,sMethod[0],sMethod[1],args,reply)
pResult := pCall.Done() pResult := pCall.Done()
@@ -365,7 +376,11 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,mutiCoroutin
} }
//跨node调用 //跨node调用
return pClient.AsycGo(slf,mutiCoroutine,serviceMethod,fVal,args,reply) err = pClient.AsycGo(slf,mutiCoroutine,serviceMethod,fVal,args,reply)
if err != nil {
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
}
return nil
} }
func (slf *RpcHandler) GetName() string{ func (slf *RpcHandler) GetName() string{

View File

@@ -253,7 +253,10 @@ func (slf *Server) rpcHandlerAsyncGo(callerRpcHandler IRpcHandler,noReply bool,m
if mutiCoroutine == true { if mutiCoroutine == true {
go rpcHandler.HandlerRpcRequest(&req) go rpcHandler.HandlerRpcRequest(&req)
}else{ }else{
rpcHandler.PushRequest(&req) err := rpcHandler.PushRequest(&req)
if err != nil {
return err
}
} }
return nil return nil