Skip to content

Commit

Permalink
Merge pull request #1358 from rrahn/feature/range_interface_for_paral…
Browse files Browse the repository at this point in the history
…lel_execution_handler

[FEATURE] Forward call of indexed sequence pairs to alignment algorithm.
  • Loading branch information
smehringer authored Nov 12, 2019
2 parents cf417c8 + 25d9152 commit ca6dd02
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <type_traits>
#include <vector>

#include <seqan3/alignment/pairwise/detail/concept.hpp>
#include <seqan3/contrib/parallel/buffer_queue.hpp>
#include <seqan3/core/parallel/detail/reader_writer_manager.hpp>
#include <seqan3/core/platform.hpp>
Expand Down Expand Up @@ -127,6 +128,33 @@ class execution_handler_parallel
assert(status == contrib::queue_op_status::success);
}

/*!\brief Takes underlying range of sequence pairs and invokes an alignment on each instance.
* \tparam algorithm_t The type of the alignment algorithm.
* \tparam indexed_sequence_pairs_t The type of underlying sequence pairs annotated with an index;
* must model seqan3::detail::indexed_sequence_pair_range.
* \tparam delegate_type The type of the callable invoked on the std::invoke_result of `algorithm_t`.
*
* \param[in] algorithm The alignment algorithm to invoke.
* \param[in] indexed_sequence_pairs The range of underlying annotated sequence pairs to be aligned.
* \param[in] delegate A callable which will be invoked on each result of the computed alignments.
*/
template <typename algorithm_t, indexed_sequence_pair_range indexed_sequence_pairs_t, typename delegate_type>
void execute(algorithm_t && algorithm,
indexed_sequence_pairs_t indexed_sequence_pairs,
delegate_type && delegate)
{
assert(state != nullptr);

// Asynchronously pushes the alignment job as a task to the queue.
task_type task = [=, indexed_sequence_pairs = std::move(indexed_sequence_pairs)] ()
{
delegate(algorithm(std::move(indexed_sequence_pairs)));
};

[[maybe_unused]] contrib::queue_op_status status = state->queue.wait_push(std::move(task));
assert(status == contrib::queue_op_status::success);
}

//!\brief Waits until all submitted alignment jobs have been processed.
void wait()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <functional>

#include <seqan3/alignment/pairwise/detail/concept.hpp>
#include <seqan3/core/platform.hpp>
#include <seqan3/range/views/view_all.hpp>
#include <seqan3/std/concepts>
Expand Down Expand Up @@ -66,21 +67,19 @@ struct execution_handler_sequential
/*!\brief Takes underlying range of sequence pairs and invokes an alignment on each instance.
* \tparam algorithm_t The type of the alignment algorithm.
* \tparam indexed_sequence_pairs_t The type of underlying sequence pairs annotated with an index;
* must model std::ranges::forward_range.
* must model seqan3::detail::indexed_sequence_pair_range.
* \tparam delegate_type The type of the callable invoked on the std::invoke_result of `algorithm_t`.
*
* \param[in] algorithm The alignment algorithm to invoke.
* \param[in] indexed_sequence_pairs The range of underlying annotated sequence pairs to be aligned.
* \param[in] delegate A callable which will be invoked on each result of the computed alignments.
*/
template <typename algorithm_t, std::ranges::forward_range indexed_sequence_pairs_t, typename delegate_type>
template <typename algorithm_t, indexed_sequence_pair_range indexed_sequence_pairs_t, typename delegate_type>
void execute(algorithm_t && algorithm,
indexed_sequence_pairs_t indexed_sequence_pairs,
indexed_sequence_pairs_t && indexed_sequence_pairs,
delegate_type && delegate)
{
using std::get;
for (auto && [sequence_pair, idx] : indexed_sequence_pairs)
execute(std::forward<algorithm_t>(algorithm), idx, get<0>(sequence_pair), get<1>(sequence_pair), delegate);
delegate(algorithm(std::forward<indexed_sequence_pairs_t>(indexed_sequence_pairs)));
}

//!\brief Waits for the submitted alignments jobs to finish. (Noop).
Expand Down
146 changes: 96 additions & 50 deletions test/unit/alignment/pairwise/execution/execution_handler_template.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,93 +12,139 @@

#include <seqan3/alphabet/nucleotide/dna4.hpp>
#include <seqan3/range/views/view_all.hpp>
#include <seqan3/range/views/zip.hpp>
#include <seqan3/test/performance/sequence_generator.hpp>
#include <seqan3/std/iterator>
#include <seqan3/std/algorithm>
#include <seqan3/std/ranges>

using namespace seqan3;

template <typename T>
class execution_handler : public ::testing::Test
{};

TYPED_TEST_CASE_P(execution_handler);

TYPED_TEST_P(execution_handler, execute_w_lvalue)
struct execution_handler : public ::testing::Test
{
constexpr size_t SIZE = 10000;
std::vector<std::pair<size_t, size_t>> buffer;
buffer.resize(SIZE);
static constexpr size_t total_size = 10000;

TypeParam exec_handler{};
void SetUp()
{
for (unsigned i = 0; i < total_size; ++i)
{
sequence_collection1.push_back(test::generate_sequence<dna4>(100, 20, i));
sequence_collection2.push_back(test::generate_sequence<dna4>(100, 20, i + total_size));
}
}

auto callable = [](size_t const idx, auto && rng1, auto && rng2)
template <typename buffer_t>
void check_result(buffer_t const & buffer) const
{
return std::pair{idx, rng1.size() + rng2.size()};
};
for (unsigned i = 0; i < total_size; ++i)
{
EXPECT_EQ(buffer[i].first, i) << "Position: " << i;
EXPECT_EQ(buffer[i].second,
sequence_collection1[i].size() + sequence_collection2[i].size()) << "Position: " << i;
}
}

