Skip to content

Commit

Permalink
Try #346:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] authored Aug 4, 2022
2 parents c61370d + dab083a commit 43dcc5b
Show file tree
Hide file tree
Showing 11 changed files with 299 additions and 349 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,8 @@ void test_for_each_sender(ExPolicy&& p, IteratorTag)
auto rng = pika::util::make_iterator_range(
iterator(std::begin(c)), iterator(std::end(c)));
auto f = [](std::size_t& v) { v = 42; };
auto result = ex::just(rng, f) |
pika::ranges::for_each(std::forward<ExPolicy>(p)) | tt::sync_wait();
auto result = tt::sync_wait(
ex::just(rng, f) | pika::ranges::for_each(std::forward<ExPolicy>(p)));
PIKA_TEST(result == iterator(std::end(c)));

// verify values
Expand Down Expand Up @@ -380,8 +380,8 @@ void test_for_each_exception_sender(ExPolicy p, IteratorTag)
bool caught_exception = false;
try
{
ex::just(rng, f) | pika::ranges::for_each(std::forward<ExPolicy>(p)) |
tt::sync_wait();
tt::sync_wait(ex::just(rng, f) |
pika::ranges::for_each(std::forward<ExPolicy>(p)));

PIKA_TEST(false);
}
Expand Down Expand Up @@ -417,8 +417,8 @@ void test_for_each_bad_alloc_sender(ExPolicy p, IteratorTag)
bool caught_exception = false;
try
{
ex::just(rng, f) | pika::ranges::for_each(std::forward<ExPolicy>(p)) |
tt::sync_wait();
tt::sync_wait(ex::just(rng, f) |
pika::ranges::for_each(std::forward<ExPolicy>(p)));

PIKA_TEST(false);
}
Expand Down
26 changes: 13 additions & 13 deletions libs/pika/async_cuda/tests/performance/synchronize.cu
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ int pika_main(pika::program_options::variables_map& vm)
pika::chrono::detail::high_resolution_timer timer;
for (std::size_t i = 0; i != iterations; ++i)
{
ex::schedule(sched) | cu::then_with_stream(f) | tt::sync_wait();
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f));
}
double elapsed = timer.elapsed();
std::cout
Expand All @@ -111,17 +111,17 @@ int pika_main(pika::program_options::variables_map& vm)
// We have to manually unroll this loop, because the type of the
// sender changes for each additional then_with_stream call. The
// number of unrolled calls must match batch_size above.
ex::schedule(sched) | cu::then_with_stream(f) |
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | tt::sync_wait();
cu::then_with_stream(f));
}
// Do the remainder one-by-one
for (std::size_t i = 0; i < non_batch_iterations; ++i)
{
ex::schedule(sched) | cu::then_with_stream(f) | tt::sync_wait();
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f));
}
double elapsed = timer.elapsed();
std::cout
Expand All @@ -145,7 +145,7 @@ int pika_main(pika::program_options::variables_map& vm)
// intentionally insert dummy then([]{}) calls between the
// then_with_stream calls to force synchronization between the
// kernel launches.
ex::schedule(sched) | cu::then_with_stream(f) |
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) |
ex::transfer(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) |
Expand All @@ -163,12 +163,12 @@ int pika_main(pika::program_options::variables_map& vm)
ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) |
ex::transfer(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | ex::then([] {}) |
ex::transfer(sched) | cu::then_with_stream(f) | tt::sync_wait();
ex::transfer(sched) | cu::then_with_stream(f));
}
// Do the remainder one-by-one
for (std::size_t i = 0; i < non_batch_iterations; ++i)
{
ex::schedule(sched) | cu::then_with_stream(f) | tt::sync_wait();
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f));
}
double elapsed = timer.elapsed();
std::cout
Expand All @@ -186,8 +186,8 @@ int pika_main(pika::program_options::variables_map& vm)
pika::chrono::detail::high_resolution_timer timer;
for (std::size_t i = 0; i != iterations; ++i)
{
ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait();
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}));
}
double elapsed = timer.elapsed();
std::cout
Expand All @@ -208,19 +208,19 @@ int pika_main(pika::program_options::variables_map& vm)
// We have to manually unroll this loop, because the type of the
// sender changes for each additional then_with_stream call. The
// number of unrolled calls must match batch_size above.
ex::schedule(sched) | cu::then_with_stream(f) |
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) | cu::then_with_stream(f) |
cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait();
ex::transfer(ex::thread_pool_scheduler{}));
}
// Do the remainder one-by-one
for (std::size_t i = 0; i < non_batch_iterations; ++i)
{
ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait();
tt::sync_wait(ex::schedule(sched) | cu::then_with_stream(f) |
ex::transfer(ex::thread_pool_scheduler{}));
}
double elapsed = timer.elapsed();
std::cout
Expand Down
12 changes: 6 additions & 6 deletions libs/pika/async_cuda/tests/unit/then_with_stream.cu
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ int pika_main()
{
cudaStream_t first_stream{};
cudaStream_t second_stream{};
ex::schedule(cu::cuda_scheduler{pool}) |
tt::sync_wait(ex::schedule(cu::cuda_scheduler{pool}) |
cu::then_with_stream(
[&](cudaStream_t stream) { first_stream = stream; }) |
cu::then_with_stream([&](cudaStream_t stream) {
Expand All @@ -394,8 +394,7 @@ int pika_main()
}) |
cu::then_with_stream([&](cudaStream_t stream) {
PIKA_TEST_EQ(stream, second_stream);
}) |
tt::sync_wait();
}));
}

