fix: use sync.Cond to handle no-task blocking wait #299
```diff
@@ -2,7 +2,7 @@ package taskqueue
 import (
 	"context"
-	"sync/atomic"
+	"sync"
 	"time"

 	"github.com/ipfs/go-peertaskqueue"
```
```diff
@@ -33,7 +33,7 @@ type WorkerTaskQueue struct {
 	cancelFn      func()
 	peerTaskQueue *peertaskqueue.PeerTaskQueue
 	workSignal    chan struct{}
-	noTaskSignal  chan struct{}
+	noTaskCond    *sync.Cond
 	ticker        *time.Ticker
 	activeTasks   int32
 }
```
```diff
@@ -46,7 +46,7 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue {
 		cancelFn:      cancelFn,
 		peerTaskQueue: peertaskqueue.New(),
 		workSignal:    make(chan struct{}, 1),
-		noTaskSignal:  make(chan struct{}, 1),
+		noTaskCond:    sync.NewCond(&sync.Mutex{}),
 		ticker:        time.NewTicker(thawSpeed),
 	}
 }
```
```diff
@@ -93,13 +93,11 @@ func (tq *WorkerTaskQueue) Shutdown() {
 }

 func (tq *WorkerTaskQueue) WaitForNoActiveTasks() {
-	for atomic.LoadInt32(&tq.activeTasks) > 0 {
-		select {
-		case <-tq.ctx.Done():
-			return
-		case <-tq.noTaskSignal:
-		}
+	tq.noTaskCond.L.Lock()
+	for tq.activeTasks > 0 {
+		tq.noTaskCond.Wait()
 	}
+	tq.noTaskCond.L.Unlock()
 }

 func (tq *WorkerTaskQueue) worker(executor Executor) {
```
```diff
@@ -118,14 +116,16 @@ func (tq *WorkerTaskQueue) worker(executor Executor) {
 			}
 		}
 		for _, task := range tasks {
-			atomic.AddInt32(&tq.activeTasks, 1)
+			tq.noTaskCond.L.Lock()
+			tq.activeTasks = tq.activeTasks + 1
+			tq.noTaskCond.L.Unlock()
 			terminate := executor.ExecuteTask(tq.ctx, pid, task)
-			if atomic.AddInt32(&tq.activeTasks, -1) == 0 {
-				select {
-				case tq.noTaskSignal <- struct{}{}:
-				default:
-				}
+			tq.noTaskCond.L.Lock()
+			tq.activeTasks = tq.activeTasks - 1
+			if tq.activeTasks == 0 {
+				tq.noTaskCond.Broadcast()
 			}
+			tq.noTaskCond.L.Unlock()
```
sync.Cond.Broadcast doesn't require holding the lock to be called, so I wonder if you could make this loop lock-free by also continuing to use sync/atomic for activeTasks. I think that's ideally what we want, because otherwise we're adding some amount of lock contention on each worker. The locks are only held for tiny amounts of time here, but they still add their overhead, whereas atomics are comparatively very cheap. In other words, I think we can just use sync.Cond for its broadcast feature, and continue using atomics for the counter. If our condition was more complex and we couldn't use atomics, then we'd need the shared lock for sure, but I don't think we absolutely need it here.

Careful: if a …

I guess this depends on when the Wait call happens. I assume we call Wait before the workers all go idle. If we may call Wait at a later time, then indeed we need something more. Perhaps Wait should first atomically check if we're already idle, and if so, return immediately. Otherwise, block until the next broadcast.

Wait, the current code already does this - it only Waits inside the loop after it checks that we're not already idle. I think you're saying that replacing the lock with atomics on the broadcast side could add a race, like the following sequence of events:

1. The waiter atomically loads activeTasks and sees that it is still greater than zero.
2. A worker atomically decrements activeTasks to zero and calls Broadcast, but nobody is waiting yet.
3. The waiter then calls Wait and blocks, even though the broadcast it needed has already fired.

Whereas with Rod's current code:

1. The waiter acquires the lock, sees activeTasks > 0, and calls Wait, which atomically releases the lock while it sleeps.
2. A worker can only decrement the counter and Broadcast while holding that same lock, so the broadcast cannot fall into the gap between the waiter's check and its Wait.

So I think you're right: we don't need to hold the lock to broadcast, like the docs say, but not doing so inserts a form of logic race.

Yup! This is what I was trying to convey, sorry for being terse!
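To make the race concrete, here is a minimal sketch of the lock-free variant discussed above, with an atomic counter and Broadcast called without holding the lock. The package, type and method names (sketch, queue, taskDone, waitForNoActiveTasks) are stand-ins for illustration, not the PR's actual WorkerTaskQueue; the comments mark where the lost-wakeup window opens.

```go
package sketch

import (
	"sync"
	"sync/atomic"
)

// queue is a stand-in for the PR's WorkerTaskQueue, trimmed down to the
// two fields this discussion is about.
type queue struct {
	activeTasks int32
	noTaskCond  *sync.Cond
}

func newQueue() *queue {
	return &queue{noTaskCond: sync.NewCond(&sync.Mutex{})}
}

func (q *queue) taskStarted() {
	atomic.AddInt32(&q.activeTasks, 1)
}

func (q *queue) taskDone() {
	if atomic.AddInt32(&q.activeTasks, -1) == 0 {
		// Broadcasting without the lock is legal per the sync.Cond docs,
		// but it can fire in the gap between a waiter's atomic load below
		// and that waiter's call to Wait.
		q.noTaskCond.Broadcast()
	}
}

func (q *queue) waitForNoActiveTasks() {
	for atomic.LoadInt32(&q.activeTasks) > 0 {
		q.noTaskCond.L.Lock()
		// Lost-wakeup window: if the last task finished and Broadcast ran
		// right after the atomic load above, this Wait sleeps until some
		// later task completes, possibly forever.
		q.noTaskCond.Wait()
		q.noTaskCond.L.Unlock()
	}
}
```

In the PR's version both the decrement and the Broadcast happen while noTaskCond.L is held, and the waiter re-checks the counter under that same lock, so the broadcast cannot land inside that window.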
||
if terminate { | ||
return | ||
} | ||
|
I know this looks unnecessary, but my assumption here is that there are no guarantees as to who gets the next mutex lock if there are multiple parties waiting on it, both via Wait() and Lock(). So if a second task started and got the lock before our wait caller got the mutex notify, then we'd now be in a tq.activeTasks > 0 condition even after waking up here. At least that's how it would work in C++ and Java. Or do we have stronger guarantees in Go about who gets the lock next after a Broadcast()?

I think the loop is right - the sync.Cond.Wait docs use a loop as an example too.
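For reference, this is the wait-in-a-loop idiom from the sync.Cond.Wait documentation, sketched with a hypothetical package-level counter; count, cond and taskDone are illustrative stand-ins for activeTasks, noTaskCond and the worker's decrement, not the PR's actual code.

```go
package sketch

import "sync"

var (
	mu    sync.Mutex
	cond  = sync.NewCond(&mu)
	count int // stand-in for activeTasks; only touched while mu is held
)

// waitForZero follows the loop shown in the sync.Cond.Wait docs: Wait
// atomically unlocks mu, sleeps, and re-locks mu before returning, so the
// condition has to be re-checked every time the goroutine wakes up.
func waitForZero() {
	cond.L.Lock()
	for count > 0 { // another task may have started while we slept
		cond.Wait()
	}
	cond.L.Unlock()
}

// taskDone decrements the counter and wakes all waiters when it reaches
// zero; because it holds the same lock, the wake-up cannot slip between a
// waiter's check and its Wait.
func taskDone() {
	cond.L.Lock()
	count--
	if count == 0 {
		cond.Broadcast()
	}
	cond.L.Unlock()
}
```

Whatever order the runtime hands the mutex out in after a Broadcast, the loop keeps the behaviour correct: a waiter that wakes up after a new task has already incremented the counter simply goes back to sleep.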