mirror of
https://github.com/duanhf2012/origin.git
synced 2026-02-04 06:54:45 +08:00
优化协程池退出
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/duanhf2012/origin/v2/log"
|
||||
"github.com/duanhf2012/origin/v2/util/queue"
|
||||
"context"
|
||||
)
|
||||
|
||||
var idleTimeout = int64(2 * time.Second)
|
||||
@@ -30,6 +31,9 @@ type dispatch struct {
|
||||
|
||||
waitWorker sync.WaitGroup
|
||||
waitDispatch sync.WaitGroup
|
||||
|
||||
cancelContext context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (d *dispatch) open(minGoroutineNum int32, maxGoroutineNum int32, tasks chan task, cbChannel chan func(error)) {
|
||||
@@ -40,7 +44,7 @@ func (d *dispatch) open(minGoroutineNum int32, maxGoroutineNum int32, tasks chan
|
||||
d.workerQueue = make(chan task)
|
||||
d.cbChannel = cbChannel
|
||||
d.queueIdChannel = make(chan int64, cap(tasks))
|
||||
|
||||
d.cancelContext,d.cancel = context.WithCancel(context.Background())
|
||||
d.waitDispatch.Add(1)
|
||||
go d.run()
|
||||
}
|
||||
@@ -64,10 +68,12 @@ func (d *dispatch) run() {
|
||||
d.processqueueEvent(queueId)
|
||||
case <-timeout.C:
|
||||
d.processTimer()
|
||||
if atomic.LoadInt32(&d.minConcurrentNum) == -1 && len(d.tasks) == 0 {
|
||||
atomic.StoreInt64(&idleTimeout,int64(time.Millisecond * 10))
|
||||
}
|
||||
case <- d.cancelContext.Done():
|
||||
atomic.StoreInt64(&idleTimeout,int64(time.Millisecond * 5))
|
||||
timeout.Reset(time.Duration(atomic.LoadInt64(&idleTimeout)))
|
||||
for i:=int32(0);i<d.workerNum;i++{
|
||||
d.processIdle()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -166,6 +172,8 @@ func (c *dispatch) pushAsyncDoCallbackEvent(cb func(err error)) {
|
||||
|
||||
func (d *dispatch) close() {
|
||||
atomic.StoreInt32(&d.minConcurrentNum, -1)
|
||||
d.cancel()
|
||||
|
||||
|
||||
breakFor:
|
||||
for {
|
||||
|
||||
Reference in New Issue
Block a user