mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-16 00:44:42 +08:00
框架修复
This commit is contained in:
@@ -71,13 +71,14 @@ func (slf *PBProcessor ) Unmarshal(data []byte) (interface{}, error) {
|
|||||||
|
|
||||||
// must goroutine safe
|
// must goroutine safe
|
||||||
func (slf *PBProcessor ) Marshal(msg interface{}) ([]byte, error){
|
func (slf *PBProcessor ) Marshal(msg interface{}) ([]byte, error){
|
||||||
bytes,err := proto.Marshal(msg.(proto.Message))
|
pMsg := msg.(*PBPackInfo)
|
||||||
|
|
||||||
|
bytes,err := proto.Marshal(pMsg.msg.(proto.Message))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil,err
|
return nil,err
|
||||||
}
|
}
|
||||||
|
|
||||||
buff := make([]byte, 0, len(bytes)+MsgTypeSize)
|
buff := make([]byte, 2, len(bytes)+MsgTypeSize)
|
||||||
pMsg := msg.(*PBPackInfo)
|
|
||||||
if slf.LittleEndian == true {
|
if slf.LittleEndian == true {
|
||||||
binary.LittleEndian.PutUint16(buff[:2],pMsg.typ)
|
binary.LittleEndian.PutUint16(buff[:2],pMsg.typ)
|
||||||
}else{
|
}else{
|
||||||
|
|||||||
@@ -89,6 +89,7 @@ func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) {
|
|||||||
msgLen = binary.BigEndian.Uint32(bufMsgLen)
|
msgLen = binary.BigEndian.Uint32(bufMsgLen)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
msgLen -= 2
|
||||||
|
|
||||||
// check len
|
// check len
|
||||||
if msgLen > p.maxMsgLen {
|
if msgLen > p.maxMsgLen {
|
||||||
@@ -121,6 +122,7 @@ func (p *MsgParser) Write(conn *TCPConn, args ...[]byte) error {
|
|||||||
return errors.New("message too short")
|
return errors.New("message too short")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//msgLen -= 2
|
||||||
msg := make([]byte, uint32(p.lenMsgLen)+msgLen)
|
msg := make([]byte, uint32(p.lenMsgLen)+msgLen)
|
||||||
|
|
||||||
// write len
|
// write len
|
||||||
@@ -131,7 +133,7 @@ func (p *MsgParser) Write(conn *TCPConn, args ...[]byte) error {
|
|||||||
if p.littleEndian {
|
if p.littleEndian {
|
||||||
binary.LittleEndian.PutUint16(msg, uint16(msgLen))
|
binary.LittleEndian.PutUint16(msg, uint16(msgLen))
|
||||||
} else {
|
} else {
|
||||||
binary.BigEndian.PutUint16(msg, uint16(msgLen))
|
binary.BigEndian.PutUint16(msg, uint16(msgLen)+uint16(p.lenMsgLen))
|
||||||
}
|
}
|
||||||
case 4:
|
case 4:
|
||||||
if p.littleEndian {
|
if p.littleEndian {
|
||||||
|
|||||||
@@ -40,6 +40,16 @@ type IRpcHandler interface {
|
|||||||
GetRpcRequestChan() chan *RpcRequest
|
GetRpcRequestChan() chan *RpcRequest
|
||||||
GetRpcResponeChan() chan *Call
|
GetRpcResponeChan() chan *Call
|
||||||
CallMethod(ServiceMethod string,param interface{},reply interface{}) error
|
CallMethod(ServiceMethod string,param interface{},reply interface{}) error
|
||||||
|
|
||||||
|
AsyncCall(serviceMethod string,args interface{},callback interface{}) error
|
||||||
|
GRAsyncCall(serviceMethod string,args interface{},callback interface{}) error
|
||||||
|
Call(serviceMethod string,args interface{},reply interface{}) error
|
||||||
|
GRCall(serviceMethod string,args interface{},reply interface{}) error
|
||||||
|
Go(serviceMethod string,args interface{}) error
|
||||||
|
GRGo(serviceMethod string,args interface{}) error
|
||||||
|
AsyncCallNode(nodeId int,serviceMethod string,args interface{},callback interface{}) error
|
||||||
|
CallNode(nodeId int,serviceMethod string,args interface{},reply interface{}) error
|
||||||
|
GoNode(nodeId int,serviceMethod string,args interface{}) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (slf *RpcHandler) GetRpcHandler() IRpcHandler{
|
func (slf *RpcHandler) GetRpcHandler() IRpcHandler{
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package service
|
package service
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/duanhf2012/origin/event"
|
||||||
"github.com/duanhf2012/origin/rpc"
|
"github.com/duanhf2012/origin/rpc"
|
||||||
"github.com/duanhf2012/origin/util/timer"
|
"github.com/duanhf2012/origin/util/timer"
|
||||||
"reflect"
|
"reflect"
|
||||||
@@ -94,8 +95,8 @@ func (slf *Service) Run() {
|
|||||||
slf.GetRpcHandler().HandlerRpcRequest(rpcRequest)
|
slf.GetRpcHandler().HandlerRpcRequest(rpcRequest)
|
||||||
case rpcResponeCB := <- rpcResponeCallBack:
|
case rpcResponeCB := <- rpcResponeCallBack:
|
||||||
slf.GetRpcHandler().HandlerRpcResponeCB(rpcResponeCB)
|
slf.GetRpcHandler().HandlerRpcResponeCB(rpcResponeCB)
|
||||||
case event := <- eventChan:
|
case ev := <- eventChan:
|
||||||
slf.OnEventHandler(event)
|
slf.this.(event.IEventProcessor).OnEventHandler(ev)
|
||||||
case t := <- slf.dispatcher.ChanTimer:
|
case t := <- slf.dispatcher.ChanTimer:
|
||||||
t.Cb()
|
t.Cb()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -86,7 +86,7 @@ func (slf *TcpService) NewClient(conn *network.TCPConn) network.Agent {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
pClient := &Client{tcpConn:conn}
|
pClient := &Client{tcpConn:conn, id:slf.initClientId}
|
||||||
pClient.tcpService = slf
|
pClient.tcpService = slf
|
||||||
slf.mapClient[slf.initClientId] = pClient
|
slf.mapClient[slf.initClientId] = pClient
|
||||||
return pClient
|
return pClient
|
||||||
|
|||||||
Reference in New Issue
Block a user