优化原始RPC

This commit is contained in:
boyce
2021-01-06 18:26:13 +08:00
parent a282c7c268
commit 4116b9ae12
3 changed files with 84 additions and 55 deletions

View File

@@ -10,10 +10,8 @@ type RpcRequest struct {
ref bool
RpcRequestData IRpcRequestData
bLocalRequest bool
inParam interface{}
localReply interface{}
localParam interface{} //本地调用的参数列表
inputArgs IRawInputArgs
requestHandle RequestHandler
callback *reflect.Value
@@ -87,11 +85,9 @@ func init(){
func (slf *RpcRequest) Clear() *RpcRequest{
slf.RpcRequestData = nil
slf.localReply = nil
slf.localParam = nil
slf.inParam = nil
slf.requestHandle = nil
slf.callback = nil
slf.bLocalRequest = false
slf.inputArgs = nil
slf.rpcProcessor = nil
return slf
}

View File

@@ -1,6 +1,7 @@
package rpc
import (
"errors"
"fmt"
"github.com/duanhf2012/origin/log"
"reflect"
@@ -45,7 +46,12 @@ type RpcMethodInfo struct {
rpcProcessorType RpcProcessorType
}
type RawRpcCallBack func(rawData []byte)
type RawRpcCallBack interface {
Unmarshal(data []byte) (interface{},error)
CB(data interface{})
}
//type RawRpcCallBack func(rawData []byte)
type RpcHandler struct {
callRequest chan *RpcRequest
rpcHandler IRpcHandler
@@ -81,11 +87,13 @@ type IRpcHandler interface {
AsyncCallNode(nodeId int,serviceMethod string,args interface{},callback interface{}) error
CallNode(nodeId int,serviceMethod string,args interface{},reply interface{}) error
GoNode(nodeId int,serviceMethod string,args interface{}) error
RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,args IRawInputArgs) error
RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,rawArgs []byte) error
CastGo(serviceMethod string,args interface{})
IsSingleCoroutine() bool
UnmarshalInParam(rpcProcessor IRpcProcessor,serviceMethod string,rawRpcMethodId uint32,inParam []byte) (interface{},error)
}
func reqHandlerNull(Returns interface{},Err RpcError) {
}
@@ -240,9 +248,6 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
ReleaseRpcRequest(request)
}
}()
if request.inputArgs!=nil {
defer request.inputArgs.DoGc()
}
//如果是原始RPC请求
rawRpcId := request.RpcRequestData.GetRpcMethodId()
@@ -253,12 +258,8 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
log.Error(err)
return
}
if request.inputArgs != nil {
v(request.inputArgs.GetRawData())
}else{
v(request.RpcRequestData.GetInParam())
}
v.CB(request.inParam)
return
}
@@ -275,21 +276,6 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
var paramList []reflect.Value
var err error
iParam := reflect.New(v.inParamValue.Type().Elem()).Interface()
if request.bLocalRequest == false {
err = request.rpcProcessor.Unmarshal(request.RpcRequestData.GetInParam(),iParam)
if err!=nil {
rErr := "Call Rpc "+request.RpcRequestData.GetServiceMethod()+" Param error "+err.Error()
log.Error(rErr)
if request.requestHandle!=nil {
request.requestHandle(nil, RpcError(rErr))
}
return
}
}else {
iParam = request.localParam
}
//生成Call参数
paramList = append(paramList,reflect.ValueOf(handler.GetRpcHandler())) //接受者
if v.hasResponder == true {
@@ -301,7 +287,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
}
}
paramList = append(paramList,reflect.ValueOf(iParam))
paramList = append(paramList,reflect.ValueOf(request.inParam))
var oParam reflect.Value
if v.outParamValue.IsValid() {
if request.localReply!=nil {
@@ -567,17 +553,17 @@ func (handler *RpcHandler) CastGo(serviceMethod string,args interface{}) {
handler.goRpc(nil,true,0,serviceMethod,args)
}
func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,args IRawInputArgs) error {
func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,rawArgs []byte) error {
processor := GetProcessor(uint8(rpcProcessorType))
var pClientList [maxClusterNode]*Client
err,count := handler.funcRpcClient(nodeId,serviceName,pClientList[:])
if count==0||err != nil {
args.DoGc()
//args.DoGc()
log.Error("Call serviceMethod is error:%+v!",err)
return err
}
if count > 1 {
args.DoGc()
//args.DoGc()
log.Error("Cannot call more then 1 node!")
return fmt.Errorf("Cannot call more then 1 node!")
}
@@ -589,13 +575,13 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in
pLocalRpcServer:= handler.funcRpcServer()
//调用自己rpcHandler处理器
if serviceName == handler.rpcHandler.GetName() { //自己服务调用
err:= pLocalRpcServer.myselfRpcHandlerGo(serviceName,serviceName,args,nil)
args.DoGc()
err:= pLocalRpcServer.myselfRpcHandlerGo(serviceName,serviceName,rawArgs,nil)
//args.DoGc()
return err
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClientList[i],true,serviceName,rpcMethodId,serviceName,nil,nil,args)
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClientList[i],true,serviceName,rpcMethodId,serviceName,nil,nil,rawArgs)
if pCall.Err!=nil {
err = pCall.Err
}
@@ -605,8 +591,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in
}
//跨node调用
pCall := pClientList[i].RawGo(processor,true,rpcMethodId,serviceName,args.GetRawData(),nil)
args.DoGc()
pCall := pClientList[i].RawGo(processor,true,rpcMethodId,serviceName,rawArgs,nil)
if pCall.Err!=nil {
err = pCall.Err
}
@@ -620,3 +605,33 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in
func (handler *RpcHandler) RegRawRpc(rpcMethodId uint32,rawRpcCB RawRpcCallBack){
handler.mapRawFunctions[rpcMethodId] = rawRpcCB
}
func (handler *RpcHandler) UnmarshalInParam(rpcProcessor IRpcProcessor,serviceMethod string,rawRpcMethodId uint32,inParam []byte) (interface{},error){
if rawRpcMethodId>0 {
v,ok := handler.mapRawFunctions[rawRpcMethodId]
if ok == false {
err := fmt.Errorf("RpcHandler cannot find request rpc id %d!",rawRpcMethodId)
log.Error(err.Error())
return nil,err
}
msg,err := v.Unmarshal(inParam)
if err != nil {
err := fmt.Errorf("RpcHandler cannot Unmarshal rpc id %d!",rawRpcMethodId)
log.Error(err.Error())
return nil,err
}
return msg,err
}
v,ok := handler.mapFunctions[serviceMethod]
if ok == false {
return nil,errors.New( "RpcHandler "+handler.rpcHandler.GetName()+"cannot find "+serviceMethod)
}
var err error
param := reflect.New(v.inParamValue.Type().Elem()).Interface()
err = rpcProcessor.Unmarshal(inParam,inParam)
return param,err
}

