mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-03 22:45:13 +08:00
优化定时器
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/duanhf2012/origin/log"
|
||||
"github.com/duanhf2012/origin/profiler"
|
||||
"github.com/duanhf2012/origin/service"
|
||||
"github.com/duanhf2012/origin/util/timer"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
@@ -185,6 +186,7 @@ func startNode(args interface{}) error{
|
||||
return fmt.Errorf("invalid option %s",param)
|
||||
}
|
||||
|
||||
timer.StartTimer(10*time.Millisecond,100000)
|
||||
log.Release("Start running server.")
|
||||
//2.初始化node
|
||||
initNode(nodeId)
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"github.com/duanhf2012/origin/event"
|
||||
"github.com/duanhf2012/origin/log"
|
||||
"github.com/duanhf2012/origin/util/timer"
|
||||
"github.com/duanhf2012/origin/util/timewheel"
|
||||
"reflect"
|
||||
"time"
|
||||
)
|
||||
@@ -37,7 +36,7 @@ type Module struct {
|
||||
parent IModule //父亲
|
||||
self IModule //自己
|
||||
child map[int64]IModule //孩子们
|
||||
mapActiveTimer map[*timewheel.Timer]interface{}
|
||||
mapActiveTimer map[*timer.Timer]interface{}
|
||||
dispatcher *timer.Dispatcher //timer
|
||||
|
||||
//根结点
|
||||
@@ -119,7 +118,7 @@ func (m *Module) ReleaseModule(moduleId int64){
|
||||
pModule.self.OnRelease()
|
||||
log.Debug("Release module %s.", m.GetModuleName())
|
||||
for pTimer,_ := range pModule.mapActiveTimer {
|
||||
pTimer.AdditionData.(timer.ITime).Close()
|
||||
pTimer.Cancel()
|
||||
}
|
||||
|
||||
delete(m.child,moduleId)
|
||||
@@ -160,11 +159,13 @@ func (m *Module) GetParent()IModule{
|
||||
return m.parent
|
||||
}
|
||||
|
||||
func (m *Module) OnCloseTimer(timer *timewheel.Timer){
|
||||
func (m *Module) OnCloseTimer(timer *timer.Timer){
|
||||
fmt.Printf("OnCloseTimer %p\n",timer)
|
||||
delete(m.mapActiveTimer,timer)
|
||||
}
|
||||
|
||||
func (m *Module) OnAddTimer(t *timewheel.Timer){
|
||||
func (m *Module) OnAddTimer(t *timer.Timer){
|
||||
fmt.Printf("OnAddTimer %p\n",t)
|
||||
if t != nil {
|
||||
m.mapActiveTimer[t] = nil
|
||||
}
|
||||
@@ -172,7 +173,7 @@ func (m *Module) OnAddTimer(t *timewheel.Timer){
|
||||
|
||||
func (m *Module) AfterFunc(d time.Duration, cb func()) *timer.Timer {
|
||||
if m.mapActiveTimer == nil {
|
||||
m.mapActiveTimer =map[*timewheel.Timer]interface{}{}
|
||||
m.mapActiveTimer =map[*timer.Timer]interface{}{}
|
||||
}
|
||||
|
||||
return m.dispatcher.AfterFunc(d,cb,m.OnCloseTimer,m.OnAddTimer)
|
||||
@@ -180,7 +181,7 @@ func (m *Module) AfterFunc(d time.Duration, cb func()) *timer.Timer {
|
||||
|
||||
func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer.Cron {
|
||||
if m.mapActiveTimer == nil {
|
||||
m.mapActiveTimer =map[*timewheel.Timer]interface{}{}
|
||||
m.mapActiveTimer =map[*timer.Timer]interface{}{}
|
||||
}
|
||||
|
||||
return m.dispatcher.CronFunc(cronExpr,cb,m.OnCloseTimer,m.OnAddTimer)
|
||||
@@ -188,7 +189,7 @@ func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer
|
||||
|
||||
func (m *Module) NewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker {
|
||||
if m.mapActiveTimer == nil {
|
||||
m.mapActiveTimer =map[*timewheel.Timer]interface{}{}
|
||||
m.mapActiveTimer =map[*timer.Timer]interface{}{}
|
||||
}
|
||||
|
||||
return m.dispatcher.TickerFunc(d,cb,m.OnCloseTimer,m.OnAddTimer)
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"github.com/duanhf2012/origin/profiler"
|
||||
"github.com/duanhf2012/origin/rpc"
|
||||
"github.com/duanhf2012/origin/util/timer"
|
||||
"github.com/duanhf2012/origin/util/timewheel"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"sync"
|
||||
@@ -136,18 +135,14 @@ func (s *Service) Run() {
|
||||
analyzer = nil
|
||||
}
|
||||
case t := <- s.dispatcher.ChanTimer:
|
||||
if t.IsClose() == false {
|
||||
time := t.AdditionData.(timer.ITime)
|
||||
if s.profiler != nil {
|
||||
analyzer = s.profiler.Push("[timer]"+time.GetName())
|
||||
analyzer = s.profiler.Push("[timer]"+t.GetName())
|
||||
}
|
||||
time.Do()
|
||||
t.Do()
|
||||
if analyzer != nil {
|
||||
analyzer.Pop()
|
||||
analyzer = nil
|
||||
}
|
||||
timewheel.ReleaseTimer(t)
|
||||
}
|
||||
}
|
||||
|
||||
if bStop == true {
|
||||
|
||||
95
util/timer/heap.go
Normal file
95
util/timer/heap.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package timer
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"github.com/duanhf2012/origin/log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
func SetupTimer(timer *Timer) *Timer{
|
||||
timerHeapLock.Lock() // 使用锁规避竞争条件
|
||||
heap.Push(&timerHeap,timer)
|
||||
timerHeapLock.Unlock()
|
||||
return timer
|
||||
}
|
||||
|
||||
type _TimerHeap struct {
|
||||
timers []*Timer
|
||||
}
|
||||
|
||||
func (h *_TimerHeap) Len() int {
|
||||
return len(h.timers)
|
||||
}
|
||||
|
||||
func (h *_TimerHeap) Less(i, j int) bool {
|
||||
return h.timers[i].fireTime.Before(h.timers[j].fireTime)
|
||||
}
|
||||
|
||||
func (h *_TimerHeap) Swap(i, j int) {
|
||||
h.timers[i],h.timers[j] = h.timers[j],h.timers[i]
|
||||
}
|
||||
|
||||
func (h *_TimerHeap) Push(x interface{}) {
|
||||
h.timers = append(h.timers, x.(*Timer))
|
||||
}
|
||||
|
||||
func (h *_TimerHeap) Pop() (ret interface{}) {
|
||||
l := len(h.timers)
|
||||
h.timers, ret = h.timers[:l-1], h.timers[l-1]
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
timerHeap _TimerHeap // 定时器heap对象
|
||||
timerHeapLock sync.Mutex // 一个全局的锁
|
||||
timeOffset time.Duration
|
||||
)
|
||||
|
||||
func StartTimer(minTimerInterval time.Duration,maxTimerNum int){
|
||||
timerHeap.timers = make([]*Timer,0,maxTimerNum)
|
||||
heap.Init(&timerHeap) // 初始化定时器heap
|
||||
|
||||
go tickRoutine(minTimerInterval)
|
||||
}
|
||||
|
||||
func tickRoutine(minTimerInterval time.Duration){
|
||||
for{
|
||||
time.Sleep(minTimerInterval)
|
||||
tick()
|
||||
}
|
||||
}
|
||||
|
||||
func tick(){
|
||||
now := Now()
|
||||
timerHeapLock.Lock()
|
||||
if timerHeap.Len() <= 0 { // 没有任何定时器,立刻返回
|
||||
timerHeapLock.Unlock()
|
||||
return
|
||||
}
|
||||
nextFireTime := timerHeap.timers[0].fireTime
|
||||
if nextFireTime.After(now) { // 没有到时间的定时器,返回
|
||||
timerHeapLock.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
t := heap.Pop(&timerHeap).(*Timer)
|
||||
timerHeapLock.Unlock()
|
||||
if len(t.C)>= cap(t.C) {
|
||||
log.Error("Timer channel full!")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
t.C <- t
|
||||
}
|
||||
|
||||
func Now() time.Time{
|
||||
if timeOffset == 0 {
|
||||
return time.Now()
|
||||
}
|
||||
|
||||
return time.Now().Add(timeOffset)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,13 +1,34 @@
|
||||
package timer
|
||||
|
||||
import (
|
||||
"github.com/duanhf2012/origin/util/timewheel"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Timer
|
||||
type Timer struct {
|
||||
name string
|
||||
cancelled bool //是否关闭
|
||||
C chan *Timer //定时器管道
|
||||
interval time.Duration // 时间间隔(用于循环定时器)
|
||||
fireTime time.Time // 触发时间
|
||||
cb func()
|
||||
AdditionData interface{} //定时器附加数据
|
||||
}
|
||||
|
||||
// Ticker
|
||||
type Ticker struct {
|
||||
Timer
|
||||
}
|
||||
|
||||
// Cron
|
||||
type Cron struct {
|
||||
Timer
|
||||
}
|
||||
|
||||
var timerPool = sync.Pool{New: func() interface{}{
|
||||
return &Timer{}
|
||||
}}
|
||||
@@ -20,198 +41,163 @@ var tickerPool = sync.Pool{New: func() interface{}{
|
||||
return &Ticker{}
|
||||
}}
|
||||
|
||||
// one dispatcher per goroutine (goroutine not safe)
|
||||
type Dispatcher struct {
|
||||
ChanTimer chan *timewheel.Timer
|
||||
func newTimer(d time.Duration,c chan *Timer,cb func(),name string,additionData interface{}) *Timer{
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewDispatcher(l int) *Dispatcher {
|
||||
disp := new(Dispatcher)
|
||||
disp.ChanTimer = make(chan *timewheel.Timer, l)
|
||||
return disp
|
||||
}
|
||||
|
||||
type ITime interface {
|
||||
Close ()
|
||||
Do()
|
||||
GetName() string
|
||||
}
|
||||
|
||||
// Timer
|
||||
type Timer struct {
|
||||
t *timewheel.Timer
|
||||
cb func()
|
||||
name string
|
||||
onClose func(timer *timewheel.Timer)
|
||||
}
|
||||
|
||||
// Cron
|
||||
type Cron struct {
|
||||
Timer
|
||||
}
|
||||
|
||||
// Ticker
|
||||
type Ticker struct {
|
||||
Timer
|
||||
}
|
||||
|
||||
func NewTimer(t *timewheel.Timer,cb func(),name string,onClose func(timer *timewheel.Timer)) *Timer {
|
||||
timer := timerPool.Get().(*Timer)
|
||||
timer.t = t
|
||||
timer.AdditionData = additionData
|
||||
timer.C = c
|
||||
timer.fireTime = Now().Add(d)
|
||||
timer.cb = cb
|
||||
timer.onClose = onClose
|
||||
timer.name = name
|
||||
timer.interval = d
|
||||
|
||||
return timer
|
||||
}
|
||||
|
||||
func ReleaseTimer(timer *Timer) {
|
||||
func releaseTimer(timer *Timer) {
|
||||
timerPool.Put(timer)
|
||||
}
|
||||
|
||||
func (t *Timer) Close(){
|
||||
if t.t!=nil {
|
||||
t.t.Close()
|
||||
func newTicker() *Ticker {
|
||||
ticker := tickerPool.Get().(*Ticker)
|
||||
return ticker
|
||||
}
|
||||
if t.onClose!=nil {
|
||||
t.onClose(t.t)
|
||||
|
||||
func releaseTicker(ticker *Ticker) {
|
||||
tickerPool.Put(ticker)
|
||||
}
|
||||
ReleaseTimer(t)
|
||||
|
||||
func newCron() *Cron {
|
||||
cron := cronPool.Get().(*Cron)
|
||||
return cron
|
||||
}
|
||||
|
||||
func releaseCron(cron *Cron) {
|
||||
cronPool.Put(cron)
|
||||
}
|
||||
|
||||
// one dispatcher per goroutine (goroutine not safe)
|
||||
type Dispatcher struct {
|
||||
ChanTimer chan *Timer
|
||||
}
|
||||
|
||||
func (t *Timer) Do(){
|
||||
t.Close()
|
||||
if t.cb != nil {
|
||||
t.cb()
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Timer) GetInterval() time.Duration{
|
||||
return t.interval
|
||||
}
|
||||
|
||||
func (t *Timer) Cancel() {
|
||||
t.cancelled = true
|
||||
}
|
||||
|
||||
// 判断定时器是否已经取消
|
||||
func (t *Timer) IsActive() bool {
|
||||
return !t.cancelled
|
||||
}
|
||||
|
||||
func (t *Timer) GetName() string{
|
||||
return t.name
|
||||
}
|
||||
|
||||
func NewCron(t *timewheel.Timer,cb func(),name string,onClose func(timer *timewheel.Timer)) *Cron {
|
||||
cron := cronPool.Get().(*Cron)
|
||||
cron.t = t
|
||||
cron.cb = cb
|
||||
cron.onClose = onClose
|
||||
cron.name = name
|
||||
return cron
|
||||
func NewDispatcher(l int) *Dispatcher {
|
||||
disp := new(Dispatcher)
|
||||
disp.ChanTimer = make(chan *Timer, l)
|
||||
return disp
|
||||
}
|
||||
|
||||
func ReleaseCron(cron *Cron) {
|
||||
cronPool.Put(cron)
|
||||
}
|
||||
|
||||
func (c *Cron) Do(){
|
||||
if c.onClose!=nil {
|
||||
c.onClose(c.t)
|
||||
}
|
||||
|
||||
c.cb()
|
||||
}
|
||||
|
||||
func (c *Cron) Close() {
|
||||
if c.t != nil {
|
||||
c.t.Close()
|
||||
}
|
||||
|
||||
if c.onClose!=nil {
|
||||
c.onClose(c.t)
|
||||
}
|
||||
|
||||
ReleaseCron(c)
|
||||
}
|
||||
|
||||
func NewTicker(t *timewheel.Timer,cb func(),name string,onClose func(timer *timewheel.Timer)) *Ticker {
|
||||
ticker := tickerPool.Get().(*Ticker)
|
||||
ticker.t = t
|
||||
ticker.cb = cb
|
||||
ticker.onClose = onClose
|
||||
ticker.name = name
|
||||
|
||||
return ticker
|
||||
}
|
||||
|
||||
func ReleaseTicker(ticker *Ticker) {
|
||||
tickerPool.Put(ticker)
|
||||
}
|
||||
|
||||
func (tk *Ticker) Do(){
|
||||
//通知当前timer删除
|
||||
if tk.onClose!=nil {
|
||||
tk.onClose(tk.t)
|
||||
}
|
||||
tk.cb()
|
||||
}
|
||||
|
||||
func (tk *Ticker) Close() {
|
||||
if tk.t != nil {
|
||||
tk.t.Close()
|
||||
}
|
||||
|
||||
if tk.onClose!=nil {
|
||||
tk.onClose(tk.t)
|
||||
}
|
||||
|
||||
ReleaseTicker(tk)
|
||||
}
|
||||
|
||||
func (disp *Dispatcher) AfterFunc(d time.Duration, cb func(),onCloseTimer func(timer *timewheel.Timer),onAddTimer func(timer *timewheel.Timer)) *Timer {
|
||||
type OnTimerClose func(timer *Timer)
|
||||
func (disp *Dispatcher) AfterFunc(d time.Duration, cb func(),onTimerClose OnTimerClose,onAddTimer func(timer *Timer)) *Timer {
|
||||
funName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name()
|
||||
t := NewTimer(nil,cb,funName,onCloseTimer)
|
||||
t.t = timewheel.NewTimerEx(d,disp.ChanTimer,t)
|
||||
onAddTimer(t.t)
|
||||
timer := newTimer(d,disp.ChanTimer,cb,funName,nil)
|
||||
cbFunc := func() {
|
||||
onTimerClose(timer)
|
||||
releaseTimer(timer)
|
||||
if timer.IsActive() == false {
|
||||
return
|
||||
}
|
||||
|
||||
cb()
|
||||
}
|
||||
|
||||
timer.cb = cbFunc
|
||||
t := SetupTimer(timer)
|
||||
onAddTimer(t)
|
||||
|
||||
return t
|
||||
}
|
||||
|
||||
func (disp *Dispatcher) CronFunc(cronExpr *CronExpr, cb func(*Cron),onCloseTimer func(timer *timewheel.Timer),onAddTimer func(timer *timewheel.Timer)) *Cron {
|
||||
now := time.Now()
|
||||
func (disp *Dispatcher) CronFunc(cronExpr *CronExpr, cb func(*Cron),onTimerClose OnTimerClose,onAddTimer func(timer *Timer)) *Cron {
|
||||
now := Now()
|
||||
nextTime := cronExpr.Next(now)
|
||||
if nextTime.IsZero() {
|
||||
return nil
|
||||
}
|
||||
|
||||
funcName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name()
|
||||
cron := NewCron(nil,nil,funcName,onCloseTimer)
|
||||
cron := newCron()
|
||||
// callback
|
||||
var cbFunc func()
|
||||
cbFunc = func() {
|
||||
now := time.Now()
|
||||
if cron.IsActive() == false{
|
||||
onTimerClose(&cron.Timer)
|
||||
releaseCron(cron)
|
||||
return
|
||||
}
|
||||
|
||||
now := Now()
|
||||
nextTime := cronExpr.Next(now)
|
||||
if nextTime.IsZero() {
|
||||
cb(cron)
|
||||
return
|
||||
}
|
||||
|
||||
interval := nextTime.Sub(now)
|
||||
minTimeInterval := time.Millisecond*time.Duration(timewheel.GRANULARITY)
|
||||
if interval < minTimeInterval {
|
||||
interval = minTimeInterval
|
||||
}
|
||||
|
||||
cron.t = timewheel.NewTimerEx(interval,disp.ChanTimer,cron)
|
||||
onAddTimer(cron.t)
|
||||
cron.interval = nextTime.Sub(now)
|
||||
cron.fireTime = now.Add(cron.interval)
|
||||
SetupTimer(&cron.Timer)
|
||||
cb(cron)
|
||||
}
|
||||
cron.C = disp.ChanTimer
|
||||
cron.cb = cbFunc
|
||||
cron.t = timewheel.NewTimerEx(nextTime.Sub(now),disp.ChanTimer,cron)
|
||||
onAddTimer(cron.t)
|
||||
cron.name = funcName
|
||||
cron.interval = nextTime.Sub(now)
|
||||
cron.fireTime = Now().Add(cron.interval)
|
||||
fmt.Println(cron.interval.Milliseconds(),"\n")
|
||||
SetupTimer(&cron.Timer)
|
||||
onAddTimer(&cron.Timer)
|
||||
return cron
|
||||
}
|
||||
|
||||
func (disp *Dispatcher) TickerFunc(d time.Duration, cb func(*Ticker),onCloseTimer func(timer *timewheel.Timer),onAddTimer func(timer *timewheel.Timer)) *Ticker {
|
||||
func (disp *Dispatcher) TickerFunc(d time.Duration, cb func(*Ticker),onTimerClose OnTimerClose,onAddTimer func(timer *Timer)) *Ticker {
|
||||
funcName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name()
|
||||
ticker := NewTicker(nil,nil,funcName,onCloseTimer)
|
||||
// callback
|
||||
var cbFunc func()
|
||||
cbFunc = func() {
|
||||
ticker.t = timewheel.NewTimerEx(d,disp.ChanTimer,ticker)
|
||||
onAddTimer(ticker.t)
|
||||
ticker := newTicker()
|
||||
cbFunc := func() {
|
||||
cb(ticker)
|
||||
if ticker.Timer.IsActive() == true{
|
||||
ticker.fireTime = Now().Add(d)
|
||||
SetupTimer(&ticker.Timer)
|
||||
}else{
|
||||
onTimerClose(&ticker.Timer)
|
||||
releaseTicker(ticker)
|
||||
}
|
||||
}
|
||||
|
||||
ticker.C = disp.ChanTimer
|
||||
ticker.fireTime = Now().Add(d)
|
||||
ticker.cb = cbFunc
|
||||
ticker.t = timewheel.NewTimerEx(d,disp.ChanTimer,ticker)
|
||||
onAddTimer(ticker.t)
|
||||
ticker.name = funcName
|
||||
ticker.interval = d
|
||||
|
||||
// callback
|
||||
SetupTimer(&ticker.Timer)
|
||||
onAddTimer(&ticker.Timer)
|
||||
|
||||
return ticker
|
||||
}
|
||||
|
||||
@@ -1,422 +0,0 @@
|
||||
package timewheel
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
//分别用位代表每个轮存在的轮子数(2^6,2^6,...2^8)
|
||||
//--------------------------
|
||||
//| 6 | 6 | 6 | 6 | 8 |
|
||||
//--------------------------
|
||||
//根据游戏定时器,将第一轮控制在12位,即40960ms以内。
|
||||
var GRANULARITY int64 = 10 //定时器的最小单位,每格10ms
|
||||
//var wheelBitSize =[]int{8,6,6,6,6} //32bit定时器
|
||||
var wheelBitSize =[]int{12,7,6,5,2} //32bit定时器
|
||||
|
||||
type wheelInfo struct {
|
||||
slotNum int //轮子slot数量
|
||||
threshold int64 //轮子最大表示数字范围,如果是第一个轮子2^8,第二个轮子是2^6...
|
||||
}
|
||||
|
||||
var tWheel *timeWheel //时间实例化对象指针
|
||||
var chanStartTimer chan *Timer //开始定时器Channel
|
||||
var chanStopTimer chan *Timer //停止定时器Channel
|
||||
const chanTimerLen int = 40960 //Channel
|
||||
var timerPool = sync.Pool{New: func() interface{}{
|
||||
return &Timer{}
|
||||
}}
|
||||
|
||||
|
||||
//构造时间轮对象与相关初始化
|
||||
func init(){
|
||||
tWheel = newTimeWheel()
|
||||
chanStartTimer = make(chan *Timer,chanTimerLen)
|
||||
chanStopTimer = make(chan *Timer,chanTimerLen)
|
||||
|
||||
go timerRunning()
|
||||
}
|
||||
|
||||
//定时器运行与驱动
|
||||
func timerRunning(){
|
||||
t := time.NewTicker(time.Millisecond*10)
|
||||
for {
|
||||
/*
|
||||
if test == true {
|
||||
testTimerRunning()
|
||||
}*/
|
||||
|
||||
select{
|
||||
case startTimer:=<-chanStartTimer:
|
||||
tWheel.addTimer(startTimer)
|
||||
case stopTimer:=<-chanStopTimer:
|
||||
tWheel.delTimer(stopTimer)
|
||||
case <-t.C:
|
||||
tWheel.Tick()
|
||||
}
|
||||
}
|
||||
}
|
||||
/*
|
||||
var test bool = false
|
||||
func testTimerRunning(){
|
||||
for {
|
||||
select {
|
||||
case startTimer := <-chanStartTimer:
|
||||
tWheel.addTimer(startTimer)
|
||||
case stopTimer := <-chanStopTimer:
|
||||
tWheel.delTimer(stopTimer)
|
||||
default:
|
||||
tWheel.TickOneFrame()
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
|
||||
func NewTimerEx(d time.Duration,c chan *Timer,additionData interface{}) *Timer{
|
||||
if c == nil {
|
||||
c = make(chan *Timer, 1)
|
||||
}
|
||||
timer := tWheel.newTimer(d.Milliseconds()/GRANULARITY,additionData,c)
|
||||
chanStartTimer<-timer
|
||||
return timer
|
||||
}
|
||||
|
||||
func NewTimer(d time.Duration) *Timer{
|
||||
timer := tWheel.newTimer(d.Milliseconds()/GRANULARITY,nil,make(chan *Timer, 1))
|
||||
chanStartTimer<-timer
|
||||
return timer
|
||||
}
|
||||
|
||||
//链表结点
|
||||
type stNode struct {
|
||||
prev *Timer
|
||||
next *Timer
|
||||
}
|
||||
|
||||
//定时器结构体
|
||||
type Timer struct {
|
||||
stNode
|
||||
expireTicks int64 //到期滴答数
|
||||
end int32 //是否已经关闭0表示开启状态,1表示关闭
|
||||
bClose bool //是否关闭
|
||||
C chan *Timer //定时器管道
|
||||
AdditionData interface{} //定时器附加数据
|
||||
}
|
||||
|
||||
//停止停时器
|
||||
func (timer *Timer) Close(){
|
||||
timer.bClose = true
|
||||
if timer.bClose == true {
|
||||
return
|
||||
}
|
||||
|
||||
//将关闭标志设为1关闭状态
|
||||
if atomic.SwapInt32(&timer.end,1) == 0 {
|
||||
chanStopTimer<-timer
|
||||
}
|
||||
}
|
||||
|
||||
//定时器是否已经停止
|
||||
func (timer *Timer) IsClose() bool {
|
||||
return timer.bClose
|
||||
}
|
||||
|
||||
func (timer *Timer) IsEnd() bool{
|
||||
return atomic.LoadInt32(&timer.end) !=0
|
||||
}
|
||||
|
||||
func (timer *Timer) doTimeout(){
|
||||
if atomic.SwapInt32(&timer.end,1) != 0 {
|
||||
return
|
||||
}
|
||||
timer.prev = nil
|
||||
timer.next = nil
|
||||
select {
|
||||
case timer.C <- timer:
|
||||
}
|
||||
}
|
||||
|
||||
//每个时间轮上的刻度
|
||||
type slots struct {
|
||||
timer *Timer //定时器链表头
|
||||
restTicks int64 //当前刻度走完一圈剩余时间ticks
|
||||
}
|
||||
|
||||
//时间轮子
|
||||
type stWheel struct {
|
||||
slots []*slots //刻度切片
|
||||
slotIndex int //当前指针所在的位置索引
|
||||
}
|
||||
|
||||
//获取当前轮的总刻度数
|
||||
func (stw *stWheel) slotSize() int{
|
||||
return len(stw.slots)
|
||||
}
|
||||
|
||||
//添加定时器到slots上
|
||||
func (s *slots) addTimer(timer *Timer){
|
||||
timer.next = s.timer
|
||||
timer.prev = s.timer.prev
|
||||
s.timer.prev.next = timer
|
||||
s.timer.prev = timer
|
||||
}
|
||||
|
||||
//当前slots上是否没有定时器
|
||||
func (s *slots) isEmpty() bool{
|
||||
return s.timer == s.timer.next
|
||||
}
|
||||
|
||||
func (s *slots) makeEmpty() {
|
||||
s.timer.next = s.timer
|
||||
s.timer.prev = s.timer
|
||||
}
|
||||
|
||||
|
||||
//时间轮结构体
|
||||
type timeWheel struct {
|
||||
wheels []*stWheel //所有的轮子的切片
|
||||
wheelInfos []*wheelInfo //所有轮子的信息,预计算存储
|
||||
wheelSize int //轮子数
|
||||
|
||||
currentTime int64 //当前已经走进的的自然时间
|
||||
currentTicks int64 //当前已经走进的ticks数
|
||||
}
|
||||
|
||||
//构建时间轮对象
|
||||
func newTimeWheel() *timeWheel{
|
||||
tWheel := &timeWheel{}
|
||||
tWheel.set(wheelBitSize)
|
||||
tWheel.currentTime = GetNow()
|
||||
return tWheel
|
||||
}
|
||||
|
||||
//设置n位定时器
|
||||
func (t *timeWheel) set(wheelBitSize []int){
|
||||
t.wheelSize = len(wheelBitSize)
|
||||
t.wheelInfos = make([]*wheelInfo,len(wheelBitSize))
|
||||
t.wheels = make([]*stWheel,len(wheelBitSize))
|
||||
totalBitSize := 0
|
||||
|
||||
for idx,bitSize := range wheelBitSize {
|
||||
totalBitSize += bitSize
|
||||
//1.轮子信息
|
||||
t.wheelInfos[idx] = &wheelInfo{}
|
||||
t.wheelInfos[idx].slotNum = 1 << bitSize
|
||||
t.wheelInfos[idx].threshold = 1<< totalBitSize
|
||||
|
||||
//2.make轮子里面的slot
|
||||
t.wheels[idx] = &stWheel{}
|
||||
t.wheels[idx].slots = make([]*slots,t.wheelInfos[idx].slotNum)
|
||||
for slotIdx,_ := range t.wheels[idx].slots {
|
||||
t.wheels[idx].slots[slotIdx] = t.newSlot(t.wheels[idx].slots[slotIdx])
|
||||
|
||||
//计算当前slot走完剩余的ticks数,以下turns有个特殊处理
|
||||
//第一个轮子idx==0时每个slot的刻度代表1,从第二个轮子开始即idx>0根据
|
||||
//t.wheelInfos[idx-1].threshold获得每个刻度值
|
||||
//turns代表由于前一轮已经走一圈了,那么当前slotIdex应该加1,即:slotIdx+turns
|
||||
var perSlotTicks int64 = 1
|
||||
turns := 0
|
||||
if idx>0 {
|
||||
perSlotTicks = t.wheelInfos[idx-1].threshold
|
||||
turns = 1
|
||||
}
|
||||
s := ((1 << bitSize) - (slotIdx+turns))*int(perSlotTicks)
|
||||
t.wheels[idx].slots[slotIdx].restTicks = int64(s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//构建一个slot(轮子上的刻度)
|
||||
func (t *timeWheel) newSlot(slot *slots) *slots{
|
||||
//如果是不存在的slot申请内存
|
||||
if slot == nil {
|
||||
slot = &slots{}
|
||||
timer := &Timer{}
|
||||
slot.timer = timer
|
||||
}
|
||||
|
||||
//构建双向循环链表
|
||||
slot.timer.next = slot.timer
|
||||
slot.timer.prev = slot.timer
|
||||
|
||||
return slot
|
||||
}
|
||||
|
||||
|
||||
//获取当前时间戳ms
|
||||
func GetNow() int64 {
|
||||
return time.Now().UnixNano()/int64(time.Millisecond)
|
||||
}
|
||||
|
||||
//创建定时器 ticks表示多少个ticks单位到期, additionData定时器附带数据, c到时通知的channel
|
||||
func (t *timeWheel) newTimer(ticks int64,additionData interface{},c chan *Timer) *Timer{
|
||||
pTimer := timerPool.Get().(*Timer)
|
||||
pTimer.end = 0
|
||||
pTimer.bClose = false
|
||||
pTimer.C = c
|
||||
pTimer.AdditionData = additionData
|
||||
|
||||
if ticks<=0 {
|
||||
ticks = 1
|
||||
}
|
||||
pTimer.expireTicks = ticks+t.currentTicks
|
||||
return pTimer
|
||||
}
|
||||
|
||||
func ReleaseTimer(timer *Timer) {
|
||||
timerPool.Put(timer)
|
||||
}
|
||||
|
||||
//添加定时器
|
||||
func (t *timeWheel) addTimer(timer *Timer) *Timer {
|
||||
//1.计算到期时间ticks
|
||||
ticks := timer.expireTicks - t.currentTicks
|
||||
if ticks<=0 {
|
||||
timer.doTimeout()
|
||||
return timer
|
||||
}
|
||||
//2.for遍历通过ticks找到适合的轮子插入,从底轮子往高找
|
||||
var slot *slots
|
||||
for wheelIndex,info := range t.wheelInfos {
|
||||
if ticks < info.threshold {
|
||||
var restTicks int64
|
||||
var slotTicks int64
|
||||
var slotIndex int
|
||||
//如果不是第0个轮子
|
||||
if wheelIndex != 0 {
|
||||
//计算前面所有的轮子剩余ticks数总和(即:当前轮子还有多少个ticks会移动到下一个)
|
||||
restTicks = t.getWheelSum(wheelIndex)
|
||||
//当前轮子每个刻度的ticks数
|
||||
slotTicks = t.wheelInfos[wheelIndex-1].threshold
|
||||
//计算当前落到哪个slotIndex中
|
||||
slotIndex = (t.wheels[wheelIndex].slotIndex+int((ticks-restTicks)/slotTicks)) % t.wheelInfos[wheelIndex].slotNum
|
||||
}else{
|
||||
slotIndex = (t.wheels[wheelIndex].slotIndex + int(ticks))%t.wheelInfos[wheelIndex].slotNum
|
||||
}
|
||||
//取得slot对象指针
|
||||
slot = t.wheels[wheelIndex].slots[slotIndex]
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
//3.如果都找不到失败
|
||||
if slot == nil {
|
||||
panic("cannot find slot!")
|
||||
//return nil
|
||||
}
|
||||
|
||||
//4.添加定时器timer到链表
|
||||
slot.addTimer(timer)
|
||||
return timer
|
||||
}
|
||||
|
||||
//删除定时器
|
||||
func (t *timeWheel) delTimer(timer *Timer) {
|
||||
if timer.next == nil {
|
||||
return
|
||||
}
|
||||
timer.prev.next = timer.next
|
||||
timer.next.prev = timer.prev
|
||||
ReleaseTimer(timer)
|
||||
}
|
||||
|
||||
//按照自然时间走动时间差计算loop,并且进行Tick
|
||||
func (t *timeWheel) Tick(){
|
||||
nowTime := GetNow()
|
||||
|
||||
loop := (nowTime - t.currentTime)/int64(GRANULARITY)
|
||||
if loop> 0 {
|
||||
t.currentTime = nowTime
|
||||
}
|
||||
|
||||
for i:=int64(0);i<loop;i++{
|
||||
t.TickOneFrame()
|
||||
}
|
||||
}
|
||||
|
||||
//Tick一帧
|
||||
func (t *timeWheel) TickOneFrame(){
|
||||
//1.往前走一个Tick
|
||||
t.currentTicks += 1
|
||||
|
||||
//2.将当前slot全部到时处理
|
||||
var nextTimer *Timer
|
||||
slot := t.wheels[0].slots[t.wheels[0].slotIndex]
|
||||
for currTimer := slot.timer.next;currTimer!=slot.timer; {
|
||||
nextTimer = currTimer.next
|
||||
//如果当前定时器已经停止,不做任何处理.否则放入到定时器的channel
|
||||
if currTimer.IsEnd() == true {
|
||||
currTimer = nextTimer
|
||||
continue
|
||||
}
|
||||
currTimer.doTimeout()
|
||||
currTimer = nextTimer
|
||||
}
|
||||
|
||||
//3.将timer全部清空处理
|
||||
t.wheels[0].slots[t.wheels[0].slotIndex].makeEmpty()
|
||||
|
||||
//4.指针转动
|
||||
t.wheels[0].slotIndex+=1
|
||||
|
||||
//5.如果当前刻度转完一轮时,从0表示,并处理下一个轮子的计算
|
||||
if t.wheels[0].slotIndex >= t.wheels[0].slotSize() {
|
||||
t.wheels[0].slotIndex = 0
|
||||
t.cascade(1)
|
||||
}
|
||||
}
|
||||
|
||||
//获得当前轮子的slots
|
||||
func (t *timeWheel) getCurrentSlot(wheelIndex int) *slots{
|
||||
return t.wheels[wheelIndex].slots[t.wheels[wheelIndex].slotIndex]
|
||||
}
|
||||
|
||||
//获取当前轮wheelIndex转动所需要的ticks数量
|
||||
func (t *timeWheel) getWheelSum(wheelIndex int) int64{
|
||||
var ticks int64
|
||||
//遍历前面n个轮子
|
||||
for i:=0;i<wheelIndex;i++{
|
||||
ticks += t.getCurrentSlot(i).restTicks
|
||||
}
|
||||
|
||||
return ticks
|
||||
}
|
||||
|
||||
//转动下一个轮子(即wheelIndex>0的轮子)
|
||||
func (t *timeWheel) cascade(wheelIndex int) {
|
||||
if wheelIndex<1 || wheelIndex>=t.wheelSize {
|
||||
return
|
||||
}
|
||||
|
||||
//1.取得对应的轮子上的slot
|
||||
wheel := t.wheels[wheelIndex]
|
||||
slot := wheel.slots[wheel.slotIndex]
|
||||
|
||||
//2.将当前的slot遍历并重新加入
|
||||
currentTimer := slot.timer.next
|
||||
for ;currentTimer!=slot.timer; {
|
||||
//先保存一个定时器指针,预防链表迭代失效问题
|
||||
nextTimer:=currentTimer.next
|
||||
//如果到时,直接送到channel
|
||||
if currentTimer.expireTicks<= t.currentTicks {
|
||||
if currentTimer.IsEnd() == false {
|
||||
currentTimer.doTimeout()
|
||||
}
|
||||
}else{//否则重新添加,会加到下一级轮中
|
||||
t.addTimer(currentTimer)
|
||||
}
|
||||
currentTimer = nextTimer
|
||||
}
|
||||
//3.将当前轮清空
|
||||
wheel.slots[wheel.slotIndex].makeEmpty()
|
||||
|
||||
//4.如果当前轮子跳过一轮,需要跳动到下一时间轮
|
||||
wheel.slotIndex++
|
||||
if wheel.slotIndex>=wheel.slotSize() {
|
||||
wheel.slotIndex = 0
|
||||
t.cascade(wheelIndex+1)
|
||||
}
|
||||
}
|
||||
@@ -1,31 +0,0 @@
|
||||
package timewheel
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func Test_Example(t *testing.T) {
|
||||
timer:=NewTimer(time.Second*2)
|
||||
select {
|
||||
case <- timer.C:
|
||||
fmt.Println("It is time out!")
|
||||
}
|
||||
|
||||
timer2 := NewTimerEx(time.Second*2,nil,1)
|
||||
select {
|
||||
case t:=<- timer2.C:
|
||||
fmt.Println("It is time out!",t.AdditionData.(int))
|
||||
}
|
||||
|
||||
timer3 := NewTimerEx(time.Second*2,nil,1)
|
||||
timer3.Stop()
|
||||
time.Sleep(3*time.Second)
|
||||
select {
|
||||
case t:=<- timer2.C:
|
||||
fmt.Println("It is time out!",t.AdditionData.(int))
|
||||
default:
|
||||
fmt.Printf("time is stop")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user