mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-23 13:44:41 +08:00
优化rpc超时
This commit is contained in:
@@ -32,7 +32,7 @@ type IRealClient interface {
|
|||||||
|
|
||||||
AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error)
|
AsyncCall(timeout time.Duration,rpcHandler IRpcHandler, serviceMethod string, callback reflect.Value, args interface{}, replyParam interface{},cancelable bool) (CancelRpc,error)
|
||||||
Go(timeout time.Duration,rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call
|
Go(timeout time.Duration,rpcHandler IRpcHandler, noReply bool, serviceMethod string, args interface{}, reply interface{}) *Call
|
||||||
RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call
|
RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call
|
||||||
IsConnected() bool
|
IsConnected() bool
|
||||||
|
|
||||||
Run()
|
Run()
|
||||||
|
|||||||
@@ -70,15 +70,16 @@ func (lc *LClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func (rc *LClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceName string, rawArgs []byte, reply interface{}) *Call {
|
func (rc *LClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceName string, rawArgs []byte, reply interface{}) *Call {
|
||||||
pLocalRpcServer := rpcHandler.GetRpcServer()()
|
pLocalRpcServer := rpcHandler.GetRpcServer()()
|
||||||
|
|
||||||
call := MakeCall()
|
|
||||||
call.ServiceMethod = serviceName
|
|
||||||
call.Reply = reply
|
|
||||||
|
|
||||||
//服务自我调用
|
//服务自我调用
|
||||||
if serviceName == rpcHandler.GetName() {
|
if serviceName == rpcHandler.GetName() {
|
||||||
|
call := MakeCall()
|
||||||
|
call.ServiceMethod = serviceName
|
||||||
|
call.Reply = reply
|
||||||
|
call.TimeOut = timeout
|
||||||
|
|
||||||
err := pLocalRpcServer.myselfRpcHandlerGo(rc.selfClient,serviceName, serviceName, rawArgs, requestHandlerNull,nil)
|
err := pLocalRpcServer.myselfRpcHandlerGo(rc.selfClient,serviceName, serviceName, rawArgs, requestHandlerNull,nil)
|
||||||
call.Err = err
|
call.Err = err
|
||||||
call.done <- call
|
call.done <- call
|
||||||
@@ -87,7 +88,7 @@ func (rc *LClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply
|
|||||||
}
|
}
|
||||||
|
|
||||||
//其他的rpcHandler的处理器
|
//其他的rpcHandler的处理器
|
||||||
return pLocalRpcServer.selfNodeRpcHandlerGo(DefaultRpcTimeout,processor,rc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs)
|
return pLocalRpcServer.selfNodeRpcHandlerGo(timeout,processor,rc.selfClient, true, serviceName, rpcMethodId, serviceName, nil, nil, rawArgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -52,14 +52,15 @@ func (rc *RClient) Go(timeout time.Duration,rpcHandler IRpcHandler,noReply bool,
|
|||||||
return call
|
return call
|
||||||
}
|
}
|
||||||
|
|
||||||
return rc.RawGo(rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply)
|
return rc.RawGo(timeout,rpcHandler,processor, noReply, 0, serviceMethod, InParam, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rc *RClient) RawGo(rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call {
|
func (rc *RClient) RawGo(timeout time.Duration,rpcHandler IRpcHandler,processor IRpcProcessor, noReply bool, rpcMethodId uint32, serviceMethod string, rawArgs []byte, reply interface{}) *Call {
|
||||||
call := MakeCall()
|
call := MakeCall()
|
||||||
call.ServiceMethod = serviceMethod
|
call.ServiceMethod = serviceMethod
|
||||||
call.Reply = reply
|
call.Reply = reply
|
||||||
call.Seq = rc.selfClient.generateSeq()
|
call.Seq = rc.selfClient.generateSeq()
|
||||||
|
call.TimeOut = timeout
|
||||||
|
|
||||||
request := MakeRpcRequest(processor, call.Seq, rpcMethodId, serviceMethod, noReply, rawArgs)
|
request := MakeRpcRequest(processor, call.Seq, rpcMethodId, serviceMethod, noReply, rawArgs)
|
||||||
bytes, err := processor.Marshal(request.RpcRequestData)
|
bytes, err := processor.Marshal(request.RpcRequestData)
|
||||||
|
|||||||
@@ -133,6 +133,8 @@ func (call *Call) Clear() *Call{
|
|||||||
call.connId = 0
|
call.connId = 0
|
||||||
call.callback = nil
|
call.callback = nil
|
||||||
call.rpcHandler = nil
|
call.rpcHandler = nil
|
||||||
|
call.TimeOut = 0
|
||||||
|
|
||||||
return call
|
return call
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -599,7 +599,7 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType, nodeId i
|
|||||||
//如果调用本结点服务
|
//如果调用本结点服务
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
//跨node调用
|
//跨node调用
|
||||||
pCall := handler.pClientList[i].RawGo(handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil)
|
pCall := handler.pClientList[i].RawGo(DefaultRpcTimeout,handler.rpcHandler,processor, true, rpcMethodId, serviceName, rawArgs, nil)
|
||||||
if pCall.Err != nil {
|
if pCall.Err != nil {
|
||||||
err = pCall.Err
|
err = pCall.Err
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user