Compare commits

..

3 Commits

Author SHA1 Message Date
duanhf2012
fa8cbfb40e 优化rpc处理异常问题 2023-08-29 18:23:20 +08:00
duanhf2012
388b946401 优化日志的ErrorAttr 2023-08-24 11:34:43 +08:00
duanhf2012
582a0faa6f 替换gogoprotobuf为google标准库 2023-08-21 15:44:55 +08:00
14 changed files with 2463 additions and 8611 deletions

3
go.mod
View File

@@ -4,11 +4,12 @@ go 1.21
require (
github.com/go-sql-driver/mysql v1.6.0
github.com/gogo/protobuf v1.3.2
github.com/gomodule/redigo v1.8.8
github.com/gorilla/websocket v1.5.0
github.com/json-iterator/go v1.1.12
github.com/pierrec/lz4/v4 v4.1.18
go.mongodb.org/mongo-driver v1.9.1
google.golang.org/protobuf v1.31.0
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
)

9
go.sum
View File

@@ -7,12 +7,14 @@ github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v1.8.8 h1:f6cXq6RRfiyrOJEV7p3JhLDlmawGBVBBP1MggY8Mo4E=
github.com/gomodule/redigo v1.8.8/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
@@ -32,6 +34,8 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -92,6 +96,9 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@@ -340,6 +340,10 @@ func Close() {
}
func ErrorAttr(key string,value error) slog.Attr{
if value== nil {
return slog.Attr{key, slog.StringValue("nil")}
}
return slog.Attr{key, slog.StringValue(value.Error())}
}

View File

@@ -4,7 +4,7 @@ import (
"encoding/binary"
"fmt"
"github.com/duanhf2012/origin/util/bytespool"
"github.com/gogo/protobuf/proto"
"google.golang.org/protobuf/proto"
"reflect"
)

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
syntax = "proto3";
package rpc;
option go_package = "./rpc";
option go_package = ".;rpc";
message NodeInfo{
int32 NodeId = 1;

View File

@@ -1,106 +0,0 @@
package rpc
import (
"github.com/duanhf2012/origin/util/sync"
"github.com/gogo/protobuf/proto"
"fmt"
)
type GoGoPBProcessor struct {
}
var rpcGoGoPbResponseDataPool =sync.NewPool(make(chan interface{},10240), func()interface{}{
return &GoGoPBRpcResponseData{}
})
var rpcGoGoPbRequestDataPool =sync.NewPool(make(chan interface{},10240), func()interface{}{
return &GoGoPBRpcRequestData{}
})
func (slf *GoGoPBRpcRequestData) MakeRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) *GoGoPBRpcRequestData{
slf.Seq = seq
slf.RpcMethodId = rpcMethodId
slf.ServiceMethod = serviceMethod
slf.NoReply = noReply
slf.InParam = inParam
return slf
}
func (slf *GoGoPBRpcResponseData) MakeRespone(seq uint64,err RpcError,reply []byte) *GoGoPBRpcResponseData{
slf.Seq = seq
slf.Error = err.Error()
slf.Reply = reply
return slf
}
func (slf *GoGoPBProcessor) Marshal(v interface{}) ([]byte, error){
return proto.Marshal(v.(proto.Message))
}
func (slf *GoGoPBProcessor) Unmarshal(data []byte, msg interface{}) error{
protoMsg,ok := msg.(proto.Message)
if ok == false {
return fmt.Errorf("%+v is not of proto.Message type",msg)
}
return proto.Unmarshal(data, protoMsg)
}
func (slf *GoGoPBProcessor) MakeRpcRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{
pGogoPbRpcRequestData := rpcGoGoPbRequestDataPool.Get().(*GoGoPBRpcRequestData)
pGogoPbRpcRequestData.MakeRequest(seq,rpcMethodId,serviceMethod,noReply,inParam)
return pGogoPbRpcRequestData
}
func (slf *GoGoPBProcessor) MakeRpcResponse(seq uint64,err RpcError,reply []byte) IRpcResponseData {
pGoGoPBRpcResponseData := rpcGoGoPbResponseDataPool.Get().(*GoGoPBRpcResponseData)
pGoGoPBRpcResponseData.MakeRespone(seq,err,reply)
return pGoGoPBRpcResponseData
}
func (slf *GoGoPBProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequestData){
rpcGoGoPbRequestDataPool.Put(rpcRequestData)
}
func (slf *GoGoPBProcessor) ReleaseRpcResponse(rpcResponseData IRpcResponseData){
rpcGoGoPbResponseDataPool.Put(rpcResponseData)
}
func (slf *GoGoPBProcessor) IsParse(param interface{}) bool {
_,ok := param.(proto.Message)
return ok
}
func (slf *GoGoPBProcessor) GetProcessorType() RpcProcessorType{
return RpcProcessorGoGoPB
}
func (slf *GoGoPBProcessor) Clone(src interface{}) (interface{},error){
srcMsg,ok := src.(proto.Message)
if ok == false {
return nil,fmt.Errorf("param is not of proto.message type")
}
return proto.Clone(srcMsg),nil
}
func (slf *GoGoPBRpcRequestData) IsNoReply() bool{
return slf.GetNoReply()
}
func (slf *GoGoPBRpcResponseData) GetErr() *RpcError {
if slf.GetError() == "" {
return nil
}
err := RpcError(slf.GetError())
return &err
}

