Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Add support for LEAD/LAG window functions for fixed-width types [redux] #6277

Merged
merged 18 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
a56a106
[lead/lag] [redux] Implement LEAD/LAG window functions
mythrocks Sep 17, 2020
1e24085
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Sep 22, 2020
c6245c5
[lead/lag] [redux] Added all-nulls test.
mythrocks Sep 22, 2020
5410189
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Sep 23, 2020
af98ce9
[lead/lag] [redux] Fixed headers in test.
mythrocks Sep 23, 2020
ccd5c40
[lead/lag] [redux] Review changes:
mythrocks Sep 24, 2020
8bcb349
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Sep 28, 2020
27c510d
[lead/lag] [redux] Modified tests, from review.
mythrocks Sep 28, 2020
0e80f6e
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Sep 30, 2020
f9cc799
[lead/lag] Added short-ckt path for LEAD/LAG(0)
mythrocks Oct 1, 2020
76d58d2
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Oct 1, 2020
1cd338f
[lead/lag] Cleanup from review:
mythrocks Oct 5, 2020
80244e8
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Oct 5, 2020
e266052
[lead/lag] Code formatting.
mythrocks Oct 6, 2020
b16ce5f
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Oct 6, 2020
20f1e78
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Oct 7, 2020
1a90b13
[lead/lag] Review changes:
mythrocks Oct 7, 2020
afe0ea7
[lead/lag] Code formatting.
mythrocks Oct 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- PR #6254 Add `cudf::make_dictionary_from_scalar` factory function
- PR #6262 Add nth_element series aggregation with null handling
- PR #6277 Add support for LEAD/LAG window functions for fixed-width types
- PR #6318 Add support for reading Struct and map types from Parquet files
- PR #6315 Native code for string-map lookups, for cudf-java
- PR #6302 Add custom dataframe accessors
- PR #6301 Add JNI bindings to nvcomp
Expand Down
46 changes: 15 additions & 31 deletions cpp/src/rolling/rolling.cu
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,15 @@ struct rolling_window_launcher {
{
if (input.is_empty()) return empty_like(input);

CUDF_EXPECTS(default_outputs.type().id() == input.type().id(),
"Defaults column type must match input column."); // Because LEAD/LAG.

// For LEAD(0)/LAG(0), no computation need be performed.
// Return copy of input.
if (0 == static_cast<cudf::detail::lead_lag_aggregation*>(agg.get())->row_offset) {
return std::make_unique<column>(input, static_cast<cudaStream_t>(0), mr);
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
}

auto output = make_fixed_width_column(
target_type(input.type(), op), input.size(), mask_state::UNINITIALIZED, stream, mr);

Expand Down Expand Up @@ -749,7 +758,9 @@ struct rolling_window_launcher {
rmm::mr::device_memory_resource* mr,
cudaStream_t stream)
{
CUDF_FAIL("Aggregation operator and/or input type combination is invalid");
CUDF_FAIL(
"Aggregation operator and/or input type combination is invalid: "
"LEAD/LAG supported only on fixed-width types");
}

template <aggregation::Kind op,
Expand All @@ -766,6 +777,9 @@ struct rolling_window_launcher {
rmm::mr::device_memory_resource* mr,
cudaStream_t stream)
{
CUDF_EXPECTS(default_outputs.is_empty() || op == aggregation::LEAD || op == aggregation::LAG,
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
"Only LEAD/LAG window functions support default values.");

return launch<InputType,
typename corresponding_operator<op>::type,
op,
Expand Down Expand Up @@ -1020,21 +1034,6 @@ std::unique_ptr<column> rolling_window(column_view const& input,
CUDF_EXPECTS((default_outputs.is_empty() || default_outputs.size() == input.size()),
"Defaults column must be either empty or have as many rows as the input column.");

CUDF_EXPECTS(
default_outputs.is_empty() || agg->kind == aggregation::LEAD || agg->kind == aggregation::LAG,
"Only LEAD/LAG window functions support default values.");

CUDF_EXPECTS(default_outputs.type().id() == input.type().id(),
"Defaults column type must match input column."); // Because LEAD/LAG.

if (agg->kind == aggregation::LEAD || agg->kind == aggregation::LAG) {
// For LEAD(0)/LAG(0), no computation need be performed.
// Return copy of input.
if (0 == static_cast<cudf::detail::lead_lag_aggregation*>(agg.get())->row_offset) {
return std::make_unique<column>(input, static_cast<cudaStream_t>(0), mr);
}
}

if (agg->kind == aggregation::CUDA || agg->kind == aggregation::PTX) {
return cudf::detail::rolling_window_udf(input,
preceding_window,
Expand Down Expand Up @@ -1141,21 +1140,6 @@ std::unique_ptr<column> grouped_rolling_window(table_view const& group_keys,
CUDF_EXPECTS((default_outputs.is_empty() || default_outputs.size() == input.size()),
"Defaults column must be either empty or have as many rows as the input column.");

CUDF_EXPECTS(
default_outputs.is_empty() || aggr->kind == aggregation::LEAD || aggr->kind == aggregation::LAG,
"Only LEAD/LAG window functions support default values.");

CUDF_EXPECTS(default_outputs.type().id() == input.type().id(),
"Defaults column type must match input column."); // Because LEAD/LAG.

if (aggr->kind == aggregation::LEAD || aggr->kind == aggregation::LAG) {
// For LEAD(0)/LAG(0), no computation need be performed.
// Return copy of input.
if (0 == static_cast<cudf::detail::lead_lag_aggregation*>(aggr.get())->row_offset) {
return std::make_unique<column>(input, static_cast<cudaStream_t>(0), mr);
}
}

if (group_keys.num_columns() == 0) {
// No Groupby columns specified. Treat as one big group.
return rolling_window(
Expand Down
72 changes: 70 additions & 2 deletions cpp/tests/lead_lag/lead_lag_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,9 @@ TYPED_TEST(TypedLeadLagWindowTest, TestLeadLagWithNoGrouping)
fixed_width_column_wrapper<T>{{99, 99, 0, 1, -1, 3}, {1, 1, 1, 1, 0, 1}}.release()->view());
}

TEST_F(LeadLagWindowTest, TestLeadLagWithAllNullInput)
TYPED_TEST(TypedLeadLagWindowTest, TestLeadLagWithAllNullInput)
{
using T = int32_t;
using T = TypeParam;

auto const input_col = fixed_width_column_wrapper<T>{
{0, 1, 2, 3, 4, 5, 0, 10, 20, 30, 40, 50}, make_counting_transform_iterator(0, [](auto i) {
Expand Down Expand Up @@ -468,4 +468,72 @@ TEST_F(LeadLagWindowTest, TestLeadLagWithAllNullInput)
->view());
}

TYPED_TEST(TypedLeadLagWindowTest, DefaultValuesWithoutLeadLag)
{
  // Verify that supplying a default-values column together with a window
  // function other than LEAD/LAG results in a cudf::logic_error.

  using T = TypeParam;

  auto const input_col = fixed_width_column_wrapper<T>{
    {0, 1, 2, 3, 4, 5}, make_counting_transform_iterator(0, [](auto i) {
      return true;
    })}.release();
  auto const grouping_key  = fixed_width_column_wrapper<int32_t>{0, 0, 0, 0, 0, 0};
  auto const grouping_keys = cudf::table_view{std::vector<cudf::column_view>{grouping_key}};

  // Non-empty defaults column with one row per input row. Only LEAD/LAG
  // are expected to accept this; every other aggregation must reject it.
  auto const default_value =
    cudf::make_fixed_width_scalar(detail::fixed_width_type_converter<int32_t, T>{}(99));
  auto const default_outputs = cudf::make_column_from_scalar(*default_value, input_col->size());

  auto const preceding   = 4;
  auto const following   = 3;
  auto const min_periods = 1;

  // Runs the grouped rolling window with the aggregation under test and
  // checks that it throws. The aggregation passed in is the one actually
  // forwarded to grouped_rolling_window, so each entry in `aggs` is
  // exercised rather than a hard-coded COUNT.
  auto const assert_aggregation_fails = [&](auto const& aggr) {
    EXPECT_THROW(cudf::grouped_rolling_window(grouping_keys,
                                              input_col->view(),
                                              default_outputs->view(),
                                              preceding,
                                              following,
                                              min_periods,
                                              aggr),
                 cudf::logic_error);
  };

  // Elements of a std::initializer_list are const, so they cannot be moved
  // from; the aggregations are handed to the helper by const reference.
  auto const aggs = {cudf::make_count_aggregation(), cudf::make_min_aggregation()};
  for (auto const& agg : aggs) { assert_aggregation_fails(agg); }
}

TEST_F(LeadLagWindowTest, LeadLagWithoutFixedWidthInput)
{
  // Check that LEAD/LAG are rejected with cudf::logic_error when the
  // input column is not of a fixed-width type (here: a strings column).

  auto const input_col = strings_column_wrapper{
    {"0", "1", "2", "3", "4", "5"}, make_counting_transform_iterator(0, [](auto i) {
      return false;
    })}.release();
  auto const grouping_key  = fixed_width_column_wrapper<int32_t>{0, 0, 0, 0, 0, 0};
  auto const grouping_keys = cudf::table_view{std::vector<cudf::column_view>{grouping_key}};

  // Defaults column of the same (string) type and row count as the input,
  // so that the failure is attributable to the unsupported input type.
  auto const default_value   = cudf::make_string_scalar("99");
  auto const default_outputs = cudf::make_column_from_scalar(*default_value, input_col->size());

  auto const preceding   = 4;
  auto const following   = 3;
  auto const min_periods = 1;

  EXPECT_THROW(cudf::grouped_rolling_window(grouping_keys,
                                            input_col->view(),
                                            default_outputs->view(),
                                            preceding,
                                            following,
                                            min_periods,
                                            cudf::make_lead_aggregation(4)),
               cudf::logic_error);
}

CUDF_TEST_PROGRAM_MAIN()