diff --git a/rpc/client.go b/rpc/client.go index 062a9a5..9a90c92 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -226,7 +226,7 @@ func (slf *Client) RawGo(processor IRpcProcessor,noReply bool,serviceMethod stri return call } - err = slf.conn.WriteMsg(bytes) + err = slf.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())},bytes) if err != nil { slf.RemovePending(call.Seq) call.Err = err diff --git a/rpc/jsonprocessor.go b/rpc/jsonprocessor.go index de038ab..4f74dad 100644 --- a/rpc/jsonprocessor.go +++ b/rpc/jsonprocessor.go @@ -87,6 +87,11 @@ func (slf *JsonProcessor) IsParse(param interface{}) bool { } +func (slf *JsonProcessor) GetProcessorType() RpcProcessorType{ + return RPC_PROCESSOR_JSON +} + + func (slf *JsonRpcRequestData) IsNoReply() bool{ return slf.NoReply } @@ -132,5 +137,3 @@ func (slf *JsonRpcResponseData) GetReply() []byte{ - - diff --git a/rpc/pbprocessor.go b/rpc/pbprocessor.go index d0ff233..2a26d49 100644 --- a/rpc/pbprocessor.go +++ b/rpc/pbprocessor.go @@ -134,6 +134,11 @@ func (slf *PBProcessor) IsParse(param interface{}) bool { } +func (slf *PBProcessor) GetProcessorType() RpcProcessorType{ + return RPC_PROCESSOR_PB +} + + func (slf *PBRpcRequestData) IsNoReply() bool{ return slf.GetNoReply() } diff --git a/rpc/processor.go b/rpc/processor.go index 591f9bc..42c3570 100644 --- a/rpc/processor.go +++ b/rpc/processor.go @@ -9,6 +9,7 @@ type IRpcProcessor interface { ReleaseRpcRequest(rpcRequestData IRpcRequestData) ReleaseRpcRespose(rpcRequestData IRpcResponseData) IsParse(param interface{}) bool //是否可解析 + GetProcessorType() RpcProcessorType } diff --git a/rpc/server.go b/rpc/server.go index 6b34db3..0f834dd 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -108,8 +108,7 @@ func (agent *RpcAgent) WriteRespone(processor IRpcProcessor,serviceMethod string return } - - errM = agent.conn.WriteMsg(bytes) + errM = agent.conn.WriteMsg([]byte{uint8(processor.GetProcessorType())},bytes) if errM != nil { log.Error("Rpc %s return is error:%+v",serviceMethod,errM) }