diff --git a/README.md b/README.md index 2ed26b4..1a889e9 100644 --- a/README.md +++ b/README.md @@ -767,34 +767,57 @@ func (slf *TestService7) GoTest(){ //slf.OpenConcurrent(5, 10, 1000000) ``` -普通调用可以使用以下方法: +使用示例如下: ``` -func (slf *TestService1) testAsyncDo() { + +func (slf *TestService13) testAsyncDo() { var context struct { data int64 } - slf.AsyncDo(func() { + + //1.示例普通使用 + //参数一的函数在其他协程池中执行完成,将执行完成事件放入服务工作协程, + //参数二的函数在服务协程中执行,是协程安全的。 + slf.AsyncDo(func() bool { //该函数回调在协程池中执行 context.data = 100 + return true }, func(err error) { //函数将在服务协程中执行 fmt.Print(context.data) //显示100 }) -} -``` -以下方法将函数扔到任务管道中,由协程池去抢执行。但某些任务是由先后顺序的,可以使用以下方法: -``` -func (slf *TestService1) testAsyncDoByQueue() { - queueId := int64(1) + //2.示例按队列顺序 + //参数一传入队列Id,同一个队列Id将在协程池中被排队执行 //以下进行两次调用,因为两次都传入参数queueId都为1,所以它们会都进入queueId为1的排队执行 + queueId := int64(1) for i := 0; i < 2; i++ { - slf.AsyncDoByQueue(queueId, func() { + slf.AsyncDoByQueue(queueId, func() bool { //该函数会被2次调用,但是会排队执行 + return true }, func(err error) { //函数将在服务协程中执行 }) } + + //3.函数参数可以某中一个为空 + //参数二函数将被延迟执行 + slf.AsyncDo(nil, func(err error) { + //将在下 + }) + + //参数一函数在协程池中执行,但没有在服务协程中回调 + slf.AsyncDo(func() bool { + return true + }, nil) + + //4.函数返回值控制不进行回调 + slf.AsyncDo(func() bool { + //返回false时,参数二函数将不会被执行; 为true时,则会被执行 + return false + }, func(err error) { + //该函数将不会被执行 + }) } ``` diff --git a/concurrent/concurrent.go b/concurrent/concurrent.go index 419f714..08c5826 100644 --- a/concurrent/concurrent.go +++ b/concurrent/concurrent.go @@ -12,8 +12,8 @@ const defaultMaxTaskChannelNum = 1000000 type IConcurrent interface { OpenConcurrentByNumCPU(cpuMul float32) OpenConcurrent(minGoroutineNum int32, maxGoroutineNum int32, maxTaskChannelNum int) - AsyncDoByQueue(queueId int64, fn func(), cb func(err error)) - AsyncDo(f func(), cb func(err error)) + AsyncDoByQueue(queueId int64, fn func() bool, cb func(err error)) + AsyncDo(f func() bool, cb func(err error)) } type Concurrent struct { @@ -40,11 +40,11 @@ func (c *Concurrent) OpenConcurrent(minGoroutineNum int32, maxGoroutineNum int32 c.dispatch.open(minGoroutineNum, maxGoroutineNum, c.tasks, c.cbChannel) } -func (c *Concurrent) AsyncDo(f func(), cb func(err error)) { +func (c *Concurrent) AsyncDo(f func() bool, cb func(err error)) { c.AsyncDoByQueue(0, f, cb) } -func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func(), cb func(err error)) { +func (c *Concurrent) AsyncDoByQueue(queueId int64, fn func() bool, cb func(err error)) { if cap(c.tasks) == 0 { panic("not open concurrent") } diff --git a/concurrent/worker.go b/concurrent/worker.go index 682943d..80f0400 100644 --- a/concurrent/worker.go +++ b/concurrent/worker.go @@ -12,7 +12,7 @@ import ( type task struct { queueId int64 - fn func() + fn func() bool cb func(err error) } @@ -60,17 +60,18 @@ func (w *worker) exec(t *task) { cb(errors.New(errString)) } - w.endCallFun(t) + w.endCallFun(true,t) log.SError("core dump info[", errString, "]\n", string(buf[:l])) } }() - t.fn() - w.endCallFun(t) + w.endCallFun(t.fn(),t) } -func (w *worker) endCallFun(t *task) { - w.pushAsyncDoCallbackEvent(t.cb) +func (w *worker) endCallFun(isDocallBack bool,t *task) { + if isDocallBack { + w.pushAsyncDoCallbackEvent(t.cb) + } if t.queueId != 0 { w.pushQueueTaskFinishEvent(t.queueId)