Skip to content

Commit

Permalink
Merge 705c413 into 1b5dbca
Browse files Browse the repository at this point in the history
  • Loading branch information
hcq9102 authored Sep 19, 2022
2 parents 1b5dbca + 705c413 commit 9fff673
Show file tree
Hide file tree
Showing 4 changed files with 498 additions and 11 deletions.
154 changes: 143 additions & 11 deletions libs/core/execution/include/hpx/execution/algorithms/sync_wait.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@
#include <type_traits>
#include <utility>

enum class Sync_wait_type
{
single,
variant
};

namespace hpx::execution::experimental::detail {

struct sync_wait_error_visitor
Expand Down Expand Up @@ -84,7 +90,7 @@ namespace hpx::execution::experimental::detail {
template <typename Pack>
using make_decayed_pack_t = typename make_decayed_pack<Pack>::type;

template <typename Sender>
template <typename Sender, Sync_wait_type Type>
struct sync_wait_receiver
{
// value and error_types of the predecessor sender
Expand All @@ -108,9 +114,11 @@ namespace hpx::execution::experimental::detail {
single_variant_t<predecessor_value_types<hpx::tuple, meta::pack>>>;

// The template should compute the result type of whatever returned from
// sync_wait, which should be optional of the variant of the tuples. The
// sync_wait works when the variant has one tuple.
using result_type = hpx::variant<single_result_type>;
// sync_wait or sync_wait_with_variant by checking Sync_wait_type is
// single or variant
using result_type = std::conditional_t<Type == Sync_wait_type::single,
hpx::variant<single_result_type>,
predecessor_value_types<hpx::tuple, hpx::variant>>;

// The type of errors to store in the variant. This in itself is a
// variant.
Expand All @@ -131,8 +139,19 @@ namespace hpx::execution::experimental::detail {
{
// pull the tuple out of the variant and wrap it into an
// optional, make sure to remove the references
return hpx::optional<single_result_type>(
hpx::get<0>(hpx::get<result_type>(HPX_MOVE(value))));
// return hpx::optional<single_result_type>(
// hpx::get<0>(hpx::get<result_type>(HPX_MOVE(value))));

if constexpr (Type == Sync_wait_type::single)
{
return hpx::optional<single_result_type>(hpx::get<0>(
hpx::get<result_type>(HPX_MOVE(value))));
}
else if constexpr (Type == Sync_wait_type::variant)
{
return hpx::optional(
hpx::get<result_type>(HPX_MOVE(value)));
}
}
else if (hpx::holds_alternative<error_type>(value))
{
Expand All @@ -145,7 +164,7 @@ namespace hpx::execution::experimental::detail {
// this means that none of set_value/set_error/set_stopped was
// called.
HPX_ASSERT(hpx::holds_alternative<stopped_type>(value));
return hpx::optional<single_result_type>();
return hpx::optional<result_type>();
}
};

Expand Down Expand Up @@ -342,8 +361,8 @@ namespace hpx::this_thread::experimental {
Sender&& sender)
{
using receiver_type =
hpx::execution::experimental::detail::sync_wait_receiver<
Sender>;
hpx::execution::experimental::detail::sync_wait_receiver<Sender,
Sync_wait_type::single>;
using state_type = typename receiver_type::shared_state;

hpx::execution::experimental::run_loop& loop = sched.get_run_loop();
Expand All @@ -369,8 +388,8 @@ namespace hpx::this_thread::experimental {
sync_wait_t, Sender&& sender)
{
using receiver_type =
hpx::execution::experimental::detail::sync_wait_receiver<
Sender>;
hpx::execution::experimental::detail::sync_wait_receiver<Sender,
Sync_wait_type::single>;
using state_type = typename receiver_type::shared_state;

hpx::execution::experimental::run_loop loop{};
Expand Down Expand Up @@ -404,4 +423,117 @@ namespace hpx::this_thread::experimental {
sync_wait_t>{};
}
} sync_wait{};

////////////////////////////////////////////////////////////////////
// DPO for sync_wait_with_variant

// this_thread::sync_wait_with_variant is a sender consumer that submits
// the work described by the provided sender for execution, similarly to
// ensure_started, except that it blocks the current std::thread or
// thread of main until the work is completed, and returns an optional
// of variant of tuples that were sent by the provided sender on its
// completion of work.
inline constexpr struct sync_wait_with_variant_t final
: hpx::functional::detail::tag_priority<sync_wait_with_variant_t>
{
private:
// clang-format off
template <typename Sender,
HPX_CONCEPT_REQUIRES_(
hpx::execution::experimental::is_sender_v<Sender,
hpx::execution::experimental::detail::sync_wait_receiver_env> &&
hpx::execution::experimental::detail::
is_completion_scheduler_tag_invocable_v<
hpx::execution::experimental::set_value_t,
Sender, sync_wait_with_variant_t
>
)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto tag_override_invoke(
sync_wait_with_variant_t, Sender&& sender)
{
auto scheduler =
hpx::execution::experimental::get_completion_scheduler<
hpx::execution::experimental::set_value_t>(sender);

return hpx::functional::tag_invoke(sync_wait_with_variant_t{},
HPX_MOVE(scheduler), HPX_FORWARD(Sender, sender));
}

// clang-format off
template <typename Sender,
HPX_CONCEPT_REQUIRES_(
hpx::execution::experimental::is_sender_v<Sender,
hpx::execution::experimental::detail::sync_wait_receiver_env>
)>
// clang-format on
friend auto tag_invoke(sync_wait_with_variant_t,
hpx::execution::experimental::run_loop_scheduler const& sched,
Sender&& sender)
{
using receiver_type =
hpx::execution::experimental::detail::sync_wait_receiver<Sender,
Sync_wait_type::variant>;
using state_type = typename receiver_type::shared_state;

hpx::execution::experimental::run_loop& loop = sched.get_run_loop();
state_type state{};
auto op_state = hpx::execution::experimental::connect(
HPX_FORWARD(Sender, sender), receiver_type{state, loop});
hpx::execution::experimental::start(op_state);

// Wait for the variant to be filled in.
loop.run();

return state.get_value();
}

// clang-format off
template <typename Sender,
HPX_CONCEPT_REQUIRES_(
hpx::execution::experimental::is_sender_v<Sender,
hpx::execution::experimental::detail::sync_wait_receiver_env>
)>
// clang-format on
friend HPX_FORCEINLINE auto tag_fallback_invoke(
sync_wait_with_variant_t, Sender&& sender)
{
using receiver_type =
hpx::execution::experimental::detail::sync_wait_receiver<Sender,
Sync_wait_type::variant>;
using state_type = typename receiver_type::shared_state;

hpx::execution::experimental::run_loop loop{};
state_type state{};
auto op_state = hpx::execution::experimental::connect(
HPX_FORWARD(Sender, sender), receiver_type{state, loop});
hpx::execution::experimental::start(op_state);

// Wait for the variant to be filled in.
loop.run();

return state.get_value();
}

// clang-format off
template <typename Scheduler,
HPX_CONCEPT_REQUIRES_(
hpx::execution::experimental::is_scheduler_v<Scheduler>
)>
// clang-format on
friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
sync_wait_with_variant_t, Scheduler&& scheduler)
{
return hpx::execution::experimental::detail::inject_scheduler<
sync_wait_with_variant_t, Scheduler>{
HPX_FORWARD(Scheduler, scheduler)};
}

friend constexpr HPX_FORCEINLINE auto tag_fallback_invoke(
sync_wait_with_variant_t)
{
return hpx::execution::experimental::detail::partial_algorithm<
sync_wait_with_variant_t>{};
}
} sync_wait_with_variant{};
} // namespace hpx::this_thread::experimental
1 change: 1 addition & 0 deletions libs/core/execution/tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ set(tests
algorithm_split
algorithm_start_detached
algorithm_sync_wait
algorithm_sync_wait_with_variant
algorithm_then
algorithm_transfer
algorithm_transfer_just
Expand Down
Loading

0 comments on commit 9fff673

Please sign in to comment.