Skip to content

Commit

Permalink
Cythonized GroupBy Fill (pandas-dev#19673)
Browse files Browse the repository at this point in the history
  • Loading branch information
WillAyd authored and harisbal committed Feb 28, 2018
1 parent ffa89a6 commit 08732e0
Show file tree
Hide file tree
Showing 6 changed files with 362 additions and 185 deletions.
10 changes: 5 additions & 5 deletions asv_bench/benchmarks/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,11 @@ class GroupByMethods(object):

param_names = ['dtype', 'method']
params = [['int', 'float'],
['all', 'any', 'count', 'cumcount', 'cummax', 'cummin',
'cumprod', 'cumsum', 'describe', 'first', 'head', 'last', 'mad',
'max', 'min', 'median', 'mean', 'nunique', 'pct_change', 'prod',
'rank', 'sem', 'shift', 'size', 'skew', 'std', 'sum', 'tail',
'unique', 'value_counts', 'var']]
['all', 'any', 'bfill', 'count', 'cumcount', 'cummax', 'cummin',
'cumprod', 'cumsum', 'describe', 'ffill', 'first', 'head',
'last', 'mad', 'max', 'min', 'median', 'mean', 'nunique',
'pct_change', 'prod', 'rank', 'sem', 'shift', 'size', 'skew',
'std', 'sum', 'tail', 'unique', 'value_counts', 'var']]

def setup(self, dtype, method):
ngroups = 1000
Expand Down
1 change: 1 addition & 0 deletions doc/source/whatsnew/v0.23.0.txt
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ Performance Improvements
- Improved performance of pairwise ``.rolling()`` and ``.expanding()`` with ``.cov()`` and ``.corr()`` operations (:issue:`17917`)
- Improved performance of :func:`DataFrameGroupBy.rank` (:issue:`15779`)
- Improved performance of variable ``.rolling()`` on ``.min()`` and ``.max()`` (:issue:`19521`)
- Improved performance of ``GroupBy.ffill`` and ``GroupBy.bfill`` (:issue:`11296`)

.. _whatsnew_0230.docs:

