diff --git a/example/main.go b/example/main.go index 458e536..d5061e0 100644 --- a/example/main.go +++ b/example/main.go @@ -4,11 +4,13 @@ import ( "fmt" "github.com/duanhf2012/origin/event" "github.com/duanhf2012/origin/example/GateService" + "github.com/duanhf2012/origin/example/msgpb" "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/node" "github.com/duanhf2012/origin/service" "github.com/duanhf2012/origin/sysmodule" "github.com/duanhf2012/origin/sysservice" + "github.com/golang/protobuf/proto" "time" ) @@ -125,9 +127,32 @@ func (slf *Module4) OnRelease() { fmt.Printf("Release Module4:%d\n",slf.GetModuleId()) } +func (slf *TestServiceCall) TestProtobufRpc(){ +/* input := msgpb.InputRpc{} + input.Tag = proto.Int32(33333) + input.Msg = proto.String("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx") + + slf.AsyncCall("TestService1.RPC_TestPB",&input, func(b *msgpb.OutputRpc,err error) { + fmt.Print(*b,err) + }) + + */ + //(a *Param,b *Param) + var input Param + input.Index = 1111 + input.Pa = []string{"sadfsdf","cccccc"} + input.A = 33333 + input.B ="asfasfasfd" + + slf.AsyncCall("TestService1.RPC_Test",&input, func(b *Param,err error) { + fmt.Print(*b,err) + }) +} + func (slf *TestServiceCall) OnInit() error { slf.OpenProfiler() + slf.AfterFunc(time.Second*5,slf.TestProtobufRpc) //slf.AfterFunc(time.Second*1,slf.Run) //slf.AfterFunc(time.Second*1,slf.Test) moduleid1,_ = slf.AddModule(&Module1{}) @@ -224,6 +249,14 @@ func (slf *TestService1) RPC_Test(a *Param,b *Param) error { return nil } +func (slf *TestService1) RPC_TestPB(a *msgpb.InputRpc,b *msgpb.OutputRpc) error { + b.Msg = proto.String(a.GetMsg()) + b.Tag = proto.Int32(a.GetTag()) + + return nil +} + + func (slf *TestService1) OnInit() error { slf.OpenProfiler() return nil @@ -263,7 +296,7 @@ func (slf *TestService2) OnInit() error { func main(){ - + //rpc.SetProcessor(&rpc.PBProcessor{}) //data := P{3, 4, 5, "CloudGeek"} //buf := encode(data) diff --git a/rpc/client.go b/rpc/client.go index fa9dc6f..8b7142d 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -62,23 +62,26 @@ func (slf *Client) AsycGo(rpcHandler IRpcHandler,serviceMethod string,callback r call.rpcHandler = rpcHandler call.ServiceMethod = serviceMethod - request := &RpcRequest{} - request.NoReply = false - call.Arg = args - slf.pendingLock.Lock() - slf.startSeq += 1 - call.Seq = slf.startSeq - request.Seq = slf.startSeq - slf.pending[call.Seq] = call - slf.pendingLock.Unlock() - request.ServiceMethod = serviceMethod - var herr error - request.InParam,herr = processor.Marshal(args) + InParam,herr := processor.Marshal(args) if herr != nil { return herr } - bytes,err := processor.Marshal(request) + request := &RpcRequest{} + call.Arg = args + slf.pendingLock.Lock() + slf.startSeq += 1 + call.Seq = slf.startSeq + //request.Seq = slf.startSeq + //request.NoReply = false + //request.ServiceMethod = serviceMethod + request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,false,InParam) + slf.pending[call.Seq] = call + slf.pendingLock.Unlock() + + + + bytes,err := processor.Marshal(request.RpcRequestData) if err != nil { return err } @@ -98,25 +101,27 @@ func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply call.done = make(chan *Call,1) call.Reply = reply call.ServiceMethod = serviceMethod + + InParam,err := processor.Marshal(args) + if err != nil { + call.Err = err + return call + } + request := &RpcRequest{} - request.NoReply = noReply + //request.NoReply = noReply call.Arg = args slf.pendingLock.Lock() slf.startSeq += 1 call.Seq = slf.startSeq - request.Seq = slf.startSeq + //request.Seq = slf.startSeq + // request.ServiceMethod = serviceMethod + request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,noReply,InParam) slf.pending[call.Seq] = call slf.pendingLock.Unlock() - request.ServiceMethod = serviceMethod - var herr error - request.InParam,herr = processor.Marshal(args) - if herr != nil { - call.Err = herr - return call - } - bytes,err := processor.Marshal(request) + bytes,err := processor.Marshal(request.RpcRequestData) if err != nil { call.Err = err return call @@ -138,13 +143,15 @@ func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply type RequestHandler func(Returns interface{},Err *RpcError) type RpcRequest struct { + RpcRequestData IRpcRequestData + //packhead - Seq uint64 // sequence number chosen by client + /*Seq uint64 // sequence number chosen by client ServiceMethod string // format: "Service.Method" NoReply bool //是否需要返回 //packbody InParam []byte - +*/ //other data localReply interface{} localParam interface{} //本地调用的参数列表 @@ -153,12 +160,14 @@ type RpcRequest struct { } type RpcResponse struct { + RpcResponeData IRpcResponseData + /* //head Seq uint64 // sequence number chosen by client Err *RpcError //returns - Reply []byte + Reply []byte*/ } @@ -180,32 +189,33 @@ func (slf *Client) Run(){ } //1.解析head respone := &RpcResponse{} - err = processor.Unmarshal(bytes,respone) + respone.RpcResponeData =processor.MakeRpcResponse(0,nil,nil) + err = processor.Unmarshal(bytes,respone.RpcResponeData) if err != nil { log.Error("rpcClient Unmarshal head error,error:%+v",err) continue } slf.pendingLock.Lock() - v,ok := slf.pending[respone.Seq] + v,ok := slf.pending[respone.RpcResponeData.GetSeq()] if ok == false { - log.Error("rpcClient cannot find seq %d in pending",respone.Seq) + log.Error("rpcClient cannot find seq %d in pending",respone.RpcResponeData.GetSeq()) slf.pendingLock.Unlock() }else { - delete(slf.pending,respone.Seq) + delete(slf.pending,respone.RpcResponeData.GetSeq()) slf.pendingLock.Unlock() v.Err = nil - if len(respone.Reply) >0 { - err = processor.Unmarshal(respone.Reply,v.Reply) + if len(respone.RpcResponeData.GetReply()) >0 { + err = processor.Unmarshal(respone.RpcResponeData.GetReply(),v.Reply) if err != nil { log.Error("rpcClient Unmarshal body error,error:%+v",err) v.Err = err } } - if respone.Err != nil { - v.Err= respone.Err + if respone.RpcResponeData.GetErr() != nil { + v.Err= respone.RpcResponeData.GetErr() } if v.callback!=nil && v.callback.IsValid() { diff --git a/rpc/gobrpc/processor.go b/rpc/gobrpc/processor.go deleted file mode 100644 index 0272095..0000000 --- a/rpc/gobrpc/processor.go +++ /dev/null @@ -1 +0,0 @@ -package gobrpc diff --git a/rpc/jsonprocessor.go b/rpc/jsonprocessor.go index 2539171..0e5ae21 100644 --- a/rpc/jsonprocessor.go +++ b/rpc/jsonprocessor.go @@ -1,14 +1,27 @@ package rpc -import ( - - "encoding/json" - -) +import "encoding/json" type JsonProcessor struct { } +type JsonRpcRequestData struct { + //packhead + Seq uint64 // sequence number chosen by client + ServiceMethod string // format: "Service.Method" + NoReply bool //是否需要返回 + //packbody + InParam []byte +} + +type JsonRpcResponseData struct { + //head + Seq uint64 // sequence number chosen by client + Err string + + //returns + Reply []byte +} func (slf *JsonProcessor) Marshal(v interface{}) ([]byte, error){ @@ -16,7 +29,56 @@ func (slf *JsonProcessor) Marshal(v interface{}) ([]byte, error){ } func (slf *JsonProcessor) Unmarshal(data []byte, v interface{}) error{ - return json.Unmarshal(data,v) } +func (slf *JsonProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{ + return &JsonRpcRequestData{Seq:seq,ServiceMethod:serviceMethod,NoReply:noReply,InParam:inParam} +} + +func (slf *JsonProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData { + return &JsonRpcResponseData{ + Seq: seq, + Err: err.Error(), + Reply: reply, + } +} + +func (slf *JsonRpcRequestData) IsReply() bool{ + return slf.NoReply +} + +func (slf *JsonRpcRequestData) GetSeq() uint64{ + return slf.Seq +} + +func (slf *JsonRpcRequestData) GetServiceMethod() string{ + return slf.ServiceMethod +} + +func (slf *JsonRpcRequestData) GetInParam() []byte{ + return slf.InParam +} + +func (slf *JsonRpcResponseData) GetSeq() uint64 { + return slf.Seq +} + +func (slf *JsonRpcResponseData) GetErr() *RpcError { + if slf.Err == ""{ + return nil + } + + return Errorf(slf.Err) +} + + +func (slf *JsonRpcResponseData) GetReply() []byte{ + return slf.Reply +} + + + + + + diff --git a/rpc/pbprocessor.go b/rpc/pbprocessor.go new file mode 100644 index 0000000..561a784 --- /dev/null +++ b/rpc/pbprocessor.go @@ -0,0 +1,58 @@ +package rpc + +import ( + "github.com/golang/protobuf/proto" +) + +type PBProcessor struct { +} + +func (slf *PBRpcRequestData) MakeRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) *PBRpcRequestData{ + slf.Seq = proto.Uint64(seq) + slf.ServiceMethod = proto.String(serviceMethod) + slf.NoReply = proto.Bool(noReply) + slf.InParam = inParam + return slf +} + +func (slf *PBRpcResponseData) MakeRespone(seq uint64,err *RpcError,reply []byte) *PBRpcResponseData{ + slf.Seq = proto.Uint64(seq) + slf.Error = proto.String(err.Error()) + slf.Reply = reply + + return slf +} +func (slf *PBProcessor) Marshal(v interface{}) ([]byte, error){ + return proto.Marshal(v.(proto.Message)) +} + +func (slf *PBProcessor) Unmarshal(data []byte, msg interface{}) error{ + protoMsg := msg.(proto.Message) + return proto.Unmarshal(data, protoMsg) +} + + +func (slf *PBProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{ + return (&PBRpcRequestData{}).MakeRequest(seq,serviceMethod,noReply,inParam) +} + +func (slf *PBProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData { + return (&PBRpcResponseData{}).MakeRespone(seq,err,reply) +} + +func (slf *PBRpcRequestData) IsReply() bool{ + return slf.GetNoReply() +} + +func (slf *PBRpcResponseData) GetErr() *RpcError { + return Errorf(slf.GetError()) +} + + + + + + + + + diff --git a/rpc/probufprocessor.go b/rpc/probufprocessor.go new file mode 100644 index 0000000..9ab1e3e --- /dev/null +++ b/rpc/probufprocessor.go @@ -0,0 +1 @@ +package rpc diff --git a/rpc/processor.go b/rpc/processor.go new file mode 100644 index 0000000..cc84a10 --- /dev/null +++ b/rpc/processor.go @@ -0,0 +1,43 @@ +package rpc + +/* + Seq uint64 // sequence number chosen by client + ServiceMethod string // format: "Service.Method" + NoReply bool //是否需要返回 + //packbody + InParam []byte + + + + +type RpcResponse struct { + //head + Seq uint64 // sequence number chosen by client + Err *RpcError + + //returns + Reply []byte +} +*/ +type IRpcRequestData interface { + GetSeq() uint64 + GetServiceMethod() string + GetInParam() []byte + IsReply() bool +} + +type IRpcResponseData interface { + GetSeq() uint64 + GetErr() *RpcError + GetReply() []byte +} + +type IRpcProcessor interface { + Marshal(v interface{}) ([]byte, error) + Unmarshal(data []byte, v interface{}) error + + MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData + MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData +} + + diff --git a/rpc/rpc.pb.go b/rpc/rpc.pb.go new file mode 100644 index 0000000..dcbe976 --- /dev/null +++ b/rpc/rpc.pb.go @@ -0,0 +1,161 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: rpc.proto + +package rpc + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type PBRpcRequestData struct { + Seq *uint64 `protobuf:"varint,1,opt,name=Seq" json:"Seq,omitempty"` + ServiceMethod *string `protobuf:"bytes,2,opt,name=ServiceMethod" json:"ServiceMethod,omitempty"` + NoReply *bool `protobuf:"varint,3,opt,name=NoReply" json:"NoReply,omitempty"` + InParam []byte `protobuf:"bytes,4,opt,name=InParam" json:"InParam,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PBRpcRequestData) Reset() { *m = PBRpcRequestData{} } +func (m *PBRpcRequestData) String() string { return proto.CompactTextString(m) } +func (*PBRpcRequestData) ProtoMessage() {} +func (*PBRpcRequestData) Descriptor() ([]byte, []int) { + return fileDescriptor_77a6da22d6a3feb1, []int{0} +} + +func (m *PBRpcRequestData) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_PBRpcRequestData.Unmarshal(m, b) +} +func (m *PBRpcRequestData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_PBRpcRequestData.Marshal(b, m, deterministic) +} +func (m *PBRpcRequestData) XXX_Merge(src proto.Message) { + xxx_messageInfo_PBRpcRequestData.Merge(m, src) +} +func (m *PBRpcRequestData) XXX_Size() int { + return xxx_messageInfo_PBRpcRequestData.Size(m) +} +func (m *PBRpcRequestData) XXX_DiscardUnknown() { + xxx_messageInfo_PBRpcRequestData.DiscardUnknown(m) +} + +var xxx_messageInfo_PBRpcRequestData proto.InternalMessageInfo + +func (m *PBRpcRequestData) GetSeq() uint64 { + if m != nil && m.Seq != nil { + return *m.Seq + } + return 0 +} + +func (m *PBRpcRequestData) GetServiceMethod() string { + if m != nil && m.ServiceMethod != nil { + return *m.ServiceMethod + } + return "" +} + +func (m *PBRpcRequestData) GetNoReply() bool { + if m != nil && m.NoReply != nil { + return *m.NoReply + } + return false +} + +func (m *PBRpcRequestData) GetInParam() []byte { + if m != nil { + return m.InParam + } + return nil +} + +type PBRpcResponseData struct { + Seq *uint64 `protobuf:"varint,1,opt,name=Seq" json:"Seq,omitempty"` + Error *string `protobuf:"bytes,2,opt,name=Error" json:"Error,omitempty"` + Reply []byte `protobuf:"bytes,3,opt,name=Reply" json:"Reply,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PBRpcResponseData) Reset() { *m = PBRpcResponseData{} } +func (m *PBRpcResponseData) String() string { return proto.CompactTextString(m) } +func (*PBRpcResponseData) ProtoMessage() {} +func (*PBRpcResponseData) Descriptor() ([]byte, []int) { + return fileDescriptor_77a6da22d6a3feb1, []int{1} +} + +func (m *PBRpcResponseData) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_PBRpcResponseData.Unmarshal(m, b) +} +func (m *PBRpcResponseData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_PBRpcResponseData.Marshal(b, m, deterministic) +} +func (m *PBRpcResponseData) XXX_Merge(src proto.Message) { + xxx_messageInfo_PBRpcResponseData.Merge(m, src) +} +func (m *PBRpcResponseData) XXX_Size() int { + return xxx_messageInfo_PBRpcResponseData.Size(m) +} +func (m *PBRpcResponseData) XXX_DiscardUnknown() { + xxx_messageInfo_PBRpcResponseData.DiscardUnknown(m) +} + +var xxx_messageInfo_PBRpcResponseData proto.InternalMessageInfo + +func (m *PBRpcResponseData) GetSeq() uint64 { + if m != nil && m.Seq != nil { + return *m.Seq + } + return 0 +} + +func (m *PBRpcResponseData) GetError() string { + if m != nil && m.Error != nil { + return *m.Error + } + return "" +} + +func (m *PBRpcResponseData) GetReply() []byte { + if m != nil { + return m.Reply + } + return nil +} + +func init() { + proto.RegisterType((*PBRpcRequestData)(nil), "rpc.PBRpcRequestData") + proto.RegisterType((*PBRpcResponseData)(nil), "rpc.PBRpcResponseData") +} + +func init() { proto.RegisterFile("rpc.proto", fileDescriptor_77a6da22d6a3feb1) } + +var fileDescriptor_77a6da22d6a3feb1 = []byte{ + // 173 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2c, 0x2a, 0x48, 0xd6, + 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e, 0x2a, 0x48, 0x56, 0xaa, 0xe3, 0x12, 0x08, 0x70, + 0x0a, 0x2a, 0x48, 0x0e, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x71, 0x49, 0x2c, 0x49, 0x14, 0x12, + 0xe0, 0x62, 0x0e, 0x4e, 0x2d, 0x94, 0x60, 0x54, 0x60, 0xd4, 0x60, 0x09, 0x02, 0x31, 0x85, 0x54, + 0xb8, 0x78, 0x83, 0x53, 0x8b, 0xca, 0x32, 0x93, 0x53, 0x7d, 0x53, 0x4b, 0x32, 0xf2, 0x53, 0x24, + 0x98, 0x14, 0x18, 0x35, 0x38, 0x83, 0x50, 0x05, 0x85, 0x24, 0xb8, 0xd8, 0xfd, 0xf2, 0x83, 0x52, + 0x0b, 0x72, 0x2a, 0x25, 0x98, 0x15, 0x18, 0x35, 0x38, 0x82, 0x60, 0x5c, 0x90, 0x8c, 0x67, 0x5e, + 0x40, 0x62, 0x51, 0x62, 0xae, 0x04, 0x8b, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x8c, 0xab, 0x14, 0xc8, + 0x25, 0x08, 0xb5, 0xbf, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0x15, 0x87, 0x03, 0x44, 0xb8, 0x58, 0x5d, + 0x8b, 0x8a, 0xf2, 0x8b, 0xa0, 0x16, 0x43, 0x38, 0x20, 0x51, 0x84, 0x75, 0x3c, 0x41, 0x10, 0x0e, + 0x20, 0x00, 0x00, 0xff, 0xff, 0x84, 0xbc, 0x24, 0x3a, 0xe3, 0x00, 0x00, 0x00, +} diff --git a/rpc/rpc.proto b/rpc/rpc.proto new file mode 100644 index 0000000..3a593dd --- /dev/null +++ b/rpc/rpc.proto @@ -0,0 +1,19 @@ +syntax = "proto2"; + + +package rpc; + + +message PBRpcRequestData{ + optional uint64 Seq = 1; + optional string ServiceMethod = 2; + optional bool NoReply = 3; + optional bytes InParam = 4; +} + +message PBRpcResponseData{ + optional uint64 Seq = 1; + optional string Error = 2; + optional bytes Reply = 3; +} + diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index fb8211a..68f8a65 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -16,9 +16,12 @@ var NilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem()) type RpcError string +func (e *RpcError) Error() string { + if e == nil { + return "" + } -func (e RpcError) Error() string { - return string(e) + return string(*e) } func ConvertError(e error) *RpcError{ @@ -193,7 +196,7 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) { buf := make([]byte, 4096) l := runtime.Stack(buf, false) err := fmt.Errorf("%v: %s", r, buf[:l]) - log.Error("Handler Rpc %s Core dump info:%+v\n",request.ServiceMethod,err) + log.Error("Handler Rpc %s Core dump info:%+v\n",request.RpcRequestData.GetServiceMethod(),err) rpcErr := RpcError("call error : core dumps") if request.requestHandle!=nil { request.requestHandle(nil,&rpcErr) @@ -201,9 +204,9 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) { } }() - v,ok := slf.mapfunctons[request.ServiceMethod] + v,ok := slf.mapfunctons[request.RpcRequestData.GetServiceMethod()] if ok == false { - err := Errorf("RpcHandler %s cannot find %s",slf.rpcHandler.GetName(),request.ServiceMethod) + err := Errorf("RpcHandler %s cannot find %s",slf.rpcHandler.GetName(),request.RpcRequestData.GetServiceMethod()) log.Error("%s",err.Error()) if request.requestHandle!=nil { request.requestHandle(nil,err) @@ -215,9 +218,9 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) { var paramList []reflect.Value var err error if request.localParam==nil{ - err = processor.Unmarshal(request.InParam,&v.iparam) + err = processor.Unmarshal(request.RpcRequestData.GetInParam(),v.iparam) if err!=nil { - rerr := Errorf("Call Rpc %s Param error %+v",request.ServiceMethod,err) + rerr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err) log.Error("%s",rerr.Error()) if request.requestHandle!=nil { request.requestHandle(nil, rerr) diff --git a/rpc/server.go b/rpc/server.go index 64b1239..68a089d 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -10,9 +10,11 @@ import ( "strings" ) -var processor iprocessor = &JsonProcessor{} +var processor IRpcProcessor = &JsonProcessor{} var LittleEndian bool + + type Call struct { Seq uint64 ServiceMethod string @@ -30,10 +32,7 @@ func (slf *Call) Done() *Call{ return <-slf.done } -type iprocessor interface { - Marshal(v interface{}) ([]byte, error) - Unmarshal(data []byte, v interface{}) error -} + type Server struct { functions map[interface{}]interface{} @@ -49,6 +48,10 @@ type RpcHandleFinder interface { FindRpcHandler(serviceMethod string) IRpcHandler } +func SetProcessor(proc IRpcProcessor) { + processor = proc +} + func (slf *Server) Init(rpcHandleFinder RpcHandleFinder) { slf.cmdchannel = make(chan *Call,10000) slf.rpcHandleFinder = rpcHandleFinder @@ -80,17 +83,20 @@ type RpcAgent struct { func (agent *RpcAgent) WriteRespone(serviceMethod string,seq uint64,reply interface{},err *RpcError) { var rpcRespone RpcResponse - rpcRespone.Seq = seq - rpcRespone.Err = err + + //rpcRespone.Seq = seq + //rpcRespone.Err = err + var mReply []byte + var rpcError *RpcError var errM error if reply!=nil { - rpcRespone.Reply,errM = processor.Marshal(reply) + mReply,errM = processor.Marshal(reply) if errM != nil { - rpcRespone.Err = ConvertError(errM) + rpcError = ConvertError(errM) } } - - bytes,errM := processor.Marshal(&rpcRespone) + rpcRespone.RpcResponeData = processor.MakeRpcResponse(seq,rpcError,mReply) + bytes,errM := processor.Marshal(rpcRespone.RpcResponeData) if errM != nil { log.Error("service method %s %+v Marshal error:%+v!", serviceMethod,rpcRespone,errM) return @@ -114,11 +120,12 @@ func (agent *RpcAgent) Run() { //解析head var req RpcRequest - err = processor.Unmarshal(data,&req) + req.RpcRequestData = processor.MakeRpcRequest(0,"",false,nil) + err = processor.Unmarshal(data,req.RpcRequestData) if err != nil { - if req.Seq>0 { + if req.RpcRequestData.GetSeq()>0 { rpcError := RpcError("rpc Unmarshal request is error") - agent.WriteRespone(req.ServiceMethod,req.Seq,nil,&rpcError) + agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) continue }else{ log.Error("rpc Unmarshal request is error: %v", err) @@ -128,25 +135,25 @@ func (agent *RpcAgent) Run() { } //交给程序处理 - serviceMethod := strings.Split(req.ServiceMethod,".") + serviceMethod := strings.Split(req.RpcRequestData.GetServiceMethod(),".") if len(serviceMethod)!=2 { rpcError := RpcError("rpc request req.ServiceMethod is error") - agent.WriteRespone(req.ServiceMethod,req.Seq,nil,&rpcError) + agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) log.Debug("rpc request req.ServiceMethod is error") continue } rpcHandler := agent.rpcserver.rpcHandleFinder.FindRpcHandler(serviceMethod[0]) if rpcHandler== nil { - rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.ServiceMethod)) - agent.WriteRespone(req.ServiceMethod,req.Seq,nil,&rpcError) - log.Error("service method %s not config!", req.ServiceMethod) + rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod())) + agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError) + log.Error("service method %s not config!", req.RpcRequestData.GetServiceMethod()) continue } - if req.NoReply == false { + if req.RpcRequestData.IsReply()== false { req.requestHandle = func(Returns interface{},Err *RpcError){ - agent.WriteRespone(req.ServiceMethod,req.Seq,Returns,Err) + agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),Returns,Err) } } @@ -206,11 +213,12 @@ func (slf *Server) rpcHandlerGo(noReply bool,handlerName string,methodName strin return pCall } var req RpcRequest - req.ServiceMethod = fmt.Sprintf("%s.%s",handlerName,methodName) + + //req.ServiceMethod = fmt.Sprintf("%s.%s",handlerName,methodName) req.localParam = args req.localReply = reply - req.NoReply = noReply - + //req.NoReply = noReply + req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil) if noReply == false { req.requestHandle = func(Returns interface{},Err *RpcError){ if Err!=nil { @@ -243,11 +251,11 @@ func (slf *Server) rpcHandlerAsyncGo(callerRpcHandler IRpcHandler,noReply bool,h } var req RpcRequest - req.ServiceMethod = fmt.Sprintf("%s.%s",handlerName,methodName) + //req.ServiceMethod = fmt.Sprintf("%s.%s",handlerName,methodName) req.localParam = args req.localReply = reply - req.NoReply = noReply - + //req.NoReply = noReply + req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil) if noReply == false { req.requestHandle = func(Returns interface{},Err *RpcError){ if Err == nil { diff --git a/service/service.go b/service/service.go index 6f77599..d0fe596 100644 --- a/service/service.go +++ b/service/service.go @@ -109,7 +109,7 @@ func (slf *Service) Run() { bStop = true case rpcRequest :=<- rpcRequestChan: if slf.profiler!=nil { - analyzer = slf.profiler.Push("Req_"+rpcRequest.ServiceMethod) + analyzer = slf.profiler.Push("Req_"+rpcRequest.RpcRequestData.GetServiceMethod()) } slf.GetRpcHandler().HandlerRpcRequest(rpcRequest)