diff --git a/cpp/src/rolling/grouped_rolling.cu b/cpp/src/rolling/grouped_rolling.cu index b208e7cd980..79dc7f0288a 100644 --- a/cpp/src/rolling/grouped_rolling.cu +++ b/cpp/src/rolling/grouped_rolling.cu @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -269,6 +270,69 @@ __device__ T subtract_safe(T const& value, T const& delta) : cuda::std::numeric_limits::min(); } +/** + * @brief For a specified idx, find the lowest value of the (sorted) orderby column that + * participates in a range-window query. + */ +template +__device__ ElementT compute_lowest_in_window(ElementIter orderby_iter, + size_type idx, + ElementT delta) +{ + return subtract_safe(orderby_iter[idx], delta); +} + +/** + * @brief For a specified idx, find the highest value of the (sorted) orderby column that + * participates in a range-window query. + */ +template +__device__ ElementT compute_highest_in_window(ElementIter orderby_iter, + size_type idx, + ElementT delta) +{ + return add_safe(orderby_iter[idx], delta); +} + +/** + * Accessor for values in an order-by column, on the device. + */ +template +struct device_value_accessor { + column_device_view const col; ///< column view of column in device + + /** + * @brief constructor + * + * @param[in] col_ column device view of cudf column + */ + __device__ device_value_accessor(column_device_view const& col_) : col{col_} + { + cudf_assert(type_id_matches_device_storage_type(col.type().id()) && + "the data type mismatch"); + } + + /** + * @brief Returns the value of element at index `i` + * @param[in] i index of element + * @return value of element at index `i` + */ + __device__ T operator()(cudf::size_type i) const { return col.element(i); } +}; + +template +using const_device_iterator = + thrust::transform_iterator, thrust::counting_iterator>; + +/// This is a stand-in for the `cudf::column_device_view::begin()`, which is `__host__` only. +/// For range window functions, one might need to iterate over the order-by column, per row. +template ())> +[[nodiscard]] __device__ const_device_iterator begin(cudf::column_device_view const& col) +{ + return const_device_iterator{thrust::make_counting_iterator(0), + device_value_accessor{col}}; +} + /// Given a single, ungrouped order-by column, return the indices corresponding /// to the first null element, and (one past) the last null timestamp. /// The input column is sorted, with all null values clustered either @@ -324,11 +388,12 @@ std::unique_ptr range_window_ASC(column_view const& input, rmm::mr::device_memory_resource* mr) { auto [h_nulls_begin_idx, h_nulls_end_idx] = get_null_bounds_for_orderby_column(orderby_column); + auto const p_orderby_device_view = cudf::column_device_view::create(orderby_column, stream); - auto preceding_calculator = - [nulls_begin_idx = h_nulls_begin_idx, - nulls_end_idx = h_nulls_end_idx, - d_orderby = orderby_column.data(), + auto const preceding_calculator = + [nulls_begin_idx = h_nulls_begin_idx, + nulls_end_idx = h_nulls_end_idx, + orderby_device_view = *p_orderby_device_view, preceding_window, preceding_window_is_unbounded] __device__(size_type idx) -> size_type { if (preceding_window_is_unbounded) { @@ -342,13 +407,14 @@ std::unique_ptr range_window_ASC(column_view const& input, return idx - nulls_begin_idx + 1; } + auto const d_orderby = begin(orderby_device_view); // orderby[idx] not null. Binary search the group, excluding null group. // If nulls_begin_idx == 0, either // 1. NULLS FIRST ordering: Binary search starts where nulls_end_idx. // 2. NO NULLS: Binary search starts at 0 (also nulls_end_idx). // Otherwise, NULLS LAST ordering. Start at 0. - auto group_start = nulls_begin_idx == 0 ? nulls_end_idx : 0; - auto lowest_in_window = subtract_safe(d_orderby[idx], preceding_window); + auto const group_start = nulls_begin_idx == 0 ? nulls_end_idx : 0; + auto const lowest_in_window = compute_lowest_in_window(d_orderby, idx, preceding_window); return ((d_orderby + idx) - thrust::lower_bound(thrust::seq, d_orderby + group_start, @@ -357,13 +423,13 @@ std::unique_ptr range_window_ASC(column_view const& input, 1; // Add 1, for `preceding` to account for current row. }; - auto preceding_column = expand_to_column(preceding_calculator, input.size(), stream); + auto const preceding_column = expand_to_column(preceding_calculator, input.size(), stream); - auto following_calculator = - [nulls_begin_idx = h_nulls_begin_idx, - nulls_end_idx = h_nulls_end_idx, - num_rows = input.size(), - d_orderby = orderby_column.data(), + auto const following_calculator = + [nulls_begin_idx = h_nulls_begin_idx, + nulls_end_idx = h_nulls_end_idx, + num_rows = input.size(), + orderby_device_view = *p_orderby_device_view, following_window, following_window_is_unbounded] __device__(size_type idx) -> size_type { if (following_window_is_unbounded) { return num_rows - idx - 1; } @@ -373,14 +439,15 @@ std::unique_ptr range_window_ASC(column_view const& input, return nulls_end_idx - idx - 1; } + auto const d_orderby = begin(orderby_device_view); // orderby[idx] not null. Binary search the group, excluding null group. // If nulls_begin_idx == 0, either // 1. NULLS FIRST ordering: Binary search ends at num_rows. // 2. NO NULLS: Binary search also ends at num_rows. // Otherwise, NULLS LAST ordering. End at nulls_begin_idx. - auto group_end = nulls_begin_idx == 0 ? num_rows : nulls_begin_idx; - auto highest_in_window = add_safe(d_orderby[idx], following_window); + auto const group_end = nulls_begin_idx == 0 ? num_rows : nulls_begin_idx; + auto const highest_in_window = compute_highest_in_window(d_orderby, idx, following_window); return (thrust::upper_bound( thrust::seq, d_orderby + idx, d_orderby + group_end, highest_in_window) - @@ -388,7 +455,7 @@ std::unique_ptr range_window_ASC(column_view const& input, 1; }; - auto following_column = expand_to_column(following_calculator, input.size(), stream); + auto const following_column = expand_to_column(following_calculator, input.size(), stream); return cudf::detail::rolling_window( input, preceding_column->view(), following_column->view(), min_periods, aggr, stream, mr); @@ -411,7 +478,7 @@ get_null_bounds_for_orderby_column(column_view const& orderby_column, // For each group, the null values are clustered at the beginning or the end of the group. // These nulls cannot participate, except in their own window. - auto num_groups = group_offsets.size() - 1; + auto const num_groups = group_offsets.size() - 1; if (orderby_column.has_nulls()) { auto null_start = rmm::device_uvector(num_groups, stream); @@ -463,7 +530,7 @@ get_null_bounds_for_orderby_column(column_view const& orderby_column, } else { // The returned vectors have num_groups items, but the input offsets have num_groups+1 // Drop the last element using a span - auto group_offsets_span = + auto const group_offsets_span = cudf::device_span(group_offsets.data(), num_groups); // When there are no nulls, just copy the input group offsets to the output. @@ -491,19 +558,20 @@ std::unique_ptr range_window_ASC(column_view const& input, { auto [null_start, null_end] = get_null_bounds_for_orderby_column(orderby_column, group_offsets, stream); - - auto preceding_calculator = - [d_group_offsets = group_offsets.data(), - d_group_labels = group_labels.data(), - d_orderby = orderby_column.data(), - d_nulls_begin = null_start.data(), - d_nulls_end = null_end.data(), + auto const p_orderby_device_view = cudf::column_device_view::create(orderby_column, stream); + + auto const preceding_calculator = + [d_group_offsets = group_offsets.data(), + d_group_labels = group_labels.data(), + orderby_device_view = *p_orderby_device_view, + d_nulls_begin = null_start.data(), + d_nulls_end = null_end.data(), preceding_window, preceding_window_is_unbounded] __device__(size_type idx) -> size_type { - auto group_label = d_group_labels[idx]; - auto group_start = d_group_offsets[group_label]; - auto nulls_begin = d_nulls_begin[group_label]; - auto nulls_end = d_nulls_end[group_label]; + auto const group_label = d_group_labels[idx]; + auto const group_start = d_group_offsets[group_label]; + auto const nulls_begin = d_nulls_begin[group_label]; + auto const nulls_end = d_nulls_end[group_label]; if (preceding_window_is_unbounded) { return idx - group_start + 1; } @@ -514,14 +582,15 @@ std::unique_ptr range_window_ASC(column_view const& input, return idx - nulls_begin + 1; } + auto const d_orderby = begin(orderby_device_view); + // orderby[idx] not null. Search must exclude the null group. // If nulls_begin == group_start, either of the following is true: // 1. NULLS FIRST ordering: Search must begin at nulls_end. // 2. NO NULLS: Search must begin at group_start (which also equals nulls_end.) // Otherwise, NULLS LAST ordering. Search must start at nulls group_start. - auto search_start = nulls_begin == group_start ? nulls_end : group_start; - - auto lowest_in_window = subtract_safe(d_orderby[idx], preceding_window); + auto const search_start = nulls_begin == group_start ? nulls_end : group_start; + auto const lowest_in_window = compute_lowest_in_window(d_orderby, idx, preceding_window); return ((d_orderby + idx) - thrust::lower_bound(thrust::seq, d_orderby + search_start, @@ -530,22 +599,23 @@ std::unique_ptr range_window_ASC(column_view const& input, 1; // Add 1, for `preceding` to account for current row. }; - auto preceding_column = expand_to_column(preceding_calculator, input.size(), stream); + auto const preceding_column = expand_to_column(preceding_calculator, input.size(), stream); - auto following_calculator = - [d_group_offsets = group_offsets.data(), - d_group_labels = group_labels.data(), - d_orderby = orderby_column.data(), - d_nulls_begin = null_start.data(), - d_nulls_end = null_end.data(), + auto const following_calculator = + [d_group_offsets = group_offsets.data(), + d_group_labels = group_labels.data(), + orderby_device_view = *p_orderby_device_view, + d_nulls_begin = null_start.data(), + d_nulls_end = null_end.data(), following_window, following_window_is_unbounded] __device__(size_type idx) -> size_type { - auto group_label = d_group_labels[idx]; - auto group_start = d_group_offsets[group_label]; - auto group_end = d_group_offsets[group_label + 1]; // Cannot fall off the end, since offsets - // is capped with `input.size()`. - auto nulls_begin = d_nulls_begin[group_label]; - auto nulls_end = d_nulls_end[group_label]; + auto const group_label = d_group_labels[idx]; + auto const group_start = d_group_offsets[group_label]; + auto const group_end = + d_group_offsets[group_label + 1]; // Cannot fall off the end, since offsets + // is capped with `input.size()`. + auto const nulls_begin = d_nulls_begin[group_label]; + auto const nulls_end = d_nulls_end[group_label]; if (following_window_is_unbounded) { return (group_end - idx) - 1; } @@ -556,14 +626,15 @@ std::unique_ptr range_window_ASC(column_view const& input, return nulls_end - idx - 1; } + auto const d_orderby = begin(orderby_device_view); + // orderby[idx] not null. Search must exclude the null group. // If nulls_begin == group_start, either of the following is true: // 1. NULLS FIRST ordering: Search ends at group_end. // 2. NO NULLS: Search ends at group_end. // Otherwise, NULLS LAST ordering. Search ends at nulls_begin. - auto search_end = nulls_begin == group_start ? group_end : nulls_begin; - - auto highest_in_window = add_safe(d_orderby[idx], following_window); + auto const search_end = nulls_begin == group_start ? group_end : nulls_begin; + auto const highest_in_window = compute_highest_in_window(d_orderby, idx, following_window); return (thrust::upper_bound( thrust::seq, d_orderby + idx, d_orderby + search_end, highest_in_window) - @@ -571,7 +642,7 @@ std::unique_ptr range_window_ASC(column_view const& input, 1; }; - auto following_column = expand_to_column(following_calculator, input.size(), stream); + auto const following_column = expand_to_column(following_calculator, input.size(), stream); return cudf::detail::rolling_window( input, preceding_column->view(), following_column->view(), min_periods, aggr, stream, mr); @@ -594,11 +665,12 @@ std::unique_ptr range_window_DESC(column_view const& input, rmm::mr::device_memory_resource* mr) { auto [h_nulls_begin_idx, h_nulls_end_idx] = get_null_bounds_for_orderby_column(orderby_column); + auto const p_orderby_device_view = cudf::column_device_view::create(orderby_column, stream); - auto preceding_calculator = - [nulls_begin_idx = h_nulls_begin_idx, - nulls_end_idx = h_nulls_end_idx, - d_orderby = orderby_column.data(), + auto const preceding_calculator = + [nulls_begin_idx = h_nulls_begin_idx, + nulls_end_idx = h_nulls_end_idx, + orderby_device_view = *p_orderby_device_view, preceding_window, preceding_window_is_unbounded] __device__(size_type idx) -> size_type { if (preceding_window_is_unbounded) { @@ -612,13 +684,14 @@ std::unique_ptr range_window_DESC(column_view const& input, return idx - nulls_begin_idx + 1; } + auto const d_orderby = begin(orderby_device_view); // orderby[idx] not null. Binary search the group, excluding null group. // If nulls_begin_idx == 0, either // 1. NULLS FIRST ordering: Binary search starts where nulls_end_idx. // 2. NO NULLS: Binary search starts at 0 (also nulls_end_idx). // Otherwise, NULLS LAST ordering. Start at 0. - auto group_start = nulls_begin_idx == 0 ? nulls_end_idx : 0; - auto highest_in_window = add_safe(d_orderby[idx], preceding_window); + auto const group_start = nulls_begin_idx == 0 ? nulls_end_idx : 0; + auto const highest_in_window = compute_highest_in_window(d_orderby, idx, preceding_window); return ((d_orderby + idx) - thrust::lower_bound(thrust::seq, @@ -629,13 +702,13 @@ std::unique_ptr range_window_DESC(column_view const& input, 1; // Add 1, for `preceding` to account for current row. }; - auto preceding_column = expand_to_column(preceding_calculator, input.size(), stream); + auto const preceding_column = expand_to_column(preceding_calculator, input.size(), stream); - auto following_calculator = - [nulls_begin_idx = h_nulls_begin_idx, - nulls_end_idx = h_nulls_end_idx, - num_rows = input.size(), - d_orderby = orderby_column.data(), + auto const following_calculator = + [nulls_begin_idx = h_nulls_begin_idx, + nulls_end_idx = h_nulls_end_idx, + num_rows = input.size(), + orderby_device_view = *p_orderby_device_view, following_window, following_window_is_unbounded] __device__(size_type idx) -> size_type { if (following_window_is_unbounded) { return (num_rows - idx) - 1; } @@ -645,14 +718,15 @@ std::unique_ptr range_window_DESC(column_view const& input, return nulls_end_idx - idx - 1; } + auto const d_orderby = begin(orderby_device_view); // orderby[idx] not null. Search must exclude null group. // If nulls_begin_idx = 0, either // 1. NULLS FIRST ordering: Search ends at num_rows. // 2. NO NULLS: Search also ends at num_rows. // Otherwise, NULLS LAST ordering: End at nulls_begin_idx. - auto group_end = nulls_begin_idx == 0 ? num_rows : nulls_begin_idx; - auto lowest_in_window = subtract_safe(d_orderby[idx], following_window); + auto const group_end = nulls_begin_idx == 0 ? num_rows : nulls_begin_idx; + auto const lowest_in_window = compute_lowest_in_window(d_orderby, idx, following_window); return (thrust::upper_bound(thrust::seq, d_orderby + idx, @@ -663,7 +737,7 @@ std::unique_ptr range_window_DESC(column_view const& input, 1; }; - auto following_column = expand_to_column(following_calculator, input.size(), stream); + auto const following_column = expand_to_column(following_calculator, input.size(), stream); return cudf::detail::rolling_window( input, preceding_column->view(), following_column->view(), min_periods, aggr, stream, mr); @@ -686,19 +760,20 @@ std::unique_ptr range_window_DESC(column_view const& input, { auto [null_start, null_end] = get_null_bounds_for_orderby_column(orderby_column, group_offsets, stream); - - auto preceding_calculator = - [d_group_offsets = group_offsets.data(), - d_group_labels = group_labels.data(), - d_orderby = orderby_column.data(), - d_nulls_begin = null_start.data(), - d_nulls_end = null_end.data(), + auto const p_orderby_device_view = cudf::column_device_view::create(orderby_column, stream); + + auto const preceding_calculator = + [d_group_offsets = group_offsets.data(), + d_group_labels = group_labels.data(), + orderby_device_view = *p_orderby_device_view, + d_nulls_begin = null_start.data(), + d_nulls_end = null_end.data(), preceding_window, preceding_window_is_unbounded] __device__(size_type idx) -> size_type { - auto group_label = d_group_labels[idx]; - auto group_start = d_group_offsets[group_label]; - auto nulls_begin = d_nulls_begin[group_label]; - auto nulls_end = d_nulls_end[group_label]; + auto const group_label = d_group_labels[idx]; + auto const group_start = d_group_offsets[group_label]; + auto const nulls_begin = d_nulls_begin[group_label]; + auto const nulls_end = d_nulls_end[group_label]; if (preceding_window_is_unbounded) { return (idx - group_start) + 1; } @@ -709,14 +784,14 @@ std::unique_ptr range_window_DESC(column_view const& input, return idx - nulls_begin + 1; } + auto const d_orderby = begin(orderby_device_view); // orderby[idx] not null. Search must exclude the null group. // If nulls_begin == group_start, either of the following is true: // 1. NULLS FIRST ordering: Search must begin at nulls_end. // 2. NO NULLS: Search must begin at group_start (which also equals nulls_end.) // Otherwise, NULLS LAST ordering. Search must start at nulls group_start. - auto search_start = nulls_begin == group_start ? nulls_end : group_start; - - auto highest_in_window = add_safe(d_orderby[idx], preceding_window); + auto const search_start = nulls_begin == group_start ? nulls_end : group_start; + auto const highest_in_window = compute_highest_in_window(d_orderby, idx, preceding_window); return ((d_orderby + idx) - thrust::lower_bound(thrust::seq, @@ -727,21 +802,21 @@ std::unique_ptr range_window_DESC(column_view const& input, 1; // Add 1, for `preceding` to account for current row. }; - auto preceding_column = expand_to_column(preceding_calculator, input.size(), stream); + auto const preceding_column = expand_to_column(preceding_calculator, input.size(), stream); - auto following_calculator = - [d_group_offsets = group_offsets.data(), - d_group_labels = group_labels.data(), - d_orderby = orderby_column.data(), - d_nulls_begin = null_start.data(), - d_nulls_end = null_end.data(), + auto const following_calculator = + [d_group_offsets = group_offsets.data(), + d_group_labels = group_labels.data(), + orderby_device_view = *p_orderby_device_view, + d_nulls_begin = null_start.data(), + d_nulls_end = null_end.data(), following_window, following_window_is_unbounded] __device__(size_type idx) -> size_type { - auto group_label = d_group_labels[idx]; - auto group_start = d_group_offsets[group_label]; - auto group_end = d_group_offsets[group_label + 1]; - auto nulls_begin = d_nulls_begin[group_label]; - auto nulls_end = d_nulls_end[group_label]; + auto const group_label = d_group_labels[idx]; + auto const group_start = d_group_offsets[group_label]; + auto const group_end = d_group_offsets[group_label + 1]; + auto const nulls_begin = d_nulls_begin[group_label]; + auto const nulls_end = d_nulls_end[group_label]; if (following_window_is_unbounded) { return (group_end - idx) - 1; } @@ -752,14 +827,14 @@ std::unique_ptr range_window_DESC(column_view const& input, return nulls_end - idx - 1; } + auto const d_orderby = begin(orderby_device_view); // orderby[idx] not null. Search must exclude the null group. // If nulls_begin == group_start, either of the following is true: // 1. NULLS FIRST ordering: Search ends at group_end. // 2. NO NULLS: Search ends at group_end. // Otherwise, NULLS LAST ordering. Search ends at nulls_begin. - auto search_end = nulls_begin == group_start ? group_end : nulls_begin; - - auto lowest_in_window = subtract_safe(d_orderby[idx], following_window); + auto const search_end = nulls_begin == group_start ? group_end : nulls_begin; + auto const lowest_in_window = compute_lowest_in_window(d_orderby, idx, following_window); return (thrust::upper_bound(thrust::seq, d_orderby + idx, @@ -770,7 +845,7 @@ std::unique_ptr range_window_DESC(column_view const& input, 1; }; - auto following_column = expand_to_column(following_calculator, input.size(), stream); + auto const following_column = expand_to_column(following_calculator, input.size(), stream); if (aggr.kind == aggregation::CUDA || aggr.kind == aggregation::PTX) { CUDF_FAIL("Ranged rolling window does NOT (yet) support UDF.");