Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change notification of waiting threads in condition_variable #998

Merged
merged 1 commit into from
Jan 24, 2024

Conversation

msimberg
Copy link
Contributor

@msimberg msimberg commented Jan 23, 2024

#689 was a bit too eager in relaxing locks in condition_variable. Thread sanitizer reported a race between wait checking if the agent_ref had already been reset by notify, and notify_all resetting the agent_ref. The latter was happening without a lock (after #689). This PR slightly rearranges notify_all (and hopefully simplifies it as well). The lock is now held while working on the queue and the individual entries, but released when resuming a thread.

This is reported as the following race by thread sanitizer:

WARNING: ThreadSanitizer: data race (pid=3967134)
  Read of size 8 at 0x7f5d7ac9ab68 by thread T4:
    #0 pika::execution::detail::agent_ref::operator bool() const /home/mjs/src/pika/libs/pika/execution_base/include/pika/execution_base/agent_ref.hpp:40 (libpika.so.0+0x1ef040)
    #1 pika::detail::condition_variable::wait_until(std::unique_lock<pika::concurrency::detail::spinlock>&, pika::chrono::steady_time_point const&, char const*, pika::error_code&) /home/mjs/src/pika/libs/pika/synchronization/src/detail/condition_variable.cpp:181 (libpika.so
.0+0x1ef040)
    #2 pika::detail::condition_variable::wait_until(std::unique_lock<pika::concurrency::detail::spinlock>&, pika::chrono::steady_time_point const&, pika::error_code&) /home/mjs/src/pika/libs/pika/synchronization/include/pika/synchronization/detail/condition_variable.hpp:126
 (condition_variable2_test+0x103fa7)
    #3 bool pika::condition_variable_any::wait_until<std::unique_lock<pika::mutex>, test_timed_wait<std::chrono::duration<long, std::ratio<1l, 1000l> > >(bool, bool, std::chrono::duration<long, std::ratio<1l, 1000l> >)::{lambda(pika::stop_token)#1}::operator()(pika::stop_to
ken) const::{lambda()#1}>(std::unique_lock<pika::mutex>&, pika::stop_token, pika::chrono::steady_time_point const&, test_timed_wait<std::chrono::duration<long, std::ratio<1l, 1000l> > >(bool, bool, std::chrono::duration<long, std::ratio<1l, 1000l> >)::{lambda(pika::stop_tok
en)#1}::operator()(pika::stop_token) const::{lambda()#1}, pika::error_code&) /home/mjs/src/pika/libs/pika/synchronization/include/pika/synchronization/condition_variable.hpp:365 (condition_variable2_test+0x103fa7)
    #4 bool pika::condition_variable_any::wait_for<std::unique_lock<pika::mutex>, test_timed_wait<std::chrono::duration<long, std::ratio<1l, 1000l> > >(bool, bool, std::chrono::duration<long, std::ratio<1l, 1000l> >)::{lambda(pika::stop_token)#1}::operator()(pika::stop_toke
