From 18281e2b096b780ad833cb240ff5bf06677f3573 Mon Sep 17 00:00:00 2001 From: boyce Date: Fri, 23 Oct 2020 21:03:25 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=97=B6=E9=97=B4=E8=BD=AE?= =?UTF-8?q?=E5=AE=9A=E6=97=B6=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- util/timewheel/timewheel.go | 345 +++++++++++++++++++++++++++++++ util/timewheel/timewheel_test.go | 74 +++++++ 2 files changed, 419 insertions(+) create mode 100644 util/timewheel/timewheel.go create mode 100644 util/timewheel/timewheel_test.go diff --git a/util/timewheel/timewheel.go b/util/timewheel/timewheel.go new file mode 100644 index 0000000..05b1b28 --- /dev/null +++ b/util/timewheel/timewheel.go @@ -0,0 +1,345 @@ +package timewheel + +import ( + "sync/atomic" + "time" +) + +//分别用位代表每个轮存在的轮子数(2^6,2^6,...2^8) +//-------------------------- +//| 6 | 6 | 6 | 6 | 8 | +//-------------------------- + +var GRANULARITY int64 = 10//10ms +var OnTimerChannelSize int = 1000 +var wheelBitSize =[]int{8,6,6,6,6} //32bit定时器 + +type wheelInfo struct { + slotNum int //slot数量 + threshold int64 // + //mask int + //bits int +} + +//type OnTimerCB func() +var tWheel *timeWheel +var chanStartTimer chan *Timer +var chanStopTimer chan *Timer +const chanTimerLen int = 40960 + +func init(){ + tWheel = newTimeWheel() + chanStartTimer = make(chan *Timer,chanTimerLen) + chanStopTimer = make(chan *Timer,chanTimerLen) + + go timerRunning() +} + +func timerRunning(){ + t := time.NewTicker(time.Microsecond*5) + for { + select{ + case startTimer:=<-chanStartTimer: + tWheel.addTimer(startTimer) + case stopTimer:=<-chanStopTimer: + tWheel.delTimer(stopTimer) + case <-t.C: + tWheel.Tick() + } + } +} + +func NewCBTimer(d time.Duration,f func(),c chan *Timer) *Timer{ + if c == nil { + c = make(chan *Timer, 1) + } + timer := tWheel.newTimer(d.Milliseconds(),f,c) + chanStartTimer<-timer + return timer +} + +func NewTimer(d time.Duration) *Timer{ + timer := tWheel.newTimer(d.Milliseconds()/GRANULARITY,nil,make(chan *Timer, 1)) + chanStartTimer<-timer + return timer +} + + +type stNode struct { + prev *Timer + next *Timer +} + +type Timer struct { + stNode + timerCB func() + expireTicks int64 //到期滴答数 + isClose int32 + C chan *Timer +} + +func (timer *Timer) Stop(){ + atomic.StoreInt32(&timer.isClose,1) + chanStopTimer<-timer +} + +func (timer *Timer) IsStop() bool { + return atomic.LoadInt32(&timer.isClose) != 0 +} + +type Slots struct { + timer *Timer + restTicks int64 +} + +type stWheel struct { + slots []*Slots + slotIndex int +} + +func (stw *stWheel) slotSize() int{ + return len(stw.slots) +} + +func (s *Slots) addTimer(timer *Timer){ + timer.next = s.timer + timer.prev = s.timer.prev + s.timer.prev.next = timer + s.timer.prev = timer +} + +func (s *Slots) isEmpty() bool{ + return s.timer == s.timer.next +} + +type timeWheel struct { + wheels []*stWheel + wheelInfos []*wheelInfo + wheelSize int + //chanTimer chan *Timer + + currentTime int64 // + currentTicks int64 //当前检查的帧数 +} + +func newTimeWheel() *timeWheel{ + tWheel := &timeWheel{} + tWheel.Set(wheelBitSize) + //tWheel.chanTimer = make(chan *Timer,OnTimerChannelSize) + tWheel.currentTime = GetNow() + return tWheel +} + + +func (t *timeWheel) Set(wheelBitSize []int){ + t.wheelSize = len(wheelBitSize) + t.wheelInfos = make([]*wheelInfo,len(wheelBitSize)) + t.wheels = make([]*stWheel,len(wheelBitSize)) + totalBitSize := 0 + for idx,bitSize := range wheelBitSize { + totalBitSize += bitSize + //1.轮子信息 + t.wheelInfos[idx] = &wheelInfo{} + t.wheelInfos[idx].slotNum = 1 << bitSize + t.wheelInfos[idx].threshold = 1<< totalBitSize + + //2.make轮子里面的slot + t.wheels[idx] = &stWheel{} + t.wheels[idx].slots = make([]*Slots,t.wheelInfos[idx].slotNum) + for slotIdx,_ := range t.wheels[idx].slots { + t.wheels[idx].slots[slotIdx] = t.newSlot(t.wheels[idx].slots[slotIdx]) + var perSlotTicks int64 = 1 + turns := 0 + if idx>0 { + perSlotTicks = t.wheelInfos[idx-1].threshold + turns = 1 + } + s := ((1 << bitSize) - (slotIdx+turns))*int(perSlotTicks) + t.wheels[idx].slots[slotIdx].restTicks = int64(s) + } + } +} + +func (t *timeWheel) newSlot(slots *Slots) *Slots{ + if slots == nil { + slots = &Slots{} + timer := &Timer{} + slots.timer = timer + } + + slots.timer.next = slots.timer + slots.timer.prev = slots.timer + + return slots +} + +func GetNow() int64 { + return time.Now().UnixNano()/int64(time.Millisecond) +} + + +func (t *timeWheel) newTimer(ticks int64,f func(),c chan *Timer) *Timer{ + return &Timer{timerCB: f,expireTicks:ticks+t.currentTicks,C:c} +} + + +func ReleaseTimer(timer *Timer) { +} +/* +func (t *timeWheel) AddTimer(milSeconds int,onTimer OnTimerCB) *Timer { + ticks := milSeconds / GRANULARITY + timer := t.newTimer(milSeconds,ticks,onTimer) + return t.addTimer(timer) +} +*/ + +func (t *timeWheel) addTimer(timer *Timer) *Timer { + ticks := timer.expireTicks - t.currentTicks + var slot *Slots + for wheelIndex,info := range t.wheelInfos { + if ticks < info.threshold { + var subValue int64 + var offSet int64 + index := 0 + if wheelIndex != 0 { + subValue = t.getWheelSum(wheelIndex)//t.wheelInfos[wheelIndex-1].threshold*t.wheels[wheelIndex].slotIndex + offSet = t.wheelInfos[wheelIndex-1].threshold + index = (t.wheels[wheelIndex].slotIndex+int((ticks-subValue)/offSet)) % t.wheelInfos[wheelIndex].slotNum///(ticks - subValue + offSet)>>bits&info.mask + }else{ + index = (t.wheels[wheelIndex].slotIndex + int(ticks))%t.wheelInfos[wheelIndex].slotNum + } + + slot = t.wheels[wheelIndex].slots[index] + break + } + } + + //插入到指定位置 + if slot == nil { + return nil + } + + slot.addTimer(timer) + return timer +} + +func (t *timeWheel) delTimer(timer *Timer) { + timer.prev.next = timer.next + timer.next.prev = timer.prev + //ReleaseTimer(timer) +} + + +func (t *timeWheel) Tick(){ + nowTime := GetNow() + + loop := (nowTime - t.currentTime)/int64(GRANULARITY) + if loop> 0 { + t.currentTime = nowTime + } + + for i:=int64(0);i= t.wheels[0].slotSize() { + t.wheels[0].slotIndex = 0 + t.cascade(1) + } +} + +func (t *timeWheel) getCurrentSlot(wheelIndex int) *Slots{ + return t.wheels[wheelIndex].slots[t.wheels[wheelIndex].slotIndex] +} + +func (t *timeWheel) getWheelSum(wheelIndex int) int64{ + var sum int64 + for i:=0;i=t.wheelSize { + return + } + + //1.取得对应的轮子上的slot + wheel := t.wheels[wheelIndex] + slot := wheel.slots[wheel.slotIndex] + + //2.将当前的slot遍历并重新加入 + currentTimer := slot.timer.next + bEmpty := true + for ;currentTimer!=slot.timer; { + nextTimer:=currentTimer.next + //如果到时 + if currentTimer.expireTicks<= t.currentTicks { + if currentTimer.IsStop() == false { + select { + case currentTimer.C<-currentTimer: + } + } + }else{//否则重新添加,会加到下一级轮中 + t.addTimer(currentTimer) + } + currentTimer = nextTimer + bEmpty = false + } + + if bEmpty == false { + wheel.slots[wheel.slotIndex] = t.newSlot(wheel.slots[wheel.slotIndex]) + } + + + //如果轮子到了最大值,需要跳轮 + wheel.slotIndex++ + if wheel.slotIndex>=wheel.slotSize() { + wheel.slotIndex = 0 + t.cascade(wheelIndex+1) + } +} diff --git a/util/timewheel/timewheel_test.go b/util/timewheel/timewheel_test.go new file mode 100644 index 0000000..82c64d8 --- /dev/null +++ b/util/timewheel/timewheel_test.go @@ -0,0 +1,74 @@ +package timewheel + +import ( + "fmt" + "testing" + "time" +) + +var timerCount int + +var mapId map[int] interface{} + +func Test_Example(t *testing.T) { + now := time.Now() + timer := NewTimer(time.Millisecond*20) + select { + case <-timer.C: + fmt.Print("xxx:") + } + fmt.Println(time.Now().Sub(now).Milliseconds()) + /* + rand.Seed(time.Now().UnixNano()) + timeWheel := NewTimeWheel() + mapId = map[int] interface{}{} + + time.Sleep(time.Duration(rand.Intn(100))*time.Millisecond) + timeWheel.Tick() + time.AfterFunc() + + for i:=100000000;i<200000000;i++{ + r := rand.Intn(100) + timeWheel.AddTimer(i+r,func(){ + fmt.Print("+\n") + }) + + time.NewTicker() + time.AfterFunc() + + timerCount+=1 + } + + fmt.Println("add finish..") + + go func(){ + for{ + select { + case t:=<-timeWheel.chanTimer: + + timerCount-- + if timerCount == 0 { + fmt.Printf("finish...\n") + } + if t.tmp-t.expireTicks >1 { + fmt.Printf("err:%d:%d\n",t.expireTicks,t.tmp-t.expireTicks) + }else{ + + if t.expireTicks%100000 == 0 { + fmt.Printf("%d:%d:%d\n",t.expireTicks,t.tmp-t.expireTicks,t.tmpMilSeconds) + } + + //t.timerCB() + } + } + } + }() + + for{ + + timeWheel.TickOneFrame() + //time.Sleep(1*time.Microsecond) + //fmt.Println(".") + } +*/ +}