diff --git a/concurrent/dispatch.go b/concurrent/dispatch.go index becc634..787713b 100644 --- a/concurrent/dispatch.go +++ b/concurrent/dispatch.go @@ -10,6 +10,7 @@ import ( "github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/util/queue" + "context" ) var idleTimeout = int64(2 * time.Second) @@ -30,6 +31,9 @@ type dispatch struct { waitWorker sync.WaitGroup waitDispatch sync.WaitGroup + + cancelContext context.Context + cancel context.CancelFunc } func (d *dispatch) open(minGoroutineNum int32, maxGoroutineNum int32, tasks chan task, cbChannel chan func(error)) { @@ -40,7 +44,7 @@ func (d *dispatch) open(minGoroutineNum int32, maxGoroutineNum int32, tasks chan d.workerQueue = make(chan task) d.cbChannel = cbChannel d.queueIdChannel = make(chan int64, cap(tasks)) - + d.cancelContext,d.cancel = context.WithCancel(context.Background()) d.waitDispatch.Add(1) go d.run() } @@ -64,10 +68,12 @@ func (d *dispatch) run() { d.processqueueEvent(queueId) case <-timeout.C: d.processTimer() - if atomic.LoadInt32(&d.minConcurrentNum) == -1 && len(d.tasks) == 0 { - atomic.StoreInt64(&idleTimeout,int64(time.Millisecond * 10)) - } + case <- d.cancelContext.Done(): + atomic.StoreInt64(&idleTimeout,int64(time.Millisecond * 5)) timeout.Reset(time.Duration(atomic.LoadInt64(&idleTimeout))) + for i:=int32(0);i