Skip to content

Commit

Permalink
REF: use BlockManager.apply for cython_agg_blocks, apply_blockwise (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
jbrockmendel authored and Kevin D Smith committed Nov 2, 2020
1 parent b148e75 commit da91aa5
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 39 deletions.
21 changes: 5 additions & 16 deletions pandas/core/groupby/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1035,8 +1035,6 @@ def _cython_agg_blocks(
if numeric_only:
data = data.get_numeric_data(copy=False)

agg_blocks: List["Block"] = []

no_result = object()

def cast_agg_result(result, values: ArrayLike, how: str) -> ArrayLike:
Expand Down Expand Up @@ -1118,23 +1116,14 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike:
res_values = cast_agg_result(result, bvalues, how)
return res_values

for i, block in enumerate(data.blocks):
try:
nbs = block.apply(blk_func)
except (NotImplementedError, TypeError):
# TypeError -> we may have an exception in trying to aggregate
# continue and exclude the block
# NotImplementedError -> "ohlc" with wrong dtype
pass
else:
agg_blocks.extend(nbs)
# TypeError -> we may have an exception in trying to aggregate
# continue and exclude the block
# NotImplementedError -> "ohlc" with wrong dtype
new_mgr = data.apply(blk_func, ignore_failures=True)

if not agg_blocks:
if not len(new_mgr):
raise DataError("No numeric types to aggregate")

# reset the locs in the blocks to correspond to our
# current ordering
new_mgr = data._combine(agg_blocks)
return new_mgr

def _aggregate_frame(self, func, *args, **kwargs) -> DataFrame:
Expand Down
30 changes: 24 additions & 6 deletions pandas/core/internals/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,24 @@ def operate_blockwise(self, other: "BlockManager", array_op) -> "BlockManager":
"""
return operate_blockwise(self, other, array_op)

def apply(self: T, f, align_keys=None, **kwargs) -> T:
def apply(
self: T,
f,
align_keys: Optional[List[str]] = None,
ignore_failures: bool = False,
**kwargs,
) -> T:
"""
Iterate over the blocks, collect and create a new BlockManager.
Parameters
----------
f : str or callable
Name of the Block method to apply.
align_keys: List[str] or None, default None
ignore_failures: bool, default False
**kwargs
Keywords to pass to `f`
Returns
-------
Expand Down Expand Up @@ -387,12 +397,20 @@ def apply(self: T, f, align_keys=None, **kwargs) -> T:
# otherwise we have an ndarray
kwargs[k] = obj[b.mgr_locs.indexer]

if callable(f):
applied = b.apply(f, **kwargs)
else:
applied = getattr(b, f)(**kwargs)
try:
if callable(f):
applied = b.apply(f, **kwargs)
else:
applied = getattr(b, f)(**kwargs)
except (TypeError, NotImplementedError):
if not ignore_failures:
raise
continue
result_blocks = _extend_blocks(applied, result_blocks)

if ignore_failures:
return self._combine(result_blocks)

if len(result_blocks) == 0:
return self.make_empty(self.axes)

Expand Down Expand Up @@ -704,7 +722,7 @@ def get_numeric_data(self, copy: bool = False) -> "BlockManager":
self._consolidate_inplace()
return self._combine([b for b in self.blocks if b.is_numeric], copy)

def _combine(self, blocks: List[Block], copy: bool = True) -> "BlockManager":
def _combine(self: T, blocks: List[Block], copy: bool = True) -> T:
""" return a new manager with the blocks """
if len(blocks) == 0:
return self.make_empty()
Expand Down
21 changes: 4 additions & 17 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,6 @@ def _apply_blockwise(
if self._selected_obj.ndim == 1:
return self._apply_series(homogeneous_func)

# This isn't quite blockwise, since `blocks` is actually a collection
# of homogenenous DataFrames.
_, obj = self._create_blocks(self._selected_obj)
mgr = obj._mgr

Expand All @@ -500,25 +498,14 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike:
res_values = homogeneous_func(values)
return getattr(res_values, "T", res_values)

skipped: List[int] = []
res_blocks: List["Block"] = []
for i, blk in enumerate(mgr.blocks):
try:
nbs = blk.apply(hfunc)

except (TypeError, NotImplementedError):
skipped.append(i)
continue

res_blocks.extend(nbs)
new_mgr = mgr.apply(hfunc, ignore_failures=True)
out = obj._constructor(new_mgr)

if not len(res_blocks) and skipped:
if out.shape[1] == 0 and obj.shape[1] > 0:
raise DataError("No numeric types to aggregate")
elif not len(res_blocks):
elif out.shape[1] == 0:
return obj.astype("float64")

new_mgr = mgr._combine(res_blocks)
out = obj._constructor(new_mgr)
self._insert_on_column(out, obj)
return out

Expand Down

0 comments on commit da91aa5

Please sign in to comment.