新增时间轮定时器

This commit is contained in:
boyce
2020-10-24 16:55:55 +08:00
parent 18281e2b09
commit 2b05358f04
5 changed files with 171 additions and 194 deletions

View File

@@ -44,7 +44,7 @@ func usage(val interface{}) error{
return nil return nil
} }
fmt.Fprintf(os.Stderr, `orgin version: orgin/2.11.20201023 fmt.Fprintf(os.Stderr, `orgin version: orgin/2.12.20201024
Usage: originserver [-help] [-start node=1] [-stop] [-config path] [-pprof 0.0.0.0:6060] Usage: originserver [-help] [-start node=1] [-stop] [-config path] [-pprof 0.0.0.0:6060]
`) `)
console.PrintDefaults() console.PrintDefaults()

View File

@@ -138,13 +138,15 @@ func (slf *Service) Run() {
analyzer = nil analyzer = nil
} }
case t := <- slf.dispatcher.ChanTimer: case t := <- slf.dispatcher.ChanTimer:
if slf.profiler!=nil { if t.IsStop() == false {
analyzer = slf.profiler.Push(fmt.Sprintf("Timer_%s", t.GetFunctionName())) if slf.profiler != nil {
} analyzer = slf.profiler.Push(fmt.Sprintf("Timer_%s", t.AdditionData.(*timer.Timer).GetFunctionName()))
t.Cb() }
if analyzer!=nil { t.AdditionData.(*timer.Timer).Cb()
analyzer.Pop() if analyzer != nil {
analyzer = nil analyzer.Pop()
analyzer = nil
}
} }
} }

View File

