Skip to content

Commit

Permalink
Merge #677
Browse files Browse the repository at this point in the history
677: Eagerly reset shared state in `async_rw_mutex` r=msimberg a=msimberg

Eagerly reset the shared state held by the operation state to allow continuations to run as early as possible. This fixes a deadlock that occurred in the added test case.

Co-authored-by: Mikael Simberg <mikael.simberg@iki.fi>
  • Loading branch information
bors[bot] and msimberg authored May 10, 2023
2 parents d98281e + 5bbc6f3 commit 1bb7fbc
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ namespace pika::execution::experimental {
// add a continuation to be triggered when the previous
// state is released.
p->add_continuation(PIKA_MOVE(continuation));
os.state.reset();
os.prev_state.reset();
}
else
Expand Down Expand Up @@ -655,6 +656,7 @@ namespace pika::execution::experimental {
// add a continuation to be triggered when the previous
// state is released.
p->add_continuation(PIKA_MOVE(continuation));
os.state.reset();
os.prev_state.reset();
}
else
Expand Down
34 changes: 34 additions & 0 deletions libs/pika/synchronization/tests/unit/async_rw_mutex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ using pika::execution::experimental::start_detached;
using pika::execution::experimental::then;
using pika::execution::experimental::thread_pool_scheduler;
using pika::execution::experimental::transfer;
using pika::execution::experimental::when_all;
using pika::this_thread::experimental::sync_wait;

unsigned int seed = std::random_device{}();
Expand Down Expand Up @@ -259,6 +260,31 @@ void test_read_sender_copyable(async_rw_mutex<ReadWriteT, ReadT> rwm)
PIKA_TEST_EQ(read_accesses, std::size_t(4));
}

template <typename ReadWriteT, typename ReadT = ReadWriteT>
void test_multiple_when_all(async_rw_mutex<ReadWriteT, ReadT> rwm)
{
{
auto s1 = rwm.readwrite() | then([](auto) {});
auto s2 = rwm.readwrite() | then([](auto) {});
auto s3 = rwm.readwrite() | then([](auto) {});
sync_wait(when_all(std::move(s1), std::move(s2), std::move(s3)));
}

{
auto s1 = rwm.readwrite() | then([](auto) {});
auto s2 = rwm.readwrite() | then([](auto) {});
auto s3 = rwm.readwrite() | then([](auto) {});
sync_wait(when_all(std::move(s3), std::move(s2), std::move(s1)));
}

{
auto s1 = rwm.readwrite() | then([](auto) {});
auto s2 = rwm.readwrite() | then([](auto) {});
auto s3 = rwm.readwrite() | then([](auto) {});
sync_wait(when_all(std::move(s3), std::move(s1), std::move(s2)));
}
}

///////////////////////////////////////////////////////////////////////////////
int pika_main(pika::program_options::variables_map& vm)
{
Expand Down Expand Up @@ -292,6 +318,14 @@ int pika_main(pika::program_options::variables_map& vm)
test_read_sender_copyable(async_rw_mutex<std::size_t>{0});
test_read_sender_copyable(async_rw_mutex<mytype, mytype_base>{mytype{}});

// These tests are disabled with stdexec and nvhpc because of bad codegen.
// This is fixed in newer versions of stdexec.
#if !(defined(PIKA_NVHPC_VERSION) && defined(PIKA_HAVE_STDEXEC))
test_multiple_when_all(async_rw_mutex<void>{});
test_multiple_when_all(async_rw_mutex<std::size_t>{0});
test_multiple_when_all(async_rw_mutex<mytype, mytype_base>{mytype{}});
#endif

return pika::finalize();
}

Expand Down

0 comments on commit 1bb7fbc

Please sign in to comment.