From 7b0072bbfff31852e3809d3635ddf7600839b2ad Mon Sep 17 00:00:00 2001 From: boyce Date: Sat, 11 Jul 2020 15:44:25 +0800 Subject: [PATCH] =?UTF-8?q?1.=E5=88=A0=E9=99=A4msgp=E5=BA=8F=E5=88=97?= =?UTF-8?q?=E5=8C=96=E6=96=B9=E5=BC=8F=202.=E4=BC=98=E5=8C=96=E4=B8=B4?= =?UTF-8?q?=E6=97=B6=E5=86=85=E5=AD=98=E6=B1=A0=E5=87=8F=E5=B0=91GC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/client.go | 20 +- rpc/jsonprocessor.go | 5 +- rpc/msgpprocessor.go | 124 ------------ rpc/msgpprocessor_gen.go | 421 --------------------------------------- rpc/pbprocessor.go | 4 +- rpc/probufprocessor.go | 1 - rpc/rpc.go | 27 ++- rpc/server.go | 10 +- 8 files changed, 42 insertions(+), 570 deletions(-) delete mode 100644 rpc/msgpprocessor.go delete mode 100644 rpc/msgpprocessor_gen.go delete mode 100644 rpc/probufprocessor.go diff --git a/rpc/client.go b/rpc/client.go index 98c6af0..7dc884c 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -97,7 +97,6 @@ func (slf *Client) checkRpcCallTimerout(){ } slf.pendingLock.Unlock() } - } func (slf *Client) ResetPending(){ @@ -122,19 +121,21 @@ func (slf *Client) AddPending(call *Call){ slf.pendingLock.Unlock() } -func (slf *Client) RemovePending(seq uint64){ +func (slf *Client) RemovePending(seq uint64) *Call{ slf.pendingLock.Lock() - slf.removePending(seq) + call := slf.removePending(seq) slf.pendingLock.Unlock() + return call } -func (slf *Client) removePending(seq uint64){ +func (slf *Client) removePending(seq uint64) *Call{ v,ok := slf.pending[seq] if ok == false{ - return + return nil } slf.pendingTimer.Remove(v) delete(slf.pending,seq) + return v.Value.(*Call) } @@ -258,20 +259,19 @@ func (slf *Client) Run(){ //1.解析head respone := &RpcResponse{} respone.RpcResponeData =processor.MakeRpcResponse(0,nil,nil) - defer processor.ReleaseRpcRespose(respone.RpcResponeData) + err = processor.Unmarshal(bytes,respone.RpcResponeData) if err != nil { + processor.ReleaseRpcRespose(respone.RpcResponeData) log.Error("rpcClient Unmarshal head error,error:%+v",err) continue } - v := slf.FindPending(respone.RpcResponeData.GetSeq()) + v := slf.RemovePending(respone.RpcResponeData.GetSeq()) if v == nil { log.Error("rpcClient cannot find seq %d in pending",respone.RpcResponeData.GetSeq()) }else { - slf.RemovePending(respone.RpcResponeData.GetSeq()) v.Err = nil - if len(respone.RpcResponeData.GetReply()) >0 { err = processor.Unmarshal(respone.RpcResponeData.GetReply(),v.Reply) if err != nil { @@ -290,6 +290,8 @@ func (slf *Client) Run(){ v.done <- v } } + + processor.ReleaseRpcRespose(respone.RpcResponeData) } } diff --git a/rpc/jsonprocessor.go b/rpc/jsonprocessor.go index 9967454..107964e 100644 --- a/rpc/jsonprocessor.go +++ b/rpc/jsonprocessor.go @@ -17,6 +17,7 @@ type JsonRpcRequestData struct { InParam []byte } + type JsonRpcResponseData struct { //head Seq uint64 // sequence number chosen by client @@ -27,7 +28,6 @@ type JsonRpcResponseData struct { } - var rpcJsonResponeDataPool sync.Pool var rpcJsonRequestDataPool sync.Pool @@ -51,13 +51,14 @@ func (slf *JsonProcessor) Unmarshal(data []byte, v interface{}) error{ return json.Unmarshal(data,v) } + + func (slf *JsonProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{ jsonRpcRequestData := rpcJsonRequestDataPool.Get().(*JsonRpcRequestData) jsonRpcRequestData.Seq = seq jsonRpcRequestData.ServiceMethod = serviceMethod jsonRpcRequestData.NoReply = noReply jsonRpcRequestData.InParam = inParam - return jsonRpcRequestData } diff --git a/rpc/msgpprocessor.go b/rpc/msgpprocessor.go deleted file mode 100644 index 8a52000..0000000 --- a/rpc/msgpprocessor.go +++ /dev/null @@ -1,124 +0,0 @@ -package rpc - -import "sync" - -type IMsgp interface { - UnmarshalMsg(bts []byte) (o []byte, err error) - MarshalMsg(b []byte) (o []byte, err error) -} - -var rpcResponeDataPool sync.Pool -var rpcRequestDataPool sync.Pool - -type MsgpProcessor struct { - -} - -func init(){ - rpcResponeDataPool.New = func()interface{}{ - return &MsgpRpcResponseData{} - } - - rpcRequestDataPool.New = func()interface{}{ - return &MsgpRpcRequestData{} - } -} - -//go:generate msgp -type MsgpRpcRequestData struct { - //packhead - Seq uint64 // sequence number chosen by client - ServiceMethod string // format: "Service.Method" - NoReply bool //是否需要返回 - //packbody - InParam []byte -} - -//go:generate msgp -type MsgpRpcResponseData struct { - //head - Seq uint64 // sequence number chosen by client - Err string - - //returns - Reply []byte -} - - -func (slf *MsgpProcessor) Marshal(v interface{}) ([]byte, error){ - msgp := v.(IMsgp) - - return msgp.MarshalMsg(nil) -} - -func (slf *MsgpProcessor) Unmarshal(data []byte, v interface{}) error{ - msgp := v.(IMsgp) - _,err := msgp.UnmarshalMsg(data) - return err -} - -func (slf *MsgpProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{ - rpcRequestData := rpcRequestDataPool.Get().(*MsgpRpcRequestData) - rpcRequestData.Seq = seq - rpcRequestData.ServiceMethod = serviceMethod - rpcRequestData.NoReply = noReply - rpcRequestData.InParam = inParam - - return rpcRequestData//&MsgpRpcRequestData{Seq:seq,ServiceMethod:serviceMethod,NoReply:noReply,InParam:inParam} -} - -func (slf *MsgpProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData { - rpcRequestData := rpcResponeDataPool.Get().(*MsgpRpcResponseData) - rpcRequestData.Seq = seq - rpcRequestData.Err = err.Error() - rpcRequestData.Reply = reply - - return rpcRequestData -} - -func (slf *MsgpProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequestData){ - rpcRequestDataPool.Put(rpcRequestData) -} - -func (slf *MsgpProcessor) ReleaseRpcRespose(rpcRequestData IRpcResponseData){ - rpcResponeDataPool.Put(rpcRequestData) -} - -func (slf *MsgpRpcRequestData) IsNoReply() bool{ - return slf.NoReply -} - -func (slf *MsgpRpcRequestData) GetSeq() uint64{ - return slf.Seq -} - -func (slf *MsgpRpcRequestData) GetServiceMethod() string{ - return slf.ServiceMethod -} - -func (slf *MsgpRpcRequestData) GetInParam() []byte{ - return slf.InParam -} - -func (slf *MsgpRpcResponseData) GetSeq() uint64 { - return slf.Seq -} - -func (slf *MsgpRpcResponseData) GetErr() *RpcError { - if slf.Err == ""{ - return nil - } - - return Errorf(slf.Err) -} - - -func (slf *MsgpRpcResponseData) GetReply() []byte{ - return slf.Reply -} - - - - - - diff --git a/rpc/msgpprocessor_gen.go b/rpc/msgpprocessor_gen.go deleted file mode 100644 index a2a1b15..0000000 --- a/rpc/msgpprocessor_gen.go +++ /dev/null @@ -1,421 +0,0 @@ -package rpc - -// Code generated by github.com/tinylib/msgp DO NOT EDIT. - -import ( - "github.com/tinylib/msgp/msgp" -) - -// DecodeMsg implements msgp.Decodable -func (z *MsgpProcessor) DecodeMsg(dc *msgp.Reader) (err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, err = dc.ReadMapHeader() - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - return -} - -// EncodeMsg implements msgp.Encodable -func (z MsgpProcessor) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 0 - err = en.Append(0x80) - if err != nil { - return - } - return -} - -// MarshalMsg implements msgp.Marshaler -func (z MsgpProcessor) MarshalMsg(b []byte) (o []byte, err error) { - o = msgp.Require(b, z.Msgsize()) - // map header, size 0 - o = append(o, 0x80) - return -} - -// UnmarshalMsg implements msgp.Unmarshaler -func (z *MsgpProcessor) UnmarshalMsg(bts []byte) (o []byte, err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - o = bts - return -} - -// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z MsgpProcessor) Msgsize() (s int) { - s = 1 - return -} - -// DecodeMsg implements msgp.Decodable -func (z *MsgpRpcRequestData) DecodeMsg(dc *msgp.Reader) (err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, err = dc.ReadMapHeader() - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "Seq": - z.Seq, err = dc.ReadUint64() - if err != nil { - err = msgp.WrapError(err, "Seq") - return - } - case "ServiceMethod": - z.ServiceMethod, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "ServiceMethod") - return - } - case "NoReply": - z.NoReply, err = dc.ReadBool() - if err != nil { - err = msgp.WrapError(err, "NoReply") - return - } - case "InParam": - z.InParam, err = dc.ReadBytes(z.InParam) - if err != nil { - err = msgp.WrapError(err, "InParam") - return - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - return -} - -// EncodeMsg implements msgp.Encodable -func (z *MsgpRpcRequestData) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 4 - // write "Seq" - err = en.Append(0x84, 0xa3, 0x53, 0x65, 0x71) - if err != nil { - return - } - err = en.WriteUint64(z.Seq) - if err != nil { - err = msgp.WrapError(err, "Seq") - return - } - // write "ServiceMethod" - err = en.Append(0xad, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64) - if err != nil { - return - } - err = en.WriteString(z.ServiceMethod) - if err != nil { - err = msgp.WrapError(err, "ServiceMethod") - return - } - // write "NoReply" - err = en.Append(0xa7, 0x4e, 0x6f, 0x52, 0x65, 0x70, 0x6c, 0x79) - if err != nil { - return - } - err = en.WriteBool(z.NoReply) - if err != nil { - err = msgp.WrapError(err, "NoReply") - return - } - // write "InParam" - err = en.Append(0xa7, 0x49, 0x6e, 0x50, 0x61, 0x72, 0x61, 0x6d) - if err != nil { - return - } - err = en.WriteBytes(z.InParam) - if err != nil { - err = msgp.WrapError(err, "InParam") - return - } - return -} - -// MarshalMsg implements msgp.Marshaler -func (z *MsgpRpcRequestData) MarshalMsg(b []byte) (o []byte, err error) { - o = msgp.Require(b, z.Msgsize()) - // map header, size 4 - // string "Seq" - o = append(o, 0x84, 0xa3, 0x53, 0x65, 0x71) - o = msgp.AppendUint64(o, z.Seq) - // string "ServiceMethod" - o = append(o, 0xad, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64) - o = msgp.AppendString(o, z.ServiceMethod) - // string "NoReply" - o = append(o, 0xa7, 0x4e, 0x6f, 0x52, 0x65, 0x70, 0x6c, 0x79) - o = msgp.AppendBool(o, z.NoReply) - // string "InParam" - o = append(o, 0xa7, 0x49, 0x6e, 0x50, 0x61, 0x72, 0x61, 0x6d) - o = msgp.AppendBytes(o, z.InParam) - return -} - -// UnmarshalMsg implements msgp.Unmarshaler -func (z *MsgpRpcRequestData) UnmarshalMsg(bts []byte) (o []byte, err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "Seq": - z.Seq, bts, err = msgp.ReadUint64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "Seq") - return - } - case "ServiceMethod": - z.ServiceMethod, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "ServiceMethod") - return - } - case "NoReply": - z.NoReply, bts, err = msgp.ReadBoolBytes(bts) - if err != nil { - err = msgp.WrapError(err, "NoReply") - return - } - case "InParam": - z.InParam, bts, err = msgp.ReadBytesBytes(bts, z.InParam) - if err != nil { - err = msgp.WrapError(err, "InParam") - return - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - o = bts - return -} - -// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z *MsgpRpcRequestData) Msgsize() (s int) { - s = 1 + 4 + msgp.Uint64Size + 14 + msgp.StringPrefixSize + len(z.ServiceMethod) + 8 + msgp.BoolSize + 8 + msgp.BytesPrefixSize + len(z.InParam) - return -} - -// DecodeMsg implements msgp.Decodable -func (z *MsgpRpcResponseData) DecodeMsg(dc *msgp.Reader) (err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, err = dc.ReadMapHeader() - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, err = dc.ReadMapKeyPtr() - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "Seq": - z.Seq, err = dc.ReadUint64() - if err != nil { - err = msgp.WrapError(err, "Seq") - return - } - case "Err": - z.Err, err = dc.ReadString() - if err != nil { - err = msgp.WrapError(err, "Err") - return - } - case "Reply": - z.Reply, err = dc.ReadBytes(z.Reply) - if err != nil { - err = msgp.WrapError(err, "Reply") - return - } - default: - err = dc.Skip() - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - return -} - -// EncodeMsg implements msgp.Encodable -func (z *MsgpRpcResponseData) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 3 - // write "Seq" - err = en.Append(0x83, 0xa3, 0x53, 0x65, 0x71) - if err != nil { - return - } - err = en.WriteUint64(z.Seq) - if err != nil { - err = msgp.WrapError(err, "Seq") - return - } - // write "Err" - err = en.Append(0xa3, 0x45, 0x72, 0x72) - if err != nil { - return - } - err = en.WriteString(z.Err) - if err != nil { - err = msgp.WrapError(err, "Err") - return - } - // write "Reply" - err = en.Append(0xa5, 0x52, 0x65, 0x70, 0x6c, 0x79) - if err != nil { - return - } - err = en.WriteBytes(z.Reply) - if err != nil { - err = msgp.WrapError(err, "Reply") - return - } - return -} - -// MarshalMsg implements msgp.Marshaler -func (z *MsgpRpcResponseData) MarshalMsg(b []byte) (o []byte, err error) { - o = msgp.Require(b, z.Msgsize()) - // map header, size 3 - // string "Seq" - o = append(o, 0x83, 0xa3, 0x53, 0x65, 0x71) - o = msgp.AppendUint64(o, z.Seq) - // string "Err" - o = append(o, 0xa3, 0x45, 0x72, 0x72) - o = msgp.AppendString(o, z.Err) - // string "Reply" - o = append(o, 0xa5, 0x52, 0x65, 0x70, 0x6c, 0x79) - o = msgp.AppendBytes(o, z.Reply) - return -} - -// UnmarshalMsg implements msgp.Unmarshaler -func (z *MsgpRpcResponseData) UnmarshalMsg(bts []byte) (o []byte, err error) { - var field []byte - _ = field - var zb0001 uint32 - zb0001, bts, err = msgp.ReadMapHeaderBytes(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - for zb0001 > 0 { - zb0001-- - field, bts, err = msgp.ReadMapKeyZC(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - switch msgp.UnsafeString(field) { - case "Seq": - z.Seq, bts, err = msgp.ReadUint64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "Seq") - return - } - case "Err": - z.Err, bts, err = msgp.ReadStringBytes(bts) - if err != nil { - err = msgp.WrapError(err, "Err") - return - } - case "Reply": - z.Reply, bts, err = msgp.ReadBytesBytes(bts, z.Reply) - if err != nil { - err = msgp.WrapError(err, "Reply") - return - } - default: - bts, err = msgp.Skip(bts) - if err != nil { - err = msgp.WrapError(err) - return - } - } - } - o = bts - return -} - -// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message -func (z *MsgpRpcResponseData) Msgsize() (s int) { - s = 1 + 4 + msgp.Uint64Size + 4 + msgp.StringPrefixSize + len(z.Err) + 6 + msgp.BytesPrefixSize + len(z.Reply) - return -} diff --git a/rpc/pbprocessor.go b/rpc/pbprocessor.go index d8b8e79..54c1a3e 100644 --- a/rpc/pbprocessor.go +++ b/rpc/pbprocessor.go @@ -14,11 +14,11 @@ var rpcPbRequestDataPool sync.Pool func init(){ rpcPbResponeDataPool.New = func()interface{}{ - return &JsonRpcResponseData{} + return &PBRpcResponseData{} } rpcPbRequestDataPool.New = func()interface{}{ - return &JsonRpcRequestData{} + return &PBRpcRequestData{} } } diff --git a/rpc/probufprocessor.go b/rpc/probufprocessor.go deleted file mode 100644 index 9ab1e3e..0000000 --- a/rpc/probufprocessor.go +++ /dev/null @@ -1 +0,0 @@ -package rpc diff --git a/rpc/rpc.go b/rpc/rpc.go index 534e9e5..47ad843 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -19,6 +19,20 @@ type RpcResponse struct { RpcResponeData IRpcResponseData } +func (slf *RpcRequest) Clear() *RpcRequest{ + slf.RpcRequestData = nil + slf.localReply = nil + slf.localParam = nil + slf.requestHandle = nil + slf.callback = nil + return slf +} + +func (slf *RpcResponse) Clear() *RpcResponse{ + slf.RpcResponeData = nil + return slf +} + type IRpcRequestData interface { GetSeq() uint64 GetServiceMethod() string @@ -49,7 +63,7 @@ type Call struct { calltime time.Time } -func (slf *Call) Clear(){ +func (slf *Call) Clear() *Call{ slf.Seq = 0 slf.ServiceMethod = "" slf.Arg = nil @@ -59,6 +73,7 @@ func (slf *Call) Clear(){ slf.connid = 0 slf.callback = nil slf.rpcHandler = nil + return slf } func (slf *Call) Done() *Call{ @@ -74,6 +89,8 @@ var rpcResponePool sync.Pool var rpcRequestPool sync.Pool var rpcCallPool sync.Pool + + func init(){ rpcResponePool.New = func()interface{}{ return &RpcResponse{} @@ -89,17 +106,15 @@ func init(){ } func MakeRpcResponse() *RpcResponse{ - return rpcResponePool.Get().(*RpcResponse) + return rpcResponePool.Get().(*RpcResponse).Clear() } func MakeRpcRequest() *RpcRequest{ - return rpcRequestPool.Get().(*RpcRequest) + return rpcRequestPool.Get().(*RpcRequest).Clear() } func MakeCall() *Call { - call := rpcCallPool.Get().(*Call) - - return call + return rpcCallPool.Get().(*Call).Clear() } func ReleaseRpcResponse(rpcRespone *RpcResponse){ diff --git a/rpc/server.go b/rpc/server.go index e0c7d4f..1352672 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -220,7 +220,7 @@ func (slf *Server) selfNodeRpcHandlerGo(client *Client,noReply bool,handlerName if noReply == false { client.AddPending(pCall) req.requestHandle = func(Returns interface{},Err *RpcError){ - v := client.FindPending(pCall.Seq) + v := client.RemovePending(pCall.Seq) if v == nil { log.Error("rpcClient cannot find seq %d in pending",pCall.Seq) ReleaseCall(pCall) @@ -258,6 +258,7 @@ func (slf *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler IRp if rpcHandler== nil { err := fmt.Errorf("service method %s.%s not config!", handlerName,methodName) log.Error("%+v",err) + ReleaseCall(pCall) return err } @@ -268,13 +269,12 @@ func (slf *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler IRp if noReply == false { client.AddPending(pCall) req.requestHandle = func(Returns interface{},Err *RpcError){ - //processor.ReleaseRpcRequest(req.RpcRequestData) - //ReleaseRpcRequest(req) - v := client.FindPending(pCall.Seq) + v := client.RemovePending(pCall.Seq) if v == nil { log.Error("rpcClient cannot find seq %d in pending",pCall.Seq) - ReleaseCall(pCall) + processor.ReleaseRpcRequest(req.RpcRequestData) + ReleaseRpcRequest(req) return }