{
Expand All @@ -414,14 +413,15 @@ int pika_main()
cu::then_with_stream(increment{}) |
cu::then_with_stream(increment{}) |
cu::then_with_stream(increment{});
ex::when_all(ex::just(&p_h), std::move(s), ex::just(sizeof(type)),
ex::just(cudaMemcpyDeviceToHost)) |
tt::sync_wait(
ex::when_all(ex::just(&p_h), std::move(s), ex::just(sizeof(type)),
ex::just(cudaMemcpyDeviceToHost)) |
ex::transfer(cu::cuda_scheduler{pool}) |
cu::then_with_stream(cuda_memcpy_async{}) |
ex::transfer(ex::thread_pool_scheduler{}) |
ex::then(&cu::check_cuda_error) |
ex::then([&p_h] { PIKA_TEST_EQ(p_h, 3); }) |
ex::transfer(ex::thread_pool_scheduler{}) | tt::sync_wait();
ex::transfer(ex::thread_pool_scheduler{}));

cu::check_cuda_error(cudaFree(p));
}
Expand Down
20 changes: 9 additions & 11 deletions libs/pika/async_mpi/tests/unit/algorithm_transform_mpi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ int pika_main()
{
data = 42;
}
auto result = ex::just(&data, count, datatype, 0, comm) |
mpi::transform_mpi(MPI_Ibcast) | tt::sync_wait();
auto result =
tt::sync_wait(ex::just(&data, count, datatype, 0, comm) |
mpi::transform_mpi(MPI_Ibcast));
if (rank != 0)
{
PIKA_TEST_EQ(data, 42);
Expand All @@ -156,10 +157,9 @@ int pika_main()
bool exception_thrown = false;
try
{
mpi::transform_mpi(
tt::sync_wait(mpi::transform_mpi(
error_sender<int*, int, MPI_Datatype, int, MPI_Comm>{},
MPI_Ibcast) |
tt::sync_wait();
MPI_Ibcast));
PIKA_TEST(false);
}
catch (std::runtime_error const& e)
Expand Down Expand Up @@ -203,9 +203,8 @@ int pika_main()
bool exception_thrown = false;
try
{
mpi::transform_mpi(
ex::just(data, count, datatype, -1, comm), MPI_Ibcast) |
tt::sync_wait();
tt::sync_wait(mpi::transform_mpi(
ex::just(data, count, datatype, -1, comm), MPI_Ibcast));
PIKA_TEST(false);
}
catch (std::runtime_error const& e)
Expand All @@ -228,9 +227,8 @@ int pika_main()
bool exception_thrown = false;
try
{
mpi::transform_mpi(
ex::just(data, count, datatype, -1, comm), MPI_Ibcast) |
tt::sync_wait();
tt::sync_wait(mpi::transform_mpi(
ex::just(data, count, datatype, -1, comm), MPI_Ibcast));
PIKA_TEST(false);
}
catch (std::runtime_error const&)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include <pika/allocator_support/traits/is_allocator.hpp>
#include <pika/assert.hpp>
#include <pika/concepts/concepts.hpp>
#include <pika/execution/algorithms/detail/partial_algorithm.hpp>
#include <pika/execution_base/operation_state.hpp>
#include <pika/execution_base/sender.hpp>
#include <pika/functional/detail/tag_fallback_invoke.hpp>
Expand Down Expand Up @@ -154,19 +153,6 @@ namespace pika { namespace execution { namespace experimental {
operation_state_type{PIKA_FORWARD(Sender, sender), alloc};
PIKA_UNUSED(p.release());
}

// clang-format off
template <typename Allocator = pika::detail::internal_allocator<>,
PIKA_CONCEPT_REQUIRES_(
pika::detail::is_allocator_v<Allocator>
)>
// clang-format on
friend constexpr PIKA_FORCEINLINE auto tag_fallback_invoke(
start_detached_t, Allocator const& allocator = Allocator{})
{
return detail::partial_algorithm<start_detached_t, Allocator>{
allocator};
}
} start_detached{};
}}} // namespace pika::execution::experimental
#endif
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#include <pika/concepts/concepts.hpp>
#include <pika/datastructures/variant.hpp>
#include <pika/execution/algorithms/detail/partial_algorithm.hpp>
#include <pika/execution/algorithms/detail/single_result.hpp>
#include <pika/execution_base/operation_state.hpp>
#include <pika/execution_base/receiver.hpp>
Expand Down Expand Up @@ -268,11 +267,5 @@ namespace pika::this_thread::experimental {
state.wait();
return state.get_value();
}

