优化服务自我rpc调用

This commit is contained in:
orgin
2022-10-25 17:06:13 +08:00
parent da18cf3158
commit 2da3ccae39
2 changed files with 90 additions and 30 deletions

View File

@@ -85,7 +85,7 @@ type IRpcHandler interface {
GetRpcHandler() IRpcHandler
HandlerRpcRequest(request *RpcRequest)
HandlerRpcResponseCB(call *Call)
CallMethod(ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error
CallMethod(client *Client,ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error
AsyncCall(serviceMethod string, args interface{}, callback interface{}) error
Call(serviceMethod string, args interface{}, reply interface{}) error
Go(serviceMethod string, args interface{}) error
@@ -299,7 +299,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
}
}
func (handler *RpcHandler) CallMethod(ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error {
func (handler *RpcHandler) CallMethod(client *Client,ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error {
var err error
v, ok := handler.mapFunctions[ServiceMethod]
if ok == false {
@@ -310,27 +310,60 @@ func (handler *RpcHandler) CallMethod(ServiceMethod string, param interface{},ca
var paramList []reflect.Value
var returnValues []reflect.Value
var pCall *Call
var callSeq uint64
if v.hasResponder == true {
paramList = append(paramList, reflect.ValueOf(handler.GetRpcHandler())) //接受者
pCall = MakeCall()
pCall.Seq = client.generateSeq()
callSeq = pCall.Seq
client.AddPending(pCall)
//有返回值时
if reply != nil {
//如果是Call同步调用
hander :=func(Returns interface{}, Err RpcError) {
//如果返回错误
if len(Err)!=0 {
if callBack!=requestHandlerNull {
callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(errors.New(Err.Error()))})
}else{
rpcCall := client.FindPending(callSeq)
//如果找不到,说明已经超时
if rpcCall!= nil {
rpcCall.Err = errors.New(Err.Error())
rpcCall.Done()
}else{
log.SError("cannot find call seq ",callSeq)
}
}
return
}
//解析数据
_, processor := GetProcessorType(Returns)
bytes,errs := processor.Marshal(Returns)
if errs != nil {
callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(errs)})
if errs == nil {
errs = processor.Unmarshal(bytes,reply)
}
errs = processor.Unmarshal(bytes,reply)
if callBack!=requestHandlerNull {
if errs != nil {
callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(errs)})
} else{
callBack.Call([]reflect.Value{reflect.ValueOf(reply), nilError})
}
}
if len(Err)==0{
callBack.Call([]reflect.Value{reflect.ValueOf(reply), nilError})
rpcCall := client.FindPending(callSeq)
//如果找不到,说明已经超时
if rpcCall!= nil {
rpcCall.Err = errs
rpcCall.done<-rpcCall
}else{
callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(errors.New(Err.Error()))})
log.SError("cannot find call seq ",callSeq)
}
}
paramList = append(paramList, reflect.ValueOf(hander))
}else{//无返回值时,是一个requestHandlerNull空回调
paramList = append(paramList, callBack)
@@ -342,23 +375,45 @@ func (handler *RpcHandler) CallMethod(ServiceMethod string, param interface{},ca
//判断返回值是否错误,有错误时则回调
errInter := returnValues[0].Interface()
if errInter != nil {
if errInter != nil && callBack!=requestHandlerNull{
err = errInter.(error)
callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
}
}else{
paramList = append(paramList, reflect.ValueOf(handler.GetRpcHandler())) //接受者
paramList = append(paramList, reflect.ValueOf(param))
paramList = append(paramList, reflect.ValueOf(reply)) //输出参数
//被调用RPC函数有返回值时
if v.outParamValue.IsValid() {
//不带返回值参数的RPC函数
if reply == nil {
paramList = append(paramList, reflect.New(v.outParamValue.Type().Elem()))
}else{
//带返回值参数的RPC函数
paramList = append(paramList, reflect.ValueOf(reply)) //输出参数
}
}
returnValues = v.method.Func.Call(paramList)
errInter := returnValues[0].Interface()
if errInter != nil {
err = errInter.(error)
callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
//如果无回调
if callBack != requestHandlerNull {
valErr := nilError
if errInter != nil {
err = errInter.(error)
valErr = reflect.ValueOf(err)
}
callBack.Call([]reflect.Value{reflect.ValueOf(reply),valErr })
}
}
if pCall != nil {
err = pCall.Done().Err
client.RemovePending(pCall.Seq)
ReleaseCall(pCall)
}
return err
}
@@ -397,7 +452,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int
serviceName := serviceMethod[:findIndex]
if serviceName == handler.rpcHandler.GetName() { //自己服务调用
//调用自己rpcHandler处理器
return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args, requestHandlerNull,nil)
return pLocalRpcServer.myselfRpcHandlerGo(pClientList[i],serviceName, serviceMethod, args, requestHandlerNull,nil)
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor, pClientList[i], true, serviceName, 0, serviceMethod, args, nil, nil)
@@ -451,7 +506,7 @@ func (handler *RpcHandler) callRpc(nodeId int, serviceMethod string, args interf
serviceName := serviceMethod[:findIndex]
if serviceName == handler.rpcHandler.GetName() { //自己服务调用
//调用自己rpcHandler处理器
return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args,requestHandlerNull, reply)
return pLocalRpcServer.myselfRpcHandlerGo(pClient,serviceName, serviceMethod, args,requestHandlerNull, reply)
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil, pClient, false, serviceName, 0, serviceMethod, args, reply, nil)
@@ -530,7 +585,7 @@ func (handler *RpcHandler) asyncCallRpc(nodeId int, serviceMethod string, args i
serviceName := serviceMethod[:findIndex]
//调用自己rpcHandler处理器
if serviceName == handler.rpcHandler.GetName() { //自己服务调用
return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args,fVal ,reply)
return pLocalRpcServer.myselfRpcHandlerGo(pClient,serviceName, serviceMethod, args,fVal ,reply)
}
//其他的rpcHandler的处理器
@@ -607,7 +662,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId i
pLocalRpcServer := handler.funcRpcServer()
//调用自己rpcHandler处理器
if serviceName == handler.rpcHandler.GetName() { //自己服务调用
err := pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceName, rawArgs.GetRawData(), requestHandlerNull,nil)
err := pLocalRpcServer.myselfRpcHandlerGo(handler.pClientList[i],serviceName, serviceName, rawArgs.GetRawData(), requestHandlerNull,nil)
//args.DoGc()
return err
}