[FEATURE] Forward call of indexed sequence pairs to alignment algorithm. #1358

Merged

@@ -17,6 +17,7 @@
#include <type_traits>
#include <vector>

#include <seqan3/alignment/pairwise/detail/concept.hpp>
#include <seqan3/contrib/parallel/buffer_queue.hpp>
#include <seqan3/core/parallel/detail/reader_writer_manager.hpp>
#include <seqan3/core/platform.hpp>
@@ -127,6 +128,33 @@ class execution_handler_parallel
assert(status == contrib::queue_op_status::success);
}

/*!\brief Takes underlying range of sequence pairs and invokes an alignment on each instance.
* \tparam algorithm_t The type of the alignment algorithm.
* \tparam indexed_sequence_pairs_t The type of underlying sequence pairs annotated with an index;
* must model seqan3::detail::indexed_sequence_pair_range.
* \tparam delegate_type The type of the callable invoked on the std::invoke_result of `algorithm_t`.
*
* \param[in] algorithm The alignment algorithm to invoke.
* \param[in] indexed_sequence_pairs The range of underlying annotated sequence pairs to be aligned.
Member

You say range here, but you take it by copy. Do you assume that this copy is cheap? Is it a view?

Contributor Author

OK, so it always expects a range.
It never takes the range as an lvalue reference, because it executes the alignments asynchronously and wants to make sure that no other thread can modify the referenced range. In the implementation we actually pass in an rvalue view, so nothing is expensive in this context.

Member (@smehringer, Nov 7, 2019)

Ok. If you always pass a view in our context we might want to constrain this?
But no strong feelings

Contributor Author

Well, the interface does not really care. It just takes a copy of whatever comes in; we just happen to use it by passing in a view, which is lightweight. The only constraint I might add here is that the type is move constructible, since we move it around afterwards.
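
The ownership argument in this thread can be illustrated with a small, self-contained sketch. It is not the seqan3 implementation: `toy_handler`, its single-threaded `std::queue`, and `demo_sum` are hypothetical stand-ins that only model "take the range by value, then move it into the deferred task". Note that in C++ a by-value capture is const inside a non-mutable lambda, so `std::move` on it falls back to a copy; the sketch marks the lambda `mutable` for that reason.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <numeric>
#include <queue>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical stand-in for the handler's task type; tasks are run later, not at submission time.
using task_type = std::function<void()>;

struct toy_handler
{
    std::queue<task_type> queue{};

    // The range is taken by value, so the queued task owns its data and the caller cannot change it afterwards.
    template <typename algorithm_t, typename range_t, typename delegate_t>
    void execute(algorithm_t algorithm, range_t range, delegate_t delegate)
    {
        static_assert(std::is_move_constructible_v<range_t>, "The range is moved into the task.");

        // `mutable` makes the captured range non-const, so the inner std::move really moves.
        queue.push([algorithm, delegate, range = std::move(range)] () mutable
        {
            delegate(algorithm(std::move(range)));
        });
    }

    // Runs all queued tasks (assumption: a real handler would use worker threads instead).
    void wait()
    {
        for (; !queue.empty(); queue.pop())
            queue.front()();
    }
};

// Usage: the caller modifies its own container after submission without affecting the task.
inline std::size_t demo_sum()
{
    toy_handler handler{};
    std::size_t result = 0;
    std::vector<std::size_t> lengths{3, 4, 5};

    handler.execute([] (std::vector<std::size_t> r) { return std::accumulate(r.begin(), r.end(), std::size_t{0}); },
                    lengths, // copied here; the task owns the copy
                    [&result] (std::size_t sum) { result = sum; });

    lengths.clear(); // has no effect on the queued task
    handler.wait();
    return result;
}
```

With a lightweight view as `range_t`, the copy at the call site is cheap, which is the situation described above.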

* \param[in] delegate A callable which will be invoked on each result of the computed alignments.
*/
template <typename algorithm_t, indexed_sequence_pair_range indexed_sequence_pairs_t, typename delegate_type>
void execute(algorithm_t && algorithm,
Member

Isn't this always a specialisation of seqan3::alignment_algorithm? Or is this a policy?
Sorry for always picking on your types, but I always stumble over this when trying to understand what's happening.

I also do not understand what a delegate is (for).. 🙈

Contributor Author

This is the algorithm that is supposed to be called by the execution handler. It can be basically anything that is invocable, which is why it was called fn_t (for function) before. Since it is only used in the context of executing alignment algorithms, the name is now more specific.
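
One possible way to state "anything that is invocable" in code is sketched below with C++17 type traits. This is only an illustration under assumptions: `execute_checked` and `demo_length` are hypothetical names, and the PR itself does not add such a check.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical helper: rejects, at compile time, an algorithm or delegate that cannot be called as needed.
template <typename algorithm_t, typename input_t, typename delegate_t>
void execute_checked(algorithm_t && algorithm, input_t input, delegate_t && delegate)
{
    // Both callables are invoked as lvalues below, hence the `&` in the checks.
    static_assert(std::is_invocable_v<algorithm_t &, input_t>,
                  "The algorithm must be callable with the input.");
    using result_t = std::invoke_result_t<algorithm_t &, input_t>;
    static_assert(std::is_invocable_v<delegate_t &, result_t>,
                  "The delegate must be callable with the result of the algorithm.");

    delegate(algorithm(std::move(input)));
}

// Usage: the "algorithm" computes a length and the delegate stores it.
inline std::size_t demo_length()
{
    std::size_t seen = 0;
    execute_checked([] (std::string s) { return s.size(); },
                    std::string{"ACGT"},
                    [&seen] (std::size_t n) { seen = n; });
    return seen;
}
```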

Member

Then maybe add a comment that it must model callable?

Member

And what is the delegate thing?

Contributor Author

> Then maybe add a comment that it must model callable?

I will add another invocable concept for this after all the open PRs with these changes are merged. Then I can make sure that this works. Is that OK?

Ah, and a delegate is another callable that is called on the result of the alignment algorithm. It is a standard term for something that is called on the result of an operation without jumping out of the process (as opposed to coroutines). https://stackoverflow.com/questions/9568150/what-is-a-c-delegate

Member

> I will add another invocable concept for this after all the open PRs with these changes are merged. Then I can make sure that this works. Is that OK?

Sure.

> Ah, and a delegate is another callable that is called on the result of the alignment algorithm. It is a standard term for something that is called on the result of an operation without jumping out of the process (as opposed to coroutines). https://stackoverflow.com/questions/9568150/what-is-a-c-delegate

OK, and what is the purpose of this here? What is the use case?

Contributor Author

We have two interfaces for dealing with alignment results. The first one returns an alignment result range and the second one calls a user-defined delegate on each result. Accordingly, there are two executors: the two-way executor, which we already know, and the one-way executor, which will not implement the buffering. Both of them reuse the execution handlers, which just get a delegate to call on the alignment result. The two-way executor just passes a lambda that moves the result into the correct buffer location.
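
A rough sketch of that split, assuming a trivial sequential handler: all names here (`toy_sequential_handler`, `run_one_way`, `run_two_way`, and the demo functions) are hypothetical and only mirror the description above, not the actual seqan3 executors.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical handler: runs the algorithm and hands the result to the delegate.
struct toy_sequential_handler
{
    template <typename algorithm_t, typename input_t, typename delegate_t>
    void execute(algorithm_t && algorithm, input_t && input, delegate_t && delegate)
    {
        delegate(algorithm(std::forward<input_t>(input)));
    }
};

// One-way: every result goes straight to the user's delegate; nothing is buffered.
template <typename algorithm_t, typename delegate_t>
void run_one_way(std::vector<std::string> const & inputs, algorithm_t algorithm, delegate_t user_delegate)
{
    toy_sequential_handler handler{};
    for (std::string const & input : inputs)
        handler.execute(algorithm, input, user_delegate);
}

// Two-way: same handler, but the delegate is a lambda that moves each result into its buffer slot.
template <typename algorithm_t>
auto run_two_way(std::vector<std::string> const & inputs, algorithm_t algorithm)
{
    using result_t = decltype(algorithm(inputs.front()));
    std::vector<result_t> buffer(inputs.size());
    toy_sequential_handler handler{};

    for (std::size_t pos = 0; pos < inputs.size(); ++pos)
        handler.execute(algorithm, inputs[pos], [&buffer, pos] (auto && result) { buffer[pos] = std::move(result); });

    return buffer; // the caller consumes this as the "result range"
}

inline std::size_t demo_one_way()
{
    std::vector<std::string> const inputs{"ACGT", "AC"};
    std::size_t total = 0;
    run_one_way(inputs, [] (std::string const & s) { return s.size(); }, [&total] (std::size_t n) { total += n; });
    return total;
}

inline std::vector<std::size_t> demo_two_way()
{
    std::vector<std::string> const inputs{"ACGT", "AC"};
    return run_two_way(inputs, [] (std::string const & s) { return s.size(); });
}
```

In this reading, the handler never needs to know whether results are buffered; only the delegate differs between the two executors.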

indexed_sequence_pairs_t indexed_sequence_pairs,
delegate_type && delegate)
{
assert(state != nullptr);

// Asynchronously pushes the alignment job as a task to the queue.
task_type task = [=, indexed_sequence_pairs = std::move(indexed_sequence_pairs)] ()
{
delegate(algorithm(std::move(indexed_sequence_pairs)));
};

[[maybe_unused]] contrib::queue_op_status status = state->queue.wait_push(std::move(task));
assert(status == contrib::queue_op_status::success);
}

//!\brief Waits until all submitted alignment jobs have been processed.
void wait()
{
@@ -14,6 +14,7 @@

#include <functional>

#include <seqan3/alignment/pairwise/detail/concept.hpp>
#include <seqan3/core/platform.hpp>
#include <seqan3/range/views/view_all.hpp>
#include <seqan3/std/concepts>
@@ -66,21 +67,19 @@ struct execution_handler_sequential
/*!\brief Takes underlying range of sequence pairs and invokes an alignment on each instance.
* \tparam algorithm_t The type of the alignment algorithm.
* \tparam indexed_sequence_pairs_t The type of underlying sequence pairs annotated with an index;
- * must model std::ranges::forward_range.
+ * must model seqan3::detail::indexed_sequence_pair_range.
* \tparam delegate_type The type of the callable invoked on the std::invoke_result of `algorithm_t`.
*
* \param[in] algorithm The alignment algorithm to invoke.
* \param[in] indexed_sequence_pairs The range of underlying annotated sequence pairs to be aligned.
* \param[in] delegate A callable which will be invoked on each result of the computed alignments.
*/
- template <typename algorithm_t, std::ranges::forward_range indexed_sequence_pairs_t, typename delegate_type>
+ template <typename algorithm_t, indexed_sequence_pair_range indexed_sequence_pairs_t, typename delegate_type>
void execute(algorithm_t && algorithm,
- indexed_sequence_pairs_t indexed_sequence_pairs,
+ indexed_sequence_pairs_t && indexed_sequence_pairs,
delegate_type && delegate)
{
- using std::get;
- for (auto && [sequence_pair, idx] : indexed_sequence_pairs)
- execute(std::forward<algorithm_t>(algorithm), idx, get<0>(sequence_pair), get<1>(sequence_pair), delegate);
+ delegate(algorithm(std::forward<indexed_sequence_pairs_t>(indexed_sequence_pairs)));
}

//!\brief Waits for the submitted alignments jobs to finish. (Noop).
146 changes: 96 additions & 50 deletions test/unit/alignment/pairwise/execution/execution_handler_template.hpp
@@ -12,93 +12,139 @@

#include <seqan3/alphabet/nucleotide/dna4.hpp>
#include <seqan3/range/views/view_all.hpp>
#include <seqan3/range/views/zip.hpp>
#include <seqan3/test/performance/sequence_generator.hpp>
#include <seqan3/std/iterator>
#include <seqan3/std/algorithm>
#include <seqan3/std/ranges>

using namespace seqan3;

template <typename T>
class execution_handler : public ::testing::Test
{};

TYPED_TEST_CASE_P(execution_handler);

TYPED_TEST_P(execution_handler, execute_w_lvalue)
struct execution_handler : public ::testing::Test
{
constexpr size_t SIZE = 10000;
std::vector<std::pair<size_t, size_t>> buffer;
buffer.resize(SIZE);
static constexpr size_t total_size = 10000;

TypeParam exec_handler{};
void SetUp()
{
for (unsigned i = 0; i < total_size; ++i)
{
sequence_collection1.push_back(test::generate_sequence<dna4>(100, 20, i));
sequence_collection2.push_back(test::generate_sequence<dna4>(100, 20, i + total_size));
}
}

auto callable = [](size_t const idx, auto && rng1, auto && rng2)
template <typename buffer_t>
void check_result(buffer_t const & buffer) const
{
return std::pair{idx, rng1.size() + rng2.size()};
};
for (unsigned i = 0; i < total_size; ++i)
{
EXPECT_EQ(buffer[i].first, i) << "Position: " << i;
EXPECT_EQ(buffer[i].second,
sequence_collection1[i].size() + sequence_collection2[i].size()) << "Position: " << i;
}
}

std::vector<dna4_vector> set1;
std::vector<dna4_vector> set2;
std::vector<dna4_vector> sequence_collection1{};
std::vector<dna4_vector> sequence_collection2{};
};

auto simulate_alignment = [](size_t const idx, auto && rng1, auto && rng2)
{
return std::pair{idx, rng1.size() + rng2.size()};
};

for (unsigned i = 0; i < SIZE; ++i)
auto simulate_alignment_with_range = [] (auto indexed_sequence_pairs)
{
std::vector<std::pair<size_t, size_t>> results{};
for (auto && [sequence_pair, idx] : indexed_sequence_pairs)
{
set1.push_back(test::generate_sequence<dna4>(100, 20, i));
set2.push_back(test::generate_sequence<dna4>(100, 20, i + SIZE));
results.emplace_back(idx, std::get<0>(sequence_pair).size() + std::get<1>(sequence_pair).size());
}

size_t pos = 0;
return results;
};

TYPED_TEST_CASE_P(execution_handler);

TYPED_TEST_P(execution_handler, execute_with_lvalue)
{
std::vector<std::pair<size_t, size_t>> buffer;
buffer.resize(this->total_size);

for (unsigned i = 0; i < SIZE; ++i, ++pos)
TypeParam exec_handler{};

size_t pos = 0;
for (unsigned i = 0; i < this->total_size; ++i, ++pos)
{
auto v1 = set1[i] | views::all;
auto v2 = set2[i] | views::all;
exec_handler.execute(callable, i, v1, v2, [pos, &buffer] (auto && res) { buffer[pos] = std::move(res); });
auto seq_collection1_as_view = this->sequence_collection1[i] | views::all;
auto seq_collection2_as_view = this->sequence_collection2[i] | views::all;
exec_handler.execute(simulate_alignment,
i,
seq_collection1_as_view,
seq_collection2_as_view,
[pos, &buffer] (auto && res) { buffer[pos] = std::move(res); });
}

exec_handler.wait();

for (unsigned i = 0; i < SIZE; ++i)
{
EXPECT_EQ(buffer[i].first, i);
EXPECT_EQ(buffer[i].second, set1[i].size() + set2[i].size());
}
this->check_result(buffer);
}

TYPED_TEST_P(execution_handler, execute_w_rvalue)
TYPED_TEST_P(execution_handler, execute_with_rvalue)
{
constexpr size_t SIZE = 10000;
std::vector<std::pair<size_t, size_t>> buffer;
buffer.resize(SIZE);
buffer.resize(this->total_size);

TypeParam exec_handler{};

auto callable = [](size_t const idx, auto && rng1, auto && rng2)
{
return std::pair{idx, rng1.size() + rng2.size()};
};

std::vector<dna4_vector> set1;
std::vector<dna4_vector> set2;
size_t pos = 0;

for (unsigned i = 0; i < SIZE; ++i)
for (unsigned i = 0; i < this->total_size; ++i, ++pos)
{
set1.push_back(test::generate_sequence<dna4>(100, 20, i));
set2.push_back(test::generate_sequence<dna4>(100, 20, i + SIZE));
exec_handler.execute(simulate_alignment,
i,
this->sequence_collection1[i] | views::all,
this->sequence_collection2[i] | views::all,
[&buffer, pos] (auto && res) { buffer[pos] = std::move(res); });
}

exec_handler.wait();

this->check_result(buffer);
}

TYPED_TEST_P(execution_handler, execute_as_indexed_sequence_pairs)
{
std::vector<std::pair<size_t, size_t>> buffer;
buffer.resize(this->total_size);

TypeParam exec_handler{};

size_t pos = 0;
size_t chunk_size = 4; // total_size is a multiple of chunk size.

for (unsigned i = 0; i < SIZE; ++i, ++pos)
auto indexed_sequence_pairs = views::zip(views::zip(this->sequence_collection1, this->sequence_collection2),
std::views::iota(0));
using range_iterator_t = std::ranges::iterator_t<decltype(indexed_sequence_pairs)>;

for (range_iterator_t it = indexed_sequence_pairs.begin();
it != indexed_sequence_pairs.end();
it += chunk_size, pos += chunk_size)
{
exec_handler.execute(callable, i, set1[i] | views::all, set2[i] | views::all,
[&buffer, pos] (auto && res) { buffer[pos] = std::move(res); });
std::ranges::subrange<range_iterator_t, range_iterator_t> chunk{it, std::next(it, chunk_size)};
exec_handler.execute(simulate_alignment_with_range, chunk, [=, &buffer] (auto res_range)
{
std::ranges::move(res_range, buffer.begin() + pos);
});
}

exec_handler.wait();

for (unsigned i = 0; i < SIZE; ++i)
{
EXPECT_EQ(buffer[i].first, i);
EXPECT_EQ(buffer[i].second, set1[i].size() + set2[i].size());
}
this->check_result(buffer);
}

REGISTER_TYPED_TEST_CASE_P(execution_handler, execute_w_lvalue, execute_w_rvalue);
REGISTER_TYPED_TEST_CASE_P(execution_handler,
execute_with_lvalue,
execute_with_rvalue,
execute_as_indexed_sequence_pairs);
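
The chunked submission exercised by `execute_as_indexed_sequence_pairs` can be sketched without seqan3 or GoogleTest. The following is a simplified model under assumptions: `simulate_chunk`, `run_chunked` and `demo_chunked` are hypothetical, chunks are copied into vectors instead of being passed as subranges of a zipped view, and everything runs sequentially. Unlike the test, which relies on `total_size` being a multiple of the chunk size, this sketch clamps the last chunk.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

using sequence_pair = std::pair<std::string, std::string>;
using indexed_pair = std::pair<sequence_pair, std::size_t>; // (sequence pair, index), as in the test
using result_type = std::pair<std::size_t, std::size_t>;    // (index, combined length)

// Stand-in for the alignment: consumes one chunk and returns one result per sequence pair.
inline std::vector<result_type> simulate_chunk(std::vector<indexed_pair> chunk)
{
    std::vector<result_type> results{};
    for (auto const & [sequences, idx] : chunk)
        results.emplace_back(idx, sequences.first.size() + sequences.second.size());
    return results;
}

inline std::vector<result_type> run_chunked(std::vector<indexed_pair> const & indexed_pairs,
                                            std::size_t const chunk_size)
{
    assert(chunk_size > 0);
    std::vector<result_type> buffer(indexed_pairs.size());

    for (std::size_t pos = 0; pos < indexed_pairs.size(); pos += chunk_size)
    {
        // Clamp the chunk end so that a trailing partial chunk stays in bounds.
        std::size_t const end = std::min(pos + chunk_size, indexed_pairs.size());
        std::vector<indexed_pair> chunk(indexed_pairs.begin() + pos, indexed_pairs.begin() + end);

        // The delegate moves the results of this chunk into the buffer, starting at the chunk's offset.
        auto delegate = [&buffer, pos] (std::vector<result_type> results)
        {
            std::move(results.begin(), results.end(), buffer.begin() + pos);
        };

        delegate(simulate_chunk(std::move(chunk)));
    }

    return buffer;
}

// Usage: ten pairs in chunks of four, so the last chunk holds only two pairs.
inline std::vector<result_type> demo_chunked()
{
    std::vector<indexed_pair> indexed_pairs{};
    for (std::size_t i = 0; i < 10; ++i)
        indexed_pairs.emplace_back(sequence_pair{std::string(i, 'A'), std::string(2, 'C')}, i);

    return run_chunked(indexed_pairs, 4);
}
```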