Issue #80: Handle spurious wakeups.
* BarrierSemaphore failed to handle spurious wakeups correctly. In
  particular, a spurious wakeup in BarrierSemaphore::down() could result
  in early termination of requestWork(), since one thread could
  decrease the semaphore's counter multiple times, thereby activating
  the barrier condition even though not all threads were blocked.

* Drop the unnecessary BarrierSemaphore in favor of two specific wait
  operations on a shared condition variable and a wait counter. On sync
  requests, waitSync() waits until the sync flag is no longer set. When
  a thread needs work, it calls waitWork(), which waits until the work
  queue becomes non-empty. In either case, the wait fails if the caller
  is the last non-waiting thread. Since every wait is now coupled to a
  specific condition, spurious wakeups are benign (a reduced sketch of
  the pattern follows below).
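
For illustration only, here is a minimal sketch of the predicate-coupled wait
pattern described above. It is not the actual clasp code: the class name
WorkPool and the int* payload are placeholders, and the terminate/sync flag
checks of the real waitWork() are omitted. Every wait re-checks its predicate
in a loop, and a thread refuses to block if it would be the last non-waiting
one, so a spurious wakeup merely re-evaluates the condition.

    #include <condition_variable>
    #include <deque>
    #include <mutex>

    class WorkPool {
    public:
        explicit WorkPool(unsigned threads) : threads_(threads), waiting_(0) {}

        // Block until work is available. Returns 0 if the caller would be
        // the last non-waiting thread, i.e. nobody is left to produce work.
        const int* waitWork() {
            std::unique_lock<std::mutex> lock(m_);
            for (;;) {
                if (!queue_.empty()) {
                    const int* w = queue_.front();
                    queue_.pop_front();
                    return w;
                }
                if (waiting_ + 1 >= threads_) { return 0; } // never block the last active thread
                ++waiting_;
                cond_.wait(lock); // a spurious wakeup simply re-runs the loop above
                --waiting_;
            }
        }

        void pushWork(const int* w) {
            { std::lock_guard<std::mutex> g(m_); queue_.push_back(w); }
            cond_.notify_one();
        }

    private:
        std::mutex              m_;
        std::condition_variable cond_;
        std::deque<const int*>  queue_;
        unsigned                threads_;
        unsigned                waiting_;
    };

The same loop structure applies to waitSync(), with the sync flag as its
predicate; because the predicate is always re-checked under the lock, a wakeup
without a corresponding state change cannot cause an early return.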
Benjamin Kaufmann committed Jul 24, 2022
1 parent b0b2e0c commit febd72e
Showing 1 changed file with 76 additions and 148 deletions.
224 changes: 76 additions & 148 deletions src/parallel_solve.cpp
@@ -1,5 +1,5 @@
//
// Copyright (c) 2010-2017 Benjamin Kaufmann
// Copyright (c) 2010-2022 Benjamin Kaufmann
//
// This file is part of Clasp. See http://www.cs.uni-potsdam.de/clasp/
//
@@ -31,123 +31,11 @@
#include <potassco/string_convert.h>
namespace Clasp { namespace mt {
/////////////////////////////////////////////////////////////////////////////////////////
// BarrierSemaphore
/////////////////////////////////////////////////////////////////////////////////////////
// A combination of a barrier and a semaphore
class BarrierSemaphore {
public:
explicit BarrierSemaphore(int counter = 0, int maxParties = 1) : counter_(counter), active_(maxParties) {}
// Initializes this object
// PRE: no thread is blocked on the semaphore
// (i.e. internal counter is >= 0)
// NOTE: not thread-safe
void unsafe_init(int counter = 0, int maxParties = 1) {
counter_ = counter;
active_ = maxParties;
}
// Returns the current semaphore counter.
int counter() { lock_guard<mutex> lock(semMutex_); return counter_; }
// Returns the number of parties required to trip this barrier.
int parties() { lock_guard<mutex> lock(semMutex_); return active_; }
// Returns true if all parties are waiting at the barrier
bool active() { lock_guard<mutex> lock(semMutex_); return unsafe_active(); }

// barrier interface

// Increases the barrier count, i.e. the number of
// parties required to trip this barrier.
void addParty() {
lock_guard<mutex> lock(semMutex_);
++active_;
}
// Decreases the barrier count and resets the barrier
// if reset is true.
// PRE: the thread does not itself wait on the barrier
int removeParty(bool reset) {
unique_lock<mutex> lock(semMutex_);
assert(active_ > 0);
int res = active_--;
if (reset) { unsafe_reset(0); }
else if (unsafe_active()) { counter_ = -active_; lock.unlock(); semCond_.notify_one(); }
return res;
}
// Waits until all parties have arrived, i.e. called wait.
// Exactly one of the parties will receive a return value of true,
// the others will receive a value of false.
// Applications shall use this value to designate one thread as the
// leader that will eventually reset the barrier thereby unblocking the other threads.
bool wait() {
unique_lock<mutex> lock(semMutex_);
if (--counter_ >= 0) { counter_ = -1; }
return unsafe_wait(lock);
}
// Resets the barrier and unblocks any waiting threads.
void reset(uint32 semCount = 0) {
lock_guard<mutex> lock(semMutex_);
unsafe_reset(semCount);
}
// semaphore interface

// Decrement the semaphore's counter.
// If the counter is zero or less prior to the call
// the calling thread is suspended.
// Returns false to signal that all but the calling thread
// are currently blocked.
bool down() {
unique_lock<mutex> lock(semMutex_);
return down(lock);
}
// LOW-LEVEL version of down
bool down(unique_lock<mutex>& m) {
assert(m.owns_lock());
if (--counter_ >= 0) { return true; }
return !unsafe_wait(m);
}
// Increments the semaphore's counter and resumes
// one thread which has called down() if the counter
// was less than zero prior to the call.
void up() {
unique_lock<mutex> lock(semMutex_);
up(lock);
}
// LOW-LEVEL version of up
void up(unique_lock<mutex>& m, bool transferLock = true) {
assert(m.owns_lock());
if (++counter_ < 1) {
if (transferLock) { m.unlock(); }
semCond_.notify_one();
}
}
mutex& toMutex() { return semMutex_; }
private:
BarrierSemaphore(const BarrierSemaphore&);
BarrierSemaphore& operator=(const BarrierSemaphore&);
typedef condition_variable cv;
bool unsafe_active() const { return -counter_ >= active_; }
void unsafe_reset(uint32 semCount) {
int prev = counter_;
counter_ = semCount;
if (prev < 0) { semCond_.notify_all(); }
}
// Returns true for the leader, else false
bool unsafe_wait(unique_lock<mutex>& lock) {
assert(counter_ < 0);
// don't put the last thread to sleep!
if (!unsafe_active()) {
semCond_.wait(lock);
}
return unsafe_active();
}
cv semCond_; // waiting threads
mutex semMutex_; // mutex for updating counter
int counter_; // semaphore's counter
int active_; // number of active threads
};
/////////////////////////////////////////////////////////////////////////////////////////
// ParallelSolve::Impl
/////////////////////////////////////////////////////////////////////////////////////////
struct ParallelSolve::SharedData {
typedef PodQueue<const LitVec*> Queue;
typedef condition_variable ConditionVar;
enum MsgFlag {
terminate_flag = 1u, sync_flag = 2u, split_flag = 4u,
restart_flag = 8u, complete_flag = 16u,
@@ -188,10 +76,11 @@ struct ParallelSolve::SharedData {
void reset(SharedContext* a_ctx) {
clearQueue();
syncT.reset();
workSem.unsafe_init(0, a_ctx ? a_ctx->concurrency() : 0);
msg.clear();
globalR.reset();
maxConflict = globalR.current();
threads = a_ctx ? a_ctx->concurrency() : 0;
waiting = 0;
errorSet = 0;
initVec = 0;
ctx = a_ctx;
@@ -222,23 +111,57 @@
if (!allowSplit()) { return 0; }
// try to get work from split
ctx->report(MessageEvent(s, "SPLIT", MessageEvent::sent));
const uint32 flags = uint32(terminate_flag) | uint32(sync_flag);
for (unique_lock<mutex> lock(workSem.toMutex());;) {
return waitWork();
}
void pushWork(const LitVec* v) {
unique_lock<mutex> lock(workM);
workQ.push(v);
notifyWaitingThreads(&lock, 1);
}
const LitVec* waitWork(bool postSplit = true) {
for (unique_lock<mutex> lock(workM); !hasControl(uint32(terminate_flag|sync_flag));) {
if (!workQ.empty()) {
const LitVec* res = workQ.pop_ret();
if (workQ.empty()) { workQ.clear(); }
return res;
}
postMessage(SharedData::msg_split, false);
if (!workSem.down(lock) || hasControl(flags)) {
return 0;
postSplit = postSplit && !postMessage(SharedData::msg_split, false);
if (!enterWait(lock))
break;
}
return 0;
}
void notifyWaitingThreads(unique_lock<mutex>* lock = 0, int n = 0) {
assert(!lock || lock->owns_lock());
if (lock)
lock->unlock();
else
unique_lock<mutex> preventLostWakeup(workM);
n == 1 ? workCond.notify_one() : workCond.notify_all();
}
bool enterWait(unique_lock<mutex>& lock) {
assert(lock.owns_lock());
if ((waiting + 1) >= threads)
return false;
++waiting;
workCond.wait(lock);
--waiting;
return true;
}
bool waitSync() {
for (unique_lock<mutex> lock(workM); synchronize();) {
if (!enterWait(lock)) {
assert(synchronize());
return true;
}
}
return false;
}
void pushWork(const LitVec* v) {
unique_lock<mutex> lock(workSem.toMutex());
workQ.push(v);
workSem.up(lock);
void leaveAlgorithm() {
assert(threads);
unique_lock<mutex> lock(workM);
--threads;
notifyWaitingThreads(&lock);
}
// MESSAGES
bool postMessage(Message m, bool notify);
@@ -268,14 +191,17 @@ struct ParallelSolve::SharedData {
GeneratorPtr generator; // optional data for model generation
Timer<RealTime> syncT; // thread sync time
mutex modelM; // model-mutex
BarrierSemaphore workSem; // work-semaphore
Queue workQ; // work-queue (must be protected by workSem)
mutex workM; // work-mutex
ConditionVar workCond; // work-condition
Queue workQ; // work-queue (must be protected by workM)
uint32 waiting; // number of worker threads waiting on workCond
uint32 nextId; // next solver id to use
LowerBound lower_; // last reported lower bound (if any)
LowerBound lower; // last reported lower bound (if any)
atomic<uint32> threads; // number of threads in the algorithm
atomic<int> workReq; // > 0: someone needs work
atomic<uint32> restartReq; // == numThreads(): restart
atomic<uint32> control; // set of active message flags
atomic<uint32> modCount; // coounter for synchronizing models
atomic<uint32> modCount; // counter for synchronizing models
uint32 errorCode; // global error code
};

@@ -287,8 +213,8 @@ bool ParallelSolve::SharedData::postMessage(Message m, bool notifyWaiting) {
}
else if (setControl(m)) {
// control message - notify all if requested
if (notifyWaiting) workSem.reset();
if ((uint32(m) & uint32(sync_flag|terminate_flag)) != 0) {
if (notifyWaiting) notifyWaitingThreads();
if ((uint32(m) & uint32(terminate_flag|sync_flag)) != 0) {
syncT.reset();
syncT.start();
}
@@ -327,7 +253,7 @@ ParallelSolve::~ParallelSolve() {
// algorithm was not started but there may be active threads -
// force orderly shutdown
ParallelSolve::doInterrupt();
shared_->workSem.removeParty(true);
shared_->notifyWaitingThreads();
joinThreads();
}
destroyThread(masterId);
@@ -340,8 +266,8 @@ bool ParallelSolve::beginSolve(SharedContext& ctx, const LitVec& path) {
shared_->reset(&ctx);
if (!enumerator().supportsParallel() && numThreads() > 1) {
ctx.warn("Selected reasoning mode implies #Threads=1.");
shared_->workSem.unsafe_init(1);
modeSplit_ = false;
shared_->threads = 1;
modeSplit_ = false;
ctx.setConcurrency(1, SharedContext::resize_reserve);
}
shared_->setControl(modeSplit_ ? SharedData::allow_split_flag : SharedData::forbid_restart_flag);
@@ -385,7 +311,7 @@ void ParallelSolve::setRestarts(uint32 maxR, const ScheduleStrategy& rs) {
shared_->maxConflict = shared_->globalR.current();
}

uint32 ParallelSolve::numThreads() const { return shared_->workSem.parties(); }
uint32 ParallelSolve::numThreads() const { return shared_->threads; }

void ParallelSolve::allocThread(uint32 id, Solver& s) {
if (!thread_) {
@@ -455,7 +381,7 @@ int ParallelSolve::doNext(int) {
int s = shared_->generator->state;
if (s != SharedData::Generator::done) {
shared_->generator->notify(SharedData::Generator::search);
if ((s = shared_->generator->waitWhile(SharedData::Generator::search)) == SharedData::Generator::model) {
if (shared_->generator->waitWhile(SharedData::Generator::search) == SharedData::Generator::model) {
return value_true;
}
}
@@ -505,6 +431,11 @@ void ParallelSolve::solveParallel(uint32 id) {
shared_->generator->waitWhile(SharedData::Generator::start);
}
try {
struct Scoped {
Scoped(SharedData* s) : shared(s) {}
~Scoped() { shared->leaveAlgorithm(); }
SharedData* shared;
} scoped(shared_);
// establish solver<->handler connection and attach to shared context
// should this fail because of an initial conflict, we'll terminate
// in requestWork.
@@ -525,12 +456,9 @@
}
catch (const std::bad_alloc&) { exception(id, a, OutOfMemory, "bad alloc"); }
catch (const std::logic_error& e) { exception(id, a, LogicError, e.what()); }
catch (const std::runtime_error& e) { exception(id, a, RuntimeError, e.what()); }
catch (const std::exception& e) { exception(id, a, RuntimeError, e.what()); }
catch (...) { exception(id, a, UnknownError, "unknown"); }
assert(shared_->terminate() || thread_[id]->error());
// this thread is leaving
int active = shared_->workSem.removeParty(shared_->terminate());
// update stats
s.stats.accu(agg);
if (id != masterId) {
@@ -540,23 +468,22 @@
thread_[id]->detach(*shared_->ctx, shared_->interrupt());
s.stats.addCpuTime(ThreadTime::getTime());
}
if (active == 1 && shared_->generator.get()) {
if (numThreads() == 0 && shared_->generator.get()) {
shared_->generator->notify(SharedData::Generator::done);
}
}

void ParallelSolve::exception(uint32 id, PathPtr& path, ErrorCode e, const char* what) {
try {
if (!thread_[id]->setError(e) || e != OutOfMemory || shared_->workSem.active()) {
if (!thread_[id]->setError(e) || e != OutOfMemory || numThreads() == 0) {
ParallelSolve::doInterrupt();
if (shared_->errorSet.fetch_or(bit_mask<uint64>(id)) == 0) {
shared_->errorCode = e;
shared_->msg.appendFormat("[%u]: %s", id, what);
}
}
else if (path.get() && shared_->allowSplit()) {
shared_->workQ.push(path.release());
shared_->workSem.up();
shared_->pushWork(path.release());
}
reportProgress(thread_[id]->solver(), e == OutOfMemory ? "Thread failed with out of memory" : "Thread failed with error");
}
@@ -594,12 +521,12 @@ bool ParallelSolve::requestWork(Solver& s, PathPtr& out) {
if (a == shared_->path) { out.release(); }
// propagate any new facts before starting new work
if (s.simplify()) { return true; }
// s now has a conflict - either an artifical stop conflict
// s now has a conflict - either an artificial stop conflict
// or a real conflict - we'll handle it in the next iteration
// via the call to popRootLevel()
popped = 0;
}
else if (!shared_->allowSplit() || !shared_->synchronize()) {
else if (!shared_->synchronize()) {
// no work left - quitting time?
terminate(s, true);
}
@@ -634,7 +561,7 @@ bool ParallelSolve::waitOnSync(Solver& s) {
}
bool hasPath = thread_[s.id()]->hasPath();
bool tentative= enumerator().tentative();
if (shared_->workSem.wait()) {
if (shared_->waitSync()) {
// last man standing - complete synchronization request
shared_->workReq = 0;
shared_->restartReq = 0;
@@ -659,8 +586,9 @@
shared_->clearControl(SharedData::msg_split | SharedData::msg_sync_restart | SharedData::restart_abandoned_flag | SharedData::cancel_restart_flag);
shared_->syncT.lap();
reportProgress(MessageEvent(s, "SYNC", MessageEvent::completed, shared_->syncT.elapsed()));
assert(!shared_->synchronize());
// wake up all blocked threads
shared_->workSem.reset();
shared_->notifyWaitingThreads();
}
return shared_->terminate() || (hasPath && !shared_->hasControl(SharedData::restart_abandoned_flag));
}
@@ -708,8 +636,8 @@ bool ParallelSolve::commitUnsat(Solver& s) {
++shared_->modCount;
if (s.lower.bound > 0) {
lock.lock();
if (s.lower.bound > shared_->lower_.bound || s.lower.level > shared_->lower_.level) {
shared_->lower_ = s.lower;
if (s.lower.bound > shared_->lower.bound || s.lower.level > shared_->lower.level) {
shared_->lower = s.lower;
reportUnsat(s);
++shared_->modCount;
}
