diff --git a/network/processor/jsonprocessor.go b/network/processor/jsonprocessor.go index 240ac8a..1a5b7e8 100644 --- a/network/processor/jsonprocessor.go +++ b/network/processor/jsonprocessor.go @@ -3,6 +3,7 @@ package processor import ( "encoding/json" "fmt" + "github.com/duanhf2012/origin/network" "reflect" ) @@ -53,6 +54,7 @@ func (jsonProcessor *JsonProcessor ) MsgRoute(msg interface{},userdata interface func (jsonProcessor *JsonProcessor) Unmarshal(data []byte) (interface{}, error) { typeStruct := struct {Type int `json:"typ"`}{} + defer network.ReleaseByteSlice(data) err := json.Unmarshal(data, &typeStruct) if err != nil { return nil, err @@ -64,13 +66,13 @@ func (jsonProcessor *JsonProcessor) Unmarshal(data []byte) (interface{}, error) return nil,fmt.Errorf("Cannot find register %d msgType!",msgType) } - msg := reflect.New(info.msgType.Elem()).Interface() - err = json.Unmarshal(data, msg) + msgData := reflect.New(info.msgType.Elem()).Interface() + err = json.Unmarshal(data, msgData) if err != nil { return nil,err } - return &JsonPackInfo{typ:msgType,msg:msg},nil + return &JsonPackInfo{typ:msgType,msg:msgData},nil } func (jsonProcessor *JsonProcessor) Marshal(msg interface{}) ([]byte, error) { diff --git a/network/processor/pbprocessor.go b/network/processor/pbprocessor.go index 8b2165f..0a55e91 100644 --- a/network/processor/pbprocessor.go +++ b/network/processor/pbprocessor.go @@ -3,6 +3,7 @@ package processor import ( "encoding/binary" "fmt" + "github.com/duanhf2012/origin/network" "github.com/golang/protobuf/proto" "reflect" ) @@ -63,6 +64,7 @@ func (pbProcessor *PBProcessor ) MsgRoute(msg interface{},userdata interface{}) // must goroutine safe func (pbProcessor *PBProcessor ) Unmarshal(data []byte) (interface{}, error) { + defer network.ReleaseByteSlice(data) var msgType uint16 if pbProcessor.LittleEndian == true { msgType = binary.LittleEndian.Uint16(data[:2]) diff --git a/network/processor/pbrawprocessor.go b/network/processor/pbrawprocessor.go index d46ad38..46a1e1b 100644 --- a/network/processor/pbrawprocessor.go +++ b/network/processor/pbrawprocessor.go @@ -53,7 +53,7 @@ func (pbRawProcessor *PBRawProcessor ) Unmarshal(data []byte) (interface{}, erro msgType = binary.BigEndian.Uint16(data[:2]) } - return &PBRawPackInfo{typ:msgType,rawMsg:data[2:]},nil + return &PBRawPackInfo{typ:msgType,rawMsg:data},nil } // must goroutine safe diff --git a/network/tcp_conn.go b/network/tcp_conn.go index 78354fe..fbe242a 100644 --- a/network/tcp_conn.go +++ b/network/tcp_conn.go @@ -37,8 +37,7 @@ func newTCPConn(conn net.Conn, pendingWriteNum int, msgParser *MsgParser) *TCPCo break } _, err := conn.Write(b) - releaseByteSlice(b) - + ReleaseByteSlice(b) if err != nil { break @@ -126,7 +125,7 @@ func (tcpConn *TCPConn) ReadMsg() ([]byte, error) { } func (tcpConn *TCPConn) ReleaseReadMsg(byteBuff []byte){ - releaseByteSlice(byteBuff) + ReleaseByteSlice(byteBuff) } func (tcpConn *TCPConn) WriteMsg(args ...[]byte) error { diff --git a/network/tcp_msg.go b/network/tcp_msg.go index 55bfcea..49603e1 100644 --- a/network/tcp_msg.go +++ b/network/tcp_msg.go @@ -101,7 +101,7 @@ func (p *MsgParser) Read(conn *TCPConn) ([]byte, error) { //msgData := make([]byte, msgLen) msgData := makeByteSlice(int(msgLen)) if _, err := io.ReadFull(conn, msgData); err != nil { - releaseByteSlice(msgData) + ReleaseByteSlice(msgData) return nil, err } @@ -122,7 +122,7 @@ func (p *MsgParser) Write(conn *TCPConn, args ...[]byte) error { } else if msgLen < p.minMsgLen { return errors.New("message too short") } - + //msg := make([]byte, uint32(p.lenMsgLen)+msgLen) msg := makeByteSlice(p.lenMsgLen+int(msgLen)) // write len diff --git a/rpc/client.go b/rpc/client.go index bb825b6..ca62954 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -175,7 +175,6 @@ func (client *Client) AsyncCall(rpcHandler IRpcHandler,serviceMethod string,call } request := &RpcRequest{} - call.Arg = args call.Seq = client.generateSeq() request.RpcRequestData = processor.MakeRpcRequest(client.startSeq,serviceMethod,false,InParam,nil) client.AddPending(call) @@ -209,7 +208,6 @@ func (client *Client) RawGo(processor IRpcProcessor,noReply bool,serviceMethod s call.Reply = reply request := &RpcRequest{} - call.Arg = args call.Seq = client.generateSeq() if noReply == false { client.AddPending(call) diff --git a/rpc/pbprocessor.go b/rpc/pbprocessor.go index d562808..5effaa3 100644 --- a/rpc/pbprocessor.go +++ b/rpc/pbprocessor.go @@ -61,26 +61,26 @@ func (slf *PBRpcRequestData) MakeRequest(seq uint64,serviceMethod string,noReply } switch inAdditionParam.(type) { - case *int: - slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_SParam{int64(*inAdditionParam.(*int))}} - case *int32: - slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_SParam{int64(*inAdditionParam.(*int32))}} - case *int16: - slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_SParam{int64(*inAdditionParam.(*int16))}} - case *int64: - slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_SParam{*inAdditionParam.(*int64)}} - case *uint: - slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_UParam{uint64(*inAdditionParam.(*uint))}} - case *uint32: - slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_UParam{uint64(*inAdditionParam.(*uint32))}} - case *uint16: - slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_UParam{uint64(*inAdditionParam.(*uint16))}} - case *uint64: - slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_UParam{*inAdditionParam.(*uint64)}} - case *string: - slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_StrParam{*inAdditionParam.(*string)}} - case *[]byte: - slf.AddtionParam = &AdditionParam{AdditionOneof: &AdditionParam_BParam{*inAdditionParam.(*[]byte)}} + case int: + slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_SParam{int64(inAdditionParam.(int))}} + case int32: + slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_SParam{int64(inAdditionParam.(int32))}} + case int16: + slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_SParam{int64(inAdditionParam.(int16))}} + case int64: + slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_SParam{inAdditionParam.(int64)}} + case uint: + slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_UParam{uint64(inAdditionParam.(uint))}} + case uint32: + slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_UParam{uint64(inAdditionParam.(uint32))}} + case uint16: + slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_UParam{uint64(inAdditionParam.(uint16))}} + case uint64: + slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_UParam{inAdditionParam.(uint64)}} + case string: + slf.AddtionParam = &AdditionParam{AdditionOneof:&AdditionParam_StrParam{inAdditionParam.(string)}} + case []byte: + slf.AddtionParam = &AdditionParam{AdditionOneof: &AdditionParam_BParam{inAdditionParam.([]byte)}} default: panic(fmt.Sprintf("not support type %+v",inAdditionParam)) } diff --git a/rpc/rpc.go b/rpc/rpc.go index 238de0e..5f23dd4 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -12,7 +12,8 @@ type RpcRequest struct { bLocalRequest bool localReply interface{} localParam interface{} //本地调用的参数列表 - localRawParam []byte + inputArgs IRawInputArgs + requestHandle RequestHandler callback *reflect.Value rpcProcessor IRpcProcessor @@ -44,6 +45,12 @@ type IRpcResponseData interface { GetReply() []byte } +type IRawInputArgs interface { + GetRawData() []byte //获取原始数据 + GetAdditionParam() interface{} //获取附加数据 + DoGc() //处理完成,回收内存 +} + type RpcHandleFinder interface { FindRpcHandler(serviceMethod string) IRpcHandler } @@ -55,7 +62,6 @@ type RawAdditionParamNull struct { type Call struct { Seq uint64 ServiceMethod string - Arg interface{} Reply interface{} Response *RpcResponse Err error @@ -93,7 +99,6 @@ func (rpcResponse *RpcResponse) Clear() *RpcResponse{ func (call *Call) Clear() *Call{ call.Seq = 0 call.ServiceMethod = "" - call.Arg = nil call.Reply = nil call.Response = nil call.Err = nil diff --git a/rpc/rpchandler.go b/rpc/rpchandler.go index 692cbc5..ea7f320 100644 --- a/rpc/rpchandler.go +++ b/rpc/rpchandler.go @@ -77,8 +77,7 @@ type IRpcHandler interface { AsyncCallNode(nodeId int,serviceMethod string,args interface{},callback interface{}) error CallNode(nodeId int,serviceMethod string,args interface{},reply interface{}) error GoNode(nodeId int,serviceMethod string,args interface{}) error - RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error - RawCastGo(rpcProcessorType RpcProcessorType,serviceMethod string,args []byte,additionParam interface{}) + RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args IRawInputArgs) error IsSingleCoroutine() bool } @@ -230,6 +229,9 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { }() defer ReleaseRpcRequest(request) defer request.rpcProcessor.ReleaseRpcRequest(request.RpcRequestData) + if request.inputArgs!=nil { + defer request.inputArgs.DoGc() + } v,ok := handler.mapFunctions[request.RpcRequestData.GetServiceMethod()] if ok == false { @@ -262,8 +264,8 @@ func (handler *RpcHandler) HandlerRpcRequest(request *RpcRequest) { return } }else { - if request.localRawParam!=nil { - err = request.rpcProcessor.Unmarshal(request.localRawParam,iParam) + if request.inputArgs!=nil { + err = request.rpcProcessor.Unmarshal(request.inputArgs.GetRawData(),iParam) if err!=nil { rErr := Errorf("Call Rpc %s Param error %+v",request.RpcRequestData.GetServiceMethod(),err) log.Error("%s", rErr.Error()) @@ -373,7 +375,7 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil) } //其他的rpcHandler的处理器 - pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClient,true,sMethod[0],sMethod[1],args,nil,nil,nil) + pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClient,true,sMethod[0],sMethod[1],args,nil,nil) if pCall.Err!=nil { err = pCall.Err } @@ -392,59 +394,6 @@ func (handler *RpcHandler) goRpc(processor IRpcProcessor,bCast bool,nodeId int,s return err } -func (handler *RpcHandler) rawGoRpc(processor IRpcProcessor,bCast bool,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error { - var pClientList []*Client - err := handler.funcRpcClient(nodeId,serviceMethod,&pClientList) - if err != nil { - log.Error("Call serviceMethod is error:%+v!",err) - return err - } - if len(pClientList) > 1 && bCast == false { - log.Error("Cannot call more then 1 node!") - return fmt.Errorf("Cannot call more then 1 node!") - } - - //2.rpcclient调用 - //如果调用本结点服务 - for _,pClient := range pClientList { - if pClient.bSelfNode == true { - pLocalRpcServer:= handler.funcRpcServer() - //判断是否是同一服务 - sMethod := strings.Split(serviceMethod,".") - if len(sMethod)!=2 { - serr := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod) - log.Error("%+v",serr) - if serr!= nil { - err = serr - } - continue - } - //调用自己rpcHandler处理器 - if sMethod[0] == handler.rpcHandler.GetName() { //自己服务调用 - // - return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil) - } - //其他的rpcHandler的处理器 - pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClient,true,sMethod[0],sMethod[1],nil,args,nil,additionParam) - if pCall.Err!=nil { - err = pCall.Err - } - ReleaseCall(pCall) - continue - } - - //跨node调用 - pCall := pClient.RawGo(processor,true,serviceMethod,args,additionParam,nil) - if pCall.Err!=nil { - err = pCall.Err - } - ReleaseCall(pCall) - } - - return err -} - - func (handler *RpcHandler) callRpc(nodeId int,serviceMethod string,args interface{},reply interface{}) error { var pClientList []*Client err := handler.funcRpcClient(nodeId,serviceMethod,&pClientList) @@ -475,7 +424,7 @@ func (handler *RpcHandler) callRpc(nodeId int,serviceMethod string,args interfac return pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,reply) } //其他的rpcHandler的处理器 - pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,sMethod[0],sMethod[1],args,nil,reply,nil) + pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,sMethod[0],sMethod[1],args,reply,nil) err = pCall.Done().Err pClient.RemovePending(pCall.Seq) ReleaseCall(pCall) @@ -560,7 +509,7 @@ func (handler *RpcHandler) asyncCallRpc(nodeid int,serviceMethod string,args int } return nil } - pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,sMethod[0],sMethod[1],args,nil,reply,nil) + pCall := pLocalRpcServer.selfNodeRpcHandlerGo(nil,pClient,false,sMethod[0],sMethod[1],args,reply,nil) err = pCall.Done().Err pClient.RemovePending(pCall.Seq) ReleaseCall(pCall) @@ -612,11 +561,61 @@ func (handler *RpcHandler) CastGo(serviceMethod string,args interface{}) { handler.goRpc(nil,true,0,serviceMethod,args) } -func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args []byte,additionParam interface{}) error { - return handler.rawGoRpc(GetProcessor(uint8(rpcProcessorType)),false,nodeId,serviceMethod,args,additionParam) -} - -func (handler *RpcHandler) RawCastGo(rpcProcessorType RpcProcessorType,serviceMethod string,args []byte,additionParam interface{}) { - handler.goRpc(GetProcessor(uint8(rpcProcessorType)),true,0,serviceMethod,args) +func (handler *RpcHandler) RawGoNode(rpcProcessorType RpcProcessorType,nodeId int,serviceMethod string,args IRawInputArgs) error { + processor := GetProcessor(uint8(rpcProcessorType)) + var pClientList []*Client + err := handler.funcRpcClient(nodeId,serviceMethod,&pClientList) + if err != nil { + args.DoGc() + log.Error("Call serviceMethod is error:%+v!",err) + return err + } + if len(pClientList) > 1 { + args.DoGc() + log.Error("Cannot call more then 1 node!") + return fmt.Errorf("Cannot call more then 1 node!") + } + + //2.rpcclient调用 + //如果调用本结点服务 + for _,pClient := range pClientList { + if pClient.bSelfNode == true { + pLocalRpcServer:= handler.funcRpcServer() + //判断是否是同一服务 + sMethod := strings.Split(serviceMethod,".") + if len(sMethod)!=2 { + serr := fmt.Errorf("Call serviceMethod %s is error!",serviceMethod) + log.Error("%+v",serr) + if serr!= nil { + err = serr + } + continue + } + //调用自己rpcHandler处理器 + if sMethod[0] == handler.rpcHandler.GetName() { //自己服务调用 + err:= pLocalRpcServer.myselfRpcHandlerGo(sMethod[0],sMethod[1],args,nil) + args.DoGc() + return err + } + + //其他的rpcHandler的处理器 + pCall := pLocalRpcServer.selfNodeRpcHandlerGo(processor,pClient,true,sMethod[0],sMethod[1],nil,nil,args) + if pCall.Err!=nil { + err = pCall.Err + } + ReleaseCall(pCall) + continue + } + + //跨node调用 + pCall := pClient.RawGo(processor,true,serviceMethod,args.GetRawData(),args.GetAdditionParam(),nil) + args.DoGc() + if pCall.Err!=nil { + err = pCall.Err + } + ReleaseCall(pCall) + } + + return err } diff --git a/rpc/server.go b/rpc/server.go index 1745b2c..c124713 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -214,8 +214,8 @@ func (agent *RpcAgent) Destroy() { agent.conn.Destroy() } -func (server *Server) NewAgent(conn *network.TCPConn) network.Agent { - agent := &RpcAgent{conn: conn, rpcServer: server} +func (server *Server) NewAgent(c *network.TCPConn) network.Agent { + agent := &RpcAgent{conn: c, rpcServer: server} return agent } @@ -232,12 +232,15 @@ func (server *Server) myselfRpcHandlerGo(handlerName string,methodName string, a } -func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Client,noReply bool,handlerName string,methodName string, args interface{},rawArgs []byte,reply interface{},additionParam interface{}) *Call { +func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Client,noReply bool,handlerName string,methodName string, args interface{},reply interface{},inputArgs IRawInputArgs) *Call { pCall := MakeCall() pCall.Seq = client.generateSeq() rpcHandler := server.rpcHandleFinder.FindRpcHandler(handlerName) if rpcHandler== nil { + if inputArgs!= nil { + inputArgs.DoGc() + } pCall.Err = fmt.Errorf("service method %s.%s not config!", handlerName,methodName) log.Error("%s",pCall.Err.Error()) pCall.done <- pCall @@ -248,11 +251,14 @@ func (server *Server) selfNodeRpcHandlerGo(processor IRpcProcessor,client *Clien req.bLocalRequest = true req.localParam = args req.localReply = reply - req.localRawParam = rawArgs + req.inputArgs = inputArgs if processor == nil { _,processor = GetProcessorType(args) } - + var additionParam interface{} + if inputArgs!=nil { + additionParam = inputArgs.GetAdditionParam() + } req.RpcRequestData = processor.MakeRpcRequest(0,fmt.Sprintf("%s.%s",handlerName,methodName),noReply,nil,additionParam) req.rpcProcessor = processor if noReply == false { diff --git a/sysservice/tcpgateway/Router.go b/sysservice/tcpgateway/Router.go index ac87759..3867a32 100644 --- a/sysservice/tcpgateway/Router.go +++ b/sysservice/tcpgateway/Router.go @@ -2,10 +2,10 @@ package tcpgateway import ( "github.com/duanhf2012/origin/log" + "github.com/duanhf2012/origin/network" "github.com/duanhf2012/origin/node" "github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/sysservice/tcpservice" - "github.com/golang/protobuf/proto" "strings" ) @@ -187,6 +187,31 @@ func (r *Router) SetRouterId(clientId uint64,serviceName *string,routerId int){ r.mapClientRouterCache[clientId][*serviceName] = routerId } +type RawInputArgs struct { + rawData []byte + clientId uint64 +} + +var rawInputEmpty []byte +func (args RawInputArgs) GetRawData() []byte{ + if len(args.rawData) < 2 { + return args.rawData + } + + return args.rawData[2:] +} + +func (args RawInputArgs) GetAdditionParam() interface{}{ + return args.clientId +} + +func (args RawInputArgs) DoGc() { + if len(args.rawData) < 2 { + return + } + network.ReleaseByteSlice(args.rawData) +} + func (r *Router) RouterMessage(clientId uint64,msgType uint16,msg []byte) { routerInfo:= r.GetMsgRouterService(msgType) if routerInfo==nil { @@ -201,7 +226,7 @@ func (r *Router) RouterMessage(clientId uint64,msgType uint16,msg []byte) { } if routerId>0 { - r.rpcHandler.RawGoNode(rpc.RpcProcessorPb,routerId,routerInfo.Rpc,msg,proto.Uint64(clientId)) + r.rpcHandler.RawGoNode(rpc.RpcProcessorPb,routerId,routerInfo.Rpc,RawInputArgs{rawData: msg,clientId: clientId}) } } @@ -222,7 +247,7 @@ func (r *Router) RouterEvent(clientId uint64,eventType string) bool{ } if routerId>0 { - r.rpcHandler.RawGoNode(rpc.RpcProcessorPb,routerId,routerInfo.Rpc,[]byte{},proto.Uint64(clientId)) + r.rpcHandler.RawGoNode(rpc.RpcProcessorPb,routerId,routerInfo.Rpc,RawInputArgs{rawData: rawInputEmpty,clientId: clientId}) return true } diff --git a/sysservice/tcpservice/tcpservice.go b/sysservice/tcpservice/tcpservice.go index 87c31b8..1974d11 100644 --- a/sysservice/tcpservice/tcpservice.go +++ b/sysservice/tcpservice/tcpservice.go @@ -45,7 +45,7 @@ var seedLocker sync.Mutex type TcpPack struct { Type TcpPackType //0表示连接 1表示断开 2表示数据 - MsgProcessor processor.IProcessor + //MsgProcessor processor.IProcessor ClientId uint64 Data interface{} } @@ -118,16 +118,16 @@ func (tcpService *TcpService) OnInit() error{ } func (tcpService *TcpService) TcpEventHandler(ev *event.Event) { - pack := ev.Data.(*TcpPack) + pack := ev.Data.(TcpPack) switch pack.Type { case TPT_Connected: - pack.MsgProcessor.ConnectedRoute(pack.ClientId) + tcpService.process.ConnectedRoute(pack.ClientId) case TPT_DisConnected: - pack.MsgProcessor.DisConnectedRoute(pack.ClientId) + tcpService.process.DisConnectedRoute(pack.ClientId) case TPT_UnknownPack: - pack.MsgProcessor.UnknownMsgRoute(pack.Data,pack.ClientId) + tcpService.process.UnknownMsgRoute(pack.Data,pack.ClientId) case TPT_Pack: - pack.MsgProcessor.MsgRoute(pack.Data, pack.ClientId) + tcpService.process.MsgRoute(pack.Data, pack.ClientId) } } @@ -162,7 +162,7 @@ func (slf *Client) GetId() uint64 { } func (slf *Client) Run() { - slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:&TcpPack{ClientId:slf.id,Type:TPT_Connected,MsgProcessor:slf.tcpService.process}}) + slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:TcpPack{ClientId:slf.id,Type:TPT_Connected}}) for{ if slf.tcpConn == nil { break @@ -173,17 +173,17 @@ func (slf *Client) Run() { break } data,err:=slf.tcpService.process.Unmarshal(bytes) - slf.tcpConn.ReleaseReadMsg(bytes) + if err != nil { - slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:&TcpPack{ClientId:slf.id,Type:TPT_UnknownPack,Data:bytes,MsgProcessor:slf.tcpService.process}}) + slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:TcpPack{ClientId:slf.id,Type:TPT_UnknownPack,Data:bytes}}) continue } - slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:&TcpPack{ClientId:slf.id,Type:TPT_Pack,Data:data,MsgProcessor:slf.tcpService.process}}) + slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:TcpPack{ClientId:slf.id,Type:TPT_Pack,Data:data}}) } } func (slf *Client) OnClose(){ - slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:&TcpPack{ClientId:slf.id,Type:TPT_DisConnected,MsgProcessor:slf.tcpService.process}}) + slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:TcpPack{ClientId:slf.id,Type:TPT_DisConnected}}) slf.tcpService.mapClientLocker.Lock() defer slf.tcpService.mapClientLocker.Unlock() delete (slf.tcpService.mapClient,slf.GetId())