From c8f2179107b485df2f08476e6bc7042a140a82e4 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Fri, 26 Nov 2021 16:06:46 +1100 Subject: [PATCH 1/2] feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests --- taskqueue/taskqueue.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/taskqueue/taskqueue.go b/taskqueue/taskqueue.go index 67810ca3..79ce181a 100644 --- a/taskqueue/taskqueue.go +++ b/taskqueue/taskqueue.go @@ -32,7 +32,9 @@ type WorkerTaskQueue struct { cancelFn func() peerTaskQueue *peertaskqueue.PeerTaskQueue workSignal chan struct{} + noTaskSignal chan struct{} ticker *time.Ticker + activeTasks int } // NewTaskQueue initializes a new queue @@ -43,6 +45,7 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue { cancelFn: cancelFn, peerTaskQueue: peertaskqueue.New(), workSignal: make(chan struct{}, 1), + noTaskSignal: make(chan struct{}, 1), ticker: time.NewTicker(thawSpeed), } } @@ -88,6 +91,16 @@ func (tq *WorkerTaskQueue) Shutdown() { tq.cancelFn() } +func (tq *WorkerTaskQueue) WaitForNoActiveTasks() { + for tq.activeTasks > 0 { + select { + case <-tq.ctx.Done(): + return + case <-tq.noTaskSignal: + } + } +} + func (tq *WorkerTaskQueue) worker(executor Executor) { targetWork := 1 for { @@ -104,7 +117,15 @@ func (tq *WorkerTaskQueue) worker(executor Executor) { } } for _, task := range tasks { + tq.activeTasks++ terminate := executor.ExecuteTask(tq.ctx, pid, task) + tq.activeTasks-- + if tq.activeTasks == 0 { + select { + case tq.noTaskSignal <- struct{}{}: + default: + } + } if terminate { return } From cfef593f66237e038c12c839c8665c468921d904 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Fri, 26 Nov 2021 16:49:56 +1100 Subject: [PATCH 2/2] fixup! feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests --- taskqueue/taskqueue.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/taskqueue/taskqueue.go b/taskqueue/taskqueue.go index 79ce181a..86ba5667 100644 --- a/taskqueue/taskqueue.go +++ b/taskqueue/taskqueue.go @@ -2,6 +2,7 @@ package taskqueue import ( "context" + "sync/atomic" "time" "github.com/ipfs/go-peertaskqueue" @@ -34,7 +35,7 @@ type WorkerTaskQueue struct { workSignal chan struct{} noTaskSignal chan struct{} ticker *time.Ticker - activeTasks int + activeTasks int32 } // NewTaskQueue initializes a new queue @@ -92,7 +93,7 @@ func (tq *WorkerTaskQueue) Shutdown() { } func (tq *WorkerTaskQueue) WaitForNoActiveTasks() { - for tq.activeTasks > 0 { + for atomic.LoadInt32(&tq.activeTasks) > 0 { select { case <-tq.ctx.Done(): return @@ -117,10 +118,9 @@ func (tq *WorkerTaskQueue) worker(executor Executor) { } } for _, task := range tasks { - tq.activeTasks++ + atomic.AddInt32(&tq.activeTasks, 1) terminate := executor.ExecuteTask(tq.ctx, pid, task) - tq.activeTasks-- - if tq.activeTasks == 0 { + if atomic.AddInt32(&tq.activeTasks, -1) == 0 { select { case tq.noTaskSignal <- struct{}{}: default: