mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-03 22:45:13 +08:00
优化服务内的RPC调用
This commit is contained in:
@@ -85,7 +85,7 @@ type IRpcHandler interface {
|
||||
GetRpcHandler() IRpcHandler
|
||||
HandlerRpcRequest(request *RpcRequest)
|
||||
HandlerRpcResponseCB(call *Call)
|
||||
CallMethod(ServiceMethod string, param interface{}, reply interface{}) error
|
||||
CallMethod(ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error
|
||||
AsyncCall(serviceMethod string, args interface{}, callback interface{}) error
|
||||
Call(serviceMethod string, args interface{}, reply interface{}) error
|
||||
Go(serviceMethod string, args interface{}) error
|
||||
@@ -299,7 +299,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
|
||||
}
|
||||
}
|
||||
|
||||
func (handler *RpcHandler) CallMethod(ServiceMethod string, param interface{}, reply interface{}) error {
|
||||
func (handler *RpcHandler) CallMethod(ServiceMethod string, param interface{},callBack reflect.Value, reply interface{}) error {
|
||||
var err error
|
||||
v, ok := handler.mapFunctions[ServiceMethod]
|
||||
if ok == false {
|
||||
@@ -309,16 +309,57 @@ func (handler *RpcHandler) CallMethod(ServiceMethod string, param interface{}, r
|
||||
}
|
||||
|
||||
var paramList []reflect.Value
|
||||
paramList = append(paramList, reflect.ValueOf(handler.GetRpcHandler())) //接受者
|
||||
paramList = append(paramList, reflect.ValueOf(param))
|
||||
paramList = append(paramList, reflect.ValueOf(reply)) //输出参数
|
||||
var returnValues []reflect.Value
|
||||
if v.hasResponder == true {
|
||||
paramList = append(paramList, reflect.ValueOf(handler.GetRpcHandler())) //接受者
|
||||
//有返回值时
|
||||
if reply != nil {
|
||||
hander :=func(Returns interface{}, Err RpcError) {
|
||||
_, processor := GetProcessorType(Returns)
|
||||
bytes,errs := processor.Marshal(Returns)
|
||||
if errs != nil {
|
||||
callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(errs)})
|
||||
}
|
||||
|
||||
returnValues := v.method.Func.Call(paramList)
|
||||
errInter := returnValues[0].Interface()
|
||||
if errInter != nil {
|
||||
err = errInter.(error)
|
||||
errs = processor.Unmarshal(bytes,reply)
|
||||
|
||||
if len(Err)==0{
|
||||
callBack.Call([]reflect.Value{reflect.ValueOf(reply), nilError})
|
||||
}else{
|
||||
callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(errors.New(Err.Error()))})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
paramList = append(paramList, reflect.ValueOf(hander))
|
||||
}else{//无返回值时,是一个requestHandlerNull空回调
|
||||
paramList = append(paramList, callBack)
|
||||
}
|
||||
paramList = append(paramList, reflect.ValueOf(param))
|
||||
|
||||
//rpc函数被调用
|
||||
returnValues = v.method.Func.Call(paramList)
|
||||
|
||||
//判断返回值是否错误,有错误时则回调
|
||||
errInter := returnValues[0].Interface()
|
||||
if errInter != nil {
|
||||
err = errInter.(error)
|
||||
callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
|
||||
}
|
||||
}else{
|
||||
paramList = append(paramList, reflect.ValueOf(handler.GetRpcHandler())) //接受者
|
||||
paramList = append(paramList, reflect.ValueOf(param))
|
||||
paramList = append(paramList, reflect.ValueOf(reply)) //输出参数
|
||||
returnValues = v.method.Func.Call(paramList)
|
||||
errInter := returnValues[0].Interface()
|
||||
if errInter != nil {
|
||||
err = errInter.(error)
|
||||
callBack.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -356,7 +397,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor, bCast bool, nodeId int
|
||||
serviceName := serviceMethod[:findIndex]
|
||||
if serviceName == handler.rpcHandler.GetName() { //自己服务调用
|
||||
//调用自己rpcHandler处理器
|
||||
return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args, nil)
|
||||
return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args, requestHandlerNull,nil)
|
||||
}
|
||||
//其他的rpcHandler的处理器
|
||||
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor, pClientList[i], true, serviceName, 0, serviceMethod, args, nil, nil)
|
||||
@@ -410,7 +451,7 @@ func (handler *RpcHandler) callRpc(nodeId int, serviceMethod string, args interf
|
||||
serviceName := serviceMethod[:findIndex]
|
||||
if serviceName == handler.rpcHandler.GetName() { //自己服务调用
|
||||
//调用自己rpcHandler处理器
|
||||
return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args, reply)
|
||||
return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args,requestHandlerNull, reply)
|
||||
}
|
||||
//其他的rpcHandler的处理器
|
||||
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil, pClient, false, serviceName, 0, serviceMethod, args, reply, nil)
|
||||
@@ -489,12 +530,7 @@ func (handler *RpcHandler) asyncCallRpc(nodeId int, serviceMethod string, args i
|
||||
serviceName := serviceMethod[:findIndex]
|
||||
//调用自己rpcHandler处理器
|
||||
if serviceName == handler.rpcHandler.GetName() { //自己服务调用
|
||||
err := pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args, reply)
|
||||
if err == nil {
|
||||
fVal.Call([]reflect.Value{reflect.ValueOf(reply), nilError})
|
||||
} else {
|
||||
fVal.Call([]reflect.Value{reflect.ValueOf(reply), reflect.ValueOf(err)})
|
||||
}
|
||||
return pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceMethod, args,fVal ,reply)
|
||||
}
|
||||
|
||||
//其他的rpcHandler的处理器
|
||||
@@ -571,7 +607,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId i
|
||||
pLocalRpcServer := handler.funcRpcServer()
|
||||
//调用自己rpcHandler处理器
|
||||
if serviceName == handler.rpcHandler.GetName() { //自己服务调用
|
||||
err := pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceName, rawArgs.GetRawData(), nil)
|
||||
err := pLocalRpcServer.myselfRpcHandlerGo(serviceName, serviceName, rawArgs.GetRawData(), requestHandlerNull,nil)
|
||||
//args.DoGc()
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -234,7 +234,7 @@ func (server *Server) NewAgent(c *network.TCPConn) network.Agent {
|
||||
return agent
|
||||
}
|
||||
|
||||
func (server *Server) myselfRpcHandlerGo(handlerName string, serviceMethod string, args interface{}, reply interface{}) error {
|
||||
func (server *Server) myselfRpcHandlerGo(handlerName string, serviceMethod string, args interface{},callBack reflect.Value, reply interface{}) error {
|
||||
rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName)
|
||||
if rpcHandler == nil {
|
||||
err := errors.New("service method " + serviceMethod + " not config!")
|
||||
@@ -242,7 +242,9 @@ func (server *Server) myselfRpcHandlerGo(handlerName string, serviceMethod strin
|
||||
return err
|
||||
}
|
||||
|
||||
return rpcHandler.CallMethod(serviceMethod, args, reply)
|
||||
|
||||
|
||||
return rpcHandler.CallMethod(serviceMethod, args,callBack, reply)
|
||||
}
|
||||
|
||||
func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Client, noReply bool, handlerName string, rpcMethodId uint32, serviceMethod string, args interface{}, reply interface{}, rawArgs []byte) *Call {
|
||||
|
||||
Reference in New Issue
Block a user