From 8d586c6cc1c51d8a2d3b5b8049aeef8c5f08381c Mon Sep 17 00:00:00 2001 From: Mikael Simberg Date: Fri, 27 Jan 2023 21:19:29 +0100 Subject: [PATCH 1/9] Move completion schedulers to environments --- .../pika/async_cuda/cuda_scheduler.hpp | 21 ++++-- .../include/pika/async_cuda/then_on_host.hpp | 15 ++-- .../pika/async_cuda/then_with_stream.hpp | 27 ++++--- .../async_cuda/tests/unit/cuda_scheduler.cu | 6 +- .../async_cuda/tests/unit/then_with_stream.cu | 42 ++++++++--- .../include/pika/async_mpi/transform_mpi.hpp | 14 ++-- .../pika/execution/algorithms/bulk.hpp | 19 ++--- .../pika/execution/algorithms/drop_value.hpp | 29 +++----- .../pika/execution/algorithms/make_future.hpp | 11 ++- .../execution/algorithms/schedule_from.hpp | 50 ++++++------- .../pika/execution/algorithms/split_tuple.hpp | 18 +++-- .../pika/execution/algorithms/sync_wait.hpp | 9 ++- .../pika/execution/algorithms/then.hpp | 22 ++---- .../pika/execution/algorithms/transfer.hpp | 3 +- .../execution/algorithms/when_all_vector.hpp | 51 ++++++++----- .../tests/include/algorithm_test_utils.hpp | 71 +++++++++++++++---- .../tests/unit/algorithm_execute.cpp | 17 +++-- .../tests/unit/algorithm_transfer.cpp | 19 +++-- .../pika/execution_base/any_sender.hpp | 6 +- .../execution_base/completion_scheduler.hpp | 38 +++++----- .../include/pika/execution_base/receiver.hpp | 16 ----- .../include/pika/execution_base/sender.hpp | 24 ++++++- .../execution_base/tests/unit/any_sender.cpp | 8 +-- .../tests/unit/basic_receiver.cpp | 9 +-- .../tests/unit/basic_schedule.cpp | 13 +++- .../tests/unit/basic_sender.cpp | 4 +- .../pika/executors/std_thread_scheduler.hpp | 19 +++-- .../pika/executors/thread_pool_scheduler.hpp | 23 ++++-- .../executors/thread_pool_scheduler_bulk.hpp | 50 ++++--------- .../tests/unit/std_thread_scheduler.cpp | 20 +++--- .../tests/unit/thread_pool_scheduler.cpp | 20 +++--- 31 files changed, 389 insertions(+), 305 deletions(-) diff --git a/libs/pika/async_cuda/include/pika/async_cuda/cuda_scheduler.hpp b/libs/pika/async_cuda/include/pika/async_cuda/cuda_scheduler.hpp index 7901e7d0c0..978ae80383 100644 --- a/libs/pika/async_cuda/include/pika/async_cuda/cuda_scheduler.hpp +++ b/libs/pika/async_cuda/include/pika/async_cuda/cuda_scheduler.hpp @@ -149,12 +149,23 @@ namespace pika::cuda::experimental { return {s.scheduler, PIKA_FORWARD(Receiver, receiver)}; } - friend cuda_scheduler tag_invoke( - pika::execution::experimental::get_completion_scheduler_t< - pika::execution::experimental::set_value_t>, - cuda_scheduler_sender const& s) noexcept + struct env { - return s.scheduler; + cuda_scheduler scheduler; + + friend cuda_scheduler tag_invoke( + pika::execution::experimental::get_completion_scheduler_t< + pika::execution::experimental::set_value_t>, + env const& e) noexcept + { + return e.scheduler; + } + }; + + friend env tag_invoke(pika::execution::experimental::get_env_t, + cuda_scheduler_sender const& s) + { + return {s.scheduler}; } }; } // namespace detail diff --git a/libs/pika/async_cuda/include/pika/async_cuda/then_on_host.hpp b/libs/pika/async_cuda/include/pika/async_cuda/then_on_host.hpp index a623366f72..2e15abeb0c 100644 --- a/libs/pika/async_cuda/include/pika/async_cuda/then_on_host.hpp +++ b/libs/pika/async_cuda/include/pika/async_cuda/then_on_host.hpp @@ -117,7 +117,7 @@ namespace pika::cuda::experimental { }); } - friend constexpr pika::execution::experimental::detail::empty_env + friend constexpr pika::execution::experimental::empty_env tag_invoke(pika::execution::experimental::get_env_t, then_on_host_receiver_type const&) noexcept { @@ -169,7 +169,7 @@ namespace pika::cuda::experimental { using completion_signatures = pika::execution::experimental::make_completion_signatures< - Sender, pika::execution::experimental::detail::empty_env, + Sender, pika::execution::experimental::empty_env, pika::execution::experimental::completion_signatures< pika::execution::experimental::set_error_t( std::exception_ptr)>, @@ -225,12 +225,10 @@ namespace pika::cuda::experimental { PIKA_FORWARD(Receiver, receiver), s.f, s.sched}); } - friend cuda_scheduler tag_invoke( - pika::execution::experimental::get_completion_scheduler_t< - pika::execution::experimental::set_value_t>, - then_on_host_sender_type const& s) noexcept + friend auto tag_invoke(pika::execution::experimental::get_env_t, + then_on_host_sender_type const& s) { - return s.sched; + return pika::execution::experimental::get_env(s.sender); } }; } // namespace then_on_host_detail @@ -254,7 +252,8 @@ namespace pika::cuda::experimental { { auto completion_sched = pika::execution::experimental::get_completion_scheduler< - pika::execution::experimental::set_value_t>(sender); + pika::execution::experimental::set_value_t>( + pika::execution::experimental::get_env(sender)); static_assert( std::is_same_v, cuda_scheduler>, diff --git a/libs/pika/async_cuda/include/pika/async_cuda/then_with_stream.hpp b/libs/pika/async_cuda/include/pika/async_cuda/then_with_stream.hpp index 208cdd189f..dc01576b49 100644 --- a/libs/pika/async_cuda/include/pika/async_cuda/then_with_stream.hpp +++ b/libs/pika/async_cuda/include/pika/async_cuda/then_with_stream.hpp @@ -184,8 +184,7 @@ namespace pika::cuda::experimental::then_with_stream_detail { using completion_signatures = pika::execution::experimental::make_completion_signatures< - std::decay_t, - pika::execution::experimental::detail::empty_env, + std::decay_t, pika::execution::experimental::empty_env, pika::execution::experimental::completion_signatures< pika::execution::experimental::set_error_t( std::exception_ptr)>, @@ -462,10 +461,9 @@ namespace pika::cuda::experimental::then_with_stream_detail { }); } - friend constexpr pika::execution::experimental::detail:: - empty_env - tag_invoke(pika::execution::experimental::get_env_t, - then_with_cuda_stream_receiver const&) noexcept + friend constexpr pika::execution::experimental::empty_env + tag_invoke(pika::execution::experimental::get_env_t, + then_with_cuda_stream_receiver const&) noexcept { return {}; } @@ -499,8 +497,8 @@ namespace pika::cuda::experimental::then_with_stream_detail { pika::util::detail::transform_t< pika::execution::experimental::value_types_of_t< std::decay_t, - pika::execution::experimental::detail::empty_env, - std::tuple, pika::detail::variant>, + pika::execution::experimental::empty_env, std::tuple, + pika::detail::variant>, value_types_helper>, pika::detail::monostate>; #else @@ -548,7 +546,7 @@ namespace pika::cuda::experimental::then_with_stream_detail { pika::util::detail::transform_t< pika::execution::experimental::value_types_of_t< then_with_cuda_stream_sender_type, - pika::execution::experimental::detail::empty_env, + pika::execution::experimental::empty_env, pika::util::detail::pack, pika::util::detail::pack>, result_types_helper>, pika::detail::monostate>>>; @@ -600,12 +598,10 @@ namespace pika::cuda::experimental::then_with_stream_detail { PIKA_FORWARD(Receiver, receiver), s.f, s.sched, s.sender); } - friend cuda_scheduler tag_invoke( - pika::execution::experimental::get_completion_scheduler_t< - pika::execution::experimental::set_value_t>, - then_with_cuda_stream_sender_type const& s) noexcept + friend auto tag_invoke(pika::execution::experimental::get_env_t, + then_with_cuda_stream_sender_type const& s) { - return s.sched; + return pika::execution::experimental::get_env(s.sender); } }; @@ -616,7 +612,8 @@ namespace pika::cuda::experimental::then_with_stream_detail { { auto completion_sched = pika::execution::experimental::get_completion_scheduler< - pika::execution::experimental::set_value_t>(sender); + pika::execution::experimental::set_value_t>( + pika::execution::experimental::get_env(sender)); static_assert(std::is_same_v, cuda_scheduler>, "then_with_cuda_stream can only be used with senders whose " diff --git a/libs/pika/async_cuda/tests/unit/cuda_scheduler.cu b/libs/pika/async_cuda/tests/unit/cuda_scheduler.cu index 6fa1157450..ce42c54e6c 100644 --- a/libs/pika/async_cuda/tests/unit/cuda_scheduler.cu +++ b/libs/pika/async_cuda/tests/unit/cuda_scheduler.cu @@ -30,12 +30,12 @@ inline constexpr bool is_cuda_scheduler_v = std::is_same_v, cu::cuda_scheduler>; #define CHECK_CUDA_COMPLETION_SCHEDULER(...) \ - static_assert(is_cuda_scheduler_v< \ - decltype(ex::get_completion_scheduler(__VA_ARGS__))>) + static_assert(is_cuda_scheduler_v(ex::get_env(__VA_ARGS__)))>) #define CHECK_NOT_CUDA_COMPLETION_SCHEDULER(...) \ static_assert(!is_cuda_scheduler_v(__VA_ARGS__))>) + ex::set_value_t>(ex::get_env(__VA_ARGS__)))>) int pika_main() { diff --git a/libs/pika/async_cuda/tests/unit/then_with_stream.cu b/libs/pika/async_cuda/tests/unit/then_with_stream.cu index 4789978e10..fb4592ecf2 100644 --- a/libs/pika/async_cuda/tests/unit/then_with_stream.cu +++ b/libs/pika/async_cuda/tests/unit/then_with_stream.cu @@ -67,12 +67,23 @@ struct const_reference_cuda_sender return operation_state{std::move(s.x), std::forward(r)}; } - friend cu::cuda_scheduler tag_invoke( - pika::execution::experimental::get_completion_scheduler_t< - pika::execution::experimental::set_value_t>, - const_reference_cuda_sender const& s) noexcept + struct env { - return s.sched; + cu::cuda_scheduler sched; + + friend cu::cuda_scheduler tag_invoke( + pika::execution::experimental::get_completion_scheduler_t< + pika::execution::experimental::set_value_t>, + env const& e) noexcept + { + return e.sched; + } + }; + + friend env tag_invoke(pika::execution::experimental::get_env_t, + const_reference_cuda_sender const& s) + { + return {s.sched}; } }; @@ -118,12 +129,23 @@ struct const_reference_error_cuda_sender return {std::forward(r)}; } - friend cu::cuda_scheduler tag_invoke( - pika::execution::experimental::get_completion_scheduler_t< - pika::execution::experimental::set_value_t>, - const_reference_error_cuda_sender const& s) noexcept + struct env + { + cu::cuda_scheduler sched; + + friend cu::cuda_scheduler tag_invoke( + pika::execution::experimental::get_completion_scheduler_t< + pika::execution::experimental::set_value_t>, + env const& e) noexcept + { + return e.sched; + } + }; + + friend env tag_invoke(pika::execution::experimental::get_env_t, + const_reference_error_cuda_sender const& s) { - return s.sched; + return {s.sched}; } }; diff --git a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp index 13aaab520b..92fe3c6797 100644 --- a/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp +++ b/libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp @@ -115,7 +115,7 @@ namespace pika::mpi::experimental { using completion_signatures = pika::execution::experimental::make_completion_signatures< std::decay_t, - pika::execution::experimental::detail::empty_env, + pika::execution::experimental::empty_env, pika::execution::experimental::completion_signatures< pika::execution::experimental::set_error_t( std::exception_ptr)>, @@ -262,10 +262,9 @@ namespace pika::mpi::experimental { }); } - friend constexpr pika::execution::experimental::detail:: - empty_env - tag_invoke(pika::execution::experimental::get_env_t, - transform_mpi_receiver const&) noexcept + friend constexpr pika::execution::experimental::empty_env + tag_invoke(pika::execution::experimental::get_env_t, + transform_mpi_receiver const&) noexcept { return {}; } @@ -288,7 +287,7 @@ namespace pika::mpi::experimental { pika::util::detail::transform_t< pika::execution::experimental::value_types_of_t< std::decay_t, - pika::execution::experimental::detail::empty_env, + pika::execution::experimental::empty_env, std::tuple, pika::detail::variant>, value_types_helper>, pika::detail::monostate>; @@ -340,8 +339,7 @@ namespace pika::mpi::experimental { pika::util::detail::transform_t< pika::execution::experimental::value_types_of_t< transform_mpi_sender_type, - pika::execution::experimental::detail:: - empty_env, + pika::execution::experimental::empty_env, pika::util::detail::pack, pika::util::detail::pack>, result_types_helper>, diff --git a/libs/pika/execution/include/pika/execution/algorithms/bulk.hpp b/libs/pika/execution/include/pika/execution/algorithms/bulk.hpp index 394d944fe3..21455900b1 100644 --- a/libs/pika/execution/include/pika/execution/algorithms/bulk.hpp +++ b/libs/pika/execution/include/pika/execution/algorithms/bulk.hpp @@ -63,20 +63,10 @@ namespace pika::bulk_detail { static constexpr bool sends_done = false; - template && - pika::execution::experimental::detail::has_completion_scheduler_v< - CPO, std::decay_t>) - // clang-format on - > - friend constexpr auto tag_invoke( - pika::execution::experimental::get_completion_scheduler_t, - bulk_sender_type const& sender) + friend constexpr decltype(auto) tag_invoke( + pika::execution::experimental::get_env_t, bulk_sender_type const& s) { - return pika::execution::experimental::get_completion_scheduler( - sender.sender); + return pika::execution::experimental::get_env(s.sender); } template @@ -182,7 +172,8 @@ namespace pika::execution::experimental { { auto scheduler = pika::execution::experimental::get_completion_scheduler< - pika::execution::experimental::set_value_t>(sender); + pika::execution::experimental::set_value_t>( + pika::execution::experimental::get_env(sender)); return pika::functional::detail::tag_invoke(bulk_t{}, PIKA_MOVE(scheduler), PIKA_FORWARD(Sender, sender), shape, PIKA_FORWARD(F, f)); diff --git a/libs/pika/execution/include/pika/execution/algorithms/drop_value.hpp b/libs/pika/execution/include/pika/execution/algorithms/drop_value.hpp index 942642192c..34a1aacde7 100644 --- a/libs/pika/execution/include/pika/execution/algorithms/drop_value.hpp +++ b/libs/pika/execution/include/pika/execution/algorithms/drop_value.hpp @@ -57,8 +57,8 @@ namespace pika::drop_value_detail { pika::execution::experimental::set_value(PIKA_MOVE(r.receiver)); } - friend constexpr pika::execution::experimental::detail::empty_env - tag_invoke(pika::execution::experimental::get_env_t, + friend constexpr pika::execution::experimental::empty_env tag_invoke( + pika::execution::experimental::get_env_t, drop_value_receiver_type const&) noexcept { return {}; @@ -88,7 +88,7 @@ namespace pika::drop_value_detail { using completion_signatures = pika::execution::experimental::make_completion_signatures, empty_set_value>; #else @@ -106,22 +106,6 @@ namespace pika::drop_value_detail { static constexpr bool sends_done = false; #endif - template && - pika::execution::experimental::detail::has_completion_scheduler_v< - CPO, std::decay_t>) - // clang-format on - > - friend constexpr auto tag_invoke( - pika::execution::experimental::get_completion_scheduler_t, - drop_value_sender_type const& sender) - { - return pika::execution::experimental::get_completion_scheduler( - sender.sender); - } - template friend auto tag_invoke(pika::execution::experimental::connect_t, drop_value_sender_type&& s, Receiver&& receiver) @@ -139,6 +123,13 @@ namespace pika::drop_value_detail { drop_value_receiver{ PIKA_FORWARD(Receiver, receiver)}); } + + friend constexpr decltype(auto) tag_invoke( + pika::execution::experimental::get_env_t, + drop_value_sender_type const& s) + { + return pika::execution::experimental::get_env(s.sender); + } }; } // namespace pika::drop_value_detail diff --git a/libs/pika/execution/include/pika/execution/algorithms/make_future.hpp b/libs/pika/execution/include/pika/execution/algorithms/make_future.hpp index 5037074f95..66a9fdc060 100644 --- a/libs/pika/execution/include/pika/execution/algorithms/make_future.hpp +++ b/libs/pika/execution/include/pika/execution/algorithms/make_future.hpp @@ -99,8 +99,8 @@ namespace pika::make_future_detail { r_local.data.reset(); } - friend constexpr pika::execution::experimental::detail::empty_env - tag_invoke(pika::execution::experimental::get_env_t, + friend constexpr pika::execution::experimental::empty_env tag_invoke( + pika::execution::experimental::get_env_t, make_future_receiver const&) noexcept { return {}; @@ -150,8 +150,8 @@ namespace pika::make_future_detail { r_local.data.reset(); } - friend constexpr pika::execution::experimental::detail::empty_env - tag_invoke(pika::execution::experimental::get_env_t, + friend constexpr pika::execution::experimental::empty_env tag_invoke( + pika::execution::experimental::get_env_t, make_future_receiver const&) noexcept { return {}; @@ -203,8 +203,7 @@ namespace pika::make_future_detail { #if defined(PIKA_HAVE_P2300_REFERENCE_IMPLEMENTATION) using value_types = typename pika::execution::experimental::value_types_of_t< - std::decay_t, - pika::execution::experimental::detail::empty_env, + std::decay_t, pika::execution::experimental::empty_env, pika::util::detail::pack, pika::util::detail::pack>; #else using value_types = typename pika::execution::experimental:: diff --git a/libs/pika/execution/include/pika/execution/algorithms/schedule_from.hpp b/libs/pika/execution/include/pika/execution/algorithms/schedule_from.hpp index 852c1234a7..248d752b9a 100644 --- a/libs/pika/execution/include/pika/execution/algorithms/schedule_from.hpp +++ b/libs/pika/execution/include/pika/execution/algorithms/schedule_from.hpp @@ -74,38 +74,34 @@ namespace pika::schedule_from_detail { static constexpr bool sends_done = false; - template && - (std::is_same_v || - pika::execution::experimental::detail::has_completion_scheduler_v< - pika::execution::experimental::set_error_t, - std::decay_t> || - pika::execution::experimental::detail::has_completion_scheduler_v< - pika::execution::experimental::set_stopped_t, - std::decay_t>)) - // clang-format on - > - friend constexpr auto tag_invoke( - pika::execution::experimental::get_completion_scheduler_t, - schedule_from_sender_type const& sender) + template + struct env { - if constexpr (std::is_same_v, - pika::execution::experimental::set_value_t>) + PIKA_NO_UNIQUE_ADDRESS Env e; + PIKA_NO_UNIQUE_ADDRESS std::decay_t scheduler; + + friend auto tag_invoke( + pika::execution::experimental::get_completion_scheduler_t< + pika::execution::experimental::set_value_t>, + env const& e) noexcept { - return sender.scheduler; + return e.scheduler; } - else + + template )> + friend decltype(auto) tag_invoke(Tag, env const& e) { - return pika::execution::experimental::get_completion_scheduler< - CPO>(sender.predecessor_sender); + return Tag{}(e.e); } - // This silences a bogus warning from nvcc about no return from - // a non-void function. -#if defined(__NVCC__) - __builtin_unreachable(); -#endif + }; + + friend auto tag_invoke(pika::execution::experimental::get_env_t, + schedule_from_sender_type const& s) + { + auto e = + pika::execution::experimental::get_env(s.predecessor_sender); + return env>{std::move(e), s.scheduler}; } template diff --git a/libs/pika/execution/include/pika/execution/algorithms/split_tuple.hpp b/libs/pika/execution/include/pika/execution/algorithms/split_tuple.hpp index d81fc53f72..c4df86c0e9 100644 --- a/libs/pika/execution/include/pika/execution/algorithms/split_tuple.hpp +++ b/libs/pika/execution/include/pika/execution/algorithms/split_tuple.hpp @@ -80,7 +80,7 @@ namespace pika::split_tuple_detail { using value_type = typename std::decay< pika::execution::experimental::detail::single_result_t< typename pika::execution::experimental::value_types_of_t>>::type; #else using value_type = typename std::decay< @@ -95,8 +95,7 @@ namespace pika::split_tuple_detail { pika::util::detail::unique_t, std::decay>, std::exception_ptr>>; @@ -144,8 +143,7 @@ namespace pika::split_tuple_detail { using value_type = typename std::decay< pika::execution::experimental::detail::single_result_t< typename pika::execution::experimental::value_types_of_t< - Sender, - pika::execution::experimental::detail::empty_env, + Sender, pika::execution::experimental::empty_env, pika::util::detail::pack, pika::util::detail::pack>>>:: type; #else @@ -170,7 +168,7 @@ namespace pika::split_tuple_detail { r.state.set_predecessor_done(); } - friend constexpr pika::execution::experimental::detail::empty_env + friend constexpr pika::execution::experimental::empty_env tag_invoke(pika::execution::experimental::get_env_t, split_tuple_receiver const&) noexcept { @@ -213,7 +211,7 @@ namespace pika::split_tuple_detail { constexpr bool sends_stopped = #if defined(PIKA_HAVE_P2300_REFERENCE_IMPLEMENTATION) pika::execution::experimental::sends_stopped + pika::execution::experimental::empty_env> #else pika::execution::experimental::sender_traits< Sender>::sends_done @@ -413,7 +411,7 @@ namespace pika::split_tuple_detail { using value_type = typename std::decay< pika::execution::experimental::detail::single_result_t< typename pika::execution::experimental::value_types_of_t>>::type; #else using value_type = typename std::decay< @@ -449,7 +447,7 @@ namespace pika::split_tuple_detail { using completion_signatures = pika::execution::experimental::make_completion_signatures, @@ -567,7 +565,7 @@ namespace pika::split_tuple_detail { using value_type = typename std::decay< pika::execution::experimental::detail::single_result_t< typename pika::execution::experimental::value_types_of_t>>::type; #else using value_type = typename std::decay< diff --git a/libs/pika/execution/include/pika/execution/algorithms/sync_wait.hpp b/libs/pika/execution/include/pika/execution/algorithms/sync_wait.hpp index fc72fd511e..ac3b1ab60a 100644 --- a/libs/pika/execution/include/pika/execution/algorithms/sync_wait.hpp +++ b/libs/pika/execution/include/pika/execution/algorithms/sync_wait.hpp @@ -63,13 +63,12 @@ namespace pika::sync_wait_detail { template class Variant> using predecessor_value_types = pika::execution::experimental::value_types_of_t; + pika::execution::experimental::empty_env, Tuple, Variant>; template