Files
origin/concurrent/worker.go
2024-11-29 13:47:27 +08:00

77 lines
1.1 KiB
Go

package concurrent
import (
"sync"
"errors"
"fmt"
"github.com/duanhf2012/origin/v2/log"
)
type task struct {
queueId int64
fn func() bool
cb func(err error)
}
type worker struct {
*dispatch
}
func (t *task) isExistTask() bool {
return t.fn == nil
}
func (w *worker) start(waitGroup *sync.WaitGroup, t *task, d *dispatch) {
w.dispatch = d
d.workerNum += 1
waitGroup.Add(1)
go w.run(waitGroup, *t)
}
func (w *worker) run(waitGroup *sync.WaitGroup, t task) {
defer waitGroup.Done()
w.exec(&t)
for {
select {
case tw := <-w.workerQueue:
if tw.isExistTask() {
//exit goroutine
log.Info("worker goroutine exit")
return
}
w.exec(&tw)
}
}
}
func (w *worker) exec(t *task) {
defer func() {
if r := recover(); r != nil {
errString := fmt.Sprint(r)
cb := t.cb
t.cb = func(err error) {
cb(errors.New(errString))
}
log.StackError(errString)
w.endCallFun(true, t)
}
}()
w.endCallFun(t.fn(), t)
}
func (w *worker) endCallFun(isDoCallBack bool, t *task) {
if isDoCallBack {
w.pushAsyncDoCallbackEvent(t.cb)
}
if t.queueId != 0 {
w.pushQueueTaskFinishEvent(t.queueId)
}
}