diff --git a/rpc/gogopbprocessor.go b/rpc/gogopbprocessor.go index 9df0dd5..d3c9b21 100644 --- a/rpc/gogopbprocessor.go +++ b/rpc/gogopbprocessor.go @@ -41,7 +41,10 @@ func (slf *GoGoPBProcessor) Marshal(v interface{}) ([]byte, error){ } func (slf *GoGoPBProcessor) Unmarshal(data []byte, msg interface{}) error{ - protoMsg := msg.(proto.Message) + protoMsg,ok := msg.(proto.Message) + if ok == false { + return fmt.Errorf("%+v is not of proto.Message type",msg) + } return proto.Unmarshal(data, protoMsg) } diff --git a/rpc/server.go b/rpc/server.go index a268956..52e7524 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -147,7 +147,6 @@ func (agent *RpcAgent) Run() { ReleaseRpcRequest(req) continue } else { - //will close tcpconn ReleaseRpcRequest(req) break } @@ -303,21 +302,22 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie if reply != nil && Returns != reply && Returns != nil { byteReturns, err := req.rpcProcessor.Marshal(Returns) if err != nil { - log.SError("returns data cannot be marshal ", callSeq) - ReleaseRpcRequest(req) - } - - err = req.rpcProcessor.Unmarshal(byteReturns, reply) - if err != nil { - log.SError("returns data cannot be Unmarshal ", callSeq) - ReleaseRpcRequest(req) + Err = ConvertError(err) + log.SError("returns data cannot be marshal,callSeq is ", callSeq," error is ",err.Error()) + }else{ + err = req.rpcProcessor.Unmarshal(byteReturns, reply) + if err != nil { + Err = ConvertError(err) + log.SError("returns data cannot be Unmarshal,callSeq is ", callSeq," error is ",err.Error()) + } } } + ReleaseRpcRequest(req) v := client.RemovePending(callSeq) if v == nil { log.SError("rpcClient cannot find seq ",callSeq, " in pending") - ReleaseRpcRequest(req) + return } if len(Err) == 0 { @@ -326,7 +326,6 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor, client *Clie v.Err = Err } v.done <- v - ReleaseRpcRequest(req) } }