View File

@@ -132,7 +132,9 @@ func (agent *RpcAgent) Run() {
log.Error("rpc Unmarshal request is error: %v", err)
if req.RpcRequestData.GetSeq()>0 {
rpcError := RpcError(err.Error())
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError)
if req.RpcRequestData.IsNoReply()==false {
agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
}
ReleaseRpcRequest(req)
continue
}else{
@@ -146,7 +148,9 @@ func (agent *RpcAgent) Run() {
serviceMethod := strings.Split(req.RpcRequestData.GetServiceMethod(),".")
if len(serviceMethod) < 1 {
rpcError := RpcError("rpc request req.ServiceMethod is error")
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError)
if req.RpcRequestData.IsNoReply()==false {
agent.WriteResponse(processor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetSeq(), nil, rpcError)
}
ReleaseRpcRequest(req)
log.Debug("rpc request req.ServiceMethod is error")
continue
@@ -155,7 +159,9 @@ func (agent *RpcAgent) Run() {
rpcHandler := agent.rpcServer.rpcHandleFinder.FindRpcHandler(serviceMethod[0])
if rpcHandler== nil {
rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod()))
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError)
if req.RpcRequestData.IsNoReply()==false {
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError)
}
ReleaseRpcRequest(req)
log.Error("service method %s not config!", req.RpcRequestData.GetServiceMethod())
continue
@@ -168,6 +174,17 @@ func (agent *RpcAgent) Run() {
}
}
req.inParam,err = rpcHandler.UnmarshalInParam(req.rpcProcessor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetRpcMethodId(),req.RpcRequestData.GetInParam())
if err != nil {
rErr := "Call Rpc "+req.RpcRequestData.GetServiceMethod()+" Param error "+err.Error()
if req.requestHandle!=nil {
req.requestHandle(nil, RpcError(rErr))
}
log.Error(rErr)
ReleaseRpcRequest(req)
continue
}
err = rpcHandler.PushRequest(req)
if err != nil {
rpcError := RpcError(err.Error())
@@ -221,15 +238,12 @@ func (server *Server) myselfRpcHandlerGo(handlerName string,serviceMethod string
}
func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Client,noReply bool,handlerName string,rpcMethodId uint32,serviceMethod string, args interface{},reply interface{},inputArgs IRawInputArgs) *Call {
func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Client,noReply bool,handlerName string,rpcMethodId uint32,serviceMethod string, args interface{},reply interface{},rawArgs []byte) *Call {
pCall := MakeCall()
pCall.Seq = client.generateSeq()
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler== nil {
if inputArgs!= nil {
inputArgs.DoGc()
}
pCall.Seq = 0
pCall.Err = fmt.Errorf("service method %s not config!", serviceMethod)
log.Error("%s",pCall.Err.Error())
@@ -242,11 +256,18 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien
_,processor = GetProcessorType(args)
}
req := MakeRpcRequest(processor,0,rpcMethodId, serviceMethod,noReply,nil)
req.bLocalRequest = true
req.localParam = args
req.inParam = args
req.localReply = reply
req.inputArgs = inputArgs
if rawArgs!=nil {
var err error
req.inParam,err = rpcHandler.UnmarshalInParam(processor,serviceMethod,rpcMethodId,rawArgs)
if err != nil {
ReleaseRpcRequest(req)
pCall.Err = err
pCall.done <- pCall
}
}
//req.inputArgs = inputArgs
if noReply == false {
client.AddPending(pCall)
@@ -285,12 +306,9 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler
_,processor := GetProcessorType(args)
req := MakeRpcRequest(processor,0,0,serviceMethod,noReply,nil)
req.localParam = args
req.inParam = args
req.localReply = reply
req.bLocalRequest = true
//req.rpcProcessor =processor
//req.RpcRequestData = processor.MakeRpcRequest(0,serviceMethod,noReply,nil)
if noReply == false {
callSeq := client.generateSeq()
pCall := MakeCall()