新增安全定时器功能接口;优化定时器性能

This commit is contained in:
duanhf2012
2021-08-16 14:15:40 +08:00
parent 81a484b6a9
commit 41e1c27760
4 changed files with 318 additions and 141 deletions

View File

@@ -7,18 +7,19 @@ import (
rpcHandle "github.com/duanhf2012/origin/rpc"
"github.com/duanhf2012/origin/util/timer"
"reflect"
"sync/atomic"
"time"
)
const InitModuleId = 1e17
const InitModuleId = 1e9
type IModule interface {
SetModuleId(moduleId int64) bool
GetModuleId() int64
AddModule(module IModule) (int64,error)
GetModule(moduleId int64) IModule
SetModuleId(moduleId uint32) bool
GetModuleId() uint32
AddModule(module IModule) (uint32,error)
GetModule(moduleId uint32) IModule
GetAncestor()IModule
ReleaseModule(moduleId int64)
NewModuleId() int64
ReleaseModule(moduleId uint32)
NewModuleId() uint32
GetParent()IModule
OnInit() error
OnRelease()
@@ -37,24 +38,25 @@ type IModuleTimer interface {
type Module struct {
rpcHandle.IRpcHandler
moduleId int64 //模块Id
moduleId uint32 //模块Id
moduleName string //模块名称
parent IModule //父亲
self IModule //自己
child map[int64]IModule //孩子们
mapActiveTimer map[*timer.Timer]interface{}
child map[uint32]IModule //孩子们
mapActiveTimer map[timer.ITimer]struct{}
mapActiveIdTimer map[uint64]timer.ITimer
dispatcher *timer.Dispatcher //timer
//根结点
ancestor IModule //始祖
seedModuleId int64 //模块id种子
descendants map[int64]IModule //始祖的后裔们
seedModuleId uint32 //模块id种子
descendants map[uint32]IModule //始祖的后裔们
//事件管道
eventHandler event.IEventHandler
}
func (m *Module) SetModuleId(moduleId int64) bool{
func (m *Module) SetModuleId(moduleId uint32) bool{
if m.moduleId > 0 {
return false
}
@@ -63,7 +65,7 @@ func (m *Module) SetModuleId(moduleId int64) bool{
return true
}
func (m *Module) GetModuleId() int64{
func (m *Module) GetModuleId() uint32{
return m.moduleId
}
@@ -75,7 +77,7 @@ func (m *Module) OnInit() error{
return nil
}
func (m *Module) AddModule(module IModule) (int64,error){
func (m *Module) AddModule(module IModule) (uint32,error){
//没有事件处理器不允许加入其他模块
if m.GetEventProcessor() == nil {
return 0,fmt.Errorf("module %+v Event Processor is nil", m.self)
@@ -87,7 +89,7 @@ func (m *Module) AddModule(module IModule) (int64,error){
}
if m.child == nil {
m.child = map[int64]IModule{}
m.child = map[uint32]IModule{}
}
_,ok := m.child[module.GetModuleId()]
if ok == true {
@@ -113,7 +115,7 @@ func (m *Module) AddModule(module IModule) (int64,error){
return module.GetModuleId(),nil
}
func (m *Module) ReleaseModule(moduleId int64){
func (m *Module) ReleaseModule(moduleId uint32){
pModule := m.GetModule(moduleId).getBaseModule().(*Module)
//释放子孙
@@ -128,6 +130,10 @@ func (m *Module) ReleaseModule(moduleId int64){
pTimer.Cancel()
}
for _,t := range pModule.mapActiveIdTimer {
t.Cancel()
}
delete(m.child,moduleId)
delete (m.ancestor.getBaseModule().(*Module).descendants,moduleId)
@@ -140,18 +146,32 @@ func (m *Module) ReleaseModule(moduleId int64){
pModule.ancestor = nil
pModule.descendants = nil
pModule.IRpcHandler = nil
pModule.mapActiveIdTimer = nil
}
func (m *Module) NewModuleId() int64{
func (m *Module) NewModuleId() uint32{
m.ancestor.getBaseModule().(*Module).seedModuleId+=1
return m.ancestor.getBaseModule().(*Module).seedModuleId
}
var timerSeedId uint32
func (m *Module) GenTimerId() uint64{
for{
newTimerId := (uint64(m.GetModuleId())<<32)|uint64(atomic.AddUint32(&timerSeedId,1))
if _,ok := m.mapActiveIdTimer[newTimerId];ok == true {
continue
}
return newTimerId
}
}
func (m *Module) GetAncestor()IModule{
return m.ancestor
}
func (m *Module) GetModule(moduleId int64) IModule{
func (m *Module) GetModule(moduleId uint32) IModule{
iModule,ok := m.GetAncestor().getBaseModule().(*Module).descendants[moduleId]
if ok == false {
return nil
@@ -167,40 +187,108 @@ func (m *Module) GetParent()IModule{
return m.parent
}
func (m *Module) OnCloseTimer(timer *timer.Timer){
delete(m.mapActiveTimer,timer)
func (m *Module) OnCloseTimer(t timer.ITimer){
delete(m.mapActiveIdTimer,t.GetId())
delete(m.mapActiveTimer,t)
}
func (m *Module) OnAddTimer(t *timer.Timer){
func (m *Module) OnAddTimer(t timer.ITimer){
if t != nil {
m.mapActiveTimer[t] = nil
if m.mapActiveTimer == nil {
m.mapActiveTimer = map[timer.ITimer]struct{}{}
}
m.mapActiveTimer[t] = struct{}{}
}
}
func (m *Module) AfterFunc(d time.Duration, cb func(*timer.Timer)) *timer.Timer {
if m.mapActiveTimer == nil {
m.mapActiveTimer =map[*timer.Timer]interface{}{}
m.mapActiveTimer =map[timer.ITimer]struct{}{}
}
return m.dispatcher.AfterFunc(d,cb,m.OnCloseTimer,m.OnAddTimer)
return m.dispatcher.AfterFunc(d,nil,cb,m.OnCloseTimer,m.OnAddTimer)
}
func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func(*timer.Cron)) *timer.Cron {
if m.mapActiveTimer == nil {
m.mapActiveTimer =map[*timer.Timer]interface{}{}
m.mapActiveTimer =map[timer.ITimer]struct{}{}
}
return m.dispatcher.CronFunc(cronExpr,cb,m.OnCloseTimer,m.OnAddTimer)
return m.dispatcher.CronFunc(cronExpr,nil,cb,m.OnCloseTimer,m.OnAddTimer)
}
func (m *Module) NewTicker(d time.Duration, cb func(*timer.Ticker)) *timer.Ticker {
if m.mapActiveTimer == nil {
m.mapActiveTimer =map[*timer.Timer]interface{}{}
m.mapActiveTimer =map[timer.ITimer]struct{}{}
}
return m.dispatcher.TickerFunc(d,cb,m.OnCloseTimer,m.OnAddTimer)
return m.dispatcher.TickerFunc(d,nil,cb,m.OnCloseTimer,m.OnAddTimer)
}
func (m *Module) cb(*timer.Timer){
}
func (m *Module) SafeAfterFunc(timerId *uint64,d time.Duration, AdditionData interface{},cb func(interface{})) {
if m.mapActiveIdTimer == nil {
m.mapActiveIdTimer = map[uint64]timer.ITimer{}
}
if *timerId != 0 {
m.CancelTimerId(timerId)
}
*timerId = m.GenTimerId()
t := m.dispatcher.AfterFunc(d,cb,nil,m.OnCloseTimer,m.OnAddTimer)
t.AdditionData = AdditionData
t.Id = *timerId
m.mapActiveIdTimer[*timerId] = t
}
func (m *Module) SafeCronFunc(cronId *uint64,cronExpr *timer.CronExpr, AdditionData interface{}, cb func(interface{})) {
if m.mapActiveIdTimer == nil {
m.mapActiveIdTimer = map[uint64]timer.ITimer{}
}
*cronId = m.GenTimerId()
c := m.dispatcher.CronFunc(cronExpr,cb,nil,m.OnCloseTimer,m.OnAddTimer)
c.AdditionData = AdditionData
c.Id = *cronId
m.mapActiveIdTimer[*cronId] = c
}
func (m *Module) SafeNewTicker(tickerId *uint64,d time.Duration, AdditionData interface{}, cb func(interface{})) {
if m.mapActiveIdTimer == nil {
m.mapActiveIdTimer = map[uint64]timer.ITimer{}
}
*tickerId = m.GenTimerId()
t := m.dispatcher.TickerFunc(d,cb,nil,m.OnCloseTimer,m.OnAddTimer)
t.AdditionData = AdditionData
t.Id = *tickerId
m.mapActiveIdTimer[*tickerId] = t
}
func (m *Module) CancelTimerId(timerId *uint64) bool{
if m.mapActiveIdTimer == nil {
log.SError("mapActiveIdTimer is nil")
return false
}
t,ok := m.mapActiveIdTimer[*timerId]
if ok == false {
log.SError("cannot find timer id ",timerId)
return false
}
t.Cancel()
*timerId = 0
return true
}
func (m *Module) OnRelease(){
}

View File

@@ -95,7 +95,7 @@ func (s *Service) Init(iService IService,getClientFun rpc.FuncRpcClient,getServe
//初始化祖先
s.ancestor = iService.(IModule)
s.seedModuleId =InitModuleId
s.descendants = map[int64]IModule{}
s.descendants = map[uint32]IModule{}
s.serviceCfg = serviceCfg
s.goroutineNum = 1
s.eventProcessor = event.NewEventProcessor()

View File

@@ -2,17 +2,16 @@ package timer
import (
"container/heap"
"github.com/duanhf2012/origin/log"
"sync"
"time"
)
func SetupTimer(timer *Timer) *Timer{
if timer.rOpen == true {
func SetupTimer(timer ITimer) ITimer{
if timer.IsOpen() == true {
return nil
}
timer.rOpen = true
timer.Open(true)
timerHeapLock.Lock() // 使用锁规避竞争条件
heap.Push(&timerHeap,timer)
timerHeapLock.Unlock()
@@ -20,9 +19,10 @@ func SetupTimer(timer *Timer) *Timer{
}
func NewTimer(d time.Duration) *Timer{
c := make(chan *Timer,1)
timer := newTimer(d,c,nil,"",nil)
c := make(chan ITimer,1)
timer := newTimer(d,c,nil,"")
SetupTimer(timer)
return timer
}
@@ -31,7 +31,7 @@ func ReleaseTimer(timer *Timer) {
}
type _TimerHeap struct {
timers []*Timer
timers []ITimer
}
func (h *_TimerHeap) Len() int {
@@ -39,7 +39,7 @@ func (h *_TimerHeap) Len() int {
}
func (h *_TimerHeap) Less(i, j int) bool {
return h.timers[i].fireTime.Before(h.timers[j].fireTime)
return h.timers[i].GetFireTime().Before(h.timers[j].GetFireTime())
}
func (h *_TimerHeap) Swap(i, j int) {
@@ -47,7 +47,7 @@ func (h *_TimerHeap) Swap(i, j int) {
}
func (h *_TimerHeap) Push(x interface{}) {
h.timers = append(h.timers, x.(*Timer))
h.timers = append(h.timers, x.(ITimer))
}
func (h *_TimerHeap) Pop() (ret interface{}) {
@@ -63,7 +63,7 @@ var (
)
func StartTimer(minTimerInterval time.Duration,maxTimerNum int){
timerHeap.timers = make([]*Timer,0,maxTimerNum)
timerHeap.timers = make([]ITimer,0,maxTimerNum)
heap.Init(&timerHeap) // 初始化定时器heap
go tickRoutine(minTimerInterval)
@@ -86,21 +86,17 @@ func tick() bool{
timerHeapLock.Unlock()
return false
}
nextFireTime := timerHeap.timers[0].fireTime
nextFireTime := timerHeap.timers[0].GetFireTime()
if nextFireTime.After(now) { // 没有到时间的定时器,返回
timerHeapLock.Unlock()
return false
}
t := heap.Pop(&timerHeap).(*Timer)
t := heap.Pop(&timerHeap).(ITimer)
timerHeapLock.Unlock()
if len(t.C)>= cap(t.C) {
log.SError("Timer channel full!")
t.Open(false)
t.AppendChannel(t)
return true
}
t.rOpen = false
t.C <- t
return true
}

View File

@@ -9,14 +9,36 @@ import (
"time"
)
// ITimer
type ITimer interface {
GetId() uint64
Cancel()
GetName()string
IsActive() bool
IsOpen() bool
Open(bOpen bool)
AppendChannel(timer ITimer)
Do()
GetFireTime() time.Time
SetupTimer(now time.Time) error
}
type OnCloseTimer func(timer ITimer)
type OnAddTimer func(timer ITimer)
// Timer
type Timer struct {
name string
Id uint64
cancelled bool //是否关闭
C chan *Timer //定时器管道
C chan ITimer //定时器管道
interval time.Duration // 时间间隔(用于循环定时器)
fireTime time.Time // 触发时间
cb func()
cb func(interface{})
cbEx func(t *Timer)
cbCronEx func(t *Cron)
cbTickerEx func(t *Ticker)
cbOnCloseTimer OnCloseTimer
cronExpr *CronExpr
AdditionData interface{} //定时器附加数据
rOpen bool //是否重新打开
@@ -45,17 +67,12 @@ var tickerPool =sync.NewPoolEx(make(chan sync.IPoolData,1000),func() sync.IPoolD
return &Ticker{}
})
func newTimer(d time.Duration,c chan *Timer,cb func(),name string,additionData interface{}) *Timer{
if c == nil {
return nil
}
func newTimer(d time.Duration,c chan ITimer,cb func(interface{}),additionData interface{}) *Timer{
timer := timerPool.Get().(*Timer)
timer.AdditionData = additionData
timer.C = c
timer.fireTime = Now().Add(d)
timer.cb = cb
timer.name = name
timer.interval = d
timer.rOpen = false
return timer
@@ -85,20 +102,59 @@ func releaseCron(cron *Cron) {
// one dispatcher per goroutine (goroutine not safe)
type Dispatcher struct {
ChanTimer chan *Timer
ChanTimer chan ITimer
}
func (t *Timer) GetId() uint64{
return t.Id
}
func (t *Timer) GetFireTime() time.Time{
return t.fireTime
}
func (t *Timer) Open(bOpen bool){
t.rOpen = bOpen
}
func (t *Timer) AppendChannel(timer ITimer){
t.C <- timer
}
func (t *Timer) IsOpen() bool{
return t.rOpen
}
func (t *Timer) Do(){
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[",errString,"]\n",string(buf[:l]))
}
}()
if t.IsActive() == false {
if t.cbOnCloseTimer!=nil {
t.cbOnCloseTimer(t)
}
releaseTimer(t)
return
}
if t.cb != nil {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[",errString,"]\n",string(buf[:l]))
}
}()
t.cb()
t.cb(t.AdditionData)
}else if t.cbEx != nil {
t.cbEx(t)
}
if t.rOpen ==false {
if t.cbOnCloseTimer!=nil {
t.cbOnCloseTimer(t)
}
releaseTimer(t)
}
}
@@ -124,17 +180,18 @@ func (t *Timer) IsActive() bool {
}
func (t *Timer) GetName() string{
return t.name
if t.cb!=nil {
return runtime.FuncForPC(reflect.ValueOf(t.cb).Pointer()).Name()
}else if t.cbEx!=nil {
return runtime.FuncForPC(reflect.ValueOf(t.cbEx).Pointer()).Name()
}
return ""
}
var emptyTimer Timer
func (t *Timer) Reset(){
t.name = ""
t.cancelled = false
t.C = nil
t.interval = 0
t.cb = nil
t.AdditionData = nil
t.rOpen = false
*t = emptyTimer
}
func (t *Timer) IsRef()bool{
@@ -153,6 +210,50 @@ func (c *Cron) Reset(){
c.Timer.Reset()
}
func (c *Cron) Do() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[",errString,"]\n",string(buf[:l]))
}
}()
if c.IsActive() == false{
if c.cbOnCloseTimer != nil {
c.cbOnCloseTimer(c)
}
releaseCron(c)
return
}
now := Now()
nextTime := c.cronExpr.Next(now)
if nextTime.IsZero() {
c.cbCronEx(c)
return
}
if c.cb!=nil {
c.cb(c.AdditionData)
}else if c.cbEx !=nil {
c.cbCronEx(c)
}
if c.IsActive() == true{
c.interval = nextTime.Sub(now)
c.fireTime = now.Add(c.interval)
SetupTimer(c)
}else{
if c.cbOnCloseTimer!=nil {
c.cbOnCloseTimer(c)
}
releaseCron(c)
return
}
}
func (c *Cron) IsRef()bool{
return c.ref
}
@@ -165,6 +266,42 @@ func (c *Cron) UnRef(){
c.ref = false
}
func (c *Ticker) Do() {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[", errString, "]\n", string(buf[:l]))
}
}()
if c.IsActive() == false {
if c.cbOnCloseTimer!=nil {
c.cbOnCloseTimer(c)
}
releaseTicker(c)
return
}
if c.cb!=nil{
c.cb(c.AdditionData)
} else if c.cbTickerEx != nil{
c.cbTickerEx(c)
}
if c.IsActive() == true{
c.fireTime = Now().Add(c.interval)
SetupTimer(c)
}else{
if c.cbOnCloseTimer!=nil {
c.cbOnCloseTimer(c)
}
releaseTicker(c)
}
}
func (c *Ticker) Reset(){
c.Timer.Reset()
}
@@ -183,99 +320,55 @@ func (c *Ticker) UnRef(){
func NewDispatcher(l int) *Dispatcher {
dispatcher := new(Dispatcher)
dispatcher.ChanTimer = make(chan *Timer, l)
dispatcher.ChanTimer = make(chan ITimer, l)
return dispatcher
}
type OnTimerClose func(timer *Timer)
func (dispatcher *Dispatcher) AfterFunc(d time.Duration, cb func(*Timer),onTimerClose OnTimerClose,onAddTimer func(timer *Timer)) *Timer {
funName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name()
timer := newTimer(d,dispatcher.ChanTimer,nil,funName,nil)
cbFunc := func() {
if timer.IsActive() == false {
onTimerClose(timer)
releaseTimer(timer)
return
}
func (dispatcher *Dispatcher) AfterFunc(d time.Duration, cb func(data interface{}),cbEx func(*Timer),onTimerClose OnCloseTimer,onAddTimer OnAddTimer) *Timer {
timer := newTimer(d,dispatcher.ChanTimer,nil,nil)
timer.cb = cb
timer.cbEx = cbEx
timer.cbOnCloseTimer = onTimerClose
cb(timer)
if timer.rOpen ==false {
onTimerClose(timer)
releaseTimer(timer)
}
t := SetupTimer(timer)
if onAddTimer!= nil && t!=nil {
onAddTimer(t)
}
timer.cb = cbFunc
t := SetupTimer(timer)
onAddTimer(t)
return t
return timer
}
func (dispatcher *Dispatcher) CronFunc(cronExpr *CronExpr, cb func(*Cron),onTimerClose OnTimerClose,onAddTimer func(timer *Timer)) *Cron {
func (dispatcher *Dispatcher) CronFunc(cronExpr *CronExpr,cb func(data interface{}), cbEx func(*Cron),onTimerClose OnCloseTimer,onAddTimer OnAddTimer) *Cron {
now := Now()
nextTime := cronExpr.Next(now)
if nextTime.IsZero() {
return nil
}
funcName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name()
cron := newCron()
// callback
var cbFunc func()
cbFunc = func() {
if cron.IsActive() == false{
onTimerClose(&cron.Timer)
releaseCron(cron)
return
}
now := Now()
nextTime := cronExpr.Next(now)
if nextTime.IsZero() {
cb(cron)
return
}
cron.interval = nextTime.Sub(now)
cron.fireTime = now.Add(cron.interval)
SetupTimer(&cron.Timer)
cb(cron)
}
cron.cb = cb
cron.cbCronEx = cbEx
cron.cbOnCloseTimer = onTimerClose
cron.cronExpr = cronExpr
cron.C = dispatcher.ChanTimer
cron.cb = cbFunc
cron.name = funcName
cron.interval = nextTime.Sub(now)
cron.fireTime = Now().Add(cron.interval)
SetupTimer(&cron.Timer)
onAddTimer(&cron.Timer)
SetupTimer(cron)
onAddTimer(cron)
return cron
}
func (dispatcher *Dispatcher) TickerFunc(d time.Duration, cb func(*Ticker),onTimerClose OnTimerClose,onAddTimer func(timer *Timer)) *Ticker {
funcName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name()
func (dispatcher *Dispatcher) TickerFunc(d time.Duration,cb func(data interface{}), cbEx func(*Ticker),onTimerClose OnCloseTimer,onAddTimer OnAddTimer) *Ticker {
ticker := newTicker()
cbFunc := func() {
cb(ticker)
if ticker.Timer.IsActive() == true{
ticker.fireTime = Now().Add(d)
SetupTimer(&ticker.Timer)
}else{
onTimerClose(&ticker.Timer)
releaseTicker(ticker)
}
}
ticker.C = dispatcher.ChanTimer
ticker.fireTime = Now().Add(d)
ticker.cb = cbFunc
ticker.name = funcName
ticker.cb = cb
ticker.cbTickerEx = cbEx
ticker.interval = d
// callback
SetupTimer(&ticker.Timer)
onAddTimer(&ticker.Timer)
SetupTimer(ticker)
onAddTimer(ticker)
return ticker
}