Skip to content

Commit

Permalink
Floating point order-by columns for RANGE window functions (#13512)
Browse files Browse the repository at this point in the history
This commit adds support for `FLOAT32` and `FLOAT64` order-by columns for RANGE-based window functions.

## Background
Up until this commit, order-by columns for RANGE window functions were allowed to be integral numerics, timestamps, or strings (for unbounded/current rows).

With this commit, window functions will be permitted to run on floating point value ranges. E.g. This supports windows defined with floating point deltas, like `rows with values exceeding the current row by no more than 3.14f`.

This is in the same vein as the support for `DECIMAL` (#11645) and `STRING` (#13143).

Authors:
  - MithunR (https://github.com/mythrocks)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - David Wendt (https://github.com/davidwendt)

URL: #13512
  • Loading branch information
mythrocks authored Jun 20, 2023
1 parent 3cfa8fd commit 5a7e3c7
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 30 deletions.
8 changes: 4 additions & 4 deletions cpp/src/rolling/detail/range_window_bounds.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ template <typename RangeType>
constexpr bool is_supported_range_type()
{
return cudf::is_duration<RangeType>() || cudf::is_fixed_point<RangeType>() ||
(std::is_integral_v<RangeType> && !cudf::is_boolean<RangeType>());
(cudf::is_numeric<RangeType>() && !cudf::is_boolean<RangeType>());
}

/// Checks if the specified type is a supported target type,
Expand All @@ -38,7 +38,7 @@ template <typename ColumnType>
constexpr bool is_supported_order_by_column_type()
{
return cudf::is_timestamp<ColumnType>() || cudf::is_fixed_point<ColumnType>() ||
(std::is_integral_v<ColumnType> && !cudf::is_boolean<ColumnType>()) ||
(cudf::is_numeric<ColumnType>() && !cudf::is_boolean<ColumnType>()) ||
std::is_same_v<ColumnType, cudf::string_view>;
}

Expand All @@ -64,7 +64,7 @@ struct range_type_impl {
template <typename ColumnType>
struct range_type_impl<
ColumnType,
std::enable_if_t<std::is_integral_v<ColumnType> && !cudf::is_boolean<ColumnType>(), void>> {
std::enable_if_t<cudf::is_numeric<ColumnType>() && !cudf::is_boolean<ColumnType>(), void>> {
using type = ColumnType;
using rep_type = ColumnType;
};
Expand Down Expand Up @@ -98,7 +98,7 @@ void assert_non_negative([[maybe_unused]] T const& value)

template <typename RangeT,
typename RepT,
CUDF_ENABLE_IF(std::is_integral_v<RangeT> && !cudf::is_boolean<RangeT>())>
CUDF_ENABLE_IF(cudf::is_numeric<RangeT>() && !cudf::is_boolean<RangeT>())>
RepT range_comparable_value_impl(scalar const& range_scalar,
bool,
data_type const&,
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/rolling/grouped_rolling.cu
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ struct device_value_accessor {
*
* @param[in] col_ column device view of cudf column
*/
__device__ device_value_accessor(column_device_view const& col_) : col{col_}
explicit __device__ device_value_accessor(column_device_view const& col_) : col{col_}
{
cudf_assert(type_id_matches_device_storage_type<T>(col.type().id()) &&
"the data type mismatch");
Expand Down
14 changes: 6 additions & 8 deletions cpp/src/rolling/range_window_bounds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct range_scalar_constructor {
{
CUDF_FAIL(
"Unsupported range type. "
"Only Durations, fixed-point, and non-boolean integral range types are allowed.");
"Only durations, fixed-point, and non-boolean numeric range types are allowed.");
}

template <typename T, CUDF_ENABLE_IF(cudf::is_duration<T>())>
Expand All @@ -45,7 +45,7 @@ struct range_scalar_constructor {
static_cast<duration_scalar<T> const&>(range_scalar_));
}

template <typename T, CUDF_ENABLE_IF(std::is_integral_v<T> && not cudf::is_boolean<T>())>
template <typename T, CUDF_ENABLE_IF(cudf::is_numeric<T>() && not cudf::is_boolean<T>())>
std::unique_ptr<scalar> operator()(scalar const& range_scalar_) const
{
return std::make_unique<numeric_scalar<T>>(
Expand All @@ -59,7 +59,6 @@ struct range_scalar_constructor {
static_cast<fixed_point_scalar<T> const&>(range_scalar_));
}
};

} // namespace

range_window_bounds::range_window_bounds(extent_type extent_, std::unique_ptr<scalar> range_scalar_)
Expand All @@ -73,19 +72,18 @@ range_window_bounds::range_window_bounds(extent_type extent_, std::unique_ptr<sc

range_window_bounds range_window_bounds::unbounded(data_type type)
{
return range_window_bounds(extent_type::UNBOUNDED, make_default_constructed_scalar(type));
return {extent_type::UNBOUNDED, make_default_constructed_scalar(type)};
}

range_window_bounds range_window_bounds::current_row(data_type type)
{
return range_window_bounds(extent_type::CURRENT_ROW, make_default_constructed_scalar(type));
return {extent_type::CURRENT_ROW, make_default_constructed_scalar(type)};
}

range_window_bounds range_window_bounds::get(scalar const& boundary)
{
return range_window_bounds{
extent_type::BOUNDED,
cudf::type_dispatcher(boundary.type(), range_scalar_constructor{}, boundary)};
return {extent_type::BOUNDED,
cudf::type_dispatcher(boundary.type(), range_scalar_constructor{}, boundary)};
}

} // namespace cudf
210 changes: 193 additions & 17 deletions cpp/tests/rolling/grouped_rolling_range_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,204 @@ using bigints_column = fwcw<int64_t>;
using strings_column = cudf::test::strings_column_wrapper;
using column_ptr = std::unique_ptr<cudf::column>;

struct BaseGroupedRollingRangeOrderByDecimalTest : public cudf::test::BaseFixture {
template <typename T>
struct BaseGroupedRollingRangeOrderByTest : cudf::test::BaseFixture {
// Stand-in for std::pow(10, n), but for integral return.
static constexpr std::array<int32_t, 6> pow10{1, 10, 100, 1000, 10000, 100000};

// Test data.
column_ptr const grouping_keys = ints_column{0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2}.release();
column_ptr const agg_values = ints_column{1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3}.release();
cudf::size_type const num_rows = grouping_keys->size();

/**
* @brief Get grouped rolling results for specified order-by column and range bounds.
*/
[[nodiscard]] column_ptr get_grouped_range_rolling_result(
cudf::range_window_bounds const& preceding,
cudf::range_window_bounds const& following,
cudf::column_view const& order_by_column,
cudf::rolling_aggregation const& agg) const
{
return cudf::grouped_range_rolling_window(cudf::table_view{{grouping_keys->view()}},
order_by_column,
cudf::order::ASCENDING,
agg_values->view(),
preceding,
following,
1, // min_periods
agg);
}

[[nodiscard]] column_ptr get_grouped_range_rolling_sum_result(
cudf::range_window_bounds const& preceding,
cudf::range_window_bounds const& following,
cudf::column_view const& order_by_column) const
{
return get_grouped_range_rolling_result(
preceding,
following,
order_by_column,
*cudf::make_sum_aggregation<cudf::rolling_aggregation>());
}
};

template <typename T>
struct GroupedRollingRangeOrderByNumericTest : public BaseGroupedRollingRangeOrderByTest<T> {
using base = BaseGroupedRollingRangeOrderByTest<T>;

using base::agg_values;
using base::get_grouped_range_rolling_sum_result;
using base::grouping_keys;
using base::num_rows;

[[nodiscard]] auto make_range_bounds(T const& value) const
{
return cudf::range_window_bounds::get(*cudf::make_fixed_width_scalar(value));
}

[[nodiscard]] auto make_unbounded_range_bounds() const
{
return cudf::range_window_bounds::unbounded(cudf::data_type{cudf::type_to_id<T>()});
}

/// Generate order-by column with values: [0, 100, 200, 300, ... 1100, 1200, 1300]
[[nodiscard]] column_ptr generate_order_by_column() const
{
auto const begin = thrust::make_transform_iterator(
thrust::make_counting_iterator<cudf::size_type>(0), [&](T const& i) -> T { return i * 100; });

return fwcw<T>(begin, begin + num_rows).release();
}

/**
* @brief Run grouped_rolling test with no nulls in the order-by column
*/
void run_test_no_null_oby() const
{
auto const preceding = make_range_bounds(T{200});
auto const following = make_range_bounds(T{100});
auto const order_by = generate_order_by_column();
auto const results = get_grouped_range_rolling_sum_result(preceding, following, *order_by);
auto const expected_results = bigints_column{{2, 3, 4, 4, 4, 3, 4, 6, 8, 6, 6, 9, 12, 9},
cudf::test::iterators::no_nulls()};
CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected_results);
}

/**
* @brief Run grouped_rolling test with nulls in the order-by column
* (i.e. 2 nulls at the beginning of each group)
*
*/
void run_test_nulls_in_oby() const
{
auto const preceding = make_range_bounds(T{200});
auto const following = make_range_bounds(T{100});

// Nullify the first two rows of each group in the order_by column.
auto const nulled_order_by = [&] {
auto col = generate_order_by_column();
auto new_null_mask = create_null_mask(col->size(), cudf::mask_state::ALL_VALID);
cudf::set_null_mask(static_cast<cudf::bitmask_type*>(new_null_mask.data()),
0,
2,
false); // Nulls in first group.
cudf::set_null_mask(static_cast<cudf::bitmask_type*>(new_null_mask.data()),
6,
8,
false); // Nulls in second group.
cudf::set_null_mask(static_cast<cudf::bitmask_type*>(new_null_mask.data()),
10,
12,
false); // Nulls in third group.
col->set_null_mask(std::move(new_null_mask), 6);
return col;
}();

auto const results =
get_grouped_range_rolling_sum_result(preceding, following, *nulled_order_by);
auto const expected_results =
bigints_column{{2, 2, 2, 3, 4, 3, 4, 4, 4, 4, 6, 6, 6, 6}, cudf::test::iterators::no_nulls()};
CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected_results);
}

/**
* @brief Run grouped_rolling test with unbounded preceding and unbounded following.
*/
void run_test_unbounded_preceding_to_unbounded_following()
{
auto const order_by = generate_order_by_column();
auto const preceding = make_unbounded_range_bounds();
auto const following = make_unbounded_range_bounds();
auto const results = get_grouped_range_rolling_sum_result(preceding, following, *order_by);

auto const expected_results = bigints_column{{6, 6, 6, 6, 6, 6, 8, 8, 8, 8, 12, 12, 12, 12},
cudf::test::iterators::no_nulls()};
CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected_results);
}

/**
* @brief Run grouped_rolling test with unbounded preceding and current row.
*/
void run_test_unbounded_preceding_to_current_row()
{
auto const order_by = generate_order_by_column();
auto const unbounded_preceding = make_unbounded_range_bounds();
auto const current_row = make_range_bounds(T{0});
auto const results =
get_grouped_range_rolling_sum_result(unbounded_preceding, current_row, *order_by);

auto const expected_results = bigints_column{{1, 2, 3, 4, 5, 6, 2, 4, 6, 8, 3, 6, 9, 12},
cudf::test::iterators::no_nulls()};
CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected_results);
}

/**
* @brief Run grouped_rolling test with current row and unbounded following.
*/
void run_test_current_row_to_unbounded_following()
{
auto const order_by = generate_order_by_column();
auto const unbounded_following = make_unbounded_range_bounds();

auto const current_row = make_range_bounds(T{0});
auto const results =
get_grouped_range_rolling_sum_result(current_row, unbounded_following, *order_by);

auto const expected_results = bigints_column{{6, 5, 4, 3, 2, 1, 8, 6, 4, 2, 12, 9, 6, 3},
cudf::test::iterators::no_nulls()};
CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected_results);
}
};

using base = BaseGroupedRollingRangeOrderByDecimalTest; // Shortcut to base test class.
template <typename FloatingPointType>
struct GroupedRollingRangeOrderByFloatingPointTest
: GroupedRollingRangeOrderByNumericTest<FloatingPointType> {};

TYPED_TEST_SUITE(GroupedRollingRangeOrderByFloatingPointTest, cudf::test::FloatingPointTypes);

TYPED_TEST(GroupedRollingRangeOrderByFloatingPointTest, BoundedRanges)
{
this->run_test_no_null_oby();
this->run_test_nulls_in_oby();
}

TYPED_TEST(GroupedRollingRangeOrderByFloatingPointTest, UnboundedRanges)
{
this->run_test_unbounded_preceding_to_unbounded_following();
this->run_test_unbounded_preceding_to_current_row();
this->run_test_current_row_to_unbounded_following();
}

template <typename DecimalT>
struct GroupedRollingRangeOrderByDecimalTypedTest : BaseGroupedRollingRangeOrderByDecimalTest {
using Rep = typename DecimalT::rep;
struct GroupedRollingRangeOrderByDecimalTypedTest
: BaseGroupedRollingRangeOrderByTest<typename DecimalT::rep> {
using Rep = typename DecimalT::rep;
using base = BaseGroupedRollingRangeOrderByTest<Rep>;

using base::agg_values;
using base::grouping_keys;
using base::num_rows;

[[nodiscard]] auto make_fixed_point_range_bounds(typename DecimalT::rep value,
numeric::scale_type scale) const
Expand Down Expand Up @@ -108,23 +292,15 @@ struct GroupedRollingRangeOrderByDecimalTypedTest : BaseGroupedRollingRangeOrder
* @brief Get grouped rolling results for specified order-by column and range scale
*
*/
column_ptr get_grouped_range_rolling_result(cudf::column_view const& order_by_column,
numeric::scale_type const& range_scale) const
[[nodiscard]] column_ptr get_grouped_range_rolling_result(
cudf::column_view const& order_by_column, numeric::scale_type const& range_scale) const
{
auto const preceding =
this->make_fixed_point_range_bounds(rescale_range_value(Rep{200}, range_scale), range_scale);
auto const following =
this->make_fixed_point_range_bounds(rescale_range_value(Rep{100}, range_scale), range_scale);

return cudf::grouped_range_rolling_window(
cudf::table_view{{grouping_keys->view()}},
order_by_column,
cudf::order::ASCENDING,
agg_values->view(),
preceding,
following,
1, // min_periods
*cudf::make_sum_aggregation<cudf::rolling_aggregation>());
return base::get_grouped_range_rolling_sum_result(preceding, following, order_by_column);
}

/**
Expand Down Expand Up @@ -209,7 +385,7 @@ struct GroupedRollingRangeOrderByDecimalTypedTest : BaseGroupedRollingRangeOrder

/**
* @brief Run grouped_rolling test for specified order-by column scale with
* unbounded preceding and unbounded following.
* unbounded preceding and current row.
*
*/
void run_test_unbounded_preceding_to_current_row(numeric::scale_type oby_column_scale)
Expand Down Expand Up @@ -239,7 +415,7 @@ struct GroupedRollingRangeOrderByDecimalTypedTest : BaseGroupedRollingRangeOrder

/**
* @brief Run grouped_rolling test for specified order-by column scale with
* unbounded preceding and unbounded following.
* current row and unbounded following.
*
*/
void run_test_current_row_to_unbounded_following(numeric::scale_type oby_column_scale)
Expand Down

0 comments on commit 5a7e3c7

Please sign in to comment.