Skip to content

Commit

Permalink
Implement a mixin for scans (#10358)
Browse files Browse the repository at this point in the history
This PR builds on the framework introduced in #9925 to implement scans.

Authors:
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Ashwin Srinath (https://github.com/shwina)

URL: #10358
  • Loading branch information
vyasr authored Mar 7, 2022
1 parent e610108 commit b782281
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 205 deletions.
18 changes: 13 additions & 5 deletions python/cudf/cudf/core/column/numerical_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
from cudf import _lib as libcudf
from cudf._typing import ScalarLike
from cudf.core.column import ColumnBase
from cudf.core.mixins import Scannable


class NumericalBaseColumn(ColumnBase):
class NumericalBaseColumn(ColumnBase, Scannable):
"""A column composed of numerical data.
This class encodes a standard interface for different types of columns
Expand All @@ -32,6 +33,13 @@ class NumericalBaseColumn(ColumnBase):
"std",
}

_VALID_SCANS = {
"cumsum",
"cumprod",
"cummin",
"cummax",
}

def _can_return_nan(self, skipna: bool = None) -> bool:
return not skipna and self.has_nulls()

Expand Down Expand Up @@ -174,7 +182,7 @@ def round(
"""Round the values in the Column to the given number of decimals."""
return libcudf.round.round(self, decimal_places=decimals, how=how)

def _apply_scan_op(self, op: str) -> ColumnBase:
return libcudf.reduce.scan(op, self, True)._with_type_metadata(
self.dtype
)
def _scan(self, op: str) -> ColumnBase:
return libcudf.reduce.scan(
op.replace("cum", ""), self, True
)._with_type_metadata(self.dtype)
2 changes: 1 addition & 1 deletion python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5158,7 +5158,7 @@ def _scan(
if axis == 0:
return super()._scan(op, axis=axis, *args, **kwargs)
elif axis == 1:
return self._apply_cupy_method_axis_1(f"cum{op}", **kwargs)
return self._apply_cupy_method_axis_1(op, **kwargs)

@annotate("DATAFRAME_MODE", color="green", domain="cudf_python")
def mode(self, axis=0, numeric_only=False, dropna=True):
Expand Down
240 changes: 58 additions & 182 deletions python/cudf/cudf/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
)
from cudf.core.column_accessor import ColumnAccessor
from cudf.core.join import Merge, MergeSemi
from cudf.core.mixins import BinaryOperand
from cudf.core.mixins import BinaryOperand, Scannable
from cudf.core.window import Rolling
from cudf.utils import ioutils
from cudf.utils.docutils import copy_docstring
Expand Down Expand Up @@ -99,7 +99,7 @@
}


class Frame(BinaryOperand):
class Frame(BinaryOperand, Scannable):
"""A collection of Column objects with an optional index.
Parameters
Expand All @@ -118,6 +118,21 @@ class Frame(BinaryOperand):

_VALID_BINARY_OPERATIONS = BinaryOperand._SUPPORTED_BINARY_OPERATIONS

_VALID_SCANS = {
"cumsum",
"cumprod",
"cummin",
"cummax",
}

# Necessary because the function names don't directly map to the docs.
_SCAN_DOCSTRINGS = {
"cumsum": {"op_name": "cumulative sum"},
"cumprod": {"op_name": "cumulative product"},
"cummin": {"op_name": "cumulative min"},
"cummax": {"op_name": "cumulative max"},
}

def __init__(self, data=None, index=None):
if data is None:
data = {}
Expand Down Expand Up @@ -4368,151 +4383,23 @@ def median(

# Scans
@annotate("FRAME_SCAN", color="green", domain="cudf_python")
def _scan(self, op, axis=None, skipna=True, cast_to_int=False):
skipna = True if skipna is None else skipna

results = {}
for name, col in self._data.items():
if skipna:
try:
result_col = col.nans_to_nulls()
except AttributeError:
result_col = col
else:
if col.has_nulls(include_nan=True):
# Workaround as find_first_value doesn't seem to work
# in case of bools.
first_index = int(
col.isnull().astype("int8").find_first_value(1)
)
result_col = col.copy()
result_col[first_index:] = None
else:
result_col = col

if (
cast_to_int
and not is_decimal_dtype(result_col.dtype)
and (
np.issubdtype(result_col.dtype, np.integer)
or np.issubdtype(result_col.dtype, np.bool_)
)
):
# For reductions that accumulate a value (e.g. sum, not max)
# pandas returns an int64 dtype for all int or bool dtypes.
result_col = result_col.astype(np.int64)
results[name] = result_col._apply_scan_op(op)
# TODO: This will work for Index because it's passing self._index
# (which is None), but eventually we may want to remove that parameter
# for Index._from_data and simplify.
return self._from_data(results, index=self._index)

@annotate("FRAME_CUMMIN", color="green", domain="cudf_python")
def cummin(self, axis=None, skipna=True, *args, **kwargs):
def _scan(self, op, axis=None, skipna=True):
"""
Return cumulative minimum of the Series or DataFrame.
Return {op_name} of the {cls}.
Parameters
----------
axis: {index (0), columns(1)}
axis: {{index (0), columns(1)}}
Axis for the function to be applied on.
skipna: bool, default True
Exclude NA/null values. If an entire row/column is NA,
the result will be NA.
Returns
-------
Series or DataFrame
Examples
--------
**Series**
>>> import cudf
>>> ser = cudf.Series([1, 5, 2, 4, 3])
>>> ser.cummin()
0 1
1 1
2 1
3 1
4 1
**DataFrame**
>>> import cudf
>>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
>>> df.cummin()
a b
0 1 7
1 1 7
2 1 7
3 1 7
"""
return self._scan("min", axis=axis, skipna=skipna, *args, **kwargs)

@annotate("FRAME_CUMMAX", color="green", domain="cudf_python")
def cummax(self, axis=None, skipna=True, *args, **kwargs):
"""
Return cumulative maximum of the Series or DataFrame.
Parameters
----------
axis: {index (0), columns(1)}
Axis for the function to be applied on.
skipna: bool, default True
Exclude NA/null values. If an entire row/column is NA,
the result will be NA.
Returns
-------
Series or DataFrame
Examples
--------
**Series**
>>> import cudf
>>> ser = cudf.Series([1, 5, 2, 4, 3])
>>> ser.cummax()
0 1
1 5
2 5
3 5
4 5
**DataFrame**
>>> import cudf
>>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
>>> df.cummax()
a b
0 1 7
1 2 8
2 3 9
3 4 10
"""
return self._scan("max", axis=axis, skipna=skipna, *args, **kwargs)

@annotate("FRAME_CUMSUM", color="green", domain="cudf_python")
def cumsum(self, axis=None, skipna=True, *args, **kwargs):
"""
Return cumulative sum of the Series or DataFrame.
Parameters
----------
axis: {index (0), columns(1)}
Axis for the function to be applied on.
skipna: bool, default True
Exclude NA/null values. If an entire row/column is NA,
the result will be NA.
Returns
-------
Series or DataFrame
{cls}
Examples
--------
Expand All @@ -4530,63 +4417,52 @@ def cumsum(self, axis=None, skipna=True, *args, **kwargs):
**DataFrame**
>>> import cudf
>>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
>>> df = cudf.DataFrame({{'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]}})
>>> df.cumsum()
a b
0 1 7
1 3 15
2 6 24
3 10 34
"""
return self._scan(
"sum", axis=axis, skipna=skipna, cast_to_int=True, *args, **kwargs
)

@annotate("FRAME_CUMPROD", color="green", domain="cudf_python")
def cumprod(self, axis=None, skipna=True, *args, **kwargs):
"""
Return cumulative product of the Series or DataFrame.
Parameters
----------
axis: {index (0), columns(1)}
Axis for the function to be applied on.
skipna: bool, default True
Exclude NA/null values. If an entire row/column is NA,
the result will be NA.
Returns
-------
Series or DataFrame
Examples
--------
**Series**
>>> import cudf
>>> ser = cudf.Series([1, 5, 2, 4, 3])
>>> ser.cumprod()
0 1
1 5
2 10
3 40
4 120
cast_to_int = op in ("cumsum", "cumprod")
skipna = True if skipna is None else skipna

**DataFrame**
results = {}
for name, col in self._data.items():
if skipna:
try:
result_col = col.nans_to_nulls()
except AttributeError:
result_col = col
else:
if col.has_nulls(include_nan=True):
# Workaround as find_first_value doesn't seem to work
# in case of bools.
first_index = int(
col.isnull().astype("int8").find_first_value(1)
)
result_col = col.copy()
result_col[first_index:] = None
else:
result_col = col

>>> import cudf
>>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
>>> s.cumprod()
a b
0 1 7
1 2 56
2 6 504
3 24 5040
"""
return self._scan(
"prod", axis=axis, skipna=skipna, cast_to_int=True, *args, **kwargs
)
if (
cast_to_int
and not is_decimal_dtype(result_col.dtype)
and (
np.issubdtype(result_col.dtype, np.integer)
or np.issubdtype(result_col.dtype, np.bool_)
)
):
# For scans that accumulate a value (e.g. cumsum, not cummax)
# pandas returns an int64 dtype for all int or bool dtypes.
result_col = result_col.astype(np.int64)
results[name] = getattr(result_col, op)()
# TODO: This will work for Index because it's passing self._index
# (which is None), but eventually we may want to remove that parameter
# for Index._from_data and simplify.
return self._from_data(results, index=self._index)

@annotate("FRAME_TO_JSON", color="green", domain="cudf_python")
@ioutils.doc_to_json()
Expand Down
Loading

0 comments on commit b782281

Please sign in to comment.