Skip to content

Commit

Permalink
Disable choice of require_started error mode when using stdexec
Browse files Browse the repository at this point in the history
  • Loading branch information
msimberg committed Nov 27, 2023
1 parent cfd854e commit eb85c98
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,42 @@
#include <utility>

namespace pika {
#if !defined(PIKA_HAVE_STDEXEC)
namespace execution::experimental {
// We only make the choice of mode available when not using stdexec. stdexec's sender
// concepts require nothrow destructibility, which is not satisfied by throw_on_unstarted.
// With stdexec enabled, an unstarted sender will always terminate.
enum class require_started_mode
{
terminate_on_unstarted,
throw_on_unstarted
};
}
} // namespace execution::experimental
#endif

namespace require_started_detail {
#if defined(PIKA_HAVE_STDEXEC)
# define PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(f, message) \
fmt::print(std::cerr, "{}: {}\n", f, message); \
std::terminate();
#else
using pika::execution::experimental::require_started_mode;

#define PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(mode, f, message) \
{ \
switch (mode) \
# define PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(mode, f, message) \
{ \
case require_started_mode::terminate_on_unstarted: \
fmt::print(std::cerr, "{}: {}\n", f, message); \
std::terminate(); \
break; \
switch (mode) \
{ \
case require_started_mode::terminate_on_unstarted: \
fmt::print(std::cerr, "{}: {}\n", f, message); \
std::terminate(); \
break; \
\
case require_started_mode::throw_on_unstarted: \
PIKA_THROW_EXCEPTION(pika::error::invalid_status, f, fmt::runtime(message)); \
break; \
} \
}
case require_started_mode::throw_on_unstarted: \
PIKA_THROW_EXCEPTION(pika::error::invalid_status, f, fmt::runtime(message)); \
break; \
} \
}
#endif