n) const::{lambda()#1}>(std::unique_lock<pika::mutex>&, pika::stop_token, pika::chrono::steady_duration const&, test_timed_wait<std::chrono::duration<long, std::ratio<1l, 1000l> > >(bool, bool, std::chrono::duration<long, std::ratio<1l, 1000l> >)::{lambda(pika::stop_token)#
1}::operator()(pika::stop_token) const::{lambda()#1}, pika::error_code&) /home/mjs/src/pika/libs/pika/synchronization/include/pika/synchronization/condition_variable.hpp:382 (condition_variable2_test+0x103c2b)
    #5 test_timed_wait<std::chrono::duration<long, std::ratio<1l, 1000l> > >(bool, bool, std::chrono::duration<long, std::ratio<1l, 1000l> >)::{lambda(pika::stop_token)#1}::operator()(pika::stop_token) const /home/mjs/src/pika/libs/pika/threading/tests/unit/condition_variab
le2.cpp:390 (condition_variable2_test+0x103732)
    #6 void pika::util::detail::callable_vtable<void ()>::_invoke<pika::util::detail::deferred<pika::jthread::jthread<test_timed_wait<std::chrono::duration<long, std::ratio<1l, 1000l> > >(bool, bool, std::chrono::duration<long, std::ratio<1l, 1000l> >)::{lambda(pika::stop_t
oken)#1}, , void>(test_timed_wait<std::chrono::duration<long, std::ratio<1l, 1000l> > >(bool, bool, std::chrono::duration<long, std::ratio<1l, 1000l> >)::{lambda(pika::stop_token)#1}&&)::{lambda(pika::stop_token, test_timed_wait<std::chrono::duration<long, std::ratio<1l, 10
00l> > >(bool, bool, auto:1)::{lambda(pika::stop_token)#1}&&)#1}, pika::util::detail::pack_c<unsigned long, 0ul, 1ul>, pika::stop_token, test_timed_wait<std::chrono::duration<long, std::ratio<1l, 1000l> > >(bool, bool, std::chrono::duration<long, std::ratio<1l, 1000l> >)::{
lambda(pika::stop_token)#1}> >(void*) /home/mjs/src/pika/libs/pika/threading/include/pika/threading/jthread.hpp:34 (condition_variable2_test+0x10354a)
    #7 pika::util::detail::basic_function<void (), false>::operator()() const /home/mjs/src/pika/libs/pika/functional/include/pika/functional/detail/basic_function.hpp:199 (libpika.so.0+0x291090)
    #8 pika::thread::thread_function_nullary(pika::util::detail::unique_function<void ()> const&) /home/mjs/src/pika/libs/pika/threading/src/thread.cpp:116 (libpika.so.0+0x291090)
    #9 pika::util::detail::invoke_bound_result<std::pair<pika::threads::detail::thread_schedule_state, pika::threads::detail::thread_id> (*&&)(pika::util::detail::unique_function<void ()> const&), pika::util::detail::pack<pika::util::detail::unique_function<void ()>&&>, pik
a::threads::detail::thread_restart_state&&>::type pika::util::detail::bound<std::pair<pika::threads::detail::thread_schedule_state, pika::threads::detail::thread_id> (*)(pika::util::detail::unique_function<void ()> const&), pika::util::detail::pack_c<unsigned long, 0ul>, pi
ka::util::detail::unique_function<void ()> >::operator()<pika::threads::detail::thread_restart_state>(pika::threads::detail::thread_restart_state&&) && /home/mjs/src/pika/libs/pika/functional/include/pika/functional/bind.hpp:146 (libpika.so.0+0x292c37)
    #10 std::invoke_result<pika::util::detail::bound<std::pair<pika::threads::detail::thread_schedule_state, pika::threads::detail::thread_id> (*)(pika::util::detail::unique_function<void ()> const&), pika::util::detail::pack_c<unsigned long, 0ul>, pika::util::detail::uniqu
e_function<void ()> >, pika::threads::detail::thread_restart_state>::type pika::util::detail::one_shot_wrapper<pika::util::detail::bound<std::pair<pika::threads::detail::thread_schedule_state, pika::threads::detail::thread_id> (*)(pika::util::detail::unique_function<void ()
> const&), pika::util::detail::pack_c<unsigned long, 0ul>, pika::util::detail::unique_function<void ()> > >::operator()<pika::threads::detail::thread_restart_state>(pika::threads::detail::thread_restart_state&&) /home/mjs/src/pika/libs/pika/functional/include/pika/functiona
l/one_shot.hpp:58 (libpika.so.0+0x292c37)
    #11 std::pair<pika::threads::detail::thread_schedule_state, pika::threads::detail::thread_id> pika::util::detail::callable_vtable<std::pair<pika::threads::detail::thread_schedule_state, pika::threads::detail::thread_id> (pika::threads::detail::thread_restart_state)>::_i
nvoke<pika::util::detail::one_shot_wrapper<pika::util::detail::bound<std::pair<pika::threads::detail::thread_schedule_state, pika::threads::detail::thread_id> (*)(pika::util::detail::unique_function<void ()> const&), pika::util::detail::pack_c<unsigned long, 0ul>, pika::uti
l::detail::unique_function<void ()> > > >(void*, pika::threads::detail::thread_restart_state&&) /home/mjs/src/pika/libs/pika/functional/include/pika/functional/detail/vtable/callable_vtable.hpp:88 (libpika.so.0+0x292c37)
    #12 pika::util::detail::basic_function<std::pair<pika::threads::detail::thread_schedule_state, pika::threads::detail::thread_id> (pika::threads::detail::thread_restart_state), false>::operator()(pika::threads::detail::thread_restart_state) const /home/mjs/src/pika/libs/
pika/functional/include/pika/functional/detail/basic_function.hpp:199 (libpika.so.0+0x10a096)
    #13 pika::threads::coroutines::detail::coroutine_impl::operator()() /home/mjs/src/pika/libs/pika/coroutines/src/detail/coroutine_impl.cpp:71 (libpika.so.0+0x10a096)
    #14 void pika::threads::coroutines::detail::lx::trampoline<pika::threads::coroutines::detail::coroutine_impl>(void*) /home/mjs/src/pika/libs/pika/coroutines/include/pika/coroutines/detail/context_linux_x86.hpp:93 (libpika.so.0+0x109bf9)

  Previous write of size 8 at 0x7f5d7ac9ab68 by thread T2:
    #0 pika::execution::detail::agent_ref::reset(pika::execution::detail::agent_base*) /home/mjs/src/pika/libs/pika/execution_base/include/pika/execution_base/agent_ref.hpp:42 (libpika.so.0+0x1ee5ee)
    #1 pika::detail::condition_variable::notify_all(std::unique_lock<pika::concurrency::detail::spinlock>, pika::execution::thread_priority, pika::error_code&) /home/mjs/src/pika/libs/pika/synchronization/src/detail/condition_variable.cpp:113 (libpika.so.0+0x1ee5ee)
    #2 pika::stop_callback<pika::condition_variable_any::wait_until<std::unique_lock<pika::mutex>, test_timed_wait<std::chrono::duration<long, std::ratio<1l, 1000l> > >(bool, bool, std::chrono::duration<long, std::ratio<1l, 1000l> >)::{lambda(pika::stop_token)#1}::operator(
)(pika::stop_token) const::{lambda()#1}>(std::unique_lock<pika::mutex>&, pika::stop_token, pika::chrono::steady_time_point const&, test_timed_wait<std::chrono::duration<long, std::ratio<1l, 1000l> > >(bool, bool, std::chrono::duration<long, std::ratio<1l, 1000l> >)::{lambda
(pika::stop_token)#1}::operator()(pika::stop_token) const::{lambda()#1}, pika::error_code&)::{lambda()#1}>::execute() /home/mjs/src/pika/libs/pika/synchronization/include/pika/synchronization/detail/condition_variable.hpp:105 (condition_variable2_test+0x10445d)
    #3 pika::detail::stop_state::request_stop() /home/mjs/src/pika/libs/pika/synchronization/src/stop_token.cpp:263 (libpika.so.0+0x1f1517)
    #4 pika::stop_source::request_stop() /home/mjs/src/pika/libs/pika/synchronization/include/pika/synchronization/stop_token.hpp:370 (condition_variable2_test+0xffef3)
    #5 pika::jthread::request_stop() /home/mjs/src/pika/libs/pika/threading/include/pika/threading/jthread.hpp:223 (condition_variable2_test+0xffef3)
    #6 void test_timed_wait<std::chrono::duration<long, std::ratio<1l, 1000l> > >(bool, bool, std::chrono::duration<long, std::ratio<1l, 1000l> >) /home/mjs/src/pika/libs/pika/threading/tests/unit/condition_variable2.cpp:446 (condition_variable2_test+0xffef3)
    #7 pika_main() /home/mjs/src/pika/libs/pika/threading/tests/unit/condition_variable2.cpp:583 (condition_variable2_test+0xfae64)
    #8 pika::this_thread::suspend(pika::threads::detail::thread_schedule_state, pika::detail::thread_description const&, pika::error_code&) /home/mjs/src/pika/libs/pika/threading_base/include/pika/threading_base/thread_helpers.hpp:450 (libpika.so.0+0x291a12)
    #9 pika::thread::join() /home/mjs/src/pika/libs/pika/threading/src/thread.cpp:210 (libpika.so.0+0x291a12)
    #10 pika::jthread::join() /home/mjs/src/pika/libs/pika/threading/include/pika/threading/jthread.hpp:187 (condition_variable2_test+0xff4ef)
    #11 pika::jthread::~jthread() /home/mjs/src/pika/libs/pika/threading/include/pika/threading/jthread.hpp:128 (condition_variable2_test+0xff4ef)
    #12 test_minimal_wait_for(int, int) /home/mjs/src/pika/libs/pika/threading/tests/unit/condition_variable2.cpp:292 (condition_variable2_test+0xfabe1)
    #13 pika_main() /home/mjs/src/pika/libs/pika/threading/tests/unit/condition_variable2.cpp:550 (condition_variable2_test+0xfada4)

@msimberg msimberg added this to the 0.22.0 milestone Jan 23, 2024
@msimberg msimberg self-assigned this Jan 23, 2024
@msimberg
Copy link
Contributor Author

cscs-ci run ci-on-merge

@msimberg msimberg force-pushed the condition-variable-queue-handling branch from 5012a35 to ce75fcf Compare January 23, 2024 17:17
@msimberg
Copy link
Contributor Author

cscs-ci run ci-on-merge

@msimberg msimberg added this pull request to the merge queue Jan 24, 2024
Copy link
Contributor

@aurianer aurianer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM thanks!

@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Jan 24, 2024
@msimberg
Copy link
Contributor Author

The test failure (shared_mutex1) is unrelated to these changes and is already listed on #191.

@msimberg msimberg added this pull request to the merge queue Jan 24, 2024
@msimberg
Copy link
Contributor Author

Clariden CI jobs are not running currently due to maintenance, merging manually. All other jobs finished successfully on the latest run.

@msimberg msimberg removed this pull request from the merge queue due to a manual request Jan 24, 2024
@msimberg msimberg merged commit d76fa80 into pika-org:main Jan 24, 2024
53 of 56 checks passed
@msimberg msimberg deleted the condition-variable-queue-handling branch January 24, 2024 08:26
msimberg added a commit to msimberg/pika that referenced this pull request Jan 24, 2024
msimberg added a commit to msimberg/pika that referenced this pull request Jan 24, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

2 participants