新增对rpc参数数据类型自动识别(同时支持json与protobuf方式序列化与反序列化)

This commit is contained in:
boyce
2020-10-08 14:12:57 +08:00
parent a4d2a0c4ac
commit 6edc5a0762
8 changed files with 102 additions and 37 deletions

View File

@@ -164,6 +164,7 @@ func (slf *Client) AsycCall(rpcHandler IRpcHandler,serviceMethod string,callback
call.rpcHandler = rpcHandler
call.ServiceMethod = serviceMethod
processorType, processor := GetProcessorType(args)
InParam,herr := processor.Marshal(args)
if herr != nil {
ReleaseCall(call)
@@ -190,7 +191,7 @@ func (slf *Client) AsycCall(rpcHandler IRpcHandler,serviceMethod string,callback
return fmt.Errorf("Rpc server is disconnect,call %s is fail!",serviceMethod)
}
err = slf.conn.WriteMsg(bytes)
err = slf.conn.WriteMsg([]byte{uint8(processorType)},bytes)
if err != nil {
slf.RemovePending(call.Seq)
ReleaseCall(call)
@@ -199,7 +200,7 @@ func (slf *Client) AsycCall(rpcHandler IRpcHandler,serviceMethod string,callback
return err
}
func (slf *Client) RawGo(noReply bool,serviceMethod string,args []byte,additionParam interface{},reply interface{}) *Call {
func (slf *Client) RawGo(processor IRpcProcessor,noReply bool,serviceMethod string,args []byte,additionParam interface{},reply interface{}) *Call {
call := MakeCall()
call.ServiceMethod = serviceMethod
call.Reply = reply
@@ -235,13 +236,14 @@ func (slf *Client) RawGo(noReply bool,serviceMethod string,args []byte,additionP
}
func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply interface{}) *Call {
_,processor := GetProcessorType(args)
InParam,err := processor.Marshal(args)
if err != nil {
call := MakeCall()
call.Err = err
}
return slf.RawGo(noReply,serviceMethod,InParam,nil,reply)
return slf.RawGo(processor,noReply,serviceMethod,InParam,nil,reply)
}
func (slf *Client) Run(){
@@ -260,11 +262,18 @@ func (slf *Client) Run(){
log.Error("rpcClient %s ReadMsg error:%+v",slf.Addr,err)
return
}
processor := GetProcessor(uint8(bytes[0]))
if processor==nil {
log.Error("rpcClient %s ReadMsg head error:%+v",slf.Addr,err)
return
}
//1.解析head
respone := &RpcResponse{}
respone.RpcResponeData =processor.MakeRpcResponse(0,nil,nil)
err = processor.Unmarshal(bytes,respone.RpcResponeData)
err = processor.Unmarshal(bytes[1:],respone.RpcResponeData)
if err != nil {
processor.ReleaseRpcRespose(respone.RpcResponeData)
log.Error("rpcClient Unmarshal head error,error:%+v",err)

View File

@@ -81,6 +81,12 @@ func (slf *JsonProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){
rpcJsonResponeDataPool.Put(rpcRequestData)
}
func (slf *JsonProcessor) IsParse(param interface{}) bool {
_,err := json.Marshal(param)
return err==nil
}
func (slf *JsonRpcRequestData) IsNoReply() bool{
return slf.NoReply
}

View File

@@ -128,6 +128,11 @@ func (slf *PBProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){
rpcPbResponeDataPool.Put(rpcRequestData)
}
func (slf *PBProcessor) IsParse(param interface{}) bool {
_,ok := param.(proto.Message)
return ok
}
func (slf *PBRpcRequestData) IsNoReply() bool{
return slf.GetNoReply()
@@ -147,4 +152,3 @@ func (slf *PBRpcResponseData) GetErr() *RpcError {

View File

@@ -8,6 +8,7 @@ type IRpcProcessor interface {
ReleaseRpcRequest(rpcRequestData IRpcRequestData)
ReleaseRpcRespose(rpcRequestData IRpcResponseData)
IsParse(param interface{}) bool //是否可解析
}

View File

@@ -15,6 +15,7 @@ type RpcRequest struct {
localRawParam []byte
requestHandle RequestHandler
callback *reflect.Value
rpcProcessor IRpcProcessor
}
type RpcResponse struct {

View File

@@ -46,6 +46,7 @@ type RpcMethodInfo struct {
additionParam reflect.Value
//addition *IRawAdditionParam
hashAdditionParam bool
rpcProcessorType RpcProcessorType
}
type RpcHandler struct {
@@ -76,8 +77,8 @@ type IRpcHandler interface {
AsyncCallNode(nodeId int,serviceMethod string,args interface{},callback interface{}) error
CallNode(nodeId int,serviceMethod string,args interface{},reply interface{}) error
GoNode(nodeId int,serviceMethod string,args interface{}) error
RawGoNode(nodeId int,serviceMethod string,args []byte,additionParam interface{}) error
RawCastGo(serviceMethod string,args []byte,additionParam interface{})
RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error
RawCastGo(rpcProcessorType RpcProcessorType,serviceMethod string,args []byte,additionParam interface{})
}
var rawAdditionParamValueNull reflect.Value
@@ -152,6 +153,9 @@ func (slf *RpcHandler) suitableMethods(method reflect.Method) error {
}
rpcMethodInfo.iparam = reflect.New(typ.In(parIdx).Elem()) //append(rpcMethodInfo.iparam,)
pt,_ := GetProcessorType(rpcMethodInfo.iparam.Interface())
rpcMethodInfo.rpcProcessorType = pt
parIdx++
if parIdx< typ.NumIn() {
rpcMethodInfo.oParam = reflect.New(typ.In(parIdx).Elem())
@@ -225,7 +229,7 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
}
}()
defer ReleaseRpcRequest(request)
defer processor.ReleaseRpcRequest(request.RpcRequestData)
defer request.rpcProcessor.ReleaseRpcRequest(request.RpcRequestData)
v,ok := slf.mapfunctons[request.RpcRequestData.GetServiceMethod()]
if ok == false {
@@ -241,7 +245,7 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
var err error
iparam := reflect.New(v.iparam.Type().Elem()).Interface()
if request.bLocalRequest == false {
err = processor.Unmarshal(request.RpcRequestData.GetInParam(),iparam)
err = request.rpcProcessor.Unmarshal(request.RpcRequestData.GetInParam(),iparam)
if err!=nil {
rerr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err)
log.Error("%s",rerr.Error())
@@ -252,7 +256,7 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
}
}else {
if request.localRawParam!=nil {
err = processor.Unmarshal(request.localRawParam,iparam)
err = request.rpcProcessor.Unmarshal(request.localRawParam,iparam)
if err!=nil {
rerr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err)
log.Error("%s",rerr.Error())
@@ -327,7 +331,7 @@ func (slf *RpcHandler) CallMethod(ServiceMethod string,param interface{},reply i
return err
}
func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args interface{}) error {
func (slf *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,serviceMethod string,args interface{}) error {
var pClientList []*Client
err := slf.funcRpcClient(nodeId,serviceMethod,&pClientList)
if err != nil {
@@ -360,7 +364,7 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int
return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil)
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,true,sMethod[0],sMethod[1],args,nil,nil,nil)
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClient,true,sMethod[0],sMethod[1],args,nil,nil,nil)
if pCall.Err!=nil {
err = pCall.Err
}
@@ -381,7 +385,7 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int
func (slf *RpcHandler) rawGoRpc(bCast bool,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error {
func (slf *RpcHandler) rawGoRpc(processor IRpcProcessor,bCast bool,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error {
var pClientList []*Client
err := slf.funcRpcClient(nodeId,serviceMethod,&pClientList)
if err != nil {
@@ -414,7 +418,7 @@ func (slf *RpcHandler) rawGoRpc(bCast bool,nodeId int,serviceMethod string,args
return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil)
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,true,sMethod[0],sMethod[1],nil,args,nil,additionParam)
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClient,true,sMethod[0],sMethod[1],nil,args,nil,additionParam)
if pCall.Err!=nil {
err = pCall.Err
}
@@ -423,7 +427,7 @@ func (slf *RpcHandler) rawGoRpc(bCast bool,nodeId int,serviceMethod string,args
}
//跨node调用
pCall := pClient.RawGo(true,serviceMethod,args,additionParam,nil)
pCall := pClient.RawGo(processor,true,serviceMethod,args,additionParam,nil)
if pCall.Err!=nil {
err = pCall.Err
}
@@ -464,7 +468,7 @@ func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},
return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,reply)
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,false,sMethod[0],sMethod[1],args,nil,reply,nil)
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,sMethod[0],sMethod[1],args,nil,reply,nil)
err = pCall.Done().Err
pClient.RemovePending(pCall.Seq)
ReleaseCall(pCall)
@@ -549,7 +553,7 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa
}
return nil
}
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,false,sMethod[0],sMethod[1],args,nil,reply,nil)
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,sMethod[0],sMethod[1],args,nil,reply,nil)
err = pCall.Done().Err
pClient.RemovePending(pCall.Seq)
ReleaseCall(pCall)
@@ -584,7 +588,7 @@ func (slf *RpcHandler) Call(serviceMethod string,args interface{},reply interfac
func (slf *RpcHandler) Go(serviceMethod string,args interface{}) error {
return slf.goRpc(false,0,serviceMethod,args)
return slf.goRpc(nil,false,0,serviceMethod,args)
}
func (slf *RpcHandler) AsyncCallNode(nodeId int,serviceMethod string,args interface{},callback interface{}) error {
@@ -596,18 +600,18 @@ func (slf *RpcHandler) CallNode(nodeId int,serviceMethod string,args interface{}
}
func (slf *RpcHandler) GoNode(nodeId int,serviceMethod string,args interface{}) error {
return slf.goRpc(false,nodeId,serviceMethod,args)
return slf.goRpc(nil,false,nodeId,serviceMethod,args)
}
func (slf *RpcHandler) CastGo(serviceMethod string,args interface{}) {
slf.goRpc(true,0,serviceMethod,args)
slf.goRpc(nil,true,0,serviceMethod,args)
}
func (slf *RpcHandler) RawGoNode(nodeId int,serviceMethod string,args []byte,additionParam interface{}) error {
return slf.rawGoRpc(false,nodeId,serviceMethod,args,additionParam)
func (slf *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error {
return slf.rawGoRpc(GetProcessor(uint8(rpcProcessorType)),false,nodeId,serviceMethod,args,additionParam)
}
func (slf *RpcHandler) RawCastGo(serviceMethod string,args []byte,additionParam interface{}) {
slf.goRpc(true,0,serviceMethod,args)
func (slf *RpcHandler) RawCastGo(rpcProcessorType RpcProcessorType,serviceMethod string,args []byte,additionParam interface{}) {
slf.goRpc(GetProcessor(uint8(rpcProcessorType)),true,0,serviceMethod,args)
}

View File

@@ -9,8 +9,16 @@ import (
"reflect"
"strings"
)
type RpcProcessorType uint8
var processor IRpcProcessor = &JsonProcessor{}
const (
RPC_PROCESSOR_JSON RpcProcessorType = 0
RPC_PROCESSOR_PB RpcProcessorType = 1
)
//var processor IRpcProcessor = &JsonProcessor{}
var arrayProcessor = []IRpcProcessor{&JsonProcessor{},&PBProcessor{}}
var arrayProcessorLen uint8 = 2
var LittleEndian bool
type Server struct {
@@ -20,8 +28,26 @@ type Server struct {
rpcserver *network.TCPServer
}
func SetProcessor(proc IRpcProcessor) {
processor = proc
func AppendProcessor(rpcProcessor IRpcProcessor) {
arrayProcessor = append(arrayProcessor,rpcProcessor)
arrayProcessorLen++
}
func GetProcessorType(param interface{}) (RpcProcessorType,IRpcProcessor){
for i:=uint8(1);i<arrayProcessorLen;i++{
if arrayProcessor[i].IsParse(param) == true {
return RpcProcessorType(i),arrayProcessor[i]
}
}
return RPC_PROCESSOR_JSON,arrayProcessor[RPC_PROCESSOR_JSON]
}
func GetProcessor(processorType uint8) IRpcProcessor{
if processorType>=arrayProcessorLen{
return nil
}
return arrayProcessor[processorType]
}
func (slf *Server) Init(rpcHandleFinder RpcHandleFinder) {
@@ -56,7 +82,7 @@ type RpcAgent struct {
}
func (agent *RpcAgent) WriteRespone(serviceMethod string,seq uint64,reply interface{},err *RpcError) {
func (agent *RpcAgent) WriteRespone(processor IRpcProcessor,serviceMethod string,seq uint64,reply interface{},err *RpcError) {
var mReply []byte
var rpcError *RpcError
var errM error
@@ -98,15 +124,22 @@ func (agent *RpcAgent) Run() {
//will close tcpconn
break
}
processor := GetProcessor(uint8(data[0]))
if processor==nil {
log.Error("remote rpc %s data head error:%+v",agent.conn.RemoteAddr(),err)
return
}
//解析head
req := MakeRpcRequest()
req.rpcProcessor = processor
req.RpcRequestData = processor.MakeRpcRequest(0,"",false,nil,nil)
err = processor.Unmarshal(data,req.RpcRequestData)
err = processor.Unmarshal(data[1:],req.RpcRequestData)
if err != nil {
log.Error("rpc Unmarshal request is error: %v", err)
if req.RpcRequestData.GetSeq()>0 {
rpcError := RpcError(err.Error())
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
agent.WriteRespone(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req)
continue
@@ -122,7 +155,7 @@ func (agent *RpcAgent) Run() {
serviceMethod := strings.Split(req.RpcRequestData.GetServiceMethod(),".")
if len(serviceMethod)!=2 {
rpcError := RpcError("rpc request req.ServiceMethod is error")
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
agent.WriteRespone(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req)
log.Debug("rpc request req.ServiceMethod is error")
@@ -132,7 +165,7 @@ func (agent *RpcAgent) Run() {
rpcHandler := agent.rpcserver.rpcHandleFinder.FindRpcHandler(serviceMethod[0])
if rpcHandler== nil {
rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod()))
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
agent.WriteRespone(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req)
log.Error("service method %s not config!", req.RpcRequestData.GetServiceMethod())
@@ -141,7 +174,7 @@ func (agent *RpcAgent) Run() {
if req.RpcRequestData.IsNoReply()==false {
req.requestHandle = func(Returns interface{},Err *RpcError){
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),Returns,Err)
agent.WriteRespone(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),Returns,Err)
}
}
@@ -150,7 +183,7 @@ func (agent *RpcAgent) Run() {
rpcError := RpcError(err.Error())
if req.RpcRequestData.IsNoReply() {
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
agent.WriteRespone(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
}
processor.ReleaseRpcRequest(req.RpcRequestData)
@@ -200,7 +233,7 @@ func (slf *Server) myselfRpcHandlerGo(handlerName string,methodName string, args
}
func (slf *Server) selfNodeRpcHandlerGo(client *Client,noReply bool,handlerName string,methodName string, args interface{},rawArgs []byte,reply interface{},additionParam interface{}) *Call {
func (slf *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Client,noReply bool,handlerName string,methodName string, args interface{},rawArgs []byte,reply interface{},additionParam interface{}) *Call {
pCall := MakeCall()
pCall.Seq = client.generateSeq()
@@ -217,7 +250,12 @@ func (slf *Server) selfNodeRpcHandlerGo(client *Client,noReply bool,handlerName
req.localParam = args
req.localReply = reply
req.localRawParam = rawArgs
if processor == nil {
_,processor = GetProcessorType(args)
}
req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil,additionParam)
req.rpcProcessor = processor
if noReply == false {
client.AddPending(pCall)
req.requestHandle = func(Returns interface{},Err *RpcError){
@@ -267,6 +305,8 @@ func (slf *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler IRp
req.localParam = args
req.localReply = reply
req.bLocalRequest = true
_,processor := GetProcessorType(args)
req.rpcProcessor =processor
req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil,nil)
if noReply == false {
client.AddPending(pCall)