rpc临时内存池优化

This commit is contained in:
boyce
2020-07-09 20:01:45 +08:00
parent 825c61e626
commit 323e5313fb
8 changed files with 234 additions and 143 deletions

View File

@@ -13,46 +13,19 @@ import (
var processor IRpcProcessor = &JsonProcessor{}
var LittleEndian bool
type Call struct {
Seq uint64
ServiceMethod string
Arg interface{}
Reply interface{}
Respone *RpcResponse
Err error
done chan *Call // Strobes when call is complete.
connid int
callback *reflect.Value
rpcHandler IRpcHandler
}
func (slf *Call) Done() *Call{
return <-slf.done
}
type Server struct {
functions map[interface{}]interface{}
cmdchannel chan *Call
rpcHandleFinder RpcHandleFinder
rpcserver *network.TCPServer
}
type RpcHandleFinder interface {
FindRpcHandler(serviceMethod string) IRpcHandler
}
func SetProcessor(proc IRpcProcessor) {
processor = proc
}
func (slf *Server) Init(rpcHandleFinder RpcHandleFinder) {
slf.cmdchannel = make(chan *Call,10000)
slf.cmdchannel = make(chan *Call,100000)
slf.rpcHandleFinder = rpcHandleFinder
slf.rpcserver = &network.TCPServer{}
}
@@ -84,13 +57,10 @@ type RpcAgent struct {
func (agent *RpcAgent) WriteRespone(serviceMethod string,seq uint64,reply interface{},err *RpcError) {
var rpcRespone RpcResponse
//rpcRespone.Seq = seq
//rpcRespone.Err = err
var mReply []byte
var rpcError *RpcError
var errM error
if err != nil {
rpcError = err
} else {
@@ -102,13 +72,15 @@ func (agent *RpcAgent) WriteRespone(serviceMethod string,seq uint64,reply interf
}
}
rpcRespone.RpcResponeData = processor.MakeRpcResponse(seq,rpcError,mReply)
bytes,errM := processor.Marshal(rpcRespone.RpcResponeData)
var rpcResponse RpcResponse
rpcResponse.RpcResponeData = processor.MakeRpcResponse(seq,rpcError,mReply)
bytes,errM := processor.Marshal(rpcResponse.RpcResponeData)
if errM != nil {
log.Error("service method %s %+v Marshal error:%+v!", serviceMethod,rpcRespone,errM)
log.Error("service method %s %+v Marshal error:%+v!", serviceMethod,rpcResponse,errM)
return
}
processor.ReleaseRpcRespose(rpcResponse.RpcResponeData)
errM = agent.conn.WriteMsg(bytes)
if errM != nil {
log.Error("Rpc %s return is error:%+v",serviceMethod,errM)
@@ -126,17 +98,21 @@ func (agent *RpcAgent) Run() {
}
//解析head
var req RpcRequest
req := MakeRpcRequest()
req.RpcRequestData = processor.MakeRpcRequest(0,"",false,nil)
err = processor.Unmarshal(data,req.RpcRequestData)
if err != nil {
if req.RpcRequestData.GetSeq()>0 {
rpcError := RpcError("rpc Unmarshal request is error")
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req)
continue
}else{
log.Error("rpc Unmarshal request is error: %v", err)
//will close tcpconn
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req)
break
}
}
@@ -146,6 +122,8 @@ func (agent *RpcAgent) Run() {
if len(serviceMethod)!=2 {
rpcError := RpcError("rpc request req.ServiceMethod is error")
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req)
log.Debug("rpc request req.ServiceMethod is error")
continue
}
@@ -154,6 +132,8 @@ func (agent *RpcAgent) Run() {
if rpcHandler== nil {
rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod()))
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req)
log.Error("service method %s not config!", req.RpcRequestData.GetServiceMethod())
continue
}
@@ -164,7 +144,13 @@ func (agent *RpcAgent) Run() {
}
}
rpcHandler.PushRequest(&req)
err = rpcHandler.PushRequest(req)
if err != nil {
rpcError := RpcError(err.Error())
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req)
}
}
}
@@ -210,8 +196,8 @@ func (slf *Server) myselfRpcHandlerGo(handlerName string,methodName string, args
func (slf *Server) rpcHandlerGo(noReply bool,handlerName string,methodName string, args interface{},reply interface{}) *Call {
pCall := &Call{}
pCall.done = make( chan *Call,1)
pCall := MakeCall()//&Call{}
//pCall.done = make( chan *Call,1)
rpcHandler := slf.rpcHandleFinder.FindRpcHandler(handlerName)
if rpcHandler== nil {
pCall.Err = fmt.Errorf("service method %s.%s not config!", handlerName,methodName)
@@ -219,12 +205,10 @@ func (slf *Server) rpcHandlerGo(noReply bool,handlerName string,methodName strin
pCall.done <- pCall
return pCall
}
var req RpcRequest
req := MakeRpcRequest()
//req.ServiceMethod = fmt.Sprintf("%s.%s",handlerName,methodName)
req.localParam = args
req.localReply = reply
//req.NoReply = noReply
req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil)
if noReply == false {
req.requestHandle = func(Returns interface{},Err *RpcError){
@@ -238,15 +222,19 @@ func (slf *Server) rpcHandlerGo(noReply bool,handlerName string,methodName strin
}
}
rpcHandler.PushRequest(&req)
err := rpcHandler.PushRequest(req)
if err != nil {
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req)
pCall.Err = err
pCall.done <- pCall
}
return pCall
}
func (slf *Server) rpcHandlerAsyncGo(callerRpcHandler IRpcHandler,noReply bool,handlerName string,methodName string,args interface{},reply interface{},callback reflect.Value) error {
pCall := &Call{}
//pCall.done = make( chan *Call,1)
pCall := MakeCall()
pCall.rpcHandler = callerRpcHandler
pCall.callback = &callback
pCall.Reply = reply
@@ -257,11 +245,9 @@ func (slf *Server) rpcHandlerAsyncGo(callerRpcHandler IRpcHandler,noReply bool,h
return err
}
var req RpcRequest
//req.ServiceMethod = fmt.Sprintf("%s.%s",handlerName,methodName)
req := MakeRpcRequest()
req.localParam = args
req.localReply = reply
//req.NoReply = noReply
req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil)
if noReply == false {
req.requestHandle = func(Returns interface{},Err *RpcError){
@@ -279,8 +265,11 @@ func (slf *Server) rpcHandlerAsyncGo(callerRpcHandler IRpcHandler,noReply bool,h
}
err := rpcHandler.PushRequest(&req)
err := rpcHandler.PushRequest(req)
if err != nil {
processor.ReleaseRpcRequest(req.RpcRequestData)
ReleaseRpcRequest(req)
return err
}