@@ -3,6 +3,7 @@ package timer
import ( import (
"fmt" "fmt"
"github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/util/timewheel"
"reflect" "reflect"
"runtime" "runtime"
"time" "time"
@@ -10,18 +11,18 @@ import (
// one dispatcher per goroutine (goroutine not safe) // one dispatcher per goroutine (goroutine not safe)
type Dispatcher struct { type Dispatcher struct {
ChanTimer chan *Timer ChanTimer chan *timewheel.Timer
} }
func NewDispatcher(l int) *Dispatcher { func NewDispatcher(l int) *Dispatcher {
disp := new(Dispatcher) disp := new(Dispatcher)
disp.ChanTimer = make(chan *Timer, l) disp.ChanTimer = make(chan *timewheel.Timer, l)
return disp return disp
} }
// Timer // Timer
type Timer struct { type Timer struct {
t *time.Timer t *timewheel.Timer
cb func() cb func()
cbex func(*Timer) cbex func(*Timer)
name string name string
@@ -58,10 +59,7 @@ func (disp *Dispatcher) AfterFunc(d time.Duration, cb func()) *Timer {
t := new(Timer) t := new(Timer)
t.cb = cb t.cb = cb
t.name = reflect.TypeOf(cb).Name() t.name = reflect.TypeOf(cb).Name()
t.t = timewheel.NewTimerEx(d,disp.ChanTimer,t)
t.t = time.AfterFunc(d, func() {
disp.ChanTimer <- t
})
return t return t
} }
@@ -69,14 +67,9 @@ func (disp *Dispatcher) AfterFunc(d time.Duration, cb func()) *Timer {
func (disp *Dispatcher) AfterFuncEx(funName string,d time.Duration, cbex func(timer *Timer)) *Timer { func (disp *Dispatcher) AfterFuncEx(funName string,d time.Duration, cbex func(timer *Timer)) *Timer {
t := new(Timer) t := new(Timer)
t.cbex = cbex t.cbex = cbex
t.name = funName//reflect.TypeOf(cbex).Name() t.name = funName
//t.name = runtime.FuncForPC(reflect.ValueOf(cbex).Pointer()).Name() t.t = timewheel.NewTimerEx(d,disp.ChanTimer,t)
t.t = time.AfterFunc(d, func() {
if disp == nil {
return
}
disp.ChanTimer <- t
})
return t return t
} }

View File

@@ -9,24 +9,22 @@ import (
//-------------------------- //--------------------------
//| 6 | 6 | 6 | 6 | 8 | //| 6 | 6 | 6 | 6 | 8 |
//-------------------------- //--------------------------
//根据游戏定时器将第一轮控制在12位即40960ms以内。
var GRANULARITY int64 = 10//10ms var GRANULARITY int64 = 10 //定时器的最小单位,每格10ms
var OnTimerChannelSize int = 1000 //var wheelBitSize =[]int{8,6,6,6,6} //32bit定时器
var wheelBitSize =[]int{8,6,6,6,6} //32bit定时器 var wheelBitSize =[]int{12,7,6,5,2} //32bit定时器
type wheelInfo struct { type wheelInfo struct {
slotNum int //slot数量 slotNum int //轮子slot数量
threshold int64 // threshold int64 //轮子最大表示数字范围,如果是第一个轮子2^8,第二个轮子是2^6...
//mask int
//bits int
} }
//type OnTimerCB func() var tWheel *timeWheel //时间实例化对象指针
var tWheel *timeWheel var chanStartTimer chan *Timer //开始定时器Channel
var chanStartTimer chan *Timer var chanStopTimer chan *Timer //停止定时器Channel
var chanStopTimer chan *Timer const chanTimerLen int = 40960 //Channel
const chanTimerLen int = 40960
//构造时间轮对象与相关初始化
func init(){ func init(){
tWheel = newTimeWheel() tWheel = newTimeWheel()
chanStartTimer = make(chan *Timer,chanTimerLen) chanStartTimer = make(chan *Timer,chanTimerLen)
@@ -35,9 +33,14 @@ func init(){
go timerRunning() go timerRunning()
} }
//定时器运行与驱动
func timerRunning(){ func timerRunning(){
t := time.NewTicker(time.Microsecond*5) t := time.NewTicker(time.Microsecond*5)
for { for {
/* if test == true {
testTimerRunning()
}
*/
select{ select{
case startTimer:=<-chanStartTimer: case startTimer:=<-chanStartTimer:
tWheel.addTimer(startTimer) tWheel.addTimer(startTimer)
@@ -48,12 +51,20 @@ func timerRunning(){
} }
} }
} }
/*
var test bool = false
func testTimerRunning(){
for {
tWheel.TickOneFrame()
}
}
*/
func NewCBTimer(d time.Duration,f func(),c chan *Timer) *Timer{ func NewTimerEx(d time.Duration,c chan *Timer,additionData interface{}) *Timer{
if c == nil { if c == nil {
c = make(chan *Timer, 1) c = make(chan *Timer, 1)
} }
timer := tWheel.newTimer(d.Milliseconds(),f,c) timer := tWheel.newTimer(d.Milliseconds()/GRANULARITY,additionData,c)
chanStartTimer<-timer chanStartTimer<-timer
return timer return timer
} }
@@ -64,78 +75,94 @@ func NewTimer(d time.Duration) *Timer{
return timer return timer
} }
//链表结点
type stNode struct { type stNode struct {
prev *Timer prev *Timer
next *Timer next *Timer
} }
//定时器结构体
type Timer struct { type Timer struct {
stNode stNode
timerCB func() expireTicks int64 //到期滴答数
expireTicks int64 //到期滴答数 isClose int32 //是否已经关闭0表示开启状态,1表示关闭
isClose int32 C chan *Timer //定时器管道
C chan *Timer AdditionData interface{} //定时器附加数据
} }
//停止停时器
func (timer *Timer) Stop(){ func (timer *Timer) Stop(){
//将关闭标志设为1关闭状态
atomic.StoreInt32(&timer.isClose,1) atomic.StoreInt32(&timer.isClose,1)
chanStopTimer<-timer chanStopTimer<-timer
} }
//定时器是否已经停止
func (timer *Timer) IsStop() bool { func (timer *Timer) IsStop() bool {
return atomic.LoadInt32(&timer.isClose) != 0 return atomic.LoadInt32(&timer.isClose) != 0
} }
type Slots struct { //每个时间轮上的刻度
timer *Timer type slots struct {
restTicks int64 timer *Timer //定时器链表头
restTicks int64 //当前刻度走完一圈剩余时间ticks
} }
//时间轮子
type stWheel struct { type stWheel struct {
slots []*Slots slots []*slots //刻度切片
slotIndex int slotIndex int //当前指针所在的位置索引
} }
//获取当前轮的总刻度数
func (stw *stWheel) slotSize() int{ func (stw *stWheel) slotSize() int{
return len(stw.slots) return len(stw.slots)
} }
func (s *Slots) addTimer(timer *Timer){ //添加定时器到slots上
func (s *slots) addTimer(timer *Timer){
timer.next = s.timer timer.next = s.timer
timer.prev = s.timer.prev timer.prev = s.timer.prev
s.timer.prev.next = timer s.timer.prev.next = timer
s.timer.prev = timer s.timer.prev = timer
} }
func (s *Slots) isEmpty() bool{ //当前slots上是否没有定时器
func (s *slots) isEmpty() bool{
return s.timer == s.timer.next return s.timer == s.timer.next
} }
type timeWheel struct { func (s *slots) makeEmpty() {
wheels []*stWheel s.timer.next = s.timer
wheelInfos []*wheelInfo s.timer.prev = s.timer
wheelSize int
//chanTimer chan *Timer
currentTime int64 //
currentTicks int64 //当前检查的帧数
} }
//时间轮结构体
type timeWheel struct {
wheels []*stWheel //所有的轮子的切片
wheelInfos []*wheelInfo //所有轮子的信息,预计算存储
wheelSize int //轮子数
currentTime int64 //当前已经走进的的自然时间
currentTicks int64 //当前已经走进的ticks数
}
//构建时间轮对象
func newTimeWheel() *timeWheel{ func newTimeWheel() *timeWheel{
tWheel := &timeWheel{} tWheel := &timeWheel{}
tWheel.Set(wheelBitSize) tWheel.set(wheelBitSize)
//tWheel.chanTimer = make(chan *Timer,OnTimerChannelSize)
tWheel.currentTime = GetNow() tWheel.currentTime = GetNow()
return tWheel return tWheel
} }
//设置n位定时器
func (t *timeWheel) Set(wheelBitSize []int){ func (t *timeWheel) set(wheelBitSize []int){
t.wheelSize = len(wheelBitSize) t.wheelSize = len(wheelBitSize)
t.wheelInfos = make([]*wheelInfo,len(wheelBitSize)) t.wheelInfos = make([]*wheelInfo,len(wheelBitSize))
t.wheels = make([]*stWheel,len(wheelBitSize)) t.wheels = make([]*stWheel,len(wheelBitSize))
totalBitSize := 0 totalBitSize := 0
for idx,bitSize := range wheelBitSize { for idx,bitSize := range wheelBitSize {
totalBitSize += bitSize totalBitSize += bitSize
//1.轮子信息 //1.轮子信息
@@ -145,9 +172,14 @@ func (t *timeWheel) Set(wheelBitSize []int){
//2.make轮子里面的slot //2.make轮子里面的slot
t.wheels[idx] = &stWheel{} t.wheels[idx] = &stWheel{}
t.wheels[idx].slots = make([]*Slots,t.wheelInfos[idx].slotNum) t.wheels[idx].slots = make([]*slots,t.wheelInfos[idx].slotNum)
for slotIdx,_ := range t.wheels[idx].slots { for slotIdx,_ := range t.wheels[idx].slots {
t.wheels[idx].slots[slotIdx] = t.newSlot(t.wheels[idx].slots[slotIdx]) t.wheels[idx].slots[slotIdx] = t.newSlot(t.wheels[idx].slots[slotIdx])
//计算当前slot走完剩余的ticks数以下turns有个特殊处理
//第一个轮子idx==0时每个slot的刻度代表1,从第二个轮子开始即idx>0根据
//t.wheelInfos[idx-1].threshold获得每个刻度值
//turns代表由于前一轮已经走一圈了,那么当前slotIdex应该加1slotIdx+turns
var perSlotTicks int64 = 1 var perSlotTicks int64 = 1
turns := 0 turns := 0
if idx>0 { if idx>0 {
@@ -160,76 +192,86 @@ func (t *timeWheel) Set(wheelBitSize []int){
} }
} }
func (t *timeWheel) newSlot(slots *Slots) *Slots{ //构建一个slot(轮子上的刻度)
if slots == nil { func (t *timeWheel) newSlot(slot *slots) *slots{
slots = &Slots{} //如果是不存在的slot申请内存
if slot == nil {
slot = &slots{}
timer := &Timer{} timer := &Timer{}
slots.timer = timer slot.timer = timer
} }
slots.timer.next = slots.timer //构建双向循环链表
slots.timer.prev = slots.timer slot.timer.next = slot.timer
slot.timer.prev = slot.timer
return slots return slot
} }
//获取当前时间戳ms
func GetNow() int64 { func GetNow() int64 {
return time.Now().UnixNano()/int64(time.Millisecond) return time.Now().UnixNano()/int64(time.Millisecond)
} }
//创建定时器 ticks表示多少个ticks单位到期, additionData定时器附带数据, c到时通知的channel
func (t *timeWheel) newTimer(ticks int64,f func(),c chan *Timer) *Timer{ func (t *timeWheel) newTimer(ticks int64,additionData interface{},c chan *Timer) *Timer{
return &Timer{timerCB: f,expireTicks:ticks+t.currentTicks,C:c} return &Timer{AdditionData: additionData,expireTicks:ticks+t.currentTicks,C:c}
} }
func ReleaseTimer(timer *Timer) { func ReleaseTimer(timer *Timer) {
} }
/*
func (t *timeWheel) AddTimer(milSeconds int,onTimer OnTimerCB) *Timer {
ticks := milSeconds / GRANULARITY
timer := t.newTimer(milSeconds,ticks,onTimer)
return t.addTimer(timer)
}
*/
//添加定时器
func (t *timeWheel) addTimer(timer *Timer) *Timer { func (t *timeWheel) addTimer(timer *Timer) *Timer {
//1.计算到期时间ticks
ticks := timer.expireTicks - t.currentTicks ticks := timer.expireTicks - t.currentTicks
var slot *Slots
//2.for遍历通过ticks找到适合的轮子插入,从底轮子往高找
var slot *slots
for wheelIndex,info := range t.wheelInfos { for wheelIndex,info := range t.wheelInfos {
if ticks < info.threshold { if ticks < info.threshold {
var subValue int64 var restTicks int64
var offSet int64 var slotTicks int64
index := 0 var slotIndex int
//如果不是第0个轮子
if wheelIndex != 0 { if wheelIndex != 0 {
subValue = t.getWheelSum(wheelIndex)//t.wheelInfos[wheelIndex-1].threshold*t.wheels[wheelIndex].slotIndex //计算前面所有的轮子剩余ticks数总和(即当前轮子还有多少个ticks会移动到下一个)
offSet = t.wheelInfos[wheelIndex-1].threshold restTicks = t.getWheelSum(wheelIndex)
index = (t.wheels[wheelIndex].slotIndex+int((ticks-subValue)/offSet)) % t.wheelInfos[wheelIndex].slotNum///(ticks - subValue + offSet)>>bits&info.mask //当前轮子每个刻度的ticks数
slotTicks = t.wheelInfos[wheelIndex-1].threshold
//计算当前落到哪个slotIndex中
slotIndex = (t.wheels[wheelIndex].slotIndex+int((ticks-restTicks)/slotTicks)) % t.wheelInfos[wheelIndex].slotNum
}else{ }else{
index = (t.wheels[wheelIndex].slotIndex + int(ticks))%t.wheelInfos[wheelIndex].slotNum slotIndex = (t.wheels[wheelIndex].slotIndex + int(ticks))%t.wheelInfos[wheelIndex].slotNum
} }
//取得slot对象指针
slot = t.wheels[wheelIndex].slots[index] slot = t.wheels[wheelIndex].slots[slotIndex]
break break
} }
} }
//插入到指定位置 //3.如果都找不到失败
if slot == nil { if slot == nil {
return nil panic("cannot find slot!")
//return nil
} }
//4.添加定时器timer到链表
slot.addTimer(timer) slot.addTimer(timer)
return timer return timer
} }
//删除定时器
func (t *timeWheel) delTimer(timer *Timer) { func (t *timeWheel) delTimer(timer *Timer) {
timer.prev.next = timer.next timer.prev.next = timer.next
timer.next.prev = timer.prev timer.next.prev = timer.prev
//ReleaseTimer(timer) timer.next = nil
timer.prev = nil
} }
//按照自然时间走动时间差计算loop并且进行Tick
func (t *timeWheel) Tick(){ func (t *timeWheel) Tick(){
nowTime := GetNow() nowTime := GetNow()
@@ -243,15 +285,15 @@ func (t *timeWheel) Tick(){
} }
} }
//Tick一帧
func (t *timeWheel) TickOneFrame(){ func (t *timeWheel) TickOneFrame(){
//1.往前走一个Tick //1.往前走一个Tick
t.currentTicks += 1 t.currentTicks += 1
//2.将当前slot全部时处理 //2.将当前slot全部时处理
slot := t.wheels[0].slots[t.wheels[0].slotIndex] slot := t.wheels[0].slots[t.wheels[0].slotIndex]
bEmpty := true
for currTimer := slot.timer.next;currTimer!=slot.timer;currTimer = currTimer.next { for currTimer := slot.timer.next;currTimer!=slot.timer;currTimer = currTimer.next {
bEmpty = false //如果当前定时器已经停止,不做任何处理.否则放入到定时器的channel
if currTimer.IsStop() == true { if currTimer.IsStop() == true {
continue continue
} }
@@ -260,49 +302,36 @@ func (t *timeWheel) TickOneFrame(){
} }
} }
//重新构建 //3.将timer全部清空处理
if bEmpty == false { t.wheels[0].slots[t.wheels[0].slotIndex].makeEmpty()
t.wheels[0].slots[t.wheels[0].slotIndex] = t.newSlot(slot)
}
//3.指针转动 //4.指针转动
t.wheels[0].slotIndex+=1 t.wheels[0].slotIndex+=1
//5.如果当前刻度转完一轮时,从0表示并处理下一个轮子的计算
if t.wheels[0].slotIndex >= t.wheels[0].slotSize() { if t.wheels[0].slotIndex >= t.wheels[0].slotSize() {
t.wheels[0].slotIndex = 0 t.wheels[0].slotIndex = 0
t.cascade(1) t.cascade(1)
} }
} }
func (t *timeWheel) getCurrentSlot(wheelIndex int) *Slots{ //获得当前轮子的slots
func (t *timeWheel) getCurrentSlot(wheelIndex int) *slots{
return t.wheels[wheelIndex].slots[t.wheels[wheelIndex].slotIndex] return t.wheels[wheelIndex].slots[t.wheels[wheelIndex].slotIndex]
} }
//获取当前轮wheelIndex转动所需要的ticks数量
func (t *timeWheel) getWheelSum(wheelIndex int) int64{ func (t *timeWheel) getWheelSum(wheelIndex int) int64{
var sum int64 var ticks int64
//遍历前面n个轮子
for i:=0;i<wheelIndex;i++{ for i:=0;i<wheelIndex;i++{
sum += t.getCurrentSlot(i).restTicks ticks += t.getCurrentSlot(i).restTicks
}
/*
if sum != t.getWheelSumEx(wheelIndex) {
sum = 0
}
*/
return sum
}
/*
func (t *timeWheel) getWheelSumEx(wheelIndex int) int{
sum := 0
for i:=0;i<wheelIndex;i++{
if i == 0 {
sum += t.wheels[i].slotSize() - t.wheels[i].slotIndex
}else{
sum += (t.wheels[i].slotSize() - (t.wheels[i].slotIndex+1))*t.wheelInfos[i-1].threshold
}
} }
return sum return ticks
} }
*/
//转动下一个轮子(即wheelIndex>0的轮子)
func (t *timeWheel) cascade(wheelIndex int) { func (t *timeWheel) cascade(wheelIndex int) {
if wheelIndex<1 || wheelIndex>=t.wheelSize { if wheelIndex<1 || wheelIndex>=t.wheelSize {
return return
@@ -314,10 +343,10 @@ func (t *timeWheel) cascade(wheelIndex int) {
//2.将当前的slot遍历并重新加入 //2.将当前的slot遍历并重新加入
currentTimer := slot.timer.next currentTimer := slot.timer.next
bEmpty := true
for ;currentTimer!=slot.timer; { for ;currentTimer!=slot.timer; {
//先保存一个定时器指针,预防链表迭代失效问题
nextTimer:=currentTimer.next nextTimer:=currentTimer.next
//如果到时 //如果到时,直接送到channel
if currentTimer.expireTicks<= t.currentTicks { if currentTimer.expireTicks<= t.currentTicks {
if currentTimer.IsStop() == false { if currentTimer.IsStop() == false {
select { select {
@@ -328,15 +357,11 @@ func (t *timeWheel) cascade(wheelIndex int) {
t.addTimer(currentTimer) t.addTimer(currentTimer)
} }
currentTimer = nextTimer currentTimer = nextTimer
bEmpty = false
} }
//3.将当前轮清空
wheel.slots[wheel.slotIndex].makeEmpty()
if bEmpty == false { //4.如果当前轮子跳过一轮,需要跳动到下一时间轮
wheel.slots[wheel.slotIndex] = t.newSlot(wheel.slots[wheel.slotIndex])
}
//如果轮子到了最大值,需要跳轮
wheel.slotIndex++ wheel.slotIndex++
if wheel.slotIndex>=wheel.slotSize() { if wheel.slotIndex>=wheel.slotSize() {
wheel.slotIndex = 0 wheel.slotIndex = 0

View File

@@ -1,74 +1,31 @@
package timewheel package timewheel
import ( import (
"fmt"
"testing" "testing"
"time" "time"
"fmt"
) )
var timerCount int
var mapId map[int] interface{}
func Test_Example(t *testing.T) { func Test_Example(t *testing.T) {
now := time.Now() timer:=NewTimer(time.Second*2)
timer := NewTimer(time.Millisecond*20)
select { select {
case <-timer.C: case <- timer.C:
fmt.Print("xxx:") fmt.Println("It is time out!")
}
fmt.Println(time.Now().Sub(now).Milliseconds())
/*
rand.Seed(time.Now().UnixNano())
timeWheel := NewTimeWheel()
mapId = map[int] interface{}{}
time.Sleep(time.Duration(rand.Intn(100))*time.Millisecond)
timeWheel.Tick()
time.AfterFunc()
for i:=100000000;i<200000000;i++{
r := rand.Intn(100)
timeWheel.AddTimer(i+r,func(){
fmt.Print("+\n")
})
time.NewTicker()
time.AfterFunc()
timerCount+=1
} }
fmt.Println("add finish..") timer2 := NewTimerEx(time.Second*2,nil,1)
select {
go func(){ case t:=<- timer2.C:
for{ fmt.Println("It is time out!",t.AdditionData.(int))
select { }
case t:=<-timeWheel.chanTimer:
timer3 := NewTimerEx(time.Second*2,nil,1)
timerCount-- timer3.Stop()
if timerCount == 0 { time.Sleep(3*time.Second)
fmt.Printf("finish...\n") select {
} case t:=<- timer2.C:
if t.tmp-t.expireTicks >1 { fmt.Println("It is time out!",t.AdditionData.(int))
fmt.Printf("err:%d:%d\n",t.expireTicks,t.tmp-t.expireTicks) default:
}else{ fmt.Printf("time is stop")
if t.expireTicks%100000 == 0 {
fmt.Printf("%d:%d:%d\n",t.expireTicks,t.tmp-t.expireTicks,t.tmpMilSeconds)
}
//t.timerCB()
}
}
}
}()
for{
timeWheel.TickOneFrame()
//time.Sleep(1*time.Microsecond)
//fmt.Println(".")
} }
*/
} }