Skip to content

Commit

Permalink
Merge pull request pika-org#1321 from msimberg/transform-mpi-lifetimes
Browse files Browse the repository at this point in the history
      Test and fix lifetimes of `transform_mpi` values
  • Loading branch information
msimberg authored Nov 14, 2024
2 parents 22d5654 + 18b0162 commit 6c9fdf1
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 12 deletions.
3 changes: 1 addition & 2 deletions libs/pika/async_mpi/include/pika/async_mpi/dispatch_mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,8 @@ namespace pika::mpi::experimental {
friend constexpr PIKA_FORCEINLINE auto
tag_fallback_invoke(dispatch_mpi_t, Sender&& sender, F&& f)
{
auto snd1 = detail::dispatch_mpi_sender<Sender, F>{
return detail::dispatch_mpi_sender<Sender, F>{
PIKA_FORWARD(Sender, sender), PIKA_FORWARD(F, f)};
return pika::execution::experimental::make_unique_any_sender(std::move(snd1));
}

template <typename F>
Expand Down
17 changes: 12 additions & 5 deletions libs/pika/async_mpi/include/pika/async_mpi/transform_mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <pika/execution/algorithms/detail/partial_algorithm.hpp>
#include <pika/execution/algorithms/just.hpp>
#include <pika/execution/algorithms/let_value.hpp>
#include <pika/execution/algorithms/unpack.hpp>
#include <pika/execution_base/any_sender.hpp>
#include <pika/execution_base/receiver.hpp>
#include <pika/execution_base/sender.hpp>
Expand Down Expand Up @@ -52,6 +53,7 @@ namespace pika::mpi::experimental {
using pika::execution::experimental::just;
using pika::execution::experimental::let_value;
using pika::execution::experimental::unique_any_sender;
using pika::execution::experimental::unpack;

// get mpi completion mode settings
auto mode = get_completion_mode();
Expand All @@ -78,14 +80,19 @@ namespace pika::mpi::experimental {

if (requests_inline)
{
return dispatch_mpi_sender<Sender, F>{PIKA_MOVE(sender), PIKA_FORWARD(F, f)} |
let_value(completion_snd);
return std::forward<Sender>(sender) |
let_value([=, f = std::forward<F>(f)](auto&... args) mutable {
return just(std::forward_as_tuple(args...)) | ex::unpack() |
dispatch_mpi(std::move(f)) | let_value(completion_snd);
});
}
else
{
auto snd0 = PIKA_FORWARD(Sender, sender) | continues_on(mpi_pool_scheduler(p));
return dispatch_mpi_sender<decltype(snd0), F>{PIKA_MOVE(snd0), PIKA_FORWARD(F, f)} |
let_value(completion_snd);
return std::forward<Sender>(sender) | continues_on(mpi_pool_scheduler(p)) |
let_value([=, f = std::forward<F>(f)](auto&... args) mutable {
return just(std::forward_as_tuple(args...)) | ex::unpack() |
dispatch_mpi(std::move(f)) | let_value(completion_snd);
});
}
}

Expand Down
30 changes: 27 additions & 3 deletions libs/pika/async_mpi/tests/unit/algorithm_transform_mpi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,38 @@ int pika_main()
PIKA_TEST_EQ(data, 42);
}

// Values passed to transform_mpi should be kept alive by transform_mpi itself
{
int count = 1 << 20;
auto s = ex::just(std::vector<int>{count, 0}, datatype, 0, comm) |
ex::drop_operation_state() |
mpi::transform_mpi([](auto& data, MPI_Datatype datatype, int i, MPI_Comm comm,
MPI_Request* request) {
MPI_Ibcast(data.data(), data.size(), datatype, i, comm, request);
});
tt::sync_wait(PIKA_MOVE(s));
}

{
auto s = ex::just(custom_type_non_default_constructible_non_copyable{42}, datatype,
0, comm) |
ex::drop_operation_state() |
mpi::transform_mpi([](auto& data, MPI_Datatype datatype, int i, MPI_Comm comm,
MPI_Request* request) {
MPI_Ibcast(&data.x, 1, datatype, i, comm, request);
});
tt::sync_wait(PIKA_MOVE(s));
}

// transform_mpi should be able to handle reference types (by copying
// them to the operation state)
{
int data = 0, count = 1;
if (rank == 0) { data = 42; }
auto s = mpi::transform_mpi(
const_reference_sender<int>{count}, [&](int& count, MPI_Request* request) {
MPI_Ibcast(&data, count, datatype, 0, comm, request);
auto s = mpi::transform_mpi(const_reference_sender<int>{count},
[&](int& count_transform_mpi, MPI_Request* request) {
PIKA_TEST(&count_transform_mpi != &count);
MPI_Ibcast(&data, count_transform_mpi, datatype, 0, comm, request);
});
tt::sync_wait(PIKA_MOVE(s));
PIKA_TEST_EQ(data, 42);
Expand Down
10 changes: 8 additions & 2 deletions libs/pika/config/include/pika/config/compiler_specific.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,20 @@
#endif

// clang-format on
# if !defined(__has_feature)
# define PIKA_HAS_FEATURE(x) 0
# else
# define PIKA_HAS_FEATURE(x) __has_feature(x)
# endif

# if defined(PIKA_HAVE_SANITIZERS)
# if defined(__SANITIZE_ADDRESS__) || (defined(__has_feature) && __has_feature(address_sanitizer))
# if defined(__SANITIZE_ADDRESS__) || PIKA_HAS_FEATURE(address_sanitizer)
# define PIKA_HAVE_ADDRESS_SANITIZER
# if defined(PIKA_GCC_VERSION) || defined(PIKA_CLANG_VERSION)
# define PIKA_NO_SANITIZE_ADDRESS __attribute__((no_sanitize("address")))
# endif
# endif
# if defined(__SANITIZE_THREAD__) || (defined(__has_feature) && __has_feature(thread_sanitizer))
# if defined(__SANITIZE_THREAD__) || PIKA_HAS_FEATURE(thread_sanitizer)
# define PIKA_HAVE_THREAD_SANITIZER
# if defined(PIKA_GCC_VERSION) || defined(PIKA_CLANG_VERSION)
# define PIKA_NO_SANITIZE_THREAD __attribute__((no_sanitize("thread")))
Expand Down

0 comments on commit 6c9fdf1

Please sign in to comment.