View File

@@ -1,769 +0,0 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: gogorpc.proto
package rpc
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type GoGoPBRpcRequestData struct {
Seq uint64 `protobuf:"varint,1,opt,name=Seq,proto3" json:"Seq,omitempty"`
RpcMethodId uint32 `protobuf:"varint,2,opt,name=RpcMethodId,proto3" json:"RpcMethodId,omitempty"`
ServiceMethod string `protobuf:"bytes,3,opt,name=ServiceMethod,proto3" json:"ServiceMethod,omitempty"`
NoReply bool `protobuf:"varint,4,opt,name=NoReply,proto3" json:"NoReply,omitempty"`
InParam []byte `protobuf:"bytes,5,opt,name=InParam,proto3" json:"InParam,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *GoGoPBRpcRequestData) Reset() { *m = GoGoPBRpcRequestData{} }
func (m *GoGoPBRpcRequestData) String() string { return proto.CompactTextString(m) }
func (*GoGoPBRpcRequestData) ProtoMessage() {}
func (*GoGoPBRpcRequestData) Descriptor() ([]byte, []int) {
return fileDescriptor_d0e25d3af112ec8f, []int{0}
}
func (m *GoGoPBRpcRequestData) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *GoGoPBRpcRequestData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_GoGoPBRpcRequestData.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *GoGoPBRpcRequestData) XXX_Merge(src proto.Message) {
xxx_messageInfo_GoGoPBRpcRequestData.Merge(m, src)
}
func (m *GoGoPBRpcRequestData) XXX_Size() int {
return m.Size()
}
func (m *GoGoPBRpcRequestData) XXX_DiscardUnknown() {
xxx_messageInfo_GoGoPBRpcRequestData.DiscardUnknown(m)
}
var xxx_messageInfo_GoGoPBRpcRequestData proto.InternalMessageInfo
func (m *GoGoPBRpcRequestData) GetSeq() uint64 {
if m != nil {
return m.Seq
}
return 0
}
func (m *GoGoPBRpcRequestData) GetRpcMethodId() uint32 {
if m != nil {
return m.RpcMethodId
}
return 0
}
func (m *GoGoPBRpcRequestData) GetServiceMethod() string {
if m != nil {
return m.ServiceMethod
}
return ""
}
func (m *GoGoPBRpcRequestData) GetNoReply() bool {
if m != nil {
return m.NoReply
}
return false
}
func (m *GoGoPBRpcRequestData) GetInParam() []byte {
if m != nil {
return m.InParam
}
return nil
}
type GoGoPBRpcResponseData struct {
Seq uint64 `protobuf:"varint,1,opt,name=Seq,proto3" json:"Seq,omitempty"`
Error string `protobuf:"bytes,2,opt,name=Error,proto3" json:"Error,omitempty"`
Reply []byte `protobuf:"bytes,3,opt,name=Reply,proto3" json:"Reply,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *GoGoPBRpcResponseData) Reset() { *m = GoGoPBRpcResponseData{} }
func (m *GoGoPBRpcResponseData) String() string { return proto.CompactTextString(m) }
func (*GoGoPBRpcResponseData) ProtoMessage() {}
func (*GoGoPBRpcResponseData) Descriptor() ([]byte, []int) {
return fileDescriptor_d0e25d3af112ec8f, []int{1}
}
func (m *GoGoPBRpcResponseData) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *GoGoPBRpcResponseData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_GoGoPBRpcResponseData.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *GoGoPBRpcResponseData) XXX_Merge(src proto.Message) {
xxx_messageInfo_GoGoPBRpcResponseData.Merge(m, src)
}
func (m *GoGoPBRpcResponseData) XXX_Size() int {
return m.Size()
}
func (m *GoGoPBRpcResponseData) XXX_DiscardUnknown() {
xxx_messageInfo_GoGoPBRpcResponseData.DiscardUnknown(m)
}
var xxx_messageInfo_GoGoPBRpcResponseData proto.InternalMessageInfo
func (m *GoGoPBRpcResponseData) GetSeq() uint64 {
if m != nil {
return m.Seq
}
return 0
}
func (m *GoGoPBRpcResponseData) GetError() string {
if m != nil {
return m.Error
}
return ""
}
func (m *GoGoPBRpcResponseData) GetReply() []byte {
if m != nil {
return m.Reply
}
return nil
}
func init() {
proto.RegisterType((*GoGoPBRpcRequestData)(nil), "rpc.GoGoPBRpcRequestData")
proto.RegisterType((*GoGoPBRpcResponseData)(nil), "rpc.GoGoPBRpcResponseData")
}
func init() { proto.RegisterFile("gogorpc.proto", fileDescriptor_d0e25d3af112ec8f) }
var fileDescriptor_d0e25d3af112ec8f = []byte{
// 233 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4d, 0xcf, 0x4f, 0xcf,
0x2f, 0x2a, 0x48, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e, 0x2a, 0x48, 0x56, 0x5a,
0xc2, 0xc8, 0x25, 0xe2, 0x9e, 0xef, 0x9e, 0x1f, 0xe0, 0x14, 0x54, 0x90, 0x1c, 0x94, 0x5a, 0x58,
0x9a, 0x5a, 0x5c, 0xe2, 0x92, 0x58, 0x92, 0x28, 0x24, 0xc0, 0xc5, 0x1c, 0x9c, 0x5a, 0x28, 0xc1,
0xa8, 0xc0, 0xa8, 0xc1, 0x12, 0x04, 0x62, 0x0a, 0x29, 0x70, 0x71, 0x07, 0x15, 0x24, 0xfb, 0xa6,
0x96, 0x64, 0xe4, 0xa7, 0x78, 0xa6, 0x48, 0x30, 0x29, 0x30, 0x6a, 0xf0, 0x06, 0x21, 0x0b, 0x09,
0xa9, 0x70, 0xf1, 0x06, 0xa7, 0x16, 0x95, 0x65, 0x26, 0xa7, 0x42, 0x84, 0x24, 0x98, 0x15, 0x18,
0x35, 0x38, 0x83, 0x50, 0x05, 0x85, 0x24, 0xb8, 0xd8, 0xfd, 0xf2, 0x83, 0x52, 0x0b, 0x72, 0x2a,
0x25, 0x58, 0x14, 0x18, 0x35, 0x38, 0x82, 0x60, 0x5c, 0x90, 0x8c, 0x67, 0x5e, 0x40, 0x62, 0x51,
0x62, 0xae, 0x04, 0xab, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x8c, 0xab, 0x14, 0xca, 0x25, 0x8a, 0xe4,
0xca, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0x1c, 0xce, 0x14, 0xe1, 0x62, 0x75, 0x2d, 0x2a, 0xca,
0x2f, 0x02, 0x3b, 0x90, 0x33, 0x08, 0xc2, 0x01, 0x89, 0x42, 0xac, 0x64, 0x06, 0x1b, 0x0c, 0xe1,
0x38, 0x09, 0x9f, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47, 0x72, 0x8c, 0x51,
0xac, 0x7a, 0xfa, 0x45, 0x05, 0xc9, 0x49, 0x6c, 0xe0, 0xe0, 0x31, 0x06, 0x04, 0x00, 0x00, 0xff,
0xff, 0x26, 0xcf, 0x31, 0x39, 0x2f, 0x01, 0x00, 0x00,
}
func (m *GoGoPBRpcRequestData) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *GoGoPBRpcRequestData) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *GoGoPBRpcRequestData) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.InParam) > 0 {
i -= len(m.InParam)
copy(dAtA[i:], m.InParam)
i = encodeVarintGogorpc(dAtA, i, uint64(len(m.InParam)))
i--
dAtA[i] = 0x2a
}
if m.NoReply {
i--
if m.NoReply {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i--
dAtA[i] = 0x20
}
if len(m.ServiceMethod) > 0 {
i -= len(m.ServiceMethod)
copy(dAtA[i:], m.ServiceMethod)
i = encodeVarintGogorpc(dAtA, i, uint64(len(m.ServiceMethod)))
i--
dAtA[i] = 0x1a
}
if m.RpcMethodId != 0 {
i = encodeVarintGogorpc(dAtA, i, uint64(m.RpcMethodId))
i--
dAtA[i] = 0x10
}
if m.Seq != 0 {
i = encodeVarintGogorpc(dAtA, i, uint64(m.Seq))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func (m *GoGoPBRpcResponseData) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *GoGoPBRpcResponseData) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *GoGoPBRpcResponseData) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.XXX_unrecognized != nil {
i -= len(m.XXX_unrecognized)
copy(dAtA[i:], m.XXX_unrecognized)
}
if len(m.Reply) > 0 {
i -= len(m.Reply)
copy(dAtA[i:], m.Reply)
i = encodeVarintGogorpc(dAtA, i, uint64(len(m.Reply)))
i--
dAtA[i] = 0x1a
}
if len(m.Error) > 0 {
i -= len(m.Error)
copy(dAtA[i:], m.Error)
i = encodeVarintGogorpc(dAtA, i, uint64(len(m.Error)))
i--
dAtA[i] = 0x12
}
if m.Seq != 0 {
i = encodeVarintGogorpc(dAtA, i, uint64(m.Seq))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func encodeVarintGogorpc(dAtA []byte, offset int, v uint64) int {
offset -= sovGogorpc(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *GoGoPBRpcRequestData) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Seq != 0 {
n += 1 + sovGogorpc(uint64(m.Seq))
}
if m.RpcMethodId != 0 {
n += 1 + sovGogorpc(uint64(m.RpcMethodId))
}
l = len(m.ServiceMethod)
if l > 0 {
n += 1 + l + sovGogorpc(uint64(l))
}
if m.NoReply {
n += 2
}
l = len(m.InParam)
if l > 0 {
n += 1 + l + sovGogorpc(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func (m *GoGoPBRpcResponseData) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Seq != 0 {
n += 1 + sovGogorpc(uint64(m.Seq))
}
l = len(m.Error)
if l > 0 {
n += 1 + l + sovGogorpc(uint64(l))
}
l = len(m.Reply)
if l > 0 {
n += 1 + l + sovGogorpc(uint64(l))
}
if m.XXX_unrecognized != nil {
n += len(m.XXX_unrecognized)
}
return n
}
func sovGogorpc(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozGogorpc(x uint64) (n int) {
return sovGogorpc(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *GoGoPBRpcRequestData) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: GoGoPBRpcRequestData: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: GoGoPBRpcRequestData: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Seq", wireType)
}
m.Seq = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Seq |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field RpcMethodId", wireType)
}
m.RpcMethodId = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.RpcMethodId |= uint32(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ServiceMethod", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthGogorpc
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthGogorpc
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.ServiceMethod = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field NoReply", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.NoReply = bool(v != 0)
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field InParam", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthGogorpc
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthGogorpc
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.InParam = append(m.InParam[:0], dAtA[iNdEx:postIndex]...)
if m.InParam == nil {
m.InParam = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipGogorpc(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthGogorpc
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthGogorpc
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *GoGoPBRpcResponseData) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: GoGoPBRpcResponseData: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: GoGoPBRpcResponseData: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Seq", wireType)
}
m.Seq = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Seq |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthGogorpc
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthGogorpc
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Error = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Reply", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowGogorpc
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthGogorpc
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthGogorpc
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Reply = append(m.Reply[:0], dAtA[iNdEx:postIndex]...)
if m.Reply == nil {
m.Reply = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipGogorpc(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthGogorpc
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthGogorpc
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipGogorpc(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowGogorpc
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowGogorpc
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowGogorpc
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthGogorpc
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupGogorpc
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthGogorpc
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthGogorpc = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowGogorpc = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupGogorpc = fmt.Errorf("proto: unexpected end of group")
)

File diff suppressed because it is too large Load Diff

106
rpc/pbprocessor.go Normal file
View File

@@ -0,0 +1,106 @@
package rpc
import (
"github.com/duanhf2012/origin/util/sync"
"google.golang.org/protobuf/proto"
"fmt"
)
type PBProcessor struct {
}
var rpcPbResponseDataPool =sync.NewPool(make(chan interface{},10240), func()interface{}{
return &PBRpcResponseData{}
})
var rpcPbRequestDataPool =sync.NewPool(make(chan interface{},10240), func()interface{}{
return &PBRpcRequestData{}
})
func (slf *PBRpcRequestData) MakeRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) *PBRpcRequestData{
slf.Seq = seq
slf.RpcMethodId = rpcMethodId
slf.ServiceMethod = serviceMethod
slf.NoReply = noReply
slf.InParam = inParam
return slf
}
func (slf *PBRpcResponseData) MakeRespone(seq uint64,err RpcError,reply []byte) *PBRpcResponseData{
slf.Seq = seq
slf.Error = err.Error()
slf.Reply = reply
return slf
}
func (slf *PBProcessor) Marshal(v interface{}) ([]byte, error){
return proto.Marshal(v.(proto.Message))
}
func (slf *PBProcessor) Unmarshal(data []byte, msg interface{}) error{
protoMsg,ok := msg.(proto.Message)
if ok == false {
return fmt.Errorf("%+v is not of proto.Message type",msg)
}
return proto.Unmarshal(data, protoMsg)
}
func (slf *PBProcessor) MakeRpcRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{
pGogoPbRpcRequestData := rpcPbRequestDataPool.Get().(*PBRpcRequestData)
pGogoPbRpcRequestData.MakeRequest(seq,rpcMethodId,serviceMethod,noReply,inParam)
return pGogoPbRpcRequestData
}
func (slf *PBProcessor) MakeRpcResponse(seq uint64,err RpcError,reply []byte) IRpcResponseData {
pPBRpcResponseData := rpcPbResponseDataPool.Get().(*PBRpcResponseData)
pPBRpcResponseData.MakeRespone(seq,err,reply)
return pPBRpcResponseData
}
func (slf *PBProcessor) ReleaseRpcRequest(rpcRequestData IRpcRequestData){
rpcPbRequestDataPool.Put(rpcRequestData)
}
func (slf *PBProcessor) ReleaseRpcResponse(rpcResponseData IRpcResponseData){
rpcPbResponseDataPool.Put(rpcResponseData)
}
func (slf *PBProcessor) IsParse(param interface{}) bool {
_,ok := param.(proto.Message)
return ok
}
func (slf *PBProcessor) GetProcessorType() RpcProcessorType{
return RpcProcessorGoGoPB
}
func (slf *PBProcessor) Clone(src interface{}) (interface{},error){
srcMsg,ok := src.(proto.Message)
if ok == false {
return nil,fmt.Errorf("param is not of proto.message type")
}
return proto.Clone(srcMsg),nil
}
func (slf *PBRpcRequestData) IsNoReply() bool{
return slf.GetNoReply()
}
func (slf *PBRpcResponseData) GetErr() *RpcError {
if slf.GetError() == "" {
return nil
}
err := RpcError(slf.GetError())
return &err
}

263
rpc/protorpc.pb.go Normal file
View File

@@ -0,0 +1,263 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.31.0
// protoc v3.11.4
// source: test/rpc/protorpc.proto
package rpc
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type PBRpcRequestData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Seq uint64 `protobuf:"varint,1,opt,name=Seq,proto3" json:"Seq,omitempty"`
RpcMethodId uint32 `protobuf:"varint,2,opt,name=RpcMethodId,proto3" json:"RpcMethodId,omitempty"`
ServiceMethod string `protobuf:"bytes,3,opt,name=ServiceMethod,proto3" json:"ServiceMethod,omitempty"`
NoReply bool `protobuf:"varint,4,opt,name=NoReply,proto3" json:"NoReply,omitempty"`
InParam []byte `protobuf:"bytes,5,opt,name=InParam,proto3" json:"InParam,omitempty"`
}
func (x *PBRpcRequestData) Reset() {
*x = PBRpcRequestData{}
if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_protorpc_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PBRpcRequestData) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PBRpcRequestData) ProtoMessage() {}
func (x *PBRpcRequestData) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_protorpc_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PBRpcRequestData.ProtoReflect.Descriptor instead.
func (*PBRpcRequestData) Descriptor() ([]byte, []int) {
return file_test_rpc_protorpc_proto_rawDescGZIP(), []int{0}
}
func (x *PBRpcRequestData) GetSeq() uint64 {
if x != nil {
return x.Seq
}
return 0
}
func (x *PBRpcRequestData) GetRpcMethodId() uint32 {
if x != nil {
return x.RpcMethodId
}
return 0
}
func (x *PBRpcRequestData) GetServiceMethod() string {
if x != nil {
return x.ServiceMethod
}
return ""
}
func (x *PBRpcRequestData) GetNoReply() bool {
if x != nil {
return x.NoReply
}
return false
}
func (x *PBRpcRequestData) GetInParam() []byte {
if x != nil {
return x.InParam
}
return nil
}
type PBRpcResponseData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Seq uint64 `protobuf:"varint,1,opt,name=Seq,proto3" json:"Seq,omitempty"`
Error string `protobuf:"bytes,2,opt,name=Error,proto3" json:"Error,omitempty"`
Reply []byte `protobuf:"bytes,3,opt,name=Reply,proto3" json:"Reply,omitempty"`
}
func (x *PBRpcResponseData) Reset() {
*x = PBRpcResponseData{}
if protoimpl.UnsafeEnabled {
mi := &file_test_rpc_protorpc_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PBRpcResponseData) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PBRpcResponseData) ProtoMessage() {}
func (x *PBRpcResponseData) ProtoReflect() protoreflect.Message {
mi := &file_test_rpc_protorpc_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PBRpcResponseData.ProtoReflect.Descriptor instead.
func (*PBRpcResponseData) Descriptor() ([]byte, []int) {
return file_test_rpc_protorpc_proto_rawDescGZIP(), []int{1}
}
func (x *PBRpcResponseData) GetSeq() uint64 {
if x != nil {
return x.Seq
}
return 0
}
func (x *PBRpcResponseData) GetError() string {
if x != nil {
return x.Error
}
return ""
}
func (x *PBRpcResponseData) GetReply() []byte {
if x != nil {
return x.Reply
}
return nil
}
var File_test_rpc_protorpc_proto protoreflect.FileDescriptor
var file_test_rpc_protorpc_proto_rawDesc = []byte{
0x0a, 0x17, 0x74, 0x65, 0x73, 0x74, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x72, 0x70, 0x63, 0x22, 0xa0,
0x01, 0x0a, 0x10, 0x50, 0x42, 0x52, 0x70, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x44,
0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, 0x03, 0x53, 0x65, 0x71, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04,
0x52, 0x03, 0x53, 0x65, 0x71, 0x12, 0x20, 0x0a, 0x0b, 0x52, 0x70, 0x63, 0x4d, 0x65, 0x74, 0x68,
0x6f, 0x64, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x52, 0x70, 0x63, 0x4d,
0x65, 0x74, 0x68, 0x6f, 0x64, 0x49, 0x64, 0x12, 0x24, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69,
0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d,
0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x18, 0x0a,
0x07, 0x4e, 0x6f, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07,
0x4e, 0x6f, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x49, 0x6e, 0x50, 0x61, 0x72,
0x61, 0x6d, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x49, 0x6e, 0x50, 0x61, 0x72, 0x61,
0x6d, 0x22, 0x51, 0x0a, 0x11, 0x50, 0x42, 0x52, 0x70, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, 0x03, 0x53, 0x65, 0x71, 0x18, 0x01, 0x20,
0x01, 0x28, 0x04, 0x52, 0x03, 0x53, 0x65, 0x71, 0x12, 0x14, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f,
0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x14,
0x0a, 0x05, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x52,
0x65, 0x70, 0x6c, 0x79, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_test_rpc_protorpc_proto_rawDescOnce sync.Once
file_test_rpc_protorpc_proto_rawDescData = file_test_rpc_protorpc_proto_rawDesc
)
func file_test_rpc_protorpc_proto_rawDescGZIP() []byte {
file_test_rpc_protorpc_proto_rawDescOnce.Do(func() {
file_test_rpc_protorpc_proto_rawDescData = protoimpl.X.CompressGZIP(file_test_rpc_protorpc_proto_rawDescData)
})
return file_test_rpc_protorpc_proto_rawDescData
}
var file_test_rpc_protorpc_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_test_rpc_protorpc_proto_goTypes = []interface{}{
(*PBRpcRequestData)(nil), // 0: rpc.PBRpcRequestData
(*PBRpcResponseData)(nil), // 1: rpc.PBRpcResponseData
}
var file_test_rpc_protorpc_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_test_rpc_protorpc_proto_init() }
func file_test_rpc_protorpc_proto_init() {
if File_test_rpc_protorpc_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_test_rpc_protorpc_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PBRpcRequestData); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_test_rpc_protorpc_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PBRpcResponseData); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_test_rpc_protorpc_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_test_rpc_protorpc_proto_goTypes,
DependencyIndexes: file_test_rpc_protorpc_proto_depIdxs,
MessageInfos: file_test_rpc_protorpc_proto_msgTypes,
}.Build()
File_test_rpc_protorpc_proto = out.File
file_test_rpc_protorpc_proto_rawDesc = nil
file_test_rpc_protorpc_proto_goTypes = nil
file_test_rpc_protorpc_proto_depIdxs = nil
}

