Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use .element() instead of .data() for window range calculations #13095

Merged
Merged
168 changes: 122 additions & 46 deletions cpp/src/rolling/grouped_rolling.cu
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <cudf/detail/iterator.cuh>
#include <cudf/detail/rolling.hpp>
#include <cudf/detail/utilities/assert.cuh>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/rolling/range_window_bounds.hpp>
#include <cudf/types.hpp>
Expand Down Expand Up @@ -269,6 +270,69 @@ __device__ T subtract_safe(T const& value, T const& delta)
: cuda::std::numeric_limits<T>::min();
}

/**
* @brief For a specified idx, find the lowest value of the (sorted) orderby column that
* participates in a range-window query.
*/
template <typename ElementT, typename ElementIter>
__device__ ElementT compute_lowest_in_window(ElementIter orderby_iter,
size_type idx,
ElementT delta)
{
return subtract_safe(orderby_iter[idx], delta);
}

/**
* @brief For a specified idx, find the highest value of the (sorted) orderby column that
* participates in a range-window query.
*/
template <typename ElementT, typename ElementIter>
__device__ ElementT compute_highest_in_window(ElementIter orderby_iter,
size_type idx,
ElementT delta)
{
return add_safe(orderby_iter[idx], delta);
}

/**
* Accessor for values in an order-by column, on the device.
*/
template <typename T>
struct device_value_accessor {
column_device_view const col; ///< column view of column in device

/**
* @brief constructor
*
* @param[in] _col column device view of cudf column
*/
__device__ device_value_accessor(column_device_view const& _col) : col{_col}
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
{
cudf_assert(type_id_matches_device_storage_type<T>(col.type().id()) &&
"the data type mismatch");
}

/**
* @brief Returns the value of element at index `i`
* @param[in] i index of element
* @return value of element at index `i`
*/
__device__ T operator()(cudf::size_type i) const { return col.element<T>(i); }
};

template <typename T>
using const_device_iterator =
thrust::transform_iterator<device_value_accessor<T>, thrust::counting_iterator<size_type>>;

/// This is a stand-in for the `cudf::column_device_view::begin<T>()`, which is `__host__` only.
/// For range window functions, one might need to iterate over the `oby` column, per row.
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
template <typename T, CUDF_ENABLE_IF(cudf::column_device_view::has_element_accessor<T>())>
[[nodiscard]] __device__ const_device_iterator<T> begin(cudf::column_device_view const& col)
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
{
return const_device_iterator<T>{thrust::make_counting_iterator<cudf::size_type>(0),
device_value_accessor<T>{col}};
}

