Skip to content

Commit

Permalink
PERF-#4268: Implement partition-parallel __getitem__ for bool Series …
Browse files Browse the repository at this point in the history
…masks (#4753)

Signed-off-by: Vasily Litvinov <fam1ly.n4me@yandex.ru>
  • Loading branch information
vnlitvinov authored Aug 16, 2022
1 parent f7fd559 commit bd326f1
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 36 deletions.
1 change: 1 addition & 0 deletions docs/release_notes/release_notes-0.16.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ Key Features and Updates
* PERF-#4773: Compute `lengths` and `widths` in `put` method of Dask partition like Ray do (#4780)
* PERF-#4732: Avoid overwriting already-evaluated `PandasOnRayDataframePartition._length_cache` and `PandasOnRayDataframePartition._width_cache` (#4754)
* PERF-#4713: Stop overriding the ray MacOS object store size limit (#4792)
* PERF-#4268: Implement partition-parallel __getitem__ for bool Series masks (#4753)
* Benchmarking enhancements
* FEAT-#4706: Add Modin ClassLogger to PandasDataframePartitionManager (#4707)
* Refactor Codebase
Expand Down
20 changes: 14 additions & 6 deletions modin/core/dataframe/algebra/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class Binary(Operator):
"""Builder class for Binary operator."""

@classmethod
def call(cls, func, join_type="outer", preserve_labels=False):
def call(cls, func, join_type="outer", labels="replace"):
"""
Build template binary operator.
Expand All @@ -33,16 +33,19 @@ def call(cls, func, join_type="outer", preserve_labels=False):
Binary function to execute. Have to be able to accept at least two arguments.
join_type : {'left', 'right', 'outer', 'inner', None}, default: 'outer'
Type of join that will be used if indices of operands are not aligned.
preserve_labels : bool, default: False
Whether or not to force keeping the axis labels of the right frame if the join occurred.
labels : {"keep", "replace", "drop"}, default: "replace"
    Whether to keep labels from the left Modin DataFrame, replace them with
    labels from the joined DataFrame, or drop them altogether so that they
    are computed lazily later.
Returns
-------
callable
Function that takes query compiler and executes binary operation.
"""

def caller(query_compiler, other, broadcast=False, *args, **kwargs):
def caller(
query_compiler, other, broadcast=False, *args, dtypes=None, **kwargs
):
"""
Apply binary `func` to passed operands.
Expand All @@ -58,6 +61,8 @@ def caller(query_compiler, other, broadcast=False, *args, **kwargs):
at the query compiler level, so this parameter is a hint that passed from a high level API.
*args : args,
Arguments that will be passed to `func`.
dtypes : "copy" or None, default: None
Whether to keep old dtypes or infer new dtypes from data.
**kwargs : kwargs,
Arguments that will be passed to `func`.
Expand All @@ -84,7 +89,8 @@ def caller(query_compiler, other, broadcast=False, *args, **kwargs):
lambda l, r: func(l, r.squeeze(), *args, **kwargs),
other._modin_frame,
join_type=join_type,
preserve_labels=preserve_labels,
labels=labels,
dtypes=dtypes,
)
)
else:
Expand All @@ -102,10 +108,12 @@ def caller(query_compiler, other, broadcast=False, *args, **kwargs):
lambda df: func(df, other, *args, **kwargs),
new_index=query_compiler.index,
new_columns=query_compiler.columns,
dtypes=dtypes,
)
else:
new_modin_frame = query_compiler._modin_frame.map(
lambda df: func(df, other, *args, **kwargs)
lambda df: func(df, other, *args, **kwargs),
dtypes=dtypes,
)
return query_compiler.__constructor__(new_modin_frame)

Expand Down
41 changes: 25 additions & 16 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2107,7 +2107,7 @@ def apply_select_indices(

@lazy_metadata_decorator(apply_axis="both")
def broadcast_apply(
self, axis, func, other, join_type="left", preserve_labels=True, dtypes=None
self, axis, func, other, join_type="left", labels="keep", dtypes=None
):
"""
Broadcast axis partitions of `other` to partitions of `self` and apply a function.
Expand All @@ -2122,8 +2122,9 @@ def broadcast_apply(
Modin DataFrame to broadcast.
join_type : str, default: "left"
Type of join to apply.
preserve_labels : bool, default: True
Whether keep labels from `self` Modin DataFrame or not.
labels : {"keep", "replace", "drop"}, default: "keep"
    Whether to keep labels from the `self` Modin DataFrame, replace them with
    labels from the joined DataFrame, or drop them altogether so that they
    are computed lazily later.
dtypes : "copy" or None, default: None
Whether to keep the old dtypes or infer new dtypes from the data.
Expand All @@ -2148,19 +2149,27 @@ def broadcast_apply(
)
if dtypes == "copy":
dtypes = self._dtypes
new_index = self.index
new_columns = self.columns
# Pass shape caches instead of values in order to not trigger shape
# computation.
new_row_lengths = self._row_lengths_cache
new_column_widths = self._column_widths_cache
if not preserve_labels:
if axis == 1:
new_columns = joined_index
new_column_widths = partition_sizes_along_axis
else:
new_index = joined_index
new_row_lengths = partition_sizes_along_axis

def _pick_axis(get_axis, sizes_cache):
    # Resolve the (labels, partition sizes) pair for one axis according to
    # the `labels` policy: keep this frame's metadata, take the joined
    # frame's, or drop both so they are recomputed lazily on demand.
    if labels == "keep":
        return get_axis(), sizes_cache
    if labels == "replace":
        return joined_index, partition_sizes_along_axis
    if labels == "drop":
        return None, None
    # Raise instead of `assert` so an invalid `labels` value is still
    # rejected when Python runs with assertions disabled (-O).
    raise ValueError(f"Unexpected `labels`: {labels}")

if axis == 0:
# Pass shape caches instead of values in order to not trigger shape computation.
new_index, new_row_lengths = _pick_axis(
self._get_index, self._row_lengths_cache
)
new_columns, new_column_widths = self.columns, self._column_widths_cache
else:
new_index, new_row_lengths = self.index, self._row_lengths_cache
new_columns, new_column_widths = _pick_axis(
self._get_columns, self._column_widths_cache
)

return self.__constructor__(
new_frame,
new_index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,9 @@ def broadcast_apply(cls, axis, apply_func, left, right, other_name="r"):
"""

def map_func(df, *others):
    """Apply `apply_func` to `df` with the broadcast operand passed as `other_name`.

    Concatenation is skipped when only one partition is broadcast,
    avoiding an unnecessary copy.
    """
    if len(others) > 1:
        broadcast_value = pandas.concat(others, axis=axis ^ 1)
    else:
        broadcast_value = others[0]
    return apply_func(df, **{other_name: broadcast_value})

map_func = cls.preprocess_func(map_func)
Expand Down
39 changes: 26 additions & 13 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
is_numeric_dtype,
is_datetime_or_timedelta_dtype,
is_datetime64_any_dtype,
is_bool_dtype,
)
from pandas.core.base import DataError
from collections.abc import Iterable
Expand Down Expand Up @@ -2149,23 +2150,35 @@ def applyier(df, internal_indices, other=[], internal_other_indices=[]):
# END Map across rows/columns

# __getitem__ methods
# Partition-parallel implementation of `df[boolean_mask]`: registered through
# the Binary operator builder so the mask frame is left-joined onto `self`
# and applied per-partition. `labels="drop"` discards the result's axis
# labels/lengths so they are computed lazily only when actually needed.
__getitem_bool = Binary.register(
    lambda df, r: df[r], join_type="left", labels="drop"
)

def __validate_bool_indexer(self, indexer):
    """
    Validate a boolean row mask against this query compiler's index.

    Parameters
    ----------
    indexer : pandas.Series, pandas.Index or list-like of bool
        Boolean mask (or the mask's index) to validate against ``self.index``.

    Raises
    ------
    ValueError
        If the mask length does not match the number of rows.
    """
    if len(indexer) != len(self.index):
        raise ValueError(
            f"Item wrong length {len(indexer)} instead of {len(self.index)}."
        )
    # Compare the mask's *index* to `self.index`; `Series.equals` called
    # against an Index always returns False, so the original
    # `indexer.equals(self.index)` check warned spuriously for every
    # Series mask, even a perfectly aligned one.
    if isinstance(indexer, pandas.Series) and not indexer.index.equals(self.index):
        warnings.warn(
            "Boolean Series key will be reindexed to match DataFrame index.",
            PendingDeprecationWarning,
            # One level deeper than the pre-refactor inline check (3 -> 4)
            # so the warning still points at user code.
            stacklevel=4,
        )

def getitem_array(self, key):
# TODO: dont convert to pandas for array indexing
if isinstance(key, type(self)):
# here we check for a subset of bool indexers only to simplify the code;
# there could (potentially) be more of those, but we assume the most frequent
# ones are just of bool dtype
if len(key.dtypes) == 1 and is_bool_dtype(key.dtypes[0]):
self.__validate_bool_indexer(key.index)
return self.__getitem_bool(key, broadcast=True, dtypes="copy")

key = key.to_pandas().squeeze(axis=1)

if is_bool_indexer(key):
if isinstance(key, pandas.Series) and not key.index.equals(self.index):
warnings.warn(
"Boolean Series key will be reindexed to match DataFrame index.",
PendingDeprecationWarning,
stacklevel=3,
)
elif len(key) != len(self.index):
raise ValueError(
"Item wrong length {} instead of {}.".format(
len(key), len(self.index)
)
)
self.__validate_bool_indexer(key)
key = check_bool_indexer(self.index, key)
# We convert to a RangeIndex because getitem_row_array is expecting a list
# of indices, and RangeIndex will give us the exact indices of each boolean
Expand Down

0 comments on commit bd326f1

Please sign in to comment.