diff --git a/service/module.go b/service/module.go index 49416e9..6ae6651 100644 --- a/service/module.go +++ b/service/module.go @@ -7,18 +7,19 @@ import ( rpcHandle "github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/util/timer" "reflect" + "sync/atomic" "time" ) -const InitModuleId = 1e17 +const InitModuleId = 1e9 type IModule interface { - SetModuleId(moduleId int64) bool - GetModuleId() int64 - AddModule(module IModule) (int64,error) - GetModule(moduleId int64) IModule + SetModuleId(moduleId uint32) bool + GetModuleId() uint32 + AddModule(module IModule) (uint32,error) + GetModule(moduleId uint32) IModule GetAncestor()IModule - ReleaseModule(moduleId int64) - NewModuleId() int64 + ReleaseModule(moduleId uint32) + NewModuleId() uint32 GetParent()IModule OnInit() error OnRelease() @@ -37,24 +38,25 @@ type IModuleTimer interface { type Module struct { rpcHandle.IRpcHandler - moduleId int64 //模块Id + moduleId uint32 //模块Id moduleName string //模块名称 parent IModule //父亲 self IModule //自己 - child map[int64]IModule //孩子们 - mapActiveTimer map[*timer.Timer]interface{} + child map[uint32]IModule //孩子们 + mapActiveTimer map[timer.ITimer]struct{} + mapActiveIdTimer map[uint64]timer.ITimer dispatcher *timer.Dispatcher //timer //根结点 ancestor IModule //始祖 - seedModuleId int64 //模块id种子 - descendants map[int64]IModule //始祖的后裔们 + seedModuleId uint32 //模块id种子 + descendants map[uint32]IModule //始祖的后裔们 //事件管道 eventHandler event.IEventHandler } -func (m *Module) SetModuleId(moduleId int64) bool{ +func (m *Module) SetModuleId(moduleId uint32) bool{ if m.moduleId > 0 { return false } @@ -63,7 +65,7 @@ func (m *Module) SetModuleId(moduleId int64) bool{ return true } -func (m *Module) GetModuleId() int64{ +func (m *Module) GetModuleId() uint32{ return m.moduleId } @@ -75,7 +77,7 @@ func (m *Module) OnInit() error{ return nil } -func (m *Module) AddModule(module IModule) (int64,error){ +func (m *Module) AddModule(module IModule) (uint32,error){ //没有事件处理器不允许加入其他模块 if m.GetEventProcessor() == nil { return 0,fmt.Errorf("module %+v Event Processor is nil", m.self) @@ -87,7 +89,7 @@ func (m *Module) AddModule(module IModule) (int64,error){ } if m.child == nil { - m.child = map[int64]IModule{} + m.child = map[uint32]IModule{} } _,ok := m.child[module.GetModuleId()] if ok == true { @@ -113,7 +115,7 @@ func (m *Module) AddModule(module IModule) (int64,error){ return module.GetModuleId(),nil } -func (m *Module) ReleaseModule(moduleId int64){ +func (m *Module) ReleaseModule(moduleId uint32){ pModule := m.GetModule(moduleId).getBaseModule().(*Module) //释放子孙 @@ -128,6 +130,10 @@ func (m *Module) ReleaseModule(moduleId int64){ pTimer.Cancel() } + for _,t := range pModule.mapActiveIdTimer { + t.Cancel() + } + delete(m.child,moduleId) delete (m.ancestor.getBaseModule().(*Module).descendants,moduleId) @@ -140,18 +146,32 @@ func (m *Module) ReleaseModule(moduleId int64){ pModule.ancestor = nil pModule.descendants = nil pModule.IRpcHandler = nil + pModule.mapActiveIdTimer = nil } -func (m *Module) NewModuleId() int64{ +func (m *Module) NewModuleId() uint32{ m.ancestor.getBaseModule().(*Module).seedModuleId+=1 return m.ancestor.getBaseModule().(*Module).seedModuleId } +var timerSeedId uint32 +func (m *Module) GenTimerId() uint64{ + for{ + newTimerId := (uint64(m.GetModuleId())<<32)|uint64(atomic.AddUint32(&timerSeedId,1)) + if _,ok := m.mapActiveIdTimer[newTimerId];ok == true { + continue + } + + return newTimerId + } +} + + func (m *Module) GetAncestor()IModule{ return m.ancestor } -func (m *Module) GetModule(moduleId int64) IModule{ +func (m *Module) GetModule(moduleId uint32) IModule{ iModule,ok := m.GetAncestor().getBaseModule().(*Module).descendants[moduleId] if ok == false { return nil @@ -167,40 +187,108 @@ func (m *Module) GetParent()IModule{ return m.parent } -func (m *Module) OnCloseTimer(timer *timer.Timer){ - delete(m.mapActiveTimer,timer) +func (m *Module) OnCloseTimer(t timer.ITimer){ + delete(m.mapActiveIdTimer,t.GetId()) + delete(m.mapActiveTimer,t) } -func (m *Module) OnAddTimer(t *timer.Timer){ +func (m *Module) OnAddTimer(t timer.ITimer){ if t != nil { - m.mapActiveTimer[t] = nil + if m.mapActiveTimer == nil { + m.mapActiveTimer = map[timer.ITimer]struct{}{} + } + + m.mapActiveTimer[t] = struct{}{} } } func (m *Module) AfterFunc(d time.Duration, cb func(*timer.Timer)) *timer.Timer { if m.mapActiveTimer == nil { - m.mapActiveTimer =map[*timer.Timer]interface{}{} + m.mapActiveTimer =map[timer.ITimer]struct{}{} } - return m.dispatcher.AfterFunc(d,cb,m.OnCloseTimer,m.OnAddTimer) + return m.dispatcher.AfterFunc(d,nil,cb,m.OnCloseTimer,m.OnAddTimer) } func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer.Cron { if m.mapActiveTimer == nil { - m.mapActiveTimer =map[*timer.Timer]interface{}{} + m.mapActiveTimer =map[timer.ITimer]struct{}{} } - return m.dispatcher.CronFunc(cronExpr,cb,m.OnCloseTimer,m.OnAddTimer) + return m.dispatcher.CronFunc(cronExpr,nil,cb,m.OnCloseTimer,m.OnAddTimer) } func (m *Module) NewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker { if m.mapActiveTimer == nil { - m.mapActiveTimer =map[*timer.Timer]interface{}{} + m.mapActiveTimer =map[timer.ITimer]struct{}{} } - return m.dispatcher.TickerFunc(d,cb,m.OnCloseTimer,m.OnAddTimer) + return m.dispatcher.TickerFunc(d,nil,cb,m.OnCloseTimer,m.OnAddTimer) } +func (m *Module) cb(*timer.Timer){ + +} + +func (m *Module) SafeAfterFunc(timerId *uint64,d time.Duration, AdditionData interface{},cb func(interface{})) { + if m.mapActiveIdTimer == nil { + m.mapActiveIdTimer = map[uint64]timer.ITimer{} + } + + if *timerId != 0 { + m.CancelTimerId(timerId) + } + + *timerId = m.GenTimerId() + t := m.dispatcher.AfterFunc(d,cb,nil,m.OnCloseTimer,m.OnAddTimer) + t.AdditionData = AdditionData + t.Id = *timerId + m.mapActiveIdTimer[*timerId] = t +} + +func (m *Module) SafeCronFunc(cronId *uint64,cronExpr *timer.CronExpr, AdditionData interface{}, cb func(interface{})) { + if m.mapActiveIdTimer == nil { + m.mapActiveIdTimer = map[uint64]timer.ITimer{} + } + + *cronId = m.GenTimerId() + c := m.dispatcher.CronFunc(cronExpr,cb,nil,m.OnCloseTimer,m.OnAddTimer) + c.AdditionData = AdditionData + c.Id = *cronId + m.mapActiveIdTimer[*cronId] = c +} + +func (m *Module) SafeNewTicker(tickerId *uint64,d time.Duration, AdditionData interface{}, cb func(interface{})) { + if m.mapActiveIdTimer == nil { + m.mapActiveIdTimer = map[uint64]timer.ITimer{} + } + + *tickerId = m.GenTimerId() + t := m.dispatcher.TickerFunc(d,cb,nil,m.OnCloseTimer,m.OnAddTimer) + t.AdditionData = AdditionData + t.Id = *tickerId + m.mapActiveIdTimer[*tickerId] = t +} + +func (m *Module) CancelTimerId(timerId *uint64) bool{ + if m.mapActiveIdTimer == nil { + log.SError("mapActiveIdTimer is nil") + return false + } + + t,ok := m.mapActiveIdTimer[*timerId] + if ok == false { + log.SError("cannot find timer id ",timerId) + return false + } + + t.Cancel() + *timerId = 0 + return true +} + + + func (m *Module) OnRelease(){ } diff --git a/service/service.go b/service/service.go index c907585..31970b1 100644 --- a/service/service.go +++ b/service/service.go @@ -95,7 +95,7 @@ func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServe //初始化祖先 s.ancestor = iService.(IModule) s.seedModuleId =InitModuleId - s.descendants = map[int64]IModule{} + s.descendants = map[uint32]IModule{} s.serviceCfg = serviceCfg s.goroutineNum = 1 s.eventProcessor = event.NewEventProcessor() diff --git a/util/timer/heap.go b/util/timer/heap.go index 8b47249..f620cf0 100644 --- a/util/timer/heap.go +++ b/util/timer/heap.go @@ -2,17 +2,16 @@ package timer import ( "container/heap" - "github.com/duanhf2012/origin/log" "sync" "time" ) -func SetupTimer(timer *Timer) *Timer{ - if timer.rOpen == true { +func SetupTimer(timer ITimer) ITimer{ + if timer.IsOpen() == true { return nil } - timer.rOpen = true + timer.Open(true) timerHeapLock.Lock() // 使用锁规避竞争条件 heap.Push(&timerHeap,timer) timerHeapLock.Unlock() @@ -20,9 +19,10 @@ func SetupTimer(timer *Timer) *Timer{ } func NewTimer(d time.Duration) *Timer{ - c := make(chan *Timer,1) - timer := newTimer(d,c,nil,"",nil) + c := make(chan ITimer,1) + timer := newTimer(d,c,nil,"") SetupTimer(timer) + return timer } @@ -31,7 +31,7 @@ func ReleaseTimer(timer *Timer) { } type _TimerHeap struct { - timers []*Timer + timers []ITimer } func (h *_TimerHeap) Len() int { @@ -39,7 +39,7 @@ func (h *_TimerHeap) Len() int { } func (h *_TimerHeap) Less(i, j int) bool { - return h.timers[i].fireTime.Before(h.timers[j].fireTime) + return h.timers[i].GetFireTime().Before(h.timers[j].GetFireTime()) } func (h *_TimerHeap) Swap(i, j int) { @@ -47,7 +47,7 @@ func (h *_TimerHeap) Swap(i, j int) { } func (h *_TimerHeap) Push(x interface{}) { - h.timers = append(h.timers, x.(*Timer)) + h.timers = append(h.timers, x.(ITimer)) } func (h *_TimerHeap) Pop() (ret interface{}) { @@ -63,7 +63,7 @@ var ( ) func StartTimer(minTimerInterval time.Duration,maxTimerNum int){ - timerHeap.timers = make([]*Timer,0,maxTimerNum) + timerHeap.timers = make([]ITimer,0,maxTimerNum) heap.Init(&timerHeap) // 初始化定时器heap go tickRoutine(minTimerInterval) @@ -86,21 +86,17 @@ func tick() bool{ timerHeapLock.Unlock() return false } - nextFireTime := timerHeap.timers[0].fireTime + nextFireTime := timerHeap.timers[0].GetFireTime() if nextFireTime.After(now) { // 没有到时间的定时器,返回 timerHeapLock.Unlock() return false } - t := heap.Pop(&timerHeap).(*Timer) + t := heap.Pop(&timerHeap).(ITimer) timerHeapLock.Unlock() - if len(t.C)>= cap(t.C) { - log.SError("Timer channel full!") + t.Open(false) + t.AppendChannel(t) - return true - } - t.rOpen = false - t.C <- t return true } diff --git a/util/timer/timer.go b/util/timer/timer.go index 87a9612..fb4948d 100644 --- a/util/timer/timer.go +++ b/util/timer/timer.go @@ -9,14 +9,36 @@ import ( "time" ) +// ITimer +type ITimer interface { + GetId() uint64 + Cancel() + GetName()string + IsActive() bool + IsOpen() bool + Open(bOpen bool) + AppendChannel(timer ITimer) + Do() + GetFireTime() time.Time + SetupTimer(now time.Time) error +} + +type OnCloseTimer func(timer ITimer) +type OnAddTimer func(timer ITimer) + // Timer type Timer struct { - name string + Id uint64 cancelled bool //是否关闭 - C chan *Timer //定时器管道 + C chan ITimer //定时器管道 interval time.Duration // 时间间隔(用于循环定时器) fireTime time.Time // 触发时间 - cb func() + cb func(interface{}) + cbEx func(t *Timer) + cbCronEx func(t *Cron) + cbTickerEx func(t *Ticker) + cbOnCloseTimer OnCloseTimer + cronExpr *CronExpr AdditionData interface{} //定时器附加数据 rOpen bool //是否重新打开 @@ -45,17 +67,12 @@ var tickerPool =sync.NewPoolEx(make(chan sync.IPoolData,1000),func() sync.IPoolD return &Ticker{} }) -func newTimer(d time.Duration,c chan *Timer,cb func(),name string,additionData interface{}) *Timer{ - if c == nil { - return nil - } - +func newTimer(d time.Duration,c chan ITimer,cb func(interface{}),additionData interface{}) *Timer{ timer := timerPool.Get().(*Timer) timer.AdditionData = additionData timer.C = c timer.fireTime = Now().Add(d) timer.cb = cb - timer.name = name timer.interval = d timer.rOpen = false return timer @@ -85,20 +102,59 @@ func releaseCron(cron *Cron) { // one dispatcher per goroutine (goroutine not safe) type Dispatcher struct { - ChanTimer chan *Timer + ChanTimer chan ITimer +} + +func (t *Timer) GetId() uint64{ + return t.Id +} + +func (t *Timer) GetFireTime() time.Time{ + return t.fireTime +} + +func (t *Timer) Open(bOpen bool){ + t.rOpen = bOpen +} + +func (t *Timer) AppendChannel(timer ITimer){ + t.C <- timer +} + +func (t *Timer) IsOpen() bool{ + return t.rOpen } func (t *Timer) Do(){ + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + l := runtime.Stack(buf, false) + errString := fmt.Sprint(r) + log.SError("core dump info[",errString,"]\n",string(buf[:l])) + } + }() + + if t.IsActive() == false { + if t.cbOnCloseTimer!=nil { + t.cbOnCloseTimer(t) + } + + releaseTimer(t) + return + } + if t.cb != nil { - defer func() { - if r := recover(); r != nil { - buf := make([]byte, 4096) - l := runtime.Stack(buf, false) - errString := fmt.Sprint(r) - log.SError("core dump info[",errString,"]\n",string(buf[:l])) - } - }() - t.cb() + t.cb(t.AdditionData) + }else if t.cbEx != nil { + t.cbEx(t) + } + + if t.rOpen ==false { + if t.cbOnCloseTimer!=nil { + t.cbOnCloseTimer(t) + } + releaseTimer(t) } } @@ -124,17 +180,18 @@ func (t *Timer) IsActive() bool { } func (t *Timer) GetName() string{ - return t.name + if t.cb!=nil { + return runtime.FuncForPC(reflect.ValueOf(t.cb).Pointer()).Name() + }else if t.cbEx!=nil { + return runtime.FuncForPC(reflect.ValueOf(t.cbEx).Pointer()).Name() + } + + return "" } +var emptyTimer Timer func (t *Timer) Reset(){ - t.name = "" - t.cancelled = false - t.C = nil - t.interval = 0 - t.cb = nil - t.AdditionData = nil - t.rOpen = false + *t = emptyTimer } func (t *Timer) IsRef()bool{ @@ -153,6 +210,50 @@ func (c *Cron) Reset(){ c.Timer.Reset() } +func (c *Cron) Do() { + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + l := runtime.Stack(buf, false) + errString := fmt.Sprint(r) + log.SError("core dump info[",errString,"]\n",string(buf[:l])) + } + }() + + if c.IsActive() == false{ + if c.cbOnCloseTimer != nil { + c.cbOnCloseTimer(c) + } + releaseCron(c) + return + } + + now := Now() + nextTime := c.cronExpr.Next(now) + if nextTime.IsZero() { + c.cbCronEx(c) + return + } + + if c.cb!=nil { + c.cb(c.AdditionData) + }else if c.cbEx !=nil { + c.cbCronEx(c) + } + + if c.IsActive() == true{ + c.interval = nextTime.Sub(now) + c.fireTime = now.Add(c.interval) + SetupTimer(c) + }else{ + if c.cbOnCloseTimer!=nil { + c.cbOnCloseTimer(c) + } + releaseCron(c) + return + } +} + func (c *Cron) IsRef()bool{ return c.ref } @@ -165,6 +266,42 @@ func (c *Cron) UnRef(){ c.ref = false } +func (c *Ticker) Do() { + defer func() { + if r := recover(); r != nil { + buf := make([]byte, 4096) + l := runtime.Stack(buf, false) + errString := fmt.Sprint(r) + log.SError("core dump info[", errString, "]\n", string(buf[:l])) + } + }() + + if c.IsActive() == false { + if c.cbOnCloseTimer!=nil { + c.cbOnCloseTimer(c) + } + + releaseTicker(c) + return + } + + if c.cb!=nil{ + c.cb(c.AdditionData) + } else if c.cbTickerEx != nil{ + c.cbTickerEx(c) + } + + if c.IsActive() == true{ + c.fireTime = Now().Add(c.interval) + SetupTimer(c) + }else{ + if c.cbOnCloseTimer!=nil { + c.cbOnCloseTimer(c) + } + releaseTicker(c) + } +} + func (c *Ticker) Reset(){ c.Timer.Reset() } @@ -183,99 +320,55 @@ func (c *Ticker) UnRef(){ func NewDispatcher(l int) *Dispatcher { dispatcher := new(Dispatcher) - dispatcher.ChanTimer = make(chan *Timer, l) + dispatcher.ChanTimer = make(chan ITimer, l) return dispatcher } -type OnTimerClose func(timer *Timer) -func (dispatcher *Dispatcher) AfterFunc(d time.Duration, cb func(*Timer),onTimerClose OnTimerClose,onAddTimer func(timer *Timer)) *Timer { - funName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name() - timer := newTimer(d,dispatcher.ChanTimer,nil,funName,nil) - cbFunc := func() { - if timer.IsActive() == false { - onTimerClose(timer) - releaseTimer(timer) - return - } +func (dispatcher *Dispatcher) AfterFunc(d time.Duration, cb func(data interface{}),cbEx func(*Timer),onTimerClose OnCloseTimer,onAddTimer OnAddTimer) *Timer { + timer := newTimer(d,dispatcher.ChanTimer,nil,nil) + timer.cb = cb + timer.cbEx = cbEx + timer.cbOnCloseTimer = onTimerClose - cb(timer) - - if timer.rOpen ==false { - onTimerClose(timer) - releaseTimer(timer) - } + t := SetupTimer(timer) + if onAddTimer!= nil && t!=nil { + onAddTimer(t) } - timer.cb = cbFunc - t := SetupTimer(timer) - onAddTimer(t) - - return t + return timer } -func (dispatcher *Dispatcher) CronFunc(cronExpr *CronExpr, cb func(*Cron),onTimerClose OnTimerClose,onAddTimer func(timer *Timer)) *Cron { +func (dispatcher *Dispatcher) CronFunc(cronExpr *CronExpr,cb func(data interface{}), cbEx func(*Cron),onTimerClose OnCloseTimer,onAddTimer OnAddTimer) *Cron { now := Now() nextTime := cronExpr.Next(now) if nextTime.IsZero() { return nil } - funcName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name() cron := newCron() - // callback - var cbFunc func() - cbFunc = func() { - if cron.IsActive() == false{ - onTimerClose(&cron.Timer) - releaseCron(cron) - return - } - - now := Now() - nextTime := cronExpr.Next(now) - if nextTime.IsZero() { - cb(cron) - return - } - - cron.interval = nextTime.Sub(now) - cron.fireTime = now.Add(cron.interval) - SetupTimer(&cron.Timer) - cb(cron) - } + cron.cb = cb + cron.cbCronEx = cbEx + cron.cbOnCloseTimer = onTimerClose + cron.cronExpr = cronExpr cron.C = dispatcher.ChanTimer - cron.cb = cbFunc - cron.name = funcName cron.interval = nextTime.Sub(now) cron.fireTime = Now().Add(cron.interval) - SetupTimer(&cron.Timer) - onAddTimer(&cron.Timer) + SetupTimer(cron) + onAddTimer(cron) return cron } -func (dispatcher *Dispatcher) TickerFunc(d time.Duration, cb func(*Ticker),onTimerClose OnTimerClose,onAddTimer func(timer *Timer)) *Ticker { - funcName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name() +func (dispatcher *Dispatcher) TickerFunc(d time.Duration,cb func(data interface{}), cbEx func(*Ticker),onTimerClose OnCloseTimer,onAddTimer OnAddTimer) *Ticker { ticker := newTicker() - cbFunc := func() { - cb(ticker) - if ticker.Timer.IsActive() == true{ - ticker.fireTime = Now().Add(d) - SetupTimer(&ticker.Timer) - }else{ - onTimerClose(&ticker.Timer) - releaseTicker(ticker) - } - } - ticker.C = dispatcher.ChanTimer ticker.fireTime = Now().Add(d) - ticker.cb = cbFunc - ticker.name = funcName + ticker.cb = cb + ticker.cbTickerEx = cbEx ticker.interval = d // callback - SetupTimer(&ticker.Timer) - onAddTimer(&ticker.Timer) + SetupTimer(ticker) + onAddTimer(ticker) return ticker }