Skip to content

Commit

Permalink
Windows PCQueue support without Boost (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
kpu authored Apr 22, 2021
1 parent c00c263 commit 1184875
Showing 1 changed file with 151 additions and 105 deletions.
256 changes: 151 additions & 105 deletions src/translator/pcqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
#include <mutex>

#ifdef __APPLE__
#include <mach/mach.h>
#include <mach/mach_traps.h>
#include <mach/semaphore.h>
#include <mach/task.h>
#include <mach/mach_traps.h>
#include <mach/mach.h>
#elif defined(__linux)
#include <semaphore.h>
#elif defined(_WIN32) || defined(_WIN64)
#include <windows.h>
#else
#include <boost/interprocess/sync/interprocess_semaphore.hpp>
#endif
Expand All @@ -35,67 +37,107 @@ namespace bergamot {
#ifdef __APPLE__

class Semaphore {
public:
explicit Semaphore(int value) : task_(mach_task_self()) {
ABORT_IF(KERN_SUCCESS !=
semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value),
"Could not create semaphore");
}
public:
explicit Semaphore(int value) : task_(mach_task_self()) {
ABORT_IF(KERN_SUCCESS != semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value), "Could not create semaphore");
}

~Semaphore() {
if (KERN_SUCCESS != semaphore_destroy(task_, back_)) {
std::cerr << "Could not destroy semaphore" << std::endl;
abort();
~Semaphore() {
if (KERN_SUCCESS != semaphore_destroy(task_, back_)) {
std::cerr << "Could not destroy semaphore" << std::endl;
abort();
}
}
}

void wait() {
ABORT_IF(KERN_SUCCESS != semaphore_wait(back_),
"Wait for semaphore failed");
}
void wait() {
ABORT_IF(KERN_SUCCESS != semaphore_wait(back_), "Wait for semaphore failed");
}

void post() {
ABORT_IF(KERN_SUCCESS != semaphore_signal(back_),
"Could not post to semaphore");
}
void post() {
ABORT_IF(KERN_SUCCESS != semaphore_signal(back_), "Could not post to semaphore");
}

private:
semaphore_t back_;
task_t task_;
private:
semaphore_t back_;
task_t task_;
};

inline void WaitSemaphore(Semaphore &semaphore) { semaphore.wait(); }
inline void WaitSemaphore(Semaphore &semaphore) {
semaphore.wait();
}

#elif defined(__linux)

class Semaphore {
public:
explicit Semaphore(unsigned int value) {
ABORT_IF(sem_init(&sem_, 0, value), "Could not create semaphore");
}
public:
explicit Semaphore(unsigned int value) {
ABORT_IF(sem_init(&sem_, 0, value), "Could not create semaphore");
}

~Semaphore() {
if (-1 == sem_destroy(&sem_)) {
std::cerr << "Could not destroy semaphore " << std::endl;
abort();
~Semaphore() {
if (-1 == sem_destroy(&sem_)) {
std::cerr << "Could not destroy semaphore" << std::endl;
abort();
}
}
}

void wait() {
while (UTIL_UNLIKELY(-1 == sem_wait(&sem_))) {
ABORT_IF(errno != EINTR, "Wait for semaphore failed");
void wait() {
while (-1 == sem_wait(&sem_)) {
ABORT_IF(errno != EINTR, "Wait for semaphore failed");
}
}
}

void post() {
ABORT_IF(-1 == sem_post(&sem_), "Could not post to semaphore");
}
void post() {
ABORT_IF(-1 == sem_post(&sem_), "Could not post to semaphore");
}

private:
sem_t sem_;
private:
sem_t sem_;
};

inline void WaitSemaphore(Semaphore &semaphore) { semaphore.wait(); }
inline void WaitSemaphore(Semaphore &semaphore) {
semaphore.wait();
}

#elif defined(_WIN32) || defined(_WIN64)

class Semaphore {
public:
explicit Semaphore(LONG value) : sem_(CreateSemaphoreA(NULL, value, 2147483647, NULL)) {
ABORT_IF(!sem_, "Could not CreateSemaphore {}", GetLastError());
}

~Semaphore() {
CloseHandle(sem_);
}


void wait() {
while (true) {
switch (WaitForSingleObject(sem_, 0L)) {
case WAIT_OBJECT_0:
return;
case WAIT_ABANDONED:
ABORT("A semaphore can't be abandoned, confused by Windows");
case WAIT_TIMEOUT:
continue;
case WAIT_FAILED:
ABORT("Waiting on Semaphore failed {}", GetLastError());
}
}
}

void post() {
ABORT_IF(!ReleaseSemaphore(sem_, 1, NULL), "Failed to release Semaphore {}", GetLastError());
}

private:
HANDLE sem_;
};

inline void WaitSemaphore(Semaphore &semaphore) {
semaphore.wait();
}

#else
typedef boost::interprocess::interprocess_semaphore Semaphore;
Expand All @@ -113,7 +155,7 @@ inline void WaitSemaphore(Semaphore &on) {
}
}

#endif // Apple
#endif // Cases for semaphore support

/**
* Producer consumer queue safe for multiple producers and multiple consumers.
Expand All @@ -124,11 +166,13 @@ inline void WaitSemaphore(Semaphore &on) {
* throw.
*/
template <class T> class PCQueue {
public:
public:
explicit PCQueue(size_t size)
: empty_(size), used_(0), storage_(new T[size]),
end_(storage_.get() + size), produce_at_(storage_.get()),
consume_at_(storage_.get()) {}
: empty_(size), used_(0),
storage_(new T[size]),
end_(storage_.get() + size),
produce_at_(storage_.get()),
consume_at_(storage_.get()) {}

// Add a value to the queue.
void Produce(const T &val) {
Expand All @@ -141,8 +185,7 @@ template <class T> class PCQueue {
empty_.post();
throw;
}
if (++produce_at_ == end_)
produce_at_ = storage_.get();
if (++produce_at_ == end_) produce_at_ = storage_.get();
}
used_.post();
}
Expand All @@ -158,14 +201,14 @@ template <class T> class PCQueue {
empty_.post();
throw;
}
if (++produce_at_ == end_)
produce_at_ = storage_.get();
if (++produce_at_ == end_) produce_at_ = storage_.get();
}
used_.post();
}


