Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[REVIEW] Fix rolling-window count for null input #6344

Merged
merged 20 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8211a68
Fix rolling-window count for null input
mythrocks Sep 29, 2020
4da93f5
Merge remote-tracking branch 'origin/branch-0.16' into window-count-fix
mythrocks Sep 30, 2020
4d3c7b1
Rolling Window count fix: Fixing python tests.
mythrocks Sep 30, 2020
59a36f3
Merge remote-tracking branch 'origin/branch-0.16' into window-count-fix
mythrocks Oct 1, 2020
3b55909
Rolling Window count fix: Using marks for xfail.
mythrocks Oct 1, 2020
90aa2a9
Rolling Window count fix: Fixed formatting
mythrocks Oct 1, 2020
2be5049
Merge remote-tracking branch 'origin/branch-0.16' into window-count-fix
mythrocks Oct 2, 2020
160d9b2
Rolling Window count fix: Review fixes:
mythrocks Oct 2, 2020
1298b0c
Rolling Window count fix: Review fixes:
mythrocks Oct 6, 2020
8697cf1
Merge remote-tracking branch 'origin/branch-0.16' into window-count-fix
mythrocks Oct 6, 2020
9a4b168
Rolling Window count fix: Review fixes:
mythrocks Oct 6, 2020
45003cd
Merge remote-tracking branch 'origin/branch-0.16' into window-count-fix
mythrocks Oct 6, 2020
6264fcc
Merge remote-tracking branch 'origin/branch-0.16' into window-count-fix
mythrocks Oct 7, 2020
c982404
Merge remote-tracking branch 'origin/branch-0.16' into window-count-fix
mythrocks Oct 8, 2020
d901023
Rolling Window count fix: Fixed formatting
mythrocks Oct 8, 2020
f7a8c90
Rolling Window count fix: Fixed process_rolling_window for COUNT_ALL
mythrocks Oct 8, 2020
f34835b
Rolling Window count fix: Use is_valid_no_check().
mythrocks Oct 8, 2020
62709b8
Rolling Window count fix: Python review
mythrocks Oct 12, 2020
7f9b391
Merge remote-tracking branch 'origin/branch-0.16' into window-count-fix
mythrocks Oct 12, 2020
6445c5d
Rolling Window count fix: Python review
mythrocks Oct 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@
- PR #6304 Fix span_tests.cu includes
- PR #6331 Avoids materializing `RangeIndex` during frame concatenation (when not needed)
- PR #6278 Add filter tests for struct columns
- PR #6344 Fix rolling-window count for null input
- PR #6353 Rename `skip_rows` parameter to `skiprows` in `read_parquet`, `read_avro` and `read_orc`
- PR #6361 Detect overflow in hash join
- PR #6397 Fix `build.sh` when `PARALLEL_LEVEL` environment variable isn't set
Expand Down
124 changes: 79 additions & 45 deletions cpp/src/rolling/rolling.cu
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
#include <thrust/binary_search.h>
#include <rmm/device_scalar.hpp>