/// Given a single, ungrouped order-by column, return the indices corresponding
/// to the first null element, and (one past) the last null timestamp.
/// The input column is sorted, with all null values clustered either
Expand Down Expand Up @@ -324,11 +388,12 @@ std::unique_ptr<column> range_window_ASC(column_view const& input,
rmm::mr::device_memory_resource* mr)
{
auto [h_nulls_begin_idx, h_nulls_end_idx] = get_null_bounds_for_orderby_column(orderby_column);
auto const p_orderby_device_view = cudf::column_device_view::create(orderby_column, stream);

auto preceding_calculator =
[nulls_begin_idx = h_nulls_begin_idx,
nulls_end_idx = h_nulls_end_idx,
d_orderby = orderby_column.data<T>(),
[nulls_begin_idx = h_nulls_begin_idx,
nulls_end_idx = h_nulls_end_idx,
orderby_device_view = *p_orderby_device_view,
preceding_window,
preceding_window_is_unbounded] __device__(size_type idx) -> size_type {
if (preceding_window_is_unbounded) {
Expand All @@ -342,13 +407,14 @@ std::unique_ptr<column> range_window_ASC(column_view const& input,
return idx - nulls_begin_idx + 1;
}

auto const d_orderby = begin<T>(orderby_device_view);
// orderby[idx] not null. Binary search the group, excluding null group.
// If nulls_begin_idx == 0, either
// 1. NULLS FIRST ordering: Binary search starts where nulls_end_idx.
// 2. NO NULLS: Binary search starts at 0 (also nulls_end_idx).
// Otherwise, NULLS LAST ordering. Start at 0.
auto group_start = nulls_begin_idx == 0 ? nulls_end_idx : 0;
auto lowest_in_window = subtract_safe(d_orderby[idx], preceding_window);
auto lowest_in_window = compute_lowest_in_window(d_orderby, idx, preceding_window);

return ((d_orderby + idx) - thrust::lower_bound(thrust::seq,
d_orderby + group_start,
Expand All @@ -360,10 +426,10 @@ std::unique_ptr<column> range_window_ASC(column_view const& input,
auto preceding_column = expand_to_column(preceding_calculator, input.size(), stream);

auto following_calculator =
[nulls_begin_idx = h_nulls_begin_idx,
nulls_end_idx = h_nulls_end_idx,
num_rows = input.size(),
d_orderby = orderby_column.data<T>(),
[nulls_begin_idx = h_nulls_begin_idx,
nulls_end_idx = h_nulls_end_idx,
num_rows = input.size(),
orderby_device_view = *p_orderby_device_view,
following_window,
following_window_is_unbounded] __device__(size_type idx) -> size_type {
if (following_window_is_unbounded) { return num_rows - idx - 1; }
Expand All @@ -373,14 +439,15 @@ std::unique_ptr<column> range_window_ASC(column_view const& input,
return nulls_end_idx - idx - 1;
}

auto const d_orderby = begin<T>(orderby_device_view);
// orderby[idx] not null. Binary search the group, excluding null group.
// If nulls_begin_idx == 0, either
// 1. NULLS FIRST ordering: Binary search ends at num_rows.
// 2. NO NULLS: Binary search also ends at num_rows.
// Otherwise, NULLS LAST ordering. End at nulls_begin_idx.

auto group_end = nulls_begin_idx == 0 ? num_rows : nulls_begin_idx;
auto highest_in_window = add_safe(d_orderby[idx], following_window);
auto highest_in_window = compute_highest_in_window(d_orderby, idx, following_window);

return (thrust::upper_bound(
thrust::seq, d_orderby + idx, d_orderby + group_end, highest_in_window) -
Expand Down Expand Up @@ -491,13 +558,14 @@ std::unique_ptr<column> range_window_ASC(column_view const& input,
{
auto [null_start, null_end] =
get_null_bounds_for_orderby_column(orderby_column, group_offsets, stream);
auto const p_orderby_device_view = cudf::column_device_view::create(orderby_column, stream);

auto preceding_calculator =
[d_group_offsets = group_offsets.data(),
d_group_labels = group_labels.data(),
d_orderby = orderby_column.data<T>(),
d_nulls_begin = null_start.data(),
d_nulls_end = null_end.data(),
[d_group_offsets = group_offsets.data(),
d_group_labels = group_labels.data(),
orderby_device_view = *p_orderby_device_view,
d_nulls_begin = null_start.data(),
d_nulls_end = null_end.data(),
preceding_window,
preceding_window_is_unbounded] __device__(size_type idx) -> size_type {
auto group_label = d_group_labels[idx];
Expand All @@ -514,14 +582,16 @@ std::unique_ptr<column> range_window_ASC(column_view const& input,
return idx - nulls_begin + 1;
}

auto const d_orderby = begin<T>(orderby_device_view);

// orderby[idx] not null. Search must exclude the null group.
// If nulls_begin == group_start, either of the following is true:
// 1. NULLS FIRST ordering: Search must begin at nulls_end.
// 2. NO NULLS: Search must begin at group_start (which also equals nulls_end.)
// Otherwise, NULLS LAST ordering. Search must start at nulls group_start.
auto search_start = nulls_begin == group_start ? nulls_end : group_start;

auto lowest_in_window = subtract_safe(d_orderby[idx], preceding_window);
auto lowest_in_window = compute_lowest_in_window(d_orderby, idx, preceding_window);

return ((d_orderby + idx) - thrust::lower_bound(thrust::seq,
d_orderby + search_start,
Expand All @@ -533,11 +603,11 @@ std::unique_ptr<column> range_window_ASC(column_view const& input,
auto preceding_column = expand_to_column(preceding_calculator, input.size(), stream);

auto following_calculator =
[d_group_offsets = group_offsets.data(),
d_group_labels = group_labels.data(),
d_orderby = orderby_column.data<T>(),
d_nulls_begin = null_start.data(),
d_nulls_end = null_end.data(),
[d_group_offsets = group_offsets.data(),
d_group_labels = group_labels.data(),
orderby_device_view = *p_orderby_device_view,
d_nulls_begin = null_start.data(),
d_nulls_end = null_end.data(),
following_window,
following_window_is_unbounded] __device__(size_type idx) -> size_type {
auto group_label = d_group_labels[idx];
Expand All @@ -556,14 +626,16 @@ std::unique_ptr<column> range_window_ASC(column_view const& input,
return nulls_end - idx - 1;
}

auto const d_orderby = begin<T>(orderby_device_view);

// orderby[idx] not null. Search must exclude the null group.
// If nulls_begin == group_start, either of the following is true:
// 1. NULLS FIRST ordering: Search ends at group_end.
// 2. NO NULLS: Search ends at group_end.
// Otherwise, NULLS LAST ordering. Search ends at nulls_begin.
auto search_end = nulls_begin == group_start ? group_end : nulls_begin;

auto highest_in_window = add_safe(d_orderby[idx], following_window);
auto highest_in_window = compute_highest_in_window(d_orderby, idx, following_window);

return (thrust::upper_bound(
thrust::seq, d_orderby + idx, d_orderby + search_end, highest_in_window) -
Expand Down Expand Up @@ -594,11 +666,12 @@ std::unique_ptr<column> range_window_DESC(column_view const& input,
rmm::mr::device_memory_resource* mr)
{
auto [h_nulls_begin_idx, h_nulls_end_idx] = get_null_bounds_for_orderby_column(orderby_column);
auto const p_orderby_device_view = cudf::column_device_view::create(orderby_column, stream);

auto preceding_calculator =
[nulls_begin_idx = h_nulls_begin_idx,
nulls_end_idx = h_nulls_end_idx,
d_orderby = orderby_column.data<T>(),
[nulls_begin_idx = h_nulls_begin_idx,
nulls_end_idx = h_nulls_end_idx,
orderby_device_view = *p_orderby_device_view,
preceding_window,
preceding_window_is_unbounded] __device__(size_type idx) -> size_type {
if (preceding_window_is_unbounded) {
Expand All @@ -612,13 +685,14 @@ std::unique_ptr<column> range_window_DESC(column_view const& input,
return idx - nulls_begin_idx + 1;
}

auto const d_orderby = begin<T>(orderby_device_view);
// orderby[idx] not null. Binary search the group, excluding null group.
// If nulls_begin_idx == 0, either
// 1. NULLS FIRST ordering: Binary search starts where nulls_end_idx.
// 2. NO NULLS: Binary search starts at 0 (also nulls_end_idx).
// Otherwise, NULLS LAST ordering. Start at 0.
auto group_start = nulls_begin_idx == 0 ? nulls_end_idx : 0;
auto highest_in_window = add_safe(d_orderby[idx], preceding_window);
auto highest_in_window = compute_highest_in_window(d_orderby, idx, preceding_window);

return ((d_orderby + idx) -
thrust::lower_bound(thrust::seq,
Expand All @@ -632,10 +706,10 @@ std::unique_ptr<column> range_window_DESC(column_view const& input,
auto preceding_column = expand_to_column(preceding_calculator, input.size(), stream);

auto following_calculator =
[nulls_begin_idx = h_nulls_begin_idx,
nulls_end_idx = h_nulls_end_idx,
num_rows = input.size(),
d_orderby = orderby_column.data<T>(),
[nulls_begin_idx = h_nulls_begin_idx,
nulls_end_idx = h_nulls_end_idx,
num_rows = input.size(),
orderby_device_view = *p_orderby_device_view,
following_window,
following_window_is_unbounded] __device__(size_type idx) -> size_type {
if (following_window_is_unbounded) { return (num_rows - idx) - 1; }
Expand All @@ -645,14 +719,15 @@ std::unique_ptr<column> range_window_DESC(column_view const& input,
return nulls_end_idx - idx - 1;
}

auto const d_orderby = begin<T>(orderby_device_view);
// orderby[idx] not null. Search must exclude null group.
// If nulls_begin_idx = 0, either
// 1. NULLS FIRST ordering: Search ends at num_rows.
// 2. NO NULLS: Search also ends at num_rows.
// Otherwise, NULLS LAST ordering: End at nulls_begin_idx.

auto group_end = nulls_begin_idx == 0 ? num_rows : nulls_begin_idx;
auto lowest_in_window = subtract_safe(d_orderby[idx], following_window);
auto lowest_in_window = compute_lowest_in_window(d_orderby, idx, following_window);

return (thrust::upper_bound(thrust::seq,
d_orderby + idx,
Expand Down Expand Up @@ -686,13 +761,14 @@ std::unique_ptr<column> range_window_DESC(column_view const& input,
{
auto [null_start, null_end] =
get_null_bounds_for_orderby_column(orderby_column, group_offsets, stream);
auto const p_orderby_device_view = cudf::column_device_view::create(orderby_column, stream);

auto preceding_calculator =
[d_group_offsets = group_offsets.data(),
d_group_labels = group_labels.data(),
d_orderby = orderby_column.data<T>(),
d_nulls_begin = null_start.data(),
d_nulls_end = null_end.data(),
[d_group_offsets = group_offsets.data(),
d_group_labels = group_labels.data(),
orderby_device_view = *p_orderby_device_view,
d_nulls_begin = null_start.data(),
d_nulls_end = null_end.data(),
preceding_window,
preceding_window_is_unbounded] __device__(size_type idx) -> size_type {
auto group_label = d_group_labels[idx];
Expand All @@ -709,14 +785,14 @@ std::unique_ptr<column> range_window_DESC(column_view const& input,
return idx - nulls_begin + 1;
}

auto const d_orderby = begin<T>(orderby_device_view);
// orderby[idx] not null. Search must exclude the null group.
// If nulls_begin == group_start, either of the following is true:
// 1. NULLS FIRST ordering: Search must begin at nulls_end.
// 2. NO NULLS: Search must begin at group_start (which also equals nulls_end.)
// Otherwise, NULLS LAST ordering. Search must start at nulls group_start.
auto search_start = nulls_begin == group_start ? nulls_end : group_start;

auto highest_in_window = add_safe(d_orderby[idx], preceding_window);
auto search_start = nulls_begin == group_start ? nulls_end : group_start;
auto highest_in_window = compute_highest_in_window(d_orderby, idx, preceding_window);

return ((d_orderby + idx) -
thrust::lower_bound(thrust::seq,
Expand All @@ -730,11 +806,11 @@ std::unique_ptr<column> range_window_DESC(column_view const& input,
auto preceding_column = expand_to_column(preceding_calculator, input.size(), stream);

auto following_calculator =
[d_group_offsets = group_offsets.data(),
d_group_labels = group_labels.data(),
d_orderby = orderby_column.data<T>(),
d_nulls_begin = null_start.data(),
d_nulls_end = null_end.data(),
[d_group_offsets = group_offsets.data(),
d_group_labels = group_labels.data(),
orderby_device_view = *p_orderby_device_view,
d_nulls_begin = null_start.data(),
d_nulls_end = null_end.data(),
following_window,
following_window_is_unbounded] __device__(size_type idx) -> size_type {
auto group_label = d_group_labels[idx];
Expand All @@ -752,14 +828,14 @@ std::unique_ptr<column> range_window_DESC(column_view const& input,
return nulls_end - idx - 1;
}

auto const d_orderby = begin<T>(orderby_device_view);
// orderby[idx] not null. Search must exclude the null group.
// If nulls_begin == group_start, either of the following is true:
// 1. NULLS FIRST ordering: Search ends at group_end.
// 2. NO NULLS: Search ends at group_end.
// Otherwise, NULLS LAST ordering. Search ends at nulls_begin.
auto search_end = nulls_begin == group_start ? group_end : nulls_begin;

auto lowest_in_window = subtract_safe(d_orderby[idx], following_window);
auto search_end = nulls_begin == group_start ? group_end : nulls_begin;
auto lowest_in_window = compute_lowest_in_window(d_orderby, idx, following_window);
mythrocks marked this conversation as resolved.
Show resolved Hide resolved

return (thrust::upper_bound(thrust::seq,
d_orderby + idx,
Expand Down