diff --git a/asv_bench/benchmarks/groupby.py b/asv_bench/benchmarks/groupby.py index 61db39528a5fb..c347442784d41 100644 --- a/asv_bench/benchmarks/groupby.py +++ b/asv_bench/benchmarks/groupby.py @@ -370,11 +370,11 @@ class GroupByMethods(object): param_names = ['dtype', 'method'] params = [['int', 'float'], - ['all', 'any', 'count', 'cumcount', 'cummax', 'cummin', - 'cumprod', 'cumsum', 'describe', 'first', 'head', 'last', 'mad', - 'max', 'min', 'median', 'mean', 'nunique', 'pct_change', 'prod', - 'rank', 'sem', 'shift', 'size', 'skew', 'std', 'sum', 'tail', - 'unique', 'value_counts', 'var']] + ['all', 'any', 'bfill', 'count', 'cumcount', 'cummax', 'cummin', + 'cumprod', 'cumsum', 'describe', 'ffill', 'first', 'head', + 'last', 'mad', 'max', 'min', 'median', 'mean', 'nunique', + 'pct_change', 'prod', 'rank', 'sem', 'shift', 'size', 'skew', + 'std', 'sum', 'tail', 'unique', 'value_counts', 'var']] def setup(self, dtype, method): ngroups = 1000 diff --git a/doc/source/whatsnew/v0.23.0.txt b/doc/source/whatsnew/v0.23.0.txt index fd3c3a5a7a301..fcaf46b1c3d71 100644 --- a/doc/source/whatsnew/v0.23.0.txt +++ b/doc/source/whatsnew/v0.23.0.txt @@ -689,6 +689,7 @@ Performance Improvements - Improved performance of pairwise ``.rolling()`` and ``.expanding()`` with ``.cov()`` and ``.corr()`` operations (:issue:`17917`) - Improved performance of :func:`DataFrameGroupBy.rank` (:issue:`15779`) - Improved performance of variable ``.rolling()`` on ``.min()`` and ``.max()`` (:issue:`19521`) +- Improved performance of ``GroupBy.ffill`` and ``GroupBy.bfill`` (:issue:`11296`) .. _whatsnew_0230.docs: diff --git a/pandas/_libs/groupby.pyx b/pandas/_libs/groupby.pyx index 866683ce378ab..e3d208a915225 100644 --- a/pandas/_libs/groupby.pyx +++ b/pandas/_libs/groupby.pyx @@ -94,5 +94,221 @@ cdef inline float64_t kth_smallest_c(float64_t* a, return a[k] +@cython.boundscheck(False) +@cython.wraparound(False) +def group_median_float64(ndarray[float64_t, ndim=2] out, + ndarray[int64_t] counts, + ndarray[float64_t, ndim=2] values, + ndarray[int64_t] labels, + Py_ssize_t min_count=-1): + """ + Only aggregates on axis=0 + """ + cdef: + Py_ssize_t i, j, N, K, ngroups, size + ndarray[int64_t] _counts + ndarray data + float64_t* ptr + + assert min_count == -1, "'min_count' only used in add and prod" + + ngroups = len(counts) + N, K = ( values).shape + + indexer, _counts = groupsort_indexer(labels, ngroups) + counts[:] = _counts[1:] + + data = np.empty((K, N), dtype=np.float64) + ptr = data.data + + take_2d_axis1_float64_float64(values.T, indexer, out=data) + + with nogil: + + for i in range(K): + # exclude NA group + ptr += _counts[0] + for j in range(ngroups): + size = _counts[j + 1] + out[j, i] = median_linear(ptr, size) + ptr += size + + +@cython.boundscheck(False) +@cython.wraparound(False) +def group_cumprod_float64(float64_t[:, :] out, + float64_t[:, :] values, + int64_t[:] labels, + bint is_datetimelike): + """ + Only transforms on axis=0 + """ + cdef: + Py_ssize_t i, j, N, K, size + float64_t val + float64_t[:, :] accum + int64_t lab + + N, K = ( values).shape + accum = np.ones_like(values) + + with nogil: + for i in range(N): + lab = labels[i] + + if lab < 0: + continue + for j in range(K): + val = values[i, j] + if val == val: + accum[lab, j] *= val + out[i, j] = accum[lab, j] + + +@cython.boundscheck(False) +@cython.wraparound(False) +def group_cumsum(numeric[:, :] out, + numeric[:, :] values, + int64_t[:] labels, + is_datetimelike): + """ + Only transforms on axis=0 + """ + cdef: + Py_ssize_t i, j, N, K, size + numeric val + numeric[:, :] accum + int64_t lab + + N, K = ( values).shape + accum = np.zeros_like(values) + + with nogil: + for i in range(N): + lab = labels[i] + + if lab < 0: + continue + for j in range(K): + val = values[i, j] + + if numeric == float32_t or numeric == float64_t: + if val == val: + accum[lab, j] += val + out[i, j] = accum[lab, j] + else: + accum[lab, j] += val + out[i, j] = accum[lab, j] + + +@cython.boundscheck(False) +@cython.wraparound(False) +def group_shift_indexer(ndarray[int64_t] out, ndarray[int64_t] labels, + int ngroups, int periods): + cdef: + Py_ssize_t N, i, j, ii + int offset, sign + int64_t lab, idxer, idxer_slot + int64_t[:] label_seen = np.zeros(ngroups, dtype=np.int64) + int64_t[:, :] label_indexer + + N, = ( labels).shape + + if periods < 0: + periods = -periods + offset = N - 1 + sign = -1 + elif periods > 0: + offset = 0 + sign = 1 + + if periods == 0: + with nogil: + for i in range(N): + out[i] = i + else: + # array of each previous indexer seen + label_indexer = np.zeros((ngroups, periods), dtype=np.int64) + with nogil: + for i in range(N): + ## reverse iterator if shifting backwards + ii = offset + sign * i + lab = labels[ii] + + # Skip null keys + if lab == -1: + out[ii] = -1 + continue + + label_seen[lab] += 1 + + idxer_slot = label_seen[lab] % periods + idxer = label_indexer[lab, idxer_slot] + + if label_seen[lab] > periods: + out[ii] = idxer + else: + out[ii] = -1 + + label_indexer[lab, idxer_slot] = ii + + +@cython.wraparound(False) +@cython.boundscheck(False) +def group_fillna_indexer(ndarray[int64_t] out, ndarray[int64_t] labels, + ndarray[uint8_t] mask, object direction, + int64_t limit): + """Indexes how to fill values forwards or backwards within a group + + Parameters + ---------- + out : array of int64_t values which this method will write its results to + Missing values will be written to with a value of -1 + labels : array containing unique label for each group, with its ordering + matching up to the corresponding record in `values` + mask : array of int64_t values where a 1 indicates a missing value + direction : {'ffill', 'bfill'} + Direction for fill to be applied (forwards or backwards, respectively) + limit : Consecutive values to fill before stopping, or -1 for no limit + + Notes + ----- + This method modifies the `out` parameter rather than returning an object + """ + cdef: + Py_ssize_t i, N + ndarray[int64_t] sorted_labels + int64_t idx, curr_fill_idx=-1, filled_vals=0 + + N = len(out) + + # Make sure all arrays are the same size + assert N == len(labels) == len(mask) + + sorted_labels = np.argsort(labels).astype(np.int64, copy=False) + if direction == 'bfill': + sorted_labels = sorted_labels[::-1] + + with nogil: + for i in range(N): + idx = sorted_labels[i] + if mask[idx] == 1: # is missing + # Stop filling once we've hit the limit + if filled_vals >= limit and limit != -1: + curr_fill_idx = -1 + filled_vals += 1 + else: # reset items when not missing + filled_vals = 0 + curr_fill_idx = idx + + out[idx] = curr_fill_idx + + # If we move to the next group, reset + # the fill_idx and counter + if i == N - 1 or labels[idx] != labels[sorted_labels[i+1]]: + curr_fill_idx = -1 + filled_vals = 0 + + # generated from template include "groupby_helper.pxi" diff --git a/pandas/_libs/groupby_helper.pxi.in b/pandas/_libs/groupby_helper.pxi.in index e03e3af65755b..de802f4a72277 100644 --- a/pandas/_libs/groupby_helper.pxi.in +++ b/pandas/_libs/groupby_helper.pxi.in @@ -791,166 +791,3 @@ def group_cummax_{{name}}(ndarray[{{dest_type2}}, ndim=2] out, out[i, j] = mval {{endfor}} - -#---------------------------------------------------------------------- -# other grouping functions not needing a template -#---------------------------------------------------------------------- - - -@cython.boundscheck(False) -@cython.wraparound(False) -def group_median_float64(ndarray[float64_t, ndim=2] out, - ndarray[int64_t] counts, - ndarray[float64_t, ndim=2] values, - ndarray[int64_t] labels, - Py_ssize_t min_count=-1): - """ - Only aggregates on axis=0 - """ - cdef: - Py_ssize_t i, j, N, K, ngroups, size - ndarray[int64_t] _counts - ndarray data - float64_t* ptr - - assert min_count == -1, "'min_count' only used in add and prod" - - ngroups = len(counts) - N, K = ( values).shape - - indexer, _counts = groupsort_indexer(labels, ngroups) - counts[:] = _counts[1:] - - data = np.empty((K, N), dtype=np.float64) - ptr = data.data - - take_2d_axis1_float64_float64(values.T, indexer, out=data) - - with nogil: - - for i in range(K): - # exclude NA group - ptr += _counts[0] - for j in range(ngroups): - size = _counts[j + 1] - out[j, i] = median_linear(ptr, size) - ptr += size - - -@cython.boundscheck(False) -@cython.wraparound(False) -def group_cumprod_float64(float64_t[:, :] out, - float64_t[:, :] values, - int64_t[:] labels, - bint is_datetimelike): - """ - Only transforms on axis=0 - """ - cdef: - Py_ssize_t i, j, N, K, size - float64_t val - float64_t[:, :] accum - int64_t lab - - N, K = ( values).shape - accum = np.ones_like(values) - - with nogil: - for i in range(N): - lab = labels[i] - - if lab < 0: - continue - for j in range(K): - val = values[i, j] - if val == val: - accum[lab, j] *= val - out[i, j] = accum[lab, j] - - -@cython.boundscheck(False) -@cython.wraparound(False) -def group_cumsum(numeric[:, :] out, - numeric[:, :] values, - int64_t[:] labels, - is_datetimelike): - """ - Only transforms on axis=0 - """ - cdef: - Py_ssize_t i, j, N, K, size - numeric val - numeric[:, :] accum - int64_t lab - - N, K = ( values).shape - accum = np.zeros_like(values) - - with nogil: - for i in range(N): - lab = labels[i] - - if lab < 0: - continue - for j in range(K): - val = values[i, j] - - if numeric == float32_t or numeric == float64_t: - if val == val: - accum[lab, j] += val - out[i, j] = accum[lab, j] - else: - accum[lab, j] += val - out[i, j] = accum[lab, j] - - -@cython.boundscheck(False) -@cython.wraparound(False) -def group_shift_indexer(int64_t[:] out, int64_t[:] labels, - int ngroups, int periods): - cdef: - Py_ssize_t N, i, j, ii - int offset, sign - int64_t lab, idxer, idxer_slot - int64_t[:] label_seen = np.zeros(ngroups, dtype=np.int64) - int64_t[:, :] label_indexer - - N, = ( labels).shape - - if periods < 0: - periods = -periods - offset = N - 1 - sign = -1 - elif periods > 0: - offset = 0 - sign = 1 - - if periods == 0: - with nogil: - for i in range(N): - out[i] = i - else: - # array of each previous indexer seen - label_indexer = np.zeros((ngroups, periods), dtype=np.int64) - with nogil: - for i in range(N): - ## reverse iterator if shifting backwards - ii = offset + sign * i - lab = labels[ii] - - # Skip null keys - if lab == -1: - out[ii] = -1 - continue - - label_seen[lab] += 1 - - idxer_slot = label_seen[lab] % periods - idxer = label_indexer[lab, idxer_slot] - - if label_seen[lab] > periods: - out[ii] = idxer - else: - out[ii] = -1 - - label_indexer[lab, idxer_slot] = ii diff --git a/pandas/core/groupby.py b/pandas/core/groupby.py index b1615f720368d..852ad04cd8a2e 100644 --- a/pandas/core/groupby.py +++ b/pandas/core/groupby.py @@ -1,5 +1,5 @@ import types -from functools import wraps +from functools import wraps, partial import numpy as np import datetime import collections @@ -38,7 +38,7 @@ _ensure_float) from pandas.core.dtypes.cast import maybe_downcast_to_dtype from pandas.core.dtypes.generic import ABCSeries -from pandas.core.dtypes.missing import isna, notna, _maybe_fill +from pandas.core.dtypes.missing import isna, isnull, notna, _maybe_fill from pandas.core.base import (PandasObject, SelectionMixin, GroupByError, DataError, SpecificationError) @@ -1457,6 +1457,36 @@ def expanding(self, *args, **kwargs): from pandas.core.window import ExpandingGroupby return ExpandingGroupby(self, *args, **kwargs) + def _fill(self, direction, limit=None): + """Shared function for `pad` and `backfill` to call Cython method + + Parameters + ---------- + direction : {'ffill', 'bfill'} + Direction passed to underlying Cython function. `bfill` will cause + values to be filled backwards. `ffill` and any other values will + default to a forward fill + limit : int, default None + Maximum number of consecutive values to fill. If `None`, this + method will convert to -1 prior to passing to Cython + + Returns + ------- + `Series` or `DataFrame` with filled values + + See Also + -------- + pad + backfill + """ + # Need int value for Cython + if limit is None: + limit = -1 + + return self._get_cythonized_result('group_fillna_indexer', + self.grouper, needs_mask=True, + direction=direction, limit=limit) + @Substitution(name='groupby') def pad(self, limit=None): """ @@ -1474,7 +1504,7 @@ def pad(self, limit=None): Series.fillna DataFrame.fillna """ - return self.apply(lambda x: x.ffill(limit=limit)) + return self._fill('ffill', limit=limit) ffill = pad @Substitution(name='groupby') @@ -1494,7 +1524,7 @@ def backfill(self, limit=None): Series.fillna DataFrame.fillna """ - return self.apply(lambda x: x.bfill(limit=limit)) + return self._fill('bfill', limit=limit) bfill = backfill @Substitution(name='groupby') @@ -1843,6 +1873,45 @@ def cummax(self, axis=0, **kwargs): return self._cython_transform('cummax', numeric_only=False) + def _get_cythonized_result(self, how, grouper, needs_mask=False, + needs_ngroups=False, **kwargs): + """Get result for Cythonized functions + + Parameters + ---------- + how : str, Cythonized function name to be called + grouper : Grouper object containing pertinent group info + needs_mask : bool, default False + Whether boolean mask needs to be part of the Cython call signature + needs_ngroups : bool, default False + Whether number of groups part of the Cython call signature + **kwargs : dict + Extra arguments to be passed back to Cython funcs + + Returns + ------- + `Series` or `DataFrame` with filled values + """ + + labels, _, ngroups = grouper.group_info + output = collections.OrderedDict() + base_func = getattr(libgroupby, how) + + for name, obj in self._iterate_slices(): + indexer = np.zeros_like(labels, dtype=np.int64) + func = partial(base_func, indexer, labels) + if needs_mask: + mask = isnull(obj.values).view(np.uint8) + func = partial(func, mask) + + if needs_ngroups: + func = partial(func, ngroups) + + func(**kwargs) # Call func to modify indexer values in place + output[name] = algorithms.take_nd(obj.values, indexer) + + return self._wrap_transformed_output(output) + @Substitution(name='groupby') @Appender(_doc_template) def shift(self, periods=1, freq=None, axis=0): @@ -1860,17 +1929,9 @@ def shift(self, periods=1, freq=None, axis=0): if freq is not None or axis != 0: return self.apply(lambda x: x.shift(periods, freq, axis)) - labels, _, ngroups = self.grouper.group_info - - # filled in by Cython - indexer = np.zeros_like(labels) - libgroupby.group_shift_indexer(indexer, labels, ngroups, periods) - - output = {} - for name, obj in self._iterate_slices(): - output[name] = algorithms.take_nd(obj.values, indexer) - - return self._wrap_transformed_output(output) + return self._get_cythonized_result('group_shift_indexer', + self.grouper, needs_ngroups=True, + periods=periods) @Substitution(name='groupby') @Appender(_doc_template) @@ -3577,7 +3638,6 @@ def describe(self, **kwargs): def value_counts(self, normalize=False, sort=True, ascending=False, bins=None, dropna=True): - from functools import partial from pandas.core.reshape.tile import cut from pandas.core.reshape.merge import _get_join_indexers @@ -4585,9 +4645,17 @@ def _apply_to_column_groupbys(self, func): in self._iterate_column_groupbys()), keys=self._selected_obj.columns, axis=1) + def _fill(self, direction, limit=None): + """Overriden method to join grouped columns in output""" + res = super(DataFrameGroupBy, self)._fill(direction, limit=limit) + output = collections.OrderedDict( + (grp.name, grp.grouper) for grp in self.grouper.groupings) + + from pandas import concat + return concat((self._wrap_transformed_output(output), res), axis=1) + def count(self): """ Compute count of group, excluding missing values """ - from functools import partial from pandas.core.dtypes.missing import _isna_ndarraylike as isna data, _ = self._get_data_to_aggregate() diff --git a/pandas/tests/groupby/test_groupby.py b/pandas/tests/groupby/test_groupby.py index 129ac6b06205c..2429e9975fc8e 100644 --- a/pandas/tests/groupby/test_groupby.py +++ b/pandas/tests/groupby/test_groupby.py @@ -2061,6 +2061,61 @@ def test_rank_object_raises(self, ties_method, ascending, na_option, ascending=ascending, na_option=na_option, pct=pct) + @pytest.mark.parametrize("mix_groupings", [True, False]) + @pytest.mark.parametrize("as_series", [True, False]) + @pytest.mark.parametrize("val1,val2", [ + ('foo', 'bar'), (1, 2), (1., 2.)]) + @pytest.mark.parametrize("fill_method,limit,exp_vals", [ + ("ffill", None, + [np.nan, np.nan, 'val1', 'val1', 'val1', 'val2', 'val2', 'val2']), + ("ffill", 1, + [np.nan, np.nan, 'val1', 'val1', np.nan, 'val2', 'val2', np.nan]), + ("bfill", None, + ['val1', 'val1', 'val1', 'val2', 'val2', 'val2', np.nan, np.nan]), + ("bfill", 1, + [np.nan, 'val1', 'val1', np.nan, 'val2', 'val2', np.nan, np.nan]) + ]) + def test_group_fill_methods(self, mix_groupings, as_series, val1, val2, + fill_method, limit, exp_vals): + vals = [np.nan, np.nan, val1, np.nan, np.nan, val2, np.nan, np.nan] + _exp_vals = list(exp_vals) + # Overwrite placeholder values + for index, exp_val in enumerate(_exp_vals): + if exp_val == 'val1': + _exp_vals[index] = val1 + elif exp_val == 'val2': + _exp_vals[index] = val2 + + # Need to modify values and expectations depending on the + # Series / DataFrame that we ultimately want to generate + if mix_groupings: # ['a', 'b', 'a, 'b', ...] + keys = ['a', 'b'] * len(vals) + + def interweave(list_obj): + temp = list() + for x in list_obj: + temp.extend([x, x]) + + return temp + + _exp_vals = interweave(_exp_vals) + vals = interweave(vals) + else: # ['a', 'a', 'a', ... 'b', 'b', 'b'] + keys = ['a'] * len(vals) + ['b'] * len(vals) + _exp_vals = _exp_vals * 2 + vals = vals * 2 + + df = DataFrame({'key': keys, 'val': vals}) + if as_series: + result = getattr( + df.groupby('key')['val'], fill_method)(limit=limit) + exp = Series(_exp_vals, name='val') + assert_series_equal(result, exp) + else: + result = getattr(df.groupby('key'), fill_method)(limit=limit) + exp = DataFrame({'key': keys, 'val': _exp_vals}) + assert_frame_equal(result, exp) + def test_dont_clobber_name_column(self): df = DataFrame({'key': ['a', 'a', 'a', 'b', 'b', 'b'], 'name': ['foo', 'bar', 'baz'] * 2})