新增Ticker定時器

This commit is contained in:
boyce
2020-11-01 00:21:07 +08:00
parent d2f52b382d
commit 59c9d20071
4 changed files with 197 additions and 114 deletions

View File

@@ -5,8 +5,8 @@ import (
"github.com/duanhf2012/origin/event" "github.com/duanhf2012/origin/event"
"github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/util/timer" "github.com/duanhf2012/origin/util/timer"
"github.com/duanhf2012/origin/util/timewheel"
"reflect" "reflect"
"runtime"
"time" "time"
) )
const InitModuleId = 1e17 const InitModuleId = 1e17
@@ -37,8 +37,7 @@ type Module struct {
parent IModule //父亲 parent IModule //父亲
self IModule //自己 self IModule //自己
child map[int64]IModule //孩子们 child map[int64]IModule //孩子们
mapActiveTimer map[*timer.Timer]interface{} mapActiveTimer map[*timewheel.Timer]interface{}
mapActiveCron map[*timer.Cron]interface{}
dispatcher *timer.Dispatcher //timer dispatcher *timer.Dispatcher //timer
//根结点 //根结点
@@ -120,11 +119,7 @@ func (m *Module) ReleaseModule(moduleId int64){
pModule.self.OnRelease() pModule.self.OnRelease()
log.Debug("Release module %s.", m.GetModuleName()) log.Debug("Release module %s.", m.GetModuleName())
for pTimer,_ := range pModule.mapActiveTimer { for pTimer,_ := range pModule.mapActiveTimer {
pTimer.Close() pTimer.AdditionData.(timer.ITime).Close()
}
for pCron,_ := range pModule.mapActiveCron {
pCron.Close()
} }
delete(m.child,moduleId) delete(m.child,moduleId)
@@ -135,7 +130,6 @@ func (m *Module) ReleaseModule(moduleId int64){
pModule.parent = nil pModule.parent = nil
pModule.child = nil pModule.child = nil
pModule.mapActiveTimer = nil pModule.mapActiveTimer = nil
pModule.mapActiveCron = nil
pModule.dispatcher = nil pModule.dispatcher = nil
pModule.ancestor = nil pModule.ancestor = nil
pModule.descendants = nil pModule.descendants = nil
@@ -152,7 +146,7 @@ func (m *Module) GetAncestor()IModule{
func (m *Module) GetModule(moduleId int64) IModule{ func (m *Module) GetModule(moduleId int64) IModule{
iModule,ok := m.GetAncestor().getBaseModule().(*Module).descendants[moduleId] iModule,ok := m.GetAncestor().getBaseModule().(*Module).descendants[moduleId]
if ok == false{ if ok == false {
return nil return nil
} }
return iModule return iModule
@@ -166,32 +160,38 @@ func (m *Module) GetParent()IModule{
return m.parent return m.parent
} }
func (m *Module) OnCloseTimer(timer *timewheel.Timer){
delete(m.mapActiveTimer,timer)
}
func (m *Module) OnAddTimer(t *timewheel.Timer){
if t != nil {
m.mapActiveTimer[t] = nil
}
}
func (m *Module) AfterFunc(d time.Duration, cb func()) *timer.Timer { func (m *Module) AfterFunc(d time.Duration, cb func()) *timer.Timer {
if m.mapActiveTimer == nil { if m.mapActiveTimer == nil {
m.mapActiveTimer =map[*timer.Timer]interface{}{} m.mapActiveTimer =map[*timewheel.Timer]interface{}{}
} }
funName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name() return m.dispatcher.AfterFunc(d,cb,m.OnCloseTimer,m.OnAddTimer)
tm := m.dispatcher.AfterFuncEx(funName,d,func(t *timer.Timer){
cb()
delete(m.mapActiveTimer,t)
})
m.mapActiveTimer[tm] = nil
return tm
} }
func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func()) *timer.Cron { func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func()) *timer.Cron {
if m.mapActiveCron == nil { if m.mapActiveTimer == nil {
m.mapActiveCron =map[*timer.Cron]interface{}{} m.mapActiveTimer =map[*timewheel.Timer]interface{}{}
} }
cron := m.dispatcher.CronFuncEx(cronExpr, func(cron *timer.Cron) { return m.dispatcher.CronFunc(cronExpr,cb,m.OnCloseTimer,m.OnAddTimer)
cb() }
})
m.mapActiveCron[cron] = nil func (m *Module) NewTicker(d time.Duration, cb func()) *timer.Ticker {
return cron if m.mapActiveTimer == nil {
m.mapActiveTimer =map[*timewheel.Timer]interface{}{}
}
return m.dispatcher.TickerFunc(d,cb,m.OnCloseTimer,m.OnAddTimer)
} }
func (m *Module) OnRelease(){ func (m *Module) OnRelease(){

View File

@@ -109,7 +109,7 @@ func (s *Service) Run() {
bStop = true bStop = true
case rpcRequest :=<- rpcRequestChan: case rpcRequest :=<- rpcRequestChan:
if s.profiler!=nil { if s.profiler!=nil {
analyzer = s.profiler.Push("Req_"+rpcRequest.RpcRequestData.GetServiceMethod()) analyzer = s.profiler.Push("[Req]"+rpcRequest.RpcRequestData.GetServiceMethod())
} }
s.GetRpcHandler().HandlerRpcRequest(rpcRequest) s.GetRpcHandler().HandlerRpcRequest(rpcRequest)
@@ -119,7 +119,7 @@ func (s *Service) Run() {
} }
case rpcResponseCB := <-rpcResponseCallBack: case rpcResponseCB := <-rpcResponseCallBack:
if s.profiler!=nil { if s.profiler!=nil {
analyzer = s.profiler.Push("Res_" + rpcResponseCB.ServiceMethod) analyzer = s.profiler.Push("[Res]" + rpcResponseCB.ServiceMethod)
} }
s.GetRpcHandler().HandlerRpcResponseCB(rpcResponseCB) s.GetRpcHandler().HandlerRpcResponseCB(rpcResponseCB)
if analyzer!=nil { if analyzer!=nil {
@@ -128,7 +128,7 @@ func (s *Service) Run() {
} }
case ev := <- eventChan: case ev := <- eventChan:
if s.profiler!=nil { if s.profiler!=nil {
analyzer = s.profiler.Push(fmt.Sprintf("Event_%d", int(ev.Type))) analyzer = s.profiler.Push(fmt.Sprintf("[Event]%d", int(ev.Type)))
} }
s.eventProcessor.EventHandler(ev) s.eventProcessor.EventHandler(ev)
if analyzer!=nil { if analyzer!=nil {
@@ -137,10 +137,11 @@ func (s *Service) Run() {
} }
case t := <- s.dispatcher.ChanTimer: case t := <- s.dispatcher.ChanTimer:
if t.IsClose() == false { if t.IsClose() == false {
time := t.AdditionData.(timer.ITime)
if s.profiler != nil { if s.profiler != nil {
analyzer = s.profiler.Push(fmt.Sprintf("Timer_%s", t.AdditionData.(*timer.Timer).GetFunctionName())) analyzer = s.profiler.Push("[timer]"+time.GetName())
} }
t.AdditionData.(*timer.Timer).Cb() time.Do()
if analyzer != nil { if analyzer != nil {
analyzer.Pop() analyzer.Pop()
analyzer = nil analyzer = nil

View File

@@ -1,14 +1,25 @@
package timer package timer
import ( import (
"fmt"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/util/timewheel" "github.com/duanhf2012/origin/util/timewheel"
"reflect" "reflect"
"runtime" "runtime"
"sync"
"time" "time"
) )
var timerPool = sync.Pool{New: func() interface{}{
return &Timer{}
}}
var cronPool = sync.Pool{New: func() interface{}{
return &Cron{}
}}
var tickerPool = sync.Pool{New: func() interface{}{
return &Ticker{}
}}
// one dispatcher per goroutine (goroutine not safe) // one dispatcher per goroutine (goroutine not safe)
type Dispatcher struct { type Dispatcher struct {
ChanTimer chan *timewheel.Timer ChanTimer chan *timewheel.Timer
@@ -20,119 +31,187 @@ func NewDispatcher(l int) *Dispatcher {
return disp return disp
} }
type ITime interface {
Close ()
Do()
GetName() string
}
// Timer // Timer
type Timer struct { type Timer struct {
t *timewheel.Timer t *timewheel.Timer
cb func() cb func()
cbex func(*Timer)
name string name string
} onClose func(timer *timewheel.Timer)
func (t *Timer) Close() {
t.t.Close()
t.cb = nil
}
func (t *Timer) GetFunctionName() string {
return t.name
}
func (t *Timer) Cb() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 40960)
l := runtime.Stack(buf, false)
err := fmt.Errorf("%v: %s", r, buf[:l])
log.Error("core dump info:%+v\n",err)
}
}()
if t.cbex!=nil {
t.cbex(t)
}else if t.cb!= nil {
t.cb()
}
}
func (disp *Dispatcher) AfterFunc(d time.Duration, cb func()) *Timer {
t := new(Timer)
t.cb = cb
t.name = reflect.TypeOf(cb).Name()
t.t = timewheel.NewTimerEx(d,disp.ChanTimer,t)
return t
}
func (disp *Dispatcher) AfterFuncEx(funName string,d time.Duration, cbex func(timer *Timer)) *Timer {
t := new(Timer)
t.cbex = cbex
t.name = funName
t.t = timewheel.NewTimerEx(d,disp.ChanTimer,t)
return t
} }
// Cron // Cron
type Cron struct { type Cron struct {
t *Timer Timer
}
// Ticker
type Ticker struct {
Timer
}
func NewTimer(t *timewheel.Timer,cb func(),name string,onClose func(timer *timewheel.Timer)) *Timer {
timer := timerPool.Get().(*Timer)
timer.t = t
timer.cb = cb
timer.onClose = onClose
timer.name = name
return timer
}
func ReleaseTimer(timer *Timer) {
timerPool.Put(timer)
}
func (t *Timer) Close(){
if t.t!=nil {
t.t.Close()
}
if t.onClose!=nil {
t.onClose(t.t)
}
ReleaseTimer(t)
}
func (t *Timer) Do(){
t.Close()
t.cb()
}
func (t *Timer) GetName() string{
return t.name
}
func NewCron(t *timewheel.Timer,cb func(),name string,onClose func(timer *timewheel.Timer)) *Cron {
cron := cronPool.Get().(*Cron)
cron.t = t
cron.cb = cb
cron.onClose = onClose
cron.name = name
return cron
}
func ReleaseCron(cron *Cron) {
cronPool.Put(cron)
}
func (c *Cron) Do(){
if c.onClose!=nil {
c.onClose(c.t)
}
c.cb()
} }
func (c *Cron) Close() { func (c *Cron) Close() {
if c.t != nil { if c.t != nil {
c.t.Close() c.t.Close()
} }
if c.onClose!=nil {
c.onClose(c.t)
}
ReleaseCron(c)
} }
func (disp *Dispatcher) CronFunc(cronExpr *CronExpr, _cb func()) *Cron { func NewTicker(t *timewheel.Timer,cb func(),name string,onClose func(timer *timewheel.Timer)) *Ticker {
c := new(Cron) ticker := tickerPool.Get().(*Ticker)
ticker.t = t
ticker.cb = cb
ticker.onClose = onClose
ticker.name = name
return ticker
}
func ReleaseTicker(ticker *Ticker) {
tickerPool.Put(ticker)
}
func (tk *Ticker) Do(){
//通知当前timer删除
if tk.onClose!=nil {
tk.onClose(tk.t)
}
tk.cb()
}
func (tk *Ticker) Close() {
if tk.t != nil {
tk.t.Close()
}
if tk.onClose!=nil {
tk.onClose(tk.t)
}
ReleaseTicker(tk)
}
func (disp *Dispatcher) AfterFunc(d time.Duration, cb func(),onCloseTimer func(timer *timewheel.Timer),onAddTimer func(timer *timewheel.Timer)) *Timer {
funName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name()
t := NewTimer(nil,cb,funName,onCloseTimer)
t.t = timewheel.NewTimerEx(d,disp.ChanTimer,t)
onAddTimer(t.t)
return t
}
func (disp *Dispatcher) CronFunc(cronExpr *CronExpr, cb func(),onCloseTimer func(timer *timewheel.Timer),onAddTimer func(timer *timewheel.Timer)) *Cron {
now := time.Now() now := time.Now()
nextTime := cronExpr.Next(now) nextTime := cronExpr.Next(now)
if nextTime.IsZero() { if nextTime.IsZero() {
return c return nil
} }
funcName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name()
cron := NewCron(nil,nil,funcName,onCloseTimer)
// callback // callback
var cb func() var cbFunc func()
cb = func() { cbFunc = func() {
defer _cb()
now := time.Now() now := time.Now()
nextTime := cronExpr.Next(now) nextTime := cronExpr.Next(now)
if nextTime.IsZero() { if nextTime.IsZero() {
cb()
return return
} }
c.t = disp.AfterFunc(nextTime.Sub(now), cb)
}
c.t = disp.AfterFunc(nextTime.Sub(now), cb) interval := nextTime.Sub(now)
return c minTimeInterval := time.Millisecond*time.Duration(timewheel.GRANULARITY)
if interval < minTimeInterval {
interval = minTimeInterval
}
cron.t = timewheel.NewTimerEx(interval,disp.ChanTimer,cron)
onAddTimer(cron.t)
cb()
}
cron.cb = cbFunc
cron.t = timewheel.NewTimerEx(nextTime.Sub(now),disp.ChanTimer,cron)
onAddTimer(cron.t)
return cron
} }
func (disp *Dispatcher) TickerFunc(d time.Duration, cb func(),onCloseTimer func(timer *timewheel.Timer),onAddTimer func(timer *timewheel.Timer)) *Ticker {
func (disp *Dispatcher) CronFuncEx(cronExpr *CronExpr, _cb func(*Cron)) *Cron { funcName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name()
c := new(Cron) ticker := NewTicker(nil,nil,funcName,onCloseTimer)
now := time.Now()
nextTime := cronExpr.Next(now)
if nextTime.IsZero() {
return c
}
// callback // callback
var cb func() var cbFunc func()
cb = func() { cbFunc = func() {
defer _cb(c) ticker.t = timewheel.NewTimerEx(d,disp.ChanTimer,ticker)
onAddTimer(ticker.t)
now := time.Now() cb()
nextTime := cronExpr.Next(now)
if nextTime.IsZero() {
return
}
c.t = disp.AfterFunc(nextTime.Sub(now), cb)
} }
c.t = disp.AfterFunc(nextTime.Sub(now), cb) ticker.cb = cbFunc
return c ticker.t = timewheel.NewTimerEx(d,disp.ChanTimer,ticker)
} onAddTimer(ticker.t)
return ticker
}

View File

@@ -307,6 +307,9 @@ func (t *timeWheel) addTimer(timer *Timer) *Timer {
//删除定时器 //删除定时器
func (t *timeWheel) delTimer(timer *Timer) { func (t *timeWheel) delTimer(timer *Timer) {
if timer.next == nil {
return
}
timer.prev.next = timer.next timer.prev.next = timer.next
timer.next.prev = timer.prev timer.next.prev = timer.prev
ReleaseTimer(timer) ReleaseTimer(timer)