diff --git a/libs/pika/execution/CMakeLists.txt b/libs/pika/execution/CMakeLists.txt index ed1d69d7e..161f93f05 100644 --- a/libs/pika/execution/CMakeLists.txt +++ b/libs/pika/execution/CMakeLists.txt @@ -10,6 +10,7 @@ set(execution_headers pika/execution/algorithms/bulk.hpp pika/execution/algorithms/detail/helpers.hpp pika/execution/algorithms/detail/partial_algorithm.hpp + pika/execution/algorithms/drop_operation_state.hpp pika/execution/algorithms/drop_value.hpp pika/execution/algorithms/ensure_started.hpp pika/execution/algorithms/execute.hpp diff --git a/libs/pika/execution/include/pika/execution/algorithms/drop_operation_state.hpp b/libs/pika/execution/include/pika/execution/algorithms/drop_operation_state.hpp new file mode 100644 index 000000000..5a7657731 --- /dev/null +++ b/libs/pika/execution/include/pika/execution/algorithms/drop_operation_state.hpp @@ -0,0 +1,251 @@ +// Copyright (c) 2023 ETH Zurich +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace pika::drop_op_state_detail { + template + struct drop_op_state_receiver_impl + { + struct drop_op_state_receiver_type; + }; + + template + using drop_op_state_receiver = + typename drop_op_state_receiver_impl::drop_op_state_receiver_type; + + template + struct drop_op_state_receiver_impl::drop_op_state_receiver_type + { + using is_receiver = void; + + OpState* op_state = nullptr; + + template + friend void tag_invoke(pika::execution::experimental::set_error_t, + drop_op_state_receiver_type r, Error&& error) noexcept + { + PIKA_ASSERT(r.op_state != nullptr); + PIKA_ASSERT(r.op_state->op_state.has_value()); + + try + { + auto error_local = PIKA_FORWARD(Error, error); + r.op_state->op_state.reset(); + + pika::execution::experimental::set_error( + PIKA_MOVE(r.op_state->receiver), PIKA_MOVE(error_local)); + } + catch (...) + { + r.op_state->op_state.reset(); + + pika::execution::experimental::set_error( + PIKA_MOVE(r.op_state->receiver), std::current_exception()); + } + } + + friend void tag_invoke( + pika::execution::experimental::set_stopped_t, drop_op_state_receiver_type r) noexcept + { + PIKA_ASSERT(r.op_state != nullptr); + PIKA_ASSERT(r.op_state->op_state.has_value()); + + r.op_state->op_state.reset(); + + pika::execution::experimental::set_stopped(PIKA_MOVE(r.op_state->receiver)); + }; + + template + friend void tag_invoke(pika::execution::experimental::set_value_t, + drop_op_state_receiver_type r, Ts&&... ts) noexcept + { + PIKA_ASSERT(r.op_state != nullptr); + PIKA_ASSERT(r.op_state->op_state.has_value()); + + try + { + auto ts_local = std::tuple(PIKA_FORWARD(Ts, ts)...); + r.op_state->op_state.reset(); + + std::apply(pika::util::detail::bind_front(pika::execution::experimental::set_value, + PIKA_MOVE(r.op_state->receiver)), + PIKA_MOVE(ts_local)); + } + catch (...) + { + r.op_state->op_state.reset(); + + pika::execution::experimental::set_error( + PIKA_MOVE(r.op_state->receiver), std::current_exception()); + } + } + + friend constexpr pika::execution::experimental::empty_env tag_invoke( + pika::execution::experimental::get_env_t, drop_op_state_receiver_type const&) noexcept + { + return {}; + } + }; + + template + struct drop_op_state_op_state_impl + { + struct drop_op_state_op_state_type; + }; + + template + using drop_op_state_op_state = + typename drop_op_state_op_state_impl::drop_op_state_op_state_type; + + template + struct drop_op_state_op_state_impl::drop_op_state_op_state_type + { + PIKA_NO_UNIQUE_ADDRESS std::decay_t receiver; + using operation_state_type = pika::execution::experimental::connect_result_t>; + std::optional op_state; + + template + drop_op_state_op_state_type(std::decay_t sender, Receiver_&& receiver) + : receiver(PIKA_FORWARD(Receiver_, receiver)) + , op_state(pika::detail::with_result_of([&]() mutable { + return pika::execution::experimental::connect( + PIKA_MOVE(sender), drop_op_state_receiver{this}); + })) + { + } + drop_op_state_op_state_type(drop_op_state_op_state_type&) = delete; + drop_op_state_op_state_type& operator=(drop_op_state_op_state_type&) = delete; + drop_op_state_op_state_type(drop_op_state_op_state_type const&) = delete; + drop_op_state_op_state_type& operator=(drop_op_state_op_state_type const&) = delete; + + friend void tag_invoke( + pika::execution::experimental::start_t, drop_op_state_op_state_type& os) noexcept + { + PIKA_ASSERT(os.op_state.has_value()); + pika::execution::experimental::start(os.op_state.value()); + } + }; + + template + struct drop_op_state_sender_impl + { + struct drop_op_state_sender_type; + }; + + template + using drop_op_state_sender = + typename drop_op_state_sender_impl::drop_op_state_sender_type; + + template + struct drop_op_state_sender_impl::drop_op_state_sender_type + { + using is_sender = void; + + std::decay_t sender; + +#if defined(PIKA_HAVE_STDEXEC) + template + using value_types_helper = pika::execution::experimental::completion_signatures< + pika::execution::experimental::set_value_t(std::decay_t&&...)>; + + using completion_signatures = + pika::execution::experimental::make_completion_signatures, + pika::execution::experimental::empty_env, + pika::execution::experimental::completion_signatures< + pika::execution::experimental::set_error_t(std::exception_ptr)>, + value_types_helper>; +#else + template + struct value_types_helper + { + using type = + pika::util::detail::transform_t, + std::add_rvalue_reference>; + }; + + template