Skip to content

Commit

Permalink
Merge 44ff7d5 into c61370d
Browse files Browse the repository at this point in the history
  • Loading branch information
msimberg authored Aug 4, 2022
2 parents c61370d + 44ff7d5 commit 6185de8
Show file tree
Hide file tree
Showing 10 changed files with 298 additions and 346 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,8 @@ void test_for_each_sender(ExPolicy&& p, IteratorTag)
auto rng = pika::util::make_iterator_range(
iterator(std::begin(c)), iterator(std::end(c)));
auto f = [](std::size_t& v) { v = 42; };
auto result = ex::just(rng, f) |
pika::ranges::for_each(std::forward<ExPolicy>(p)) | tt::sync_wait();
auto result = tt::sync_wait(
ex::just(rng, f) | pika::ranges::for_each(std::forward<ExPolicy>(p)));
PIKA_TEST(result == iterator(std::end(c)));

// verify values
Expand Down Expand Up @@ -380,8 +380,8 @@ void test_for_each_exception_sender(ExPolicy p, IteratorTag)
bool caught_exception = false;
try
{
ex::just(rng, f) | pika::ranges::for_each(std::forward<ExPolicy>(p)) |
tt::sync_wait();
tt::sync_wait(ex::just(rng, f) |
pika::ranges::for_each(std::forward<ExPolicy>(p)));

PIKA_TEST(false);
}
Expand Down Expand Up @@ -417,8 +417,8 @@ void test_for_each_bad_alloc_sender(ExPolicy p, IteratorTag)
bool caught_exception = false;
try
{
ex::just(rng, f) | pika::ranges::for_each(std::forward<ExPolicy>(p)) |
tt::sync_wait();
tt::sync_wait(ex::just(rng, f) |
pika::ranges::for_each(std::forward<ExPolicy>(p)));

PIKA_TEST(false);
}
Expand Down
27 changes: 14 additions & 13 deletions libs/pika/async_cuda/tests/performance/synchronize.cu
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ int pika_main(pika::program_options::variables_map& vm)
pika::chrono::detail::high_resolution_timer timer;
for (std::size_t i = 0; i != iterations; ++i)
{
ex::schedule(sched) | cu::then_with_stream(f) | tt::sync_wait();
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f));
}
double elapsed = timer.elapsed();
std::cout
Expand All @@ -111,17 +112,17 @@ int pika_main(pika::program_options::variables_map& vm)
// We have to manually unroll this loop, because the type of the
// sender changes for each additional then_with_stream call. The
// number of unrolled calls must match batch_size above.
ex::schedule(sched) | cu::then_with_stream(f) |
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | tt::sync_wait();
cu::then_with_stream(f));
}
// Do the remainder one-by-one
for (std::size_t i = 0; i < non_batch_iterations; ++i)
{
ex::schedule(sched) | cu::then_with_stream(f) | tt::sync_wait();
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f));
}
double elapsed = timer.elapsed();
std::cout
Expand All @@ -145,7 +146,7 @@ int pika_main(pika::program_options::variables_map& vm)
// intentionally insert dummy then([]{}) calls between the
// then_with_stream calls to force synchronization between the
// kernel launches.
ex::schedule(sched) | cu::then_with_stream(f) |
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) |
ex::transfer(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) |
Expand All @@ -163,12 +164,12 @@ int pika_main(pika::program_options::variables_map& vm)
ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) |
ex::transfer(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) |
ex::transfer(sched) | cu::then_with_stream(f) | tt::sync_wait();
ex::transfer(sched) | cu::then_with_stream(f));
}
// Do the remainder one-by-one
for (std::size_t i = 0; i < non_batch_iterations; ++i)
{
ex::schedule(sched) | cu::then_with_stream(f) | tt::sync_wait();
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f));
}
double elapsed = timer.elapsed();
std::cout
Expand All @@ -186,8 +187,8 @@ int pika_main(pika::program_options::variables_map& vm)
pika::chrono::detail::high_resolution_timer timer;
for (std::size_t i = 0; i != iterations; ++i)
{
ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait();
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}));
}
double elapsed = timer.elapsed();
std::cout
Expand All @@ -208,19 +209,19 @@ int pika_main(pika::program_options::variables_map& vm)
// We have to manually unroll this loop, because the type of the
// sender changes for each additional then_with_stream call. The
// number of unrolled calls must match batch_size above.
ex::schedule(sched) | cu::then_with_stream(f) |
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait();
ex::transfer(ex::thread_pool_scheduler{}));
}
// Do the remainder one-by-one
for (std::size_t i = 0; i < non_batch_iterations; ++i)
{
ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait();
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}));
}
double elapsed = timer.elapsed();
std::cout
Expand Down
12 changes: 6 additions & 6 deletions libs/pika/async_cuda/tests/unit/then_with_stream.cu
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ int pika_main()
{
cudaStream_t first_stream{};
cudaStream_t second_stream{};
ex::schedule(cu::cuda_scheduler{pool}) |
tt::sync_wait(ex::schedule(cu::cuda_scheduler{pool}) |
cu::then_with_stream(
[&](cudaStream_t stream) { first_stream = stream; }) |
cu::then_with_stream([&](cudaStream_t stream) {
Expand All @@ -394,8 +394,7 @@ int pika_main()
}) |
cu::then_with_stream([&](cudaStream_t stream) {
PIKA_TEST_EQ(stream, second_stream);
}) |
tt::sync_wait();
}));
}

