diff --git a/libs/pika/algorithms/tests/unit/container_algorithms/foreach_tests.hpp b/libs/pika/algorithms/tests/unit/container_algorithms/foreach_tests.hpp index ba6173288..bd86eba56 100644 --- a/libs/pika/algorithms/tests/unit/container_algorithms/foreach_tests.hpp +++ b/libs/pika/algorithms/tests/unit/container_algorithms/foreach_tests.hpp @@ -348,8 +348,8 @@ void test_for_each_sender(ExPolicy&& p, IteratorTag) auto rng = pika::util::make_iterator_range( iterator(std::begin(c)), iterator(std::end(c))); auto f = [](std::size_t& v) { v = 42; }; - auto result = ex::just(rng, f) | - pika::ranges::for_each(std::forward(p)) | tt::sync_wait(); + auto result = tt::sync_wait( + ex::just(rng, f) | pika::ranges::for_each(std::forward(p))); PIKA_TEST(result == iterator(std::end(c))); // verify values @@ -380,8 +380,8 @@ void test_for_each_exception_sender(ExPolicy p, IteratorTag) bool caught_exception = false; try { - ex::just(rng, f) | pika::ranges::for_each(std::forward(p)) | - tt::sync_wait(); + tt::sync_wait(ex::just(rng, f) | + pika::ranges::for_each(std::forward(p))); PIKA_TEST(false); } @@ -417,8 +417,8 @@ void test_for_each_bad_alloc_sender(ExPolicy p, IteratorTag) bool caught_exception = false; try { - ex::just(rng, f) | pika::ranges::for_each(std::forward(p)) | - tt::sync_wait(); + tt::sync_wait(ex::just(rng, f) | + pika::ranges::for_each(std::forward(p))); PIKA_TEST(false); } diff --git a/libs/pika/async_cuda/tests/performance/synchronize.cu b/libs/pika/async_cuda/tests/performance/synchronize.cu index fc2ed60f6..a51198a16 100644 --- a/libs/pika/async_cuda/tests/performance/synchronize.cu +++ b/libs/pika/async_cuda/tests/performance/synchronize.cu @@ -90,7 +90,7 @@ int pika_main(pika::program_options::variables_map& vm) pika::chrono::detail::high_resolution_timer timer; for (std::size_t i = 0; i != iterations; ++i) { - ex::schedule(sched) | cu::then_with_stream(f) | tt::sync_wait(); + tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f)); } double elapsed = timer.elapsed(); std::cout @@ -111,17 +111,17 @@ int pika_main(pika::program_options::variables_map& vm) // We have to manually unroll this loop, because the type of the // sender changes for each additional then_with_stream call. The // number of unrolled calls must match batch_size above. - ex::schedule(sched) | cu::then_with_stream(f) | + tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | - cu::then_with_stream(f) | tt::sync_wait(); + cu::then_with_stream(f)); } // Do the remainder one-by-one for (std::size_t i = 0; i < non_batch_iterations; ++i) { - ex::schedule(sched) | cu::then_with_stream(f) | tt::sync_wait(); + tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f)); } double elapsed = timer.elapsed(); std::cout @@ -145,7 +145,7 @@ int pika_main(pika::program_options::variables_map& vm) // intentionally insert dummy then([]{}) calls between the // then_with_stream calls to force synchronization between the // kernel launches. - ex::schedule(sched) | cu::then_with_stream(f) | + tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) | ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) | ex::transfer(sched) | cu::then_with_stream(f) | ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) | @@ -163,12 +163,12 @@ int pika_main(pika::program_options::variables_map& vm) ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) | ex::transfer(sched) | cu::then_with_stream(f) | ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) | - ex::transfer(sched) | cu::then_with_stream(f) | tt::sync_wait(); + ex::transfer(sched) | cu::then_with_stream(f)); } // Do the remainder one-by-one for (std::size_t i = 0; i < non_batch_iterations; ++i) { - ex::schedule(sched) | cu::then_with_stream(f) | tt::sync_wait(); + tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f)); } double elapsed = timer.elapsed(); std::cout @@ -186,8 +186,8 @@ int pika_main(pika::program_options::variables_map& vm) pika::chrono::detail::high_resolution_timer timer; for (std::size_t i = 0; i != iterations; ++i) { - ex::schedule(sched) | cu::then_with_stream(f) | - ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait(); + tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) | + ex::transfer(ex::thread_pool_scheduler{})); } double elapsed = timer.elapsed(); std::cout @@ -208,19 +208,19 @@ int pika_main(pika::program_options::variables_map& vm) // We have to manually unroll this loop, because the type of the // sender changes for each additional then_with_stream call. The // number of unrolled calls must match batch_size above. - ex::schedule(sched) | cu::then_with_stream(f) | + tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | cu::then_with_stream(f) | - ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait(); + ex::transfer(ex::thread_pool_scheduler{})); } // Do the remainder one-by-one for (std::size_t i = 0; i < non_batch_iterations; ++i) { - ex::schedule(sched) | cu::then_with_stream(f) | - ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait(); + tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) | + ex::transfer(ex::thread_pool_scheduler{})); } double elapsed = timer.elapsed(); std::cout diff --git a/libs/pika/async_cuda/tests/unit/then_with_stream.cu b/libs/pika/async_cuda/tests/unit/then_with_stream.cu index aedc89df6..62af63078 100644 --- a/libs/pika/async_cuda/tests/unit/then_with_stream.cu +++ b/libs/pika/async_cuda/tests/unit/then_with_stream.cu @@ -375,7 +375,7 @@ int pika_main() { cudaStream_t first_stream{}; cudaStream_t second_stream{}; - ex::schedule(cu::cuda_scheduler{pool}) | + tt::sync_wait(ex::schedule(cu::cuda_scheduler{pool}) | cu::then_with_stream( [&](cudaStream_t stream) { first_stream = stream; }) | cu::then_with_stream([&](cudaStream_t stream) { @@ -394,8 +394,7 @@ int pika_main() }) | cu::then_with_stream([&](cudaStream_t stream) { PIKA_TEST_EQ(stream, second_stream); - }) | - tt::sync_wait(); + })); } { @@ -414,14 +413,15 @@ int pika_main() cu::then_with_stream(increment{}) | cu::then_with_stream(increment{}) | cu::then_with_stream(increment{}); - ex::when_all(ex::just(&p_h), std::move(s), ex::just(sizeof(type)), - ex::just(cudaMemcpyDeviceToHost)) | + tt::sync_wait( + ex::when_all(ex::just(&p_h), std::move(s), ex::just(sizeof(type)), + ex::just(cudaMemcpyDeviceToHost)) | ex::transfer(cu::cuda_scheduler{pool}) | cu::then_with_stream(cuda_memcpy_async{}) | ex::transfer(ex::thread_pool_scheduler{}) | ex::then(&cu::check_cuda_error) | ex::then([&p_h] { PIKA_TEST_EQ(p_h, 3); }) | - ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait(); + ex::transfer(ex::thread_pool_scheduler{})); cu::check_cuda_error(cudaFree(p)); } diff --git a/libs/pika/async_mpi/tests/unit/algorithm_transform_mpi.cpp b/libs/pika/async_mpi/tests/unit/algorithm_transform_mpi.cpp index 0947a98f7..5f0f42b13 100644 --- a/libs/pika/async_mpi/tests/unit/algorithm_transform_mpi.cpp +++ b/libs/pika/async_mpi/tests/unit/algorithm_transform_mpi.cpp @@ -138,8 +138,9 @@ int pika_main() { data = 42; } - auto result = ex::just(&data, count, datatype, 0, comm) | - mpi::transform_mpi(MPI_Ibcast) | tt::sync_wait(); + auto result = + tt::sync_wait(ex::just(&data, count, datatype, 0, comm) | + mpi::transform_mpi(MPI_Ibcast)); if (rank != 0) { PIKA_TEST_EQ(data, 42); @@ -156,10 +157,9 @@ int pika_main() bool exception_thrown = false; try { - mpi::transform_mpi( + tt::sync_wait(mpi::transform_mpi( error_sender{}, - MPI_Ibcast) | - tt::sync_wait(); + MPI_Ibcast)); PIKA_TEST(false); } catch (std::runtime_error const& e) @@ -203,9 +203,8 @@ int pika_main() bool exception_thrown = false; try { - mpi::transform_mpi( - ex::just(data, count, datatype, -1, comm), MPI_Ibcast) | - tt::sync_wait(); + tt::sync_wait(mpi::transform_mpi( + ex::just(data, count, datatype, -1, comm), MPI_Ibcast)); PIKA_TEST(false); } catch (std::runtime_error const& e) @@ -228,9 +227,8 @@ int pika_main() bool exception_thrown = false; try { - mpi::transform_mpi( - ex::just(data, count, datatype, -1, comm), MPI_Ibcast) | - tt::sync_wait(); + tt::sync_wait(mpi::transform_mpi( + ex::just(data, count, datatype, -1, comm), MPI_Ibcast)); PIKA_TEST(false); } catch (std::runtime_error const&) diff --git a/libs/pika/execution/include/pika/execution/algorithms/start_detached.hpp b/libs/pika/execution/include/pika/execution/algorithms/start_detached.hpp index 54ee8ebfb..368c84203 100644 --- a/libs/pika/execution/include/pika/execution/algorithms/start_detached.hpp +++ b/libs/pika/execution/include/pika/execution/algorithms/start_detached.hpp @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -154,19 +153,6 @@ namespace pika { namespace execution { namespace experimental { operation_state_type{PIKA_FORWARD(Sender, sender), alloc}; PIKA_UNUSED(p.release()); } - - // clang-format off - template , - PIKA_CONCEPT_REQUIRES_( - pika::detail::is_allocator_v - )> - // clang-format on - friend constexpr PIKA_FORCEINLINE auto tag_fallback_invoke( - start_detached_t, Allocator const& allocator = Allocator{}) - { - return detail::partial_algorithm{ - allocator}; - } } start_detached{}; }}} // namespace pika::execution::experimental #endif diff --git a/libs/pika/execution/include/pika/execution/algorithms/sync_wait.hpp b/libs/pika/execution/include/pika/execution/algorithms/sync_wait.hpp index 989fea86e..c4cdbb198 100644 --- a/libs/pika/execution/include/pika/execution/algorithms/sync_wait.hpp +++ b/libs/pika/execution/include/pika/execution/algorithms/sync_wait.hpp @@ -14,7 +14,6 @@ #include #include -#include #include #include #include @@ -268,11 +267,5 @@ namespace pika::this_thread::experimental { state.wait(); return state.get_value(); } - - friend constexpr PIKA_FORCEINLINE auto tag_fallback_invoke(sync_wait_t) - { - return pika::execution::experimental::detail::partial_algorithm< - sync_wait_t>{}; - } } sync_wait{}; } // namespace pika::this_thread::experimental diff --git a/libs/pika/execution/tests/unit/algorithm_start_detached.cpp b/libs/pika/execution/tests/unit/algorithm_start_detached.cpp index 24a6ae868..7f38f7ae9 100644 --- a/libs/pika/execution/tests/unit/algorithm_start_detached.cpp +++ b/libs/pika/execution/tests/unit/algorithm_start_detached.cpp @@ -82,9 +82,8 @@ int main() std::atomic start_called{false}; std::atomic connect_called{false}; std::atomic tag_invoke_overload_called{false}; - custom_sender{ - start_called, connect_called, tag_invoke_overload_called} | - ex::start_detached(); + ex::start_detached(custom_sender{ + start_called, connect_called, tag_invoke_overload_called}); PIKA_TEST(start_called); PIKA_TEST(connect_called); PIKA_TEST(!tag_invoke_overload_called); diff --git a/libs/pika/execution/tests/unit/algorithm_sync_wait.cpp b/libs/pika/execution/tests/unit/algorithm_sync_wait.cpp index 7e2819c0a..8fcc3ef98 100644 --- a/libs/pika/execution/tests/unit/algorithm_sync_wait.cpp +++ b/libs/pika/execution/tests/unit/algorithm_sync_wait.cpp @@ -72,16 +72,15 @@ int pika_main() std::atomic start_called{false}; std::atomic connect_called{false}; std::atomic tag_invoke_overload_called{false}; - custom_sender{ - start_called, connect_called, tag_invoke_overload_called} | - tt::sync_wait(); + tt::sync_wait(custom_sender{ + start_called, connect_called, tag_invoke_overload_called}); PIKA_TEST(start_called); PIKA_TEST(connect_called); PIKA_TEST(!tag_invoke_overload_called); } { - PIKA_TEST_EQ(ex::just(3) | tt::sync_wait(), 3); + PIKA_TEST_EQ(tt::sync_wait(ex::just(3)), 3); } // tag_invoke overload diff --git a/libs/pika/executors/tests/unit/std_thread_scheduler.cpp b/libs/pika/executors/tests/unit/std_thread_scheduler.cpp index ba6aa9d05..8ae1892fe 100644 --- a/libs/pika/executors/tests/unit/std_thread_scheduler.cpp +++ b/libs/pika/executors/tests/unit/std_thread_scheduler.cpp @@ -477,15 +477,14 @@ void test_when_all() ex::when_all(std::move(work1), std::move(work2), std::move(work3)); bool executed{false}; - std::move(when1) | + tt::sync_wait(std::move(when1) | ex::then([parent_id, &executed](int x, std::string y, double z) { PIKA_TEST_NEQ(parent_id, std::this_thread::get_id()); PIKA_TEST_EQ(x, 42); PIKA_TEST_EQ(y, std::string("hello")); PIKA_TEST_EQ(z, 3.14); executed = true; - }) | - tt::sync_wait(); + })); PIKA_TEST(executed); } @@ -510,9 +509,8 @@ void test_when_all() try { - ex::when_all(std::move(work1), std::move(work2)) | - ex::then([](int, std::string) { PIKA_TEST(false); }) | - tt::sync_wait(); + tt::sync_wait(ex::when_all(std::move(work1), std::move(work2)) | + ex::then([](int, std::string) { PIKA_TEST(false); })); PIKA_TEST(false); } catch (std::runtime_error const& e) @@ -545,9 +543,8 @@ void test_when_all() try { - ex::when_all(std::move(work1), std::move(work2)) | - ex::then([](int, std::string) { PIKA_TEST(false); }) | - tt::sync_wait(); + tt::sync_wait(ex::when_all(std::move(work1), std::move(work2)) | + ex::then([](int, std::string) { PIKA_TEST(false); })); PIKA_TEST(false); } catch (std::runtime_error const& e) @@ -589,21 +586,21 @@ void test_when_all_vector() #if !defined(PIKA_HAVE_P2300_REFERENCE_IMPLEMENTATION) bool executed{false}; #endif - std::move(when1) | + tt::sync_wait(std::move(when1) // TODO: ADL issues? Uncommenting instantiates set_value with the // sync_wait_receiver from when_all_vector_receiver, i.e. then is // "skipped". #if !defined(PIKA_HAVE_P2300_REFERENCE_IMPLEMENTATION) - ex::then([parent_id, &executed](std::vector v) { - PIKA_TEST_NEQ(parent_id, std::this_thread::get_id()); - PIKA_TEST_EQ(v.size(), std::size_t(3)); - PIKA_TEST_EQ(v[0], 42.0); - PIKA_TEST_EQ(v[1], 43.0); - PIKA_TEST_EQ(v[2], 3.14); - executed = true; - }) | + | ex::then([parent_id, &executed](std::vector v) { + PIKA_TEST_NEQ(parent_id, std::this_thread::get_id()); + PIKA_TEST_EQ(v.size(), std::size_t(3)); + PIKA_TEST_EQ(v[0], 42.0); + PIKA_TEST_EQ(v[1], 43.0); + PIKA_TEST_EQ(v[2], 3.14); + executed = true; + }) #endif - tt::sync_wait(); + ); #if !defined(PIKA_HAVE_P2300_REFERENCE_IMPLEMENTATION) PIKA_TEST(executed); #endif @@ -632,14 +629,14 @@ void test_when_all_vector() try { - ex::when_all_vector(std::move(senders)) | + tt::sync_wait(ex::when_all_vector(std::move(senders)) // TODO: ADL issues? Uncommenting instantiates set_value with the // sync_wait_receiver from when_all_vector_receiver, i.e. then is // "skipped". #if !defined(PIKA_HAVE_P2300_REFERENCE_IMPLEMENTATION) - ex::then([](std::vector) { PIKA_TEST(false); }) | + | ex::then([](std::vector) { PIKA_TEST(false); }) #endif - tt::sync_wait(); + ); PIKA_TEST(false); } catch (std::runtime_error const& e) @@ -674,14 +671,14 @@ void test_when_all_vector() try { - ex::when_all_vector(std::move(senders)) | + tt::sync_wait(ex::when_all_vector(std::move(senders)) // TODO: ADL issues? Uncommenting instantiates set_value with the // sync_wait_receiver from when_all_vector_receiver, i.e. then is // "skipped". #if !defined(PIKA_HAVE_P2300_REFERENCE_IMPLEMENTATION) - ex::then([](std::vector) { PIKA_TEST(false); }) | + | ex::then([](std::vector) { PIKA_TEST(false); }) #endif - tt::sync_wait(); + ); PIKA_TEST(false); } catch (std::runtime_error const& e) @@ -701,7 +698,7 @@ void test_ensure_started() ex::std_thread_scheduler sched{}; { - ex::schedule(sched) | ex::ensure_started() | tt::sync_wait(); + tt::sync_wait(ex::schedule(sched) | ex::ensure_started()); } { @@ -762,9 +759,9 @@ void test_ensure_started_when_all() ++successor_task_calls; return 2; }); - PIKA_TEST_EQ(ex::when_all(succ1, succ2) | - ex::then([](int const& x, int const& y) { return x + y; }) | - tt::sync_wait(), + PIKA_TEST_EQ( + tt::sync_wait(ex::when_all(succ1, succ2) | + ex::then([](int const& x, int const& y) { return x + y; })), 3); PIKA_TEST_EQ(first_task_calls, std::size_t(1)); PIKA_TEST_EQ(successor_task_calls, std::size_t(2)); @@ -797,9 +794,9 @@ void test_ensure_started_when_all() ++successor_task_calls; return x + 2; }); - PIKA_TEST_EQ(ex::when_all(succ1, succ2) | - ex::then([](int const& x, int const& y) { return x + y; }) | - tt::sync_wait(), + PIKA_TEST_EQ( + tt::sync_wait(ex::when_all(succ1, succ2) | + ex::then([](int const& x, int const& y) { return x + y; })), 9); PIKA_TEST_EQ(first_task_calls, std::size_t(1)); PIKA_TEST_EQ(successor_task_calls, std::size_t(2)); @@ -831,9 +828,9 @@ void test_ensure_started_when_all() ++successor_task_calls; return x + 2; }); - PIKA_TEST_EQ(ex::when_all(succ1, succ2) | - ex::then([](int const& x, int const& y) { return x + y; }) | - tt::sync_wait(), + PIKA_TEST_EQ( + tt::sync_wait(ex::when_all(succ1, succ2) | + ex::then([](int const& x, int const& y) { return x + y; })), 9); PIKA_TEST_EQ(first_task_calls, std::size_t(1)); PIKA_TEST_EQ(successor_task_calls, std::size_t(2)); @@ -848,7 +845,7 @@ void test_split() ex::std_thread_scheduler sched{}; { - ex::schedule(sched) | ex::split() | tt::sync_wait(); + tt::sync_wait(ex::schedule(sched) | ex::split()); } { @@ -891,9 +888,9 @@ void test_split_when_all() ++successor_task_calls; return 2; }); - PIKA_TEST_EQ(ex::when_all(succ1, succ2) | - ex::then([](int const& x, int const& y) { return x + y; }) | - tt::sync_wait(), + PIKA_TEST_EQ( + tt::sync_wait(ex::when_all(succ1, succ2) | + ex::then([](int const& x, int const& y) { return x + y; })), 3); PIKA_TEST_EQ(first_task_calls, std::size_t(1)); PIKA_TEST_EQ(successor_task_calls, std::size_t(2)); @@ -914,9 +911,9 @@ void test_split_when_all() ++successor_task_calls; return x + 2; }); - PIKA_TEST_EQ(ex::when_all(succ1, succ2) | - ex::then([](int const& x, int const& y) { return x + y; }) | - tt::sync_wait(), + PIKA_TEST_EQ( + tt::sync_wait(ex::when_all(succ1, succ2) | + ex::then([](int const& x, int const& y) { return x + y; })), 9); PIKA_TEST_EQ(first_task_calls, std::size_t(1)); PIKA_TEST_EQ(successor_task_calls, std::size_t(2)); @@ -937,9 +934,9 @@ void test_split_when_all() ++successor_task_calls; return x + 2; }); - PIKA_TEST_EQ(ex::when_all(succ1, succ2) | - ex::then([](int const& x, int const& y) { return x + y; }) | - tt::sync_wait(), + PIKA_TEST_EQ( + tt::sync_wait(ex::when_all(succ1, succ2) | + ex::then([](int const& x, int const& y) { return x + y; })), 9); PIKA_TEST_EQ(first_task_calls, std::size_t(1)); PIKA_TEST_EQ(successor_task_calls, std::size_t(2)); @@ -953,67 +950,65 @@ void test_let_value() // void predecessor { - auto result = ex::schedule(sched) | - ex::let_value([]() { return ex::just(42); }) | tt::sync_wait(); + auto result = tt::sync_wait( + ex::schedule(sched) | ex::let_value([]() { return ex::just(42); })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::schedule(sched) | - ex::let_value([=]() { return ex::transfer_just(sched, 42); }) | - tt::sync_wait(); + auto result = tt::sync_wait(ex::schedule(sched) | + ex::let_value([=]() { return ex::transfer_just(sched, 42); })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::just() | - ex::let_value([=]() { return ex::transfer_just(sched, 42); }) | - tt::sync_wait(); + auto result = tt::sync_wait(ex::just() | + ex::let_value([=]() { return ex::transfer_just(sched, 42); })); PIKA_TEST_EQ(result, 42); } // int predecessor, value ignored { - auto result = ex::transfer_just(sched, 43) | - ex::let_value([](int&) { return ex::just(42); }) | tt::sync_wait(); + auto result = tt::sync_wait(ex::transfer_just(sched, 43) | + ex::let_value([](int&) { return ex::just(42); })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::transfer_just(sched, 43) | - ex::let_value([=](int&) { return ex::transfer_just(sched, 42); }) | - tt::sync_wait(); + auto result = tt::sync_wait(ex::transfer_just(sched, 43) | + ex::let_value([=](int&) { return ex::transfer_just(sched, 42); })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::just(43) | - ex::let_value([=](int&) { return ex::transfer_just(sched, 42); }) | - tt::sync_wait(); + auto result = tt::sync_wait(ex::just(43) | + ex::let_value([=](int&) { return ex::transfer_just(sched, 42); })); PIKA_TEST_EQ(result, 42); } // int predecessor, value used { - auto result = ex::transfer_just(sched, 43) | ex::let_value([](int& x) { - return ex::just(42) | ex::then([&](int y) { return x + y; }); - }) | tt::sync_wait(); + auto result = tt::sync_wait( + ex::transfer_just(sched, 43) | ex::let_value([](int& x) { + return ex::just(42) | ex::then([&](int y) { return x + y; }); + })); PIKA_TEST_EQ(result, 85); } { - auto result = ex::transfer_just(sched, 43) | ex::let_value([=](int& x) { - return ex::transfer_just(sched, 42) | - ex::then([&](int y) { return x + y; }); - }) | tt::sync_wait(); + auto result = tt::sync_wait( + ex::transfer_just(sched, 43) | ex::let_value([=](int& x) { + return ex::transfer_just(sched, 42) | + ex::then([&](int y) { return x + y; }); + })); PIKA_TEST_EQ(result, 85); } { - auto result = ex::just(43) | ex::let_value([=](int& x) { + auto result = tt::sync_wait(ex::just(43) | ex::let_value([=](int& x) { return ex::transfer_just(sched, 42) | ex::then([&](int y) { return x + y; }); - }) | tt::sync_wait(); + })); PIKA_TEST_EQ(result, 85); } @@ -1023,13 +1018,13 @@ void test_let_value() try { - ex::transfer_just(sched, 43) | ex::then([](int x) { + tt::sync_wait(ex::transfer_just(sched, 43) | ex::then([](int x) { throw std::runtime_error("error"); return x; }) | ex::let_value([](int&) { PIKA_TEST(false); return ex::just(0); - }) | tt::sync_wait(); + })); PIKA_TEST(false); } catch (std::runtime_error const& e) @@ -1065,100 +1060,99 @@ void test_let_error() // void predecessor { std::atomic called{false}; - ex::schedule(sched) | ex::then([]() { + tt::sync_wait(ex::schedule(sched) | ex::then([]() { throw std::runtime_error("error"); }) | ex::let_error([&called](std::exception_ptr& ep) { called = true; check_exception_ptr_message(ep, "error"); return ex::just(); - }) | tt::sync_wait(); + })); PIKA_TEST(called); } { std::atomic called{false}; - ex::schedule(sched) | ex::then([]() { + tt::sync_wait(ex::schedule(sched) | ex::then([]() { throw std::runtime_error("error"); }) | ex::let_error([=, &called](std::exception_ptr& ep) { called = true; check_exception_ptr_message(ep, "error"); return ex::transfer_just(sched); - }) | tt::sync_wait(); + })); PIKA_TEST(called); } { std::atomic called{false}; - ex::just() | ex::then([]() { throw std::runtime_error("error"); }) | - ex::let_error([=, &called](std::exception_ptr& ep) { - called = true; - check_exception_ptr_message(ep, "error"); - return ex::transfer_just(sched); - }) | - tt::sync_wait(); + tt::sync_wait(ex::just() | ex::then([]() { + throw std::runtime_error("error"); + }) | ex::let_error([=, &called](std::exception_ptr& ep) { + called = true; + check_exception_ptr_message(ep, "error"); + return ex::transfer_just(sched); + })); PIKA_TEST(called); } // int predecessor { - auto result = ex::schedule(sched) | ex::then([]() { + auto result = tt::sync_wait(ex::schedule(sched) | ex::then([]() { throw std::runtime_error("error"); return 43; }) | ex::let_error([](std::exception_ptr& ep) { check_exception_ptr_message(ep, "error"); return ex::just(42); - }) | tt::sync_wait(); + })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::schedule(sched) | ex::then([]() { + auto result = tt::sync_wait(ex::schedule(sched) | ex::then([]() { throw std::runtime_error("error"); return 43; }) | ex::let_error([=](std::exception_ptr& ep) { check_exception_ptr_message(ep, "error"); return ex::transfer_just(sched, 42); - }) | tt::sync_wait(); + })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::just() | ex::then([]() { + auto result = tt::sync_wait(ex::just() | ex::then([]() { throw std::runtime_error("error"); return 43; }) | ex::let_error([=](std::exception_ptr& ep) { check_exception_ptr_message(ep, "error"); return ex::transfer_just(sched, 42); - }) | tt::sync_wait(); + })); PIKA_TEST_EQ(result, 42); } // predecessor doesn't throw, let sender is ignored { - auto result = ex::transfer_just(sched, 42) | + auto result = tt::sync_wait(ex::transfer_just(sched, 42) | ex::let_error([](std::exception_ptr) { PIKA_TEST(false); return ex::just(43); - }) | - tt::sync_wait(); + })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::transfer_just(sched, 42) | + auto result = tt::sync_wait(ex::transfer_just(sched, 42) | ex::let_error([=](std::exception_ptr) { PIKA_TEST(false); return ex::transfer_just(sched, 43); - }) | - tt::sync_wait(); + })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::just(42) | ex::let_error([=](std::exception_ptr) { - PIKA_TEST(false); - return ex::transfer_just(sched, 43); - }) | tt::sync_wait(); + auto result = + tt::sync_wait(ex::just(42) | ex::let_error([=](std::exception_ptr) { + PIKA_TEST(false); + return ex::transfer_just(sched, 43); + })); PIKA_TEST_EQ(result, 42); } } @@ -1219,10 +1213,11 @@ void test_bulk() std::vector v(n, 0); std::thread::id parent_id = std::this_thread::get_id(); - ex::schedule(ex::std_thread_scheduler{}) | ex::bulk(n, [&](int i) { - ++v[i]; - PIKA_TEST_NEQ(parent_id, std::this_thread::get_id()); - }) | tt::sync_wait(); + tt::sync_wait( + ex::schedule(ex::std_thread_scheduler{}) | ex::bulk(n, [&](int i) { + ++v[i]; + PIKA_TEST_NEQ(parent_id, std::this_thread::get_id()); + })); for (int i = 0; i < n; ++i) { @@ -1235,14 +1230,12 @@ void test_bulk() std::vector v(n, -1); std::thread::id parent_id = std::this_thread::get_id(); - auto v_out = + auto v_out = tt::sync_wait( ex::transfer_just(ex::std_thread_scheduler{}, std::move(v)) | - ex::bulk(n, - [&parent_id](int i, std::vector& v) { - v[i] = i; - PIKA_TEST_NEQ(parent_id, std::this_thread::get_id()); - }) | - tt::sync_wait(); + ex::bulk(n, [&parent_id](int i, std::vector& v) { + v[i] = i; + PIKA_TEST_NEQ(parent_id, std::this_thread::get_id()); + })); for (int i = 0; i < n; ++i) { @@ -1257,13 +1250,11 @@ void test_bulk() std::mutex mtx; - ex::schedule(ex::std_thread_scheduler{}) | - ex::bulk(std::move(v), - [&](std::string const& s) { - std::lock_guard lk(mtx); - string_map.insert(s); - }) | - tt::sync_wait(); + tt::sync_wait(ex::schedule(ex::std_thread_scheduler{}) | + ex::bulk(std::move(v), [&](std::string const& s) { + std::lock_guard lk(mtx); + string_map.insert(s); + })); for (auto const& s : v_ref) { @@ -1280,16 +1271,14 @@ void test_bulk() try { - ex::transfer_just(ex::std_thread_scheduler{}) | - ex::bulk(n, - [&v](int i) { - if (i == i_fail) - { - throw std::runtime_error("error"); - } - v[i] = i; - }) | - tt::sync_wait(); + tt::sync_wait(ex::transfer_just(ex::std_thread_scheduler{}) | + ex::bulk(n, [&v](int i) { + if (i == i_fail) + { + throw std::runtime_error("error"); + } + v[i] = i; + })); if (expect_exception) { diff --git a/libs/pika/executors/tests/unit/thread_pool_scheduler.cpp b/libs/pika/executors/tests/unit/thread_pool_scheduler.cpp index dbbdc4057..797807fcd 100644 --- a/libs/pika/executors/tests/unit/thread_pool_scheduler.cpp +++ b/libs/pika/executors/tests/unit/thread_pool_scheduler.cpp @@ -593,15 +593,14 @@ void test_when_all() ex::when_all(std::move(work1), std::move(work2), std::move(work3)); bool executed{false}; - std::move(when1) | + tt::sync_wait(std::move(when1) | ex::then([parent_id, &executed](int x, std::string y, double z) { PIKA_TEST_NEQ(parent_id, pika::this_thread::get_id()); PIKA_TEST_EQ(x, 42); PIKA_TEST_EQ(y, std::string("hello")); PIKA_TEST_EQ(z, 3.14); executed = true; - }) | - tt::sync_wait(); + })); PIKA_TEST(executed); } @@ -626,9 +625,8 @@ void test_when_all() try { - ex::when_all(std::move(work1), std::move(work2)) | - ex::then([](int, std::string) { PIKA_TEST(false); }) | - tt::sync_wait(); + tt::sync_wait(ex::when_all(std::move(work1), std::move(work2)) | + ex::then([](int, std::string) { PIKA_TEST(false); })); PIKA_TEST(false); } catch (std::runtime_error const& e) @@ -661,9 +659,8 @@ void test_when_all() try { - ex::when_all(std::move(work1), std::move(work2)) | - ex::then([](int, std::string) { PIKA_TEST(false); }) | - tt::sync_wait(); + tt::sync_wait(ex::when_all(std::move(work1), std::move(work2)) | + ex::then([](int, std::string) { PIKA_TEST(false); })); PIKA_TEST(false); } catch (std::runtime_error const& e) @@ -705,21 +702,21 @@ void test_when_all_vector() #if !defined(PIKA_HAVE_P2300_REFERENCE_IMPLEMENTATION) bool executed{false}; #endif - std::move(when1) | + tt::sync_wait(std::move(when1) // TODO: ADL issues? Uncommenting instantiates set_value with the // sync_wait_receiver from when_all_vector_receiver, i.e. then is // "skipped". #if !defined(PIKA_HAVE_P2300_REFERENCE_IMPLEMENTATION) - ex::then([parent_id, &executed](std::vector v) { - PIKA_TEST_NEQ(parent_id, pika::this_thread::get_id()); - PIKA_TEST_EQ(v.size(), std::size_t(3)); - PIKA_TEST_EQ(v[0], 42.0); - PIKA_TEST_EQ(v[1], 43.0); - PIKA_TEST_EQ(v[2], 3.14); - executed = true; - }) | + | ex::then([parent_id, &executed](std::vector v) { + PIKA_TEST_NEQ(parent_id, pika::this_thread::get_id()); + PIKA_TEST_EQ(v.size(), std::size_t(3)); + PIKA_TEST_EQ(v[0], 42.0); + PIKA_TEST_EQ(v[1], 43.0); + PIKA_TEST_EQ(v[2], 3.14); + executed = true; + }) #endif - tt::sync_wait(); + ); #if !defined(PIKA_HAVE_P2300_REFERENCE_IMPLEMENTATION) PIKA_TEST(executed); #endif @@ -748,14 +745,14 @@ void test_when_all_vector() try { - ex::when_all_vector(std::move(senders)) | + tt::sync_wait(ex::when_all_vector(std::move(senders)) // TODO: ADL issues? Uncommenting instantiates set_value with the // sync_wait_receiver from when_all_vector_receiver, i.e. then is // "skipped". #if !defined(PIKA_HAVE_P2300_REFERENCE_IMPLEMENTATION) - ex::then([](std::vector) { PIKA_TEST(false); }) | + | ex::then([](std::vector) { PIKA_TEST(false); }) #endif - tt::sync_wait(); + ); PIKA_TEST(false); } catch (std::runtime_error const& e) @@ -790,14 +787,14 @@ void test_when_all_vector() try { - ex::when_all_vector(std::move(senders)) | + tt::sync_wait(ex::when_all_vector(std::move(senders)) // TODO: ADL issues? Uncommenting instantiates set_value with the // sync_wait_receiver from when_all_vector_receiver, i.e. then is // "skipped". #if !defined(PIKA_HAVE_P2300_REFERENCE_IMPLEMENTATION) - ex::then([](std::vector) { PIKA_TEST(false); }) | + | ex::then([](std::vector) { PIKA_TEST(false); }) #endif - tt::sync_wait(); + ); PIKA_TEST(false); } catch (std::runtime_error const& e) @@ -1028,7 +1025,7 @@ void test_ensure_started() ex::thread_pool_scheduler sched{}; { - ex::schedule(sched) | ex::ensure_started() | tt::sync_wait(); + tt::sync_wait(ex::schedule(sched) | ex::ensure_started()); } { @@ -1089,9 +1086,9 @@ void test_ensure_started_when_all() ++successor_task_calls; return 2; }); - PIKA_TEST_EQ(ex::when_all(succ1, succ2) | - ex::then([](int const& x, int const& y) { return x + y; }) | - tt::sync_wait(), + PIKA_TEST_EQ( + tt::sync_wait(ex::when_all(succ1, succ2) | + ex::then([](int const& x, int const& y) { return x + y; })), 3); PIKA_TEST_EQ(first_task_calls, std::size_t(1)); PIKA_TEST_EQ(successor_task_calls, std::size_t(2)); @@ -1124,9 +1121,9 @@ void test_ensure_started_when_all() ++successor_task_calls; return x + 2; }); - PIKA_TEST_EQ(ex::when_all(succ1, succ2) | - ex::then([](int const& x, int const& y) { return x + y; }) | - tt::sync_wait(), + PIKA_TEST_EQ( + tt::sync_wait(ex::when_all(succ1, succ2) | + ex::then([](int const& x, int const& y) { return x + y; })), 9); PIKA_TEST_EQ(first_task_calls, std::size_t(1)); PIKA_TEST_EQ(successor_task_calls, std::size_t(2)); @@ -1158,9 +1155,9 @@ void test_ensure_started_when_all() ++successor_task_calls; return x + 2; }); - PIKA_TEST_EQ(ex::when_all(succ1, succ2) | - ex::then([](int const& x, int const& y) { return x + y; }) | - tt::sync_wait(), + PIKA_TEST_EQ( + tt::sync_wait(ex::when_all(succ1, succ2) | + ex::then([](int const& x, int const& y) { return x + y; })), 9); PIKA_TEST_EQ(first_task_calls, std::size_t(1)); PIKA_TEST_EQ(successor_task_calls, std::size_t(2)); @@ -1175,7 +1172,7 @@ void test_split() ex::thread_pool_scheduler sched{}; { - ex::schedule(sched) | ex::split() | tt::sync_wait(); + tt::sync_wait(ex::schedule(sched) | ex::split()); } { @@ -1218,9 +1215,9 @@ void test_split_when_all() ++successor_task_calls; return 2; }); - PIKA_TEST_EQ(ex::when_all(succ1, succ2) | - ex::then([](int const& x, int const& y) { return x + y; }) | - tt::sync_wait(), + PIKA_TEST_EQ( + tt::sync_wait(ex::when_all(succ1, succ2) | + ex::then([](int const& x, int const& y) { return x + y; })), 3); PIKA_TEST_EQ(first_task_calls, std::size_t(1)); PIKA_TEST_EQ(successor_task_calls, std::size_t(2)); @@ -1241,9 +1238,9 @@ void test_split_when_all() ++successor_task_calls; return x + 2; }); - PIKA_TEST_EQ(ex::when_all(succ1, succ2) | - ex::then([](int const& x, int const& y) { return x + y; }) | - tt::sync_wait(), + PIKA_TEST_EQ( + tt::sync_wait(ex::when_all(succ1, succ2) | + ex::then([](int const& x, int const& y) { return x + y; })), 9); PIKA_TEST_EQ(first_task_calls, std::size_t(1)); PIKA_TEST_EQ(successor_task_calls, std::size_t(2)); @@ -1264,9 +1261,9 @@ void test_split_when_all() ++successor_task_calls; return x + 2; }); - PIKA_TEST_EQ(ex::when_all(succ1, succ2) | - ex::then([](int const& x, int const& y) { return x + y; }) | - tt::sync_wait(), + PIKA_TEST_EQ( + tt::sync_wait(ex::when_all(succ1, succ2) | + ex::then([](int const& x, int const& y) { return x + y; })), 9); PIKA_TEST_EQ(first_task_calls, std::size_t(1)); PIKA_TEST_EQ(successor_task_calls, std::size_t(2)); @@ -1280,67 +1277,65 @@ void test_let_value() // void predecessor { - auto result = ex::schedule(sched) | - ex::let_value([]() { return ex::just(42); }) | tt::sync_wait(); + auto result = tt::sync_wait( + ex::schedule(sched) | ex::let_value([]() { return ex::just(42); })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::schedule(sched) | - ex::let_value([=]() { return ex::transfer_just(sched, 42); }) | - tt::sync_wait(); + auto result = tt::sync_wait(ex::schedule(sched) | + ex::let_value([=]() { return ex::transfer_just(sched, 42); })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::just() | - ex::let_value([=]() { return ex::transfer_just(sched, 42); }) | - tt::sync_wait(); + auto result = tt::sync_wait(ex::just() | + ex::let_value([=]() { return ex::transfer_just(sched, 42); })); PIKA_TEST_EQ(result, 42); } // int predecessor, value ignored { - auto result = ex::transfer_just(sched, 43) | - ex::let_value([](int&) { return ex::just(42); }) | tt::sync_wait(); + auto result = tt::sync_wait(ex::transfer_just(sched, 43) | + ex::let_value([](int&) { return ex::just(42); })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::transfer_just(sched, 43) | - ex::let_value([=](int&) { return ex::transfer_just(sched, 42); }) | - tt::sync_wait(); + auto result = tt::sync_wait(ex::transfer_just(sched, 43) | + ex::let_value([=](int&) { return ex::transfer_just(sched, 42); })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::just(43) | - ex::let_value([=](int&) { return ex::transfer_just(sched, 42); }) | - tt::sync_wait(); + auto result = tt::sync_wait(ex::just(43) | + ex::let_value([=](int&) { return ex::transfer_just(sched, 42); })); PIKA_TEST_EQ(result, 42); } // int predecessor, value used { - auto result = ex::transfer_just(sched, 43) | ex::let_value([](int& x) { - return ex::just(42) | ex::then([&](int y) { return x + y; }); - }) | tt::sync_wait(); + auto result = tt::sync_wait( + ex::transfer_just(sched, 43) | ex::let_value([](int& x) { + return ex::just(42) | ex::then([&](int y) { return x + y; }); + })); PIKA_TEST_EQ(result, 85); } { - auto result = ex::transfer_just(sched, 43) | ex::let_value([=](int& x) { - return ex::transfer_just(sched, 42) | - ex::then([&](int y) { return x + y; }); - }) | tt::sync_wait(); + auto result = tt::sync_wait( + ex::transfer_just(sched, 43) | ex::let_value([=](int& x) { + return ex::transfer_just(sched, 42) | + ex::then([&](int y) { return x + y; }); + })); PIKA_TEST_EQ(result, 85); } { - auto result = ex::just(43) | ex::let_value([=](int& x) { + auto result = tt::sync_wait(ex::just(43) | ex::let_value([=](int& x) { return ex::transfer_just(sched, 42) | ex::then([&](int y) { return x + y; }); - }) | tt::sync_wait(); + })); PIKA_TEST_EQ(result, 85); } @@ -1350,13 +1345,13 @@ void test_let_value() try { - ex::transfer_just(sched, 43) | ex::then([](int x) { + tt::sync_wait(ex::transfer_just(sched, 43) | ex::then([](int x) { throw std::runtime_error("error"); return x; }) | ex::let_value([](int&) { PIKA_TEST(false); return ex::just(0); - }) | tt::sync_wait(); + })); PIKA_TEST(false); } catch (std::runtime_error const& e) @@ -1392,100 +1387,99 @@ void test_let_error() // void predecessor { std::atomic called{false}; - ex::schedule(sched) | ex::then([]() { + tt::sync_wait(ex::schedule(sched) | ex::then([]() { throw std::runtime_error("error"); }) | ex::let_error([&called](std::exception_ptr& ep) { called = true; check_exception_ptr_message(ep, "error"); return ex::just(); - }) | tt::sync_wait(); + })); PIKA_TEST(called); } { std::atomic called{false}; - ex::schedule(sched) | ex::then([]() { + tt::sync_wait(ex::schedule(sched) | ex::then([]() { throw std::runtime_error("error"); }) | ex::let_error([=, &called](std::exception_ptr& ep) { called = true; check_exception_ptr_message(ep, "error"); return ex::transfer_just(sched); - }) | tt::sync_wait(); + })); PIKA_TEST(called); } { std::atomic called{false}; - ex::just() | ex::then([]() { throw std::runtime_error("error"); }) | - ex::let_error([=, &called](std::exception_ptr& ep) { - called = true; - check_exception_ptr_message(ep, "error"); - return ex::transfer_just(sched); - }) | - tt::sync_wait(); + tt::sync_wait(ex::just() | ex::then([]() { + throw std::runtime_error("error"); + }) | ex::let_error([=, &called](std::exception_ptr& ep) { + called = true; + check_exception_ptr_message(ep, "error"); + return ex::transfer_just(sched); + })); PIKA_TEST(called); } // int predecessor { - auto result = ex::schedule(sched) | ex::then([]() { + auto result = tt::sync_wait(ex::schedule(sched) | ex::then([]() { throw std::runtime_error("error"); return 43; }) | ex::let_error([](std::exception_ptr& ep) { check_exception_ptr_message(ep, "error"); return ex::just(42); - }) | tt::sync_wait(); + })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::schedule(sched) | ex::then([]() { + auto result = tt::sync_wait(ex::schedule(sched) | ex::then([]() { throw std::runtime_error("error"); return 43; }) | ex::let_error([=](std::exception_ptr& ep) { check_exception_ptr_message(ep, "error"); return ex::transfer_just(sched, 42); - }) | tt::sync_wait(); + })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::just() | ex::then([]() { + auto result = tt::sync_wait(ex::just() | ex::then([]() { throw std::runtime_error("error"); return 43; }) | ex::let_error([=](std::exception_ptr& ep) { check_exception_ptr_message(ep, "error"); return ex::transfer_just(sched, 42); - }) | tt::sync_wait(); + })); PIKA_TEST_EQ(result, 42); } // predecessor doesn't throw, let sender is ignored { - auto result = ex::transfer_just(sched, 42) | + auto result = tt::sync_wait(ex::transfer_just(sched, 42) | ex::let_error([](std::exception_ptr) { PIKA_TEST(false); return ex::just(43); - }) | - tt::sync_wait(); + })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::transfer_just(sched, 42) | + auto result = tt::sync_wait(ex::transfer_just(sched, 42) | ex::let_error([=](std::exception_ptr) { PIKA_TEST(false); return ex::transfer_just(sched, 43); - }) | - tt::sync_wait(); + })); PIKA_TEST_EQ(result, 42); } { - auto result = ex::just(42) | ex::let_error([=](std::exception_ptr) { - PIKA_TEST(false); - return ex::transfer_just(sched, 43); - }) | tt::sync_wait(); + auto result = + tt::sync_wait(ex::just(42) | ex::let_error([=](std::exception_ptr) { + PIKA_TEST(false); + return ex::transfer_just(sched, 43); + })); PIKA_TEST_EQ(result, 42); } } @@ -1539,35 +1533,31 @@ void test_keep_future_sender() { // the future should be passed to then, not it's contained value { - pika::make_ready_future() | ex::keep_future() | - ex::then([](pika::future&& f) { PIKA_TEST(f.is_ready()); }) | - tt::sync_wait(); + tt::sync_wait(pika::make_ready_future() | ex::keep_future() | + ex::then([](pika::future&& f) { PIKA_TEST(f.is_ready()); })); } { - pika::make_ready_future().share() | ex::keep_future() | - ex::then([](pika::shared_future&& f) { + tt::sync_wait(pika::make_ready_future().share() | + ex::keep_future() | ex::then([](pika::shared_future&& f) { PIKA_TEST(f.is_ready()); - }) | - tt::sync_wait(); + })); } { - pika::make_ready_future(42) | ex::keep_future() | + tt::sync_wait(pika::make_ready_future(42) | ex::keep_future() | ex::then([](pika::future&& f) { PIKA_TEST(f.is_ready()); PIKA_TEST_EQ(f.get(), 42); - }) | - tt::sync_wait(); + })); } { - pika::make_ready_future(42).share() | ex::keep_future() | - ex::then([](pika::shared_future&& f) { + tt::sync_wait(pika::make_ready_future(42).share() | + ex::keep_future() | ex::then([](pika::shared_future&& f) { PIKA_TEST(f.is_ready()); PIKA_TEST_EQ(f.get(), 42); - }) | - tt::sync_wait(); + })); } { @@ -1693,8 +1683,8 @@ void test_keep_future_sender() { auto f = pika::async([&]() { return 42; }); - auto r = std::move(f) | ex::keep_future() | - ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait(); + auto r = tt::sync_wait(std::move(f) | ex::keep_future() | + ex::transfer(ex::thread_pool_scheduler{})); PIKA_TEST(r.is_ready()); PIKA_TEST_EQ(r.get(), 42); } @@ -1702,8 +1692,8 @@ void test_keep_future_sender() { auto sf = pika::async([&]() { return 42; }).share(); - auto r = std::move(sf) | ex::keep_future() | - ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait(); + auto r = tt::sync_wait(std::move(sf) | ex::keep_future() | + ex::transfer(ex::thread_pool_scheduler{})); PIKA_TEST(r.is_ready()); PIKA_TEST_EQ(r.get(), 42); } @@ -1718,8 +1708,8 @@ void test_keep_future_sender() // or storing a const&. The copy is not possible because the type is // noncopyable, and storing a reference is not acceptable since the // reference may outlive the value. - auto r = std::move(sf) | ex::keep_future() | - ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait(); + auto r = tt::sync_wait(std::move(sf) | ex::keep_future() | + ex::transfer(ex::thread_pool_scheduler{})); PIKA_TEST(r.is_ready()); PIKA_TEST_EQ(r.get().x, 42); } @@ -1731,9 +1721,10 @@ void test_keep_future_sender() auto fun = pika::unwrapping( [](int&& x, double const& y) { return x * 2 + (int(y) / 2); }); - PIKA_TEST_EQ(ex::when_all(std::move(f) | ex::keep_future(), - sf | ex::keep_future()) | - ex::then(fun) | tt::sync_wait(), + PIKA_TEST_EQ( + tt::sync_wait(ex::when_all(std::move(f) | ex::keep_future(), + sf | ex::keep_future()) | + ex::then(fun)), 85); } @@ -1743,10 +1734,10 @@ void test_keep_future_sender() auto fun = pika::unwrapping( [](int&& x, double const& y) { return x * 2 + (int(y) / 2); }); - PIKA_TEST_EQ(ex::when_all(std::move(f) | ex::keep_future(), - sf | ex::keep_future()) | - ex::transfer(ex::thread_pool_scheduler{}) | ex::then(fun) | - tt::sync_wait(), + PIKA_TEST_EQ( + tt::sync_wait(ex::when_all(std::move(f) | ex::keep_future(), + sf | ex::keep_future()) | + ex::transfer(ex::thread_pool_scheduler{}) | ex::then(fun)), 85); } } @@ -1760,10 +1751,11 @@ void test_bulk() std::vector v(n, 0); pika::thread::id parent_id = pika::this_thread::get_id(); - ex::schedule(ex::thread_pool_scheduler{}) | ex::bulk(n, [&](int i) { - ++v[i]; - PIKA_TEST_NEQ(parent_id, pika::this_thread::get_id()); - }) | tt::sync_wait(); + tt::sync_wait( + ex::schedule(ex::thread_pool_scheduler{}) | ex::bulk(n, [&](int i) { + ++v[i]; + PIKA_TEST_NEQ(parent_id, pika::this_thread::get_id()); + })); for (int i = 0; i < n; ++i) { @@ -1776,14 +1768,12 @@ void test_bulk() std::vector v(n, -1); pika::thread::id parent_id = pika::this_thread::get_id(); - auto v_out = + auto v_out = tt::sync_wait( ex::transfer_just(ex::thread_pool_scheduler{}, std::move(v)) | - ex::bulk(n, - [&parent_id](int i, std::vector& v) { - v[i] = i; - PIKA_TEST_NEQ(parent_id, pika::this_thread::get_id()); - }) | - tt::sync_wait(); + ex::bulk(n, [&parent_id](int i, std::vector& v) { + v[i] = i; + PIKA_TEST_NEQ(parent_id, pika::this_thread::get_id()); + })); for (int i = 0; i < n; ++i) { @@ -1800,13 +1790,11 @@ void test_bulk() pika::mutex mtx; - ex::schedule(ex::thread_pool_scheduler{}) | - ex::bulk(std::move(v), - [&](std::string const& s) { - std::lock_guard lk(mtx); - string_map.insert(s); - }) | - tt::sync_wait(); + tt::sync_wait(ex::schedule(ex::thread_pool_scheduler{}) | + ex::bulk(std::move(v), [&](std::string const& s) { + std::lock_guard lk(mtx); + string_map.insert(s); + })); for (auto const& s : v_ref) { @@ -1824,16 +1812,14 @@ void test_bulk() try { - ex::transfer_just(ex::thread_pool_scheduler{}) | - ex::bulk(n, - [&v](int i) { - if (i == i_fail) - { - throw std::runtime_error("error"); - } - v[i] = i; - }) | - tt::sync_wait(); + tt::sync_wait(ex::transfer_just(ex::thread_pool_scheduler{}) | + ex::bulk(n, [&v](int i) { + if (i == i_fail) + { + throw std::runtime_error("error"); + } + v[i] = i; + })); if (expect_exception) { diff --git a/libs/pika/synchronization/tests/unit/async_rw_mutex.cpp b/libs/pika/synchronization/tests/unit/async_rw_mutex.cpp index 51d2738d2..223e2f0ba 100644 --- a/libs/pika/synchronization/tests/unit/async_rw_mutex.cpp +++ b/libs/pika/synchronization/tests/unit/async_rw_mutex.cpp @@ -169,7 +169,7 @@ template void test_single_read_access(async_rw_mutex rwm) { std::atomic called{false}; - rwm.read() | then([&](auto) { called = true; }) | sync_wait(); + sync_wait(rwm.read() | then([&](auto) { called = true; })); PIKA_TEST(called); } @@ -177,7 +177,7 @@ template void test_single_readwrite_access(async_rw_mutex rwm) { std::atomic called{false}; - rwm.readwrite() | then([&](auto) { called = true; }) | sync_wait(); + sync_wait(rwm.readwrite() | then([&](auto) { called = true; })); PIKA_TEST(called); } @@ -188,7 +188,7 @@ void test_moved(async_rw_mutex rwm) // values alive auto rwm2 = std::move(rwm); std::atomic called{false}; - rwm2.read() | then([&](auto) { called = true; }) | sync_wait(); + sync_wait(rwm2.read() | then([&](auto) { called = true; })); PIKA_TEST(called); } @@ -250,7 +250,7 @@ void test_multiple_accesses( submit_senders(exec, rw_senders); // The destructor does not block, so we block here manually - rwm.readwrite() | sync_wait(); + sync_wait(rwm.readwrite()); } ///////////////////////////////////////////////////////////////////////////////