[REVIEW] Add support for LEAD/LAG window functions for fixed-width types [redux] #6277

Merged
merged 18 commits into from
Oct 7, 2020
Changes from 17 commits
Commits
a56a106
[lead/lag] [redux] Implement LEAD/LAG window functions
mythrocks Sep 17, 2020
1e24085
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Sep 22, 2020
c6245c5
[lead/lag] [redux] Added all-nulls test.
mythrocks Sep 22, 2020
5410189
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Sep 23, 2020
af98ce9
[lead/lag] [redux] Fixed headers in test.
mythrocks Sep 23, 2020
ccd5c40
[lead/lag] [redux] Review changes:
mythrocks Sep 24, 2020
8bcb349
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Sep 28, 2020
27c510d
[lead/lag] [redux] Modified tests, from review.
mythrocks Sep 28, 2020
0e80f6e
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Sep 30, 2020
f9cc799
[lead/lag] Added short-ckt path for LEAD/LAG(0)
mythrocks Oct 1, 2020
76d58d2
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Oct 1, 2020
1cd338f
[lead/lag] Cleanup from review:
mythrocks Oct 5, 2020
80244e8
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Oct 5, 2020
e266052
[lead/lag] Code formatting.
mythrocks Oct 6, 2020
b16ce5f
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Oct 6, 2020
20f1e78
Merge remote-tracking branch 'origin/branch-0.16' into lead-lag-branc…
mythrocks Oct 7, 2020
1a90b13
[lead/lag] Review changes:
mythrocks Oct 7, 2020
afe0ea7
[lead/lag] Code formatting.
mythrocks Oct 7, 2020
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -21,6 +21,8 @@
- PR #6139 Add column conversion to big endian byte list.
- PR #6220 Add `list_topics()` to supply list of underlying Kafka connection topics
- PR #6254 Add `cudf::make_dictionary_from_scalar` factory function
- PR #6262 Add nth_element series aggregation with null handling
- PR #6277 Add support for LEAD/LAG window functions for fixed-width types
- PR #6318 Add support for reading Struct and map types from Parquet files
- PR #6315 Native code for string-map lookups, for cudf-java
- PR #6302 Add custom dataframe accessors
8 changes: 8 additions & 0 deletions cpp/include/cudf/aggregation.hpp
@@ -71,6 +71,8 @@ class aggregation {
    NTH_ELEMENT,  ///< get the nth element
    ROW_NUMBER,   ///< get row-number of element
    COLLECT,      ///< collect values into a list
    LEAD,         ///< window function, accesses row at specified offset following current row
    LAG,          ///< window function, accesses row at specified offset preceding current row
    PTX,          ///< PTX UDF based reduction
    CUDA          ///< CUDA UDF based reduction
  };
@@ -197,6 +199,12 @@ std::unique_ptr<aggregation> make_row_number_aggregation();
/// Factory to create a COLLECT aggregation
std::unique_ptr<aggregation> make_collect_aggregation();

/// Factory to create a LAG aggregation
std::unique_ptr<aggregation> make_lag_aggregation(size_type offset);

/// Factory to create a LEAD aggregation
std::unique_ptr<aggregation> make_lead_aggregation(size_type offset);

/**
* @brief Factory to create an aggregation based on a UDF for PTX or CUDA
*
33 changes: 33 additions & 0 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
@@ -92,6 +92,23 @@ struct quantile_aggregation final : derived_aggregation<quantile_aggregation> {
  }
};

struct lead_lag_aggregation final : derived_aggregation<lead_lag_aggregation> {
  lead_lag_aggregation(Kind kind, size_type offset)
    : derived_aggregation{offset < 0 ? (kind == LAG ? LEAD : LAG) : kind},
      row_offset{std::abs(offset)}
  {
  }

  size_type row_offset;

 protected:
  friend class derived_aggregation<lead_lag_aggregation>;

  bool operator==(lead_lag_aggregation const& rhs) const { return row_offset == rhs.row_offset; }

  size_t hash_impl() const { return std::hash<size_type>()(row_offset); }
};
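
The constructor above normalizes its arguments: a negative offset flips LEAD to LAG (and vice versa), and only the magnitude is stored in `row_offset`. The following is a minimal standalone sketch of that rule, not the cudf implementation. `Kind`, `size_type`, `normalized_lead_lag`, and `normalize` are hypothetical stand-ins introduced here for illustration, and the guard against the most negative offset is an assumption added by this sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <limits>

// Hypothetical stand-ins for cudf::aggregation::Kind and cudf::size_type.
enum class Kind { LEAD, LAG };
using size_type = std::int32_t;

// Hypothetical result type: the kind and offset after normalization.
struct normalized_lead_lag {
  Kind kind;
  size_type row_offset;
};

// Same rule as the constructor in the diff: a negative offset swaps
// LEAD <-> LAG, and the stored offset is always non-negative.
inline normalized_lead_lag normalize(Kind kind, size_type offset)
{
  // std::abs is undefined for the most negative value, so this sketch rejects it.
  assert(offset != std::numeric_limits<size_type>::min());
  Kind const flipped = (kind == Kind::LAG) ? Kind::LEAD : Kind::LAG;
  return {offset < 0 ? flipped : kind, static_cast<size_type>(std::abs(offset))};
}
```

Under this rule, `normalize(Kind::LAG, -2)` describes the same access pattern as `normalize(Kind::LEAD, 2)`, which is why downstream code only ever has to handle non-negative offsets.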

/**
* @brief Derived class for specifying a standard deviation/variance aggregation
*/
@@ -361,6 +378,18 @@ struct target_type_impl<Source, aggregation::COLLECT> {
  using type = cudf::list_view;
};

// Always use Source for LEAD
template <typename Source>
struct target_type_impl<Source, aggregation::LEAD> {
  using type = Source;
};

// Always use Source for LAG
template <typename Source>
struct target_type_impl<Source, aggregation::LAG> {
  using type = Source;
};

/**
* @brief Helper alias to get the accumulator type for performing aggregation
* `k` on elements of type `Source`
@@ -448,6 +477,10 @@ CUDA_HOST_DEVICE_CALLABLE decltype(auto) aggregation_dispatcher(aggregation::Kin
      return f.template operator()<aggregation::ROW_NUMBER>(std::forward<Ts>(args)...);
    case aggregation::COLLECT:
      return f.template operator()<aggregation::COLLECT>(std::forward<Ts>(args)...);
    case aggregation::LEAD:
      return f.template operator()<aggregation::LEAD>(std::forward<Ts>(args)...);
    case aggregation::LAG:
      return f.template operator()<aggregation::LAG>(std::forward<Ts>(args)...);
    default: {
#ifndef __CUDA_ARCH__
      CUDF_FAIL("Unsupported aggregation.");
9 changes: 9 additions & 0 deletions cpp/include/cudf/detail/utilities/device_operators.cuh
@@ -222,6 +222,15 @@ struct DeviceXor {
  }
};

/**
* @brief Operator for calculating Lead/Lag window function.
*/
struct DeviceLeadLag {
  const size_type row_offset;

  explicit CUDA_HOST_DEVICE_CALLABLE DeviceLeadLag(size_type offset_) : row_offset(offset_) {}
};

} // namespace cudf

#endif
46 changes: 46 additions & 0 deletions cpp/include/cudf/rolling.hpp
@@ -61,6 +61,28 @@ std::unique_ptr<column> rolling_window(
std::unique_ptr<aggregation> const& agg,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @copydoc std::unique_ptr<column> rolling_window(
* column_view const& input,
* size_type preceding_window,
* size_type following_window,
* size_type min_periods,
* std::unique_ptr<aggregation> const& agg,
* rmm::mr::device_memory_resource* mr)
*
* @param default_outputs A column of per-row default values to be returned instead
* of nulls. Used for LEAD()/LAG(), if the row offset crosses
* the boundaries of the column.
*/
std::unique_ptr<column> rolling_window(
column_view const& input,
column_view const& default_outputs,
size_type preceding_window,
size_type following_window,
size_type min_periods,
std::unique_ptr<aggregation> const& agg,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
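
The `default_outputs` parameter documented above supplies a per-row fallback when the LEAD/LAG offset runs past the column boundary. Below is a host-side reference sketch of those semantics, assuming (this is not confirmed by the diff shown here) that an in-range null input stays null and that only out-of-range rows take the default. `lead_with_defaults` and `lag_with_defaults` are hypothetical helper names, and `std::optional` stands in for cudf's null mask.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Host-side reference for LEAD: row i reads row i + offset, or its own
// default when that row lies past the end of the column.
template <typename T>
std::vector<std::optional<T>> lead_with_defaults(std::vector<std::optional<T>> const& input,
                                                 std::vector<std::optional<T>> const& defaults,
                                                 std::size_t offset)
{
  assert(input.size() == defaults.size());
  std::vector<std::optional<T>> out(input.size());
  for (std::size_t i = 0; i < input.size(); ++i) {
    // input.size() - i is at least 1 here, so the subtraction cannot wrap.
    out[i] = (offset < input.size() - i) ? input[i + offset] : defaults[i];
  }
  return out;
}

// Host-side reference for LAG: row i reads row i - offset, or its own
// default when that row lies before the start of the column.
template <typename T>
std::vector<std::optional<T>> lag_with_defaults(std::vector<std::optional<T>> const& input,
                                                std::vector<std::optional<T>> const& defaults,
                                                std::size_t offset)
{
  assert(input.size() == defaults.size());
  std::vector<std::optional<T>> out(input.size());
  for (std::size_t i = 0; i < input.size(); ++i) {
    out[i] = (i >= offset) ? input[i - offset] : defaults[i];
  }
  return out;
}
```

A reference like this is mainly useful for generating expected values in tests; the real implementation runs on the device and operates on `column_view` inputs.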

/**
* @brief Applies a grouping-aware, fixed-size rolling window function to the values in a column.
*
@@ -140,6 +162,30 @@ std::unique_ptr<column> grouped_rolling_window(
std::unique_ptr<aggregation> const& aggr,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @copydoc std::unique_ptr<column> grouped_rolling_window(
* table_view const& group_keys,
* column_view const& input,
* size_type preceding_window,
* size_type following_window,
* size_type min_periods,
* std::unique_ptr<aggregation> const& aggr,
* rmm::mr::device_memory_resource* mr)
*
* @param default_outputs A column of per-row default values to be returned instead
* of nulls. Used for LEAD()/LAG(), if the row offset crosses
* the boundaries of the column or group.
*/
std::unique_ptr<column> grouped_rolling_window(
table_view const& group_keys,
column_view const& input,
column_view const& default_outputs,
size_type preceding_window,
size_type following_window,
size_type min_periods,
std::unique_ptr<aggregation> const& aggr,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());
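
For the grouped overload, the documentation above says the default is used when the offset crosses the boundary of the column or of the group. The sketch below illustrates that for LAG on the host. It assumes rows of the same group are contiguous (an assumption of this sketch), uses a plain vector of integer group ids as a hypothetical stand-in for the `group_keys` table, and ignores nulls for brevity. `grouped_lag` is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Host-side sketch of grouped LAG. Row i reads row i - offset only if that
// row belongs to the same group; otherwise it takes defaults[i].
template <typename T>
std::vector<T> grouped_lag(std::vector<int> const& group_ids,
                           std::vector<T> const& input,
                           std::vector<T> const& defaults,
                           std::size_t offset)
{
  assert(group_ids.size() == input.size() && input.size() == defaults.size());
  std::vector<T> out(input.size());
  std::size_t group_start = 0;  // index of the first row of the current group
  for (std::size_t i = 0; i < input.size(); ++i) {
    if (i > 0 && group_ids[i] != group_ids[i - 1]) { group_start = i; }
    // i - group_start is the row's position within its group.
    out[i] = (i - group_start >= offset) ? input[i - offset] : defaults[i];
  }
  return out;
}
```

With group ids `{0, 0, 0, 1, 1}` and an offset of 1, the first row of each group receives its default, because the preceding row either does not exist or belongs to a different group.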

/**
* @brief Applies a grouping-aware, timestamp-based rolling window function to the values in a
* column.
10 changes: 10 additions & 0 deletions cpp/src/aggregation/aggregation.cpp
@@ -121,6 +121,16 @@ std::unique_ptr<aggregation> make_collect_aggregation()
{
  return std::make_unique<aggregation>(aggregation::COLLECT);
}
/// Factory to create a LAG aggregation
std::unique_ptr<aggregation> make_lag_aggregation(size_type offset)
{
  return std::make_unique<cudf::detail::lead_lag_aggregation>(aggregation::LAG, offset);
}
/// Factory to create a LEAD aggregation
std::unique_ptr<aggregation> make_lead_aggregation(size_type offset)
{
  return std::make_unique<cudf::detail::lead_lag_aggregation>(aggregation::LEAD, offset);
}
/// Factory to create a UDF aggregation
std::unique_ptr<aggregation> make_udf_aggregation(udf_type type,
std::string const& user_defined_aggregator,