From 742ffc410eed4a9ca31e45d1cef74168764c971f Mon Sep 17 00:00:00 2001 From: duanhf2012 Date: Mon, 20 Apr 2020 15:38:42 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=9C=8D=E5=8A=A1=E9=97=B4?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E9=80=9A=E7=9F=A5=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- event/event.go | 248 +++++++++++++++++++++-------- event/eventtype.go | 7 +- example/GateService/GateService.go | 27 +--- example/main.go | 2 +- network/processor.go | 8 +- network/processor/pbprocessor.go | 37 ++++- service/module.go | 17 +- service/service.go | 16 +- sysservice/httpservice.go | 22 +-- sysservice/tcpservice.go | 50 ++++-- 10 files changed, 311 insertions(+), 123 deletions(-) diff --git a/event/event.go b/event/event.go index c0a01b9..efad7e4 100644 --- a/event/event.go +++ b/event/event.go @@ -10,91 +10,190 @@ import ( const Default_EventChannelLen = 10000 //事件接受器 -type EventReciverFunc func(event *Event) +type EventCallBack func(event *Event) + type Event struct { Type EventType Data interface{} } -type IEventProcessor interface { +type IEventHandler interface { + + GetEventProcessor() IEventProcessor //获得事件 NotifyEvent(*Event) - SetEventReciver(eventProcessor IEventProcessor) - GetEventReciver() IEventProcessor - SetEventChanNum(num int32) bool - RegEventReciverFunc(eventType EventType,reciverFunc EventReciverFunc) - UnRegEventReciverFun(eventType EventType) + //注册了事件 + addRegInfo(eventType EventType,eventProcessor IEventProcessor) + removeRegInfo(eventType EventType,eventProcessor IEventProcessor) + } +type IEventProcessor interface { + RegEventReciverFunc(eventType EventType,reciver IEventHandler,callback EventCallBack) + UnRegEventReciverFun(eventType EventType,reciver IEventHandler) + SetEventChannel(channelNum int) bool + + castEvent(event *Event) //广播事件 + pushEvent(event *Event) + addBindEvent(eventType EventType,reciver IEventHandler,callback EventCallBack) + addListen(eventType EventType,reciver IEventHandler) + removeBindEvent(eventType EventType,reciver IEventHandler) + removeListen(eventType EventType,reciver IEventHandler) +} + +type EventHandler struct { + //已经注册的事件类型 + eventProcessor IEventProcessor + + //已经注册的事件 + locker sync.RWMutex + mapRegEvent map[EventType]map[IEventProcessor]interface{} //向其他事件处理器监听的事件类型 +} + + type EventProcessor struct { - //事件管道 - EventChan chan *Event - eventReciver IEventProcessor + eventChannel chan *Event - eventChanNumLocker sync.RWMutex - eventChanNum int32 - mapEventReciverFunc map[EventType]EventReciverFunc + locker sync.RWMutex + mapListenerEvent map[EventType]map[IEventProcessor]int //监听者信息 + mapBindHandlerEvent map[EventType]map[IEventHandler]EventCallBack//收到事件处理 } -func (slf *EventProcessor) RegEventReciverFunc(eventType EventType,reciverFunc EventReciverFunc){ - if slf.mapEventReciverFunc == nil { - slf.mapEventReciverFunc = map[EventType]EventReciverFunc{} - } - slf.mapEventReciverFunc[eventType] = reciverFunc -} - -func (slf *EventProcessor) UnRegEventReciverFun(eventType EventType){ - delete(slf.mapEventReciverFunc,eventType) -} - -func (slf *EventProcessor) NotifyEvent(pEvent *Event) { - if len(slf.EventChan) >= int(slf.eventChanNum) { - log.Error("event queue is full!") - } - slf.EventChan <-pEvent -} - - - -func (slf *EventProcessor) GetEventChan() chan *Event{ - slf.eventChanNumLocker.Lock() - defer slf.eventChanNumLocker.Unlock() - - if slf.eventChanNum == 0 { - slf.eventChanNum = Default_EventChannelLen +func (slf *EventHandler) addRegInfo(eventType EventType,eventProcessor IEventProcessor){ + slf.locker.Lock() + defer slf.locker.Unlock() + if slf.mapRegEvent == nil { + slf.mapRegEvent = map[EventType]map[IEventProcessor]interface{}{} } - if slf.EventChan == nil { - slf.EventChan = make(chan *Event,slf.eventChanNum) + if _,ok := slf.mapRegEvent[eventType] ;ok == false{ + slf.mapRegEvent[eventType] = map[IEventProcessor]interface{}{} } - - return slf.EventChan + slf.mapRegEvent[eventType][eventProcessor] = nil } -//不允许重复设置 -func (slf *EventProcessor) SetEventChanNum(num int32) bool { - slf.eventChanNumLocker.Lock() - defer slf.eventChanNumLocker.Unlock() - if slf.eventChanNum>0 { +func (slf *EventHandler) removeRegInfo(eventType EventType,eventProcessor IEventProcessor){ + if _,ok :=slf.mapRegEvent[eventType];ok == true { + delete(slf.mapRegEvent[eventType],eventProcessor) + } +} + +func (slf *EventHandler) GetEventProcessor() IEventProcessor{ + return slf.eventProcessor +} + +func (slf *EventHandler) NotifyEvent(ev *Event){ + slf.GetEventProcessor().castEvent(ev) +} + +func (slf *EventHandler) Init(processor IEventProcessor){ + slf.eventProcessor = processor +} + + +func (slf *EventProcessor) SetEventChannel(channelNum int) bool{ + slf.locker.Lock() + defer slf.locker.Unlock() + if slf.eventChannel!=nil { return false } - slf.eventChanNum = num + if channelNum == 0 { + channelNum = Default_EventChannelLen + } + + slf.eventChannel = make(chan *Event,channelNum) return true } -func (slf *EventProcessor) SetEventReciver(eventProcessor IEventProcessor){ - slf.eventReciver = eventProcessor +func (slf *EventProcessor) addBindEvent(eventType EventType,reciver IEventHandler,callback EventCallBack){ + //mapBindHandlerEvent map[EventType]map[IEventHandler]EventCallBack//收到事件处理 + slf.locker.Lock() + defer slf.locker.Unlock() + if slf.mapBindHandlerEvent == nil { + slf.mapBindHandlerEvent = map[EventType]map[IEventHandler]EventCallBack{} + } + + if _,ok := slf.mapBindHandlerEvent[eventType]; ok == false { + slf.mapBindHandlerEvent[eventType] = map[IEventHandler]EventCallBack{} + } + + slf.mapBindHandlerEvent[eventType][reciver] = callback } +func (slf *EventProcessor) addListen(eventType EventType,reciver IEventHandler){ + slf.locker.Lock() + defer slf.locker.Unlock() -func (slf *EventProcessor) GetEventReciver() IEventProcessor{ - return slf.eventReciver + //mapListenerEvent map[EventType]map[IEventProcessor]int + if slf.mapListenerEvent == nil { + slf.mapListenerEvent = map[EventType]map[IEventProcessor]int{} + } + + if _,ok :=slf.mapListenerEvent[eventType];ok == false{ + slf.mapListenerEvent[eventType] = map[IEventProcessor]int{} + } + + slf.mapListenerEvent[eventType][reciver.GetEventProcessor()] += 1 } -type IHttpEventData interface { - Handle() +func (slf *EventProcessor) removeBindEvent(eventType EventType,reciver IEventHandler){ + slf.locker.Lock() + defer slf.locker.Unlock() + if _,ok := slf.mapBindHandlerEvent[eventType];ok == true{ + delete(slf.mapBindHandlerEvent[eventType],reciver) + } +} + +func (slf *EventProcessor) removeListen(eventType EventType,reciver IEventHandler){ + slf.locker.Lock() + defer slf.locker.Unlock() + if _,ok := slf.mapListenerEvent[eventType];ok == true{ + slf.mapListenerEvent[eventType][reciver.GetEventProcessor()]-=1 + if slf.mapListenerEvent[eventType][reciver.GetEventProcessor()] <= 0 { + delete(slf.mapListenerEvent[eventType],reciver.GetEventProcessor()) + } + } +} + +func (slf *EventProcessor) RegEventReciverFunc(eventType EventType,reciver IEventHandler,callback EventCallBack){ + //记录reciver自己注册过的事件 + reciver.addRegInfo(eventType,slf) + //记录当前所属IEventProcessor注册的回调 + reciver.GetEventProcessor().addBindEvent(eventType,reciver,callback) + //将注册加入到监听中 + slf.addListen(eventType,reciver) +} + +func (slf *EventProcessor) UnRegEventReciverFun(eventType EventType,reciver IEventHandler) { + slf.removeListen(eventType,reciver) + reciver.GetEventProcessor().removeBindEvent(eventType,reciver) + reciver.removeRegInfo(eventType,slf) +} + +func (slf *EventHandler) desctory(){ + for eventTyp,mapEventProcess := range slf.mapRegEvent { + if mapEventProcess == nil { + continue + } + + //map[IEventProcessor]interface{} + for eventProcess,_ := range mapEventProcess { + eventProcess.UnRegEventReciverFun(eventTyp,slf) + } + } +} + +func (slf *EventProcessor) GetEventChan() chan *Event{ + slf.locker.Lock() + defer slf.locker.Unlock() + + if slf.eventChannel == nil { + slf.eventChannel =make(chan *Event,Default_EventChannelLen) + } + + return slf.eventChannel } func (slf *EventProcessor) EventHandler(ev *Event) { @@ -107,25 +206,40 @@ func (slf *EventProcessor) EventHandler(ev *Event) { } }() - if slf.innerEventHandler(ev) == true { + mapCallBack,ok := slf.mapBindHandlerEvent[ev.Type] + if ok == false { return } - - if fun,ok := slf.mapEventReciverFunc[ev.Type];ok == false{ - return - }else{ - fun(ev) + for _,callback := range mapCallBack { + callback(ev) } } -func (slf *EventProcessor) innerEventHandler(ev *Event) bool { - switch ev.Type { - case Sys_Event_Http_Event: - ev.Data.(IHttpEventData).Handle() - return true + + + +func (slf *EventProcessor) pushEvent(event *Event){ + if len(slf.eventChannel)>=cap(slf.eventChannel){ + log.Error("event process channel is full.") + return } - return false + slf.eventChannel<-event } +func (slf *EventProcessor) castEvent(event *Event){ + if slf.mapListenerEvent == nil{ + log.Error("mapListenerEvent not init!") + return + } + processor,ok :=slf.mapListenerEvent[event.Type] + if ok == false || processor == nil{ + log.Debug("event type %d not listen.",event.Type) + return + } + + for proc,_ := range processor { + proc.pushEvent(event) + } +} diff --git a/event/eventtype.go b/event/eventtype.go index e6b42f6..734ade6 100644 --- a/event/eventtype.go +++ b/event/eventtype.go @@ -4,11 +4,8 @@ type EventType int //大于Sys_Event_User_Define给用户定义 const ( - Sys_Event_Tcp_Connected EventType= 1 - Sys_Event_Tcp_DisConnected EventType= 2 - Sys_Event_Tcp_RecvPack EventType = 3 - Sys_Event_Tcp_PackException EventType = 4 - Sys_Event_Http_Event EventType = 5 + Sys_Event_Tcp EventType = 5 + Sys_Event_Http_Event EventType = 4 Sys_Event_User_Define EventType = 1000 ) diff --git a/example/GateService/GateService.go b/example/GateService/GateService.go index b8c4f83..ac4fcf1 100644 --- a/example/GateService/GateService.go +++ b/example/GateService/GateService.go @@ -3,8 +3,6 @@ package GateService import ( "encoding/json" "fmt" - "github.com/duanhf2012/origin/event" - "github.com/duanhf2012/origin/network" "github.com/duanhf2012/origin/network/processor" "github.com/duanhf2012/origin/node" "github.com/duanhf2012/origin/service" @@ -15,18 +13,20 @@ import ( type GateService struct { service.Service - processor network.Processor + processor *processor.PBProcessor httpRouter sysservice.IHttpRouter } func (slf *GateService) OnInit() error{ tcpervice := node.GetService("TcpService").(*sysservice.TcpService) slf.processor = &processor.PBProcessor{} - tcpervice.SetProcessor(slf.processor) + slf.processor.RegisterDisConnected(slf.OnDisconnected) + slf.processor.RegisterConnected(slf.OnConnected) + tcpervice.SetProcessor(slf.processor,slf.GetEventHandler()) httpervice := node.GetService("HttpService").(*sysservice.HttpService) - slf.httpRouter = sysservice.NewHttpHttpRouter(slf) - httpervice.SetHttpRouter(slf.httpRouter) + slf.httpRouter = sysservice.NewHttpHttpRouter() + httpervice.SetHttpRouter(slf.httpRouter,slf.GetEventHandler()) slf.httpRouter.GET("/get/query", slf.HttpTest) slf.httpRouter.POST("/post/query", slf.HttpTestPost) @@ -70,21 +70,6 @@ func (slf *GateService) HttpTestPost(session *sysservice.HttpSession) { session.WriteJsonDone(http.StatusOK,"asdasda") } -func (slf *GateService) OnEventHandler(ev *event.Event) error{ - - if ev.Type == event.Sys_Event_Tcp_RecvPack { - pPack := ev.Data.(*sysservice.TcpPack) - slf.processor.Route(ev.Data,pPack.ClientId) - }else if ev.Type == event.Sys_Event_Tcp_Connected { - pPack := ev.Data.(*sysservice.TcpPack) - slf.OnConnected(pPack.ClientId) - }else if ev.Type == event.Sys_Event_Tcp_DisConnected { - pPack := ev.Data.(*sysservice.TcpPack) - slf.OnDisconnected(pPack.ClientId) - } - return nil -} - func (slf *GateService) OnConnected(clientid uint64){ fmt.Printf("client id %d connected",clientid) } diff --git a/example/main.go b/example/main.go index 15ede55..405fd34 100644 --- a/example/main.go +++ b/example/main.go @@ -235,7 +235,7 @@ func main(){ tcpService := &sysservice.TcpService{} gateService := &GateService.GateService{} - tcpService.SetEventReciver(gateService) + httpService := &sysservice.HttpService{} diff --git a/network/processor.go b/network/processor.go index b51faef..48d1828 100644 --- a/network/processor.go +++ b/network/processor.go @@ -3,7 +3,13 @@ package network type Processor interface { // must goroutine safe - Route(msg interface{}, userData interface{}) error + MsgRoute(msg interface{}, userData interface{}) error + //must goroutine safe + UnknownMsgRoute(msg interface{}, userData interface{}) + // connect event + ConnectedRoute(userData interface{}) + DisConnectedRoute(userData interface{}) + // must goroutine safe Unmarshal(data []byte) (interface{}, error) // must goroutine safe diff --git a/network/processor/pbprocessor.go b/network/processor/pbprocessor.go index 6f6c033..8bf6c79 100644 --- a/network/processor/pbprocessor.go +++ b/network/processor/pbprocessor.go @@ -13,10 +13,17 @@ type MessageInfo struct { } type MessageHandler func(clientid uint64,msg proto.Message) +type ConnectHandler func(clientid uint64) +type UnknownMessageHandler func(clientid uint64,msg []byte) + const MsgTypeSize = 2 type PBProcessor struct { mapMsg map[uint16]MessageInfo LittleEndian bool + + unknownMessageHandler UnknownMessageHandler + connectHandler ConnectHandler + disconnectHandler ConnectHandler } func NewPBProcessor() *PBProcessor { @@ -44,7 +51,7 @@ func (slf *PBPackInfo) GetMsg() proto.Message { } // must goroutine safe -func (slf *PBProcessor ) Route(msg interface{},userdata interface{}) error{ +func (slf *PBProcessor ) MsgRoute(msg interface{},userdata interface{}) error{ pPackInfo := msg.(*PBPackInfo) v,ok := slf.mapMsg[pPackInfo.typ] if ok == false { @@ -117,3 +124,31 @@ func (slf *PBProcessor) MakeMsg(msgType uint16,protoMsg proto.Message) *PBPackIn func (slf *PBProcessor) MakeRawMsg(msgType uint16,msg []byte) *PBPackInfo { return &PBPackInfo{typ:msgType,rawMsg:msg} } + +func (slf *PBProcessor) UnknownMsgRoute(msg interface{}, userData interface{}){ + slf.unknownMessageHandler(userData.(uint64),msg.([]byte)) +} + +// connect event +func (slf *PBProcessor) ConnectedRoute(userData interface{}){ + slf.connectHandler(userData.(uint64)) +} + +func (slf *PBProcessor) DisConnectedRoute(userData interface{}){ + slf.disconnectHandler(userData.(uint64)) +} + +func (slf *PBProcessor) RegisterUnknownMsg(unknownMessageHandler UnknownMessageHandler){ + slf.unknownMessageHandler = unknownMessageHandler +} + +func (slf *PBProcessor) RegisterConnected(connectHandler ConnectHandler){ + slf.connectHandler = connectHandler +} + +func (slf *PBProcessor) RegisterDisConnected(disconnectHandler ConnectHandler){ + slf.disconnectHandler = disconnectHandler +} + + + diff --git a/service/module.go b/service/module.go index 8873ba1..95c4a56 100644 --- a/service/module.go +++ b/service/module.go @@ -26,8 +26,9 @@ type IModule interface { OnRelease() getBaseModule() IModule GetService() IService - GetEventChan() chan *event.Event GetModuleName() string + GetEventProcessor()event.IEventProcessor + NotifyEvent(ev *event.Event) } @@ -50,8 +51,8 @@ type Module struct { descendants map[int64]IModule//始祖的后裔们 //事件管道 - event.EventProcessor moduleName string + eventHandler event.EventHandler //eventChan chan *SEvent } @@ -96,6 +97,7 @@ func (slf *Module) AddModule(module IModule) (int64,error){ pAddModule.dispatcher = slf.GetAncestor().getBaseModule().(*Module).dispatcher pAddModule.ancestor = slf.ancestor pAddModule.moduleName = reflect.Indirect(reflect.ValueOf(module)).Type().Name() + pAddModule.eventHandler.Init(slf.eventHandler.GetEventProcessor()) err := module.OnInit() if err != nil { return 0,err @@ -202,3 +204,14 @@ func (slf *Module) GetService() IService { return slf.GetAncestor().(IService) } +func (slf *Module) GetEventProcessor() event.IEventProcessor{ + return slf.eventHandler.GetEventProcessor() +} + +func (slf *Module) NotifyEvent(ev *event.Event){ + slf.eventHandler.NotifyEvent(ev) +} + +func (slf *Module) GetEventHandler() event.IEventHandler{ + return &slf.eventHandler +} \ No newline at end of file diff --git a/service/service.go b/service/service.go index 31ed0f6..1d69ac0 100644 --- a/service/service.go +++ b/service/service.go @@ -2,6 +2,7 @@ package service import ( "fmt" + "github.com/duanhf2012/origin/event" "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/profiler" "github.com/duanhf2012/origin/rpc" @@ -42,7 +43,7 @@ type Service struct { serviceCfg interface{} gorouterNum int32 startStatus bool - + eventProcessor event.EventProcessor //事件接收者 profiler *profiler.Profiler //性能分析器 } @@ -70,6 +71,7 @@ func (slf *Service) Init(iservice IService,getClientFun rpc.FuncRpcClient,getSer slf.descendants = map[int64]IModule{} slf.serviceCfg = serviceCfg slf.gorouterNum = 1 + slf.eventHandler.Init(&slf.eventProcessor) slf.this.OnInit() } @@ -101,7 +103,7 @@ func (slf *Service) Run() { for{ rpcRequestChan := slf.GetRpcRequestChan() rpcResponeCallBack := slf.GetRpcResponeChan() - eventChan := slf.GetEventChan() + eventChan := slf.eventProcessor.GetEventChan() var analyzer *profiler.Analyzer select { case <- closeSig: @@ -129,7 +131,7 @@ func (slf *Service) Run() { if slf.profiler!=nil { analyzer = slf.profiler.Push(fmt.Sprintf("Event_%d", int(ev.Type))) } - slf.EventHandler(ev) + slf.eventProcessor.EventHandler(ev) if analyzer!=nil { analyzer.Pop() analyzer = nil @@ -195,3 +197,11 @@ func (slf *Service) GetProfiler() *profiler.Profiler{ } + +func (slf *Service) RegEventReciverFunc(eventType event.EventType,reciver event.IEventHandler,callback event.EventCallBack){ + slf.eventProcessor.RegEventReciverFunc(eventType,reciver,callback) +} + +func (slf *Service) UnRegEventReciverFun(eventType event.EventType,reciver event.IEventHandler){ + slf.eventProcessor.UnRegEventReciverFun(eventType,reciver) +} \ No newline at end of file diff --git a/sysservice/httpservice.go b/sysservice/httpservice.go index cd5bad7..93dcf82 100644 --- a/sysservice/httpservice.go +++ b/sysservice/httpservice.go @@ -57,7 +57,6 @@ type IHttpRouter interface { POST(url string, handle HttpHandle) bool Router(session *HttpSession) - PutHttpSession(httpSession *HttpSession) SetServeFile(method HTTP_METHOD, urlpath string, dirname string) error SetFormFileKey(formFileKey string) GetFormFileKey()string @@ -67,7 +66,7 @@ type IHttpRouter interface { type HttpRouter struct { pathRouter map[HTTP_METHOD] map[string] routerMatchData //url地址,对应本service地址 serveFileData map[string] *routerServeFileData - eventReciver event.IEventProcessor + //eventReciver event.IEventHandler httpFiltrateList [] HttpFiltrate formFileKey string @@ -104,9 +103,9 @@ type HttpService struct { -func NewHttpHttpRouter(eventReciver event.IEventProcessor) IHttpRouter { +func NewHttpHttpRouter() IHttpRouter { httpRouter := &HttpRouter{} - httpRouter.eventReciver = eventReciver + //httpRouter.eventReciver = eventHandler httpRouter.pathRouter =map[HTTP_METHOD] map[string] routerMatchData{} httpRouter.serveFileData = map[string] *routerServeFileData{} httpRouter.formFileKey = "file" @@ -185,8 +184,6 @@ func (slf *HttpSession) WriteJsonDone(statusCode int,msgJson interface{}) error return err } - - func (slf *HttpSession) flush() { slf.w.WriteHeader(slf.statusCode) if slf.msg!=nil { @@ -235,9 +232,6 @@ func (slf *HttpRouter) GetFormFileKey()string{ return slf.formFileKey } -func (slf *HttpRouter) PutHttpSession(httpSession *HttpSession){ - slf.eventReciver.NotifyEvent(&event.Event{Type:event.Sys_Event_Http_Event,Data:httpSession}) -} func (slf *HttpRouter) GET(url string, handle HttpHandle) bool { return slf.regRouter(METHOD_GET, url, handle) @@ -297,8 +291,14 @@ func (slf *HttpRouter) Router(session *HttpSession){ session.Done() } -func (slf *HttpService) SetHttpRouter(httpRouter IHttpRouter) { + +func (slf *HttpService) HttpEventHandler(ev *event.Event) { + ev.Data.(*HttpSession).Handle() +} + +func (slf *HttpService) SetHttpRouter(httpRouter IHttpRouter,eventHandler event.IEventHandler) { slf.httpRouter = httpRouter + slf.RegEventReciverFunc(event.Sys_Event_Http_Event,eventHandler,slf.HttpEventHandler) } @@ -521,7 +521,7 @@ func (slf *HttpService) ServeHTTP(w http.ResponseWriter, r *http.Request) { } session.body = body - slf.httpRouter.PutHttpSession(session) + slf.GetEventHandler().NotifyEvent(&event.Event{Type:event.Sys_Event_Http_Event,Data:session}) ticker := time.NewTicker(slf.processTimeout) select { case <-ticker.C: diff --git a/sysservice/tcpservice.go b/sysservice/tcpservice.go index a295d79..5061c38 100644 --- a/sysservice/tcpservice.go +++ b/sysservice/tcpservice.go @@ -71,8 +71,41 @@ func (slf *TcpService) OnInit() error{ return nil } -func (slf *TcpService) SetProcessor(process network.Processor){ + + + +type TcpPackType int8 +const( + TPT_Connected TcpPackType = 0 + TPT_DisConnected TcpPackType = 1 + TPT_Pack TcpPackType = 2 + TPT_UnknownPack TcpPackType = 3 +) + +type TcpPack struct { + Type TcpPackType //0表示连接 1表示断开 2表示数据 + MsgProcessor network.Processor + ClientId uint64 + Data interface{} +} + + +func (slf *TcpService) TcpEventHandler(ev *event.Event) { + pack := ev.Data.(*TcpPack) + if pack.Type == TPT_Connected { + pack.MsgProcessor.ConnectedRoute(pack.ClientId) + }else if pack.Type == TPT_DisConnected { + pack.MsgProcessor.DisConnectedRoute(pack.ClientId) + } else if pack.Type == TPT_UnknownPack{ + pack.MsgProcessor.UnknownMsgRoute(pack.Data,pack.ClientId) + } else if pack.Type == TPT_Pack { + pack.MsgProcessor.MsgRoute(pack.Data, pack.ClientId) + } +} + +func (slf *TcpService) SetProcessor(process network.Processor,handler event.IEventHandler){ slf.process = process + slf.RegEventReciverFunc(event.Sys_Event_Tcp,handler,slf.TcpEventHandler) } func (slf *TcpService) NewClient(conn *network.TCPConn) network.Agent { @@ -103,17 +136,14 @@ type Client struct { tcpService *TcpService } -type TcpPack struct { - ClientId uint64 - Data interface{} -} + func (slf *Client) GetId() uint64 { return slf.id } func (slf *Client) Run() { - slf.tcpService.GetEventReciver().NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp_Connected,Data:&TcpPack{ClientId:slf.id}}) + slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:&TcpPack{ClientId:slf.id,Type:TPT_Connected,MsgProcessor:slf.tcpService.process}}) for{ bytes,err := slf.tcpConn.ReadMsg() if err != nil { @@ -122,17 +152,15 @@ func (slf *Client) Run() { } data,err:=slf.tcpService.process.Unmarshal(bytes) if err != nil { - slf.tcpService.GetEventReciver().NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp_PackException,Data:&TcpPack{ClientId:slf.id,Data:bytes}}) - //log.Debug("process.Unmarshal is error:%+v",err) + slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:&TcpPack{ClientId:slf.id,Type:TPT_UnknownPack,Data:bytes,MsgProcessor:slf.tcpService.process}}) continue } - - slf.tcpService.GetEventReciver().NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp_RecvPack,Data:&TcpPack{ClientId:slf.id,Data:data}}) + slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:&TcpPack{ClientId:slf.id,Type:TPT_Pack,Data:data,MsgProcessor:slf.tcpService.process}}) } } func (slf *Client) OnClose(){ - slf.tcpService.GetEventReciver().NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp_DisConnected,Data:&TcpPack{ClientId:slf.id}}) + slf.tcpService.NotifyEvent(&event.Event{Type:event.Sys_Event_Tcp,Data:&TcpPack{ClientId:slf.id,Type:TPT_DisConnected,MsgProcessor:slf.tcpService.process}}) slf.tcpService.mapClientLocker.Lock() defer slf.tcpService.mapClientLocker.Unlock() delete (slf.tcpService.mapClient,slf.GetId())