// Consume a value, assigning it to out.
T &Consume(T &out) {
T& Consume(T &out) {
WaitSemaphore(used_);
{
std::lock_guard<std::mutex> consume_lock(consume_at_mutex_);
Expand All @@ -175,15 +218,14 @@ template <class T> class PCQueue {
used_.post();
throw;
}
if (++consume_at_ == end_)
consume_at_ = storage_.get();
if (++consume_at_ == end_) consume_at_ = storage_.get();
}
empty_.post();
return out;
}

// Consume a value, swapping it to out.
T &ConsumeSwap(T &out) {
T& ConsumeSwap(T &out) {
WaitSemaphore(used_);
{
std::lock_guard<std::mutex> consume_lock(consume_at_mutex_);
Expand All @@ -193,13 +235,13 @@ template <class T> class PCQueue {
used_.post();
throw;
}
if (++consume_at_ == end_)
consume_at_ = storage_.get();
if (++consume_at_ == end_) consume_at_ = storage_.get();
}
empty_.post();
return out;
}


// Convenience version of Consume that copies the value to return.
// The other version is faster.
T Consume() {
Expand All @@ -208,7 +250,7 @@ template <class T> class PCQueue {
return ret;
}

private:
private:
// Number of empty spaces in storage_.
Semaphore empty_;
// Number of occupied spaces in storage_.
Expand All @@ -234,63 +276,67 @@ template <class T> struct UnboundedPage {
};

template <class T> class UnboundedSingleQueue {
public:
UnboundedSingleQueue() : valid_(0) {
SetFilling(new UnboundedPage<T>());
SetReading(filling_);
}
public:
UnboundedSingleQueue() : valid_(0) {
SetFilling(new UnboundedPage<T>());
SetReading(filling_);
}

void Produce(T &&val) {
if (filling_current_ == filling_end_) {
UnboundedPage<T> *next = new UnboundedPage<T>();
filling_->next = next;
SetFilling(next);
void Produce(T &&val) {
if (filling_current_ == filling_end_) {
UnboundedPage<T> *next = new UnboundedPage<T>();
filling_->next = next;
SetFilling(next);
}
*(filling_current_++) = std::move(val);
valid_.post();
}
*(filling_current_++) = std::move(val);
valid_.post();
}

void Produce(const T &val) { Produce(T(val)); }
void Produce(const T &val) {
Produce(T(val));
}

T &Consume(T &out) {
WaitSemaphore(valid_);
if (reading_current_ == reading_end_) {
SetReading(reading_->next);
T& Consume(T &out) {
WaitSemaphore(valid_);
if (reading_current_ == reading_end_) {
SetReading(reading_->next);
}
out = std::move(*(reading_current_++));
return out;
}
out = std::move(*(reading_current_++));
return out;
}

// Warning: very much a no-guarantees race-condition-rich implementation!
// But sufficient for our specific purpose: The single thread that consumes
// is also the only one that checks Empty, and knows that it's racing.
bool Empty() const { return reading_current_ == filling_current_; }
// Warning: very much a no-guarantees race-condition-rich implementation!
// But sufficient for our specific purpose: The single thread that consumes
// is also the only one that checks Empty, and knows that it's racing.
bool Empty() const {
return reading_current_ == filling_current_;
}

private:
void SetFilling(UnboundedPage<T> *to) {
filling_ = to;
filling_current_ = to->entries;
filling_end_ = filling_current_ + sizeof(to->entries) / sizeof(T);
}
void SetReading(UnboundedPage<T> *to) {
reading_.reset(to);
reading_current_ = to->entries;
reading_end_ = reading_current_ + sizeof(to->entries) / sizeof(T);
}
private:
void SetFilling(UnboundedPage<T> *to) {
filling_ = to;
filling_current_ = to->entries;
filling_end_ = filling_current_ + sizeof(to->entries) / sizeof(T);
}
void SetReading(UnboundedPage<T> *to) {
reading_.reset(to);
reading_current_ = to->entries;
reading_end_ = reading_current_ + sizeof(to->entries) / sizeof(T);
}

Semaphore valid_;
Semaphore valid_;

UnboundedPage<T> *filling_;
UnboundedPage<T> *filling_;

std::unique_ptr<UnboundedPage<T>> reading_;
std::unique_ptr<UnboundedPage<T> > reading_;

T *filling_current_;
T *filling_end_;
T *reading_current_;
T *reading_end_;
T *filling_current_;
T *filling_end_;
T *reading_current_;
T *reading_end_;

UnboundedSingleQueue(const UnboundedSingleQueue &) = delete;
UnboundedSingleQueue &operator=(const UnboundedSingleQueue &) = delete;
UnboundedSingleQueue(const UnboundedSingleQueue &) = delete;
UnboundedSingleQueue &operator=(const UnboundedSingleQueue &) = delete;
};

} // namespace bergamot
Expand Down

0 comments on commit 1184875

Please sign in to comment.