Skip to content

Commit

Permalink
Started to work on #1668
Browse files Browse the repository at this point in the history
- added Proj parameter to for_each and for_each_n
- let for_each return iterator
- fly-by fix for distributed algorithms (don't forward arguments more than once)
  • Loading branch information
hkaiser committed Jul 18, 2015
1 parent b9412bb commit bccec8b
Show file tree
Hide file tree
Showing 13 changed files with 934 additions and 182 deletions.
1 change: 1 addition & 0 deletions hpx/parallel/algorithms/detail/predicates.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1) { namespace detail
return val;
}
};

///////////////////////////////////////////////////////////////////////////
template <typename ForwardIt>
ForwardIt next(ForwardIt it,
Expand Down
132 changes: 86 additions & 46 deletions hpx/parallel/algorithms/for_each.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2014 Hartmut Kaiser
// Copyright (c) 2007-2015 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Expand All @@ -8,7 +8,7 @@
#if !defined(HPX_PARALLEL_DETAIL_FOR_EACH_MAY_29_2014_0932PM)
#define HPX_PARALLEL_DETAIL_FOR_EACH_MAY_29_2014_0932PM

#include <hpx/hpx_fwd.hpp>
#include <hpx/config.hpp>
#include <hpx/traits/segmented_iterator_traits.hpp>
#include <hpx/util/void_guard.hpp>
#include <hpx/util/move.hpp>
Expand All @@ -20,6 +20,7 @@
#include <hpx/parallel/util/detail/algorithm_result.hpp>
#include <hpx/parallel/util/foreach_partitioner.hpp>
#include <hpx/parallel/util/loop.hpp>
#include <hpx/parallel/util/projection_identity.hpp>

#include <algorithm>
#include <iterator>
Expand All @@ -44,32 +45,34 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
: for_each_n::algorithm("for_each_n")
{}

template <typename ExPolicy, typename F>
template <typename ExPolicy, typename F, typename Proj>
static Iter
sequential(ExPolicy, Iter first, std::size_t count, F && f)
sequential(ExPolicy, Iter first, std::size_t count, F && f,
Proj && proj)
{
return util::loop_n(first, count,
[f](Iter const& curr)
[&f, &proj](Iter const& curr)
{
f(*curr);
f(proj(*curr));
});
}

template <typename ExPolicy, typename F>
template <typename ExPolicy, typename F, typename Proj>
static typename util::detail::algorithm_result<ExPolicy, Iter>::type
parallel(ExPolicy policy, Iter first, std::size_t count,
F && f)
F && f, Proj && proj)
{
if (count != 0)
{
return util::foreach_n_partitioner<ExPolicy>::call(
policy, first, count,
[f](Iter part_begin, std::size_t part_size)
[f, proj](Iter part_begin, std::size_t part_size)
{
// VS2015 bails out when proj is captured by ref
util::loop_n(part_begin, part_size,
[&f](Iter const& curr)
[=, &f](Iter const& curr)
{
f(*curr);
f(proj(*curr));
});
});
}
Expand Down Expand Up @@ -111,6 +114,8 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
/// (deduced). Unlike its sequential form, the parallel
/// overload of \a for_each requires \a F to meet the
/// requirements of \a CopyConstructible.
/// \tparam Proj The type of an optional projection function. This
/// defaults to \a util::projection_identity
///
/// \param policy The execution policy to use for the scheduling of
/// the iterations.
Expand All @@ -130,6 +135,10 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
/// type \a Type must be such that an object of
/// type \a InIter can be dereferenced and then
/// implicitly converted to Type.
/// \param proj Specifies the function (or function object) which
/// will be invoked for each of the elements as a
/// projection operation before the actual predicate
/// \a is invoked.
///
/// The application of function objects in parallel algorithm
/// invoked with an execution policy object of type
Expand All @@ -151,12 +160,14 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
/// It returns \a first + \a count for non-negative values of
/// \a count and \a first for negative values.
///
template <typename ExPolicy, typename InIter, typename Size, typename F>
template <typename ExPolicy, typename InIter, typename Size, typename F,
typename Proj = util::projection_identity>
inline typename boost::enable_if<
is_execution_policy<ExPolicy>,
typename util::detail::algorithm_result<ExPolicy, InIter>::type
>::type
for_each_n(ExPolicy && policy, InIter first, Size count, F && f)
for_each_n(ExPolicy && policy, InIter first, Size count, F && f,
Proj && proj = Proj())
{
typedef typename std::iterator_traits<InIter>::iterator_category
iterator_category;
Expand All @@ -179,49 +190,64 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)

return detail::for_each_n<InIter>().call(
std::forward<ExPolicy>(policy), is_seq(),
first, std::size_t(count), std::forward<F>(f));
first, std::size_t(count), std::forward<F>(f),
std::forward<Proj>(proj));
}

