From 575b4b237824d5ebcd781802cc23745e9933190a Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Wed, 18 Dec 2024 13:09:36 +0100 Subject: [PATCH 01/17] Store async_rw_mutex value in a separate shared state This allows avoiding synchronization required when passing the value from one shared state to another. --- .../pika/synchronization/async_rw_mutex.hpp | 49 +++++-------------- 1 file changed, 13 insertions(+), 36 deletions(-) diff --git a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp index f1abf6813..73fe0008a 100644 --- a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp +++ b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp @@ -41,8 +41,8 @@ namespace pika::execution::experimental { { using mutex_type = pika::concurrency::detail::spinlock; using shared_state_ptr_type = std::shared_ptr; - std::atomic value_set{false}; - std::optional value{std::nullopt}; + using value_ptr_type = std::shared_ptr; + value_ptr_type value{nullptr}; shared_state_ptr_type next_state{nullptr}; mutex_type mtx{}; pika::detail::small_vector< @@ -67,36 +67,19 @@ namespace pika::execution::experimental { if (PIKA_LIKELY(next_state)) { - // The current state has now finished all accesses to the - // wrapped value, so we move the value to the next state. - PIKA_ASSERT(value.has_value()); - // NOLINTNEXTLINE(bugprone-unchecked-optional-access) - next_state->set_value(std::move(*value)); - - if (!continuations.empty()) - { - auto const size = continuations.size(); - for (std::size_t i = 0; i < size - 1; ++i) { continuations[i](next_state); } - - // Move shared state into the last continuation to ensure that the - // continuations release the last reference and not this destructor. 
- continuations[size - 1](std::move(next_state)); - } + for (auto& continuation : continuations) { continuation(next_state); } } } - template - void set_value(U&& u) + void set_value(value_ptr_type v) { + PIKA_ASSERT(v); PIKA_ASSERT(!value); - value.emplace(std::forward(u)); - value_set.store(true, std::memory_order_release); + value = std::move(v); } T& get_value() { - pika::util::yield_while( - [this]() { return !value_set.load(std::memory_order_acquire); }); PIKA_ASSERT(value); // NOLINTNEXTLINE(bugprone-unchecked-optional-access) return *value; @@ -565,7 +548,7 @@ namespace pika::execution::experimental { template , async_rw_mutex>::value>> explicit async_rw_mutex(U&& u, allocator_type const& alloc = {}) - : value(std::forward(u)) + : value(std::allocate_shared(alloc, std::forward(u))) , alloc(alloc) { } @@ -588,19 +571,15 @@ namespace pika::execution::experimental { { auto shared_prev_state = std::move(state); state = std::allocate_shared(alloc); + state->set_value(value); prev_access = async_rw_mutex_access_type::read; - // Only the first access has no previous shared state. When - // there is a previous state we set the next state so that the - // value can be passed from the previous state to the next - // state. When there is no previous state we need to move the - // value to the first state. + // Only the first access has no previous shared state. if (PIKA_LIKELY(shared_prev_state)) { shared_prev_state->set_next_state(state); prev_state = shared_prev_state; } - else { state->set_value(std::move(value)); } } return {prev_state, state}; @@ -611,18 +590,15 @@ namespace pika::execution::experimental { { auto shared_prev_state = std::move(state); state = std::allocate_shared(alloc); + state->set_value(value); prev_access = async_rw_mutex_access_type::readwrite; - // Only the first access has no previous shared state. When there is - // a previous state we set the next state so that the value can be - // passed from the previous state to the next state. 
When there is - // no previous state we need to move the value to the first state. + // Only the first access has no previous shared state. if (PIKA_LIKELY(shared_prev_state)) { shared_prev_state->set_next_state(state); prev_state = shared_prev_state; } - else { state->set_value(std::move(value)); } return {prev_state, state}; } @@ -630,6 +606,7 @@ namespace pika::execution::experimental { private: using shared_state_type = detail::async_rw_mutex_shared_state; using shared_state_weak_ptr_type = std::weak_ptr; + using value_ptr_type = std::shared_ptr; // nvc++ is not able to see this typedef unless it's public #if defined(PIKA_NVHPC_VERSION) @@ -741,7 +718,7 @@ namespace pika::execution::experimental { } }; - PIKA_NO_UNIQUE_ADDRESS readwrite_type value; + value_ptr_type value; PIKA_NO_UNIQUE_ADDRESS allocator_type alloc; async_rw_mutex_access_type prev_access = async_rw_mutex_access_type::readwrite; From 05ad0405517ea05128a4ed2ec0defa5a2c175bbb Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Wed, 18 Dec 2024 13:21:55 +0100 Subject: [PATCH 02/17] Don't pass shared state from previous to next state in async_rw_mutex Use the shared state already stored in the operation state in continuations. 
--- .../pika/synchronization/async_rw_mutex.hpp | 48 +++++-------------- 1 file changed, 12 insertions(+), 36 deletions(-) diff --git a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp index 73fe0008a..6aa5f37d6 100644 --- a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp +++ b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp @@ -45,8 +45,7 @@ namespace pika::execution::experimental { value_ptr_type value{nullptr}; shared_state_ptr_type next_state{nullptr}; mutex_type mtx{}; - pika::detail::small_vector< - pika::util::detail::unique_function, 1> + pika::detail::small_vector, 1> continuations{}; async_rw_mutex_shared_state() = default; @@ -57,18 +56,7 @@ namespace pika::execution::experimental { ~async_rw_mutex_shared_state() { - // If there is no next state the continuations must be empty. - PIKA_ASSERT(next_state || continuations.empty()); - - // This state must always have the value set by the time it is - // destructed. If there is no next state the value is destructed - // with this state. - PIKA_ASSERT(value); - - if (PIKA_LIKELY(next_state)) - { - for (auto& continuation : continuations) { continuation(next_state); } - } + for (auto& continuation : continuations) { continuation(); } } void set_value(value_ptr_type v) @@ -108,8 +96,7 @@ namespace pika::execution::experimental { using shared_state_ptr_type = std::shared_ptr; shared_state_ptr_type next_state{nullptr}; mutex_type mtx{}; - pika::detail::small_vector< - pika::util::detail::unique_function, 1> + pika::detail::small_vector, 1> continuations{}; async_rw_mutex_shared_state() = default; @@ -120,18 +107,7 @@ namespace pika::execution::experimental { ~async_rw_mutex_shared_state() { - // If there is no next state the continuations must be empty. 
- PIKA_ASSERT(next_state || continuations.empty()); - - if (!continuations.empty()) - { - auto const size = continuations.size(); - for (std::size_t i = 0; i < size - 1; ++i) { continuations[i](next_state); } - - // Move shared state into the last continuation to ensure that the continuations - // release the last reference and not this destructor. - continuations[size - 1](std::move(next_state)); - } + for (auto& continuation : continuations) { continuation(); } } void set_next_state(std::shared_ptr state) @@ -463,14 +439,15 @@ namespace pika::execution::experimental { "async_rw_lock::sender::operation_state state is empty, was the sender " "already started?"); - auto continuation = [&os](shared_state_ptr_type state) mutable { + auto continuation = [&os]() mutable { try { pika::execution::experimental::set_value( - std::move(os.r), access_type{std::move(state)}); + std::move(os.r), access_type{std::move(os.state)}); } catch (...) { + os.state.reset(); pika::execution::experimental::set_error( std::move(os.r), std::current_exception()); } @@ -482,7 +459,6 @@ namespace pika::execution::experimental { // add a continuation to be triggered when the previous // state is released. p->add_continuation(std::move(continuation)); - os.state.reset(); os.prev_state.reset(); } else @@ -490,7 +466,7 @@ namespace pika::execution::experimental { // There is no previous state on the first access or the // previous state has already been released. We can run // the continuation immediately. - continuation(std::move(os.state)); + continuation(); } } }; @@ -665,14 +641,15 @@ namespace pika::execution::experimental { "async_rw_lock::sender::operation_state state is empty, was the sender " "already started?"); - auto continuation = [&os](shared_state_ptr_type state) mutable { + auto continuation = [&os]() mutable { try { pika::execution::experimental::set_value( - std::move(os.r), access_type{std::move(state)}); + std::move(os.r), access_type{std::move(os.state)}); } catch (...) 
{ + os.state.reset(); pika::execution::experimental::set_error( std::move(os.r), std::current_exception()); } @@ -684,7 +661,6 @@ namespace pika::execution::experimental { // add a continuation to be triggered when the previous // state is released. p->add_continuation(std::move(continuation)); - os.state.reset(); os.prev_state.reset(); } else @@ -692,7 +668,7 @@ namespace pika::execution::experimental { // There is no previous state on the first access or the // previous state has already been released. We can run // the continuation immediately. - continuation(std::move(os.state)); + continuation(); } } }; From 773c2ee8958cc6badaa0d3cd9a1cb4770e9168f3 Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Wed, 18 Dec 2024 13:38:48 +0100 Subject: [PATCH 03/17] Trigger continuations of async_rw_mutex shared states in the owning shared state Don't do it in the previous shared state, for simpler reasoning about ownership. --- .../pika/synchronization/async_rw_mutex.hpp | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp index 6aa5f37d6..63794b91d 100644 --- a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp +++ b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp @@ -55,6 +55,11 @@ namespace pika::execution::experimental { async_rw_mutex_shared_state& operator=(async_rw_mutex_shared_state const&) = delete; ~async_rw_mutex_shared_state() + { + if (next_state) { next_state->done(); } + } + + void done() { for (auto& continuation : continuations) { continuation(); } } @@ -106,6 +111,11 @@ namespace pika::execution::experimental { async_rw_mutex_shared_state& operator=(async_rw_mutex_shared_state const&) = delete; ~async_rw_mutex_shared_state() + { + if (next_state) { next_state->done(); } + } + + void done() { for (auto& continuation : 
continuations) { continuation(); } } @@ -458,7 +468,10 @@ namespace pika::execution::experimental { // If the previous state is set and it's still alive, // add a continuation to be triggered when the previous // state is released. - p->add_continuation(std::move(continuation)); + os.state->add_continuation(std::move(continuation)); + + // Holding on to the previous state acts as a lock to ensure the + // continuations aren't triggered while the continuation is added. os.prev_state.reset(); } else @@ -660,7 +673,10 @@ namespace pika::execution::experimental { // If the previous state is set and it's still alive, // add a continuation to be triggered when the previous // state is released. - p->add_continuation(std::move(continuation)); + os.state->add_continuation(std::move(continuation)); + + // Holding on to the previous state acts as a lock to ensure the + // continuations aren't triggered while the continuation is added. os.prev_state.reset(); } else From 2e2ab032ff5ec55374949083dd9254532da1d904 Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Wed, 18 Dec 2024 14:01:52 +0100 Subject: [PATCH 04/17] Replace type-erased unique_function with a pointer and virtual function in async_rw_mutex for continuations --- .../pika/synchronization/async_rw_mutex.hpp | 96 ++++++++++--------- 1 file changed, 49 insertions(+), 47 deletions(-) diff --git a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp index 63794b91d..9b04aa681 100644 --- a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp +++ b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp @@ -14,7 +14,6 @@ #include #include #include -#include #include #include @@ -36,6 +35,11 @@ namespace pika::execution::experimental { }; namespace detail { + struct async_rw_mutex_operation_state_base + { + virtual void continuation() = 0; + }; + template struct 
async_rw_mutex_shared_state { @@ -45,8 +49,7 @@ namespace pika::execution::experimental { value_ptr_type value{nullptr}; shared_state_ptr_type next_state{nullptr}; mutex_type mtx{}; - pika::detail::small_vector, 1> - continuations{}; + pika::detail::small_vector op_states{}; async_rw_mutex_shared_state() = default; async_rw_mutex_shared_state(async_rw_mutex_shared_state&&) = delete; @@ -61,7 +64,7 @@ namespace pika::execution::experimental { void done() { - for (auto& continuation : continuations) { continuation(); } + for (auto* op_state : op_states) { op_state->continuation(); } } void set_value(value_ptr_type v) @@ -86,11 +89,10 @@ namespace pika::execution::experimental { next_state = std::move(state); } - template - void add_continuation(F&& continuation) + void add_op_state(async_rw_mutex_operation_state_base* op_state) { std::lock_guard l(mtx); - continuations.emplace_back(std::forward(continuation)); + op_states.emplace_back(op_state); } }; @@ -101,8 +103,7 @@ namespace pika::execution::experimental { using shared_state_ptr_type = std::shared_ptr; shared_state_ptr_type next_state{nullptr}; mutex_type mtx{}; - pika::detail::small_vector, 1> - continuations{}; + pika::detail::small_vector op_states{}; async_rw_mutex_shared_state() = default; async_rw_mutex_shared_state(async_rw_mutex_shared_state&&) = delete; @@ -117,7 +118,7 @@ namespace pika::execution::experimental { void done() { - for (auto& continuation : continuations) { continuation(); } + for (auto* op_state : op_states) { op_state->continuation(); } } void set_next_state(std::shared_ptr state) @@ -128,11 +129,10 @@ namespace pika::execution::experimental { next_state = std::move(state); } - template - void add_continuation(F&& continuation) + void add_op_state(async_rw_mutex_operation_state_base* op_state) { std::lock_guard l(mtx); - continuations.emplace_back(std::forward(continuation)); + op_states.emplace_back(op_state); } }; } // namespace detail @@ -422,7 +422,7 @@ namespace 
pika::execution::experimental { pika::execution::experimental::set_error_t(std::exception_ptr)>; template - struct operation_state + struct operation_state : detail::async_rw_mutex_operation_state_base { std::decay_t r; shared_state_weak_ptr_type prev_state; @@ -442,6 +442,21 @@ namespace pika::execution::experimental { operation_state(operation_state const&) = delete; operation_state& operator=(operation_state const&) = delete; + void continuation() override + { + try + { + pika::execution::experimental::set_value( + std::move(r), access_type{std::move(state)}); + } + catch (...) + { + state.reset(); + pika::execution::experimental::set_error( + std::move(r), std::current_exception()); + } + } + friend void tag_invoke( pika::execution::experimental::start_t, operation_state& os) noexcept { @@ -449,26 +464,12 @@ namespace pika::execution::experimental { "async_rw_lock::sender::operation_state state is empty, was the sender " "already started?"); - auto continuation = [&os]() mutable { - try - { - pika::execution::experimental::set_value( - std::move(os.r), access_type{std::move(os.state)}); - } - catch (...) - { - os.state.reset(); - pika::execution::experimental::set_error( - std::move(os.r), std::current_exception()); - } - }; - if (auto p = os.prev_state.lock()) { // If the previous state is set and it's still alive, // add a continuation to be triggered when the previous // state is released. - os.state->add_continuation(std::move(continuation)); + os.state->add_op_state(&os); // Holding on to the previous state acts as a lock to ensure the // continuations aren't triggered while the continuation is added. @@ -479,7 +480,7 @@ namespace pika::execution::experimental { // There is no previous state on the first access or the // previous state has already been released. We can run // the continuation immediately. 
- continuation(); + os.continuation(); } } }; @@ -627,7 +628,7 @@ namespace pika::execution::experimental { pika::execution::experimental::set_error_t(std::exception_ptr)>; template - struct operation_state + struct operation_state : detail::async_rw_mutex_operation_state_base { std::decay_t r; shared_state_weak_ptr_type prev_state; @@ -647,6 +648,21 @@ namespace pika::execution::experimental { operation_state(operation_state const&) = delete; operation_state& operator=(operation_state const&) = delete; + void continuation() override + { + try + { + pika::execution::experimental::set_value( + std::move(r), access_type{std::move(state)}); + } + catch (...) + { + state.reset(); + pika::execution::experimental::set_error( + std::move(r), std::current_exception()); + } + } + friend void tag_invoke( pika::execution::experimental::start_t, operation_state& os) noexcept { @@ -654,26 +670,12 @@ namespace pika::execution::experimental { "async_rw_lock::sender::operation_state state is empty, was the sender " "already started?"); - auto continuation = [&os]() mutable { - try - { - pika::execution::experimental::set_value( - std::move(os.r), access_type{std::move(os.state)}); - } - catch (...) - { - os.state.reset(); - pika::execution::experimental::set_error( - std::move(os.r), std::current_exception()); - } - }; - if (auto p = os.prev_state.lock()) { // If the previous state is set and it's still alive, // add a continuation to be triggered when the previous // state is released. - os.state->add_continuation(std::move(continuation)); + os.state->add_op_state(&os); // Holding on to the previous state acts as a lock to ensure the // continuations aren't triggered while the continuation is added. @@ -684,7 +686,7 @@ namespace pika::execution::experimental { // There is no previous state on the first access or the // previous state has already been released. We can run // the continuation immediately. 
- continuation(); + os.continuation(); } } }; From 35a19c642e027f00bd46aeefb91f329a7d98b5c6 Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Wed, 18 Dec 2024 14:46:53 +0100 Subject: [PATCH 05/17] Store operation states of continuations in async_rw_mutex as an intrusive linked list --- .../pika/synchronization/async_rw_mutex.hpp | 91 ++++++++++++++++--- 1 file changed, 79 insertions(+), 12 deletions(-) diff --git a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp index 9b04aa681..c606ba6b8 100644 --- a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp +++ b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp @@ -37,6 +37,7 @@ namespace pika::execution::experimental { namespace detail { struct async_rw_mutex_operation_state_base { + async_rw_mutex_operation_state_base* next{nullptr}; virtual void continuation() = 0; }; @@ -46,10 +47,10 @@ namespace pika::execution::experimental { using mutex_type = pika::concurrency::detail::spinlock; using shared_state_ptr_type = std::shared_ptr; using value_ptr_type = std::shared_ptr; + value_ptr_type value{nullptr}; shared_state_ptr_type next_state{nullptr}; - mutex_type mtx{}; - pika::detail::small_vector op_states{}; + std::atomic op_state_head{nullptr}; async_rw_mutex_shared_state() = default; async_rw_mutex_shared_state(async_rw_mutex_shared_state&&) = delete; @@ -62,9 +63,32 @@ namespace pika::execution::experimental { if (next_state) { next_state->done(); } } - void done() + void done() noexcept { - for (auto* op_state : op_states) { op_state->continuation(); } + while (true) + { + void* expected = op_state_head.load(std::memory_order_relaxed); + // TODO: memory order + // this is not an async_rw_mutex_operation_state_base*, but is a known value to + // signal that the queue has been processed + if (op_state_head.compare_exchange_strong(expected, static_cast(this))) + { + // We 
have now successfully acquired the head of the queue, and signaled to + // other threads that they can't add any more items to the queue. We can now + // process the queue without further synchronization. + async_rw_mutex_operation_state_base* current = + static_cast(expected); + + while (current != nullptr) + { + async_rw_mutex_operation_state_base* next = current->next; + current->continuation(); + current = next; + } + + break; + } + } } void set_value(value_ptr_type v) @@ -91,8 +115,18 @@ namespace pika::execution::experimental { void add_op_state(async_rw_mutex_operation_state_base* op_state) { - std::lock_guard l(mtx); - op_states.emplace_back(op_state); + while (true) + { + void* expected = op_state_head.load(std::memory_order_relaxed); + PIKA_ASSERT(expected != static_cast(this)); + op_state->next = static_cast(expected); + // TODO: memory order + if (op_state_head.compare_exchange_strong( + expected, static_cast(op_state))) + { + break; + } + } } }; @@ -101,9 +135,9 @@ namespace pika::execution::experimental { { using mutex_type = pika::concurrency::detail::spinlock; using shared_state_ptr_type = std::shared_ptr; + shared_state_ptr_type next_state{nullptr}; - mutex_type mtx{}; - pika::detail::small_vector op_states{}; + std::atomic op_state_head{nullptr}; async_rw_mutex_shared_state() = default; async_rw_mutex_shared_state(async_rw_mutex_shared_state&&) = delete; @@ -116,9 +150,32 @@ namespace pika::execution::experimental { if (next_state) { next_state->done(); } } - void done() + void done() noexcept { - for (auto* op_state : op_states) { op_state->continuation(); } + while (true) + { + void* expected = op_state_head.load(std::memory_order_relaxed); + // TODO: memory order + // this is not an async_rw_mutex_operation_state_base*, but is a known value to + // signal that the queue has been processed + if (op_state_head.compare_exchange_strong(expected, static_cast(this))) + { + // We have now successfully acquired the head of the queue, and signaled to + 
// other threads that they can't add any more items to the queue. We can now + // process the queue without further synchronization. + async_rw_mutex_operation_state_base* current = + static_cast(expected); + + while (current != nullptr) + { + async_rw_mutex_operation_state_base* next = current->next; + current->continuation(); + current = next; + } + + break; + } + } } void set_next_state(std::shared_ptr state) @@ -131,8 +188,18 @@ namespace pika::execution::experimental { void add_op_state(async_rw_mutex_operation_state_base* op_state) { - std::lock_guard l(mtx); - op_states.emplace_back(op_state); + while (true) + { + void* expected = op_state_head.load(std::memory_order_relaxed); + PIKA_ASSERT(expected != static_cast(this)); + op_state->next = static_cast(expected); + // TODO: memory order + if (op_state_head.compare_exchange_strong( + expected, static_cast(op_state))) + { + break; + } + } } }; } // namespace detail From 9b0f1a9777ed46fc91217e33cd0406ab984df494 Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Wed, 18 Dec 2024 15:02:16 +0100 Subject: [PATCH 06/17] Avoid storing pointer to previous shared state in async_rw_mutex --- .../pika/synchronization/async_rw_mutex.hpp | 106 ++++++------------ 1 file changed, 33 insertions(+), 73 deletions(-) diff --git a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp index c606ba6b8..4ec231128 100644 --- a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp +++ b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp @@ -113,12 +113,13 @@ namespace pika::execution::experimental { next_state = std::move(state); } - void add_op_state(async_rw_mutex_operation_state_base* op_state) + bool add_op_state(async_rw_mutex_operation_state_base* op_state) { while (true) { void* expected = op_state_head.load(std::memory_order_relaxed); - PIKA_ASSERT(expected != static_cast(this)); + if 
(expected == static_cast(this)) { return false; } + op_state->next = static_cast(expected); // TODO: memory order if (op_state_head.compare_exchange_strong( @@ -127,6 +128,8 @@ namespace pika::execution::experimental { break; } } + + return true; } }; @@ -186,12 +189,13 @@ namespace pika::execution::experimental { next_state = std::move(state); } - void add_op_state(async_rw_mutex_operation_state_base* op_state) + bool add_op_state(async_rw_mutex_operation_state_base* op_state) { while (true) { void* expected = op_state_head.load(std::memory_order_relaxed); - PIKA_ASSERT(expected != static_cast(this)); + if (expected == static_cast(this)) { return false; } + op_state->next = static_cast(expected); // TODO: memory order if (op_state_head.compare_exchange_strong( @@ -200,6 +204,8 @@ namespace pika::execution::experimental { break; } } + + return true; } }; } // namespace detail @@ -429,7 +435,7 @@ namespace pika::execution::experimental { { if (prev_access == async_rw_mutex_access_type::readwrite) { - auto shared_prev_state = std::move(state); + auto prev_state = std::move(state); state = std::allocate_shared(alloc); prev_access = async_rw_mutex_access_type::read; @@ -437,32 +443,26 @@ namespace pika::execution::experimental { // there is a previous state we set the next state so that the // value can be passed from the previous state to the next // state. - if (PIKA_LIKELY(shared_prev_state)) - { - shared_prev_state->set_next_state(state); - prev_state = shared_prev_state; - } + if (PIKA_LIKELY(prev_state)) { prev_state->set_next_state(state); } + else { state->done(); } } - return {prev_state, state}; + return {state}; } sender readwrite() { - auto shared_prev_state = std::move(state); + auto prev_state = std::move(state); state = std::allocate_shared(alloc); prev_access = async_rw_mutex_access_type::readwrite; // Only the first access has no previous shared state. 
When there is // a previous state we set the next state so that the value can be // passed from the previous state to the next state. - if (PIKA_LIKELY(shared_prev_state)) - { - shared_prev_state->set_next_state(state); - prev_state = shared_prev_state; - } + if (PIKA_LIKELY(prev_state)) { prev_state->set_next_state(state); } + else { state->done(); } - return {prev_state, state}; + return {state}; } private: @@ -471,7 +471,6 @@ namespace pika::execution::experimental { { PIKA_STDEXEC_SENDER_CONCEPT - shared_state_weak_ptr_type prev_state; shared_state_ptr_type state; using access_type = @@ -492,14 +491,11 @@ namespace pika::execution::experimental { struct operation_state : detail::async_rw_mutex_operation_state_base { std::decay_t r; - shared_state_weak_ptr_type prev_state; shared_state_ptr_type state; template - operation_state( - R_&& r, shared_state_weak_ptr_type prev_state, shared_state_ptr_type state) + operation_state(R_&& r, shared_state_ptr_type state) : r(std::forward(r)) - , prev_state(std::move(prev_state)) , state(std::move(state)) { } @@ -531,18 +527,7 @@ namespace pika::execution::experimental { "async_rw_lock::sender::operation_state state is empty, was the sender " "already started?"); - if (auto p = os.prev_state.lock()) - { - // If the previous state is set and it's still alive, - // add a continuation to be triggered when the previous - // state is released. - os.state->add_op_state(&os); - - // Holding on to the previous state acts as a lock to ensure the - // continuations aren't triggered while the continuation is added. - os.prev_state.reset(); - } - else + if (!os.state->add_op_state(&os)) { // There is no previous state on the first access or the // previous state has already been released. 
We can run @@ -555,8 +540,7 @@ namespace pika::execution::experimental { template friend auto tag_invoke(pika::execution::experimental::connect_t, sender&& s, R&& r) { - return operation_state{ - std::forward(r), std::move(s.prev_state), std::move(s.state)}; + return operation_state{std::forward(r), std::move(s.state)}; } }; @@ -564,7 +548,6 @@ namespace pika::execution::experimental { async_rw_mutex_access_type prev_access = async_rw_mutex_access_type::readwrite; - shared_state_weak_ptr_type prev_state; shared_state_ptr_type state; }; @@ -626,38 +609,32 @@ namespace pika::execution::experimental { { if (prev_access == async_rw_mutex_access_type::readwrite) { - auto shared_prev_state = std::move(state); + auto prev_state = std::move(state); state = std::allocate_shared(alloc); state->set_value(value); prev_access = async_rw_mutex_access_type::read; // Only the first access has no previous shared state. - if (PIKA_LIKELY(shared_prev_state)) - { - shared_prev_state->set_next_state(state); - prev_state = shared_prev_state; - } + if (PIKA_LIKELY(prev_state)) { prev_state->set_next_state(state); } + else { state->done(); } } - return {prev_state, state}; + return {state}; } /// \brief Access the wrapped value in read-write mode through a sender. sender readwrite() { - auto shared_prev_state = std::move(state); + auto prev_state = std::move(state); state = std::allocate_shared(alloc); state->set_value(value); prev_access = async_rw_mutex_access_type::readwrite; // Only the first access has no previous shared state. 
- if (PIKA_LIKELY(shared_prev_state)) - { - shared_prev_state->set_next_state(state); - prev_state = shared_prev_state; - } + if (PIKA_LIKELY(prev_state)) { prev_state->set_next_state(state); } + else { state->done(); } - return {prev_state, state}; + return {state}; } private: @@ -677,7 +654,6 @@ namespace pika::execution::experimental { { PIKA_STDEXEC_SENDER_CONCEPT - shared_state_weak_ptr_type prev_state; shared_state_ptr_type state; using access_type = @@ -698,14 +674,11 @@ namespace pika::execution::experimental { struct operation_state : detail::async_rw_mutex_operation_state_base { std::decay_t r; - shared_state_weak_ptr_type prev_state; shared_state_ptr_type state; template - operation_state( - R_&& r, shared_state_weak_ptr_type prev_state, shared_state_ptr_type state) + operation_state(R_&& r, shared_state_ptr_type state) : r(std::forward(r)) - , prev_state(std::move(prev_state)) , state(std::move(state)) { } @@ -737,18 +710,7 @@ namespace pika::execution::experimental { "async_rw_lock::sender::operation_state state is empty, was the sender " "already started?"); - if (auto p = os.prev_state.lock()) - { - // If the previous state is set and it's still alive, - // add a continuation to be triggered when the previous - // state is released. - os.state->add_op_state(&os); - - // Holding on to the previous state acts as a lock to ensure the - // continuations aren't triggered while the continuation is added. - os.prev_state.reset(); - } - else + if (!os.state->add_op_state(&os)) { // There is no previous state on the first access or the // previous state has already been released. 
We can run @@ -761,8 +723,7 @@ namespace pika::execution::experimental { template friend auto tag_invoke(pika::execution::experimental::connect_t, sender&& s, R&& r) { - return operation_state{ - std::forward(r), std::move(s.prev_state), std::move(s.state)}; + return operation_state{std::forward(r), std::move(s.state)}; } template @@ -775,7 +736,7 @@ namespace pika::execution::experimental { "connectable"); } - return operation_state{std::forward(r), s.prev_state, s.state}; + return operation_state{std::forward(r), s.state}; } }; @@ -784,7 +745,6 @@ namespace pika::execution::experimental { async_rw_mutex_access_type prev_access = async_rw_mutex_access_type::readwrite; - shared_state_weak_ptr_type prev_state; shared_state_ptr_type state; }; } // namespace pika::execution::experimental From 785183023231382126ba5849038b4816bf328514 Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Thu, 19 Dec 2024 11:03:36 +0100 Subject: [PATCH 07/17] Manage shared state allocation and reference counting manually in async_rw_mutex --- .../pika/synchronization/async_rw_mutex.hpp | 262 +++++++++++++----- 1 file changed, 188 insertions(+), 74 deletions(-) diff --git a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp index 4ec231128..e8e46f42f 100644 --- a/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp +++ b/libs/pika/synchronization/include/pika/synchronization/async_rw_mutex.hpp @@ -6,14 +6,16 @@ #pragma once +#include #include #include -#include #include #include #include #include #include +#include +#include #include #include @@ -41,18 +43,26 @@ namespace pika::execution::experimental { virtual void continuation() = 0; }; - template + template struct async_rw_mutex_shared_state { - using mutex_type = pika::concurrency::detail::spinlock; - using shared_state_ptr_type = std::shared_ptr; + using allocator_type = Allocator; + using shared_state_ptr_type = 
pika::intrusive_ptr; using value_ptr_type = std::shared_ptr; + PIKA_NO_UNIQUE_ADDRESS Allocator alloc; value_ptr_type value{nullptr}; + pika::detail::atomic_count reference_count{0}; shared_state_ptr_type next_state{nullptr}; std::atomic op_state_head{nullptr}; async_rw_mutex_shared_state() = default; + template + async_rw_mutex_shared_state(Allocator_&& alloc, value_ptr_type v) + : alloc(std::forward(alloc)) + , value(std::move(v)) + { + } async_rw_mutex_shared_state(async_rw_mutex_shared_state&&) = delete; async_rw_mutex_shared_state& operator=(async_rw_mutex_shared_state&&) = delete; async_rw_mutex_shared_state(async_rw_mutex_shared_state const&) = delete; @@ -60,10 +70,18 @@ namespace pika::execution::experimental { ~async_rw_mutex_shared_state() { - if (next_state) { next_state->done(); } + if (next_state) + { + // We pass the ownership of the intrusive_ptr of the next state to the next + // state itself, so that it can choose when to release it. If we can avoid it, + // we don't want this shared state to hold on to the reference longer than + // necessary. + async_rw_mutex_shared_state* p = next_state.get(); + p->done(std::move(next_state)); + } } - void done() noexcept + void done(shared_state_ptr_type p) noexcept { while (true) { @@ -79,6 +97,10 @@ namespace pika::execution::experimental { async_rw_mutex_operation_state_base* current = static_cast(expected); + // We are also not accessing this shared state directly anymore, so we can + // reset p early.
+ p.reset(); + while (current != nullptr) { async_rw_mutex_operation_state_base* next = current->next; @@ -105,7 +127,7 @@ namespace pika::execution::experimental { return *value; } - void set_next_state(std::shared_ptr state) + void set_next_state(shared_state_ptr_type state) { // The next state should only be set once PIKA_ASSERT(!next_state); @@ -131,18 +153,42 @@ namespace pika::execution::experimental { return true; } + + friend void intrusive_ptr_add_ref(async_rw_mutex_shared_state* p) + { + ++p->reference_count; + } + + friend void intrusive_ptr_release(async_rw_mutex_shared_state* p) + { + if (--p->reference_count == 0) + { + using allocator_type = typename std::allocator_traits< + allocator_type>::template rebind_alloc; + allocator_type other_alloc(p->alloc); + std::allocator_traits::destroy(other_alloc, p); + std::allocator_traits::deallocate(other_alloc, p, 1); + } + } }; - template <> - struct async_rw_mutex_shared_state + template + struct async_rw_mutex_shared_state { - using mutex_type = pika::concurrency::detail::spinlock; - using shared_state_ptr_type = std::shared_ptr; + using allocator_type = Allocator; + using shared_state_ptr_type = pika::intrusive_ptr; + PIKA_NO_UNIQUE_ADDRESS Allocator alloc; shared_state_ptr_type next_state{nullptr}; + pika::detail::atomic_count reference_count{0}; std::atomic op_state_head{nullptr}; async_rw_mutex_shared_state() = default; + template + explicit async_rw_mutex_shared_state(Allocator_&& alloc) + : alloc(std::forward(alloc)) + { + } async_rw_mutex_shared_state(async_rw_mutex_shared_state&&) = delete; async_rw_mutex_shared_state& operator=(async_rw_mutex_shared_state&&) = delete; async_rw_mutex_shared_state(async_rw_mutex_shared_state const&) = delete; @@ -150,10 +196,18 @@ namespace pika::execution::experimental { ~async_rw_mutex_shared_state() { - if (next_state) { next_state->done(); } + if (next_state) + { + // We pass the ownership of the intrusive_ptr of the next state to the next + // state itself, so 
that it can choose when to release it. If we can avoid it, + // we don't want this shared state to hold on to the reference longer than + // necessary. + async_rw_mutex_shared_state* p = next_state.get(); + p->done(std::move(next_state)); + } } - void done() noexcept + void done(shared_state_ptr_type p) noexcept { while (true) { @@ -169,6 +223,10 @@ namespace pika::execution::experimental { async_rw_mutex_operation_state_base* current = static_cast(expected); + // We are also not accessing this shared state directly anymore, so we can + // reset p early. + p.reset(); + while (current != nullptr) { async_rw_mutex_operation_state_base* next = current->next; @@ -181,7 +239,7 @@ namespace pika::execution::experimental { } } - void set_next_state(std::shared_ptr state) + void set_next_state(shared_state_ptr_type state) { // The next state should only be set once PIKA_ASSERT(!next_state); @@ -207,6 +265,23 @@ namespace pika::execution::experimental { return true; } + + friend void intrusive_ptr_add_ref(async_rw_mutex_shared_state* p) + { + ++p->reference_count; + } + + friend void intrusive_ptr_release(async_rw_mutex_shared_state* p) + { + if (--p->reference_count == 0) + { + using allocator_type = typename std::allocator_traits< + allocator_type>::template rebind_alloc; + allocator_type other_alloc(p->alloc); + std::allocator_traits::destroy(other_alloc, p); + std::allocator_traits::deallocate(other_alloc, p, 1); + } + } }; } // namespace detail @@ -219,22 +294,25 @@ namespace pika::execution::experimental { /// /// When the access type is \ref async_rw_mutex_access_type::readwrite the wrapper is move-only. /// When the access type is \ref async_rw_mutex_access_type::read the wrapper is copyable. - template + template class async_rw_mutex_access_wrapper; /// \brief A wrapper for values sent by senders from \ref async_rw_mutex with read-only access. /// /// The wrapper is copyable.
- template - class async_rw_mutex_access_wrapper + template + class async_rw_mutex_access_wrapper { private: - using shared_state_type = std::shared_ptr>; - shared_state_type state; + using shared_state_ptr_type = + pika::intrusive_ptr>; + shared_state_ptr_type state; public: async_rw_mutex_access_wrapper() = delete; - async_rw_mutex_access_wrapper(shared_state_type state) + async_rw_mutex_access_wrapper(shared_state_ptr_type state) : state(std::move(state)) { } @@ -254,8 +332,9 @@ namespace pika::execution::experimental { /// \brief A wrapper for values sent by senders from \ref async_rw_mutex with read-write access. /// /// The wrapper is move-only. - template - class async_rw_mutex_access_wrapper + template + class async_rw_mutex_access_wrapper { private: static_assert(!std::is_void::value, @@ -265,12 +344,13 @@ namespace pika::execution::experimental { "Cannot mix void and non-void type in async_rw_mutex_access_wrapper wrapper (ReadT " "is void, ReadWriteT is non-void)"); - using shared_state_type = std::shared_ptr>; - shared_state_type state; + using shared_state_ptr_type = + pika::intrusive_ptr>; + shared_state_ptr_type state; public: async_rw_mutex_access_wrapper() = delete; - async_rw_mutex_access_wrapper(shared_state_type state) + async_rw_mutex_access_wrapper(shared_state_ptr_type state) : state(std::move(state)) { } @@ -294,16 +374,17 @@ namespace pika::execution::experimental { /// \brief A wrapper for read-only access granted by a \p void \ref async_rw_mutex. /// /// The wrapper is copyable. 
- template <> - class async_rw_mutex_access_wrapper + template + class async_rw_mutex_access_wrapper { private: - using shared_state_type = std::shared_ptr>; - shared_state_type state; + using shared_state_ptr_type = + pika::intrusive_ptr>; + shared_state_ptr_type state; public: async_rw_mutex_access_wrapper() = delete; - explicit async_rw_mutex_access_wrapper(shared_state_type state) + explicit async_rw_mutex_access_wrapper(shared_state_ptr_type state) : state(std::move(state)) { } @@ -316,16 +397,18 @@ namespace pika::execution::experimental { /// \brief A wrapper for read-write access granted by a \p void \ref async_rw_mutex. /// /// The wrapper is move-only. - template <> - class async_rw_mutex_access_wrapper + template + class async_rw_mutex_access_wrapper { private: - using shared_state_type = std::shared_ptr>; - shared_state_type state; + using shared_state_ptr_type = + pika::intrusive_ptr>; + shared_state_ptr_type state; public: async_rw_mutex_access_wrapper() = delete; - explicit async_rw_mutex_access_wrapper(shared_state_type state) + explicit async_rw_mutex_access_wrapper(shared_state_ptr_type state) : state(std::move(state)) { } @@ -398,27 +481,17 @@ namespace pika::execution::experimental { template class async_rw_mutex { - private: template struct sender; - using shared_state_type = detail::async_rw_mutex_shared_state; - using shared_state_weak_ptr_type = std::weak_ptr; - - // nvc++ is not able to see this typedef unless it's public -#if defined(PIKA_NVHPC_VERSION) - public: -#endif - using shared_state_ptr_type = std::shared_ptr; - public: using read_type = void; using readwrite_type = void; using read_access_type = async_rw_mutex_access_wrapper; + async_rw_mutex_access_type::read, Allocator>; using readwrite_access_type = async_rw_mutex_access_wrapper; + async_rw_mutex_access_type::readwrite, Allocator>; using allocator_type = Allocator; @@ -431,12 +504,38 @@ namespace pika::execution::experimental { async_rw_mutex(async_rw_mutex const&) = delete; 
async_rw_mutex& operator=(async_rw_mutex const&) = delete; + private: + using shared_state_type = detail::async_rw_mutex_shared_state; + + // nvc++ is not able to see this typedef unless it's public +#if defined(PIKA_NVHPC_VERSION) + public: +#endif + using shared_state_ptr_type = pika::intrusive_ptr; + + private: + shared_state_ptr_type allocate_shared() + { + using other_allocator = typename std::allocator_traits< + allocator_type>::template rebind_alloc; + using allocator_traits = std::allocator_traits; + using unique_ptr = std::unique_ptr>; + + other_allocator alloc(this->alloc); + unique_ptr p(allocator_traits::allocate(alloc, 1), + pika::detail::allocator_deleter{alloc}); + new (p.get()) shared_state_type{this->alloc}; + return p.release(); + } + + public: sender read() { if (prev_access == async_rw_mutex_access_type::readwrite) { auto prev_state = std::move(state); - state = std::allocate_shared(alloc); + state = allocate_shared(); prev_access = async_rw_mutex_access_type::read; // Only the first access has no previous shared state. When @@ -444,7 +543,7 @@ namespace pika::execution::experimental { // value can be passed from the previous state to the next // state. if (PIKA_LIKELY(prev_state)) { prev_state->set_next_state(state); } - else { state->done(); } + else { state->done(nullptr); } } return {state}; @@ -453,14 +552,14 @@ namespace pika::execution::experimental { sender readwrite() { auto prev_state = std::move(state); - state = std::allocate_shared(alloc); + state = allocate_shared(); prev_access = async_rw_mutex_access_type::readwrite; // Only the first access has no previous shared state. When there is // a previous state we set the next state so that the value can be // passed from the previous state to the next state. 
if (PIKA_LIKELY(prev_state)) { prev_state->set_next_state(state); } - else { state->done(); } + else { state->done(nullptr); } return {state}; } @@ -473,8 +572,8 @@ namespace pika::execution::experimental { shared_state_ptr_type state; - using access_type = - async_rw_mutex_access_wrapper; + using access_type = async_rw_mutex_access_wrapper; template