修复cron定时器触发问题

This commit is contained in:
orgin
2022-03-09 11:24:46 +08:00
parent 93be70877b
commit 484e505ea4

View File

@@ -13,7 +13,7 @@ import (
type ITimer interface { type ITimer interface {
GetId() uint64 GetId() uint64
Cancel() Cancel()
GetName()string GetName() string
IsActive() bool IsActive() bool
IsOpen() bool IsOpen() bool
Open(bOpen bool) Open(bOpen bool)
@@ -28,21 +28,21 @@ type OnAddTimer func(timer ITimer)
// Timer // Timer
type Timer struct { type Timer struct {
Id uint64 Id uint64
cancelled bool //是否关闭 cancelled bool //是否关闭
C chan ITimer //定时器管道 C chan ITimer //定时器管道
interval time.Duration // 时间间隔(用于循环定时器) interval time.Duration // 时间间隔(用于循环定时器)
fireTime time.Time // 触发时间 fireTime time.Time // 触发时间
cb func(uint64,interface{}) cb func(uint64, interface{})
cbEx func(t *Timer) cbEx func(t *Timer)
cbCronEx func(t *Cron) cbCronEx func(t *Cron)
cbTickerEx func(t *Ticker) cbTickerEx func(t *Ticker)
cbOnCloseTimer OnCloseTimer cbOnCloseTimer OnCloseTimer
cronExpr *CronExpr cronExpr *CronExpr
AdditionData interface{} //定时器附加数据 AdditionData interface{} //定时器附加数据
rOpen bool //是否重新打开 rOpen bool //是否重新打开
ref bool ref bool
} }
// Ticker // Ticker
@@ -55,19 +55,19 @@ type Cron struct {
Timer Timer
} }
var timerPool = sync.NewPoolEx(make(chan sync.IPoolData,102400),func() sync.IPoolData{ var timerPool = sync.NewPoolEx(make(chan sync.IPoolData, 102400), func() sync.IPoolData {
return &Timer{} return &Timer{}
}) })
var cronPool = sync.NewPoolEx(make(chan sync.IPoolData,10240),func() sync.IPoolData{ var cronPool = sync.NewPoolEx(make(chan sync.IPoolData, 10240), func() sync.IPoolData {
return &Cron{} return &Cron{}
}) })
var tickerPool =sync.NewPoolEx(make(chan sync.IPoolData,102400),func() sync.IPoolData{ var tickerPool = sync.NewPoolEx(make(chan sync.IPoolData, 102400), func() sync.IPoolData {
return &Ticker{} return &Ticker{}
}) })
func newTimer(d time.Duration,c chan ITimer,cb func(uint64,interface{}),additionData interface{}) *Timer{ func newTimer(d time.Duration, c chan ITimer, cb func(uint64, interface{}), additionData interface{}) *Timer {
timer := timerPool.Get().(*Timer) timer := timerPool.Get().(*Timer)
timer.AdditionData = additionData timer.AdditionData = additionData
timer.C = c timer.C = c
@@ -105,38 +105,38 @@ type Dispatcher struct {
ChanTimer chan ITimer ChanTimer chan ITimer
} }
func (t *Timer) GetId() uint64{ func (t *Timer) GetId() uint64 {
return t.Id return t.Id
} }
func (t *Timer) GetFireTime() time.Time{ func (t *Timer) GetFireTime() time.Time {
return t.fireTime return t.fireTime
} }
func (t *Timer) Open(bOpen bool){ func (t *Timer) Open(bOpen bool) {
t.rOpen = bOpen t.rOpen = bOpen
} }
func (t *Timer) AppendChannel(timer ITimer){ func (t *Timer) AppendChannel(timer ITimer) {
t.C <- timer t.C <- timer
} }
func (t *Timer) IsOpen() bool{ func (t *Timer) IsOpen() bool {
return t.rOpen return t.rOpen
} }
func (t *Timer) Do(){ func (t *Timer) Do() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
buf := make([]byte, 4096) buf := make([]byte, 4096)
l := runtime.Stack(buf, false) l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
log.SError("core dump info[",errString,"]\n",string(buf[:l])) log.SError("core dump info[", errString, "]\n", string(buf[:l]))
} }
}() }()
if t.IsActive() == false { if t.IsActive() == false {
if t.cbOnCloseTimer!=nil { if t.cbOnCloseTimer != nil {
t.cbOnCloseTimer(t) t.cbOnCloseTimer(t)
} }
@@ -145,20 +145,20 @@ func (t *Timer) Do(){
} }
if t.cb != nil { if t.cb != nil {
t.cb(t.Id,t.AdditionData) t.cb(t.Id, t.AdditionData)
}else if t.cbEx != nil { } else if t.cbEx != nil {
t.cbEx(t) t.cbEx(t)
} }
if t.rOpen ==false { if t.rOpen == false {
if t.cbOnCloseTimer!=nil { if t.cbOnCloseTimer != nil {
t.cbOnCloseTimer(t) t.cbOnCloseTimer(t)
} }
releaseTimer(t) releaseTimer(t)
} }
} }
func (t *Timer) SetupTimer(now time.Time) error{ func (t *Timer) SetupTimer(now time.Time) error {
t.fireTime = now.Add(t.interval) t.fireTime = now.Add(t.interval)
if SetupTimer(t) == nil { if SetupTimer(t) == nil {
return fmt.Errorf("failed to install timer") return fmt.Errorf("failed to install timer")
@@ -166,7 +166,7 @@ func (t *Timer) SetupTimer(now time.Time) error{
return nil return nil
} }
func (t *Timer) GetInterval() time.Duration{ func (t *Timer) GetInterval() time.Duration {
return t.interval return t.interval
} }
@@ -179,10 +179,10 @@ func (t *Timer) IsActive() bool {
return !t.cancelled return !t.cancelled
} }
func (t *Timer) GetName() string{ func (t *Timer) GetName() string {
if t.cb!=nil { if t.cb != nil {
return runtime.FuncForPC(reflect.ValueOf(t.cb).Pointer()).Name() return runtime.FuncForPC(reflect.ValueOf(t.cb).Pointer()).Name()
}else if t.cbEx!=nil { } else if t.cbEx != nil {
return runtime.FuncForPC(reflect.ValueOf(t.cbEx).Pointer()).Name() return runtime.FuncForPC(reflect.ValueOf(t.cbEx).Pointer()).Name()
} }
@@ -190,23 +190,24 @@ func (t *Timer) GetName() string{
} }
var emptyTimer Timer var emptyTimer Timer
func (t *Timer) Reset(){
func (t *Timer) Reset() {
*t = emptyTimer *t = emptyTimer
} }
func (t *Timer) IsRef()bool{ func (t *Timer) IsRef() bool {
return t.ref return t.ref
} }
func (t *Timer) Ref(){ func (t *Timer) Ref() {
t.ref = true t.ref = true
} }
func (t *Timer) UnRef(){ func (t *Timer) UnRef() {
t.ref = false t.ref = false
} }
func (c *Cron) Reset(){ func (c *Cron) Reset() {
c.Timer.Reset() c.Timer.Reset()
} }
@@ -216,11 +217,11 @@ func (c *Cron) Do() {
buf := make([]byte, 4096) buf := make([]byte, 4096)
l := runtime.Stack(buf, false) l := runtime.Stack(buf, false)
errString := fmt.Sprint(r) errString := fmt.Sprint(r)
log.SError("core dump info[",errString,"]\n",string(buf[:l])) log.SError("core dump info[", errString, "]\n", string(buf[:l]))
} }
}() }()
if c.IsActive() == false{ if c.IsActive() == false {
if c.cbOnCloseTimer != nil { if c.cbOnCloseTimer != nil {
c.cbOnCloseTimer(c) c.cbOnCloseTimer(c)
} }
@@ -235,18 +236,18 @@ func (c *Cron) Do() {
return return
} }
if c.cb!=nil { if c.cb != nil {
c.cb(c.Id,c.AdditionData) c.cb(c.Id, c.AdditionData)
}else if c.cbEx !=nil { } else if c.cbCronEx != nil {
c.cbCronEx(c) c.cbCronEx(c)
} }
if c.IsActive() == true{ if c.IsActive() == true {
c.interval = nextTime.Sub(now) c.interval = nextTime.Sub(now)
c.fireTime = now.Add(c.interval) c.fireTime = now.Add(c.interval)
SetupTimer(c) SetupTimer(c)
}else{ } else {
if c.cbOnCloseTimer!=nil { if c.cbOnCloseTimer != nil {
c.cbOnCloseTimer(c) c.cbOnCloseTimer(c)
} }
releaseCron(c) releaseCron(c)
@@ -254,15 +255,15 @@ func (c *Cron) Do() {
} }
} }
func (c *Cron) IsRef()bool{ func (c *Cron) IsRef() bool {
return c.ref return c.ref
} }
func (c *Cron) Ref(){ func (c *Cron) Ref() {
c.ref = true c.ref = true
} }
func (c *Cron) UnRef(){ func (c *Cron) UnRef() {
c.ref = false c.ref = false
} }
@@ -277,7 +278,7 @@ func (c *Ticker) Do() {
}() }()
if c.IsActive() == false { if c.IsActive() == false {
if c.cbOnCloseTimer!=nil { if c.cbOnCloseTimer != nil {
c.cbOnCloseTimer(c) c.cbOnCloseTimer(c)
} }
@@ -285,36 +286,36 @@ func (c *Ticker) Do() {
return return
} }
if c.cb!=nil{ if c.cb != nil {
c.cb(c.Id,c.AdditionData) c.cb(c.Id, c.AdditionData)
} else if c.cbTickerEx != nil{ } else if c.cbTickerEx != nil {
c.cbTickerEx(c) c.cbTickerEx(c)
} }
if c.IsActive() == true{ if c.IsActive() == true {
c.fireTime = Now().Add(c.interval) c.fireTime = Now().Add(c.interval)
SetupTimer(c) SetupTimer(c)
}else{ } else {
if c.cbOnCloseTimer!=nil { if c.cbOnCloseTimer != nil {
c.cbOnCloseTimer(c) c.cbOnCloseTimer(c)
} }
releaseTicker(c) releaseTicker(c)
} }
} }
func (c *Ticker) Reset(){ func (c *Ticker) Reset() {
c.Timer.Reset() c.Timer.Reset()
} }
func (c *Ticker) IsRef()bool{ func (c *Ticker) IsRef() bool {
return c.ref return c.ref
} }
func (c *Ticker) Ref(){ func (c *Ticker) Ref() {
c.ref = true c.ref = true
} }
func (c *Ticker) UnRef(){ func (c *Ticker) UnRef() {
c.ref = false c.ref = false
} }
@@ -324,21 +325,21 @@ func NewDispatcher(l int) *Dispatcher {
return dispatcher return dispatcher
} }
func (dispatcher *Dispatcher) AfterFunc(d time.Duration, cb func(uint64,interface{}),cbEx func(*Timer),onTimerClose OnCloseTimer,onAddTimer OnAddTimer) *Timer { func (dispatcher *Dispatcher) AfterFunc(d time.Duration, cb func(uint64, interface{}), cbEx func(*Timer), onTimerClose OnCloseTimer, onAddTimer OnAddTimer) *Timer {
timer := newTimer(d,dispatcher.ChanTimer,nil,nil) timer := newTimer(d, dispatcher.ChanTimer, nil, nil)
timer.cb = cb timer.cb = cb
timer.cbEx = cbEx timer.cbEx = cbEx
timer.cbOnCloseTimer = onTimerClose timer.cbOnCloseTimer = onTimerClose
t := SetupTimer(timer) t := SetupTimer(timer)
if onAddTimer!= nil && t!=nil { if onAddTimer != nil && t != nil {
onAddTimer(t) onAddTimer(t)
} }
return timer return timer
} }
func (dispatcher *Dispatcher) CronFunc(cronExpr *CronExpr,cb func(uint64,interface{}), cbEx func(*Cron),onTimerClose OnCloseTimer,onAddTimer OnAddTimer) *Cron { func (dispatcher *Dispatcher) CronFunc(cronExpr *CronExpr, cb func(uint64, interface{}), cbEx func(*Cron), onTimerClose OnCloseTimer, onAddTimer OnAddTimer) *Cron {
now := Now() now := Now()
nextTime := cronExpr.Next(now) nextTime := cronExpr.Next(now)
if nextTime.IsZero() { if nextTime.IsZero() {
@@ -352,13 +353,13 @@ func (dispatcher *Dispatcher) CronFunc(cronExpr *CronExpr,cb func(uint64,interfa
cron.cronExpr = cronExpr cron.cronExpr = cronExpr
cron.C = dispatcher.ChanTimer cron.C = dispatcher.ChanTimer
cron.interval = nextTime.Sub(now) cron.interval = nextTime.Sub(now)
cron.fireTime = Now().Add(cron.interval) cron.fireTime = nextTime
SetupTimer(cron) SetupTimer(cron)
onAddTimer(cron) onAddTimer(cron)
return cron return cron
} }
func (dispatcher *Dispatcher) TickerFunc(d time.Duration,cb func(uint64,interface{}), cbEx func(*Ticker),onTimerClose OnCloseTimer,onAddTimer OnAddTimer) *Ticker { func (dispatcher *Dispatcher) TickerFunc(d time.Duration, cb func(uint64, interface{}), cbEx func(*Ticker), onTimerClose OnCloseTimer, onAddTimer OnAddTimer) *Ticker {
ticker := newTicker() ticker := newTicker()
ticker.C = dispatcher.ChanTimer ticker.C = dispatcher.ChanTimer
ticker.fireTime = Now().Add(d) ticker.fireTime = Now().Add(d)