From 4b67c59b1e2c279ca792fb317f65ead9bec68ce0 Mon Sep 17 00:00:00 2001 From: boyce Date: Mon, 21 Dec 2020 16:34:06 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=8E=9F=E5=A7=8Brpc?= =?UTF-8?q?=EF=BC=8C=E9=87=87=E7=94=A8register=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rpc/client.go | 8 +- rpc/jsonprocessor.go | 14 +- rpc/pbprocessor.go | 8 +- rpc/processor.go | 2 +- rpc/rpc.go | 5 +- rpc/rpc.pb.go | 323 ++++++++++++++++++++++++++++--------------- rpc/rpc.proto | 9 +- rpc/rpchandler.go | 107 ++++++-------- rpc/server.go | 8 +- service/service.go | 4 + 10 files changed, 294 insertions(+), 194 deletions(-) diff --git a/rpc/client.go b/rpc/client.go index 76fda36..93c05a3 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -177,7 +177,7 @@ func (client *Client) AsyncCall(rpcHandler IRpcHandler,serviceMethod string,call } seq := client.generateSeq() - request:=MakeRpcRequest(processor,seq,serviceMethod,false,InParam) + request:=MakeRpcRequest(processor,seq,0,serviceMethod,false,InParam) bytes,err := processor.Marshal(request.RpcRequestData) ReleaseRpcRequest(request) if err != nil { @@ -206,13 +206,13 @@ func (client *Client) AsyncCall(rpcHandler IRpcHandler,serviceMethod string,call return nil } -func (client *Client) RawGo(processor IRpcProcessor,noReply bool,serviceMethod string,args []byte,reply interface{}) *Call { +func (client *Client) RawGo(processor IRpcProcessor,noReply bool,rpcMethodId uint32,serviceMethod string,args []byte,reply interface{}) *Call { call := MakeCall() call.ServiceMethod = serviceMethod call.Reply = reply call.Seq = client.generateSeq() - request := MakeRpcRequest(processor,call.Seq,serviceMethod,noReply,args) + request := MakeRpcRequest(processor,call.Seq,rpcMethodId,serviceMethod,noReply,args) bytes,err := processor.Marshal(request.RpcRequestData) ReleaseRpcRequest(request) if err != nil { @@ -250,7 +250,7 @@ func (client *Client) Go(noReply bool,serviceMethod string, args interface{},rep return call } - return client.RawGo(processor,noReply,serviceMethod,InParam,reply) + return client.RawGo(processor,noReply,0,serviceMethod,InParam,reply) } func (client *Client) Run(){ diff --git a/rpc/jsonprocessor.go b/rpc/jsonprocessor.go index f240b61..d63a450 100644 --- a/rpc/jsonprocessor.go +++ b/rpc/jsonprocessor.go @@ -12,11 +12,12 @@ type JsonProcessor struct { type JsonRpcRequestData struct { //packhead - Seq uint64 // sequence number chosen by client + Seq uint64 // sequence number chosen by client + rpcMethodId uint32 ServiceMethod string // format: "Service.Method" - NoReply bool //是否需要返回 + NoReply bool //是否需要返回 //packbody - InParam []byte + InParam []byte } type JsonRpcResponseData struct { @@ -49,9 +50,10 @@ func (jsonProcessor *JsonProcessor) Unmarshal(data []byte, v interface{}) error{ return json.Unmarshal(data,v) } -func (jsonProcessor *JsonProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{ +func (jsonProcessor *JsonProcessor) MakeRpcRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{ jsonRpcRequestData := rpcJsonRequestDataPool.Get().(*JsonRpcRequestData) jsonRpcRequestData.Seq = seq + jsonRpcRequestData.rpcMethodId = rpcMethodId jsonRpcRequestData.ServiceMethod = serviceMethod jsonRpcRequestData.NoReply = noReply jsonRpcRequestData.InParam = inParam @@ -92,6 +94,10 @@ func (jsonRpcRequestData *JsonRpcRequestData) GetSeq() uint64{ return jsonRpcRequestData.Seq } +func (jsonRpcRequestData *JsonRpcRequestData) GetRpcMethodId() uint32{ + return jsonRpcRequestData.rpcMethodId +} + func (jsonRpcRequestData *JsonRpcRequestData) GetServiceMethod() string{ return jsonRpcRequestData.ServiceMethod } diff --git a/rpc/pbprocessor.go b/rpc/pbprocessor.go index 0feba7a..9d0fa9d 100644 --- a/rpc/pbprocessor.go +++ b/rpc/pbprocessor.go @@ -22,8 +22,9 @@ func init(){ } } -func (slf *PBRpcRequestData) MakeRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) *PBRpcRequestData{ +func (slf *PBRpcRequestData) MakeRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) *PBRpcRequestData{ slf.Seq = seq + slf.RpcMethodId = rpcMethodId slf.ServiceMethod = serviceMethod slf.NoReply = noReply slf.InParam = inParam @@ -31,6 +32,7 @@ func (slf *PBRpcRequestData) MakeRequest(seq uint64,serviceMethod string,noReply return slf } + func (slf *PBRpcResponseData) MakeRespone(seq uint64,err RpcError,reply []byte) *PBRpcResponseData{ slf.Seq = seq slf.Error = err.Error() @@ -48,9 +50,9 @@ func (slf *PBProcessor) Unmarshal(data []byte, msg interface{}) error{ return proto.Unmarshal(data, protoMsg) } -func (slf *PBProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{ +func (slf *PBProcessor) MakeRpcRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{ pPbRpcRequestData := rpcPbRequestDataPool.Get().(*PBRpcRequestData) - pPbRpcRequestData.MakeRequest(seq,serviceMethod,noReply,inParam) + pPbRpcRequestData.MakeRequest(seq,rpcMethodId,serviceMethod,noReply,inParam) return pPbRpcRequestData } diff --git a/rpc/processor.go b/rpc/processor.go index 3b3bf89..9d278bb 100644 --- a/rpc/processor.go +++ b/rpc/processor.go @@ -3,7 +3,7 @@ package rpc type IRpcProcessor interface { Marshal(v interface{}) ([]byte, error) //b表示自定义缓冲区,可以填nil,由系统自动分配 Unmarshal(data []byte, v interface{}) error - MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData + MakeRpcRequest(seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData MakeRpcResponse(seq uint64,err RpcError,reply []byte) IRpcResponseData ReleaseRpcRequest(rpcRequestData IRpcRequestData) diff --git a/rpc/rpc.go b/rpc/rpc.go index b5d18fe..291b5d3 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -40,6 +40,7 @@ type IRpcRequestData interface { GetServiceMethod() string GetInParam() []byte IsNoReply() bool + GetRpcMethodId() uint32 } type IRpcResponseData interface { @@ -120,10 +121,10 @@ func (call *Call) Done() *Call{ return <-call.done } -func MakeRpcRequest(rpcProcessor IRpcProcessor,seq uint64,serviceMethod string,noReply bool,inParam []byte) *RpcRequest{ +func MakeRpcRequest(rpcProcessor IRpcProcessor,seq uint64,rpcMethodId uint32,serviceMethod string,noReply bool,inParam []byte) *RpcRequest{ rpcRequest := rpcRequestPool.Get().(*RpcRequest).Clear() rpcRequest.rpcProcessor = rpcProcessor - rpcRequest.RpcRequestData = rpcRequest.rpcProcessor.MakeRpcRequest(seq,serviceMethod,noReply,inParam) + rpcRequest.RpcRequestData = rpcRequest.rpcProcessor.MakeRpcRequest(seq,rpcMethodId,serviceMethod,noReply,inParam) rpcRequest.ref = true return rpcRequest diff --git a/rpc/rpc.pb.go b/rpc/rpc.pb.go index e517f45..8e97ee5 100644 --- a/rpc/rpc.pb.go +++ b/rpc/rpc.pb.go @@ -1,163 +1,268 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: rpcproto/rpc.proto +// versions: +// protoc-gen-go v1.23.0 +// protoc v3.11.4 +// source: proto/rpcproto/rpc.proto package rpc import ( - fmt "fmt" proto "github.com/golang/protobuf/proto" - math "math" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 type PBRpcRequestData struct { - Seq uint64 `protobuf:"varint,1,opt,name=Seq,proto3" json:"Seq,omitempty"` - ServiceMethod string `protobuf:"bytes,2,opt,name=ServiceMethod,proto3" json:"ServiceMethod,omitempty"` - NoReply bool `protobuf:"varint,3,opt,name=NoReply,proto3" json:"NoReply,omitempty"` - InParam []byte `protobuf:"bytes,4,opt,name=InParam,proto3" json:"InParam,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Seq uint64 `protobuf:"varint,1,opt,name=Seq,proto3" json:"Seq,omitempty"` + RpcMethodId uint32 `protobuf:"varint,2,opt,name=RpcMethodId,proto3" json:"RpcMethodId,omitempty"` + ServiceMethod string `protobuf:"bytes,3,opt,name=ServiceMethod,proto3" json:"ServiceMethod,omitempty"` + NoReply bool `protobuf:"varint,4,opt,name=NoReply,proto3" json:"NoReply,omitempty"` + InParam []byte `protobuf:"bytes,5,opt,name=InParam,proto3" json:"InParam,omitempty"` } -func (m *PBRpcRequestData) Reset() { *m = PBRpcRequestData{} } -func (m *PBRpcRequestData) String() string { return proto.CompactTextString(m) } -func (*PBRpcRequestData) ProtoMessage() {} +func (x *PBRpcRequestData) Reset() { + *x = PBRpcRequestData{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_rpcproto_rpc_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PBRpcRequestData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PBRpcRequestData) ProtoMessage() {} + +func (x *PBRpcRequestData) ProtoReflect() protoreflect.Message { + mi := &file_proto_rpcproto_rpc_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PBRpcRequestData.ProtoReflect.Descriptor instead. func (*PBRpcRequestData) Descriptor() ([]byte, []int) { - return fileDescriptor_0008ed1de5480352, []int{0} + return file_proto_rpcproto_rpc_proto_rawDescGZIP(), []int{0} } -func (m *PBRpcRequestData) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_PBRpcRequestData.Unmarshal(m, b) -} -func (m *PBRpcRequestData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_PBRpcRequestData.Marshal(b, m, deterministic) -} -func (m *PBRpcRequestData) XXX_Merge(src proto.Message) { - xxx_messageInfo_PBRpcRequestData.Merge(m, src) -} -func (m *PBRpcRequestData) XXX_Size() int { - return xxx_messageInfo_PBRpcRequestData.Size(m) -} -func (m *PBRpcRequestData) XXX_DiscardUnknown() { - xxx_messageInfo_PBRpcRequestData.DiscardUnknown(m) -} - -var xxx_messageInfo_PBRpcRequestData proto.InternalMessageInfo - -func (m *PBRpcRequestData) GetSeq() uint64 { - if m != nil { - return m.Seq +func (x *PBRpcRequestData) GetSeq() uint64 { + if x != nil { + return x.Seq } return 0 } -func (m *PBRpcRequestData) GetServiceMethod() string { - if m != nil { - return m.ServiceMethod +func (x *PBRpcRequestData) GetRpcMethodId() uint32 { + if x != nil { + return x.RpcMethodId + } + return 0 +} + +func (x *PBRpcRequestData) GetServiceMethod() string { + if x != nil { + return x.ServiceMethod } return "" } -func (m *PBRpcRequestData) GetNoReply() bool { - if m != nil { - return m.NoReply +func (x *PBRpcRequestData) GetNoReply() bool { + if x != nil { + return x.NoReply } return false } -func (m *PBRpcRequestData) GetInParam() []byte { - if m != nil { - return m.InParam +func (x *PBRpcRequestData) GetInParam() []byte { + if x != nil { + return x.InParam } return nil } type PBRpcResponseData struct { - Seq uint64 `protobuf:"varint,1,opt,name=Seq,proto3" json:"Seq,omitempty"` - Error string `protobuf:"bytes,2,opt,name=Error,proto3" json:"Error,omitempty"` - Reply []byte `protobuf:"bytes,3,opt,name=Reply,proto3" json:"Reply,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Seq uint64 `protobuf:"varint,1,opt,name=Seq,proto3" json:"Seq,omitempty"` + Error string `protobuf:"bytes,2,opt,name=Error,proto3" json:"Error,omitempty"` + Reply []byte `protobuf:"bytes,3,opt,name=Reply,proto3" json:"Reply,omitempty"` } -func (m *PBRpcResponseData) Reset() { *m = PBRpcResponseData{} } -func (m *PBRpcResponseData) String() string { return proto.CompactTextString(m) } -func (*PBRpcResponseData) ProtoMessage() {} +func (x *PBRpcResponseData) Reset() { + *x = PBRpcResponseData{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_rpcproto_rpc_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PBRpcResponseData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PBRpcResponseData) ProtoMessage() {} + +func (x *PBRpcResponseData) ProtoReflect() protoreflect.Message { + mi := &file_proto_rpcproto_rpc_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PBRpcResponseData.ProtoReflect.Descriptor instead. func (*PBRpcResponseData) Descriptor() ([]byte, []int) { - return fileDescriptor_0008ed1de5480352, []int{1} + return file_proto_rpcproto_rpc_proto_rawDescGZIP(), []int{1} } -func (m *PBRpcResponseData) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_PBRpcResponseData.Unmarshal(m, b) -} -func (m *PBRpcResponseData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_PBRpcResponseData.Marshal(b, m, deterministic) -} -func (m *PBRpcResponseData) XXX_Merge(src proto.Message) { - xxx_messageInfo_PBRpcResponseData.Merge(m, src) -} -func (m *PBRpcResponseData) XXX_Size() int { - return xxx_messageInfo_PBRpcResponseData.Size(m) -} -func (m *PBRpcResponseData) XXX_DiscardUnknown() { - xxx_messageInfo_PBRpcResponseData.DiscardUnknown(m) -} - -var xxx_messageInfo_PBRpcResponseData proto.InternalMessageInfo - -func (m *PBRpcResponseData) GetSeq() uint64 { - if m != nil { - return m.Seq +func (x *PBRpcResponseData) GetSeq() uint64 { + if x != nil { + return x.Seq } return 0 } -func (m *PBRpcResponseData) GetError() string { - if m != nil { - return m.Error +func (x *PBRpcResponseData) GetError() string { + if x != nil { + return x.Error } return "" } -func (m *PBRpcResponseData) GetReply() []byte { - if m != nil { - return m.Reply +func (x *PBRpcResponseData) GetReply() []byte { + if x != nil { + return x.Reply } return nil } -func init() { - proto.RegisterType((*PBRpcRequestData)(nil), "rpc.PBRpcRequestData") - proto.RegisterType((*PBRpcResponseData)(nil), "rpc.PBRpcResponseData") +var File_proto_rpcproto_rpc_proto protoreflect.FileDescriptor + +var file_proto_rpcproto_rpc_proto_rawDesc = []byte{ + 0x0a, 0x18, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x72, 0x70, 0x63, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x72, 0x70, 0x63, 0x22, + 0xa0, 0x01, 0x0a, 0x10, 0x50, 0x42, 0x52, 0x70, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x44, 0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, 0x03, 0x53, 0x65, 0x71, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x04, 0x52, 0x03, 0x53, 0x65, 0x71, 0x12, 0x20, 0x0a, 0x0b, 0x52, 0x70, 0x63, 0x4d, 0x65, 0x74, + 0x68, 0x6f, 0x64, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x52, 0x70, 0x63, + 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x49, 0x64, 0x12, 0x24, 0x0a, 0x0d, 0x53, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0d, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x18, + 0x0a, 0x07, 0x4e, 0x6f, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, + 0x07, 0x4e, 0x6f, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x49, 0x6e, 0x50, 0x61, + 0x72, 0x61, 0x6d, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x49, 0x6e, 0x50, 0x61, 0x72, + 0x61, 0x6d, 0x22, 0x51, 0x0a, 0x11, 0x50, 0x42, 0x52, 0x70, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x44, 0x61, 0x74, 0x61, 0x12, 0x10, 0x0a, 0x03, 0x53, 0x65, 0x71, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x04, 0x52, 0x03, 0x53, 0x65, 0x71, 0x12, 0x14, 0x0a, 0x05, 0x45, 0x72, 0x72, + 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, + 0x14, 0x0a, 0x05, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, + 0x52, 0x65, 0x70, 0x6c, 0x79, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x3b, 0x72, 0x70, 0x63, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } -func init() { proto.RegisterFile("rpcproto/rpc.proto", fileDescriptor_0008ed1de5480352) } +var ( + file_proto_rpcproto_rpc_proto_rawDescOnce sync.Once + file_proto_rpcproto_rpc_proto_rawDescData = file_proto_rpcproto_rpc_proto_rawDesc +) -var fileDescriptor_0008ed1de5480352 = []byte{ - // 193 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2a, 0x2a, 0x48, 0x2e, - 0x28, 0xca, 0x2f, 0xc9, 0xd7, 0x2f, 0x2a, 0x48, 0xd6, 0x03, 0xb3, 0x84, 0x98, 0x8b, 0x0a, 0x92, - 0x95, 0xea, 0xb8, 0x04, 0x02, 0x9c, 0x82, 0x0a, 0x92, 0x83, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, - 0x5c, 0x12, 0x4b, 0x12, 0x85, 0x04, 0xb8, 0x98, 0x83, 0x53, 0x0b, 0x25, 0x18, 0x15, 0x18, 0x35, - 0x58, 0x82, 0x40, 0x4c, 0x21, 0x15, 0x2e, 0xde, 0xe0, 0xd4, 0xa2, 0xb2, 0xcc, 0xe4, 0x54, 0xdf, - 0xd4, 0x92, 0x8c, 0xfc, 0x14, 0x09, 0x26, 0x05, 0x46, 0x0d, 0xce, 0x20, 0x54, 0x41, 0x21, 0x09, - 0x2e, 0x76, 0xbf, 0xfc, 0xa0, 0xd4, 0x82, 0x9c, 0x4a, 0x09, 0x66, 0x05, 0x46, 0x0d, 0x8e, 0x20, - 0x18, 0x17, 0x24, 0xe3, 0x99, 0x17, 0x90, 0x58, 0x94, 0x98, 0x2b, 0xc1, 0xa2, 0xc0, 0xa8, 0xc1, - 0x13, 0x04, 0xe3, 0x2a, 0x05, 0x72, 0x09, 0x42, 0xed, 0x2f, 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, 0xc5, - 0xe1, 0x00, 0x11, 0x2e, 0x56, 0xd7, 0xa2, 0xa2, 0xfc, 0x22, 0xa8, 0xc5, 0x10, 0x0e, 0x48, 0x14, - 0x61, 0x1d, 0x4f, 0x10, 0x84, 0xe3, 0xc4, 0x1e, 0xc5, 0xaa, 0x67, 0x5d, 0x54, 0x90, 0x9c, 0xc4, - 0x06, 0xf6, 0xa7, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xcf, 0x5b, 0x74, 0xd8, 0xfd, 0x00, 0x00, - 0x00, +func file_proto_rpcproto_rpc_proto_rawDescGZIP() []byte { + file_proto_rpcproto_rpc_proto_rawDescOnce.Do(func() { + file_proto_rpcproto_rpc_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_rpcproto_rpc_proto_rawDescData) + }) + return file_proto_rpcproto_rpc_proto_rawDescData +} + +var file_proto_rpcproto_rpc_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_proto_rpcproto_rpc_proto_goTypes = []interface{}{ + (*PBRpcRequestData)(nil), // 0: rpc.PBRpcRequestData + (*PBRpcResponseData)(nil), // 1: rpc.PBRpcResponseData +} +var file_proto_rpcproto_rpc_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_proto_rpcproto_rpc_proto_init() } +func file_proto_rpcproto_rpc_proto_init() { + if File_proto_rpcproto_rpc_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_rpcproto_rpc_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PBRpcRequestData); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_rpcproto_rpc_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PBRpcResponseData); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_rpcproto_rpc_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_proto_rpcproto_rpc_proto_goTypes, + DependencyIndexes: file_proto_rpcproto_rpc_proto_depIdxs, + MessageInfos: file_proto_rpcproto_rpc_proto_msgTypes, + }.Build() + File_proto_rpcproto_rpc_proto = out.File + file_proto_rpcproto_rpc_proto_rawDesc = nil + file_proto_rpcproto_rpc_proto_goTypes = nil + file_proto_rpcproto_rpc_proto_depIdxs = nil } diff --git a/rpc/rpc.proto b/rpc/rpc.proto index 0066093..501c021 100644 --- a/rpc/rpc.proto +++ b/rpc/rpc.proto @@ -2,10 +2,11 @@ syntax = "proto3"; package rpc; option go_package =".;rpc"; message PBRpcRequestData{ - uint64 Seq = 1; - string ServiceMethod = 2; - bool NoReply = 3; - bytes InParam = 4; + uint64 Seq = 1; + uint32 RpcMethodId = 2; + string ServiceMethod = 3; + bool NoReply = 4; + bytes InParam = 5; } message PBRpcResponseData{ diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index b142fb7..cf80e84 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -45,10 +45,12 @@ type RpcMethodInfo struct { rpcProcessorType RpcProcessorType } +type RawRpcCallBack func(rawData []byte) type RpcHandler struct { callRequest chan *RpcRequest rpcHandler IRpcHandler mapFunctions map[string]RpcMethodInfo + mapRawFunctions map[uint32] RawRpcCallBack funcRpcClient FuncRpcClient funcRpcServer FuncRpcServer @@ -79,7 +81,7 @@ type IRpcHandler interface { AsyncCallNode(nodeId int,serviceMethod string,args interface{},callback interface{}) error CallNode(nodeId int,serviceMethod string,args interface{},reply interface{}) error GoNode(nodeId int,serviceMethod string,args interface{}) error - RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args IRawInputArgs) error + RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,args IRawInputArgs) error CastGo(serviceMethod string,args interface{}) IsSingleCoroutine() bool } @@ -99,7 +101,7 @@ func (handler *RpcHandler) GetRpcHandler() IRpcHandler{ func (handler *RpcHandler) InitRpcHandler(rpcHandler IRpcHandler,getClientFun FuncRpcClient,getServerFun FuncRpcServer) { handler.callRequest = make(chan *RpcRequest,1000000) handler.callResponseCallBack = make(chan *Call,1000000) - + handler.mapRawFunctions = make(map[uint32] RawRpcCallBack) handler.rpcHandler = rpcHandler handler.mapFunctions = map[string]RpcMethodInfo{} handler.funcRpcClient = getClientFun @@ -157,15 +159,11 @@ func (handler *RpcHandler) suitableMethods(method reflect.Method) error { return fmt.Errorf("%s Unsupported parameter types!",method.Name) } } - a := typ.In(parIdx).Kind() - if a == reflect.Interface { - rpcMethodInfo.inParam = nil - }else{ - rpcMethodInfo.inParamValue = reflect.New(typ.In(parIdx).Elem()) //append(rpcMethodInfo.iparam,) - rpcMethodInfo.inParam = reflect.New(typ.In(parIdx).Elem()).Interface() - pt,_ := GetProcessorType(rpcMethodInfo.inParamValue.Interface()) - rpcMethodInfo.rpcProcessorType = pt - } + + rpcMethodInfo.inParamValue = reflect.New(typ.In(parIdx).Elem()) //append(rpcMethodInfo.iparam,) + rpcMethodInfo.inParam = reflect.New(typ.In(parIdx).Elem()).Interface() + pt,_ := GetProcessorType(rpcMethodInfo.inParamValue.Interface()) + rpcMethodInfo.rpcProcessorType = pt parIdx++ if parIdx< typ.NumIn() { @@ -243,6 +241,20 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { defer request.inputArgs.DoGc() } + //如果是原始RPC请求 + rawRpcId := request.RpcRequestData.GetRpcMethodId() + if rawRpcId>0 { + v,ok := handler.mapRawFunctions[rawRpcId] + if ok == false { + err := fmt.Sprintf("RpcHandler cannot find request rpc id %d!",rawRpcId) + log.Error(err) + return + } + v(request.inputArgs.GetRawData()) + return + } + + //普通的rpc请求 v,ok := handler.mapFunctions[request.RpcRequestData.GetServiceMethod()] if ok == false { err := "RpcHandler "+handler.rpcHandler.GetName()+"cannot find "+request.RpcRequestData.GetServiceMethod() @@ -255,45 +267,22 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { var paramList []reflect.Value var err error - var iParam interface{} - //单协程或非异步调用时直接使用预置对象 - if v.inParam!= nil { - iParam = reflect.New(v.inParamValue.Type().Elem()).Interface() - } - + iParam := reflect.New(v.inParamValue.Type().Elem()).Interface() if request.bLocalRequest == false { - if iParam == nil { - //原始调用 - iParam = request.RpcRequestData.GetInParam() - }else{ - err = request.rpcProcessor.Unmarshal(request.RpcRequestData.GetInParam(),iParam) - if err!=nil { - rErr := "Call Rpc "+request.RpcRequestData.GetServiceMethod()+" Param error "+err.Error() - log.Error(rErr) - if request.requestHandle!=nil { - request.requestHandle(nil, RpcError(rErr)) - } - return + err = request.rpcProcessor.Unmarshal(request.RpcRequestData.GetInParam(),iParam) + if err!=nil { + rErr := "Call Rpc "+request.RpcRequestData.GetServiceMethod()+" Param error "+err.Error() + log.Error(rErr) + if request.requestHandle!=nil { + request.requestHandle(nil, RpcError(rErr)) } + return } }else { - if iParam == nil { - iParam = request.inputArgs.GetRawData() - }else if request.inputArgs!=nil { - err = request.rpcProcessor.Unmarshal(request.inputArgs.GetRawData(),iParam) - if err!=nil { - rErr := "Call Rpc "+request.RpcRequestData.GetServiceMethod()+" Param error "+err.Error() - log.Error(rErr) - if request.requestHandle!=nil { - request.requestHandle(nil, RpcError(rErr)) - } - return - } - }else { - iParam = request.localParam - } + iParam = request.localParam } + //生成Call参数 paramList = append(paramList,reflect.ValueOf(handler.GetRpcHandler())) //接受者 if v.hasResponder == true { if request.requestHandle!=nil { @@ -387,7 +376,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s return pLocalRpcServer.myselfRpcHandlerGo(serviceName,serviceMethod,args,nil) } //其他的rpcHandler的处理器 - pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClientList[i],true,serviceName,serviceMethod,args,nil,nil) + pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClientList[i],true,serviceName,0,serviceMethod,args,nil,nil) if pCall.Err!=nil { err = pCall.Err } @@ -438,7 +427,7 @@ func (handler *RpcHandler) callRpc(nodeId int,serviceMethod string,args interfac return pLocalRpcServer.myselfRpcHandlerGo(serviceName,serviceMethod,args,reply) } //其他的rpcHandler的处理器 - pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,serviceName,serviceMethod,args,reply,nil) + pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,serviceName,0,serviceMethod,args,reply,nil) err = pCall.Done().Err pClient.RemovePending(pCall.Seq) ReleaseCall(pCall) @@ -570,11 +559,11 @@ func (handler *RpcHandler) CastGo(serviceMethod string,args interface{}) { handler.goRpc(nil,true,0,serviceMethod,args) } - -func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args IRawInputArgs) error { +//RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceName string,rpcMethodId uint32,args IRawInputArgs) error +func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,rpcMethodId uint32,serviceName string,args IRawInputArgs) error { processor := GetProcessor(uint8(rpcProcessorType)) var pClientList [maxClusterNode]*Client - err,count := handler.funcRpcClient(nodeId,serviceMethod,pClientList[:]) + err,count := handler.funcRpcClient(nodeId,serviceName,pClientList[:]) if count==0||err != nil { args.DoGc() log.Error("Call serviceMethod is error:%+v!",err) @@ -591,26 +580,15 @@ func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId in for i:=0;i