Skip to content

Commit

Permalink
Add python bindings to fixed-size window and groupby rolling.var, `…
Browse files Browse the repository at this point in the history
…rolling.std` (#9097)

Closes #8695
Closes #8696 

This PR creates bindings for rolling aggregations for variance and standard deviations. Unlike pandas, the underlying implementation from libcudf computes each window independently from other windows.

Authors:
  - Michael Wang (https://github.com/isVoid)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Sheilah Kirui (https://github.com/skirui-source)

URL: #9097
  • Loading branch information
isVoid authored Sep 15, 2021
1 parent d069d7e commit a855660
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 12 deletions.
18 changes: 18 additions & 0 deletions python/cudf/cudf/_lib/aggregation.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,24 @@ cdef class RollingAggregation:
libcudf_aggregation.make_mean_aggregation[rolling_aggregation]())
return agg

@classmethod
def var(cls, ddof=1):
cdef RollingAggregation agg = cls()
agg.c_obj = move(
libcudf_aggregation.make_variance_aggregation[rolling_aggregation](
ddof
)
)
return agg

@classmethod
def std(cls, ddof=1):
cdef RollingAggregation agg = cls()
agg.c_obj = move(
libcudf_aggregation.make_std_aggregation[rolling_aggregation](ddof)
)
return agg

@classmethod
def count(cls, dropna=True):
cdef libcudf_types.null_policy c_null_handling
Expand Down
13 changes: 10 additions & 3 deletions python/cudf/cudf/_lib/rolling.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@ from cudf._lib.cpp.rolling cimport rolling_window as cpp_rolling_window
from cudf._lib.cpp.types cimport size_type


def rolling(Column source_column, Column pre_column_window,
Column fwd_column_window, window, min_periods, center, op):
def rolling(Column source_column,
Column pre_column_window,
Column fwd_column_window,
window,
min_periods,
center,
op,
agg_params):
"""
Rolling on input executing operation within the given window for each row
Expand All @@ -33,6 +39,7 @@ def rolling(Column source_column, Column pre_column_window,
center : Set the labels at the center of the window
op : operation to be executed, as of now it supports MIN, MAX, COUNT, SUM,
MEAN and UDF
agg_params : dict, parameter for the aggregation (e.g. ddof for VAR/STD)
Returns
-------
Expand All @@ -51,7 +58,7 @@ def rolling(Column source_column, Column pre_column_window,
cython_agg = make_rolling_aggregation(
op, {'dtype': source_column.dtype})
else:
cython_agg = make_rolling_aggregation(op)
cython_agg = make_rolling_aggregation(op, agg_params)

if window is None:
if center:
Expand Down
10 changes: 10 additions & 0 deletions python/cudf/cudf/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def __init__(
self.min_periods = min_periods
self.center = center
self._normalize()
self.agg_params = {}
if axis != 0:
raise NotImplementedError("axis != 0 is not supported yet.")
self.axis = axis
Expand Down Expand Up @@ -237,6 +238,7 @@ def _apply_agg_series(self, sr, agg_name):
min_periods=min_periods,
center=self.center,
op=agg_name,
agg_params=self.agg_params,
)
return sr._from_data({sr.name: result_col}, sr._index)

Expand Down Expand Up @@ -266,6 +268,14 @@ def max(self):
def mean(self):
return self._apply_agg("mean")

def var(self, ddof=1):
self.agg_params["ddof"] = ddof
return self._apply_agg("var")

def std(self, ddof=1):
self.agg_params["ddof"] = ddof
return self._apply_agg("std")

def count(self):
return self._apply_agg("count")

Expand Down
106 changes: 97 additions & 9 deletions python/cudf/cudf/tests/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import cudf
from cudf.core._compat import PANDAS_GE_110
from cudf.testing._utils import assert_eq
from cudf.testing.dataset_generator import rand_dataframe


@pytest.mark.parametrize(
Expand All @@ -20,20 +21,23 @@
([1, 2, 4, 9, 9, 4], ["a", "b", "c", "d", "e", "f"]),
],
)
@pytest.mark.parametrize("agg", ["sum", "min", "max", "mean", "count"])
@pytest.mark.parametrize(
"agg", ["sum", "min", "max", "mean", "count", "std", "var"]
)
@pytest.mark.parametrize("nulls", ["none", "one", "some", "all"])
@pytest.mark.parametrize("center", [True, False])
def test_rolling_series_basic(data, index, agg, nulls, center):
rng = np.random.default_rng(1)
if PANDAS_GE_110:
kwargs = {"check_freq": False}
else:
kwargs = {}
if len(data) > 0:
if nulls == "one":
p = np.random.randint(0, len(data))
p = rng.integers(0, len(data))
data[p] = np.nan
elif nulls == "some":
p1, p2 = np.random.randint(0, len(data), (2,))
p1, p2 = rng.integers(0, len(data), (2,))
data[p1] = np.nan
data[p2] = np.nan
elif nulls == "all":
Expand Down Expand Up @@ -64,19 +68,22 @@ def test_rolling_series_basic(data, index, agg, nulls, center):
},
],
)
@pytest.mark.parametrize("agg", ["sum", "min", "max", "mean", "count"])
@pytest.mark.parametrize(
"agg", ["sum", "min", "max", "mean", "count", "std", "var"]
)
@pytest.mark.parametrize("nulls", ["none", "one", "some", "all"])
@pytest.mark.parametrize("center", [True, False])
def test_rolling_dataframe_basic(data, agg, nulls, center):
rng = np.random.default_rng(0)
pdf = pd.DataFrame(data)

if len(pdf) > 0:
for col_name in pdf.columns:
if nulls == "one":
p = np.random.randint(0, len(data))
p = rng.integers(0, len(data))
pdf[col_name][p] = np.nan
elif nulls == "some":
p1, p2 = np.random.randint(0, len(data), (2,))
p1, p2 = rng.integers(0, len(data), (2,))
pdf[col_name][p1] = np.nan
pdf[col_name][p2] = np.nan
elif nulls == "all":
Expand All @@ -102,6 +109,8 @@ def test_rolling_dataframe_basic(data, agg, nulls, center):
pytest.param("max"),
pytest.param("mean"),
pytest.param("count"),
pytest.param("std"),
pytest.param("var"),
],
)
def test_rolling_with_offset(agg):
Expand All @@ -124,6 +133,79 @@ def test_rolling_with_offset(agg):
)


@pytest.mark.parametrize("agg", ["std", "var"])
@pytest.mark.parametrize("ddof", [0, 1])
@pytest.mark.parametrize("center", [True, False])
@pytest.mark.parametrize("seed", [100, 2000])
@pytest.mark.parametrize("window_size", [2, 10, 100])
def test_rolling_var_std_large(agg, ddof, center, seed, window_size):
if PANDAS_GE_110:
kwargs = {"check_freq": False}
else:
kwargs = {}

iupper_bound = math.sqrt(np.iinfo(np.int64).max / window_size)
ilower_bound = -math.sqrt(abs(np.iinfo(np.int64).min) / window_size)

fupper_bound = math.sqrt(np.finfo(np.float64).max / window_size)
flower_bound = -math.sqrt(abs(np.finfo(np.float64).min) / window_size)

n_rows = 1_000
data = rand_dataframe(
dtypes_meta=[
{
"dtype": "int64",
"null_frequency": 0.4,
"cardinality": n_rows,
"min_bound": ilower_bound,
"max_bound": iupper_bound,
},
{
"dtype": "float64",
"null_frequency": 0.4,
"cardinality": n_rows,
"min_bound": flower_bound,
"max_bound": fupper_bound,
},
{
"dtype": "decimal64",
"null_frequency": 0.4,
"cardinality": n_rows,
"min_bound": ilower_bound,
"max_bound": iupper_bound,
},
],
rows=n_rows,
use_threads=False,
seed=seed,
)
pdf = data.to_pandas()
gdf = cudf.from_pandas(pdf)

expect = getattr(pdf.rolling(window_size, 1, center), agg)(ddof=ddof)
got = getattr(gdf.rolling(window_size, 1, center), agg)(ddof=ddof)

assert_eq(expect, got, **kwargs)


@pytest.mark.xfail
def test_rolling_var_uniform_window():
"""
Pandas adopts an online variance calculation algorithm. This gives a
floating point artifact.
https://github.com/pandas-dev/pandas/issues/37051
In cudf, each window is computed independently from the previous window,
this gives better numeric precision.
"""

s = pd.Series([1e8, 5, 5, 5])
expected = s.rolling(3).var()
got = cudf.from_pandas(s).rolling(3).var()

assert_eq(expected, got)


def test_rolling_count_with_offset():
"""
This test covers the xfail case from test_rolling_with_offset["count"].
Expand Down Expand Up @@ -300,7 +382,9 @@ def some_func(A):
)