{
Expand All @@ -414,14 +413,15 @@ int pika_main()
cu::then_with_stream(increment{}) |
cu::then_with_stream(increment{}) |
cu::then_with_stream(increment{});
ex::when_all(ex::just(&p_h), std::move(s), ex::just(sizeof(type)),
ex::just(cudaMemcpyDeviceToHost)) |
tt::sync_wait(
ex::when_all(ex::just(&p_h), std::move(s), ex::just(sizeof(type)),
ex::just(cudaMemcpyDeviceToHost)) |
ex::transfer(cu::cuda_scheduler{pool}) |
cu::then_with_stream(cuda_memcpy_async{}) |
ex::transfer(ex::thread_pool_scheduler{}) |
ex::then(&cu::check_cuda_error) |
ex::then([&p_h] { PIKA_TEST_EQ(p_h, 3); }) |
ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait();
ex::transfer(ex::thread_pool_scheduler{}));

cu::check_cuda_error(cudaFree(p));
}
Expand Down
20 changes: 9 additions & 11 deletions libs/pika/async_mpi/tests/unit/algorithm_transform_mpi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ int pika_main()
{
data = 42;
}
auto result = ex::just(&data, count, datatype, 0, comm) |
mpi::transform_mpi(MPI_Ibcast) | tt::sync_wait();
auto result =
tt::sync_wait(ex::just(&data, count, datatype, 0, comm) |
mpi::transform_mpi(MPI_Ibcast));
if (rank != 0)
{
PIKA_TEST_EQ(data, 42);
Expand All @@ -156,10 +157,9 @@ int pika_main()
bool exception_thrown = false;
try
{
mpi::transform_mpi(
tt::sync_wait(mpi::transform_mpi(
error_sender<int*, int, MPI_Datatype, int, MPI_Comm>{},
MPI_Ibcast) |
tt::sync_wait();
MPI_Ibcast));
PIKA_TEST(false);
}
catch (std::runtime_error const& e)
Expand Down Expand Up @@ -203,9 +203,8 @@ int pika_main()
bool exception_thrown = false;
try
{
mpi::transform_mpi(
ex::just(data, count, datatype, -1, comm), MPI_Ibcast) |
tt::sync_wait();
tt::sync_wait(mpi::transform_mpi(
ex::just(data, count, datatype, -1, comm), MPI_Ibcast));
PIKA_TEST(false);
}
catch (std::runtime_error const& e)
Expand All @@ -228,9 +227,8 @@ int pika_main()
bool exception_thrown = false;
try
{
mpi::transform_mpi(
ex::just(data, count, datatype, -1, comm), MPI_Ibcast) |
tt::sync_wait();
tt::sync_wait(mpi::transform_mpi(
ex::just(data, count, datatype, -1, comm), MPI_Ibcast));
PIKA_TEST(false);
}
catch (std::runtime_error const&)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include <pika/allocator_support/traits/is_allocator.hpp>
#include <pika/assert.hpp>
#include <pika/concepts/concepts.hpp>
#include <pika/execution/algorithms/detail/partial_algorithm.hpp>
#include <pika/execution_base/operation_state.hpp>
#include <pika/execution_base/sender.hpp>
#include <pika/functional/detail/tag_fallback_invoke.hpp>
Expand Down Expand Up @@ -154,19 +153,6 @@ namespace pika { namespace execution { namespace experimental {
operation_state_type{PIKA_FORWARD(Sender, sender), alloc};
PIKA_UNUSED(p.release());
}

// clang-format off
template <typename Allocator = pika::detail::internal_allocator<>,
PIKA_CONCEPT_REQUIRES_(
pika::detail::is_allocator_v<Allocator>
)>
// clang-format on
friend constexpr PIKA_FORCEINLINE auto tag_fallback_invoke(
start_detached_t, Allocator const& allocator = Allocator{})
{
return detail::partial_algorithm<start_detached_t, Allocator>{
allocator};
}
} start_detached{};
}}} // namespace pika::execution::experimental
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include <pika/concepts/concepts.hpp>
#include <pika/datastructures/variant.hpp>
#include <pika/execution/algorithms/detail/partial_algorithm.hpp>
#include <pika/execution/algorithms/detail/single_result.hpp>
#include <pika/execution_base/operation_state.hpp>
#include <pika/execution_base/receiver.hpp>
Expand Down Expand Up @@ -268,11 +267,5 @@ namespace pika::this_thread::experimental {
state.wait();
return state.get_value();
}

friend constexpr PIKA_FORCEINLINE auto tag_fallback_invoke(sync_wait_t)
{
return pika::execution::experimental::detail::partial_algorithm<
sync_wait_t>{};
}
} sync_wait{};
} // namespace pika::this_thread::experimental
7 changes: 3 additions & 4 deletions libs/pika/execution/tests/unit/algorithm_sync_wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,15 @@ int pika_main()
std::atomic<bool> start_called{false};
std::atomic<bool> connect_called{false};
std::atomic<bool> tag_invoke_overload_called{false};
custom_sender{
start_called, connect_called, tag_invoke_overload_called} |
tt::sync_wait();
tt::sync_wait(custom_sender{
start_called, connect_called, tag_invoke_overload_called});
PIKA_TEST(start_called);
PIKA_TEST(connect_called);
PIKA_TEST(!tag_invoke_overload_called);
}

{
PIKA_TEST_EQ(ex::just(3) | tt::sync_wait(), 3);
PIKA_TEST_EQ(tt::sync_wait(ex::just(3)), 3);
}

// tag_invoke overload
Expand Down
Loading

0 comments on commit 6185de8

Please sign in to comment.