template <typename OpState>
struct require_started_receiver_impl
Expand Down Expand Up @@ -126,30 +137,47 @@ namespace pika {

PIKA_NO_UNIQUE_ADDRESS std::decay_t<Receiver> receiver;
std::optional<operation_state_type> op_state{std::nullopt};
#if !defined(PIKA_HAVE_STDEXEC)
require_started_mode mode{require_started_mode::terminate_on_unstarted};

Check notice on line 141 in libs/pika/execution/include/pika/execution/algorithms/require_started.hpp

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

libs/pika/execution/include/pika/execution/algorithms/require_started.hpp#L141

struct member 'require_started_op_state_type::mode' is never used.
#endif
bool started{false};

template <typename Receiver_>
require_started_op_state_type(
std::decay_t<Sender> sender, Receiver_&& receiver, require_started_mode mode)
require_started_op_state_type(std::decay_t<Sender> sender, Receiver_&& receiver
#if !defined(PIKA_HAVE_STDEXEC)
,
require_started_mode mode
#endif
)
: receiver(PIKA_FORWARD(Receiver_, receiver))
, op_state(pika::detail::with_result_of([&]() {
return pika::execution::experimental::connect(PIKA_MOVE(sender),
require_started_receiver<require_started_op_state_type>{this});
}))
#if !defined(PIKA_HAVE_STDEXEC)
, mode(mode)

Check failure on line 158 in libs/pika/execution/include/pika/execution/algorithms/require_started.hpp

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

libs/pika/execution/include/pika/execution/algorithms/require_started.hpp#L158

Member variable 'mode' is initialized by itself.
#endif
{
}

~require_started_op_state_type() noexcept(false)
~require_started_op_state_type()
#if !defined(PIKA_HAVE_STDEXEC)
noexcept(false)
#endif
{
if (!started)
{
op_state.reset();

#if defined(PIKA_HAVE_STDEXEC)
PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(
"pika::execution::experimental::~require_started_operation_state",
"The operation state of a require_started sender was never started");
#else
PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(mode,
"pika::execution::experimental::~require_started_operation_state",
"The operation state of a require_started sender was never started");
#endif
}
}
require_started_op_state_type(require_started_op_state_type&) = delete;
Expand Down Expand Up @@ -183,7 +211,9 @@ namespace pika {
using is_sender = void;

std::optional<std::decay_t<Sender>> sender{std::nullopt};
#if !defined(PIKA_HAVE_STDEXEC)
require_started_mode mode{require_started_mode::terminate_on_unstarted};
#endif
mutable bool connected{false};

#if defined(PIKA_HAVE_STDEXEC)
Expand All @@ -205,16 +235,24 @@ namespace pika {
template <typename Sender_,
typename Enable = std::enable_if_t<
!std::is_same_v<std::decay_t<Sender_>, require_started_sender_type>>>
explicit require_started_sender_type(Sender_&& sender,
require_started_mode mode = require_started_mode::terminate_on_unstarted)
explicit require_started_sender_type(Sender_&& sender
#if !defined(PIKA_HAVE_STDEXEC)
,
require_started_mode mode = require_started_mode::terminate_on_unstarted
#endif
)
: sender(PIKA_FORWARD(Sender_, sender))
#if !defined(PIKA_HAVE_STDEXEC)
, mode(mode)
#endif
{
}

require_started_sender_type(require_started_sender_type&& other) noexcept
: sender(std::exchange(other.sender, std::nullopt))
#if !defined(PIKA_HAVE_STDEXEC)
, mode(other.mode)
#endif
, connected(other.connected)
{
}
Expand All @@ -225,23 +263,35 @@ namespace pika {
{
sender.reset();

#if defined(PIKA_HAVE_STDEXEC)
PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(
"pika::execution::experimental::require_started_sender::operator=(require_"
"started_sender&&)",
"Assigning to a require_started sender that was never started, the target "
"would be discarded");
#else
PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(mode,
"pika::execution::experimental::require_started_sender::operator=(require_"
"started_sender&&)",
"Assigning to a require_started sender that was never started, the target "
"would be discarded");
#endif
}

sender = std::exchange(other.sender, std::nullopt);
#if !defined(PIKA_HAVE_STDEXEC)
mode = other.mode;
#endif
connected = other.connected;

return *this;
}

require_started_sender_type(require_started_sender_type const& other)
: sender(other.sender)
#if !defined(PIKA_HAVE_STDEXEC)
, mode(other.mode)
#endif
, connected(false)
{
}
Expand All @@ -252,29 +302,48 @@ namespace pika {
{
sender.reset();

#if defined(PIKA_HAVE_STDEXEC)
PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(
"pika::execution::experimental::require_started_sender::operator=(require_"
"started_sender const&)",
"Assigning to a require_started sender that was never started, the target "
"would be discarded");
#else
PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(mode,
"pika::execution::experimental::require_started_sender::operator=(require_"
"started_sender const&)",
"Assigning to a require_started sender that was never started, the target "
"would be discarded");
#endif
}

sender = other.sender;
#if !defined(PIKA_HAVE_STDEXEC)
mode = other.mode;
#endif
connected = false;

return *this;
}

~require_started_sender_type() noexcept(false)
~require_started_sender_type()
#if !defined(PIKA_HAVE_STDEXEC)
noexcept(false)
#endif
{
if (sender.has_value() && !connected)
{
sender.reset();

#if defined(PIKA_HAVE_STDEXEC)
PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(
"pika::execution::experimental::~require_started_sender",
"A require_started sender was never started");
#else
PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(mode,
"pika::execution::experimental::~require_started_sender",
"A require_started sender was never started");
#endif
}
}

Expand All @@ -285,14 +354,26 @@ namespace pika {
{
if (!s.sender.has_value())
{
#if defined(PIKA_HAVE_STDEXEC)
PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(s.mode,

Check failure on line 358 in libs/pika/execution/include/pika/execution/algorithms/require_started.hpp

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

libs/pika/execution/include/pika/execution/algorithms/require_started.hpp#L358

failed to expand 'PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER', Wrong number of parameters for macro 'PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER'.
"pika::execution::experimental::connect(require_started_sender&&)",
"Trying to connect an empty require_started sender");
#else
PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(s.mode,
"pika::execution::experimental::connect(require_started_sender&&)",
"Trying to connect an empty require_started sender");
#endif
}

s.connected = true;
return {std::exchange(s.sender, std::nullopt).value(),
PIKA_FORWARD(Receiver, receiver), s.mode};
return
{
std::exchange(s.sender, std::nullopt).value(), PIKA_FORWARD(Receiver, receiver)
#if !defined(PIKA_HAVE_STDEXEC)
,
s.mode
#endif
};
}

template <typename Receiver>
Expand All @@ -302,17 +383,32 @@ namespace pika {
{
if (!s.sender.has_value())
{
#if defined(PIKA_HAVE_STDEXEC)
PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(
"pika::execution::experimental::connect(require_started_sender const&)",
"Trying to connect an empty require_started sender");
#else
PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER(s.mode,
"pika::execution::experimental::connect(require_started_sender const&)",
"Trying to connect an empty require_started sender");
#endif
}

s.connected = true;
return {s.sender.value(), PIKA_FORWARD(Receiver, receiver), s.mode};
return
{
s.sender.value(), PIKA_FORWARD(Receiver, receiver)
#if !defined(PIKA_HAVE_STDEXEC)
,
s.mode
#endif
};
}

void discard() noexcept { connected = true; }
#if !defined(PIKA_HAVE_STDEXEC)
void set_mode(require_started_mode mode) noexcept { this->mode = mode; }
#endif
};

#undef PIKA_DETAIL_HANDLE_UNSTARTED_REQUIRE_STARTED_SENDER
Expand All @@ -322,11 +418,20 @@ namespace pika {
inline constexpr struct require_started_t final
{
template <typename Sender, PIKA_CONCEPT_REQUIRES_(is_sender_v<Sender>)>
constexpr PIKA_FORCEINLINE auto operator()(Sender&& sender,
require_started_mode mode = require_started_mode::terminate_on_unstarted) const
constexpr PIKA_FORCEINLINE auto operator()(Sender&& sender
#if !defined(PIKA_HAVE_STDEXEC)
,
require_started_mode mode = require_started_mode::terminate_on_unstarted
#endif
) const
{
return require_started_detail::require_started_sender<Sender>{
PIKA_FORWARD(Sender, sender), mode};
return require_started_detail::require_started_sender<Sender>
{
PIKA_FORWARD(Sender, sender)
#if !defined(PIKA_HAVE_STDEXEC)
, mode
#endif
};
}

constexpr PIKA_FORCEINLINE auto operator()() const
Expand Down
7 changes: 6 additions & 1 deletion libs/pika/execution/tests/unit/algorithm_require_started.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ enum class expect_exception
};

// Check that the sender throws if it's not connected or the operation state isn't started
void check_exception(expect_exception e, auto test) {
void check_exception(expect_exception e, auto test)
{
try
{
test();
Expand Down Expand Up @@ -233,6 +234,7 @@ void discard_if_required(S& s, exception_test_mode mode)

void test_exception(exception_test_mode mode)
{
#if !defined(PIKA_HAVE_STDEXEC)
// When the mode is no_discard we expect exceptions, otherwise we don't expect exceptions
check_exception(
mode == exception_test_mode::discard ? expect_exception::no : expect_exception::yes, [=] {
Expand Down Expand Up @@ -306,10 +308,12 @@ void test_exception(exception_test_mode mode)
discard_if_required(rs2, mode);
rs2 = rs1;
});
#endif
}

void test_unstarted()
{
#if !defined(PIKA_HAVE_STDEXEC)
// Connected, but not started
check_exception(expect_exception::yes, [] {
auto rs = ex::require_started(ex::just(), ex::require_started_mode::throw_on_unstarted);
Expand All @@ -327,6 +331,7 @@ void test_unstarted()
auto os = ex::connect(rs, std::move(r));
tt::sync_wait(std::move(rs));
});
#endif
}

int main()
Expand Down

0 comments on commit eb85c98

Please sign in to comment.