View File

@@ -1,8 +1,8 @@
syntax = "proto3";
package rpc;
option go_package = "./rpc";
option go_package = ".;rpc";
message GoGoPBRpcRequestData{
message PBRpcRequestData{
uint64 Seq = 1;
uint32 RpcMethodId = 2;
string ServiceMethod = 3;
@@ -10,7 +10,7 @@ message GoGoPBRpcRequestData{
bytes InParam = 5;
}
message GoGoPBRpcResponseData{
message PBRpcResponseData{
uint64 Seq = 1;
string Error = 2;
bytes Reply = 3;

File diff suppressed because it is too large Load Diff

View File

@@ -10,6 +10,7 @@ import (
"reflect"
"strings"
"time"
"runtime"
)
type RpcProcessorType uint8
@@ -19,7 +20,7 @@ const (
RpcProcessorGoGoPB RpcProcessorType = 1
)
var arrayProcessor = []IRpcProcessor{&JsonProcessor{}, &GoGoPBProcessor{}}
var arrayProcessor = []IRpcProcessor{&JsonProcessor{}, &PBProcessor{}}
var arrayProcessorLen uint8 = 2
var LittleEndian bool
@@ -141,6 +142,15 @@ func (agent *RpcAgent) WriteResponse(processor IRpcProcessor, serviceMethod stri
}
func (agent *RpcAgent) Run() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()
for {
data, err := agent.conn.ReadMsg()
if err != nil {
@@ -226,12 +236,13 @@ func (agent *RpcAgent) Run() {
req.inParam, err = rpcHandler.UnmarshalInParam(req.rpcProcessor, req.RpcRequestData.GetServiceMethod(), req.RpcRequestData.GetRpcMethodId(), req.RpcRequestData.GetInParam())
if err != nil {
rErr := "Call Rpc " + req.RpcRequestData.GetServiceMethod() + " Param error " + err.Error()
log.Error("call rpc param error",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod()),log.ErrorAttr("error",err))
if req.requestHandle != nil {
req.requestHandle(nil, RpcError(rErr))
} else {
ReleaseRpcRequest(req)
}
log.Error("call rpc param error",log.String("serviceMethod",req.RpcRequestData.GetServiceMethod()),log.ErrorAttr("error",err))
continue
}