#include <thrust/detail/execution_policy.h>
#include <thrust/iterator/counting_iterator.h>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/utilities/device_operators.cuh>
#include <cudf/utilities/error.hpp>
Expand All @@ -53,32 +55,66 @@ namespace cudf {
namespace detail {
namespace { // anonymous
/**
* @brief Only count operation is executed and count is updated
* @brief Only COUNT_VALID operation is executed and count is updated
* depending on `min_periods` and returns true if it was
* valid, else false.
*/
template <typename InputType,
typename OutputType,
typename agg_op,
aggregation::Kind op,
bool has_nulls>
std::enable_if_t<op == aggregation::COUNT_VALID || op == aggregation::COUNT_ALL, bool> __device__
process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
bool has_nulls,
std::enable_if_t<op == aggregation::COUNT_VALID>* = nullptr>
bool __device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
{
// declare this as volatile to avoid some compiler optimizations that lead to incorrect results
// for CUDA 10.0 and below (fixed in CUDA 10.1)
volatile cudf::size_type count = 0;

for (size_type j = start_index; j < end_index; j++) {
if (op == aggregation::COUNT_ALL || !has_nulls || input.is_valid(j)) { count++; }
bool output_is_valid = ((end_index - start_index) >= min_periods);

if (output_is_valid) {
if (!has_nulls) {
count = end_index - start_index;
} else {
count = thrust::count_if(thrust::seq,
thrust::make_counting_iterator(start_index),
thrust::make_counting_iterator(end_index),
[&input](auto i) { return input.is_valid_nocheck(i); });
}
output.element<OutputType>(current_index) = count;
}

return output_is_valid;
}

/**
* @brief Only COUNT_ALL operation is executed and count is updated
* depending on `min_periods` and returns true if it was
* valid, else false.
*/
template <typename InputType,
typename OutputType,
typename agg_op,
aggregation::Kind op,
bool has_nulls,
std::enable_if_t<op == aggregation::COUNT_ALL>* = nullptr>
bool __device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
{
cudf::size_type count = end_index - start_index;

bool output_is_valid = (count >= min_periods);
output.element<OutputType>(current_index) = count;

Expand All @@ -94,15 +130,15 @@ template <typename InputType,
typename OutputType,
typename agg_op,
aggregation::Kind op,
bool has_nulls>
std::enable_if_t<op == aggregation::ROW_NUMBER, bool> __device__
process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
bool has_nulls,
std::enable_if_t<op == aggregation::ROW_NUMBER>* = nullptr>
bool __device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
{
bool output_is_valid = ((end_index - start_index) >= min_periods);
output.element<OutputType>(current_index) = ((current_index - start_index) + 1);
Expand Down Expand Up @@ -218,17 +254,16 @@ template <typename InputType,
typename OutputType,
typename agg_op,
aggregation::Kind op,
bool has_nulls>
std::enable_if_t<(op == aggregation::ARGMIN or op == aggregation::ARGMAX) and
std::is_same<InputType, cudf::string_view>::value,
bool>
__device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
bool has_nulls,
std::enable_if_t<(op == aggregation::ARGMIN or op == aggregation::ARGMAX) and
std::is_same<InputType, cudf::string_view>::value>* = nullptr>
bool __device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
{
// declare this as volatile to avoid some compiler optimizations that lead to incorrect results
// for CUDA 10.0 and below (fixed in CUDA 10.1)
Expand Down Expand Up @@ -263,19 +298,18 @@ template <typename InputType,
typename OutputType,
typename agg_op,
aggregation::Kind op,
bool has_nulls>
std::enable_if_t<!std::is_same<InputType, cudf::string_view>::value and
!(op == aggregation::COUNT_VALID || op == aggregation::COUNT_ALL ||
op == aggregation::ROW_NUMBER || op == aggregation::LEAD ||
op == aggregation::LAG),
bool>
__device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
bool has_nulls,
std::enable_if_t<!std::is_same<InputType, cudf::string_view>::value and
!(op == aggregation::COUNT_VALID || op == aggregation::COUNT_ALL ||
op == aggregation::ROW_NUMBER || op == aggregation::LEAD ||
op == aggregation::LAG)>* = nullptr>
bool __device__ process_rolling_window(column_device_view input,
column_device_view ignored_default_outputs,
mutable_column_device_view output,
size_type start_index,
size_type end_index,
size_type current_index,
size_type min_periods)
{
// declare this as volatile to avoid some compiler optimizations that lead to incorrect results
// for CUDA 10.0 and below (fixed in CUDA 10.1)
Expand Down
4 changes: 2 additions & 2 deletions cpp/tests/grouped_rolling/grouped_rolling_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class GroupedRollingTest : public cudf::test::BaseFixture {
if (include_nulls || !input.nullable() || cudf::bit_is_set(valid_mask, j)) count++;
}

ref_valid[i] = (count >= min_periods);
ref_valid[i] = ((end_index - start_index) >= min_periods);
if (ref_valid[i]) ref_data[i] = count;
}

Expand Down Expand Up @@ -861,7 +861,7 @@ class GroupedTimeRangeRollingTest : public cudf::test::BaseFixture {
if (include_nulls || !input.nullable() || cudf::bit_is_set(valid_mask, j)) count++;
}

ref_valid[i] = (count >= min_periods);
ref_valid[i] = ((end_index - start_index) >= min_periods);
if (ref_valid[i]) ref_data[i] = count;
}

Expand Down
6 changes: 3 additions & 3 deletions cpp/tests/rolling/rolling_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ TEST_F(RollingStringTest, MinPeriods)
cudf::test::strings_column_wrapper expected_max(
{"This", "test", "test", "test", "test", "string", "string", "string", "string"},
{0, 0, 0, 0, 1, 1, 1, 0, 0});
fixed_width_column_wrapper<size_type> expected_count_val({0, 2, 2, 2, 3, 3, 3, 3, 2},
{0, 0, 0, 0, 1, 1, 1, 0, 0});
fixed_width_column_wrapper<size_type> expected_count_val({1, 2, 1, 2, 3, 3, 3, 2, 2},
{1, 1, 1, 1, 1, 1, 1, 1, 0});
fixed_width_column_wrapper<size_type> expected_count_all({3, 4, 4, 4, 4, 4, 4, 3, 2},
{0, 1, 1, 1, 1, 1, 1, 0, 0});

Expand Down Expand Up @@ -248,7 +248,7 @@ class RollingTest : public cudf::test::BaseFixture {
if (include_nulls || !input.nullable() || cudf::bit_is_set(valid_mask, j)) count++;
}

ref_valid[i] = (count >= min_periods);
ref_valid[i] = ((end_index - start_index) >= min_periods);
if (ref_valid[i]) ref_data[i] = count;
}

Expand Down
4 changes: 2 additions & 2 deletions python/cudf/cudf/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,6 @@ def _apply_agg_dataframe(self, df, agg_name):
return result_df

def _apply_agg(self, agg_name):
if agg_name == "count" and not self._time_window:
self.min_periods = 0
if isinstance(self.obj, cudf.Series):
return self._apply_agg_series(self.obj, agg_name)
else:
Expand Down Expand Up @@ -388,6 +386,8 @@ def _window_to_window_sizes(self, window):
)

def _apply_agg(self, agg_name):
if agg_name == "count" and not self._time_window:
self.min_periods = 0
index = cudf.MultiIndex.from_frame(
cudf.DataFrame(
{
Expand Down
73 changes: 55 additions & 18 deletions python/cudf/cudf/tests/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
@pytest.mark.parametrize("agg", ["sum", "min", "max", "mean", "count"])
@pytest.mark.parametrize("nulls", ["none", "one", "some", "all"])
@pytest.mark.parametrize("center", [True, False])
def test_rollling_series_basic(data, index, agg, nulls, center):
def test_rolling_series_basic(data, index, agg, nulls, center):
if PANDAS_GE_110:
kwargs = {"check_freq": False}
else:
Expand All @@ -47,15 +47,7 @@ def test_rollling_series_basic(data, index, agg, nulls, center):
got = getattr(
gsr.rolling(window_size, min_periods, center), agg
)().fillna(-1)
try:
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
assert_eq(expect, got, check_dtype=False, **kwargs)
except AssertionError as e:
if agg == "count" and data != []:
pytest.xfail(
reason="Differ from Pandas behavior for count"
)
else:
raise e
assert_eq(expect, got, check_dtype=False, **kwargs)


@pytest.mark.parametrize(
Expand Down Expand Up @@ -97,17 +89,24 @@ def test_rolling_dataframe_basic(data, agg, nulls, center):
got = getattr(
gdf.rolling(window_size, min_periods, center), agg
)().fillna(-1)
try:
assert_eq(expect, got, check_dtype=False)
except AssertionError as e:
if agg == "count" and len(pdf) > 0:
pytest.xfail(reason="Differ from pandas behavior here")
else:
raise e
assert_eq(expect, got, check_dtype=False)


@pytest.mark.parametrize(
"agg", ["sum", pytest.param("min"), pytest.param("max"), "mean", "count"]
"agg",
[
pytest.param("sum"),
pytest.param("min"),
pytest.param("max"),
pytest.param("mean"),
pytest.param(
"count", # Does not follow similar conventions as
mythrocks marked this conversation as resolved.
Show resolved Hide resolved
# with non-offset columns
marks=pytest.mark.xfail(
reason="Differs from pandas behaviour here"
),
),
],
)
def test_rolling_with_offset(agg):
psr = pd.Series(
Expand All @@ -129,6 +128,44 @@ def test_rolling_with_offset(agg):
)


def test_rolling_count_with_offset():
    """
    Companion to the xfail'ed "count" case of test_rolling_with_offset.

    With an offset ("2s") window, count is expected to produce a non-NaN
    result for every row, including the row whose own value is NaN,
    unless the min-periods condition is not met. This matches how count
    behaves for rolling windows in the non-offset case.
    """
    # The same timestamps index both the input and the expected output.
    timestamps = [
        pd.Timestamp("20190101 09:00:00"),
        pd.Timestamp("20190101 09:00:01"),
        pd.Timestamp("20190101 09:00:02"),
        pd.Timestamp("20190101 09:00:04"),
        pd.Timestamp("20190101 09:00:07"),
        pd.Timestamp("20190101 09:00:08"),
    ]

    psr = pd.Series([1, 2, 4, 4, np.nan, 9], index=timestamps)
    gsr = cudf.from_pandas(psr)

    # Any null count is mapped to -1 so it cannot compare equal to the
    # non-negative expected values below.
    got = gsr.rolling("2s").count().fillna(-1)
    expected = pd.Series([1, 2, 2, 1, 0, 1], index=timestamps)

    assert_eq(got, expected, check_dtype=False)


def test_rolling_getattr():
pdf = pd.DataFrame({"a": [1, 2, 3, 4], "b": [1, 2, 3, 4]})
gdf = cudf.from_pandas(pdf)
Expand Down