From 59c9d20071c5f42b3a5cdcc9bd62b246421c27f2 Mon Sep 17 00:00:00 2001 From: boyce Date: Sun, 1 Nov 2020 00:21:07 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9ETicker=E5=AE=9A=E6=99=82?= =?UTF-8?q?=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- service/module.go | 52 ++++---- service/service.go | 11 +- util/timer/timer.go | 245 ++++++++++++++++++++++++------------ util/timewheel/timewheel.go | 3 + 4 files changed, 197 insertions(+), 114 deletions(-) diff --git a/service/module.go b/service/module.go index 29865ff..4c7bebe 100644 --- a/service/module.go +++ b/service/module.go @@ -5,8 +5,8 @@ import ( "github.com/duanhf2012/origin/event" "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/util/timer" + "github.com/duanhf2012/origin/util/timewheel" "reflect" - "runtime" "time" ) const InitModuleId = 1e17 @@ -37,8 +37,7 @@ type Module struct { parent IModule //父亲 self IModule //自己 child map[int64]IModule //孩子们 - mapActiveTimer map[*timer.Timer]interface{} - mapActiveCron map[*timer.Cron]interface{} + mapActiveTimer map[*timewheel.Timer]interface{} dispatcher *timer.Dispatcher //timer //根结点 @@ -120,11 +119,7 @@ func (m *Module) ReleaseModule(moduleId int64){ pModule.self.OnRelease() log.Debug("Release module %s.", m.GetModuleName()) for pTimer,_ := range pModule.mapActiveTimer { - pTimer.Close() - } - - for pCron,_ := range pModule.mapActiveCron { - pCron.Close() + pTimer.AdditionData.(timer.ITime).Close() } delete(m.child,moduleId) @@ -135,7 +130,6 @@ func (m *Module) ReleaseModule(moduleId int64){ pModule.parent = nil pModule.child = nil pModule.mapActiveTimer = nil - pModule.mapActiveCron = nil pModule.dispatcher = nil pModule.ancestor = nil pModule.descendants = nil @@ -152,7 +146,7 @@ func (m *Module) GetAncestor()IModule{ func (m *Module) GetModule(moduleId int64) IModule{ iModule,ok := m.GetAncestor().getBaseModule().(*Module).descendants[moduleId] - if ok == false{ + if ok == false { return nil } return iModule @@ -166,32 +160,38 @@ func (m *Module) GetParent()IModule{ return m.parent } +func (m *Module) OnCloseTimer(timer *timewheel.Timer){ + delete(m.mapActiveTimer,timer) +} + +func (m *Module) OnAddTimer(t *timewheel.Timer){ + if t != nil { + m.mapActiveTimer[t] = nil + } +} + func (m *Module) AfterFunc(d time.Duration, cb func()) *timer.Timer { if m.mapActiveTimer == nil { - m.mapActiveTimer =map[*timer.Timer]interface{}{} + m.mapActiveTimer =map[*timewheel.Timer]interface{}{} } - funName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name() - tm := m.dispatcher.AfterFuncEx(funName,d,func(t *timer.Timer){ - cb() - delete(m.mapActiveTimer,t) - }) - - m.mapActiveTimer[tm] = nil - return tm + return m.dispatcher.AfterFunc(d,cb,m.OnCloseTimer,m.OnAddTimer) } func (m *Module) CronFunc(cronExpr *timer.CronExpr, cb func()) *timer.Cron { - if m.mapActiveCron == nil { - m.mapActiveCron =map[*timer.Cron]interface{}{} + if m.mapActiveTimer == nil { + m.mapActiveTimer =map[*timewheel.Timer]interface{}{} } - cron := m.dispatcher.CronFuncEx(cronExpr, func(cron *timer.Cron) { - cb() - }) + return m.dispatcher.CronFunc(cronExpr,cb,m.OnCloseTimer,m.OnAddTimer) +} - m.mapActiveCron[cron] = nil - return cron +func (m *Module) NewTicker(d time.Duration, cb func()) *timer.Ticker { + if m.mapActiveTimer == nil { + m.mapActiveTimer =map[*timewheel.Timer]interface{}{} + } + + return m.dispatcher.TickerFunc(d,cb,m.OnCloseTimer,m.OnAddTimer) } func (m *Module) OnRelease(){ diff --git a/service/service.go b/service/service.go index 85f100f..f0bb93d 100644 --- a/service/service.go +++ b/service/service.go @@ -109,7 +109,7 @@ func (s *Service) Run() { bStop = true case rpcRequest :=<- rpcRequestChan: if s.profiler!=nil { - analyzer = s.profiler.Push("Req_"+rpcRequest.RpcRequestData.GetServiceMethod()) + analyzer = s.profiler.Push("[Req]"+rpcRequest.RpcRequestData.GetServiceMethod()) } s.GetRpcHandler().HandlerRpcRequest(rpcRequest) @@ -119,7 +119,7 @@ func (s *Service) Run() { } case rpcResponseCB := <-rpcResponseCallBack: if s.profiler!=nil { - analyzer = s.profiler.Push("Res_" + rpcResponseCB.ServiceMethod) + analyzer = s.profiler.Push("[Res]" + rpcResponseCB.ServiceMethod) } s.GetRpcHandler().HandlerRpcResponseCB(rpcResponseCB) if analyzer!=nil { @@ -128,7 +128,7 @@ func (s *Service) Run() { } case ev := <- eventChan: if s.profiler!=nil { - analyzer = s.profiler.Push(fmt.Sprintf("Event_%d", int(ev.Type))) + analyzer = s.profiler.Push(fmt.Sprintf("[Event]%d", int(ev.Type))) } s.eventProcessor.EventHandler(ev) if analyzer!=nil { @@ -137,10 +137,11 @@ func (s *Service) Run() { } case t := <- s.dispatcher.ChanTimer: if t.IsClose() == false { + time := t.AdditionData.(timer.ITime) if s.profiler != nil { - analyzer = s.profiler.Push(fmt.Sprintf("Timer_%s", t.AdditionData.(*timer.Timer).GetFunctionName())) + analyzer = s.profiler.Push("[timer]"+time.GetName()) } - t.AdditionData.(*timer.Timer).Cb() + time.Do() if analyzer != nil { analyzer.Pop() analyzer = nil diff --git a/util/timer/timer.go b/util/timer/timer.go index 40c2ae3..20f3f1d 100644 --- a/util/timer/timer.go +++ b/util/timer/timer.go @@ -1,14 +1,25 @@ package timer import ( - "fmt" - "github.com/duanhf2012/origin/log" "github.com/duanhf2012/origin/util/timewheel" "reflect" "runtime" + "sync" "time" ) +var timerPool = sync.Pool{New: func() interface{}{ + return &Timer{} +}} + +var cronPool = sync.Pool{New: func() interface{}{ + return &Cron{} +}} + +var tickerPool = sync.Pool{New: func() interface{}{ + return &Ticker{} +}} + // one dispatcher per goroutine (goroutine not safe) type Dispatcher struct { ChanTimer chan *timewheel.Timer @@ -20,119 +31,187 @@ func NewDispatcher(l int) *Dispatcher { return disp } +type ITime interface { + Close () + Do() + GetName() string +} + // Timer type Timer struct { t *timewheel.Timer cb func() - cbex func(*Timer) name string -} - -func (t *Timer) Close() { - t.t.Close() - t.cb = nil -} - -func (t *Timer) GetFunctionName() string { - return t.name -} - -func (t *Timer) Cb() { - defer func() { - if r := recover(); r != nil { - buf := make([]byte, 40960) - l := runtime.Stack(buf, false) - err := fmt.Errorf("%v: %s", r, buf[:l]) - log.Error("core dump info:%+v\n",err) - } - }() - - if t.cbex!=nil { - t.cbex(t) - }else if t.cb!= nil { - t.cb() - } - -} - -func (disp *Dispatcher) AfterFunc(d time.Duration, cb func()) *Timer { - t := new(Timer) - t.cb = cb - t.name = reflect.TypeOf(cb).Name() - t.t = timewheel.NewTimerEx(d,disp.ChanTimer,t) - - return t -} - -func (disp *Dispatcher) AfterFuncEx(funName string,d time.Duration, cbex func(timer *Timer)) *Timer { - t := new(Timer) - t.cbex = cbex - t.name = funName - t.t = timewheel.NewTimerEx(d,disp.ChanTimer,t) - - return t + onClose func(timer *timewheel.Timer) } // Cron type Cron struct { - t *Timer + Timer +} + +// Ticker +type Ticker struct { + Timer +} + +func NewTimer(t *timewheel.Timer,cb func(),name string,onClose func(timer *timewheel.Timer)) *Timer { + timer := timerPool.Get().(*Timer) + timer.t = t + timer.cb = cb + timer.onClose = onClose + timer.name = name + + return timer +} + +func ReleaseTimer(timer *Timer) { + timerPool.Put(timer) +} + +func (t *Timer) Close(){ + if t.t!=nil { + t.t.Close() + } + if t.onClose!=nil { + t.onClose(t.t) + } + ReleaseTimer(t) +} + +func (t *Timer) Do(){ + t.Close() + t.cb() +} + +func (t *Timer) GetName() string{ + return t.name +} + +func NewCron(t *timewheel.Timer,cb func(),name string,onClose func(timer *timewheel.Timer)) *Cron { + cron := cronPool.Get().(*Cron) + cron.t = t + cron.cb = cb + cron.onClose = onClose + cron.name = name + return cron +} + +func ReleaseCron(cron *Cron) { + cronPool.Put(cron) +} + +func (c *Cron) Do(){ + if c.onClose!=nil { + c.onClose(c.t) + } + + c.cb() } func (c *Cron) Close() { if c.t != nil { c.t.Close() } + + if c.onClose!=nil { + c.onClose(c.t) + } + + ReleaseCron(c) } -func (disp *Dispatcher) CronFunc(cronExpr *CronExpr, _cb func()) *Cron { - c := new(Cron) +func NewTicker(t *timewheel.Timer,cb func(),name string,onClose func(timer *timewheel.Timer)) *Ticker { + ticker := tickerPool.Get().(*Ticker) + ticker.t = t + ticker.cb = cb + ticker.onClose = onClose + ticker.name = name + return ticker +} + +func ReleaseTicker(ticker *Ticker) { + tickerPool.Put(ticker) +} + +func (tk *Ticker) Do(){ + //通知当前timer删除 + if tk.onClose!=nil { + tk.onClose(tk.t) + } + tk.cb() +} + +func (tk *Ticker) Close() { + if tk.t != nil { + tk.t.Close() + } + + if tk.onClose!=nil { + tk.onClose(tk.t) + } + + ReleaseTicker(tk) +} + +func (disp *Dispatcher) AfterFunc(d time.Duration, cb func(),onCloseTimer func(timer *timewheel.Timer),onAddTimer func(timer *timewheel.Timer)) *Timer { + funName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name() + t := NewTimer(nil,cb,funName,onCloseTimer) + t.t = timewheel.NewTimerEx(d,disp.ChanTimer,t) + onAddTimer(t.t) + + return t +} + +func (disp *Dispatcher) CronFunc(cronExpr *CronExpr, cb func(),onCloseTimer func(timer *timewheel.Timer),onAddTimer func(timer *timewheel.Timer)) *Cron { now := time.Now() nextTime := cronExpr.Next(now) if nextTime.IsZero() { - return c + return nil } + funcName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name() + cron := NewCron(nil,nil,funcName,onCloseTimer) // callback - var cb func() - cb = func() { - defer _cb() - + var cbFunc func() + cbFunc = func() { now := time.Now() nextTime := cronExpr.Next(now) if nextTime.IsZero() { + cb() return } - c.t = disp.AfterFunc(nextTime.Sub(now), cb) - } - c.t = disp.AfterFunc(nextTime.Sub(now), cb) - return c + interval := nextTime.Sub(now) + minTimeInterval := time.Millisecond*time.Duration(timewheel.GRANULARITY) + if interval < minTimeInterval { + interval = minTimeInterval + } + + cron.t = timewheel.NewTimerEx(interval,disp.ChanTimer,cron) + onAddTimer(cron.t) + cb() + } + cron.cb = cbFunc + cron.t = timewheel.NewTimerEx(nextTime.Sub(now),disp.ChanTimer,cron) + onAddTimer(cron.t) + return cron } - -func (disp *Dispatcher) CronFuncEx(cronExpr *CronExpr, _cb func(*Cron)) *Cron { - c := new(Cron) - - now := time.Now() - nextTime := cronExpr.Next(now) - if nextTime.IsZero() { - return c - } - +func (disp *Dispatcher) TickerFunc(d time.Duration, cb func(),onCloseTimer func(timer *timewheel.Timer),onAddTimer func(timer *timewheel.Timer)) *Ticker { + funcName := runtime.FuncForPC(reflect.ValueOf(cb).Pointer()).Name() + ticker := NewTicker(nil,nil,funcName,onCloseTimer) // callback - var cb func() - cb = func() { - defer _cb(c) - - now := time.Now() - nextTime := cronExpr.Next(now) - if nextTime.IsZero() { - return - } - c.t = disp.AfterFunc(nextTime.Sub(now), cb) + var cbFunc func() + cbFunc = func() { + ticker.t = timewheel.NewTimerEx(d,disp.ChanTimer,ticker) + onAddTimer(ticker.t) + cb() } - c.t = disp.AfterFunc(nextTime.Sub(now), cb) - return c -} \ No newline at end of file + ticker.cb = cbFunc + ticker.t = timewheel.NewTimerEx(d,disp.ChanTimer,ticker) + onAddTimer(ticker.t) + return ticker +} diff --git a/util/timewheel/timewheel.go b/util/timewheel/timewheel.go index 969a834..e0370a1 100644 --- a/util/timewheel/timewheel.go +++ b/util/timewheel/timewheel.go @@ -307,6 +307,9 @@ func (t *timeWheel) addTimer(timer *Timer) *Timer { //删除定时器 func (t *timeWheel) delTimer(timer *Timer) { + if timer.next == nil { + return + } timer.prev.next = timer.next timer.next.prev = timer.prev ReleaseTimer(timer)