diff --git a/libs/pika/execution/CMakeLists.txt b/libs/pika/execution/CMakeLists.txt index abe425a3ab..f5dfe31dc7 100644 --- a/libs/pika/execution/CMakeLists.txt +++ b/libs/pika/execution/CMakeLists.txt @@ -17,6 +17,7 @@ set(execution_headers pika/execution/algorithms/just.hpp pika/execution/algorithms/let_error.hpp pika/execution/algorithms/let_value.hpp + pika/execution/algorithms/require_started.hpp pika/execution/algorithms/schedule_from.hpp pika/execution/algorithms/split.hpp pika/execution/algorithms/split_tuple.hpp diff --git a/libs/pika/execution/include/pika/execution/algorithms/require_started.hpp b/libs/pika/execution/include/pika/execution/algorithms/require_started.hpp new file mode 100644 index 0000000000..86545d2041 --- /dev/null +++ b/libs/pika/execution/include/pika/execution/algorithms/require_started.hpp @@ -0,0 +1,448 @@ +// Copyright (c) 2023 ETH Zurich +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace pika { +#if !defined(PIKA_HAVE_STDEXEC) + namespace execution::experimental { + // We only make the choice of mode available when not using stdexec. stdexec's sender + // concepts require nothrow destructibility, which is not satisfied by throw_on_unstarted. + // With stdexec enabled, an unstarted sender will always terminate. + enum class require_started_mode + { + terminate_on_unstarted, + throw_on_unstarted + }; + } // namespace execution::experimental +#endif + + namespace require_started_detail { +#if defined(PIKA_HAVE_STDEXEC) +# define PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(f, message) \ + fmt::print(std::cerr, "{}: {}\n", f, message); \ + std::terminate(); +#else + using pika::execution::experimental::require_started_mode; + +# define PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(mode, f, message) \ + { \ + switch (mode) \ + { \ + case require_started_mode::terminate_on_unstarted: \ + fmt::print(std::cerr, "{}: {}\n", f, message); \ + std::terminate(); \ + break; \ + \ + case require_started_mode::throw_on_unstarted: \ + PIKA_THROW_EXCEPTION(pika::error::invalid_status, f, fmt::runtime(message)); \ + break; \ + } \ + } +#endif + + template + struct require_started_receiver_impl + { + struct require_started_receiver_type; + }; + + template + using require_started_receiver = + typename require_started_receiver_impl::require_started_receiver_type; + + template + struct require_started_receiver_impl::require_started_receiver_type + { + using is_receiver = void; + + OpState* op_state = nullptr; + + template + friend void tag_invoke(pika::execution::experimental::set_error_t, + require_started_receiver_type r, Error&& error) noexcept + { + PIKA_ASSERT(r.op_state != nullptr); + pika::execution::experimental::set_error( + PIKA_MOVE(r.op_state->receiver), PIKA_FORWARD(Error, error)); + } + + friend void tag_invoke(pika::execution::experimental::set_stopped_t, + require_started_receiver_type r) noexcept + { + PIKA_ASSERT(r.op_state != nullptr); + pika::execution::experimental::set_stopped(PIKA_MOVE(r.op_state->receiver)); + }; + + template + friend void tag_invoke(pika::execution::experimental::set_value_t, + require_started_receiver_type r, Ts&&... ts) noexcept + { + PIKA_ASSERT(r.op_state != nullptr); + pika::execution::experimental::set_value( + PIKA_MOVE(r.op_state->receiver), PIKA_FORWARD(Ts, ts)...); + } + + friend constexpr pika::execution::experimental::empty_env tag_invoke( + pika::execution::experimental::get_env_t, + require_started_receiver_type const&) noexcept + { + return {}; + } + }; + + template + struct require_started_op_state_impl + { + struct require_started_op_state_type; + }; + + template + using require_started_op_state = + typename require_started_op_state_impl::require_started_op_state_type; + + template + struct require_started_op_state_impl::require_started_op_state_type + { + using operation_state_type = pika::execution::experimental::connect_result_t>; + + PIKA_NO_UNIQUE_ADDRESS std::decay_t receiver; + std::optional op_state{std::nullopt}; +#if !defined(PIKA_HAVE_STDEXEC) + require_started_mode mode{require_started_mode::terminate_on_unstarted}; +#endif + bool started{false}; + + template + require_started_op_state_type(std::decay_t sender, Receiver_&& receiver +#if !defined(PIKA_HAVE_STDEXEC) + , + require_started_mode mode +#endif + ) + : receiver(PIKA_FORWARD(Receiver_, receiver)) + , op_state(pika::detail::with_result_of([&]() { + return pika::execution::experimental::connect(PIKA_MOVE(sender), + require_started_receiver{this}); + })) +#if !defined(PIKA_HAVE_STDEXEC) + , mode(mode) +#endif + { + } + + ~require_started_op_state_type() +#if !defined(PIKA_HAVE_STDEXEC) + noexcept(false) +#endif + { + if (!started) + { + op_state.reset(); + +#if defined(PIKA_HAVE_STDEXEC) + PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER( + "pika::execution::experimental::~require_started_operation_state", + "The operation state of a require_started sender was never started"); +#else + PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(mode, + "pika::execution::experimental::~require_started_operation_state", + "The operation state of a require_started sender was never started"); +#endif + } + } + require_started_op_state_type(require_started_op_state_type&) = delete; + require_started_op_state_type& operator=(require_started_op_state_type&) = delete; + require_started_op_state_type(require_started_op_state_type const&) = delete; + require_started_op_state_type& operator=(require_started_op_state_type const&) = delete; + + friend void tag_invoke( + pika::execution::experimental::start_t, require_started_op_state_type& os) noexcept + { + PIKA_ASSERT(os.op_state.has_value()); + + os.started = true; + pika::execution::experimental::start(os.op_state.value()); + } + }; + + template + struct require_started_sender_impl + { + struct require_started_sender_type; + }; + + template + using require_started_sender = + typename require_started_sender_impl::require_started_sender_type; + + template + struct require_started_sender_impl::require_started_sender_type + { + using is_sender = void; + + std::optional> sender{std::nullopt}; +#if !defined(PIKA_HAVE_STDEXEC) + require_started_mode mode{require_started_mode::terminate_on_unstarted}; +#endif + mutable bool connected{false}; + +#if defined(PIKA_HAVE_STDEXEC) + using completion_signatures = + pika::execution::experimental::make_completion_signatures, + pika::execution::experimental::empty_env>; +#else + template