///////////////////////////////////////////////////////////////////////////
// for_each
namespace detail
{
/// \cond NOINTERNAL
struct for_each : public detail::algorithm<for_each>
template <typename Iter>
struct for_each : public detail::algorithm<for_each<Iter>, Iter>
{
for_each()
: for_each::algorithm("for_each")
{}

template <typename ExPolicy, typename InIter, typename F,
typename Proj>
static Iter
sequential(ExPolicy, InIter first, InIter last, F && f,
Proj && proj)
{
return util::loop(first, last,
[&f, &proj](Iter const& curr)
{
f(proj(*curr));
});
}

template <typename ExPolicy, typename InIter, typename F>
static hpx::util::unused_type
sequential(ExPolicy, InIter first, InIter last, F && f)
static InIter
sequential(ExPolicy, InIter first, InIter last, F && f,
util::projection_identity)
{
std::for_each(first, last, std::forward<F>(f));
return hpx::util::unused;
return last;
}

template <typename ExPolicy, typename FwdIter, typename F>
static typename util::detail::algorithm_result<ExPolicy>::type
parallel(ExPolicy policy, FwdIter first, FwdIter last, F && f)
template <typename ExPolicy, typename InIter, typename F,
typename Proj>
static typename util::detail::algorithm_result<ExPolicy, InIter>::type
parallel(ExPolicy policy, InIter first, InIter last, F && f,
Proj && proj)
{
typedef
typename util::detail::algorithm_result<ExPolicy>::type
result_type;

return hpx::util::void_guard<result_type>(),
detail::for_each_n<FwdIter>().call(
policy, boost::mpl::false_(),
first, std::distance(first, last), std::forward<F>(f));
return detail::for_each_n<Iter>().call(
policy, boost::mpl::false_(),
first, std::distance(first, last), std::forward<F>(f),
std::forward<Proj>(proj));
}
};

///////////////////////////////////////////////////////////////////////
// non-segmented implementation
template <typename ExPolicy, typename InIter, typename F>
inline typename util::detail::algorithm_result<ExPolicy>::type
template <typename ExPolicy, typename InIter, typename F,
typename Proj>
inline typename util::detail::algorithm_result<ExPolicy, InIter>::type
for_each_(ExPolicy && policy, InIter first, InIter last, F && f,
std::false_type)
Proj && proj, std::false_type)
{
typedef typename std::iterator_traits<InIter>::iterator_category
iterator_category;
Expand All @@ -232,18 +258,22 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
>::type is_seq;

if (first == last)
return util::detail::algorithm_result<ExPolicy>::get();
{
typedef util::detail::algorithm_result<ExPolicy, InIter> result;
return result::get(std::move(last));
}

return for_each().call(
return for_each<InIter>().call(
std::forward<ExPolicy>(policy), is_seq(),
first, last, std::forward<F>(f));
first, last, std::forward<F>(f), std::forward<Proj>(proj));
}

// forward declare the segmented version of this algorithm
template <typename ExPolicy, typename SegIter, typename F>
inline typename util::detail::algorithm_result<ExPolicy>::type
template <typename ExPolicy, typename SegIter, typename F,
typename Proj>
inline typename util::detail::algorithm_result<ExPolicy, SegIter>::type
for_each_(ExPolicy && policy, SegIter first, SegIter last, F && f,
std::true_type);
Proj && proj, std::true_type);

