Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add WorkerTaskQueue#WaitForNoActiveTasks() for tests #284

Merged
merged 3 commits into from
Nov 30, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions taskqueue/taskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package taskqueue

import (
"context"
"sync/atomic"
"time"

"github.com/ipfs/go-peertaskqueue"
Expand Down Expand Up @@ -32,7 +33,9 @@ type WorkerTaskQueue struct {
cancelFn func()
peerTaskQueue *peertaskqueue.PeerTaskQueue
workSignal chan struct{}
noTaskSignal chan struct{}
ticker *time.Ticker
activeTasks int32
}

// NewTaskQueue initializes a new queue
Expand All @@ -43,6 +46,7 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue {
cancelFn: cancelFn,
peerTaskQueue: peertaskqueue.New(),
workSignal: make(chan struct{}, 1),
noTaskSignal: make(chan struct{}, 1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My two cents, and this might be why the method isn't working as expected for you:

  • the channel only buffers one "no tasks" signal
  • worker only tries to signal via the channel when finishing a task results in activeTasks being 0
  • WaitForNoActiveTasks consumes that channel signal

So, for instance, if two WaitForNoActiveTasks calls come in and the worker performs one task, it will only send one signal to the channel, and only one of the two WaitForNoActiveTasks calls will grab that signal - the other call will potentially block forever.

You want a "broadcast signal", which unfortunately doesn't have a primitive in Go right now - channels are one-to-one. You can use close as a way to broadcast to infinite receivers, but that closes the channel forever, not allowing its reuse.

If you really only call WaitForNoActiveTasks once at a time, then your code isn't buggy, but I'd still argue it's prone to misuse :) So I would adapt its code to make it panic if called while it's already waiting.

If you do want to support concurrent Wait calls, then we'd need a different approach. Happy to bounce some ideas if that's the case.

ticker: time.NewTicker(thawSpeed),
}
}
Expand Down Expand Up @@ -88,6 +92,16 @@ func (tq *WorkerTaskQueue) Shutdown() {
tq.cancelFn()
}

func (tq *WorkerTaskQueue) WaitForNoActiveTasks() {
for atomic.LoadInt32(&tq.activeTasks) > 0 {
select {
case <-tq.ctx.Done():
return
case <-tq.noTaskSignal:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this case return too? if you got the signal that there were zero active tasks a moment ago, you should be able to return.

Perhaps what you're trying to do is double-check that with the activeTasks counter, but then WaitForNoActiveTasks is blocking for the counter to reach zero on two instances, not just one. I imagine just one is still fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you apply this change, then you can also simplify the code by using an if rather than a for on the atomic load.

}
}
}

func (tq *WorkerTaskQueue) worker(executor Executor) {
targetWork := 1
for {
Expand All @@ -104,7 +118,14 @@ func (tq *WorkerTaskQueue) worker(executor Executor) {
}
}
for _, task := range tasks {
atomic.AddInt32(&tq.activeTasks, 1)
terminate := executor.ExecuteTask(tq.ctx, pid, task)
if atomic.AddInt32(&tq.activeTasks, -1) == 0 {
select {
case tq.noTaskSignal <- struct{}{}:
default:
}
}
if terminate {
return
}
Expand Down