diff --git a/event/event.go b/event/event.go index efad7e4..963489f 100644 --- a/event/event.go +++ b/event/event.go @@ -23,13 +23,17 @@ type IEventHandler interface { GetEventProcessor() IEventProcessor //获得事件 NotifyEvent(*Event) + Desctory() + //注册了事件 addRegInfo(eventType EventType,eventProcessor IEventProcessor) removeRegInfo(eventType EventType,eventProcessor IEventProcessor) + } type IEventProcessor interface { + //同一个IEventHandler,只能接受一个EventType类型回调 RegEventReciverFunc(eventType EventType,reciver IEventHandler,callback EventCallBack) UnRegEventReciverFun(eventType EventType,reciver IEventHandler) SetEventChannel(channelNum int) bool @@ -172,7 +176,7 @@ func (slf *EventProcessor) UnRegEventReciverFun(eventType EventType,reciver IEve reciver.removeRegInfo(eventType,slf) } -func (slf *EventHandler) desctory(){ +func (slf *EventHandler) Desctory(){ for eventTyp,mapEventProcess := range slf.mapRegEvent { if mapEventProcess == nil { continue diff --git a/event/eventtype.go b/event/eventtype.go index 734ade6..249ec9d 100644 --- a/event/eventtype.go +++ b/event/eventtype.go @@ -4,9 +4,8 @@ type EventType int //大于Sys_Event_User_Define给用户定义 const ( - Sys_Event_Tcp EventType = 5 - Sys_Event_Http_Event EventType = 4 - + Sys_Event_Tcp EventType = 1 + Sys_Event_Http_Event EventType = 2 Sys_Event_User_Define EventType = 1000 ) diff --git a/example/main.go b/example/main.go index 405fd34..458e536 100644 --- a/example/main.go +++ b/example/main.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "github.com/duanhf2012/origin/event" "github.com/duanhf2012/origin/example/GateService" "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/node" @@ -51,29 +52,62 @@ var moduleid4 int64 func (slf *Module1) OnInit() error { fmt.Printf("I'm Module1:%d\n",slf.GetModuleId()) + slf.AfterFunc(time.Second*5,func(){ + slf.NotifyEvent(&event.Event{ + Type: Event1, + Data: "xxxxxxxxxxx", + }) + }) return nil } func (slf *Module2) OnInit() error { fmt.Printf("I'm Module2:%d\n",slf.GetModuleId()) + slf.GetEventProcessor().RegEventReciverFunc(Event1,slf.GetEventHandler(),slf.Module2Test) + + moduleid3,_ = slf.AddModule(&Module3{}) + slf.AfterFunc(time.Second*3, func() { + slf.ReleaseModule(moduleid3) + }) return nil } + + +func (slf *Module2) Module2Test(ev *event.Event){ + fmt.Print("\n>>>>>>>>Module2:",ev) +} + + func (slf *Module3) OnInit() error { + slf.GetParent().GetParent().GetEventProcessor().RegEventReciverFunc(Event1,slf.GetEventHandler(),slf.Module3Test) + fmt.Printf("I'm Module3:%d\n",slf.GetModuleId()) moduleid4,_ = slf.AddModule(&Module4{}) return nil } +func (slf *Module3) Module3Test(ev *event.Event){ + fmt.Print("\n>>>>>>>>Module3:",ev) +} + +const ( + Event1 event.EventType = 10002 +) func (slf *Module4) OnInit() error { fmt.Printf("I'm Module4:%d\n",slf.GetModuleId()) //pService := slf.GetService().(*TestServiceCall) //pService.RPC_Test(nil,nil) slf.AfterFunc(time.Second*10,slf.TimerTest) + slf.GetParent().GetParent().GetParent().GetEventProcessor().RegEventReciverFunc(Event1,slf.GetEventHandler(),slf.Module4Test) return nil } +func (slf *Module4) Module4Test(ev *event.Event){ + fmt.Print("\n>>>>>>>>>>>Module4:",ev) +} + func (slf *Module4) TimerTest(){ fmt.Printf("Module4 tigger timer\n") } @@ -95,7 +129,7 @@ func (slf *TestServiceCall) OnInit() error { slf.OpenProfiler() //slf.AfterFunc(time.Second*1,slf.Run) - slf.AfterFunc(time.Second*1,slf.Test) + //slf.AfterFunc(time.Second*1,slf.Test) moduleid1,_ = slf.AddModule(&Module1{}) moduleid2,_ = slf.AddModule(&Module2{}) fmt.Print(moduleid1,moduleid2) @@ -111,8 +145,8 @@ func (slf *TestServiceCall) OnInit() error { } func (slf *TestServiceCall) Release(){ - slf.ReleaseModule(moduleid1) - slf.ReleaseModule(moduleid2) + /*slf.ReleaseModule(moduleid1) + slf.ReleaseModule(moduleid2)*/ } diff --git a/service/module.go b/service/module.go index 95c4a56..261c578 100644 --- a/service/module.go +++ b/service/module.go @@ -10,7 +10,7 @@ import ( "time" ) -const InitModuleId = 1e18 +const InitModuleId = 1e17 type IModule interface { @@ -53,7 +53,6 @@ type Module struct { //事件管道 moduleName string eventHandler event.EventHandler - //eventChan chan *SEvent } @@ -79,6 +78,10 @@ func (slf *Module) OnInit() error{ } func (slf *Module) AddModule(module IModule) (int64,error){ + //没有事件处理器不允许加入其他模块 + if slf.GetEventProcessor() == nil { + return 0,fmt.Errorf("module %+v is not Event Processor is nil",slf.self) + } pAddModule := module.getBaseModule().(*Module) if pAddModule.GetModuleId()==0 { pAddModule.moduleId = slf.NewModuleId() @@ -111,13 +114,14 @@ func (slf *Module) AddModule(module IModule) (int64,error){ } func (slf *Module) ReleaseModule(moduleId int64){ - //pBaseModule := slf.GetModule(moduleId).getBaseModule().(*Module) pModule := slf.GetModule(moduleId).getBaseModule().(*Module) //释放子孙 for id,_ := range pModule.child { slf.ReleaseModule(id) } + + pModule.GetEventHandler().Desctory() pModule.self.OnRelease() log.Debug("Release module %s.",slf.GetModuleName()) for pTimer,_ := range pModule.mapActiveTimer { diff --git a/service/service.go b/service/service.go index 1d69ac0..6f77599 100644 --- a/service/service.go +++ b/service/service.go @@ -39,7 +39,6 @@ type Service struct { name string //service name closeSig chan bool wg sync.WaitGroup - this IService serviceCfg interface{} gorouterNum int32 startStatus bool @@ -62,9 +61,9 @@ func (slf *Service) OpenProfiler() { func (slf *Service) Init(iservice IService,getClientFun rpc.FuncRpcClient,getServerFun rpc.FuncRpcServer,serviceCfg interface{}) { slf.dispatcher =timer.NewDispatcher(timerDispatcherLen) - slf.this = iservice - slf.InitRpcHandler(iservice.(rpc.IRpcHandler),getClientFun,getServerFun) + slf.InitRpcHandler(iservice.(rpc.IRpcHandler),getClientFun,getServerFun) + slf.self = iservice.(IModule) //初始化祖先 slf.ancestor = iservice.(IModule) slf.seedModuleId =InitModuleId @@ -72,7 +71,7 @@ func (slf *Service) Init(iservice IService,getClientFun rpc.FuncRpcClient,getSer slf.serviceCfg = serviceCfg slf.gorouterNum = 1 slf.eventHandler.Init(&slf.eventProcessor) - slf.this.OnInit() + slf.self.OnInit() } func (slf *Service) SetGoRouterNum(gorouterNum int32) bool { @@ -171,7 +170,7 @@ func (slf *Service) Release(){ log.Error("core dump info:%+v\n",err) } }() - slf.this.OnRelease() + slf.self.OnRelease() log.Debug("Release Service %s.",slf.GetName()) } @@ -182,7 +181,6 @@ func (slf *Service) OnInit() error { return nil } - func (slf *Service) Wait(){ slf.wg.Wait() } @@ -191,13 +189,10 @@ func (slf *Service) GetServiceCfg()interface{}{ return slf.serviceCfg } - func (slf *Service) GetProfiler() *profiler.Profiler{ return slf.profiler } - - func (slf *Service) RegEventReciverFunc(eventType event.EventType,reciver event.IEventHandler,callback event.EventCallBack){ slf.eventProcessor.RegEventReciverFunc(eventType,reciver,callback) } diff --git a/sysservice/tcpservice.go b/sysservice/tcpservice.go index 5061c38..422bc62 100644 --- a/sysservice/tcpservice.go +++ b/sysservice/tcpservice.go @@ -20,14 +20,27 @@ type TcpService struct { process network.Processor } +type TcpPackType int8 +const( + TPT_Connected TcpPackType = 0 + TPT_DisConnected TcpPackType = 1 + TPT_Pack TcpPackType = 2 + TPT_UnknownPack TcpPackType = 3 +) + +type TcpPack struct { + Type TcpPackType //0表示连接 1表示断开 2表示数据 + MsgProcessor network.Processor + ClientId uint64 + Data interface{} +} + const Default_MaxConnNum = 3000 const Default_PendingWriteNum = 10000 const Default_LittleEndian = false const Default_MinMsgLen = 2 const Default_MaxMsgLen = 65535 - - func (slf *TcpService) OnInit() error{ iConfig := slf.GetServiceCfg() if iConfig == nil { @@ -67,40 +80,21 @@ func (slf *TcpService) OnInit() error{ slf.mapClient = make( map[uint64] *Client,slf.tcpServer.MaxConnNum) slf.tcpServer.NewAgent =slf.NewClient slf.tcpServer.Start() - //加载配置 return nil } - - - -type TcpPackType int8 -const( - TPT_Connected TcpPackType = 0 - TPT_DisConnected TcpPackType = 1 - TPT_Pack TcpPackType = 2 - TPT_UnknownPack TcpPackType = 3 -) - -type TcpPack struct { - Type TcpPackType //0表示连接 1表示断开 2表示数据 - MsgProcessor network.Processor - ClientId uint64 - Data interface{} -} - - func (slf *TcpService) TcpEventHandler(ev *event.Event) { - pack := ev.Data.(*TcpPack) - if pack.Type == TPT_Connected { - pack.MsgProcessor.ConnectedRoute(pack.ClientId) - }else if pack.Type == TPT_DisConnected { - pack.MsgProcessor.DisConnectedRoute(pack.ClientId) - } else if pack.Type == TPT_UnknownPack{ - pack.MsgProcessor.UnknownMsgRoute(pack.Data,pack.ClientId) - } else if pack.Type == TPT_Pack { - pack.MsgProcessor.MsgRoute(pack.Data, pack.ClientId) - } + pack := ev.Data.(*TcpPack) + switch pack.Type { + case TPT_Connected: + pack.MsgProcessor.ConnectedRoute(pack.ClientId) + case TPT_DisConnected: + pack.MsgProcessor.DisConnectedRoute(pack.ClientId) + case TPT_UnknownPack: + pack.MsgProcessor.UnknownMsgRoute(pack.Data,pack.ClientId) + case TPT_Pack: + pack.MsgProcessor.MsgRoute(pack.Data, pack.ClientId) + } } func (slf *TcpService) SetProcessor(process network.Processor,handler event.IEventHandler){ @@ -128,16 +122,12 @@ func (slf *TcpService) NewClient(conn *network.TCPConn) network.Agent { return nil } - - type Client struct { id uint64 tcpConn *network.TCPConn tcpService *TcpService } - - func (slf *Client) GetId() uint64 { return slf.id } @@ -183,7 +173,6 @@ func (slf *TcpService) SendMsg(clientid uint64,msg interface{}) error{ } func (slf *TcpService) Close(clientid uint64) { - // slf.mapClientLocker.Lock() defer slf.mapClientLocker.Unlock() @@ -199,6 +188,3 @@ func (slf *TcpService) Close(clientid uint64) { return } -func (slf *TcpService) OnRelease() { - -} \ No newline at end of file