std::vector<dna4_vector> set1;
std::vector<dna4_vector> set2;
std::vector<dna4_vector> sequence_collection1{};
std::vector<dna4_vector> sequence_collection2{};
};

auto simulate_alignment = [](size_t const idx, auto && rng1, auto && rng2)
{
return std::pair{idx, rng1.size() + rng2.size()};
};

for (unsigned i = 0; i < SIZE; ++i)
auto simulate_alignment_with_range = [] (auto indexed_sequence_pairs)
{
std::vector<std::pair<size_t, size_t>> results{};
for (auto && [sequence_pair, idx] : indexed_sequence_pairs)
{
set1.push_back(test::generate_sequence<dna4>(100, 20, i));
set2.push_back(test::generate_sequence<dna4>(100, 20, i + SIZE));
results.emplace_back(idx, std::get<0>(sequence_pair).size() + std::get<1>(sequence_pair).size());
}

size_t pos = 0;
return results;
};

TYPED_TEST_CASE_P(execution_handler);

TYPED_TEST_P(execution_handler, execute_with_lvalue)
{
std::vector<std::pair<size_t, size_t>> buffer;
buffer.resize(this->total_size);

for (unsigned i = 0; i < SIZE; ++i, ++pos)
TypeParam exec_handler{};

size_t pos = 0;
for (unsigned i = 0; i < this->total_size; ++i, ++pos)
{
auto v1 = set1[i] | views::all;
auto v2 = set2[i] | views::all;
exec_handler.execute(callable, i, v1, v2, [pos, &buffer] (auto && res) { buffer[pos] = std::move(res); });
auto seq_collection1_as_view = this->sequence_collection1[i] | views::all;
auto seq_collection2_as_view = this->sequence_collection2[i] | views::all;
exec_handler.execute(simulate_alignment,
i,
seq_collection1_as_view,
seq_collection2_as_view,
[pos, &buffer] (auto && res) { buffer[pos] = std::move(res); });
}

exec_handler.wait();

for (unsigned i = 0; i < SIZE; ++i)
{
EXPECT_EQ(buffer[i].first, i);
EXPECT_EQ(buffer[i].second, set1[i].size() + set2[i].size());
}
this->check_result(buffer);
}

TYPED_TEST_P(execution_handler, execute_w_rvalue)
TYPED_TEST_P(execution_handler, execute_with_rvalue)
{
constexpr size_t SIZE = 10000;
std::vector<std::pair<size_t, size_t>> buffer;
buffer.resize(SIZE);
buffer.resize(this->total_size);

TypeParam exec_handler{};

auto callable = [](size_t const idx, auto && rng1, auto && rng2)
{
return std::pair{idx, rng1.size() + rng2.size()};
};

std::vector<dna4_vector> set1;
std::vector<dna4_vector> set2;
size_t pos = 0;

for (unsigned i = 0; i < SIZE; ++i)
for (unsigned i = 0; i < this->total_size; ++i, ++pos)
{
set1.push_back(test::generate_sequence<dna4>(100, 20, i));
set2.push_back(test::generate_sequence<dna4>(100, 20, i + SIZE));
exec_handler.execute(simulate_alignment,
i,
this->sequence_collection1[i] | views::all,
this->sequence_collection2[i] | views::all,
[&buffer, pos] (auto && res) { buffer[pos] = std::move(res); });
}

exec_handler.wait();

this->check_result(buffer);
}

TYPED_TEST_P(execution_handler, execute_as_indexed_sequence_pairs)
{
std::vector<std::pair<size_t, size_t>> buffer;
buffer.resize(this->total_size);

TypeParam exec_handler{};

size_t pos = 0;
size_t chunk_size = 4; // total_size is a multiple of chunk size.

for (unsigned i = 0; i < SIZE; ++i, ++pos)
auto indexed_sequence_pairs = views::zip(views::zip(this->sequence_collection1, this->sequence_collection2),
std::views::iota(0));
using range_iterator_t = std::ranges::iterator_t<decltype(indexed_sequence_pairs)>;

for (range_iterator_t it = indexed_sequence_pairs.begin();
it != indexed_sequence_pairs.end();
it += chunk_size, pos += chunk_size)
{
exec_handler.execute(callable, i, set1[i] | views::all, set2[i] | views::all,
[&buffer, pos] (auto && res) { buffer[pos] = std::move(res); });
std::ranges::subrange<range_iterator_t, range_iterator_t> chunk{it, std::next(it, chunk_size)};
exec_handler.execute(simulate_alignment_with_range, chunk, [=, &buffer] (auto res_range)
{
std::ranges::move(res_range, buffer.begin() + pos);
});
}

exec_handler.wait();

for (unsigned i = 0; i < SIZE; ++i)
{
EXPECT_EQ(buffer[i].first, i);
EXPECT_EQ(buffer[i].second, set1[i].size() + set2[i].size());
}
this->check_result(buffer);
}

REGISTER_TYPED_TEST_CASE_P(execution_handler, execute_w_lvalue, execute_w_rvalue);
REGISTER_TYPED_TEST_CASE_P(execution_handler,
execute_with_lvalue,
execute_with_rvalue,
execute_as_indexed_sequence_pairs);

0 comments on commit ca6dd02

Please sign in to comment.