Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle spurious wake ups in BarrierSemaphore #82

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions src/parallel_solve.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ namespace Clasp { namespace mt {
// A combination of a barrier and a semaphore
class BarrierSemaphore {
public:
explicit BarrierSemaphore(int counter = 0, int maxParties = 1) : counter_(counter), active_(maxParties) {}
explicit BarrierSemaphore(int counter = 0, int maxParties = 1)
: counter_(counter), active_(maxParties), wait_{0} , notify_{0} {}
// Initializes this object
// PRE: no thread is blocked on the semaphore
// (i.e. internal counter is >= 0)
Expand Down Expand Up @@ -68,7 +69,12 @@ class BarrierSemaphore {
assert(active_ > 0);
int res = active_--;
if (reset) { unsafe_reset(0); }
else if (unsafe_active()) { counter_ = -active_; lock.unlock(); semCond_.notify_one(); }
else if (unsafe_active()) {
counter_ = -active_;
if (notify_ < wait_) { ++notify_; }
lock.unlock();
semCond_.notify_one();
}
return res;
}
// Waits until all parties have arrived, i.e. called wait.
Expand Down Expand Up @@ -114,6 +120,7 @@ class BarrierSemaphore {
void up(unique_lock<mutex>& m, bool transferLock = true) {
assert(m.owns_lock());
if (++counter_ < 1) {
if (notify_ < wait_) { ++notify_; }
if (transferLock) { m.unlock(); }
semCond_.notify_one();
}
Expand All @@ -127,21 +134,29 @@ class BarrierSemaphore {
void unsafe_reset(uint32 semCount) {
int prev = counter_;
counter_ = semCount;
if (prev < 0) { semCond_.notify_all(); }
if (prev < 0) {
notify_ = wait_;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the mutex be unlocked before notification?

semCond_.notify_all();
}
}
// Returns true for the leader, else false
bool unsafe_wait(unique_lock<mutex>& lock) {
assert(counter_ < 0);
// don't put the last thread to sleep!
if (!unsafe_active()) {
semCond_.wait(lock);
++wait_;
while (notify_ == 0) { semCond_.wait(lock); }
--notify_;
--wait_;
}
return unsafe_active();
}
cv semCond_; // waiting threads
mutex semMutex_; // mutex for updating counter
int counter_; // semaphore's counter
int active_; // number of active threads
int wait_; // number of currently waiting threads
int notify_; // number of unprocessed notifications
};
/////////////////////////////////////////////////////////////////////////////////////////
// ParallelSolve::Impl
Expand Down