diff --git a/taskqueue/taskqueue.go b/taskqueue/taskqueue.go index 86ba5667..a1414735 100644 --- a/taskqueue/taskqueue.go +++ b/taskqueue/taskqueue.go @@ -2,7 +2,7 @@ package taskqueue import ( "context" - "sync/atomic" + "sync" "time" "github.com/ipfs/go-peertaskqueue" @@ -33,7 +33,7 @@ type WorkerTaskQueue struct { cancelFn func() peerTaskQueue *peertaskqueue.PeerTaskQueue workSignal chan struct{} - noTaskSignal chan struct{} + noTaskCond *sync.Cond ticker *time.Ticker activeTasks int32 } @@ -46,7 +46,7 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue { cancelFn: cancelFn, peerTaskQueue: peertaskqueue.New(), workSignal: make(chan struct{}, 1), - noTaskSignal: make(chan struct{}, 1), + noTaskCond: sync.NewCond(&sync.Mutex{}), ticker: time.NewTicker(thawSpeed), } } @@ -93,13 +93,11 @@ func (tq *WorkerTaskQueue) Shutdown() { } func (tq *WorkerTaskQueue) WaitForNoActiveTasks() { - for atomic.LoadInt32(&tq.activeTasks) > 0 { - select { - case <-tq.ctx.Done(): - return - case <-tq.noTaskSignal: - } + tq.noTaskCond.L.Lock() + for tq.activeTasks > 0 { + tq.noTaskCond.Wait() } + tq.noTaskCond.L.Unlock() } func (tq *WorkerTaskQueue) worker(executor Executor) { @@ -118,14 +116,16 @@ func (tq *WorkerTaskQueue) worker(executor Executor) { } } for _, task := range tasks { - atomic.AddInt32(&tq.activeTasks, 1) + tq.noTaskCond.L.Lock() + tq.activeTasks = tq.activeTasks + 1 + tq.noTaskCond.L.Unlock() terminate := executor.ExecuteTask(tq.ctx, pid, task) - if atomic.AddInt32(&tq.activeTasks, -1) == 0 { - select { - case tq.noTaskSignal <- struct{}{}: - default: - } + tq.noTaskCond.L.Lock() + tq.activeTasks = tq.activeTasks - 1 + if tq.activeTasks == 0 { + tq.noTaskCond.Broadcast() } + tq.noTaskCond.L.Unlock() if terminate { return }