Expand Down
216 changes: 216 additions & 0 deletions pandas/_libs/groupby.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,221 @@ cdef inline float64_t kth_smallest_c(float64_t* a,
return a[k]


@cython.boundscheck(False)
@cython.wraparound(False)
def group_median_float64(ndarray[float64_t, ndim=2] out,
                         ndarray[int64_t] counts,
                         ndarray[float64_t, ndim=2] values,
                         ndarray[int64_t] labels,
                         Py_ssize_t min_count=-1):
    """
    Compute the median of each column of `values` within every group.

    Only aggregates on axis=0.

    Parameters
    ----------
    out : 2-D float64 array, written in place; ``out[g, c]`` receives the
        result of `median_linear` for group `g` of column `c`
    counts : int64 array with one slot per group; overwritten in place with
        every entry but the first of the sizes returned by
        `groupsort_indexer`
    values : 2-D float64 array holding the input data
    labels : int64 array of group labels, passed to `groupsort_indexer`
    min_count : must stay at -1; any other value fails the assertion

    Notes
    -----
    This function modifies `out` and `counts` rather than returning an object
    """
    cdef:
        Py_ssize_t col, grp, nrows, ncols, ngroups, width
        ndarray[int64_t] group_sizes
        ndarray grouped
        float64_t* cursor

    assert min_count == -1, "'min_count' only used in add and prod"

    nrows, ncols = (<object> values).shape
    ngroups = len(counts)

    # group_sizes has one more entry than there are groups: slot 0 is treated
    # below as the NA group, slots 1.. as the real groups.
    indexer, group_sizes = groupsort_indexer(labels, ngroups)
    counts[:] = group_sizes[1:]

    # Scratch buffer with one row per column of `values`, filled by taking
    # `values.T` along `indexer`.
    # NOTE(review): presumably this leaves each column's values contiguous
    # per group -- confirm against the take helper.
    grouped = np.empty((ncols, nrows), dtype=np.float64)
    cursor = <float64_t*> grouped.data

    take_2d_axis1_float64_float64(values.T, indexer, out=grouped)

    with nogil:
        for col in range(ncols):
            # exclude NA group
            cursor += group_sizes[0]
            for grp in range(ngroups):
                width = group_sizes[grp + 1]
                out[grp, col] = median_linear(cursor, width)
                # step over the chunk that was just consumed
                cursor += width


@cython.boundscheck(False)
@cython.wraparound(False)
def group_cumprod_float64(float64_t[:, :] out,
                          float64_t[:, :] values,
                          int64_t[:] labels,
                          bint is_datetimelike):
    """
    Write the running per-group product of `values` into `out`.

    Only transforms on axis=0.

    Parameters
    ----------
    out : 2-D float64 buffer receiving the results; modified in place
    values : 2-D float64 buffer holding the data to accumulate
    labels : group label for each row of `values`; rows whose label is
        negative are skipped
    is_datetimelike : accepted but not read by this function

    Notes
    -----
    NaN entries of `values` do not change the running product. The matching
    cells of `out`, like the rows with a negative label, are left untouched.
    """
    cdef:
        Py_ssize_t row, col, nrows, ncols
        float64_t cell
        float64_t[:, :] running
        int64_t grp

    nrows, ncols = (<object> values).shape
    # One accumulator row per label. The buffer is shaped like `values`, so
    # this relies on every label being smaller than the number of rows.
    running = np.ones_like(values)

    with nogil:
        for row in range(nrows):
            grp = labels[row]

            if grp >= 0:
                for col in range(ncols):
                    cell = values[row, col]
                    # NaN is the only value that is not equal to itself
                    if cell != cell:
                        continue
                    running[grp, col] *= cell
                    out[row, col] = running[grp, col]


@cython.boundscheck(False)
@cython.wraparound(False)
def group_cumsum(numeric[:, :] out,
                 numeric[:, :] values,
                 int64_t[:] labels,
                 is_datetimelike):
    """
    Write the running per-group sum of `values` into `out`.

    Only transforms on axis=0.

    Parameters
    ----------
    out : 2-D buffer receiving the results; modified in place
    values : 2-D buffer holding the data to accumulate
    labels : group label for each row of `values`; rows whose label is
        negative are skipped
    is_datetimelike : accepted but not read by this function

    Notes
    -----
    For the float32/float64 specializations, NaN entries of `values` do not
    change the running sum and the matching cells of `out` are left
    untouched. Rows with a negative label are left untouched as well.
    """
    cdef:
        Py_ssize_t row, col, nrows, ncols
        numeric cell
        numeric[:, :] running
        int64_t grp

    nrows, ncols = (<object> values).shape
    # One accumulator row per label. The buffer is shaped like `values`, so
    # this relies on every label being smaller than the number of rows.
    running = np.zeros_like(values)

    with nogil:
        for row in range(nrows):
            grp = labels[row]

            if grp >= 0:
                for col in range(ncols):
                    cell = values[row, col]

                    # Only the floating point specializations can hold NaN
                    if numeric == float32_t or numeric == float64_t:
                        # NaN is the only value not equal to itself
                        if cell != cell:
                            continue

                    running[grp, col] += cell
                    out[row, col] = running[grp, col]


@cython.boundscheck(False)
@cython.wraparound(False)
def group_shift_indexer(ndarray[int64_t] out, ndarray[int64_t] labels,
                        int ngroups, int periods):
    """
    Build the indexer that shifts rows by `periods` within each group.

    Parameters
    ----------
    out : int64 array written in place. ``out[i]`` receives the position of
        the row sitting `periods` rows away from row `i` inside the same
        group (earlier rows for a positive `periods`, later rows for a
        negative one), or -1 when the group has no such row
    labels : int64 array giving the group label of each row; rows with a
        negative label are treated as null keys and get -1
    ngroups : number of distinct groups; every non-negative label must be
        smaller than this value
    periods : number of rows to shift by; 0 yields the identity indexer

    Notes
    -----
    This function modifies `out` rather than returning an object
    """
    cdef:
        Py_ssize_t N, i, ii
        Py_ssize_t offset = 0
        int sign = 1
        int64_t lab, idxer, idxer_slot
        int64_t[:] label_seen = np.zeros(ngroups, dtype=np.int64)
        int64_t[:, :] label_indexer

    N, = (<object> labels).shape

    if periods == 0:
        with nogil:
            for i in range(N):
                out[i] = i
        return

    if periods < 0:
        # Shifting backwards: walk the rows from last to first. `offset` is a
        # Py_ssize_t so that N - 1 is not truncated for very long inputs.
        periods = -periods
        offset = N - 1
        sign = -1

    # For each group, a ring buffer holding the positions of the last
    # `periods` rows seen so far
    label_indexer = np.zeros((ngroups, periods), dtype=np.int64)
    with nogil:
        for i in range(N):
            # reverse iterator if shifting backwards
            ii = offset + sign * i
            lab = labels[ii]

            # Skip null keys. Any negative label is skipped: bounds checking
            # is off, so it must never be used as an index below.
            if lab < 0:
                out[ii] = -1
                continue

            label_seen[lab] += 1

            idxer_slot = label_seen[lab] % periods
            idxer = label_indexer[lab, idxer_slot]

            # The slot only holds a real position once the group has been
            # seen more than `periods` times
            if label_seen[lab] > periods:
                out[ii] = idxer
            else:
                out[ii] = -1

            label_indexer[lab, idxer_slot] = ii


@cython.wraparound(False)
@cython.boundscheck(False)
def group_fillna_indexer(ndarray[int64_t] out, ndarray[int64_t] labels,
                         ndarray[uint8_t] mask, object direction,
                         int64_t limit):
    """Indexes how to fill values forwards or backwards within a group

    Parameters
    ----------
    out : array of int64_t values which this method will write its results to
        Missing values will be written to with a value of -1
    labels : array of int64_t values holding the group label of each record,
        in the same order as `out` and `mask`
    mask : array of uint8_t values where a 1 indicates a missing value
    direction : {'ffill', 'bfill'}
        Direction for fill to be applied (forwards or backwards, respectively)
        Any value other than 'bfill' is handled as a forward fill
    limit : Consecutive values to fill before stopping, or -1 for no limit

    Notes
    -----
    This method modifies the `out` parameter rather than returning an object
    """
    cdef:
        Py_ssize_t i, N
        ndarray[int64_t] sorted_labels
        int64_t idx, curr_fill_idx=-1, filled_vals=0

    N = len(out)

    # Make sure all arrays are the same size
    assert N == len(labels) == len(mask)

    # The sort has to be stable: records sharing a label must be visited in
    # their original order, otherwise a value could be filled from a record
    # lying on the wrong side of it.
    sorted_labels = np.argsort(labels, kind='mergesort').astype(
        np.int64, copy=False)
    if direction == 'bfill':
        # Reversing the stable order walks every group from its last record
        # to its first
        sorted_labels = sorted_labels[::-1]

    with nogil:
        for i in range(N):
            idx = sorted_labels[i]
            if mask[idx] == 1:  # is missing
                # Stop filling once we've hit the limit
                if filled_vals >= limit and limit != -1:
                    curr_fill_idx = -1
                filled_vals += 1
            else:  # reset items when not missing
                filled_vals = 0
                curr_fill_idx = idx

            out[idx] = curr_fill_idx

            # If we move to the next group, reset
            # the fill_idx and counter
            if i == N - 1 or labels[idx] != labels[sorted_labels[i+1]]:
                curr_fill_idx = -1
                filled_vals = 0


# generated from template
include "groupby_helper.pxi"
Loading

0 comments on commit 08732e0

Please sign in to comment.