优化原始rpc,采用register方式

This commit is contained in:
boyce
2020-12-21 16:34:06 +08:00
parent ce2a14fa83
commit 4b67c59b1e
10 changed files with 294 additions and 194 deletions

View File

@@ -45,10 +45,12 @@ type RpcMethodInfo struct {
rpcProcessorType RpcProcessorType
}
type RawRpcCallBack func(rawData []byte)
type RpcHandler struct {
callRequest chan *RpcRequest
rpcHandler IRpcHandler
mapFunctions map[string]RpcMethodInfo
mapRawFunctions map[uint32] RawRpcCallBack
funcRpcClient FuncRpcClient
funcRpcServer FuncRpcServer
@@ -79,7 +81,7 @@ type IRpcHandler interface {
AsyncCallNode(nodeId int,serviceMethod string,args interface{},callback interface{}) error
CallNode(nodeId int,serviceMethod string,args interface{},reply interface{}) error
GoNode(nodeId int,serviceMethod string,args interface{}) error
RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args IRawInputArgs) error
RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,args IRawInputArgs) error
CastGo(serviceMethod string,args interface{})
IsSingleCoroutine() bool
}
@@ -99,7 +101,7 @@ func (handler *RpcHandler) GetRpcHandler() IRpcHandler{
func (handler *RpcHandler) InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer) {
handler.callRequest = make(chan *RpcRequest,1000000)
handler.callResponseCallBack = make(chan *Call,1000000)
handler.mapRawFunctions = make(map[uint32] RawRpcCallBack)
handler.rpcHandler = rpcHandler
handler.mapFunctions = map[string]RpcMethodInfo{}
handler.funcRpcClient = getClientFun
@@ -157,15 +159,11 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
return fmt.Errorf("%s Unsupported parameter types!",method.Name)
}
}
a := typ.In(parIdx).Kind()
if a == reflect.Interface {
rpcMethodInfo.inParam = nil
}else{
rpcMethodInfo.inParamValue = reflect.New(typ.In(parIdx).Elem()) //append(rpcMethodInfo.iparam,)
rpcMethodInfo.inParam = reflect.New(typ.In(parIdx).Elem()).Interface()
pt,_ := GetProcessorType(rpcMethodInfo.inParamValue.Interface())
rpcMethodInfo.rpcProcessorType = pt
}
rpcMethodInfo.inParamValue = reflect.New(typ.In(parIdx).Elem()) //append(rpcMethodInfo.iparam,)
rpcMethodInfo.inParam = reflect.New(typ.In(parIdx).Elem()).Interface()
pt,_ := GetProcessorType(rpcMethodInfo.inParamValue.Interface())
rpcMethodInfo.rpcProcessorType = pt
parIdx++
if parIdx< typ.NumIn() {
@@ -243,6 +241,20 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
defer request.inputArgs.DoGc()
}
//如果是原始RPC请求
rawRpcId := request.RpcRequestData.GetRpcMethodId()
if rawRpcId>0 {
v,ok := handler.mapRawFunctions[rawRpcId]
if ok == false {
err := fmt.Sprintf("RpcHandler cannot find request rpc id %d!",rawRpcId)
log.Error(err)
return
}
v(request.inputArgs.GetRawData())
return
}
//普通的rpc请求
v,ok := handler.mapFunctions[request.RpcRequestData.GetServiceMethod()]
if ok == false {
err := "RpcHandler "+handler.rpcHandler.GetName()+"cannot find "+request.RpcRequestData.GetServiceMethod()
@@ -255,45 +267,22 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
var paramList []reflect.Value
var err error
var iParam interface{}
//单协程或非异步调用时直接使用预置对象
if v.inParam!= nil {
iParam = reflect.New(v.inParamValue.Type().Elem()).Interface()
}
iParam := reflect.New(v.inParamValue.Type().Elem()).Interface()
if request.bLocalRequest == false {
if iParam == nil {
//原始调用
iParam = request.RpcRequestData.GetInParam()
}else{
err = request.rpcProcessor.Unmarshal(request.RpcRequestData.GetInParam(),iParam)
if err!=nil {
rErr := "Call Rpc "+request.RpcRequestData.GetServiceMethod()+" Param error "+err.Error()
log.Error(rErr)
if request.requestHandle!=nil {
request.requestHandle(nil, RpcError(rErr))
}
return
err = request.rpcProcessor.Unmarshal(request.RpcRequestData.GetInParam(),iParam)
if err!=nil {
rErr := "Call Rpc "+request.RpcRequestData.GetServiceMethod()+" Param error "+err.Error()
log.Error(rErr)
if request.requestHandle!=nil {
request.requestHandle(nil, RpcError(rErr))
}
return
}
}else {
if iParam == nil {
iParam = request.inputArgs.GetRawData()
}else if request.inputArgs!=nil {
err = request.rpcProcessor.Unmarshal(request.inputArgs.GetRawData(),iParam)
if err!=nil {
rErr := "Call Rpc "+request.RpcRequestData.GetServiceMethod()+" Param error "+err.Error()
log.Error(rErr)
if request.requestHandle!=nil {
request.requestHandle(nil, RpcError(rErr))
}
return
}
}else {
iParam = request.localParam
}
iParam = request.localParam
}
//生成Call参数
paramList = append(paramList,reflect.ValueOf(handler.GetRpcHandler())) //接受者
if v.hasResponder == true {
if request.requestHandle!=nil {
@@ -387,7 +376,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s
return pLocalRpcServer.myselfRpcHandlerGo(serviceName,serviceMethod,args,nil)
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClientList[i],true,serviceName,serviceMethod,args,nil,nil)
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClientList[i],true,serviceName,0,serviceMethod,args,nil,nil)
if pCall.Err!=nil {
err = pCall.Err
}
@@ -438,7 +427,7 @@ func (handler *RpcHandler) callRpc(nodeId int,serviceMethod string,args interfac
return pLocalRpcServer.myselfRpcHandlerGo(serviceName,serviceMethod,args,reply)
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,serviceName,serviceMethod,args,reply,nil)
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,serviceName,0,serviceMethod,args,reply,nil)
err = pCall.Done().Err
pClient.RemovePending(pCall.Seq)
ReleaseCall(pCall)
@@ -570,11 +559,11 @@ func (handler *RpcHandler) CastGo(serviceMethod string,args interface{}) {
handler.goRpc(nil,true,0,serviceMethod,args)
}
func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args IRawInputArgs) error {
//RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceName string,rpcMethodId uint32,args IRawInputArgs) error
func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,args IRawInputArgs) error {
processor := GetProcessor(uint8(rpcProcessorType))
var pClientList [maxClusterNode]*Client
err,count := handler.funcRpcClient(nodeId,serviceMethod,pClientList[:])
err,count := handler.funcRpcClient(nodeId,serviceName,pClientList[:])
if count==0||err != nil {
args.DoGc()
log.Error("Call serviceMethod is error:%+v!",err)
@@ -591,26 +580,15 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in
for i:=0;i<count;i++{
if pClientList[i].bSelfNode == true {
pLocalRpcServer:= handler.funcRpcServer()
//判断是否是同一服务
findIndex := strings.Index(serviceMethod,".")
if findIndex==-1 {
serr := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod)
log.Error("%+v",serr)
if serr!= nil {
err = serr
}
continue
}
serviceName := serviceMethod[:findIndex]
//调用自己rpcHandler处理器
if serviceName == handler.rpcHandler.GetName() { //自己服务调用
err:= pLocalRpcServer.myselfRpcHandlerGo(serviceName,serviceMethod,args,nil)
err:= pLocalRpcServer.myselfRpcHandlerGo(serviceName,serviceName,args,nil)
args.DoGc()
return err
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClientList[i],true,serviceName,serviceMethod,nil,nil,args)
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClientList[i],true,serviceName,rpcMethodId,serviceName,nil,nil,args)
if pCall.Err!=nil {
err = pCall.Err
}
@@ -620,7 +598,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in
}
//跨node调用
pCall := pClientList[i].RawGo(processor,true,serviceMethod,args.GetRawData(),nil)
pCall := pClientList[i].RawGo(processor,true,rpcMethodId,serviceName,args.GetRawData(),nil)
args.DoGc()
if pCall.Err!=nil {
err = pCall.Err
@@ -632,3 +610,6 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in
return err
}
func (handler *RpcHandler) RegRawRpc(rpcMethodId uint32,rawRpcCB RawRpcCallBack){
handler.mapRawFunctions[rpcMethodId] = rawRpcCB
}