diff --git a/rpc/client.go b/rpc/client.go index 6acf2eb..e54f05f 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -2,13 +2,14 @@ package rpc import ( "container/list" - "fmt" + "errors" "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/network" "github.com/duanhf2012/origin/util/timer" "math" "reflect" "runtime" + "strconv" "sync" "sync/atomic" "time" @@ -100,7 +101,8 @@ func (client *Client) checkRpcCallTimeout(){ } pCall := pElem.Value.(*Call) if now.Sub(pCall.callTime) > client.callRpcTimeout { - pCall.Err = fmt.Errorf("RPC call takes more than %d seconds!", client.callRpcTimeout/time.Second) + strTimeout := strconv.FormatInt(int64(client.callRpcTimeout/time.Second), 10) + pCall.Err = errors.New("RPC call takes more than "+strTimeout+ " seconds") client.makeCallFail(pCall) client.pendingLock.Unlock() continue @@ -113,7 +115,7 @@ func (client *Client) ResetPending(){ client.pendingLock.Lock() if client.pending != nil { for _,v := range client.pending { - v.Value.(*Call).Err = fmt.Errorf("node is disconnect.") + v.Value.(*Call).Err = errors.New("node is disconnect") v.Value.(*Call).done <- v.Value.(*Call) } } @@ -186,7 +188,7 @@ func (client *Client) AsyncCall(rpcHandler IRpcHandler,serviceMethod string,call } if client.conn == nil { - return fmt.Errorf("Rpc server is disconnect,call %s is fail!",serviceMethod) + return errors.New("Rpc server is disconnect,call "+serviceMethod) } call := MakeCall() @@ -224,7 +226,7 @@ func (client *Client) RawGo(processor IRpcProcessor,noReply bool,rpcMethodId uin if client.conn == nil { call.Seq = 0 - call.Err = fmt.Errorf("call %s is fail,rpc client is disconnect.",serviceMethod) + call.Err = errors.New(serviceMethod+" was called failed,rpc client is disconnect") return call } @@ -271,7 +273,7 @@ func (client *Client) Run(){ return } - processor := GetProcessor(uint8(bytes[0])) + processor := GetProcessor(bytes[0]) if processor==nil { client.conn.ReleaseReadMsg(bytes) log.SError("rpcClient ",client.Addr," ReadMsg head error:",err.Error()) @@ -280,7 +282,7 @@ func (client *Client) Run(){ //1.解析head response := RpcResponse{} - response.RpcResponseData =processor.MakeRpcResponse(0,RpcError(""),nil) + response.RpcResponseData =processor.MakeRpcResponse(0,"",nil) err = processor.Unmarshal(bytes[1:], response.RpcResponseData) client.conn.ReleaseReadMsg(bytes) diff --git a/rpc/gogopbprocessor.go b/rpc/gogopbprocessor.go index 40bf39f..2c2892f 100644 --- a/rpc/gogopbprocessor.go +++ b/rpc/gogopbprocessor.go @@ -81,7 +81,9 @@ func (slf *GoGoPBRpcResponseData) GetErr() *RpcError { if slf.GetError() == "" { return nil } - return Errorf(slf.GetError()) + + err := RpcError(slf.GetError()) + return &err } diff --git a/rpc/jsonprocessor.go b/rpc/jsonprocessor.go index fe48f1a..01bf6c1 100644 --- a/rpc/jsonprocessor.go +++ b/rpc/jsonprocessor.go @@ -1,8 +1,8 @@ package rpc import ( - jsoniter "github.com/json-iterator/go" "github.com/duanhf2012/origin/util/sync" + jsoniter "github.com/json-iterator/go" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary @@ -110,7 +110,8 @@ func (jsonRpcResponseData *JsonRpcResponseData) GetErr() *RpcError { return nil } - return Errorf(jsonRpcResponseData.Err) + err := RpcError(jsonRpcResponseData.Err) + return &err } func (jsonRpcResponseData *JsonRpcResponseData) GetReply() []byte{ diff --git a/rpc/rpc.go b/rpc/rpc.go index cf911f1..1b8252a 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -1,8 +1,8 @@ package rpc import ( - "reflect" "github.com/duanhf2012/origin/util/sync" + "reflect" "time" ) diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 7cad748..c817f2c 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -14,7 +14,7 @@ import ( const maxClusterNode int = 128 type FuncRpcClient func(nodeId int,serviceMethod string,client []*Client) (error,int) -type FuncRpcServer func() (*Server) +type FuncRpcServer func() *Server var nilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()) type RpcError string @@ -32,12 +32,6 @@ func ConvertError(e error) RpcError{ return rpcErr } -func Errorf(format string, a ...interface{}) *RpcError { - rpcErr := RpcError(fmt.Sprintf(format,a...)) - - return &rpcErr -} - type RpcMethodInfo struct { method reflect.Method inParamValue reflect.Value @@ -52,7 +46,6 @@ type RawRpcCallBack interface { CB(data interface{}) } -//type RawRpcCallBack func(rawData []byte) type RpcHandler struct { callRequest chan *RpcRequest rpcHandler IRpcHandler @@ -89,7 +82,7 @@ type IRpcHandler interface { CallNode(nodeId int,serviceMethod string,args interface{},reply interface{}) error GoNode(nodeId int,serviceMethod string,args interface{}) error RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,rawArgs IRawInputArgs) error - CastGo(serviceMethod string,args interface{}) + CastGo(serviceMethod string,args interface{})error IsSingleCoroutine() bool UnmarshalInParam(rpcProcessor IRpcProcessor,serviceMethod string,rawRpcMethodId uint32,inParam []byte) (interface{},error) } @@ -145,19 +138,19 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error { var rpcMethodInfo RpcMethodInfo typ := method.Type if typ.NumOut() != 1 { - return fmt.Errorf("%s The number of returned arguments must be 1!",method.Name) + return fmt.Errorf("%s The number of returned arguments must be 1",method.Name) } if typ.Out(0).String() != "error" { - return fmt.Errorf("%s The return parameter must be of type error!",method.Name) + return fmt.Errorf("%s The return parameter must be of type error",method.Name) } if typ.NumIn() <2 || typ.NumIn() > 4 { - return fmt.Errorf("%s Unsupported parameter format!",method.Name) + return fmt.Errorf("%s Unsupported parameter format",method.Name) } //1.判断第一个参数 - var parIdx int = 1 + var parIdx = 1 if typ.In(parIdx).String() == "rpc.RequestHandler" { parIdx += 1 rpcMethodInfo.hasResponder = true @@ -165,11 +158,11 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error { for i:= parIdx ;i= cap(handler.callRequest){ - return fmt.Errorf("RpcHandler %s Rpc Channel is full.", handler.GetName()) + return fmt.Errorf("RpcHandler %s Rpc Channel is full", handler.GetName()) } handler.callRequest <- req return nil } -func (handler *RpcHandler) GetRpcRequestChan() (chan *RpcRequest) { +func (handler *RpcHandler) GetRpcRequestChan() chan *RpcRequest { return handler.callRequest } @@ -345,10 +338,10 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s } if count > 1 && bCast == false{ log.SError("Cannot call %s more then 1 node!",serviceMethod) - return fmt.Errorf("Cannot call more then 1 node!") + return errors.New("cannot call more then 1 node") } - //2.rpcclient调用 + //2.rpcClient调用 //如果调用本结点服务 for i:=0;i 1 { log.SError("Cannot call more then 1 node!") - return errors.New("Cannot call more then 1 node!") + return errors.New("cannot call more then 1 node") } - //2.rpcclient调用 + //2.rpcClient调用 //如果调用本结点服务 pClient := pClientList[0] if pClient.bSelfNode == true { @@ -443,7 +435,7 @@ func (handler *RpcHandler) callRpc(nodeId int,serviceMethod string,args interfac return err } -func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interface{},callback interface{}) error { +func (handler *RpcHandler) asyncCallRpc(nodeId int,serviceMethod string,args interface{},callback interface{}) error { fVal := reflect.ValueOf(callback) if fVal.Kind()!=reflect.Func{ err := errors.New("call "+serviceMethod+" input callback param is error!") @@ -465,11 +457,11 @@ func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args int reply := reflect.New(fVal.Type().In(0).Elem()).Interface() var pClientList [maxClusterNode]*Client - err,count := handler.funcRpcClient(nodeid,serviceMethod,pClientList[:]) + err,count := handler.funcRpcClient(nodeId,serviceMethod,pClientList[:]) if count==0||err != nil { - strNodeId := strconv.Itoa(nodeid) + strNodeId := strconv.Itoa(nodeId) if err == nil { - err = errors.New("cannot find rpcclient from nodeid "+strNodeId+" "+serviceMethod) + err = errors.New("cannot find rpcClient from nodeId "+strNodeId+" "+serviceMethod) } fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) log.SError("Call serviceMethod is error:%+v!",err.Error()) @@ -477,13 +469,13 @@ func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args int } if count > 1 { - err := errors.New("Cannot call more then 1 node!") + err := errors.New("cannot call more then 1 node") fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) log.SError(err.Error()) return nil } - //2.rpcclient调用 + //2.rpcClient调用 //如果调用本结点服务 pClient := pClientList[0] if pClient.bSelfNode == true { @@ -555,8 +547,8 @@ func (handler *RpcHandler) GoNode(nodeId int,serviceMethod string,args interface return handler.goRpc(nil,false,nodeId,serviceMethod,args) } -func (handler *RpcHandler) CastGo(serviceMethod string,args interface{}) { - handler.goRpc(nil,true,0,serviceMethod,args) +func (handler *RpcHandler) CastGo(serviceMethod string,args interface{}) error{ + return handler.goRpc(nil,true,0,serviceMethod,args) } func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,rawArgs IRawInputArgs) error { @@ -570,12 +562,12 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in } if count > 1 { //args.DoGc() - err := errors.New("Cannot call more then 1 node!") + err := errors.New("cannot call more then 1 node") log.SError(err.Error()) return err } - //2.rpcclient调用 + //2.rpcClient调用 //如果调用本结点服务 for i:=0;i