mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-03 22:45:13 +08:00
gc优化
This commit is contained in:
@@ -49,7 +49,7 @@ func (client *TCPClient) init() {
|
||||
log.Release("invalid ConnectInterval, reset to %v", client.ConnectInterval)
|
||||
}
|
||||
if client.PendingWriteNum <= 0 {
|
||||
client.PendingWriteNum = 100
|
||||
client.PendingWriteNum = 1000
|
||||
log.Release("invalid PendingWriteNum, reset to %v", client.PendingWriteNum)
|
||||
}
|
||||
if client.NewAgent == nil {
|
||||
|
||||
@@ -273,7 +273,7 @@ func (client *Client) Run(){
|
||||
|
||||
//1.解析head
|
||||
response := RpcResponse{}
|
||||
response.RpcResponseData =processor.MakeRpcResponse(0,nil,nil)
|
||||
response.RpcResponseData =processor.MakeRpcResponse(0,RpcError(""),nil)
|
||||
|
||||
err = processor.Unmarshal(bytes[1:], response.RpcResponseData)
|
||||
client.conn.ReleaseReadMsg(bytes)
|
||||
|
||||
@@ -60,7 +60,7 @@ func (jsonProcessor *JsonProcessor) MakeRpcRequest(seq uint64,serviceMethod stri
|
||||
return jsonRpcRequestData
|
||||
}
|
||||
|
||||
func (jsonProcessor *JsonProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData {
|
||||
func (jsonProcessor *JsonProcessor) MakeRpcResponse(seq uint64,err RpcError,reply []byte) IRpcResponseData {
|
||||
jsonRpcResponseData := rpcJsonResponseDataPool.Get().(*JsonRpcResponseData)
|
||||
jsonRpcResponseData.Seq = seq
|
||||
jsonRpcResponseData.Err = err.Error()
|
||||
|
||||
@@ -88,11 +88,9 @@ func (slf *PBRpcRequestData) MakeRequest(seq uint64,serviceMethod string,noReply
|
||||
return slf
|
||||
}
|
||||
|
||||
func (slf *PBRpcResponseData) MakeRespone(seq uint64,err *RpcError,reply []byte) *PBRpcResponseData{
|
||||
func (slf *PBRpcResponseData) MakeRespone(seq uint64,err RpcError,reply []byte) *PBRpcResponseData{
|
||||
slf.Seq = proto.Uint64(seq)
|
||||
if err != nil {
|
||||
slf.Error = proto.String(err.Error())
|
||||
}
|
||||
slf.Reply = reply
|
||||
|
||||
return slf
|
||||
@@ -113,7 +111,7 @@ func (slf *PBProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply b
|
||||
return pPbRpcRequestData
|
||||
}
|
||||
|
||||
func (slf *PBProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData {
|
||||
func (slf *PBProcessor) MakeRpcResponse(seq uint64,err RpcError,reply []byte) IRpcResponseData {
|
||||
pPBRpcResponseData := rpcPbResponseDataPool.Get().(*PBRpcResponseData)
|
||||
pPBRpcResponseData.MakeRespone(seq,err,reply)
|
||||
return pPBRpcResponseData
|
||||
|
||||
@@ -4,7 +4,7 @@ type IRpcProcessor interface {
|
||||
Marshal(v interface{}) ([]byte, error) //b表示自定义缓冲区,可以填nil,由系统自动分配
|
||||
Unmarshal(data []byte, v interface{}) error
|
||||
MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte,additionParam interface{}) IRpcRequestData
|
||||
MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData
|
||||
MakeRpcResponse(seq uint64,err RpcError,reply []byte) IRpcResponseData
|
||||
|
||||
ReleaseRpcRequest(rpcRequestData IRpcRequestData)
|
||||
ReleaseRpcResponse(rpcRequestData IRpcResponseData)
|
||||
|
||||
@@ -55,7 +55,7 @@ type RpcHandleFinder interface {
|
||||
FindRpcHandler(serviceMethod string) IRpcHandler
|
||||
}
|
||||
|
||||
type RequestHandler func(Returns interface{},Err *RpcError)
|
||||
type RequestHandler func(Returns interface{},Err RpcError)
|
||||
type RawAdditionParamNull struct {
|
||||
}
|
||||
|
||||
|
||||
@@ -17,21 +17,17 @@ var NilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem())
|
||||
|
||||
type RpcError string
|
||||
|
||||
func (e *RpcError) Error() string {
|
||||
func (e RpcError) Error() string {
|
||||
return string(e)
|
||||
}
|
||||
|
||||
func ConvertError(e error) RpcError{
|
||||
if e == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
return string(*e)
|
||||
}
|
||||
|
||||
func ConvertError(e error) *RpcError{
|
||||
if e == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
rpcErr := RpcError(e.Error())
|
||||
return &rpcErr
|
||||
return rpcErr
|
||||
}
|
||||
|
||||
func Errorf(format string, a ...interface{}) *RpcError {
|
||||
@@ -224,7 +220,7 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
|
||||
log.Error("Handler Rpc %s Core dump info:%+v\n",request.RpcRequestData.GetServiceMethod(),err)
|
||||
rpcErr := RpcError("call error : core dumps")
|
||||
if request.requestHandle!=nil {
|
||||
request.requestHandle(nil,&rpcErr)
|
||||
request.requestHandle(nil,rpcErr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -236,10 +232,10 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
|
||||
|
||||
v,ok := handler.mapFunctions[request.RpcRequestData.GetServiceMethod()]
|
||||
if ok == false {
|
||||
err := Errorf("RpcHandler %s cannot find %s", handler.rpcHandler.GetName(),request.RpcRequestData.GetServiceMethod())
|
||||
log.Error("%s",err.Error())
|
||||
err := "RpcHandler "+handler.rpcHandler.GetName()+"cannot find "+request.RpcRequestData.GetServiceMethod()
|
||||
log.Error(err)
|
||||
if request.requestHandle!=nil {
|
||||
request.requestHandle(nil,err)
|
||||
request.requestHandle(nil,RpcError(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -257,10 +253,10 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
|
||||
if request.bLocalRequest == false {
|
||||
err = request.rpcProcessor.Unmarshal(request.RpcRequestData.GetInParam(),iParam)
|
||||
if err!=nil {
|
||||
rerr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err)
|
||||
log.Error("%s",rerr.Error())
|
||||
rErr := "Call Rpc "+request.RpcRequestData.GetServiceMethod()+" Param error "+err.Error()
|
||||
log.Error(rErr)
|
||||
if request.requestHandle!=nil {
|
||||
request.requestHandle(nil, rerr)
|
||||
request.requestHandle(nil, RpcError(rErr))
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -268,10 +264,10 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
|
||||
if request.inputArgs!=nil {
|
||||
err = request.rpcProcessor.Unmarshal(request.inputArgs.GetRawData(),iParam)
|
||||
if err!=nil {
|
||||
rErr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err)
|
||||
log.Error("%s", rErr.Error())
|
||||
rErr := "Call Rpc "+request.RpcRequestData.GetServiceMethod()+" Param error "+err.Error()
|
||||
log.Error(rErr)
|
||||
if request.requestHandle!=nil {
|
||||
request.requestHandle(nil, rErr)
|
||||
request.requestHandle(nil, RpcError(rErr))
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -302,10 +298,10 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
|
||||
oParam = reflect.New(v.outParamValue.Type().Elem())
|
||||
}
|
||||
paramList = append(paramList,oParam) //输出参数
|
||||
}else if(request.requestHandle != nil){ //调用方有返回值,但被调用函数没有返回参数
|
||||
rErr := Errorf("Call Rpc %s without return parameter!",request.RpcRequestData.GetServiceMethod())
|
||||
log.Error("%s",rErr.Error())
|
||||
request.requestHandle(nil, rErr)
|
||||
}else if request.requestHandle != nil { //调用方有返回值,但被调用函数没有返回参数
|
||||
rErr := "Call Rpc "+request.RpcRequestData.GetServiceMethod()+"without return parameter!"
|
||||
log.Error(rErr)
|
||||
request.requestHandle(nil, RpcError(rErr))
|
||||
return
|
||||
}
|
||||
returnValues := v.method.Func.Call(paramList)
|
||||
|
||||
@@ -81,21 +81,18 @@ func (server *Server) Start(listenAddr string) {
|
||||
|
||||
func (agent *RpcAgent) OnDestroy() {}
|
||||
|
||||
func (agent *RpcAgent) WriteResponse(processor IRpcProcessor,serviceMethod string,seq uint64,reply interface{},err *RpcError) {
|
||||
func (agent *RpcAgent) WriteResponse(processor IRpcProcessor,serviceMethod string,seq uint64,reply interface{},err RpcError) {
|
||||
var mReply []byte
|
||||
var rpcError *RpcError
|
||||
var rpcError RpcError
|
||||
var errM error
|
||||
|
||||
if err != nil {
|
||||
rpcError = err
|
||||
} else {
|
||||
|
||||
if reply!=nil {
|
||||
mReply,errM = processor.Marshal(reply)
|
||||
if errM != nil {
|
||||
rpcError = ConvertError(errM)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var rpcResponse RpcResponse
|
||||
rpcResponse.RpcResponseData = processor.MakeRpcResponse(seq,rpcError,mReply)
|
||||
@@ -139,7 +136,7 @@ func (agent *RpcAgent) Run() {
|
||||
log.Error("rpc Unmarshal request is error: %v", err)
|
||||
if req.RpcRequestData.GetSeq()>0 {
|
||||
rpcError := RpcError(err.Error())
|
||||
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
|
||||
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError)
|
||||
processor.ReleaseRpcRequest(req.RpcRequestData)
|
||||
ReleaseRpcRequest(req)
|
||||
continue
|
||||
@@ -155,7 +152,7 @@ func (agent *RpcAgent) Run() {
|
||||
serviceMethod := strings.Split(req.RpcRequestData.GetServiceMethod(),".")
|
||||
if len(serviceMethod)!=2 {
|
||||
rpcError := RpcError("rpc request req.ServiceMethod is error")
|
||||
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
|
||||
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError)
|
||||
processor.ReleaseRpcRequest(req.RpcRequestData)
|
||||
ReleaseRpcRequest(req)
|
||||
log.Debug("rpc request req.ServiceMethod is error")
|
||||
@@ -165,7 +162,7 @@ func (agent *RpcAgent) Run() {
|
||||
rpcHandler := agent.rpcServer.rpcHandleFinder.FindRpcHandler(serviceMethod[0])
|
||||
if rpcHandler== nil {
|
||||
rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod()))
|
||||
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
|
||||
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError)
|
||||
processor.ReleaseRpcRequest(req.RpcRequestData)
|
||||
ReleaseRpcRequest(req)
|
||||
log.Error("service method %s not config!", req.RpcRequestData.GetServiceMethod())
|
||||
@@ -173,7 +170,7 @@ func (agent *RpcAgent) Run() {
|
||||
}
|
||||
|
||||
if req.RpcRequestData.IsNoReply()==false {
|
||||
req.requestHandle = func(Returns interface{},Err *RpcError){
|
||||
req.requestHandle = func(Returns interface{},Err RpcError){
|
||||
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),Returns,Err)
|
||||
}
|
||||
}
|
||||
@@ -183,7 +180,7 @@ func (agent *RpcAgent) Run() {
|
||||
rpcError := RpcError(err.Error())
|
||||
|
||||
if req.RpcRequestData.IsNoReply() {
|
||||
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
|
||||
agent.WriteResponse(processor,req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,rpcError)
|
||||
}
|
||||
|
||||
processor.ReleaseRpcRequest(req.RpcRequestData)
|
||||
@@ -264,20 +261,14 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien
|
||||
req.rpcProcessor = processor
|
||||
if noReply == false {
|
||||
client.AddPending(pCall)
|
||||
req.requestHandle = func(Returns interface{},Err *RpcError){
|
||||
req.requestHandle = func(Returns interface{},Err RpcError){
|
||||
v := client.RemovePending(pCall.Seq)
|
||||
if v == nil {
|
||||
log.Error("rpcClient cannot find seq %d in pending",pCall.Seq)
|
||||
ReleaseCall(pCall)
|
||||
return
|
||||
}
|
||||
|
||||
if Err!=nil {
|
||||
pCall.Err = Err
|
||||
}else{
|
||||
pCall.Err = nil
|
||||
}
|
||||
|
||||
pCall.done <- pCall
|
||||
}
|
||||
}
|
||||
@@ -316,7 +307,7 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler
|
||||
req.RpcRequestData = processor.MakeRpcRequest(0,serviceMethod,noReply,nil,nil)
|
||||
if noReply == false {
|
||||
client.AddPending(pCall)
|
||||
req.requestHandle = func(Returns interface{},Err *RpcError){
|
||||
req.requestHandle = func(Returns interface{},Err RpcError){
|
||||
v := client.RemovePending(pCall.Seq)
|
||||
if v == nil {
|
||||
log.Error("rpcClient cannot find seq %d in pending",pCall.Seq)
|
||||
@@ -326,11 +317,8 @@ func (server *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler
|
||||
return
|
||||
}
|
||||
|
||||
if Err == nil {
|
||||
pCall.Err = nil
|
||||
}else{
|
||||
pCall.Err = Err
|
||||
}
|
||||
|
||||
|
||||
if Returns!=nil {
|
||||
pCall.Reply = Returns
|
||||
|
||||
Reference in New Issue
Block a user