From 8e331e155bbedff2083e0e53ca1c3858be726393 Mon Sep 17 00:00:00 2001 From: boyce Date: Tue, 15 Dec 2020 17:25:10 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=AE=9A=E6=97=B6=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- node/node.go | 2 + service/module.go | 17 +- service/service.go | 19 +- util/timer/heap.go | 95 +++++++ util/timer/timer.go | 268 ++++++++++---------- util/timewheel/timewheel.go | 422 ------------------------------- util/timewheel/timewheel_test.go | 31 --- 7 files changed, 240 insertions(+), 614 deletions(-) create mode 100644 util/timer/heap.go delete mode 100644 util/timewheel/timewheel.go delete mode 100644 util/timewheel/timewheel_test.go diff --git a/node/node.go b/node/node.go index 467e9e3..45b2f59 100644 --- a/node/node.go +++ b/node/node.go @@ -7,6 +7,7 @@ import ( "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/profiler" "github.com/duanhf2012/origin/service" + "github.com/duanhf2012/origin/util/timer" "io/ioutil" "net/http" _ "net/http/pprof" @@ -185,6 +186,7 @@ func startNode(args interface{}) error{ return fmt.Errorf("invalid option %s",param) } + timer.StartTimer(10*time.Millisecond,100000) log.Release("Start running server.") //2.初始化node initNode(nodeId) diff --git a/service/module.go b/service/module.go index 96e04ae..9882845 100644 --- a/service/module.go +++ b/service/module.go @@ -5,7 +5,6 @@ import ( "github.com/duanhf2012/origin/event" "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/util/timer" - "github.com/duanhf2012/origin/util/timewheel" "reflect" "time" ) @@ -37,7 +36,7 @@ type Module struct { parent IModule //父亲 self IModule //自己 child map[int64]IModule //孩子们 - mapActiveTimer map[*timewheel.Timer]interface{} + mapActiveTimer map[*timer.Timer]interface{} dispatcher *timer.Dispatcher //timer //根结点 @@ -119,7 +118,7 @@ func (m *Module) ReleaseModule(moduleId int64){ pModule.self.OnRelease() log.Debug("Release module %s.", m.GetModuleName()) for pTimer,_ := range pModule.mapActiveTimer { - pTimer.AdditionData.(timer.ITime).Close() + pTimer.Cancel() } delete(m.child,moduleId) @@ -160,11 +159,13 @@ func (m *Module) GetParent()IModule{ return m.parent } -func (m *Module) OnCloseTimer(timer *timewheel.Timer){ +func (m *Module) OnCloseTimer(timer *timer.Timer){ + fmt.Printf("OnCloseTimer %p\n",timer) delete(m.mapActiveTimer,timer) } -func (m *Module) OnAddTimer(t *timewheel.Timer){ +func (m *Module) OnAddTimer(t *timer.Timer){ + fmt.Printf("OnAddTimer %p\n",t) if t != nil { m.mapActiveTimer[t] = nil } @@ -172,7 +173,7 @@ func (m *Module) OnAddTimer(t *timewheel.Timer){ func (m *Module) AfterFunc(d time.Duration, cb func()) *timer.Timer { if m.mapActiveTimer == nil { - m.mapActiveTimer =map[*timewheel.Timer]interface{}{} + m.mapActiveTimer =map[*timer.Timer]interface{}{} } return m.dispatcher.AfterFunc(d,cb,m.OnCloseTimer,m.OnAddTimer) @@ -180,7 +181,7 @@ func (m *Module) AfterFunc(d time.Duration, cb func()) *timer.Timer { func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer.Cron { if m.mapActiveTimer == nil { - m.mapActiveTimer =map[*timewheel.Timer]interface{}{} + m.mapActiveTimer =map[*timer.Timer]interface{}{} } return m.dispatcher.CronFunc(cronExpr,cb,m.OnCloseTimer,m.OnAddTimer) @@ -188,7 +189,7 @@ func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer func (m *Module) NewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker { if m.mapActiveTimer == nil { - m.mapActiveTimer =map[*timewheel.Timer]interface{}{} + m.mapActiveTimer =map[*timer.Timer]interface{}{} } return m.dispatcher.TickerFunc(d,cb,m.OnCloseTimer,m.OnAddTimer) diff --git a/service/service.go b/service/service.go index c2365f2..2fd98ab 100644 --- a/service/service.go +++ b/service/service.go @@ -7,7 +7,6 @@ import ( "github.com/duanhf2012/origin/profiler" "github.com/duanhf2012/origin/rpc" "github.com/duanhf2012/origin/util/timer" - "github.com/duanhf2012/origin/util/timewheel" "reflect" "runtime" "sync" @@ -136,17 +135,13 @@ func (s *Service) Run() { analyzer = nil } case t := <- s.dispatcher.ChanTimer: - if t.IsClose() == false { - time := t.AdditionData.(timer.ITime) - if s.profiler != nil { - analyzer = s.profiler.Push("[timer]"+time.GetName()) - } - time.Do() - if analyzer != nil { - analyzer.Pop() - analyzer = nil - } - timewheel.ReleaseTimer(t) + if s.profiler != nil { + analyzer = s.profiler.Push("[timer]"+t.GetName()) + } + t.Do() + if analyzer != nil { + analyzer.Pop() + analyzer = nil } } diff --git a/util/timer/heap.go b/util/timer/heap.go new file mode 100644 index 0000000..f315762 --- /dev/null +++ b/util/timer/heap.go @@ -0,0 +1,95 @@ +package timer + +import ( + "container/heap" + "github.com/duanhf2012/origin/log" + "sync" + "time" +) + +func SetupTimer(timer *Timer) *Timer{ + timerHeapLock.Lock() // 使用锁规避竞争条件 + heap.Push(&timerHeap,timer) + timerHeapLock.Unlock() + return timer +} + +type _TimerHeap struct { + timers []*Timer +} + +func (h *_TimerHeap) Len() int { + return len(h.timers) +} + +func (h *_TimerHeap) Less(i, j int) bool { + return h.timers[i].fireTime.Before(h.timers[j].fireTime) +} + +func (h *_TimerHeap) Swap(i, j int) { + h.timers[i],h.timers[j] = h.timers[j],h.timers[i] +} + +func (h *_TimerHeap) Push(x interface{}) { + h.timers = append(h.timers, x.(*Timer)) +} + +func (h *_TimerHeap) Pop() (ret interface{}) { + l := len(h.timers) + h.timers, ret = h.timers[:l-1], h.timers[l-1] + return +} + +var ( + timerHeap _TimerHeap // 定时器heap对象 + timerHeapLock sync.Mutex // 一个全局的锁 + timeOffset time.Duration +) + +func StartTimer(minTimerInterval time.Duration,maxTimerNum int){ + timerHeap.timers = make([]*Timer,0,maxTimerNum) + heap.Init(&timerHeap) // 初始化定时器heap + + go tickRoutine(minTimerInterval) +} + +func tickRoutine(minTimerInterval time.Duration){ + for{ + time.Sleep(minTimerInterval) + tick() + } +} + +func tick(){ + now := Now() + timerHeapLock.Lock() + if timerHeap.Len() <= 0 { // 没有任何定时器,立刻返回 + timerHeapLock.Unlock() + return + } + nextFireTime := timerHeap.timers[0].fireTime + if nextFireTime.After(now) { // 没有到时间的定时器,返回 + timerHeapLock.Unlock() + return + } + + t := heap.Pop(&timerHeap).(*Timer) + timerHeapLock.Unlock() + if len(t.C)>= cap(t.C) { + log.Error("Timer channel full!") + + return + } + + t.C <- t +} + +func Now() time.Time{ + if timeOffset == 0 { + return time.Now() + } + + return time.Now().Add(timeOffset) +} + + diff --git a/util/timer/timer.go b/util/timer/timer.go index ad5a8df..60c947e 100644 --- a/util/timer/timer.go +++ b/util/timer/timer.go @@ -1,13 +1,34 @@ package timer import ( - "github.com/duanhf2012/origin/util/timewheel" + "fmt" "reflect" "runtime" "sync" "time" ) +// Timer +type Timer struct { + name string + cancelled bool //是否关闭 + C chan *Timer //定时器管道 + interval time.Duration // 时间间隔(用于循环定时器) + fireTime time.Time // 触发时间 + cb func() + AdditionData interface{} //定时器附加数据 +} + +// Ticker +type Ticker struct { + Timer +} + +// Cron +type Cron struct { + Timer +} + var timerPool = sync.Pool{New: func() interface{}{ return &Timer{} }} @@ -20,198 +41,163 @@ var tickerPool = sync.Pool{New: func() interface{}{ return &Ticker{} }} -// one dispatcher per goroutine (goroutine not safe) -type Dispatcher struct { - ChanTimer chan *timewheel.Timer -} +func newTimer(d time.Duration,c chan *Timer,cb func(),name string,additionData interface{}) *Timer{ + if c == nil { + return nil + } -func NewDispatcher(l int) *Dispatcher { - disp := new(Dispatcher) - disp.ChanTimer = make(chan *timewheel.Timer, l) - return disp -} - -type ITime interface { - Close () - Do() - GetName() string -} - -// Timer -type Timer struct { - t *timewheel.Timer - cb func() - name string - onClose func(timer *timewheel.Timer) -} - -// Cron -type Cron struct { - Timer -} - -// Ticker -type Ticker struct { - Timer -} - -func NewTimer(t *timewheel.Timer,cb func(),name string,onClose func(timer *timewheel.Timer)) *Timer { timer := timerPool.Get().(*Timer) - timer.t = t + timer.AdditionData = additionData + timer.C = c + timer.fireTime = Now().Add(d) timer.cb = cb - timer.onClose = onClose timer.name = name + timer.interval = d return timer } -func ReleaseTimer(timer *Timer) { +func releaseTimer(timer *Timer) { timerPool.Put(timer) } -func (t *Timer) Close(){ - if t.t!=nil { - t.t.Close() - } - if t.onClose!=nil { - t.onClose(t.t) - } - ReleaseTimer(t) +func newTicker() *Ticker { + ticker := tickerPool.Get().(*Ticker) + return ticker +} + +func releaseTicker(ticker *Ticker) { + tickerPool.Put(ticker) +} + +func newCron() *Cron { + cron := cronPool.Get().(*Cron) + return cron +} + +func releaseCron(cron *Cron) { + cronPool.Put(cron) +} + +// one dispatcher per goroutine (goroutine not safe) +type Dispatcher struct { + ChanTimer chan *Timer } func (t *Timer) Do(){ - t.Close() - t.cb() + if t.cb != nil { + t.cb() + } +} + +func (t *Timer) GetInterval() time.Duration{ + return t.interval +} + +func (t *Timer) Cancel() { + t.cancelled = true +} + +// 判断定时器是否已经取消 +func (t *Timer) IsActive() bool { + return !t.cancelled } func (t *Timer) GetName() string{ return t.name } -func NewCron(t *timewheel.Timer,cb func(),name string,onClose func(timer *timewheel.Timer)) *Cron { - cron := cronPool.Get().(*Cron) - cron.t = t - cron.cb = cb - cron.onClose = onClose - cron.name = name - return cron +func NewDispatcher(l int) *Dispatcher { + disp := new(Dispatcher) + disp.ChanTimer = make(chan *Timer, l) + return disp } -func ReleaseCron(cron *Cron) { - cronPool.Put(cron) -} - -func (c *Cron) Do(){ - if c.onClose!=nil { - c.onClose(c.t) - } - - c.cb() -} - -func (c *Cron) Close() { - if c.t != nil { - c.t.Close() - } - - if c.onClose!=nil { - c.onClose(c.t) - } - - ReleaseCron(c) -} - -func NewTicker(t *timewheel.Timer,cb func(),name string,onClose func(timer *timewheel.Timer)) *Ticker { - ticker := tickerPool.Get().(*Ticker) - ticker.t = t - ticker.cb = cb - ticker.onClose = onClose - ticker.name = name - - return ticker -} - -func ReleaseTicker(ticker *Ticker) { - tickerPool.Put(ticker) -} - -func (tk *Ticker) Do(){ - //通知当前timer删除 - if tk.onClose!=nil { - tk.onClose(tk.t) - } - tk.cb() -} - -func (tk *Ticker) Close() { - if tk.t != nil { - tk.t.Close() - } - - if tk.onClose!=nil { - tk.onClose(tk.t) - } - - ReleaseTicker(tk) -} - -func (disp *Dispatcher) AfterFunc(d time.Duration, cb func(),onCloseTimer func(timer *timewheel.Timer),onAddTimer func(timer *timewheel.Timer)) *Timer { +type OnTimerClose func(timer *Timer) +func (disp *Dispatcher) AfterFunc(d time.Duration, cb func(),onTimerClose OnTimerClose,onAddTimer func(timer *Timer)) *Timer { funName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name() - t := NewTimer(nil,cb,funName,onCloseTimer) - t.t = timewheel.NewTimerEx(d,disp.ChanTimer,t) - onAddTimer(t.t) + timer := newTimer(d,disp.ChanTimer,cb,funName,nil) + cbFunc := func() { + onTimerClose(timer) + releaseTimer(timer) + if timer.IsActive() == false { + return + } + + cb() + } + + timer.cb = cbFunc + t := SetupTimer(timer) + onAddTimer(t) return t } -func (disp *Dispatcher) CronFunc(cronExpr *CronExpr, cb func(*Cron),onCloseTimer func(timer *timewheel.Timer),onAddTimer func(timer *timewheel.Timer)) *Cron { - now := time.Now() +func (disp *Dispatcher) CronFunc(cronExpr *CronExpr, cb func(*Cron),onTimerClose OnTimerClose,onAddTimer func(timer *Timer)) *Cron { + now := Now() nextTime := cronExpr.Next(now) if nextTime.IsZero() { return nil } funcName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name() - cron := NewCron(nil,nil,funcName,onCloseTimer) + cron := newCron() // callback var cbFunc func() cbFunc = func() { - now := time.Now() + if cron.IsActive() == false{ + onTimerClose(&cron.Timer) + releaseCron(cron) + return + } + + now := Now() nextTime := cronExpr.Next(now) if nextTime.IsZero() { cb(cron) return } - interval := nextTime.Sub(now) - minTimeInterval := time.Millisecond*time.Duration(timewheel.GRANULARITY) - if interval < minTimeInterval { - interval = minTimeInterval - } - - cron.t = timewheel.NewTimerEx(interval,disp.ChanTimer,cron) - onAddTimer(cron.t) + cron.interval = nextTime.Sub(now) + cron.fireTime = now.Add(cron.interval) + SetupTimer(&cron.Timer) cb(cron) } + cron.C = disp.ChanTimer cron.cb = cbFunc - cron.t = timewheel.NewTimerEx(nextTime.Sub(now),disp.ChanTimer,cron) - onAddTimer(cron.t) + cron.name = funcName + cron.interval = nextTime.Sub(now) + cron.fireTime = Now().Add(cron.interval) + fmt.Println(cron.interval.Milliseconds(),"\n") + SetupTimer(&cron.Timer) + onAddTimer(&cron.Timer) return cron } -func (disp *Dispatcher) TickerFunc(d time.Duration, cb func(*Ticker),onCloseTimer func(timer *timewheel.Timer),onAddTimer func(timer *timewheel.Timer)) *Ticker { +func (disp *Dispatcher) TickerFunc(d time.Duration, cb func(*Ticker),onTimerClose OnTimerClose,onAddTimer func(timer *Timer)) *Ticker { funcName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name() - ticker := NewTicker(nil,nil,funcName,onCloseTimer) - // callback - var cbFunc func() - cbFunc = func() { - ticker.t = timewheel.NewTimerEx(d,disp.ChanTimer,ticker) - onAddTimer(ticker.t) + ticker := newTicker() + cbFunc := func() { cb(ticker) + if ticker.Timer.IsActive() == true{ + ticker.fireTime = Now().Add(d) + SetupTimer(&ticker.Timer) + }else{ + onTimerClose(&ticker.Timer) + releaseTicker(ticker) + } } + ticker.C = disp.ChanTimer + ticker.fireTime = Now().Add(d) ticker.cb = cbFunc - ticker.t = timewheel.NewTimerEx(d,disp.ChanTimer,ticker) - onAddTimer(ticker.t) + ticker.name = funcName + ticker.interval = d + + // callback + SetupTimer(&ticker.Timer) + onAddTimer(&ticker.Timer) + return ticker } diff --git a/util/timewheel/timewheel.go b/util/timewheel/timewheel.go deleted file mode 100644 index d30de7d..0000000 --- a/util/timewheel/timewheel.go +++ /dev/null @@ -1,422 +0,0 @@ -package timewheel - -import ( - "sync" - "sync/atomic" - "time" -) - -//分别用位代表每个轮存在的轮子数(2^6,2^6,...2^8) -//-------------------------- -//| 6 | 6 | 6 | 6 | 8 | -//-------------------------- -//根据游戏定时器,将第一轮控制在12位,即40960ms以内。 -var GRANULARITY int64 = 10 //定时器的最小单位,每格10ms -//var wheelBitSize =[]int{8,6,6,6,6} //32bit定时器 -var wheelBitSize =[]int{12,7,6,5,2} //32bit定时器 - -type wheelInfo struct { - slotNum int //轮子slot数量 - threshold int64 //轮子最大表示数字范围,如果是第一个轮子2^8,第二个轮子是2^6... -} - -var tWheel *timeWheel //时间实例化对象指针 -var chanStartTimer chan *Timer //开始定时器Channel -var chanStopTimer chan *Timer //停止定时器Channel -const chanTimerLen int = 40960 //Channel -var timerPool = sync.Pool{New: func() interface{}{ - return &Timer{} -}} - - -//构造时间轮对象与相关初始化 -func init(){ - tWheel = newTimeWheel() - chanStartTimer = make(chan *Timer,chanTimerLen) - chanStopTimer = make(chan *Timer,chanTimerLen) - - go timerRunning() -} - -//定时器运行与驱动 -func timerRunning(){ - t := time.NewTicker(time.Millisecond*10) - for { - /* - if test == true { - testTimerRunning() - }*/ - - select{ - case startTimer:=<-chanStartTimer: - tWheel.addTimer(startTimer) - case stopTimer:=<-chanStopTimer: - tWheel.delTimer(stopTimer) - case <-t.C: - tWheel.Tick() - } - } -} -/* -var test bool = false -func testTimerRunning(){ - for { - select { - case startTimer := <-chanStartTimer: - tWheel.addTimer(startTimer) - case stopTimer := <-chanStopTimer: - tWheel.delTimer(stopTimer) - default: - tWheel.TickOneFrame() - } - } -} -*/ - - - -func NewTimerEx(d time.Duration,c chan *Timer,additionData interface{}) *Timer{ - if c == nil { - c = make(chan *Timer, 1) - } - timer := tWheel.newTimer(d.Milliseconds()/GRANULARITY,additionData,c) - chanStartTimer<-timer - return timer -} - -func NewTimer(d time.Duration) *Timer{ - timer := tWheel.newTimer(d.Milliseconds()/GRANULARITY,nil,make(chan *Timer, 1)) - chanStartTimer<-timer - return timer -} - -//链表结点 -type stNode struct { - prev *Timer - next *Timer -} - -//定时器结构体 -type Timer struct { - stNode - expireTicks int64 //到期滴答数 - end int32 //是否已经关闭0表示开启状态,1表示关闭 - bClose bool //是否关闭 - C chan *Timer //定时器管道 - AdditionData interface{} //定时器附加数据 -} - -//停止停时器 -func (timer *Timer) Close(){ - timer.bClose = true - if timer.bClose == true { - return - } - - //将关闭标志设为1关闭状态 - if atomic.SwapInt32(&timer.end,1) == 0 { - chanStopTimer<-timer - } -} - -//定时器是否已经停止 -func (timer *Timer) IsClose() bool { - return timer.bClose -} - -func (timer *Timer) IsEnd() bool{ - return atomic.LoadInt32(&timer.end) !=0 -} - -func (timer *Timer) doTimeout(){ - if atomic.SwapInt32(&timer.end,1) != 0 { - return - } - timer.prev = nil - timer.next = nil - select { - case timer.C <- timer: - } -} - -//每个时间轮上的刻度 -type slots struct { - timer *Timer //定时器链表头 - restTicks int64 //当前刻度走完一圈剩余时间ticks -} - -//时间轮子 -type stWheel struct { - slots []*slots //刻度切片 - slotIndex int //当前指针所在的位置索引 -} - -//获取当前轮的总刻度数 -func (stw *stWheel) slotSize() int{ - return len(stw.slots) -} - -//添加定时器到slots上 -func (s *slots) addTimer(timer *Timer){ - timer.next = s.timer - timer.prev = s.timer.prev - s.timer.prev.next = timer - s.timer.prev = timer -} - -//当前slots上是否没有定时器 -func (s *slots) isEmpty() bool{ - return s.timer == s.timer.next -} - -func (s *slots) makeEmpty() { - s.timer.next = s.timer - s.timer.prev = s.timer -} - - -//时间轮结构体 -type timeWheel struct { - wheels []*stWheel //所有的轮子的切片 - wheelInfos []*wheelInfo //所有轮子的信息,预计算存储 - wheelSize int //轮子数 - - currentTime int64 //当前已经走进的的自然时间 - currentTicks int64 //当前已经走进的ticks数 -} - -//构建时间轮对象 -func newTimeWheel() *timeWheel{ - tWheel := &timeWheel{} - tWheel.set(wheelBitSize) - tWheel.currentTime = GetNow() - return tWheel -} - -//设置n位定时器 -func (t *timeWheel) set(wheelBitSize []int){ - t.wheelSize = len(wheelBitSize) - t.wheelInfos = make([]*wheelInfo,len(wheelBitSize)) - t.wheels = make([]*stWheel,len(wheelBitSize)) - totalBitSize := 0 - - for idx,bitSize := range wheelBitSize { - totalBitSize += bitSize - //1.轮子信息 - t.wheelInfos[idx] = &wheelInfo{} - t.wheelInfos[idx].slotNum = 1 << bitSize - t.wheelInfos[idx].threshold = 1<< totalBitSize - - //2.make轮子里面的slot - t.wheels[idx] = &stWheel{} - t.wheels[idx].slots = make([]*slots,t.wheelInfos[idx].slotNum) - for slotIdx,_ := range t.wheels[idx].slots { - t.wheels[idx].slots[slotIdx] = t.newSlot(t.wheels[idx].slots[slotIdx]) - - //计算当前slot走完剩余的ticks数,以下turns有个特殊处理 - //第一个轮子idx==0时每个slot的刻度代表1,从第二个轮子开始即idx>0根据 - //t.wheelInfos[idx-1].threshold获得每个刻度值 - //turns代表由于前一轮已经走一圈了,那么当前slotIdex应该加1,即:slotIdx+turns - var perSlotTicks int64 = 1 - turns := 0 - if idx>0 { - perSlotTicks = t.wheelInfos[idx-1].threshold - turns = 1 - } - s := ((1 << bitSize) - (slotIdx+turns))*int(perSlotTicks) - t.wheels[idx].slots[slotIdx].restTicks = int64(s) - } - } -} - -//构建一个slot(轮子上的刻度) -func (t *timeWheel) newSlot(slot *slots) *slots{ - //如果是不存在的slot申请内存 - if slot == nil { - slot = &slots{} - timer := &Timer{} - slot.timer = timer - } - - //构建双向循环链表 - slot.timer.next = slot.timer - slot.timer.prev = slot.timer - - return slot -} - - -//获取当前时间戳ms -func GetNow() int64 { - return time.Now().UnixNano()/int64(time.Millisecond) -} - -//创建定时器 ticks表示多少个ticks单位到期, additionData定时器附带数据, c到时通知的channel -func (t *timeWheel) newTimer(ticks int64,additionData interface{},c chan *Timer) *Timer{ - pTimer := timerPool.Get().(*Timer) - pTimer.end = 0 - pTimer.bClose = false - pTimer.C = c - pTimer.AdditionData = additionData - - if ticks<=0 { - ticks = 1 - } - pTimer.expireTicks = ticks+t.currentTicks - return pTimer -} - -func ReleaseTimer(timer *Timer) { - timerPool.Put(timer) -} - -//添加定时器 -func (t *timeWheel) addTimer(timer *Timer) *Timer { - //1.计算到期时间ticks - ticks := timer.expireTicks - t.currentTicks - if ticks<=0 { - timer.doTimeout() - return timer - } - //2.for遍历通过ticks找到适合的轮子插入,从底轮子往高找 - var slot *slots - for wheelIndex,info := range t.wheelInfos { - if ticks < info.threshold { - var restTicks int64 - var slotTicks int64 - var slotIndex int - //如果不是第0个轮子 - if wheelIndex != 0 { - //计算前面所有的轮子剩余ticks数总和(即:当前轮子还有多少个ticks会移动到下一个) - restTicks = t.getWheelSum(wheelIndex) - //当前轮子每个刻度的ticks数 - slotTicks = t.wheelInfos[wheelIndex-1].threshold - //计算当前落到哪个slotIndex中 - slotIndex = (t.wheels[wheelIndex].slotIndex+int((ticks-restTicks)/slotTicks)) % t.wheelInfos[wheelIndex].slotNum - }else{ - slotIndex = (t.wheels[wheelIndex].slotIndex + int(ticks))%t.wheelInfos[wheelIndex].slotNum - } - //取得slot对象指针 - slot = t.wheels[wheelIndex].slots[slotIndex] - break - } - } - - //3.如果都找不到失败 - if slot == nil { - panic("cannot find slot!") - //return nil - } - - //4.添加定时器timer到链表 - slot.addTimer(timer) - return timer -} - -//删除定时器 -func (t *timeWheel) delTimer(timer *Timer) { - if timer.next == nil { - return - } - timer.prev.next = timer.next - timer.next.prev = timer.prev - ReleaseTimer(timer) -} - -//按照自然时间走动时间差计算loop,并且进行Tick -func (t *timeWheel) Tick(){ - nowTime := GetNow() - - loop := (nowTime - t.currentTime)/int64(GRANULARITY) - if loop> 0 { - t.currentTime = nowTime - } - - for i:=int64(0);i= t.wheels[0].slotSize() { - t.wheels[0].slotIndex = 0 - t.cascade(1) - } -} - -//获得当前轮子的slots -func (t *timeWheel) getCurrentSlot(wheelIndex int) *slots{ - return t.wheels[wheelIndex].slots[t.wheels[wheelIndex].slotIndex] -} - -//获取当前轮wheelIndex转动所需要的ticks数量 -func (t *timeWheel) getWheelSum(wheelIndex int) int64{ - var ticks int64 - //遍历前面n个轮子 - for i:=0;i0的轮子) -func (t *timeWheel) cascade(wheelIndex int) { - if wheelIndex<1 || wheelIndex>=t.wheelSize { - return - } - - //1.取得对应的轮子上的slot - wheel := t.wheels[wheelIndex] - slot := wheel.slots[wheel.slotIndex] - - //2.将当前的slot遍历并重新加入 - currentTimer := slot.timer.next - for ;currentTimer!=slot.timer; { - //先保存一个定时器指针,预防链表迭代失效问题 - nextTimer:=currentTimer.next - //如果到时,直接送到channel - if currentTimer.expireTicks<= t.currentTicks { - if currentTimer.IsEnd() == false { - currentTimer.doTimeout() - } - }else{//否则重新添加,会加到下一级轮中 - t.addTimer(currentTimer) - } - currentTimer = nextTimer - } - //3.将当前轮清空 - wheel.slots[wheel.slotIndex].makeEmpty() - - //4.如果当前轮子跳过一轮,需要跳动到下一时间轮 - wheel.slotIndex++ - if wheel.slotIndex>=wheel.slotSize() { - wheel.slotIndex = 0 - t.cascade(wheelIndex+1) - } -} diff --git a/util/timewheel/timewheel_test.go b/util/timewheel/timewheel_test.go deleted file mode 100644 index b15db7a..0000000 --- a/util/timewheel/timewheel_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package timewheel - -import ( - "testing" - "time" - "fmt" -) - -func Test_Example(t *testing.T) { - timer:=NewTimer(time.Second*2) - select { - case <- timer.C: - fmt.Println("It is time out!") - } - - timer2 := NewTimerEx(time.Second*2,nil,1) - select { - case t:=<- timer2.C: - fmt.Println("It is time out!",t.AdditionData.(int)) - } - - timer3 := NewTimerEx(time.Second*2,nil,1) - timer3.Stop() - time.Sleep(3*time.Second) - select { - case t:=<- timer2.C: - fmt.Println("It is time out!",t.AdditionData.(int)) - default: - fmt.Printf("time is stop") - } -}