新增异步函数执行功能

This commit is contained in:
duanhf2012
2023-02-22 09:53:50 +08:00
parent 0ebbe0e31d
commit 8111b12da5
11 changed files with 1674 additions and 24 deletions

195
concurrent/dispatch.go Normal file
View File

@@ -0,0 +1,195 @@
package concurrent
import (
"sync"
"sync/atomic"
"time"
"fmt"
"runtime"
"github.com/duanhf2012/origin/log"
"github.com/duanhf2012/origin/util/queue"
)
var idleTimeout = 2 * time.Second
type dispatch struct {
minConcurrentNum int32
maxConcurrentNum int32
queueIdChannel chan int64
workerQueue chan task
tasks chan task
idle bool
workerNum int32
cbChannel chan func(error)
mapTaskQueueSession map[int64]*queue.Deque[task]
waitWorker sync.WaitGroup
waitDispatch sync.WaitGroup
}
func (d *dispatch) open(minGoroutineNum int32, maxGoroutineNum int32, tasks chan task, cbChannel chan func(error)) {
d.minConcurrentNum = minGoroutineNum
d.maxConcurrentNum = maxGoroutineNum
d.tasks = tasks
d.mapTaskQueueSession = make(map[int64]*queue.Deque[task], 1024)
d.workerQueue = make(chan task)
d.cbChannel = cbChannel
d.queueIdChannel = make(chan int64, cap(tasks))
d.waitDispatch.Add(1)
go d.run()
}
func (d *dispatch) run() {
defer d.waitDispatch.Done()
timeout := time.NewTimer(idleTimeout)
for {
select {
case queueId := <-d.queueIdChannel:
d.processqueueEvent(queueId)
default:
select {
case t, ok := <-d.tasks:
if ok == false {
return
}
d.processTask(&t)
case queueId := <-d.queueIdChannel:
d.processqueueEvent(queueId)
case <-timeout.C:
d.processTimer()
if atomic.LoadInt32(&d.minConcurrentNum) == -1 && len(d.tasks) == 0 {
idleTimeout = time.Millisecond * 10
}
timeout.Reset(idleTimeout)
}
}
if atomic.LoadInt32(&d.minConcurrentNum) == -1 && d.workerNum == 0 {
d.waitWorker.Wait()
d.cbChannel <- nil
return
}
}
}
func (d *dispatch) processTimer() {
if d.idle == true && d.workerNum > d.minConcurrentNum {
d.processIdle()
}
d.idle = true
}
func (d *dispatch) processqueueEvent(queueId int64) {
d.idle = false
queueSession := d.mapTaskQueueSession[queueId]
if queueSession == nil {
return
}
queueSession.PopFront()
if queueSession.Len() == 0 {
return
}
t := queueSession.Front()
d.executeTask(&t)
}
func (d *dispatch) executeTask(t *task) {
select {
case d.workerQueue <- *t:
return
default:
if d.workerNum < d.maxConcurrentNum {
var work worker
work.start(&d.waitWorker, t, d)
return
}
}
d.workerQueue <- *t
}
func (d *dispatch) processTask(t *task) {
d.idle = false
//处理有排队任务
if t.queueId != 0 {
queueSession := d.mapTaskQueueSession[t.queueId]
if queueSession == nil {
queueSession = &queue.Deque[task]{}
d.mapTaskQueueSession[t.queueId] = queueSession
}
//没有正在执行的任务,则直接执行
if queueSession.Len() == 0 {
d.executeTask(t)
}
queueSession.PushBack(*t)
return
}
//普通任务
d.executeTask(t)
}
func (d *dispatch) processIdle() {
select {
case d.workerQueue <- task{}:
d.workerNum--
default:
}
}
func (d *dispatch) pushQueueTaskFinishEvent(queueId int64) {
d.queueIdChannel <- queueId
}
func (c *dispatch) pushAsyncDoCallbackEvent(cb func(err error)) {
if cb == nil {
//不需要回调的情况
return
}
c.cbChannel <- cb
}
func (d *dispatch) close() {
atomic.StoreInt32(&d.minConcurrentNum, -1)
breakFor:
for {
select {
case cb := <-d.cbChannel:
if cb == nil {
break breakFor
}
cb(nil)
}
}
d.waitDispatch.Wait()
}
func (d *dispatch) DoCallback(cb func(err error)) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
l := runtime.Stack(buf, false)
errString := fmt.Sprint(r)
log.SError("core dump info[", errString, "]\n", string(buf[:l]))
}
}()
cb(nil)
}