@pytest.mark.parametrize("agg", ["sum", "min", "max", "mean", "count"])
@pytest.mark.parametrize(
"agg", ["sum", "min", "max", "mean", "count", "var", "std"]
)
def test_rolling_groupby_simple(agg):
pdf = pd.DataFrame(
{
Expand Down Expand Up @@ -330,7 +414,9 @@ def test_rolling_groupby_simple(agg):
assert_eq(expect, got, check_dtype=False)


@pytest.mark.parametrize("agg", ["sum", "min", "max", "mean", "count"])
@pytest.mark.parametrize(
"agg", ["sum", "min", "max", "mean", "count", "var", "std"]
)
def test_rolling_groupby_multi(agg):
pdf = pd.DataFrame(
{
Expand All @@ -351,7 +437,9 @@ def test_rolling_groupby_multi(agg):
assert_eq(expect, got, check_dtype=False)


@pytest.mark.parametrize("agg", ["sum", "min", "max", "mean", "count"])
@pytest.mark.parametrize(
"agg", ["sum", "min", "max", "mean", "count", "var", "std"]
)
@pytest.mark.parametrize(
"window_size", ["1d", "2d", "3d", "4d", "5d", "6d", "7d"]
)
Expand Down

0 comments on commit a855660

Please sign in to comment.