friend constexpr PIKA_FORCEINLINE auto tag_fallback_invoke(sync_wait_t)
{
return pika::execution::experimental::detail::partial_algorithm<
sync_wait_t>{};
}
} sync_wait{};
} // namespace pika::this_thread::experimental
5 changes: 2 additions & 3 deletions libs/pika/execution/tests/unit/algorithm_start_detached.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ int main()
std::atomic<bool> start_called{false};
std::atomic<bool> connect_called{false};
std::atomic<bool> tag_invoke_overload_called{false};
custom_sender{
start_called, connect_called, tag_invoke_overload_called} |
ex::start_detached();
ex::start_detached(custom_sender{
start_called, connect_called, tag_invoke_overload_called});
PIKA_TEST(start_called);
PIKA_TEST(connect_called);
PIKA_TEST(!tag_invoke_overload_called);
Expand Down
7 changes: 3 additions & 4 deletions libs/pika/execution/tests/unit/algorithm_sync_wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,15 @@ int pika_main()
std::atomic<bool> start_called{false};
std::atomic<bool> connect_called{false};
std::atomic<bool> tag_invoke_overload_called{false};
custom_sender{
start_called, connect_called, tag_invoke_overload_called} |
tt::sync_wait();
tt::sync_wait(custom_sender{
start_called, connect_called, tag_invoke_overload_called});
PIKA_TEST(start_called);
PIKA_TEST(connect_called);
PIKA_TEST(!tag_invoke_overload_called);
}

{
PIKA_TEST_EQ(ex::just(3) | tt::sync_wait(), 3);
PIKA_TEST_EQ(tt::sync_wait(ex::just(3)), 3);
}

// tag_invoke overload
Expand Down
Loading

0 comments on commit 43dcc5b

Please sign in to comment.