From 9c26c742fe1f467cb64b62f059431aaa5dd5f43a Mon Sep 17 00:00:00 2001 From: boyce Date: Fri, 14 Jun 2024 15:42:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=8D=8F=E7=A8=8B=E6=B1=A0?= =?UTF-8?q?=E9=80=80=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- concurrent/dispatch.go | 16 ++++++++++++---- rpc/rpchandler.go | 5 ----- service/service.go | 18 ++++++++---------- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/concurrent/dispatch.go b/concurrent/dispatch.go index becc634..787713b 100644 --- a/concurrent/dispatch.go +++ b/concurrent/dispatch.go @@ -10,6 +10,7 @@ import ( "github.com/duanhf2012/origin/v2/log" "github.com/duanhf2012/origin/v2/util/queue" + "context" ) var idleTimeout = int64(2 * time.Second) @@ -30,6 +31,9 @@ type dispatch struct { waitWorker sync.WaitGroup waitDispatch sync.WaitGroup + + cancelContext context.Context + cancel context.CancelFunc } func (d *dispatch) open(minGoroutineNum int32, maxGoroutineNum int32, tasks chan task, cbChannel chan func(error)) { @@ -40,7 +44,7 @@ func (d *dispatch) open(minGoroutineNum int32, maxGoroutineNum int32, tasks chan d.workerQueue = make(chan task) d.cbChannel = cbChannel d.queueIdChannel = make(chan int64, cap(tasks)) - + d.cancelContext,d.cancel = context.WithCancel(context.Background()) d.waitDispatch.Add(1) go d.run() } @@ -64,10 +68,12 @@ func (d *dispatch) run() { d.processqueueEvent(queueId) case <-timeout.C: d.processTimer() - if atomic.LoadInt32(&d.minConcurrentNum) == -1 && len(d.tasks) == 0 { - atomic.StoreInt64(&idleTimeout,int64(time.Millisecond * 10)) - } + case <- d.cancelContext.Done(): + atomic.StoreInt64(&idleTimeout,int64(time.Millisecond * 5)) timeout.Reset(time.Duration(atomic.LoadInt64(&idleTimeout))) + for i:=int32(0);i