From da91aa55a0d52f51c35a12cccdef8c021bc322ad Mon Sep 17 00:00:00 2001 From: jbrockmendel Date: Wed, 2 Sep 2020 19:56:33 -0700 Subject: [PATCH] REF: use BlockManager.apply for cython_agg_blocks, apply_blockwise (#35900) --- pandas/core/groupby/generic.py | 21 +++++---------------- pandas/core/internals/managers.py | 30 ++++++++++++++++++++++++------ pandas/core/window/rolling.py | 21 ++++----------------- 3 files changed, 33 insertions(+), 39 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index a92e3af0764a7..537feace59fcb 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -1035,8 +1035,6 @@ def _cython_agg_blocks( if numeric_only: data = data.get_numeric_data(copy=False) - agg_blocks: List["Block"] = [] - no_result = object() def cast_agg_result(result, values: ArrayLike, how: str) -> ArrayLike: @@ -1118,23 +1116,14 @@ def blk_func(bvalues: ArrayLike) -> ArrayLike: res_values = cast_agg_result(result, bvalues, how) return res_values - for i, block in enumerate(data.blocks): - try: - nbs = block.apply(blk_func) - except (NotImplementedError, TypeError): - # TypeError -> we may have an exception in trying to aggregate - # continue and exclude the block - # NotImplementedError -> "ohlc" with wrong dtype - pass - else: - agg_blocks.extend(nbs) + # TypeError -> we may have an exception in trying to aggregate + # continue and exclude the block + # NotImplementedError -> "ohlc" with wrong dtype + new_mgr = data.apply(blk_func, ignore_failures=True) - if not agg_blocks: + if not len(new_mgr): raise DataError("No numeric types to aggregate") - # reset the locs in the blocks to correspond to our - # current ordering - new_mgr = data._combine(agg_blocks) return new_mgr def _aggregate_frame(self, func, *args, **kwargs) -> DataFrame: diff --git a/pandas/core/internals/managers.py b/pandas/core/internals/managers.py index 389252e7ef0f2..2e3098d94afcb 100644 --- a/pandas/core/internals/managers.py +++ b/pandas/core/internals/managers.py @@ -350,7 +350,13 @@ def operate_blockwise(self, other: "BlockManager", array_op) -> "BlockManager": """ return operate_blockwise(self, other, array_op) - def apply(self: T, f, align_keys=None, **kwargs) -> T: + def apply( + self: T, + f, + align_keys: Optional[List[str]] = None, + ignore_failures: bool = False, + **kwargs, + ) -> T: """ Iterate over the blocks, collect and create a new BlockManager. @@ -358,6 +364,10 @@ def apply(self: T, f, align_keys=None, **kwargs) -> T: ---------- f : str or callable Name of the Block method to apply. + align_keys: List[str] or None, default None + ignore_failures: bool, default False + **kwargs + Keywords to pass to `f` Returns ------- @@ -387,12 +397,20 @@ def apply(self: T, f, align_keys=None, **kwargs) -> T: # otherwise we have an ndarray kwargs[k] = obj[b.mgr_locs.indexer] - if callable(f): - applied = b.apply(f, **kwargs) - else: - applied = getattr(b, f)(**kwargs) + try: + if callable(f): + applied = b.apply(f, **kwargs) + else: + applied = getattr(b, f)(**kwargs) + except (TypeError, NotImplementedError): + if not ignore_failures: + raise + continue result_blocks = _extend_blocks(applied, result_blocks) + if ignore_failures: + return self._combine(result_blocks) + if len(result_blocks) == 0: return self.make_empty(self.axes) @@ -704,7 +722,7 @@ def get_numeric_data(self, copy: bool = False) -> "BlockManager": self._consolidate_inplace() return self._combine([b for b in self.blocks if b.is_numeric], copy) - def _combine(self, blocks: List[Block], copy: bool = True) -> "BlockManager": + def _combine(self: T, blocks: List[Block], copy: bool = True) -> T: """ return a new manager with the blocks """ if len(blocks) == 0: return self.make_empty() diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index a3f60c0bc5098..558c0eeb0ea65 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -489,8 +489,6 @@ def _apply_blockwise( if self._selected_obj.ndim == 1: return self._apply_series(homogeneous_func) - # This isn't quite blockwise, since `blocks` is actually a collection - # of homogenenous DataFrames. _, obj = self._create_blocks(self._selected_obj) mgr = obj._mgr @@ -500,25 +498,14 @@ def hfunc(bvalues: ArrayLike) -> ArrayLike: res_values = homogeneous_func(values) return getattr(res_values, "T", res_values) - skipped: List[int] = [] - res_blocks: List["Block"] = [] - for i, blk in enumerate(mgr.blocks): - try: - nbs = blk.apply(hfunc) - - except (TypeError, NotImplementedError): - skipped.append(i) - continue - - res_blocks.extend(nbs) + new_mgr = mgr.apply(hfunc, ignore_failures=True) + out = obj._constructor(new_mgr) - if not len(res_blocks) and skipped: + if out.shape[1] == 0 and obj.shape[1] > 0: raise DataError("No numeric types to aggregate") - elif not len(res_blocks): + elif out.shape[1] == 0: return obj.astype("float64") - new_mgr = mgr._combine(res_blocks) - out = obj._constructor(new_mgr) self._insert_on_column(out, obj) return out