新增RawGoNode与RawCastGo接口,支持RPC附带原始数据

This commit is contained in:
boyce
2020-09-02 13:39:18 +08:00
parent 663cb18ce0
commit 0185fe350f
9 changed files with 462 additions and 57 deletions

View File

@@ -173,7 +173,7 @@ func (slf *Client) AsycCall(rpcHandler IRpcHandler,serviceMethod string,callback
request := &RpcRequest{} request := &RpcRequest{}
call.Arg = args call.Arg = args
call.Seq = slf.generateSeq() call.Seq = slf.generateSeq()
request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,false,InParam) request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,false,InParam,nil)
slf.AddPending(call) slf.AddPending(call)
bytes,err := processor.Marshal(request.RpcRequestData) bytes,err := processor.Marshal(request.RpcRequestData)
@@ -199,16 +199,10 @@ func (slf *Client) AsycCall(rpcHandler IRpcHandler,serviceMethod string,callback
return err return err
} }
func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply interface{}) *Call { func (slf *Client) RawGo(noReply bool,serviceMethod string,args []byte,additionParam interface{},reply interface{}) *Call {
call := MakeCall() call := MakeCall()
call.Reply = reply
call.ServiceMethod = serviceMethod call.ServiceMethod = serviceMethod
call.Reply = reply
InParam,err := processor.Marshal(args)
if err != nil {
call.Err = err
return call
}
request := &RpcRequest{} request := &RpcRequest{}
call.Arg = args call.Arg = args
@@ -216,7 +210,7 @@ func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply
if noReply == false { if noReply == false {
slf.AddPending(call) slf.AddPending(call)
} }
request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,noReply,InParam) request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,noReply,args,additionParam)
bytes,err := processor.Marshal(request.RpcRequestData) bytes,err := processor.Marshal(request.RpcRequestData)
processor.ReleaseRpcRequest(request.RpcRequestData) processor.ReleaseRpcRequest(request.RpcRequestData)
if err != nil { if err != nil {
@@ -230,7 +224,7 @@ func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply
slf.RemovePending(call.Seq) slf.RemovePending(call.Seq)
return call return call
} }
err = slf.conn.WriteMsg(bytes) err = slf.conn.WriteMsg(bytes)
if err != nil { if err != nil {
slf.RemovePending(call.Seq) slf.RemovePending(call.Seq)
@@ -240,6 +234,16 @@ func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply
return call return call
} }
func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply interface{}) *Call {
InParam,err := processor.Marshal(args)
if err != nil {
call := MakeCall()
call.Err = err
}
return slf.RawGo(noReply,serviceMethod,InParam,nil,reply)
}
func (slf *Client) Run(){ func (slf *Client) Run(){
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {

View File

@@ -17,6 +17,7 @@ type JsonRpcRequestData struct {
NoReply bool //是否需要返回 NoReply bool //是否需要返回
//packbody //packbody
InParam []byte InParam []byte
AdditionParam interface{}
} }
@@ -54,13 +55,13 @@ func (slf *JsonProcessor) Unmarshal(data []byte, v interface{}) error{
} }
func (slf *JsonProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte,additionParam interface{}) IRpcRequestData{
func (slf *JsonProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{
jsonRpcRequestData := rpcJsonRequestDataPool.Get().(*JsonRpcRequestData) jsonRpcRequestData := rpcJsonRequestDataPool.Get().(*JsonRpcRequestData)
jsonRpcRequestData.Seq = seq jsonRpcRequestData.Seq = seq
jsonRpcRequestData.ServiceMethod = serviceMethod jsonRpcRequestData.ServiceMethod = serviceMethod
jsonRpcRequestData.NoReply = noReply jsonRpcRequestData.NoReply = noReply
jsonRpcRequestData.InParam = inParam jsonRpcRequestData.InParam = inParam
jsonRpcRequestData.AdditionParam = additionParam
return jsonRpcRequestData return jsonRpcRequestData
} }
@@ -96,6 +97,15 @@ func (slf *JsonRpcRequestData) GetInParam() []byte{
return slf.InParam return slf.InParam
} }
func (slf *JsonRpcRequestData) GetParamValue() interface{}{
return slf.AdditionParam
}
func (slf *JsonRpcRequestData) GetAdditionParams() IRawAdditionParam{
return slf
}
func (slf *JsonRpcResponseData) GetSeq() uint64 { func (slf *JsonRpcResponseData) GetSeq() uint64 {
return slf.Seq return slf.Seq
} }

View File

@@ -2,6 +2,7 @@ package rpc
import ( import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"fmt"
"sync" "sync"
) )
@@ -22,11 +23,68 @@ func init(){
} }
} }
func (slf *PBRpcRequestData) MakeRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) *PBRpcRequestData{ func (m *PBRpcRequestData) GetParamValue() interface{}{
if m.GetAddtionParam() == nil {
return nil
}
switch x := m.AddtionParam.AdditionOneof.(type) {
case *AdditionParam_SParam:
return x.SParam
case *AdditionParam_UParam:
return x.UParam
case *AdditionParam_StrParam:
return x.StrParam
case *AdditionParam_BParam:
return x.BParam
}
return nil
}
func (m *PBRpcRequestData) GetAdditionParams() IRawAdditionParam{
if m.GetAddtionParam() == nil {
return nil
}
return m
}
func (slf *PBRpcRequestData) MakeRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte,inAdditionParam interface{}) *PBRpcRequestData{
slf.Seq = proto.Uint64(seq) slf.Seq = proto.Uint64(seq)
slf.ServiceMethod = proto.String(serviceMethod) slf.ServiceMethod = proto.String(serviceMethod)
slf.NoReply = proto.Bool(noReply) slf.NoReply = proto.Bool(noReply)
slf.InParam = inParam slf.InParam = inParam
if inAdditionParam == nil {
return slf
}
switch inAdditionParam.(type) {
case *int:
slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_SParam{int64(*inAdditionParam.(*int))}}
case *int32:
slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_SParam{int64(*inAdditionParam.(*int32))}}
case *int16:
slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_SParam{int64(*inAdditionParam.(*int16))}}
case *int64:
slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_SParam{*inAdditionParam.(*int64)}}
case *uint:
slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_UParam{uint64(*inAdditionParam.(*uint))}}
case *uint32:
slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_UParam{uint64(*inAdditionParam.(*uint32))}}
case *uint16:
slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_UParam{uint64(*inAdditionParam.(*uint16))}}
case *uint64:
slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_UParam{*inAdditionParam.(*uint64)}}
case *string:
slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_StrParam{*inAdditionParam.(*string)}}
case *[]byte:
slf.AddtionParam = &AdditionParam{AdditionOneof: &AdditionParam_BParam{*inAdditionParam.(*[]byte)}}
default:
panic(fmt.Sprintf("not support type %+v",inAdditionParam))
}
return slf return slf
} }
@@ -50,9 +108,9 @@ func (slf *PBProcessor) Unmarshal(data []byte, msg interface{}) error{
} }
func (slf *PBProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{ func (slf *PBProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte,inAdditionParam interface{}) IRpcRequestData{
pPbRpcRequestData := rpcPbRequestDataPool.Get().(*PBRpcRequestData) pPbRpcRequestData := rpcPbRequestDataPool.Get().(*PBRpcRequestData)
pPbRpcRequestData.MakeRequest(seq,serviceMethod,noReply,inParam) pPbRpcRequestData.MakeRequest(seq,serviceMethod,noReply,inParam,inAdditionParam)
return pPbRpcRequestData return pPbRpcRequestData
} }

View File

@@ -3,7 +3,7 @@ package rpc
type IRpcProcessor interface { type IRpcProcessor interface {
Marshal(v interface{}) ([]byte, error) //b表示自定义缓冲区可以填nil由系统自动分配 Marshal(v interface{}) ([]byte, error) //b表示自定义缓冲区可以填nil由系统自动分配
Unmarshal(data []byte, v interface{}) error Unmarshal(data []byte, v interface{}) error
MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte,additionParam interface{}) IRpcRequestData
MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData
ReleaseRpcRequest(rpcRequestData IRpcRequestData) ReleaseRpcRequest(rpcRequestData IRpcRequestData)

View File

@@ -9,8 +9,10 @@ import (
type RpcRequest struct { type RpcRequest struct {
RpcRequestData IRpcRequestData RpcRequestData IRpcRequestData
bLocalRequest bool
localReply interface{} localReply interface{}
localParam interface{} //本地调用的参数列表 localParam interface{} //本地调用的参数列表
localRawParam []byte
requestHandle RequestHandler requestHandle RequestHandler
callback *reflect.Value callback *reflect.Value
} }
@@ -33,11 +35,16 @@ func (slf *RpcResponse) Clear() *RpcResponse{
return slf return slf
} }
type IRawAdditionParam interface {
GetParamValue() interface{}
}
type IRpcRequestData interface { type IRpcRequestData interface {
GetSeq() uint64 GetSeq() uint64
GetServiceMethod() string GetServiceMethod() string
GetInParam() []byte GetInParam() []byte
IsNoReply() bool IsNoReply() bool
GetAdditionParams() IRawAdditionParam
} }
type IRpcResponseData interface { type IRpcResponseData interface {
@@ -48,6 +55,13 @@ type IRpcResponseData interface {
type RequestHandler func(Returns interface{},Err *RpcError) type RequestHandler func(Returns interface{},Err *RpcError)
type RawAdditionParamNull struct {
}
func (slf *RawAdditionParamNull) GetParamValue() interface{}{
return nil
}
type Call struct { type Call struct {
Seq uint64 Seq uint64

View File

@@ -20,21 +20,216 @@ var _ = math.Inf
// proto package needs to be updated. // proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type AdditionParam struct {
// Types that are valid to be assigned to AdditionOneof:
// *AdditionParam_SParam
// *AdditionParam_UParam
// *AdditionParam_StrParam
// *AdditionParam_BParam
AdditionOneof isAdditionParam_AdditionOneof `protobuf_oneof:"addition_oneof"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *AdditionParam) Reset() { *m = AdditionParam{} }
func (m *AdditionParam) String() string { return proto.CompactTextString(m) }
func (*AdditionParam) ProtoMessage() {}
func (*AdditionParam) Descriptor() ([]byte, []int) {
return fileDescriptor_77a6da22d6a3feb1, []int{0}
}
func (m *AdditionParam) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_AdditionParam.Unmarshal(m, b)
}
func (m *AdditionParam) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_AdditionParam.Marshal(b, m, deterministic)
}
func (m *AdditionParam) XXX_Merge(src proto.Message) {
xxx_messageInfo_AdditionParam.Merge(m, src)
}
func (m *AdditionParam) XXX_Size() int {
return xxx_messageInfo_AdditionParam.Size(m)
}
func (m *AdditionParam) XXX_DiscardUnknown() {
xxx_messageInfo_AdditionParam.DiscardUnknown(m)
}
var xxx_messageInfo_AdditionParam proto.InternalMessageInfo
type isAdditionParam_AdditionOneof interface {
isAdditionParam_AdditionOneof()
}
type AdditionParam_SParam struct {
SParam int64 `protobuf:"varint,10,opt,name=SParam,oneof"`
}
type AdditionParam_UParam struct {
UParam uint64 `protobuf:"varint,11,opt,name=UParam,oneof"`
}
type AdditionParam_StrParam struct {
StrParam string `protobuf:"bytes,12,opt,name=StrParam,oneof"`
}
type AdditionParam_BParam struct {
BParam []byte `protobuf:"bytes,13,opt,name=BParam,oneof"`
}
func (*AdditionParam_SParam) isAdditionParam_AdditionOneof() {}
func (*AdditionParam_UParam) isAdditionParam_AdditionOneof() {}
func (*AdditionParam_StrParam) isAdditionParam_AdditionOneof() {}
func (*AdditionParam_BParam) isAdditionParam_AdditionOneof() {}
func (m *AdditionParam) GetAdditionOneof() isAdditionParam_AdditionOneof {
if m != nil {
return m.AdditionOneof
}
return nil
}
func (m *AdditionParam) GetSParam() int64 {
if x, ok := m.GetAdditionOneof().(*AdditionParam_SParam); ok {
return x.SParam
}
return 0
}
func (m *AdditionParam) GetUParam() uint64 {
if x, ok := m.GetAdditionOneof().(*AdditionParam_UParam); ok {
return x.UParam
}
return 0
}
func (m *AdditionParam) GetStrParam() string {
if x, ok := m.GetAdditionOneof().(*AdditionParam_StrParam); ok {
return x.StrParam
}
return ""
}
func (m *AdditionParam) GetBParam() []byte {
if x, ok := m.GetAdditionOneof().(*AdditionParam_BParam); ok {
return x.BParam
}
return nil
}
// XXX_OneofFuncs is for the internal use of the proto package.
func (*AdditionParam) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
return _AdditionParam_OneofMarshaler, _AdditionParam_OneofUnmarshaler, _AdditionParam_OneofSizer, []interface{}{
(*AdditionParam_SParam)(nil),
(*AdditionParam_UParam)(nil),
(*AdditionParam_StrParam)(nil),
(*AdditionParam_BParam)(nil),
}
}
func _AdditionParam_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
m := msg.(*AdditionParam)
// addition_oneof
switch x := m.AdditionOneof.(type) {
case *AdditionParam_SParam:
b.EncodeVarint(10<<3 | proto.WireVarint)
b.EncodeVarint(uint64(x.SParam))
case *AdditionParam_UParam:
b.EncodeVarint(11<<3 | proto.WireVarint)
b.EncodeVarint(uint64(x.UParam))
case *AdditionParam_StrParam:
b.EncodeVarint(12<<3 | proto.WireBytes)
b.EncodeStringBytes(x.StrParam)
case *AdditionParam_BParam:
b.EncodeVarint(13<<3 | proto.WireBytes)
b.EncodeRawBytes(x.BParam)
case nil:
default:
return fmt.Errorf("AdditionParam.AdditionOneof has unexpected type %T", x)
}
return nil
}
func _AdditionParam_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
m := msg.(*AdditionParam)
switch tag {
case 10: // addition_oneof.SParam
if wire != proto.WireVarint {
return true, proto.ErrInternalBadWireType
}
x, err := b.DecodeVarint()
m.AdditionOneof = &AdditionParam_SParam{int64(x)}
return true, err
case 11: // addition_oneof.UParam
if wire != proto.WireVarint {
return true, proto.ErrInternalBadWireType
}
x, err := b.DecodeVarint()
m.AdditionOneof = &AdditionParam_UParam{x}
return true, err
case 12: // addition_oneof.StrParam
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
x, err := b.DecodeStringBytes()
m.AdditionOneof = &AdditionParam_StrParam{x}
return true, err
case 13: // addition_oneof.BParam
if wire != proto.WireBytes {
return true, proto.ErrInternalBadWireType
}
x, err := b.DecodeRawBytes(true)
m.AdditionOneof = &AdditionParam_BParam{x}
return true, err
default:
return false, nil
}
}
func _AdditionParam_OneofSizer(msg proto.Message) (n int) {
m := msg.(*AdditionParam)
// addition_oneof
switch x := m.AdditionOneof.(type) {
case *AdditionParam_SParam:
n += 1 // tag and wire
n += proto.SizeVarint(uint64(x.SParam))
case *AdditionParam_UParam:
n += 1 // tag and wire
n += proto.SizeVarint(uint64(x.UParam))
case *AdditionParam_StrParam:
n += 1 // tag and wire
n += proto.SizeVarint(uint64(len(x.StrParam)))
n += len(x.StrParam)
case *AdditionParam_BParam:
n += 1 // tag and wire
n += proto.SizeVarint(uint64(len(x.BParam)))
n += len(x.BParam)
case nil:
default:
panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
}
return n
}
type PBRpcRequestData struct { type PBRpcRequestData struct {
Seq *uint64 `protobuf:"varint,1,opt,name=Seq" json:"Seq,omitempty"` Seq *uint64 `protobuf:"varint,1,opt,name=Seq" json:"Seq,omitempty"`
ServiceMethod *string `protobuf:"bytes,2,opt,name=ServiceMethod" json:"ServiceMethod,omitempty"` ServiceMethod *string `protobuf:"bytes,2,opt,name=ServiceMethod" json:"ServiceMethod,omitempty"`
NoReply *bool `protobuf:"varint,3,opt,name=NoReply" json:"NoReply,omitempty"` NoReply *bool `protobuf:"varint,3,opt,name=NoReply" json:"NoReply,omitempty"`
InParam []byte `protobuf:"bytes,4,opt,name=InParam" json:"InParam,omitempty"` InParam []byte `protobuf:"bytes,4,opt,name=InParam" json:"InParam,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` AddtionParam *AdditionParam `protobuf:"bytes,5,opt,name=addtionParam" json:"addtionParam,omitempty"`
XXX_unrecognized []byte `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
} }
func (m *PBRpcRequestData) Reset() { *m = PBRpcRequestData{} } func (m *PBRpcRequestData) Reset() { *m = PBRpcRequestData{} }
func (m *PBRpcRequestData) String() string { return proto.CompactTextString(m) } func (m *PBRpcRequestData) String() string { return proto.CompactTextString(m) }
func (*PBRpcRequestData) ProtoMessage() {} func (*PBRpcRequestData) ProtoMessage() {}
func (*PBRpcRequestData) Descriptor() ([]byte, []int) { func (*PBRpcRequestData) Descriptor() ([]byte, []int) {
return fileDescriptor_77a6da22d6a3feb1, []int{0} return fileDescriptor_77a6da22d6a3feb1, []int{1}
} }
func (m *PBRpcRequestData) XXX_Unmarshal(b []byte) error { func (m *PBRpcRequestData) XXX_Unmarshal(b []byte) error {
@@ -83,6 +278,13 @@ func (m *PBRpcRequestData) GetInParam() []byte {
return nil return nil
} }
func (m *PBRpcRequestData) GetAddtionParam() *AdditionParam {
if m != nil {
return m.AddtionParam
}
return nil
}
type PBRpcResponseData struct { type PBRpcResponseData struct {
Seq *uint64 `protobuf:"varint,1,opt,name=Seq" json:"Seq,omitempty"` Seq *uint64 `protobuf:"varint,1,opt,name=Seq" json:"Seq,omitempty"`
Error *string `protobuf:"bytes,2,opt,name=Error" json:"Error,omitempty"` Error *string `protobuf:"bytes,2,opt,name=Error" json:"Error,omitempty"`
@@ -96,7 +298,7 @@ func (m *PBRpcResponseData) Reset() { *m = PBRpcResponseData{} }
func (m *PBRpcResponseData) String() string { return proto.CompactTextString(m) } func (m *PBRpcResponseData) String() string { return proto.CompactTextString(m) }
func (*PBRpcResponseData) ProtoMessage() {} func (*PBRpcResponseData) ProtoMessage() {}
func (*PBRpcResponseData) Descriptor() ([]byte, []int) { func (*PBRpcResponseData) Descriptor() ([]byte, []int) {
return fileDescriptor_77a6da22d6a3feb1, []int{1} return fileDescriptor_77a6da22d6a3feb1, []int{2}
} }
func (m *PBRpcResponseData) XXX_Unmarshal(b []byte) error { func (m *PBRpcResponseData) XXX_Unmarshal(b []byte) error {
@@ -139,6 +341,7 @@ func (m *PBRpcResponseData) GetReply() []byte {
} }
func init() { func init() {
proto.RegisterType((*AdditionParam)(nil), "rpc.AdditionParam")
proto.RegisterType((*PBRpcRequestData)(nil), "rpc.PBRpcRequestData") proto.RegisterType((*PBRpcRequestData)(nil), "rpc.PBRpcRequestData")
proto.RegisterType((*PBRpcResponseData)(nil), "rpc.PBRpcResponseData") proto.RegisterType((*PBRpcResponseData)(nil), "rpc.PBRpcResponseData")
} }
@@ -146,16 +349,23 @@ func init() {
func init() { proto.RegisterFile("rpc.proto", fileDescriptor_77a6da22d6a3feb1) } func init() { proto.RegisterFile("rpc.proto", fileDescriptor_77a6da22d6a3feb1) }
var fileDescriptor_77a6da22d6a3feb1 = []byte{ var fileDescriptor_77a6da22d6a3feb1 = []byte{
// 173 bytes of a gzipped FileDescriptorProto // 274 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2c, 0x2a, 0x48, 0xd6, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xb1, 0x4e, 0xc3, 0x30,
0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e, 0x2a, 0x48, 0x56, 0xaa, 0xe3, 0x12, 0x08, 0x70, 0x10, 0x86, 0x6b, 0xd2, 0x42, 0x7b, 0x4d, 0x50, 0xb0, 0x18, 0x3c, 0x30, 0x58, 0x11, 0x83, 0xa7,
0x0a, 0x2a, 0x48, 0x0e, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x71, 0x49, 0x2c, 0x49, 0x14, 0x12, 0x0e, 0x0c, 0xec, 0x44, 0x20, 0x95, 0x01, 0x54, 0x2e, 0x62, 0x46, 0x56, 0x62, 0x44, 0x24, 0x88,
0xe0, 0x62, 0x0e, 0x4e, 0x2d, 0x94, 0x60, 0x54, 0x60, 0xd4, 0x60, 0x09, 0x02, 0x31, 0x85, 0x54, 0x5d, 0xc7, 0x20, 0xf1, 0x10, 0xbc, 0x0e, 0xcf, 0x87, 0x0e, 0xa7, 0x45, 0x19, 0xd8, 0xee, 0xbb,
0xb8, 0x78, 0x83, 0x53, 0x8b, 0xca, 0x32, 0x93, 0x53, 0x7d, 0x53, 0x4b, 0x32, 0xf2, 0x53, 0x24, 0xcf, 0xa7, 0xfb, 0x7d, 0xb0, 0xf0, 0xae, 0x5e, 0x39, 0x6f, 0x83, 0xe5, 0x89, 0x77, 0x75, 0xf1,
0x98, 0x14, 0x18, 0x35, 0x38, 0x83, 0x50, 0x05, 0x85, 0x24, 0xb8, 0xd8, 0xfd, 0xf2, 0x83, 0x52, 0xc5, 0x20, 0xbb, 0x6a, 0x9a, 0x36, 0xb4, 0xb6, 0xdb, 0x68, 0xaf, 0xdf, 0xb8, 0x80, 0xc3, 0xea,
0x0b, 0x72, 0x2a, 0x25, 0x98, 0x15, 0x18, 0x35, 0x38, 0x82, 0x60, 0x5c, 0x90, 0x8c, 0x67, 0x5e, 0xb7, 0x12, 0x20, 0x99, 0x4a, 0xd6, 0x13, 0x1c, 0x98, 0xcc, 0x63, 0x34, 0x4b, 0xc9, 0xd4, 0x94,
0x40, 0x62, 0x51, 0x62, 0xae, 0x04, 0x8b, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x8c, 0xab, 0x14, 0xc8, 0x4c, 0x64, 0x7e, 0x06, 0xf3, 0x2a, 0xf8, 0xe8, 0x52, 0xc9, 0xd4, 0x62, 0x3d, 0xc1, 0x7d, 0x87,
0x25, 0x08, 0xb5, 0xbf, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0x15, 0x87, 0x03, 0x44, 0xb8, 0x58, 0x5d, 0xe6, 0xca, 0xe8, 0x32, 0xc9, 0x54, 0x4a, 0x73, 0x91, 0xcb, 0x1c, 0x8e, 0xf5, 0xb0, 0xfc, 0xc9,
0x8b, 0x8a, 0xf2, 0x8b, 0xa0, 0x16, 0x43, 0x38, 0x20, 0x51, 0x84, 0x75, 0x3c, 0x41, 0x10, 0x0e, 0x76, 0xc6, 0x3e, 0x17, 0xdf, 0x0c, 0xf2, 0x4d, 0x89, 0xae, 0x46, 0xb3, 0x7d, 0x37, 0x7d, 0xb8,
0x20, 0x00, 0x00, 0xff, 0xff, 0x84, 0xbc, 0x24, 0x3a, 0xe3, 0x00, 0x00, 0x00, 0xd6, 0x41, 0xf3, 0x1c, 0x92, 0xca, 0x6c, 0x05, 0xa3, 0xad, 0x48, 0x25, 0x3f, 0x87, 0xac, 0x32,
0xfe, 0xa3, 0xad, 0xcd, 0x9d, 0x09, 0x2f, 0xb6, 0x11, 0x07, 0xb4, 0x15, 0xc7, 0x4d, 0x2e, 0xe0,
0xe8, 0xde, 0xa2, 0x71, 0xaf, 0x9f, 0x22, 0x91, 0x4c, 0xcd, 0x71, 0x87, 0x64, 0x6e, 0xe3, 0x7f,
0xc5, 0x94, 0x32, 0xe1, 0x0e, 0xf9, 0x25, 0xa4, 0xba, 0x69, 0xf6, 0xe7, 0x10, 0x33, 0xc9, 0xd4,
0xf2, 0x82, 0xaf, 0xe8, 0x6e, 0xa3, 0x43, 0xe1, 0xe8, 0x5d, 0xf1, 0x00, 0x27, 0x43, 0xee, 0xde,
0xd9, 0xae, 0x37, 0xff, 0x04, 0x3f, 0x85, 0xd9, 0x8d, 0xf7, 0xd6, 0x0f, 0x81, 0x23, 0x50, 0xf7,
0x2f, 0x66, 0x8a, 0x11, 0x7e, 0x02, 0x00, 0x00, 0xff, 0xff, 0x00, 0x25, 0x65, 0x17, 0xac, 0x01,
0x00, 0x00,
} }

View File

@@ -3,12 +3,22 @@ syntax = "proto2";
package rpc; package rpc;
message AdditionParam {
oneof addition_oneof {
int64 SParam = 10;
uint64 UParam = 11;
string StrParam = 12;
bytes BParam = 13;
}
}
message PBRpcRequestData{
message PBRpcRequestData{
optional uint64 Seq = 1; optional uint64 Seq = 1;
optional string ServiceMethod = 2; optional string ServiceMethod = 2;
optional bool NoReply = 3; optional bool NoReply = 3;
optional bytes InParam = 4; optional bytes InParam = 4;
optional AdditionParam addtionParam = 5;
} }
message PBRpcResponseData{ message PBRpcResponseData{

View File

@@ -43,6 +43,9 @@ type RpcMethodInfo struct {
method reflect.Method method reflect.Method
iparam reflect.Value iparam reflect.Value
oParam reflect.Value oParam reflect.Value
additionParam reflect.Value
//addition *IRawAdditionParam
hashAdditionParam bool
} }
type RpcHandler struct { type RpcHandler struct {
@@ -75,6 +78,10 @@ type IRpcHandler interface {
GoNode(nodeId int,serviceMethod string,args interface{}) error GoNode(nodeId int,serviceMethod string,args interface{}) error
} }
var rawAdditionParamValueNull reflect.Value
func init(){
rawAdditionParamValueNull = reflect.ValueOf(&RawAdditionParamNull{})
}
func (slf *RpcHandler) GetRpcHandler() IRpcHandler{ func (slf *RpcHandler) GetRpcHandler() IRpcHandler{
return slf.rpcHandler return slf.rpcHandler
} }
@@ -125,16 +132,25 @@ func (slf *RpcHandler) suitableMethods(method reflect.Method) error {
return fmt.Errorf("%s The return parameter must be of type error!",method.Name) return fmt.Errorf("%s The return parameter must be of type error!",method.Name)
} }
if typ.NumIn() != 3 { if typ.NumIn() > 4 {
return fmt.Errorf("%s The number of input arguments must be 1!",method.Name) return fmt.Errorf("%s The number of input arguments must be 1!",method.Name)
} }
if slf.isExportedOrBuiltinType(typ.In(1)) == false || slf.isExportedOrBuiltinType(typ.In(2)) == false { if slf.isExportedOrBuiltinType(typ.In(1)) == false || slf.isExportedOrBuiltinType(typ.In(2)) == false{
return fmt.Errorf("%s Unsupported parameter types!",method.Name) return fmt.Errorf("%s Unsupported parameter types!",method.Name)
} }
rpcMethodInfo.iparam = reflect.New(typ.In(1).Elem()) //append(rpcMethodInfo.iparam,) parIdx := 1
rpcMethodInfo.oParam = reflect.New(typ.In(2).Elem()) if typ.NumIn() == 4 {
if slf.isExportedOrBuiltinType(typ.In(3)) == false {
return fmt.Errorf("%s Unsupported parameter types!",method.Name)
}
rpcMethodInfo.hashAdditionParam = true
parIdx++
}
rpcMethodInfo.iparam = reflect.New(typ.In(parIdx).Elem()) //append(rpcMethodInfo.iparam,)
parIdx++
rpcMethodInfo.oParam = reflect.New(typ.In(parIdx).Elem())
rpcMethodInfo.method = method rpcMethodInfo.method = method
slf.mapfunctons[slf.rpcHandler.GetName()+"."+method.Name] = rpcMethodInfo slf.mapfunctons[slf.rpcHandler.GetName()+"."+method.Name] = rpcMethodInfo
@@ -213,16 +229,13 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
if request.requestHandle!=nil { if request.requestHandle!=nil {
request.requestHandle(nil,err) request.requestHandle(nil,err)
} }
return return
} }
var paramList []reflect.Value var paramList []reflect.Value
var err error var err error
iparam := reflect.New(v.iparam.Type().Elem()).Interface() iparam := reflect.New(v.iparam.Type().Elem()).Interface()
if request.bLocalRequest == false {
if request.localParam==nil{
err = processor.Unmarshal(request.RpcRequestData.GetInParam(),iparam) err = processor.Unmarshal(request.RpcRequestData.GetInParam(),iparam)
if err!=nil { if err!=nil {
rerr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err) rerr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err)
@@ -232,11 +245,32 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
} }
} }
}else { }else {
iparam = request.localParam if request.localRawParam!=nil {
err = processor.Unmarshal(request.localRawParam,iparam)
if err!=nil {
rerr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err)
log.Error("%s",rerr.Error())
if request.requestHandle!=nil {
request.requestHandle(nil, rerr)
}
}
}else {
iparam = request.localParam
}
} }
var oParam reflect.Value var oParam reflect.Value
paramList = append(paramList,reflect.ValueOf(slf.GetRpcHandler())) //接受者 paramList = append(paramList,reflect.ValueOf(slf.GetRpcHandler())) //接受者
additionParams := request.RpcRequestData.GetAdditionParams()
if v.hashAdditionParam == true{
if additionParams!=nil && additionParams.GetParamValue()!=nil{
additionVal := reflect.ValueOf(additionParams)
paramList = append(paramList,additionVal)
}else{
paramList = append(paramList,rawAdditionParamValueNull)
}
}
paramList = append(paramList,reflect.ValueOf(iparam)) paramList = append(paramList,reflect.ValueOf(iparam))
if request.localReply!=nil { if request.localReply!=nil {
oParam = reflect.ValueOf(request.localReply) //输出参数 oParam = reflect.ValueOf(request.localReply) //输出参数
@@ -312,7 +346,7 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int
return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil) return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil)
} }
//其他的rpcHandler的处理器 //其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,true,sMethod[0],sMethod[1],args,nil) pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,true,sMethod[0],sMethod[1],args,nil,nil,nil)
if pCall.Err!=nil { if pCall.Err!=nil {
err = pCall.Err err = pCall.Err
} }
@@ -331,6 +365,61 @@ func (slf *RpcHandler) goRpc(bCast bool,nodeId int,serviceMethod string,args int
return err return err
} }
func (slf *RpcHandler) rawGoRpc(bCast bool,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error {
var pClientList []*Client
err := slf.funcRpcClient(nodeId,serviceMethod,&pClientList)
if err != nil {
log.Error("Call serviceMethod is error:%+v!",err)
return err
}
if len(pClientList) > 1 && bCast == false {
log.Error("Cannot call more then 1 node!")
return fmt.Errorf("Cannot call more then 1 node!")
}
//2.rpcclient调用
//如果调用本结点服务
for _,pClient := range pClientList {
if pClient.bSelfNode == true {
pLocalRpcServer:=slf.funcRpcServer()
//判断是否是同一服务
sMethod := strings.Split(serviceMethod,".")
if len(sMethod)!=2 {
serr := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod)
log.Error("%+v",serr)
if serr!= nil {
err = serr
}
continue
}
//调用自己rpcHandler处理器
if sMethod[0] == slf.rpcHandler.GetName() { //自己服务调用
//
return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil)
}
//其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,true,sMethod[0],sMethod[1],nil,args,nil,additionParam)
if pCall.Err!=nil {
err = pCall.Err
}
ReleaseCall(pCall)
continue
}
//跨node调用
pCall := pClient.RawGo(true,serviceMethod,args,additionParam,nil)
if pCall.Err!=nil {
err = pCall.Err
}
ReleaseCall(pCall)
}
return err
}
func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},reply interface{}) error { func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},reply interface{}) error {
var pClientList []*Client var pClientList []*Client
err := slf.funcRpcClient(nodeId,serviceMethod,&pClientList) err := slf.funcRpcClient(nodeId,serviceMethod,&pClientList)
@@ -361,7 +450,7 @@ func (slf *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},
return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,reply) return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,reply)
} }
//其他的rpcHandler的处理器 //其他的rpcHandler的处理器
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,false,sMethod[0],sMethod[1],args,reply) pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,false,sMethod[0],sMethod[1],args,nil,reply,nil)
err = pCall.Done().Err err = pCall.Done().Err
pClient.RemovePending(pCall.Seq) pClient.RemovePending(pCall.Seq)
ReleaseCall(pCall) ReleaseCall(pCall)
@@ -436,7 +525,6 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa
}else{ }else{
fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)}) fVal.Call([]reflect.Value{reflect.ValueOf(reply),reflect.ValueOf(err)})
} }
} }
//其他的rpcHandler的处理器 //其他的rpcHandler的处理器
@@ -447,7 +535,7 @@ func (slf *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args interfa
} }
return nil return nil
} }
pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,false,sMethod[0],sMethod[1],args,reply) pCall := pLocalRpcServer.selfNodeRpcHandlerGo(pClient,false,sMethod[0],sMethod[1],args,nil,reply,nil)
err = pCall.Done().Err err = pCall.Done().Err
pClient.RemovePending(pCall.Seq) pClient.RemovePending(pCall.Seq)
ReleaseCall(pCall) ReleaseCall(pCall)
@@ -500,3 +588,12 @@ func (slf *RpcHandler) GoNode(nodeId int,serviceMethod string,args interface{})
func (slf *RpcHandler) CastGo(serviceMethod string,args interface{}) { func (slf *RpcHandler) CastGo(serviceMethod string,args interface{}) {
slf.goRpc(true,0,serviceMethod,args) slf.goRpc(true,0,serviceMethod,args)
} }
func (slf *RpcHandler) RawGoNode(nodeId int,serviceMethod string,args []byte,additionParam interface{}) error {
return slf.rawGoRpc(false,nodeId,serviceMethod,args,additionParam)
}
func (slf *RpcHandler) RawCastGo(serviceMethod string,args []byte,additionParam interface{}) {
slf.goRpc(true,0,serviceMethod,args)
}

View File

@@ -98,10 +98,9 @@ func (agent *RpcAgent) Run() {
//will close tcpconn //will close tcpconn
break break
} }
//解析head //解析head
req := MakeRpcRequest() req := MakeRpcRequest()
req.RpcRequestData = processor.MakeRpcRequest(0,"",false,nil) req.RpcRequestData = processor.MakeRpcRequest(0,"",false,nil,nil)
err = processor.Unmarshal(data,req.RpcRequestData) err = processor.Unmarshal(data,req.RpcRequestData)
if err != nil { if err != nil {
log.Error("rpc Unmarshal request is error: %v", err) log.Error("rpc Unmarshal request is error: %v", err)
@@ -201,7 +200,7 @@ func (slf *Server) myselfRpcHandlerGo(handlerName string,methodName string, args
} }
func (slf *Server) selfNodeRpcHandlerGo(client *Client,noReply bool,handlerName string,methodName string, args interface{},reply interface{}) *Call { func (slf *Server) selfNodeRpcHandlerGo(client *Client,noReply bool,handlerName string,methodName string, args interface{},rawArgs []byte,reply interface{},additionParam interface{}) *Call {
pCall := MakeCall() pCall := MakeCall()
pCall.Seq = client.generateSeq() pCall.Seq = client.generateSeq()
@@ -214,9 +213,11 @@ func (slf *Server) selfNodeRpcHandlerGo(client *Client,noReply bool,handlerName
} }
req := MakeRpcRequest() req := MakeRpcRequest()
req.bLocalRequest = true
req.localParam = args req.localParam = args
req.localReply = reply req.localReply = reply
req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil) req.localRawParam = rawArgs
req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil,additionParam)
if noReply == false { if noReply == false {
client.AddPending(pCall) client.AddPending(pCall)
req.requestHandle = func(Returns interface{},Err *RpcError){ req.requestHandle = func(Returns interface{},Err *RpcError){
@@ -265,7 +266,8 @@ func (slf *Server) selfNodeRpcHandlerAsyncGo(client *Client,callerRpcHandler IRp
req := MakeRpcRequest() req := MakeRpcRequest()
req.localParam = args req.localParam = args
req.localReply = reply req.localReply = reply
req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil) req.bLocalRequest = true
req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil,nil)
if noReply == false { if noReply == false {
client.AddPending(pCall) client.AddPending(pCall)
req.requestHandle = func(Returns interface{},Err *RpcError){ req.requestHandle = func(Returns interface{},Err *RpcError){