Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove operator| overloads for sync_wait and start_detached #346

Merged
merged 1 commit into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,8 @@ void test_for_each_sender(ExPolicy&& p, IteratorTag)
auto rng = pika::util::make_iterator_range(
iterator(std::begin(c)), iterator(std::end(c)));
auto f = [](std::size_t& v) { v = 42; };
auto result = ex::just(rng, f) |
pika::ranges::for_each(std::forward<ExPolicy>(p)) | tt::sync_wait();
auto result = tt::sync_wait(
ex::just(rng, f) | pika::ranges::for_each(std::forward<ExPolicy>(p)));
PIKA_TEST(result == iterator(std::end(c)));

// verify values
Expand Down Expand Up @@ -380,8 +380,8 @@ void test_for_each_exception_sender(ExPolicy p, IteratorTag)
bool caught_exception = false;
try
{
ex::just(rng, f) | pika::ranges::for_each(std::forward<ExPolicy>(p)) |
tt::sync_wait();
tt::sync_wait(ex::just(rng, f) |
pika::ranges::for_each(std::forward<ExPolicy>(p)));

PIKA_TEST(false);
}
Expand Down Expand Up @@ -417,8 +417,8 @@ void test_for_each_bad_alloc_sender(ExPolicy p, IteratorTag)
bool caught_exception = false;
try
{
ex::just(rng, f) | pika::ranges::for_each(std::forward<ExPolicy>(p)) |
tt::sync_wait();
tt::sync_wait(ex::just(rng, f) |
pika::ranges::for_each(std::forward<ExPolicy>(p)));

PIKA_TEST(false);
}
Expand Down
26 changes: 13 additions & 13 deletions libs/pika/async_cuda/tests/performance/synchronize.cu
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ int pika_main(pika::program_options::variables_map& vm)
pika::chrono::detail::high_resolution_timer timer;
for (std::size_t i = 0; i != iterations; ++i)
{
ex::schedule(sched) | cu::then_with_stream(f) | tt::sync_wait();
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f));
}
double elapsed = timer.elapsed();
std::cout
Expand All @@ -111,17 +111,17 @@ int pika_main(pika::program_options::variables_map& vm)
// We have to manually unroll this loop, because the type of the
// sender changes for each additional then_with_stream call. The
// number of unrolled calls must match batch_size above.
ex::schedule(sched) | cu::then_with_stream(f) |
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | tt::sync_wait();
cu::then_with_stream(f));
}
// Do the remainder one-by-one
for (std::size_t i = 0; i < non_batch_iterations; ++i)
{
ex::schedule(sched) | cu::then_with_stream(f) | tt::sync_wait();
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f));
}
double elapsed = timer.elapsed();
std::cout
Expand All @@ -145,7 +145,7 @@ int pika_main(pika::program_options::variables_map& vm)
// intentionally insert dummy then([]{}) calls between the
// then_with_stream calls to force synchronization between the
// kernel launches.
ex::schedule(sched) | cu::then_with_stream(f) |
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) |
ex::transfer(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) |
Expand All @@ -163,12 +163,12 @@ int pika_main(pika::program_options::variables_map& vm)
ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) |
ex::transfer(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) |
ex::transfer(sched) | cu::then_with_stream(f) | tt::sync_wait();
ex::transfer(sched) | cu::then_with_stream(f));
}
// Do the remainder one-by-one
for (std::size_t i = 0; i < non_batch_iterations; ++i)
{
ex::schedule(sched) | cu::then_with_stream(f) | tt::sync_wait();
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f));
}
double elapsed = timer.elapsed();
std::cout
Expand All @@ -186,8 +186,8 @@ int pika_main(pika::program_options::variables_map& vm)
pika::chrono::detail::high_resolution_timer timer;
for (std::size_t i = 0; i != iterations; ++i)
{
ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait();
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}));
}
double elapsed = timer.elapsed();
std::cout
Expand All @@ -208,19 +208,19 @@ int pika_main(pika::program_options::variables_map& vm)
// We have to manually unroll this loop, because the type of the
// sender changes for each additional then_with_stream call. The
// number of unrolled calls must match batch_size above.
ex::schedule(sched) | cu::then_with_stream(f) |
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait();
ex::transfer(ex::thread_pool_scheduler{}));
}
// Do the remainder one-by-one
for (std::size_t i = 0; i < non_batch_iterations; ++i)
{
ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait();
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}));
}
double elapsed = timer.elapsed();
std::cout
Expand Down
12 changes: 6 additions & 6 deletions libs/pika/async_cuda/tests/unit/then_with_stream.cu
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ int pika_main()
{
cudaStream_t first_stream{};
cudaStream_t second_stream{};
ex::schedule(cu::cuda_scheduler{pool}) |
tt::sync_wait(ex::schedule(cu::cuda_scheduler{pool}) |
cu::then_with_stream(
[&](cudaStream_t stream) { first_stream = stream; }) |
cu::then_with_stream([&](cudaStream_t stream) {
Expand All @@ -394,8 +394,7 @@ int pika_main()
}) |
cu::then_with_stream([&](cudaStream_t stream) {
PIKA_TEST_EQ(stream, second_stream);
}) |
tt::sync_wait();
}));
}

