diff --git a/CHANGELOG.md b/CHANGELOG.md index b899d6b5544..c5b2d926c7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -175,6 +175,7 @@ - PR #6304 Fix span_tests.cu includes - PR #6331 Avoids materializing `RangeIndex` during frame concatnation (when not needed) - PR #6278 Add filter tests for struct columns +- PR #6344 Fix rolling-window count for null input - PR #6353 Rename `skip_rows` parameter to `skiprows` in `read_parquet`, `read_avro` and `read_orc` - PR #6361 Detect overflow in hash join - PR #6397 Fix `build.sh` when `PARALLEL_LEVEL` environment variable isn't set diff --git a/cpp/src/rolling/rolling.cu b/cpp/src/rolling/rolling.cu index 362f572f94d..36cfb053adb 100644 --- a/cpp/src/rolling/rolling.cu +++ b/cpp/src/rolling/rolling.cu @@ -43,6 +43,8 @@ #include #include +#include +#include #include #include #include @@ -53,7 +55,7 @@ namespace cudf { namespace detail { namespace { // anonymous /** - * @brief Only count operation is executed and count is updated + * @brief Only COUNT_VALID operation is executed and count is updated * depending on `min_periods` and returns true if it was * valid, else false. 
*/ @@ -61,24 +63,58 @@ template -std::enable_if_t __device__ -process_rolling_window(column_device_view input, - column_device_view ignored_default_outputs, - mutable_column_device_view output, - size_type start_index, - size_type end_index, - size_type current_index, - size_type min_periods) + bool has_nulls, + std::enable_if_t* = nullptr> +bool __device__ process_rolling_window(column_device_view input, + column_device_view ignored_default_outputs, + mutable_column_device_view output, + size_type start_index, + size_type end_index, + size_type current_index, + size_type min_periods) { // declare this as volatile to avoid some compiler optimizations that lead to incorrect results // for CUDA 10.0 and below (fixed in CUDA 10.1) volatile cudf::size_type count = 0; - for (size_type j = start_index; j < end_index; j++) { - if (op == aggregation::COUNT_ALL || !has_nulls || input.is_valid(j)) { count++; } + bool output_is_valid = ((end_index - start_index) >= min_periods); + + if (output_is_valid) { + if (!has_nulls) { + count = end_index - start_index; + } else { + count = thrust::count_if(thrust::seq, + thrust::make_counting_iterator(start_index), + thrust::make_counting_iterator(end_index), + [&input](auto i) { return input.is_valid_nocheck(i); }); + } + output.element(current_index) = count; } + return output_is_valid; +} + +/** + * @brief Only COUNT_ALL operation is executed and count is updated + * depending on `min_periods` and returns true if it was + * valid, else false. 
+ */ +template * = nullptr> +bool __device__ process_rolling_window(column_device_view input, + column_device_view ignored_default_outputs, + mutable_column_device_view output, + size_type start_index, + size_type end_index, + size_type current_index, + size_type min_periods) +{ + cudf::size_type count = end_index - start_index; + bool output_is_valid = (count >= min_periods); output.element(current_index) = count; @@ -94,15 +130,15 @@ template -std::enable_if_t __device__ -process_rolling_window(column_device_view input, - column_device_view ignored_default_outputs, - mutable_column_device_view output, - size_type start_index, - size_type end_index, - size_type current_index, - size_type min_periods) + bool has_nulls, + std::enable_if_t* = nullptr> +bool __device__ process_rolling_window(column_device_view input, + column_device_view ignored_default_outputs, + mutable_column_device_view output, + size_type start_index, + size_type end_index, + size_type current_index, + size_type min_periods) { bool output_is_valid = ((end_index - start_index) >= min_periods); output.element(current_index) = ((current_index - start_index) + 1); @@ -218,17 +254,16 @@ template -std::enable_if_t<(op == aggregation::ARGMIN or op == aggregation::ARGMAX) and - std::is_same::value, - bool> - __device__ process_rolling_window(column_device_view input, - column_device_view ignored_default_outputs, - mutable_column_device_view output, - size_type start_index, - size_type end_index, - size_type current_index, - size_type min_periods) + bool has_nulls, + std::enable_if_t<(op == aggregation::ARGMIN or op == aggregation::ARGMAX) and + std::is_same::value>* = nullptr> +bool __device__ process_rolling_window(column_device_view input, + column_device_view ignored_default_outputs, + mutable_column_device_view output, + size_type start_index, + size_type end_index, + size_type current_index, + size_type min_periods) { // declare this as volatile to avoid some compiler optimizations that lead to 
incorrect results // for CUDA 10.0 and below (fixed in CUDA 10.1) @@ -263,19 +298,18 @@ template -std::enable_if_t::value and - !(op == aggregation::COUNT_VALID || op == aggregation::COUNT_ALL || - op == aggregation::ROW_NUMBER || op == aggregation::LEAD || - op == aggregation::LAG), - bool> - __device__ process_rolling_window(column_device_view input, - column_device_view ignored_default_outputs, - mutable_column_device_view output, - size_type start_index, - size_type end_index, - size_type current_index, - size_type min_periods) + bool has_nulls, + std::enable_if_t::value and + !(op == aggregation::COUNT_VALID || op == aggregation::COUNT_ALL || + op == aggregation::ROW_NUMBER || op == aggregation::LEAD || + op == aggregation::LAG)>* = nullptr> +bool __device__ process_rolling_window(column_device_view input, + column_device_view ignored_default_outputs, + mutable_column_device_view output, + size_type start_index, + size_type end_index, + size_type current_index, + size_type min_periods) { // declare this as volatile to avoid some compiler optimizations that lead to incorrect results // for CUDA 10.0 and below (fixed in CUDA 10.1) diff --git a/cpp/tests/grouped_rolling/grouped_rolling_test.cpp b/cpp/tests/grouped_rolling/grouped_rolling_test.cpp index 19731da4daf..64ce1ef7930 100644 --- a/cpp/tests/grouped_rolling/grouped_rolling_test.cpp +++ b/cpp/tests/grouped_rolling/grouped_rolling_test.cpp @@ -281,7 +281,7 @@ class GroupedRollingTest : public cudf::test::BaseFixture { if (include_nulls || !input.nullable() || cudf::bit_is_set(valid_mask, j)) count++; } - ref_valid[i] = (count >= min_periods); + ref_valid[i] = ((end_index - start_index) >= min_periods); if (ref_valid[i]) ref_data[i] = count; } @@ -861,7 +861,7 @@ class GroupedTimeRangeRollingTest : public cudf::test::BaseFixture { if (include_nulls || !input.nullable() || cudf::bit_is_set(valid_mask, j)) count++; } - ref_valid[i] = (count >= min_periods); + ref_valid[i] = ((end_index - start_index) >= 
min_periods); if (ref_valid[i]) ref_data[i] = count; } diff --git a/cpp/tests/rolling/rolling_test.cpp b/cpp/tests/rolling/rolling_test.cpp index 38b326e108b..4a6a028f063 100644 --- a/cpp/tests/rolling/rolling_test.cpp +++ b/cpp/tests/rolling/rolling_test.cpp @@ -106,8 +106,8 @@ TEST_F(RollingStringTest, MinPeriods) cudf::test::strings_column_wrapper expected_max( {"This", "test", "test", "test", "test", "string", "string", "string", "string"}, {0, 0, 0, 0, 1, 1, 1, 0, 0}); - fixed_width_column_wrapper expected_count_val({0, 2, 2, 2, 3, 3, 3, 3, 2}, - {0, 0, 0, 0, 1, 1, 1, 0, 0}); + fixed_width_column_wrapper expected_count_val({1, 2, 1, 2, 3, 3, 3, 2, 2}, + {1, 1, 1, 1, 1, 1, 1, 1, 0}); fixed_width_column_wrapper expected_count_all({3, 4, 4, 4, 4, 4, 4, 3, 2}, {0, 1, 1, 1, 1, 1, 1, 0, 0}); @@ -248,7 +248,7 @@ class RollingTest : public cudf::test::BaseFixture { if (include_nulls || !input.nullable() || cudf::bit_is_set(valid_mask, j)) count++; } - ref_valid[i] = (count >= min_periods); + ref_valid[i] = ((end_index - start_index) >= min_periods); if (ref_valid[i]) ref_data[i] = count; } diff --git a/python/cudf/cudf/core/window/rolling.py b/python/cudf/cudf/core/window/rolling.py index 2dda4d25bcf..a536d47a89d 100644 --- a/python/cudf/cudf/core/window/rolling.py +++ b/python/cudf/cudf/core/window/rolling.py @@ -230,8 +230,6 @@ def _apply_agg_dataframe(self, df, agg_name): return result_df def _apply_agg(self, agg_name): - if agg_name == "count" and not self._time_window: - self.min_periods = 0 if isinstance(self.obj, cudf.Series): return self._apply_agg_series(self.obj, agg_name) else: @@ -388,6 +386,8 @@ def _window_to_window_sizes(self, window): ) def _apply_agg(self, agg_name): + if agg_name == "count" and not self._time_window: + self.min_periods = 0 index = cudf.MultiIndex.from_frame( cudf.DataFrame( { diff --git a/python/cudf/cudf/tests/test_rolling.py b/python/cudf/cudf/tests/test_rolling.py index 9f103e8a68b..f8e7fc5b4f3 100644 --- 
a/python/cudf/cudf/tests/test_rolling.py +++ b/python/cudf/cudf/tests/test_rolling.py @@ -21,7 +21,7 @@ @pytest.mark.parametrize("agg", ["sum", "min", "max", "mean", "count"]) @pytest.mark.parametrize("nulls", ["none", "one", "some", "all"]) @pytest.mark.parametrize("center", [True, False]) -def test_rollling_series_basic(data, index, agg, nulls, center): +def test_rolling_series_basic(data, index, agg, nulls, center): if PANDAS_GE_110: kwargs = {"check_freq": False} else: @@ -47,15 +47,7 @@ def test_rollling_series_basic(data, index, agg, nulls, center): got = getattr( gsr.rolling(window_size, min_periods, center), agg )().fillna(-1) - try: - assert_eq(expect, got, check_dtype=False, **kwargs) - except AssertionError as e: - if agg == "count" and data != []: - pytest.xfail( - reason="Differ from Pandas behavior for count" - ) - else: - raise e + assert_eq(expect, got, check_dtype=False, **kwargs) @pytest.mark.parametrize( @@ -97,17 +89,24 @@ def test_rolling_dataframe_basic(data, agg, nulls, center): got = getattr( gdf.rolling(window_size, min_periods, center), agg )().fillna(-1) - try: - assert_eq(expect, got, check_dtype=False) - except AssertionError as e: - if agg == "count" and len(pdf) > 0: - pytest.xfail(reason="Differ from pandas behavior here") - else: - raise e + assert_eq(expect, got, check_dtype=False) @pytest.mark.parametrize( - "agg", ["sum", pytest.param("min"), pytest.param("max"), "mean", "count"] + "agg", + [ + pytest.param("sum"), + pytest.param("min"), + pytest.param("max"), + pytest.param("mean"), + pytest.param( + "count", # Does not follow similar conventions as + # with non-offset columns + marks=pytest.mark.xfail( + reason="Differs from pandas behaviour here" + ), + ), + ], ) def test_rolling_with_offset(agg): psr = pd.Series( @@ -129,6 +128,44 @@ def test_rolling_with_offset(agg): ) +def test_rolling_count_with_offset(): + """ + This test covers the xfail case from test_rolling_with_offset["count"]. 
+ It is expected that count should return a non-NaN value, even if + the counted value is a NaN, unless the min-periods condition + is not met. + This behaviour is consistent with counts for rolling-windows, + in the non-offset window case. + """ + psr = pd.Series( + [1, 2, 4, 4, np.nan, 9], + index=[ + pd.Timestamp("20190101 09:00:00"), + pd.Timestamp("20190101 09:00:01"), + pd.Timestamp("20190101 09:00:02"), + pd.Timestamp("20190101 09:00:04"), + pd.Timestamp("20190101 09:00:07"), + pd.Timestamp("20190101 09:00:08"), + ], + ) + gsr = cudf.from_pandas(psr) + assert_eq( + getattr(gsr.rolling("2s"), "count")().fillna(-1), + pd.Series( + [1, 2, 2, 1, 0, 1], + index=[ + pd.Timestamp("20190101 09:00:00"), + pd.Timestamp("20190101 09:00:01"), + pd.Timestamp("20190101 09:00:02"), + pd.Timestamp("20190101 09:00:04"), + pd.Timestamp("20190101 09:00:07"), + pd.Timestamp("20190101 09:00:08"), + ], + ), + check_dtype=False, + ) + + def test_rolling_getattr(): pdf = pd.DataFrame({"a": [1, 2, 3, 4], "b": [1, 2, 3, 4]}) gdf = cudf.from_pandas(pdf)