优化rpc模块代码规范

This commit is contained in:
黎非易
2021-07-12 11:38:34 +08:00
parent f1b9a1220a
commit c9f47d796c
6 changed files with 44 additions and 49 deletions

View File

@@ -2,13 +2,14 @@ package rpc
import ( import (
"container/list" "container/list"
"fmt" "errors"
"github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/network" "github.com/duanhf2012/origin/network"
"github.com/duanhf2012/origin/util/timer" "github.com/duanhf2012/origin/util/timer"
"math" "math"
"reflect" "reflect"
"runtime" "runtime"
"strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -100,7 +101,8 @@ func (client *Client) checkRpcCallTimeout(){
} }
pCall := pElem.Value.(*Call) pCall := pElem.Value.(*Call)
if now.Sub(pCall.callTime) > client.callRpcTimeout { if now.Sub(pCall.callTime) > client.callRpcTimeout {
pCall.Err = fmt.Errorf("RPC call takes more than %d seconds!", client.callRpcTimeout/time.Second) strTimeout := strconv.FormatInt(int64(client.callRpcTimeout/time.Second), 10)
pCall.Err = errors.New("RPC call takes more than "+strTimeout+ " seconds")
client.makeCallFail(pCall) client.makeCallFail(pCall)
client.pendingLock.Unlock() client.pendingLock.Unlock()
continue continue
@@ -113,7 +115,7 @@ func (client *Client) ResetPending(){
client.pendingLock.Lock() client.pendingLock.Lock()
if client.pending != nil { if client.pending != nil {
for _,v := range client.pending { for _,v := range client.pending {
v.Value.(*Call).Err = fmt.Errorf("node is disconnect.") v.Value.(*Call).Err = errors.New("node is disconnect")
v.Value.(*Call).done <- v.Value.(*Call) v.Value.(*Call).done <- v.Value.(*Call)
} }
} }
@@ -186,7 +188,7 @@ func (client *Client) AsyncCall(rpcHandler IRpcHandler,serviceMethod string,call
} }
if client.conn == nil { if client.conn == nil {
return fmt.Errorf("Rpc server is disconnect,call %s is fail!",serviceMethod) return errors.New("Rpc server is disconnect,call "+serviceMethod)
} }
call := MakeCall() call := MakeCall()
@@ -224,7 +226,7 @@ func (client *Client) RawGo(processor IRpcProcessor,noReply bool,rpcMethodId uin
if client.conn == nil { if client.conn == nil {
call.Seq = 0 call.Seq = 0
call.Err = fmt.Errorf("call %s is fail,rpc client is disconnect.",serviceMethod) call.Err = errors.New(serviceMethod+" was called failed,rpc client is disconnect")
return call return call
} }
@@ -271,7 +273,7 @@ func (client *Client) Run(){
return return
} }
processor := GetProcessor(uint8(bytes[0])) processor := GetProcessor(bytes[0])
if processor==nil { if processor==nil {
client.conn.ReleaseReadMsg(bytes) client.conn.ReleaseReadMsg(bytes)
log.SError("rpcClient ",client.Addr," ReadMsg head error:",err.Error()) log.SError("rpcClient ",client.Addr," ReadMsg head error:",err.Error())
@@ -280,7 +282,7 @@ func (client *Client) Run(){
//1.解析head //1.解析head
response := RpcResponse{} response := RpcResponse{}
response.RpcResponseData =processor.MakeRpcResponse(0,RpcError(""),nil) response.RpcResponseData =processor.MakeRpcResponse(0,"",nil)
err = processor.Unmarshal(bytes[1:], response.RpcResponseData) err = processor.Unmarshal(bytes[1:], response.RpcResponseData)
client.conn.ReleaseReadMsg(bytes) client.conn.ReleaseReadMsg(bytes)

View File

@@ -81,7 +81,9 @@ func (slf *GoGoPBRpcResponseData) GetErr() *RpcError {
if slf.GetError() == "" { if slf.GetError() == "" {
return nil return nil
} }
return Errorf(slf.GetError())
err := RpcError(slf.GetError())
return &err
} }

View File

@@ -1,8 +1,8 @@
package rpc package rpc
import ( import (
jsoniter "github.com/json-iterator/go"
"github.com/duanhf2012/origin/util/sync" "github.com/duanhf2012/origin/util/sync"
jsoniter "github.com/json-iterator/go"
) )
var json = jsoniter.ConfigCompatibleWithStandardLibrary var json = jsoniter.ConfigCompatibleWithStandardLibrary
@@ -110,7 +110,8 @@ func (jsonRpcResponseData *JsonRpcResponseData) GetErr() *RpcError {
return nil return nil
} }
return Errorf(jsonRpcResponseData.Err) err := RpcError(jsonRpcResponseData.Err)
return &err
} }
func (jsonRpcResponseData *JsonRpcResponseData) GetReply() []byte{ func (jsonRpcResponseData *JsonRpcResponseData) GetReply() []byte{

View File

@@ -1,8 +1,8 @@
package rpc package rpc
import ( import (
"reflect"
"github.com/duanhf2012/origin/util/sync" "github.com/duanhf2012/origin/util/sync"
"reflect"
"time" "time"
) )

View File

@@ -14,7 +14,7 @@ import (
const maxClusterNode int = 128 const maxClusterNode int = 128
type FuncRpcClient func(nodeId int,serviceMethod string,client []*Client) (error,int) type FuncRpcClient func(nodeId int,serviceMethod string,client []*Client) (error,int)
type FuncRpcServer func() (*Server) type FuncRpcServer func() *Server
var nilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()) var nilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem())
type RpcError string type RpcError string
@@ -32,12 +32,6 @@ func ConvertError(e error) RpcError{
return rpcErr return rpcErr
} }
func Errorf(format string, a ...interface{}) *RpcError {
rpcErr := RpcError(fmt.Sprintf(format,a...))
return &rpcErr
}
type RpcMethodInfo struct { type RpcMethodInfo struct {
method reflect.Method method reflect.Method
inParamValue reflect.Value inParamValue reflect.Value
@@ -52,7 +46,6 @@ type RawRpcCallBack interface {
CB(data interface{}) CB(data interface{})
} }
//type RawRpcCallBack func(rawData []byte)
type RpcHandler struct { type RpcHandler struct {
callRequest chan *RpcRequest callRequest chan *RpcRequest
rpcHandler IRpcHandler rpcHandler IRpcHandler
@@ -89,7 +82,7 @@ type IRpcHandler interface {
CallNode(nodeId int,serviceMethod string,args interface{},reply interface{}) error CallNode(nodeId int,serviceMethod string,args interface{},reply interface{}) error
GoNode(nodeId int,serviceMethod string,args interface{}) error GoNode(nodeId int,serviceMethod string,args interface{}) error
RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,rawArgs IRawInputArgs) error RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,rawArgs IRawInputArgs) error
CastGo(serviceMethod string,args interface{}) CastGo(serviceMethod string,args interface{})error
IsSingleCoroutine() bool IsSingleCoroutine() bool
UnmarshalInParam(rpcProcessor IRpcProcessor,serviceMethod string,rawRpcMethodId uint32,inParam []byte) (interface{},error) UnmarshalInParam(rpcProcessor IRpcProcessor,serviceMethod string,rawRpcMethodId uint32,inParam []byte) (interface{},error)
} }
@@ -145,19 +138,19 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
var rpcMethodInfo RpcMethodInfo var rpcMethodInfo RpcMethodInfo
typ := method.Type typ := method.Type
if typ.NumOut() != 1 { if typ.NumOut() != 1 {
return fmt.Errorf("%s The number of returned arguments must be 1!",method.Name) return fmt.Errorf("%s The number of returned arguments must be 1",method.Name)
} }
if typ.Out(0).String() != "error" { if typ.Out(0).String() != "error" {
return fmt.Errorf("%s The return parameter must be of type error!",method.Name) return fmt.Errorf("%s The return parameter must be of type error",method.Name)
} }
if typ.NumIn() <2 || typ.NumIn() > 4 { if typ.NumIn() <2 || typ.NumIn() > 4 {
return fmt.Errorf("%s Unsupported parameter format!",method.Name) return fmt.Errorf("%s Unsupported parameter format",method.Name)
} }
//1.判断第一个参数 //1.判断第一个参数
var parIdx int = 1 var parIdx = 1
if typ.In(parIdx).String() == "rpc.RequestHandler" { if typ.In(parIdx).String() == "rpc.RequestHandler" {
parIdx += 1 parIdx += 1
rpcMethodInfo.hasResponder = true rpcMethodInfo.hasResponder = true
@@ -165,11 +158,11 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error {
for i:= parIdx ;i<typ.NumIn();i++{ for i:= parIdx ;i<typ.NumIn();i++{
if handler.isExportedOrBuiltinType(typ.In(i)) == false { if handler.isExportedOrBuiltinType(typ.In(i)) == false {
return fmt.Errorf("%s Unsupported parameter types!",method.Name) return fmt.Errorf("%s Unsupported parameter types",method.Name)
} }
} }
rpcMethodInfo.inParamValue = reflect.New(typ.In(parIdx).Elem()) //append(rpcMethodInfo.iparam,) rpcMethodInfo.inParamValue = reflect.New(typ.In(parIdx).Elem())
rpcMethodInfo.inParam = reflect.New(typ.In(parIdx).Elem()).Interface() rpcMethodInfo.inParam = reflect.New(typ.In(parIdx).Elem()).Interface()
pt,_ := GetProcessorType(rpcMethodInfo.inParamValue.Interface()) pt,_ := GetProcessorType(rpcMethodInfo.inParamValue.Interface())
rpcMethodInfo.rpcProcessorType = pt rpcMethodInfo.rpcProcessorType = pt
@@ -199,14 +192,14 @@ func (handler *RpcHandler) RegisterRpc(rpcHandler IRpcHandler) error {
func (handler *RpcHandler) PushRequest(req *RpcRequest) error{ func (handler *RpcHandler) PushRequest(req *RpcRequest) error{
if len(handler.callRequest) >= cap(handler.callRequest){ if len(handler.callRequest) >= cap(handler.callRequest){
return fmt.Errorf("RpcHandler %s Rpc Channel is full.", handler.GetName()) return fmt.Errorf("RpcHandler %s Rpc Channel is full", handler.GetName())
} }
handler.callRequest <- req handler.callRequest <- req
return nil return nil
} }
func (handler *RpcHandler) GetRpcRequestChan() (chan *RpcRequest) { func (handler *RpcHandler) GetRpcRequestChan() chan *RpcRequest {
return handler.callRequest return handler.callRequest
} }
@@ -345,10 +338,10 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s
} }
if count > 1 && bCast == false{ if count > 1 && bCast == false{
log.SError("Cannot call %s more then 1 node!",serviceMethod) log.SError("Cannot call %s more then 1 node!",serviceMethod)
return fmt.Errorf("Cannot call more then 1 node!") return errors.New("cannot call more then 1 node")
} }
//2.rpcclient调用 //2.rpcClient调用
//如果调用本结点服务 //如果调用本结点服务
for i:=0;i<count;i++{ for i:=0;i<count;i++{
if pClientList[i].bSelfNode == true { if pClientList[i].bSelfNode == true {
@@ -358,9 +351,8 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s
if findIndex==-1 { if findIndex==-1 {
sErr := errors.New("Call serviceMethod "+serviceMethod+" is error!") sErr := errors.New("Call serviceMethod "+serviceMethod+" is error!")
log.SError(sErr.Error()) log.SError(sErr.Error())
if sErr != nil { err = sErr
err = sErr
}
continue continue
} }
serviceName := serviceMethod[:findIndex] serviceName := serviceMethod[:findIndex]
@@ -402,10 +394,10 @@ func (handler *RpcHandler) callRpc(nodeId int,serviceMethod string,args interfac
return err return err
}else if count > 1 { }else if count > 1 {
log.SError("Cannot call more then 1 node!") log.SError("Cannot call more then 1 node!")
return errors.New("Cannot call more then 1 node!") return errors.New("cannot call more then 1 node")
} }
//2.rpcclient调用 //2.rpcClient调用
//如果调用本结点服务 //如果调用本结点服务
pClient := pClientList[0] pClient := pClientList[0]
if pClient.bSelfNode == true { if pClient.bSelfNode == true {
@@ -443,7 +435,7 @@ func (handler *RpcHandler) callRpc(nodeId int,serviceMethod string,args interfac
return err return err
} }
func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interface{},callback interface{}) error { func (handler *RpcHandler) asyncCallRpc(nodeId int,serviceMethod string,args interface{},callback interface{}) error {
fVal := reflect.ValueOf(callback) fVal := reflect.ValueOf(callback)
if fVal.Kind()!=reflect.Func{ if fVal.Kind()!=reflect.Func{
err := errors.New("call "+serviceMethod+" input callback param is error!") err := errors.New("call "+serviceMethod+" input callback param is error!")
@@ -465,11 +457,11 @@ func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args int
reply := reflect.New(fVal.Type().In(0).Elem()).Interface() reply := reflect.New(fVal.Type().In(0).Elem()).Interface()
var pClientList [maxClusterNode]*Client var pClientList [maxClusterNode]*Client
err,count := handler.funcRpcClient(nodeid,serviceMethod,pClientList[:]) err,count := handler.funcRpcClient(nodeId,serviceMethod,pClientList[:])
if count==0||err != nil { if count==0||err != nil {
strNodeId := strconv.Itoa(nodeid) strNodeId := strconv.Itoa(nodeId)
if err == nil { if err == nil {
err = errors.New("cannot find rpcclient from nodeid "+strNodeId+" "+serviceMethod) err = errors.New("cannot find rpcClient from nodeId "+strNodeId+" "+serviceMethod)
} }
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
log.SError("Call serviceMethod is error:%+v!",err.Error()) log.SError("Call serviceMethod is error:%+v!",err.Error())
@@ -477,13 +469,13 @@ func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args int
} }
if count > 1 { if count > 1 {
err := errors.New("Cannot call more then 1 node!") err := errors.New("cannot call more then 1 node")
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
log.SError(err.Error()) log.SError(err.Error())
return nil return nil
} }
//2.rpcclient调用 //2.rpcClient调用
//如果调用本结点服务 //如果调用本结点服务
pClient := pClientList[0] pClient := pClientList[0]
if pClient.bSelfNode == true { if pClient.bSelfNode == true {
@@ -555,8 +547,8 @@ func (handler *RpcHandler) GoNode(nodeId int,serviceMethod string,args interface
return handler.goRpc(nil,false,nodeId,serviceMethod,args) return handler.goRpc(nil,false,nodeId,serviceMethod,args)
} }
func (handler *RpcHandler) CastGo(serviceMethod string,args interface{}) { func (handler *RpcHandler) CastGo(serviceMethod string,args interface{}) error{
handler.goRpc(nil,true,0,serviceMethod,args) return handler.goRpc(nil,true,0,serviceMethod,args)
} }
func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,rawArgs IRawInputArgs) error { func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,rawArgs IRawInputArgs) error {
@@ -570,12 +562,12 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in
} }
if count > 1 { if count > 1 {
//args.DoGc() //args.DoGc()
err := errors.New("Cannot call more then 1 node!") err := errors.New("cannot call more then 1 node")
log.SError(err.Error()) log.SError(err.Error())
return err return err
} }
//2.rpcclient调用 //2.rpcClient调用
//如果调用本结点服务 //如果调用本结点服务
for i:=0;i<count;i++{ for i:=0;i<count;i++{
if pClientList[i].bSelfNode == true { if pClientList[i].bSelfNode == true {

View File

@@ -24,7 +24,6 @@ var LittleEndian bool
type Server struct { type Server struct {
functions map[interface{}]interface{} functions map[interface{}]interface{}
cmdChannel chan *Call
rpcHandleFinder RpcHandleFinder rpcHandleFinder RpcHandleFinder
rpcServer *network.TCPServer rpcServer *network.TCPServer
} }
@@ -58,7 +57,6 @@ func GetProcessor(processorType uint8) IRpcProcessor{
} }
func (server *Server) Init(rpcHandleFinder RpcHandleFinder) { func (server *Server) Init(rpcHandleFinder RpcHandleFinder) {
server.cmdChannel = make(chan *Call,100000)
server.rpcHandleFinder = rpcHandleFinder server.rpcHandleFinder = rpcHandleFinder
server.rpcServer = &network.TCPServer{} server.rpcServer = &network.TCPServer{}
} }
@@ -118,10 +116,10 @@ func (agent *RpcAgent) Run() {
break break
} }
processor := GetProcessor(uint8(data[0])) processor := GetProcessor(data[0])
if processor==nil { if processor==nil {
agent.conn.ReleaseReadMsg(data) agent.conn.ReleaseReadMsg(data)
log.SError("remote rpc ",agent.conn.RemoteAddr()," data head error:",err.Error()) log.SError("remote rpc ",agent.conn.RemoteAddr()," cannot find processor:",data[0])
return return
} }