扩统rpc序列化与反序列化支持

This commit is contained in:
duanhf2012
2020-04-21 13:23:18 +08:00
parent a906002f29
commit f74f3a812e
12 changed files with 474 additions and 77 deletions

View File

@@ -4,11 +4,13 @@ import (
"fmt"
"github.com/duanhf2012/origin/event"
"github.com/duanhf2012/origin/example/GateService"
"github.com/duanhf2012/origin/example/msgpb"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/node"
"github.com/duanhf2012/origin/service"
"github.com/duanhf2012/origin/sysmodule"
"github.com/duanhf2012/origin/sysservice"
"github.com/golang/protobuf/proto"
"time"
)
@@ -125,9 +127,32 @@ func (slf *Module4) OnRelease() {
fmt.Printf("Release Module4:%d\n",slf.GetModuleId())
}
func (slf *TestServiceCall) TestProtobufRpc(){
/* input := msgpb.InputRpc{}
input.Tag = proto.Int32(33333)
input.Msg = proto.String("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
slf.AsyncCall("TestService1.RPC_TestPB",&input, func(b *msgpb.OutputRpc,err error) {
fmt.Print(*b,err)
})
*/
//(a *Param,b *Param)
var input Param
input.Index = 1111
input.Pa = []string{"sadfsdf","cccccc"}
input.A = 33333
input.B ="asfasfasfd"
slf.AsyncCall("TestService1.RPC_Test",&input, func(b *Param,err error) {
fmt.Print(*b,err)
})
}
func (slf *TestServiceCall) OnInit() error {
slf.OpenProfiler()
slf.AfterFunc(time.Second*5,slf.TestProtobufRpc)
//slf.AfterFunc(time.Second*1,slf.Run)
//slf.AfterFunc(time.Second*1,slf.Test)
moduleid1,_ = slf.AddModule(&Module1{})
@@ -224,6 +249,14 @@ func (slf *TestService1) RPC_Test(a *Param,b *Param) error {
return nil
}
func (slf *TestService1) RPC_TestPB(a *msgpb.InputRpc,b *msgpb.OutputRpc) error {
b.Msg = proto.String(a.GetMsg())
b.Tag = proto.Int32(a.GetTag())
return nil
}
func (slf *TestService1) OnInit() error {
slf.OpenProfiler()
return nil
@@ -263,7 +296,7 @@ func (slf *TestService2) OnInit() error {
func main(){
//rpc.SetProcessor(&rpc.PBProcessor{})
//data := P{3, 4, 5, "CloudGeek"}
//buf := encode(data)

View File

@@ -62,23 +62,26 @@ func (slf *Client) AsycGo(rpcHandler IRpcHandler,serviceMethod string,callback r
call.rpcHandler = rpcHandler
call.ServiceMethod = serviceMethod
request := &RpcRequest{}
request.NoReply = false
call.Arg = args
slf.pendingLock.Lock()
slf.startSeq += 1
call.Seq = slf.startSeq
request.Seq = slf.startSeq
slf.pending[call.Seq] = call
slf.pendingLock.Unlock()
request.ServiceMethod = serviceMethod
var herr error
request.InParam,herr = processor.Marshal(args)
InParam,herr := processor.Marshal(args)
if herr != nil {
return herr
}
bytes,err := processor.Marshal(request)
request := &RpcRequest{}
call.Arg = args
slf.pendingLock.Lock()
slf.startSeq += 1
call.Seq = slf.startSeq
//request.Seq = slf.startSeq
//request.NoReply = false
//request.ServiceMethod = serviceMethod
request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,false,InParam)
slf.pending[call.Seq] = call
slf.pendingLock.Unlock()
bytes,err := processor.Marshal(request.RpcRequestData)
if err != nil {
return err
}
@@ -98,25 +101,27 @@ func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply
call.done = make(chan *Call,1)
call.Reply = reply
call.ServiceMethod = serviceMethod
InParam,err := processor.Marshal(args)
if err != nil {
call.Err = err
return call
}
request := &RpcRequest{}
request.NoReply = noReply
//request.NoReply = noReply
call.Arg = args
slf.pendingLock.Lock()
slf.startSeq += 1
call.Seq = slf.startSeq
request.Seq = slf.startSeq
//request.Seq = slf.startSeq
// request.ServiceMethod = serviceMethod
request.RpcRequestData = processor.MakeRpcRequest(slf.startSeq,serviceMethod,noReply,InParam)
slf.pending[call.Seq] = call
slf.pendingLock.Unlock()
request.ServiceMethod = serviceMethod
var herr error
request.InParam,herr = processor.Marshal(args)
if herr != nil {
call.Err = herr
return call
}
bytes,err := processor.Marshal(request)
bytes,err := processor.Marshal(request.RpcRequestData)
if err != nil {
call.Err = err
return call
@@ -138,13 +143,15 @@ func (slf *Client) Go(noReply bool,serviceMethod string, args interface{},reply
type RequestHandler func(Returns interface{},Err *RpcError)
type RpcRequest struct {
RpcRequestData IRpcRequestData
//packhead
Seq uint64 // sequence number chosen by client
/*Seq uint64 // sequence number chosen by client
ServiceMethod string // format: "Service.Method"
NoReply bool //是否需要返回
//packbody
InParam []byte
*/
//other data
localReply interface{}
localParam interface{} //本地调用的参数列表
@@ -153,12 +160,14 @@ type RpcRequest struct {
}
type RpcResponse struct {
RpcResponeData IRpcResponseData
/*
//head
Seq uint64 // sequence number chosen by client
Err *RpcError
//returns
Reply []byte
Reply []byte*/
}
@@ -180,32 +189,33 @@ func (slf *Client) Run(){
}
//1.解析head
respone := &RpcResponse{}
err = processor.Unmarshal(bytes,respone)
respone.RpcResponeData =processor.MakeRpcResponse(0,nil,nil)
err = processor.Unmarshal(bytes,respone.RpcResponeData)
if err != nil {
log.Error("rpcClient Unmarshal head error,error:%+v",err)
continue
}
slf.pendingLock.Lock()
v,ok := slf.pending[respone.Seq]
v,ok := slf.pending[respone.RpcResponeData.GetSeq()]
if ok == false {
log.Error("rpcClient cannot find seq %d in pending",respone.Seq)
log.Error("rpcClient cannot find seq %d in pending",respone.RpcResponeData.GetSeq())
slf.pendingLock.Unlock()
}else {
delete(slf.pending,respone.Seq)
delete(slf.pending,respone.RpcResponeData.GetSeq())
slf.pendingLock.Unlock()
v.Err = nil
if len(respone.Reply) >0 {
err = processor.Unmarshal(respone.Reply,v.Reply)
if len(respone.RpcResponeData.GetReply()) >0 {
err = processor.Unmarshal(respone.RpcResponeData.GetReply(),v.Reply)
if err != nil {
log.Error("rpcClient Unmarshal body error,error:%+v",err)
v.Err = err
}
}
if respone.Err != nil {
v.Err= respone.Err
if respone.RpcResponeData.GetErr() != nil {
v.Err= respone.RpcResponeData.GetErr()
}
if v.callback!=nil && v.callback.IsValid() {

View File

@@ -1 +0,0 @@
package gobrpc

View File

@@ -1,14 +1,27 @@
package rpc
import (
"encoding/json"
)
import "encoding/json"
type JsonProcessor struct {
}
type JsonRpcRequestData struct {
//packhead
Seq uint64 // sequence number chosen by client
ServiceMethod string // format: "Service.Method"
NoReply bool //是否需要返回
//packbody
InParam []byte
}
type JsonRpcResponseData struct {
//head
Seq uint64 // sequence number chosen by client
Err string
//returns
Reply []byte
}
func (slf *JsonProcessor) Marshal(v interface{}) ([]byte, error){
@@ -16,7 +29,56 @@ func (slf *JsonProcessor) Marshal(v interface{}) ([]byte, error){
}
func (slf *JsonProcessor) Unmarshal(data []byte, v interface{}) error{
return json.Unmarshal(data,v)
}
func (slf *JsonProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{
return &JsonRpcRequestData{Seq:seq,ServiceMethod:serviceMethod,NoReply:noReply,InParam:inParam}
}
func (slf *JsonProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData {
return &JsonRpcResponseData{
Seq: seq,
Err: err.Error(),
Reply: reply,
}
}
func (slf *JsonRpcRequestData) IsReply() bool{
return slf.NoReply
}
func (slf *JsonRpcRequestData) GetSeq() uint64{
return slf.Seq
}
func (slf *JsonRpcRequestData) GetServiceMethod() string{
return slf.ServiceMethod
}
func (slf *JsonRpcRequestData) GetInParam() []byte{
return slf.InParam
}
func (slf *JsonRpcResponseData) GetSeq() uint64 {
return slf.Seq
}
func (slf *JsonRpcResponseData) GetErr() *RpcError {
if slf.Err == ""{
return nil
}
return Errorf(slf.Err)
}
func (slf *JsonRpcResponseData) GetReply() []byte{
return slf.Reply
}

58
rpc/pbprocessor.go Normal file
View File

@@ -0,0 +1,58 @@
package rpc
import (
"github.com/golang/protobuf/proto"
)
type PBProcessor struct {
}
func (slf *PBRpcRequestData) MakeRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) *PBRpcRequestData{
slf.Seq = proto.Uint64(seq)
slf.ServiceMethod = proto.String(serviceMethod)
slf.NoReply = proto.Bool(noReply)
slf.InParam = inParam
return slf
}
func (slf *PBRpcResponseData) MakeRespone(seq uint64,err *RpcError,reply []byte) *PBRpcResponseData{
slf.Seq = proto.Uint64(seq)
slf.Error = proto.String(err.Error())
slf.Reply = reply
return slf
}
func (slf *PBProcessor) Marshal(v interface{}) ([]byte, error){
return proto.Marshal(v.(proto.Message))
}
func (slf *PBProcessor) Unmarshal(data []byte, msg interface{}) error{
protoMsg := msg.(proto.Message)
return proto.Unmarshal(data, protoMsg)
}
func (slf *PBProcessor) MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData{
return (&PBRpcRequestData{}).MakeRequest(seq,serviceMethod,noReply,inParam)
}
func (slf *PBProcessor) MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData {
return (&PBRpcResponseData{}).MakeRespone(seq,err,reply)
}
func (slf *PBRpcRequestData) IsReply() bool{
return slf.GetNoReply()
}
func (slf *PBRpcResponseData) GetErr() *RpcError {
return Errorf(slf.GetError())
}

1
rpc/probufprocessor.go Normal file
View File

@@ -0,0 +1 @@
package rpc

43
rpc/processor.go Normal file
View File

@@ -0,0 +1,43 @@
package rpc
/*
Seq uint64 // sequence number chosen by client
ServiceMethod string // format: "Service.Method"
NoReply bool //是否需要返回
//packbody
InParam []byte
type RpcResponse struct {
//head
Seq uint64 // sequence number chosen by client
Err *RpcError
//returns
Reply []byte
}
*/
type IRpcRequestData interface {
GetSeq() uint64
GetServiceMethod() string
GetInParam() []byte
IsReply() bool
}
type IRpcResponseData interface {
GetSeq() uint64
GetErr() *RpcError
GetReply() []byte
}
type IRpcProcessor interface {
Marshal(v interface{}) ([]byte, error)
Unmarshal(data []byte, v interface{}) error
MakeRpcRequest(seq uint64,serviceMethod string,noReply bool,inParam []byte) IRpcRequestData
MakeRpcResponse(seq uint64,err *RpcError,reply []byte) IRpcResponseData
}

161
rpc/rpc.pb.go Normal file
View File

@@ -0,0 +1,161 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: rpc.proto
package rpc
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type PBRpcRequestData struct {
Seq *uint64 `protobuf:"varint,1,opt,name=Seq" json:"Seq,omitempty"`
ServiceMethod *string `protobuf:"bytes,2,opt,name=ServiceMethod" json:"ServiceMethod,omitempty"`
NoReply *bool `protobuf:"varint,3,opt,name=NoReply" json:"NoReply,omitempty"`
InParam []byte `protobuf:"bytes,4,opt,name=InParam" json:"InParam,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PBRpcRequestData) Reset() { *m = PBRpcRequestData{} }
func (m *PBRpcRequestData) String() string { return proto.CompactTextString(m) }
func (*PBRpcRequestData) ProtoMessage() {}
func (*PBRpcRequestData) Descriptor() ([]byte, []int) {
return fileDescriptor_77a6da22d6a3feb1, []int{0}
}
func (m *PBRpcRequestData) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PBRpcRequestData.Unmarshal(m, b)
}
func (m *PBRpcRequestData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_PBRpcRequestData.Marshal(b, m, deterministic)
}
func (m *PBRpcRequestData) XXX_Merge(src proto.Message) {
xxx_messageInfo_PBRpcRequestData.Merge(m, src)
}
func (m *PBRpcRequestData) XXX_Size() int {
return xxx_messageInfo_PBRpcRequestData.Size(m)
}
func (m *PBRpcRequestData) XXX_DiscardUnknown() {
xxx_messageInfo_PBRpcRequestData.DiscardUnknown(m)
}
var xxx_messageInfo_PBRpcRequestData proto.InternalMessageInfo
func (m *PBRpcRequestData) GetSeq() uint64 {
if m != nil && m.Seq != nil {
return *m.Seq
}
return 0
}
func (m *PBRpcRequestData) GetServiceMethod() string {
if m != nil && m.ServiceMethod != nil {
return *m.ServiceMethod
}
return ""
}
func (m *PBRpcRequestData) GetNoReply() bool {
if m != nil && m.NoReply != nil {
return *m.NoReply
}
return false
}
func (m *PBRpcRequestData) GetInParam() []byte {
if m != nil {
return m.InParam
}
return nil
}
type PBRpcResponseData struct {
Seq *uint64 `protobuf:"varint,1,opt,name=Seq" json:"Seq,omitempty"`
Error *string `protobuf:"bytes,2,opt,name=Error" json:"Error,omitempty"`
Reply []byte `protobuf:"bytes,3,opt,name=Reply" json:"Reply,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *PBRpcResponseData) Reset() { *m = PBRpcResponseData{} }
func (m *PBRpcResponseData) String() string { return proto.CompactTextString(m) }
func (*PBRpcResponseData) ProtoMessage() {}
func (*PBRpcResponseData) Descriptor() ([]byte, []int) {
return fileDescriptor_77a6da22d6a3feb1, []int{1}
}
func (m *PBRpcResponseData) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PBRpcResponseData.Unmarshal(m, b)
}
func (m *PBRpcResponseData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_PBRpcResponseData.Marshal(b, m, deterministic)
}
func (m *PBRpcResponseData) XXX_Merge(src proto.Message) {
xxx_messageInfo_PBRpcResponseData.Merge(m, src)
}
func (m *PBRpcResponseData) XXX_Size() int {
return xxx_messageInfo_PBRpcResponseData.Size(m)
}
func (m *PBRpcResponseData) XXX_DiscardUnknown() {
xxx_messageInfo_PBRpcResponseData.DiscardUnknown(m)
}
var xxx_messageInfo_PBRpcResponseData proto.InternalMessageInfo
func (m *PBRpcResponseData) GetSeq() uint64 {
if m != nil && m.Seq != nil {
return *m.Seq
}
return 0
}
func (m *PBRpcResponseData) GetError() string {
if m != nil && m.Error != nil {
return *m.Error
}
return ""
}
func (m *PBRpcResponseData) GetReply() []byte {
if m != nil {
return m.Reply
}
return nil
}
func init() {
proto.RegisterType((*PBRpcRequestData)(nil), "rpc.PBRpcRequestData")
proto.RegisterType((*PBRpcResponseData)(nil), "rpc.PBRpcResponseData")
}
func init() { proto.RegisterFile("rpc.proto", fileDescriptor_77a6da22d6a3feb1) }
var fileDescriptor_77a6da22d6a3feb1 = []byte{
// 173 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2c, 0x2a, 0x48, 0xd6,
0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e, 0x2a, 0x48, 0x56, 0xaa, 0xe3, 0x12, 0x08, 0x70,
0x0a, 0x2a, 0x48, 0x0e, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x71, 0x49, 0x2c, 0x49, 0x14, 0x12,
0xe0, 0x62, 0x0e, 0x4e, 0x2d, 0x94, 0x60, 0x54, 0x60, 0xd4, 0x60, 0x09, 0x02, 0x31, 0x85, 0x54,
0xb8, 0x78, 0x83, 0x53, 0x8b, 0xca, 0x32, 0x93, 0x53, 0x7d, 0x53, 0x4b, 0x32, 0xf2, 0x53, 0x24,
0x98, 0x14, 0x18, 0x35, 0x38, 0x83, 0x50, 0x05, 0x85, 0x24, 0xb8, 0xd8, 0xfd, 0xf2, 0x83, 0x52,
0x0b, 0x72, 0x2a, 0x25, 0x98, 0x15, 0x18, 0x35, 0x38, 0x82, 0x60, 0x5c, 0x90, 0x8c, 0x67, 0x5e,
0x40, 0x62, 0x51, 0x62, 0xae, 0x04, 0x8b, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x8c, 0xab, 0x14, 0xc8,
0x25, 0x08, 0xb5, 0xbf, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0x15, 0x87, 0x03, 0x44, 0xb8, 0x58, 0x5d,
0x8b, 0x8a, 0xf2, 0x8b, 0xa0, 0x16, 0x43, 0x38, 0x20, 0x51, 0x84, 0x75, 0x3c, 0x41, 0x10, 0x0e,
0x20, 0x00, 0x00, 0xff, 0xff, 0x84, 0xbc, 0x24, 0x3a, 0xe3, 0x00, 0x00, 0x00,
}

19
rpc/rpc.proto Normal file
View File

@@ -0,0 +1,19 @@
syntax = "proto2";
package rpc;
message PBRpcRequestData{
optional uint64 Seq = 1;
optional string ServiceMethod = 2;
optional bool NoReply = 3;
optional bytes InParam = 4;
}
message PBRpcResponseData{
optional uint64 Seq = 1;
optional string Error = 2;
optional bytes Reply = 3;
}

View File

@@ -16,9 +16,12 @@ var NilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem())
type RpcError string
func (e *RpcError) Error() string {
if e == nil {
return ""
}
func (e RpcError) Error() string {
return string(e)
return string(*e)
}
func ConvertError(e error) *RpcError{
@@ -193,7 +196,7 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
err := fmt.Errorf("%v: %s", r, buf[:l])
log.Error("Handler Rpc %s Core dump info:%+v\n",request.ServiceMethod,err)
log.Error("Handler Rpc %s Core dump info:%+v\n",request.RpcRequestData.GetServiceMethod(),err)
rpcErr := RpcError("call error : core dumps")
if request.requestHandle!=nil {
request.requestHandle(nil,&rpcErr)
@@ -201,9 +204,9 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
}
}()
v,ok := slf.mapfunctons[request.ServiceMethod]
v,ok := slf.mapfunctons[request.RpcRequestData.GetServiceMethod()]
if ok == false {
err := Errorf("RpcHandler %s cannot find %s",slf.rpcHandler.GetName(),request.ServiceMethod)
err := Errorf("RpcHandler %s cannot find %s",slf.rpcHandler.GetName(),request.RpcRequestData.GetServiceMethod())
log.Error("%s",err.Error())
if request.requestHandle!=nil {
request.requestHandle(nil,err)
@@ -215,9 +218,9 @@ func (slf *RpcHandler) HandlerRpcRequest(request *RpcRequest) {
var paramList []reflect.Value
var err error
if request.localParam==nil{
err = processor.Unmarshal(request.InParam,&v.iparam)
err = processor.Unmarshal(request.RpcRequestData.GetInParam(),v.iparam)
if err!=nil {
rerr := Errorf("Call Rpc %s Param error %+v",request.ServiceMethod,err)
rerr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err)
log.Error("%s",rerr.Error())
if request.requestHandle!=nil {
request.requestHandle(nil, rerr)

View File

@@ -10,9 +10,11 @@ import (
"strings"
)
var processor iprocessor = &JsonProcessor{}
var processor IRpcProcessor = &JsonProcessor{}
var LittleEndian bool
type Call struct {
Seq uint64
ServiceMethod string
@@ -30,10 +32,7 @@ func (slf *Call) Done() *Call{
return <-slf.done
}
type iprocessor interface {
Marshal(v interface{}) ([]byte, error)
Unmarshal(data []byte, v interface{}) error
}
type Server struct {
functions map[interface{}]interface{}
@@ -49,6 +48,10 @@ type RpcHandleFinder interface {
FindRpcHandler(serviceMethod string) IRpcHandler
}
func SetProcessor(proc IRpcProcessor) {
processor = proc
}
func (slf *Server) Init(rpcHandleFinder RpcHandleFinder) {
slf.cmdchannel = make(chan *Call,10000)
slf.rpcHandleFinder = rpcHandleFinder
@@ -80,17 +83,20 @@ type RpcAgent struct {
func (agent *RpcAgent) WriteRespone(serviceMethod string,seq uint64,reply interface{},err *RpcError) {
var rpcRespone RpcResponse
rpcRespone.Seq = seq
rpcRespone.Err = err
//rpcRespone.Seq = seq
//rpcRespone.Err = err
var mReply []byte
var rpcError *RpcError
var errM error
if reply!=nil {
rpcRespone.Reply,errM = processor.Marshal(reply)
mReply,errM = processor.Marshal(reply)
if errM != nil {
rpcRespone.Err = ConvertError(errM)
rpcError = ConvertError(errM)
}
}
bytes,errM := processor.Marshal(&rpcRespone)
rpcRespone.RpcResponeData = processor.MakeRpcResponse(seq,rpcError,mReply)
bytes,errM := processor.Marshal(rpcRespone.RpcResponeData)
if errM != nil {
log.Error("service method %s %+v Marshal error:%+v!", serviceMethod,rpcRespone,errM)
return
@@ -114,11 +120,12 @@ func (agent *RpcAgent) Run() {
//解析head
var req RpcRequest
err = processor.Unmarshal(data,&req)
req.RpcRequestData = processor.MakeRpcRequest(0,"",false,nil)
err = processor.Unmarshal(data,req.RpcRequestData)
if err != nil {
if req.Seq>0 {
if req.RpcRequestData.GetSeq()>0 {
rpcError := RpcError("rpc Unmarshal request is error")
agent.WriteRespone(req.ServiceMethod,req.Seq,nil,&rpcError)
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
continue
}else{
log.Error("rpc Unmarshal request is error: %v", err)
@@ -128,25 +135,25 @@ func (agent *RpcAgent) Run() {
}
//交给程序处理
serviceMethod := strings.Split(req.ServiceMethod,".")
serviceMethod := strings.Split(req.RpcRequestData.GetServiceMethod(),".")
if len(serviceMethod)!=2 {
rpcError := RpcError("rpc request req.ServiceMethod is error")
agent.WriteRespone(req.ServiceMethod,req.Seq,nil,&rpcError)
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
log.Debug("rpc request req.ServiceMethod is error")
continue
}
rpcHandler := agent.rpcserver.rpcHandleFinder.FindRpcHandler(serviceMethod[0])
if rpcHandler== nil {
rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.ServiceMethod))
agent.WriteRespone(req.ServiceMethod,req.Seq,nil,&rpcError)
log.Error("service method %s not config!", req.ServiceMethod)
rpcError := RpcError(fmt.Sprintf("service method %s not config!", req.RpcRequestData.GetServiceMethod()))
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),nil,&rpcError)
log.Error("service method %s not config!", req.RpcRequestData.GetServiceMethod())
continue
}
if req.NoReply == false {
if req.RpcRequestData.IsReply()== false {
req.requestHandle = func(Returns interface{},Err *RpcError){
agent.WriteRespone(req.ServiceMethod,req.Seq,Returns,Err)
agent.WriteRespone(req.RpcRequestData.GetServiceMethod(),req.RpcRequestData.GetSeq(),Returns,Err)
}
}
@@ -206,11 +213,12 @@ func (slf *Server) rpcHandlerGo(noReply bool,handlerName string,methodName strin
return pCall
}
var req RpcRequest
req.ServiceMethod = fmt.Sprintf("%s.%s",handlerName,methodName)
//req.ServiceMethod = fmt.Sprintf("%s.%s",handlerName,methodName)
req.localParam = args
req.localReply = reply
req.NoReply = noReply
//req.NoReply = noReply
req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil)
if noReply == false {
req.requestHandle = func(Returns interface{},Err *RpcError){
if Err!=nil {
@@ -243,11 +251,11 @@ func (slf *Server) rpcHandlerAsyncGo(callerRpcHandler IRpcHandler,noReply bool,h
}
var req RpcRequest
req.ServiceMethod = fmt.Sprintf("%s.%s",handlerName,methodName)
//req.ServiceMethod = fmt.Sprintf("%s.%s",handlerName,methodName)
req.localParam = args
req.localReply = reply
req.NoReply = noReply
//req.NoReply = noReply
req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil)
if noReply == false {
req.requestHandle = func(Returns interface{},Err *RpcError){
if Err == nil {

View File

@@ -109,7 +109,7 @@ func (slf *Service) Run() {
bStop = true
case rpcRequest :=<- rpcRequestChan:
if slf.profiler!=nil {
analyzer = slf.profiler.Push("Req_"+rpcRequest.ServiceMethod)
analyzer = slf.profiler.Push("Req_"+rpcRequest.RpcRequestData.GetServiceMethod())
}
slf.GetRpcHandler().HandlerRpcRequest(rpcRequest)