mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-03 22:45:13 +08:00
优化rpc垃圾回收
This commit is contained in:
@@ -42,7 +42,9 @@ func Errorf(format string, a ...interface{}) *RpcError {
|
||||
type RpcMethodInfo struct {
|
||||
method reflect.Method
|
||||
iparam reflect.Value
|
||||
iInParam interface{}
|
||||
oParam reflect.Value
|
||||
iOutParam interface{}
|
||||
additionParam reflect.Value
|
||||
//addition *IRawAdditionParam
|
||||
hashAdditionParam bool
|
||||
@@ -79,6 +81,7 @@ type IRpcHandler interface {
|
||||
GoNode(nodeId int,serviceMethod string,args interface{}) error
|
||||
RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error
|
||||
RawCastGo(rpcProcessorType RpcProcessorType,serviceMethod string,args []byte,additionParam interface{})
|
||||
IsSingleCoroutine() bool
|
||||
}
|
||||
|
||||
var rawAdditionParamValueNull reflect.Value
|
||||
@@ -153,6 +156,7 @@ func (slf *RpcHandler) suitableMethods(method reflect.Method) error {
|
||||
}
|
||||
|
||||
rpcMethodInfo.iparam = reflect.New(typ.In(parIdx).Elem()) //append(rpcMethodInfo.iparam,)
|
||||
rpcMethodInfo.iInParam = reflect.New(typ.In(parIdx).Elem()).Interface()
|
||||
pt,_ := GetProcessorType(rpcMethodInfo.iparam.Interface())
|
||||
rpcMethodInfo.rpcProcessorType = pt
|
||||
|
||||
@@ -243,9 +247,16 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
|
||||
|
||||
var paramList []reflect.Value
|
||||
var err error
|
||||
iparam := reflect.New(v.iparam.Type().Elem()).Interface()
|
||||
var iParam interface{}
|
||||
//单协程下减少gc
|
||||
if slf.IsSingleCoroutine(){
|
||||
iParam = v.iInParam
|
||||
}else{
|
||||
iParam = reflect.New(v.iparam.Type().Elem()).Interface()
|
||||
}
|
||||
|
||||
if request.bLocalRequest == false {
|
||||
err = request.rpcProcessor.Unmarshal(request.RpcRequestData.GetInParam(),iparam)
|
||||
err = request.rpcProcessor.Unmarshal(request.RpcRequestData.GetInParam(),iParam)
|
||||
if err!=nil {
|
||||
rerr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err)
|
||||
log.Error("%s",rerr.Error())
|
||||
@@ -256,7 +267,7 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
|
||||
}
|
||||
}else {
|
||||
if request.localRawParam!=nil {
|
||||
err = request.rpcProcessor.Unmarshal(request.localRawParam,iparam)
|
||||
err = request.rpcProcessor.Unmarshal(request.localRawParam,iParam)
|
||||
if err!=nil {
|
||||
rerr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err)
|
||||
log.Error("%s",rerr.Error())
|
||||
@@ -266,7 +277,7 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
|
||||
return
|
||||
}
|
||||
}else {
|
||||
iparam = request.localParam
|
||||
iParam = request.localParam
|
||||
}
|
||||
}
|
||||
|
||||
@@ -281,11 +292,13 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
|
||||
}
|
||||
}
|
||||
|
||||
paramList = append(paramList,reflect.ValueOf(iparam))
|
||||
paramList = append(paramList,reflect.ValueOf(iParam))
|
||||
var oParam reflect.Value
|
||||
if v.oParam.IsValid() {
|
||||
if request.localReply!=nil {
|
||||
oParam = reflect.ValueOf(request.localReply) //输出参数
|
||||
}else if slf.IsSingleCoroutine()==true{
|
||||
oParam = v.oParam
|
||||
}else{
|
||||
oParam = reflect.New(v.oParam.Type().Elem())
|
||||
}
|
||||
@@ -573,11 +586,10 @@ func (slf *RpcHandler) GetName() string{
|
||||
return slf.rpcHandler.GetName()
|
||||
}
|
||||
|
||||
func (slf *RpcHandler) IsSingleCoroutine() bool{
|
||||
return slf.rpcHandler.IsSingleCoroutine()
|
||||
}
|
||||
|
||||
//func (slf *RpcHandler) asyncCallRpc(serviceMethod string,mutiCoroutine bool,callback interface{},args ...interface{}) error {
|
||||
//func (slf *RpcHandler) callRpc(serviceMethod string,reply interface{},mutiCoroutine bool,args ...interface{}) error {
|
||||
//func (slf *RpcHandler) goRpc(serviceMethod string,mutiCoroutine bool,args ...interface{}) error {
|
||||
//(reply *int,err error) {}
|
||||
func (slf *RpcHandler) AsyncCall(serviceMethod string,args interface{},callback interface{}) error {
|
||||
return slf.asyncCallRpc(0,serviceMethod,args,callback)
|
||||
}
|
||||
@@ -586,7 +598,6 @@ func (slf *RpcHandler) Call(serviceMethod string,args interface{},reply interfac
|
||||
return slf.callRpc(0,serviceMethod,args,reply)
|
||||
}
|
||||
|
||||
|
||||
func (slf *RpcHandler) Go(serviceMethod string,args interface{}) error {
|
||||
return slf.goRpc(nil,false,0,serviceMethod,args)
|
||||
}
|
||||
|
||||
@@ -210,4 +210,7 @@ func (slf *Service) RegEventReciverFunc(eventType event.EventType,reciver event.
|
||||
|
||||
func (slf *Service) UnRegEventReciverFun(eventType event.EventType,reciver event.IEventHandler){
|
||||
slf.eventProcessor.UnRegEventReciverFun(eventType,reciver)
|
||||
}
|
||||
func (slf *Service) IsSingleCoroutine() bool {
|
||||
return slf.gorouterNum == 1
|
||||
}
|
||||
Reference in New Issue
Block a user