From bffc2b5f79bfca0ed6eb77dd3cf04d2c1e57c5b2 Mon Sep 17 00:00:00 2001 From: lifeiyi <736926938@qq.com> Date: Sat, 28 Mar 2020 19:53:03 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A1=86=E6=9E=B6=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- network/processor/pbprocessor.go | 7 ++++--- network/tcp_msg.go | 4 +++- rpc/rpchandler.go | 10 ++++++++++ service/service.go | 5 +++-- sysservice/tcpservice.go | 2 +- 5 files changed, 21 insertions(+), 7 deletions(-) diff --git a/network/processor/pbprocessor.go b/network/processor/pbprocessor.go index c880d7e..c2cf3a6 100644 --- a/network/processor/pbprocessor.go +++ b/network/processor/pbprocessor.go @@ -71,13 +71,14 @@ func (slf *PBProcessor ) Unmarshal(data []byte) (interface{}, error) { // must goroutine safe func (slf *PBProcessor ) Marshal(msg interface{}) ([]byte, error){ - bytes,err := proto.Marshal(msg.(proto.Message)) + pMsg := msg.(*PBPackInfo) + + bytes,err := proto.Marshal(pMsg.msg.(proto.Message)) if err != nil { return nil,err } - buff := make([]byte, 0, len(bytes)+MsgTypeSize) - pMsg := msg.(*PBPackInfo) + buff := make([]byte, 2, len(bytes)+MsgTypeSize) if slf.LittleEndian == true { binary.LittleEndian.PutUint16(buff[:2],pMsg.typ) }else{ diff --git a/network/tcp_msg.go b/network/tcp_msg.go index 6b309c4..db06d4a 100644 --- a/network/tcp_msg.go +++ b/network/tcp_msg.go @@ -89,6 +89,7 @@ func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) { msgLen = binary.BigEndian.Uint32(bufMsgLen) } } + msgLen -= 2 // check len if msgLen > p.maxMsgLen { @@ -121,6 +122,7 @@ func (p *MsgParser) Write(conn *TCPConn, args ...[]byte) error { return errors.New("message too short") } + //msgLen -= 2 msg := make([]byte, uint32(p.lenMsgLen)+msgLen) // write len @@ -131,7 +133,7 @@ func (p *MsgParser) Write(conn *TCPConn, args ...[]byte) error { if p.littleEndian { binary.LittleEndian.PutUint16(msg, uint16(msgLen)) } else { - binary.BigEndian.PutUint16(msg, uint16(msgLen)) + binary.BigEndian.PutUint16(msg, uint16(msgLen)+uint16(p.lenMsgLen)) } case 4: if p.littleEndian { diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 625e2d9..5fdc825 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -40,6 +40,16 @@ type IRpcHandler interface { GetRpcRequestChan() chan *RpcRequest GetRpcResponeChan() chan *Call CallMethod(ServiceMethod string,param interface{},reply interface{}) error + + AsyncCall(serviceMethod string,args interface{},callback interface{}) error + GRAsyncCall(serviceMethod string,args interface{},callback interface{}) error + Call(serviceMethod string,args interface{},reply interface{}) error + GRCall(serviceMethod string,args interface{},reply interface{}) error + Go(serviceMethod string,args interface{}) error + GRGo(serviceMethod string,args interface{}) error + AsyncCallNode(nodeId int,serviceMethod string,args interface{},callback interface{}) error + CallNode(nodeId int,serviceMethod string,args interface{},reply interface{}) error + GoNode(nodeId int,serviceMethod string,args interface{}) error } func (slf *RpcHandler) GetRpcHandler() IRpcHandler{ diff --git a/service/service.go b/service/service.go index d7a9a7a..b852128 100644 --- a/service/service.go +++ b/service/service.go @@ -1,6 +1,7 @@ package service import ( + "github.com/duanhf2012/origin/event" "github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/util/timer" "reflect" @@ -94,8 +95,8 @@ func (slf *Service) Run() { slf.GetRpcHandler().HandlerRpcRequest(rpcRequest) case rpcResponeCB := <- rpcResponeCallBack: slf.GetRpcHandler().HandlerRpcResponeCB(rpcResponeCB) - case event := <- eventChan: - slf.OnEventHandler(event) + case ev := <- eventChan: + slf.this.(event.IEventProcessor).OnEventHandler(ev) case t := <- slf.dispatcher.ChanTimer: t.Cb() } diff --git a/sysservice/tcpservice.go b/sysservice/tcpservice.go index aaf78ae..a7edd91 100644 --- a/sysservice/tcpservice.go +++ b/sysservice/tcpservice.go @@ -86,7 +86,7 @@ func (slf *TcpService) NewClient(conn *network.TCPConn) network.Agent { continue } - pClient := &Client{tcpConn:conn} + pClient := &Client{tcpConn:conn, id:slf.initClientId} pClient.tcpService = slf slf.mapClient[slf.initClientId] = pClient return pClient