Skip to content

Commit

Permalink
Added notifications to WaitableAtomic<>.
Browse files Browse the repository at this point in the history
  • Loading branch information
dkorolev committed Apr 12, 2024
1 parent b34fad8 commit 8229075
Showing 1 changed file with 47 additions and 1 deletion.
48 changes: 47 additions & 1 deletion bricks/sync/waitable_atomic.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ SOFTWARE.

#include <condition_variable>
#include <functional>
#include <map>
#include <mutex>

#ifdef CURRENT_FOR_CPP14
Expand Down Expand Up @@ -100,6 +101,13 @@ class IntrusiveClient final {
Interface* intrusive_object_;
};

struct WaitableAtomicSubscriberRemover {
virtual ~WaitableAtomicSubscriberRemover() = default;
virtual void Remove() = 0;
};

using WaitableAtomicSubscriberScope = std::unique_ptr<WaitableAtomicSubscriberRemover>;

template <typename DATA>
class WaitableAtomic {
public:
Expand Down Expand Up @@ -174,7 +182,17 @@ class WaitableAtomic {

MutableAccessor MutableScopedAccessor() { return MutableAccessor(this); }

void Notify() { data_condition_variable_.notify_all(); }
void Notify() {
data_condition_variable_.notify_all();
{
// Only lock the subscribers, no need to lock the data.
// Friendly reminder that the subscribers are expected to return quickly.
std::lock_guard lock(subscribers_mutex_);
for (const auto& [_, f] : subscribers_) {
f();
}
}
}

void UseAsLock(std::function<void()> f) const {
std::unique_lock<std::mutex> lock(data_t::data_mutex_);
Expand Down Expand Up @@ -316,10 +334,38 @@ class WaitableAtomic {
}
}

struct WaitableAtomicSubscriberRemoverImpl final : WaitableAtomicSubscriberRemover {
WaitableAtomic& self_;
const size_t id_;
WaitableAtomicSubscriberRemoverImpl(WaitableAtomic& self, size_t id) : self_(self), id_(id) {}
void Remove() override {
// Okay to only lock the subscribers map, but not the data.
std::lock_guard lock(self_.subscribers_mutex_);
self_.subscribers_.erase(id_);
}
};

[[nodiscard]]
WaitableAtomicSubscriberScope Subscribe(std::function<void()> f) {
// Need to lock both the data and the subscribers map to ensure exactly-once delivery of updates.
// The order is this way because subscribers are assumed to be locked for a shorter period of time.
// The assumption is that the clients will not perform slow operations and/or lock anything while notified,
// but at most schedule some tasks to be executed in their respective threads, thus releasing this lock quickly.
std::lock_guard lock_data(data_mutex_);
std::lock_guard lock_subscribers(subscribers_mutex_);
const size_t id = subscriber_next_id_;
++subscriber_next_id_;
subscribers_[id] = f;
return std::make_unique<WaitableAtomicSubscriberRemoverImpl>(*this, id);
}

protected:
data_t data_;
std::mutex subscribers_mutex_; // Declare the innermost mutex first.
mutable std::mutex data_mutex_;
mutable std::condition_variable data_condition_variable_;
std::map<size_t, std::function<void()>> subscribers_;
size_t subscriber_next_id_ = 0u;

private:
WaitableAtomic(const WaitableAtomic&) = delete;
Expand Down

0 comments on commit 8229075

Please sign in to comment.