{
Expand All @@ -414,14 +413,15 @@ int pika_main()
cu::then_with_stream(increment{}) |
cu::then_with_stream(increment{}) |
cu::then_with_stream(increment{});
ex::when_all(ex::just(&p_h), std::move(s), ex::just(sizeof(type)),
ex::just(cudaMemcpyDeviceToHost)) |
tt::sync_wait(
ex::when_all(ex::just(&p_h), std::move(s), ex::just(sizeof(type)),
ex::just(cudaMemcpyDeviceToHost)) |
ex::transfer(cu::cuda_scheduler{pool}) |
cu::then_with_stream(cuda_memcpy_async{}) |
ex::transfer(ex::thread_pool_scheduler{}) |
ex::then(&cu::check_cuda_error) |
ex::then([&p_h] { PIKA_TEST_EQ(p_h, 3); }) |
ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait();
ex::transfer(ex::thread_pool_scheduler{}));

cu::check_cuda_error(cudaFree(p));
}
Expand Down
20 changes: 9 additions & 11 deletions libs/pika/async_mpi/tests/unit/algorithm_transform_mpi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ int pika_main()
{
data = 42;
}
auto result = ex::just(&data, count, datatype, 0, comm) |
mpi::transform_mpi(MPI_Ibcast) | tt::sync_wait();
auto result =
tt::sync_wait(ex::just(&data, count, datatype, 0, comm) |
mpi::transform_mpi(MPI_Ibcast));
if (rank != 0)
{
PIKA_TEST_EQ(data, 42);
Expand All @@ -156,10 +157,9 @@ int pika_main()
bool exception_thrown = false;
try
{
mpi::transform_mpi(
tt::sync_wait(mpi::transform_mpi(
error_sender<int*, int, MPI_Datatype, int, MPI_Comm>{},
MPI_Ibcast) |
tt::sync_wait();
MPI_Ibcast));
PIKA_TEST(false);
}
catch (std::runtime_error const& e)
Expand Down Expand Up @@ -203,9 +203,8 @@ int pika_main()
bool exception_thrown = false;
try
{
mpi::transform_mpi(
ex::just(data, count, datatype, -1, comm), MPI_Ibcast) |
tt::sync_wait();
tt::sync_wait(mpi::transform_mpi(
ex::just(data, count, datatype, -1, comm), MPI_Ibcast));
PIKA_TEST(false);
}
catch (std::runtime_error const& e)
Expand All @@ -228,9 +227,8 @@ int pika_main()
bool exception_thrown = false;
try
{
mpi::transform_mpi(
ex::just(data, count, datatype, -1, comm), MPI_Ibcast) |
tt::sync_wait();
tt::sync_wait(mpi::transform_mpi(
ex::just(data, count, datatype, -1, comm), MPI_Ibcast));
PIKA_TEST(false);
}
catch (std::runtime_error const&)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include <pika/allocator_support/traits/is_allocator.hpp>
#include <pika/assert.hpp>
#include <pika/concepts/concepts.hpp>
#include <pika/execution/algorithms/detail/partial_algorithm.hpp>
#include <pika/execution_base/operation_state.hpp>
#include <pika/execution_base/sender.hpp>
#include <pika/functional/detail/tag_fallback_invoke.hpp>
Expand Down Expand Up @@ -154,19 +153,6 @@ namespace pika { namespace execution { namespace experimental {
operation_state_type{PIKA_FORWARD(Sender, sender), alloc};
PIKA_UNUSED(p.release());
}

// clang-format off
template <typename Allocator = pika::detail::internal_allocator<>,
PIKA_CONCEPT_REQUIRES_(
pika::detail::is_allocator_v<Allocator>
)>
// clang-format on
friend constexpr PIKA_FORCEINLINE auto tag_fallback_invoke(
start_detached_t, Allocator const& allocator = Allocator{})
{
return detail::partial_algorithm<start_detached_t, Allocator>{
allocator};
}
} start_detached{};
}}} // namespace pika::execution::experimental
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include <pika/concepts/concepts.hpp>
#include <pika/datastructures/variant.hpp>
#include <pika/execution/algorithms/detail/partial_algorithm.hpp>
#include <pika/execution/algorithms/detail/single_result.hpp>
#include <pika/execution_base/operation_state.hpp>
#include <pika/execution_base/receiver.hpp>
Expand Down Expand Up @@ -268,11 +267,5 @@ namespace pika::this_thread::experimental {
state.wait();
return state.get_value();
}

friend constexpr PIKA_FORCEINLINE auto tag_fallback_invoke(sync_wait_t)
{
return pika::execution::experimental::detail::partial_algorithm<
sync_wait_t>{};
}
} sync_wait{};
} // namespace pika::this_thread::experimental
5 changes: 2 additions & 3 deletions libs/pika/execution/tests/unit/algorithm_start_detached.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ int main()
std::atomic<bool> start_called{false};
std::atomic<bool> connect_called{false};
std::atomic<bool> tag_invoke_overload_called{false};
custom_sender{
start_called, connect_called, tag_invoke_overload_called} |
ex::start_detached();
ex::start_detached(custom_sender{
start_called, connect_called, tag_invoke_overload_called});
PIKA_TEST(start_called);
PIKA_TEST(connect_called);
PIKA_TEST(!tag_invoke_overload_called);
Expand Down
7 changes: 3 additions & 4 deletions libs/pika/execution/tests/unit/algorithm_sync_wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,15 @@ int pika_main()
std::atomic<bool> start_called{false};
std::atomic<bool> connect_called{false};
std::atomic<bool> tag_invoke_overload_called{false};
custom_sender{
start_called, connect_called, tag_invoke_overload_called} |
tt::sync_wait();
tt::sync_wait(custom_sender{
start_called, connect_called, tag_invoke_overload_called});
PIKA_TEST(start_called);
PIKA_TEST(connect_called);
PIKA_TEST(!tag_invoke_overload_called);
}

{
PIKA_TEST_EQ(ex::just(3) | tt::sync_wait(), 3);
PIKA_TEST_EQ(tt::sync_wait(ex::just(3)), 3);
}

// tag_invoke overload
Expand Down
Loading