/// \endcond
}
Expand Down Expand Up @@ -275,6 +305,8 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
/// (deduced). Unlike its sequential form, the parallel
/// overload of \a for_each requires \a F to meet the
/// requirements of \a CopyConstructible.
/// \tparam Proj The type of an optional projection function. This
/// defaults to \a util::projection_identity
///
/// \param policy The execution policy to use for the scheduling of
/// the iterations.
Expand All @@ -294,6 +326,10 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
/// type \a Type must be such that an object of
/// type \a InIter can be dereferenced and then
/// implicitly converted to Type.
/// \param proj Specifies the function (or function object) which
/// will be invoked for each of the elements as a
/// projection operation before the actual predicate
/// \a is invoked.
///
/// The application of function objects in parallel algorithm
/// invoked with an execution policy object of type
Expand All @@ -306,18 +342,22 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
/// permitted to execute in an unordered fashion in unspecified
/// threads, and indeterminately sequenced within each thread.
///
/// \returns The \a for_each algorithm returns a \a hpx::future<void> if the
/// execution policy is of type
/// \returns The \a for_each_n algorithm returns a
/// \a hpx::future<InIter> if the execution policy is of
/// type
/// \a sequential_task_execution_policy or
/// \a parallel_task_execution_policy and
/// returns \a void otherwise.
/// \a parallel_task_execution_policy and returns \a InIter
/// otherwise.
/// It returns \a last.
///
template <typename ExPolicy, typename InIter, typename F>
template <typename ExPolicy, typename InIter, typename F,
typename Proj = util::projection_identity>
inline typename boost::enable_if<
is_execution_policy<ExPolicy>,
typename util::detail::algorithm_result<ExPolicy, void>::type
typename util::detail::algorithm_result<ExPolicy, InIter>::type
>::type
for_each(ExPolicy && policy, InIter first, InIter last, F && f)
for_each(ExPolicy && policy, InIter first, InIter last, F && f,
Proj && proj = Proj())
{
typedef typename std::iterator_traits<InIter>::iterator_category
iterator_category;
Expand All @@ -331,7 +371,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)

return detail::for_each_(
std::forward<ExPolicy>(policy), first, last,
std::forward<F>(f), is_segmented());
std::forward<F>(f), std::forward<Proj>(proj), is_segmented());
}
}}}

Expand Down
28 changes: 14 additions & 14 deletions hpx/parallel/segmented_algorithms/copy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
{
local_output_iterator_type out = dispatch(
traits::get_id(sit),
std::forward<Algo>(algo), policy, true_(),
algo, policy, true_(),
beg, end, traits::local(dest));

dest = output_traits::compose(sdest, out);
Expand All @@ -87,8 +87,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
if (beg != end)
{
out = dispatch(traits::get_id(sit),
std::forward<Algo>(algo), policy, true_(),
beg, end, out);
algo, policy, true_(), beg, end, out);
}

// handle all of the full partitions
Expand All @@ -101,8 +100,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
if (beg != end)
{
out = dispatch(traits::get_id(sit),
std::forward<Algo>(algo), policy, true_(),
beg, end, out);
algo, policy, true_(), beg, end, out);
}
}

Expand All @@ -112,8 +110,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
if (beg != end)
{
out = dispatch(traits::get_id(sit),
std::forward<Algo>(algo), policy, true_(),
beg, end, traits::begin(sdest));
algo, policy, true_(), beg, end, traits::begin(sdest));
}

dest = output_traits::compose(sdest, out);
Expand Down Expand Up @@ -166,7 +163,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
if (beg != end)
{
segments.push_back(dispatch_async(traits::get_id(sit),
std::forward<Algo>(algo), policy, forced_seq(),
algo, policy, forced_seq(),
beg, end, traits::local(dest)));
}
}
Expand All @@ -179,8 +176,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
if (beg != end)
{
segments.push_back(dispatch_async(traits::get_id(sit),
std::forward<Algo>(algo), policy, forced_seq(),
beg, end, out));
algo, policy, forced_seq(), beg, end, out));
}

// handle all of the full partitions
Expand All @@ -193,8 +189,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
if (beg != end)
{
segments.push_back(dispatch_async(traits::get_id(sit),
std::forward<Algo>(algo), policy, forced_seq(),
beg, end, out));
algo, policy, forced_seq(), beg, end, out));
}
}

Expand All @@ -205,7 +200,7 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
if (beg != end)
{
segments.push_back(dispatch_async(traits::get_id(sit),
std::forward<Algo>(algo), policy, forced_seq(),
algo, policy, forced_seq(),
beg, end, traits::begin(sdest)));
}
}
Expand All @@ -215,9 +210,14 @@ namespace hpx { namespace parallel { HPX_INLINE_NAMESPACE(v1)
lcos::local::dataflow(
[=](std::vector<shared_future<local_output_iterator_type> > && r)
{
// handle any remote exceptions, will throw on error
std::list<boost::exception_ptr> errors;
parallel::util::detail::handle_remote_exceptions<
ExPolicy
>::call(r, errors);
return output_traits::compose(sdest, r.back().get());
},
segments));
std::move(segments)));
}

///////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit bccec8b

Please sign in to comment.