Files
origin/util/timer/timer.go
2023-08-15 15:46:38 +08:00

377 lines
6.9 KiB
Go

package timer
import (
"fmt"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/util/sync"
"reflect"
"runtime"
"time"
"sync/atomic"
)
// ITimer
type ITimer interface {
GetId() uint64
Cancel()
GetName() string
IsActive() bool
IsOpen() bool
Open(bOpen bool)
AppendChannel(timer ITimer)
Do()
GetFireTime() time.Time
SetupTimer(now time.Time) error
}
type OnCloseTimer func(timer ITimer)
type OnAddTimer func(timer ITimer)
// Timer
type Timer struct {
Id uint64
cancelled int32 //是否关闭
C chan ITimer //定时器管道
interval time.Duration // 时间间隔(用于循环定时器)
fireTime time.Time // 触发时间
cb func(uint64, interface{})
cbEx func(t *Timer)
cbCronEx func(t *Cron)
cbTickerEx func(t *Ticker)
cbOnCloseTimer OnCloseTimer
cronExpr *CronExpr
AdditionData interface{} //定时器附加数据
rOpen bool //是否重新打开
ref bool
}
// Ticker
type Ticker struct {
Timer
}
// Cron
type Cron struct {
Timer
}
var timerPool = sync.NewPoolEx(make(chan sync.IPoolData, 102400), func() sync.IPoolData {
return &Timer{}
})
var cronPool = sync.NewPoolEx(make(chan sync.IPoolData, 10240), func() sync.IPoolData {
return &Cron{}
})
var tickerPool = sync.NewPoolEx(make(chan sync.IPoolData, 102400), func() sync.IPoolData {
return &Ticker{}
})
func newTimer(d time.Duration, c chan ITimer, cb func(uint64, interface{}), additionData interface{}) *Timer {
timer := timerPool.Get().(*Timer)
timer.AdditionData = additionData
timer.C = c
timer.fireTime = Now().Add(d)
timer.cb = cb
timer.interval = d
timer.rOpen = false
return timer
}
func releaseTimer(timer *Timer) {
timerPool.Put(timer)
}
func newTicker() *Ticker {
ticker := tickerPool.Get().(*Ticker)
return ticker
}
func releaseTicker(ticker *Ticker) {
tickerPool.Put(ticker)
}
func newCron() *Cron {
cron := cronPool.Get().(*Cron)
return cron
}
func releaseCron(cron *Cron) {
cronPool.Put(cron)
}
// one dispatcher per goroutine (goroutine not safe)
type Dispatcher struct {
ChanTimer chan ITimer
}
func (t *Timer) GetId() uint64 {
return t.Id
}
func (t *Timer) GetFireTime() time.Time {
return t.fireTime
}
func (t *Timer) Open(bOpen bool) {
t.rOpen = bOpen
}
func (t *Timer) AppendChannel(timer ITimer) {
t.C <- timer
}
func (t *Timer) IsOpen() bool {
return t.rOpen
}
func (t *Timer) Do() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()
if t.IsActive() == false {
if t.cbOnCloseTimer != nil {
t.cbOnCloseTimer(t)
}
releaseTimer(t)
return
}
if t.cb != nil {
t.cb(t.Id, t.AdditionData)
} else if t.cbEx != nil {
t.cbEx(t)
}
if t.rOpen == false {
if t.cbOnCloseTimer != nil {
t.cbOnCloseTimer(t)
}
releaseTimer(t)
}
}
func (t *Timer) SetupTimer(now time.Time) error {
t.fireTime = now.Add(t.interval)
if SetupTimer(t) == nil {
return fmt.Errorf("failed to install timer")
}
return nil
}
func (t *Timer) GetInterval() time.Duration {
return t.interval
}
func (t *Timer) Cancel() {
atomic.StoreInt32(&t.cancelled,1)
}
// 判断定时器是否已经取消
func (t *Timer) IsActive() bool {
return atomic.LoadInt32(&t.cancelled) == 0
}
func (t *Timer) GetName() string {
if t.cb != nil {
return runtime.FuncForPC(reflect.ValueOf(t.cb).Pointer()).Name()
} else if t.cbEx != nil {
return runtime.FuncForPC(reflect.ValueOf(t.cbEx).Pointer()).Name()
}
return ""
}
var emptyTimer Timer
func (t *Timer) Reset() {
*t = emptyTimer
}
func (t *Timer) IsRef() bool {
return t.ref
}
func (t *Timer) Ref() {
t.ref = true
}
func (t *Timer) UnRef() {
t.ref = false
}
func (c *Cron) Reset() {
c.Timer.Reset()
}
func (c *Cron) Do() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()
if c.IsActive() == false {
if c.cbOnCloseTimer != nil {
c.cbOnCloseTimer(c)
}
releaseCron(c)
return
}
now := Now()
nextTime := c.cronExpr.Next(now)
if nextTime.IsZero() {
c.cbCronEx(c)
return
}
if c.cb != nil {
c.cb(c.Id, c.AdditionData)
} else if c.cbCronEx != nil {
c.cbCronEx(c)
}
if c.IsActive() == true {
c.interval = nextTime.Sub(now)
c.fireTime = now.Add(c.interval)
SetupTimer(c)
} else {
if c.cbOnCloseTimer != nil {
c.cbOnCloseTimer(c)
}
releaseCron(c)
return
}
}
func (c *Cron) IsRef() bool {
return c.ref
}
func (c *Cron) Ref() {
c.ref = true
}
func (c *Cron) UnRef() {
c.ref = false
}
func (c *Ticker) Do() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.Dump(string(buf[:l]),log.String("error",errString))
}
}()
if c.IsActive() == false {
if c.cbOnCloseTimer != nil {
c.cbOnCloseTimer(c)
}
releaseTicker(c)
return
}
if c.cb != nil {
c.cb(c.Id, c.AdditionData)
} else if c.cbTickerEx != nil {
c.cbTickerEx(c)
}
if c.IsActive() == true {
c.fireTime = Now().Add(c.interval)
SetupTimer(c)
} else {
if c.cbOnCloseTimer != nil {
c.cbOnCloseTimer(c)
}
releaseTicker(c)
}
}
func (c *Ticker) Reset() {
c.Timer.Reset()
}
func (c *Ticker) IsRef() bool {
return c.ref
}
func (c *Ticker) Ref() {
c.ref = true
}
func (c *Ticker) UnRef() {
c.ref = false
}
func NewDispatcher(l int) *Dispatcher {
dispatcher := new(Dispatcher)
dispatcher.ChanTimer = make(chan ITimer, l)
return dispatcher
}
func (dispatcher *Dispatcher) AfterFunc(d time.Duration, cb func(uint64, interface{}), cbEx func(*Timer), onTimerClose OnCloseTimer, onAddTimer OnAddTimer) *Timer {
timer := newTimer(d, dispatcher.ChanTimer, nil, nil)
timer.cb = cb
timer.cbEx = cbEx
timer.cbOnCloseTimer = onTimerClose
t := SetupTimer(timer)
if onAddTimer != nil && t != nil {
onAddTimer(t)
}
return timer
}
func (dispatcher *Dispatcher) CronFunc(cronExpr *CronExpr, cb func(uint64, interface{}), cbEx func(*Cron), onTimerClose OnCloseTimer, onAddTimer OnAddTimer) *Cron {
now := Now()
nextTime := cronExpr.Next(now)
if nextTime.IsZero() {
return nil
}
cron := newCron()
cron.cb = cb
cron.cbCronEx = cbEx
cron.cbOnCloseTimer = onTimerClose
cron.cronExpr = cronExpr
cron.C = dispatcher.ChanTimer
cron.interval = nextTime.Sub(now)
cron.fireTime = nextTime
SetupTimer(cron)
onAddTimer(cron)
return cron
}
func (dispatcher *Dispatcher) TickerFunc(d time.Duration, cb func(uint64, interface{}), cbEx func(*Ticker), onTimerClose OnCloseTimer, onAddTimer OnAddTimer) *Ticker {
ticker := newTicker()
ticker.C = dispatcher.ChanTimer
ticker.fireTime = Now().Add(d)
ticker.cb = cb
ticker.cbTickerEx = cbEx
ticker.interval = d
// callback
SetupTimer(ticker)
onAddTimer(ticker)
return ticker
}