Skip to content

Commit

Permalink
FIX-#4818, PERF-#4825: Fix where by using the new n-ary operator (#4820)
Browse files Browse the repository at this point in the history
Signed-off-by: mvashishtha <mahesh@ponder.io>
Co-authored-by: Vasily Litvinov <fam1ly.n4me@yandex.ru>
  • Loading branch information
mvashishtha and vnlitvinov committed Sep 1, 2022
1 parent 1c0935c commit 2660aa3
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 57 deletions.
1 change: 1 addition & 0 deletions docs/release_notes/release_notes-0.16.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Key Features and Updates
* FIX-#4872: Stop checking the private ray mac memory limit (#4873)
* FIX-#4848: Fix rebalancing partitions when NPartitions == 1 (#4874)
* FIX-#4907: Implement `radd` for Series and DataFrame (#4908)
* FIX-#4818, PERF-#4825: Fix where by using the new n-ary operator (#4820)
* Performance enhancements
* PERF-#4182: Add cell-wise execution for binary ops, fix bin ops for empty dataframes (#4391)
* PERF-#4288: Improve perf of `groupby.mean` for narrow data (#4591)
Expand Down
4 changes: 2 additions & 2 deletions modin/core/dataframe/algebra/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ def caller(
)
else:
return query_compiler.__constructor__(
query_compiler._modin_frame.binary_op(
query_compiler._modin_frame.n_ary_op(
lambda x, y: func(x, y, *args, **kwargs),
other._modin_frame,
[other._modin_frame],
join_type=join_type,
)
)
Expand Down
40 changes: 22 additions & 18 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2563,16 +2563,16 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
return (reindexed_frames[0], reindexed_frames[1:], joined_index, base_lengths)

@lazy_metadata_decorator(apply_axis="both")
def binary_op(self, op, right_frame, join_type="outer"):
def n_ary_op(self, op, right_frames: list, join_type="outer"):
"""
Perform an operation that requires joining with another Modin DataFrame.
Perform an n-opary operation by joining with other Modin DataFrame(s).
Parameters
----------
op : callable
Function to apply after the join.
right_frame : PandasDataframe
Modin DataFrame to join with.
right_frames : list of PandasDataframe
Modin DataFrames to join with.
join_type : str, default: "outer"
Type of join to apply.
Expand All @@ -2581,32 +2581,36 @@ def binary_op(self, op, right_frame, join_type="outer"):
PandasDataframe
New Modin DataFrame.
"""
left_parts, right_parts, joined_index, row_lengths = self._copartition(
0, right_frame, join_type, sort=True
left_parts, list_of_right_parts, joined_index, row_lengths = self._copartition(
0, right_frames, join_type, sort=True
)
new_left_frame = self.__constructor__(
left_parts, joined_index, self.columns, row_lengths, self._column_widths
)
new_right_frame = self.__constructor__(
right_parts[0],
joined_index,
right_frame.columns,
row_lengths,
right_frame._column_widths,
)
new_right_frames = [
self.__constructor__(
right_parts,
joined_index,
right_frame.columns,
row_lengths,
right_frame._column_widths,
)
for right_parts, right_frame in zip(list_of_right_parts, right_frames)
]

(
left_parts,
right_parts,
list_of_right_parts,
joined_columns,
column_widths,
) = new_left_frame._copartition(1, new_right_frame, join_type, sort=True)
) = new_left_frame._copartition(1, new_right_frames, join_type, sort=True)

new_frame = (
np.array([])
if len(left_parts) == 0 or len(right_parts[0]) == 0
else self._partition_mgr_cls.binary_operation(
left_parts, op, right_parts[0]
if len(left_parts) == 0
or any(len(right_parts) == 0 for right_parts in list_of_right_parts)
else self._partition_mgr_cls.n_ary_operation(
left_parts, op, list_of_right_parts
)
)

Expand Down
24 changes: 16 additions & 8 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1265,18 +1265,23 @@ def compute_part_size(indexer, remote_part, part_idx, axis):

@classmethod
@wait_computations_if_benchmark_mode
def binary_operation(cls, left, func, right):
"""
Apply a function that requires two ``PandasDataframe`` objects.
def n_ary_operation(cls, left, func, right: list):
r"""
Apply an n-ary operation to multiple ``PandasDataframe`` objects.
This method assumes that all the partitions of the dataframes in left
and right have the same dimensions. For each position i, j in each
dataframe's partitions, the result has a partition at (i, j) whose data
is func(left_partitions[i,j], \*each_right_partitions[i,j]).
Parameters
----------
left : np.ndarray
The partitions of left ``PandasDataframe``.
func : callable
The function to apply.
right : np.ndarray
The partitions of right ``PandasDataframe``.
right : list of np.ndarray
The list of partitions of other ``PandasDataframe``.
Returns
-------
Expand All @@ -1285,8 +1290,8 @@ def binary_operation(cls, left, func, right):
"""
func = cls.preprocess_func(func)

def get_right_block(row_idx, col_idx):
blocks = right[row_idx][col_idx].list_of_blocks
def get_right_block(right_partitions, row_idx, col_idx):
blocks = right_partitions[row_idx][col_idx].list_of_blocks
# TODO Resolve this assertion as a part of #4691, because the current implementation assumes
# that partition contains only 1 block.
assert (
Expand All @@ -1299,7 +1304,10 @@ def get_right_block(row_idx, col_idx):
[
part.apply(
func,
get_right_block(row_idx, col_idx),
*(
get_right_block(right_partitions, row_idx, col_idx)
for right_partitions in right
),
)
for col_idx, part in enumerate(left[row_idx])
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,6 @@ def _make_wrapped_method(name: str):
"apply_func_to_select_indices",
"apply_func_to_select_indices_along_full_axis",
"apply_func_to_indices_both_axis",
"binary_operation",
"n_ary_operation",
):
_make_wrapped_method(method)
34 changes: 13 additions & 21 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,23 +412,15 @@ def where(self, cond, other, **kwargs):
cond, type(self)
), "Must have the same QueryCompiler subclass to perform this operation"
if isinstance(other, type(self)):
# Note: Currently we are doing this with two maps across the entire
# data. This can be done with a single map, but it will take a
# modification in the `BlockPartition` class.
# If this were in one pass it would be ~2x faster.
# TODO (devin-petersohn) rewrite this to take one pass.
def where_builder_first_pass(cond, other, **kwargs):
return cond.where(cond, other, **kwargs)

first_pass = cond._modin_frame.binary_op(
where_builder_first_pass, other._modin_frame, join_type="left"
)

def where_builder_second_pass(df, new_other, **kwargs):
return df.where(new_other.eq(True), new_other, **kwargs)

new_modin_frame = self._modin_frame.binary_op(
where_builder_second_pass, first_pass, join_type="left"
# Make sure to set join_type=None so the `where` result always has
# the same row and column labels as `self`.
new_modin_frame = self._modin_frame.n_ary_op(
lambda df, cond, other: df.where(cond, other, **kwargs),
[
cond._modin_frame,
other._modin_frame,
],
join_type=None,
)
# This will be a Series of scalars to be applied based on the condition
# dataframe.
Expand All @@ -437,8 +429,8 @@ def where_builder_second_pass(df, new_other, **kwargs):
def where_builder_series(df, cond):
return df.where(cond, other, **kwargs)

new_modin_frame = self._modin_frame.binary_op(
where_builder_series, cond._modin_frame, join_type="left"
new_modin_frame = self._modin_frame.n_ary_op(
where_builder_series, [cond._modin_frame], join_type="left"
)
return self.__constructor__(new_modin_frame)

Expand Down Expand Up @@ -1889,8 +1881,8 @@ def fillna_builder(series, value_arg):
# objects (when `limit` parameter is absent) as it works on two `Series`.
return series.fillna(value=value_arg, **kwargs)

new_modin_frame = self._modin_frame.binary_op(
fillna_builder, value._modin_frame, join_type="left"
new_modin_frame = self._modin_frame.n_ary_op(
fillna_builder, [value._modin_frame], join_type="left"
)

return self.__constructor__(new_modin_frame)
Expand Down
44 changes: 39 additions & 5 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2279,7 +2279,6 @@ def _where(
try_cast=try_cast,
)
return self._create_or_update_from_compiler(new_query_compiler, inplace)
axis = self._get_axis_number(axis)
cond = cond(self) if callable(cond) else cond

if not isinstance(cond, DataFrame):
Expand All @@ -2290,11 +2289,46 @@ def _where(
cond = DataFrame(cond, index=self.index, columns=self.columns)
if isinstance(other, DataFrame):
other = other._query_compiler
elif isinstance(other, pandas.Series):
other = other.reindex(self.index if not axis else self.columns)
else:
index = self.index if not axis else self.columns
other = pandas.Series(other, index=index)
"""
Only infer the axis number when ``other`` will be made into a
series. When ``other`` is a dataframe, axis=None has a meaning
distinct from 0 and 1, e.g. at pandas 1.4.3:
import pandas as pd
df = pd.DataFrame([[1,2], [3, 4]], index=[1, 0])
cond = pd.DataFrame([[True,False], [False, True]], columns=[1, 0])
other = pd.DataFrame([[5,6], [7,8]], columns=[1, 0])
print(df.where(cond, other, axis=None))
0 1
1 1 7
0 6 4
print(df.where(cond, other, axis=0))
0 1
1 1 8
0 5 4
print(df.where(cond, other, axis=1))
0 1
1 1 5
0 8 4
"""
# _get_axis_number interprets no_default as None, but where doesn't
# accept no_default.
if axis == no_default:
raise ValueError(
"No axis named NoDefault.no_default for object type DataFrame"
)
axis = self._get_axis_number(axis)
if isinstance(other, pandas.Series):
other = other.reindex(self.index if axis == 0 else self.columns)
elif is_list_like(other):
index = self.index if axis == 0 else self.columns
other = pandas.Series(other, index=index)
query_compiler = self._query_compiler.where(
cond._query_compiler, other, axis=axis, level=level
)
Expand Down
38 changes: 36 additions & 2 deletions modin/pandas/test/dataframe/test_join_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from modin.utils import to_pandas

from modin.pandas.test.utils import (
create_test_dfs,
random_state,
df_equals,
arg_keys,
Expand Down Expand Up @@ -529,16 +530,25 @@ def test_sort_values_with_string_index():


def test_where():
columns = list("abcdefghij")

frame_data = random_state.randn(100, 10)
pandas_df = pandas.DataFrame(frame_data, columns=list("abcdefghij"))
modin_df = pd.DataFrame(frame_data, columns=list("abcdefghij"))
modin_df, pandas_df = create_test_dfs(frame_data, columns=columns)
pandas_cond_df = pandas_df % 5 < 2
modin_cond_df = modin_df % 5 < 2

pandas_result = pandas_df.where(pandas_cond_df, -pandas_df)
modin_result = modin_df.where(modin_cond_df, -modin_df)
assert all((to_pandas(modin_result) == pandas_result).all())

# Test that we choose the right values to replace when `other` == `True`
# everywhere.
other_data = np.full(shape=pandas_df.shape, fill_value=True)
modin_other, pandas_other = create_test_dfs(other_data, columns=columns)
pandas_result = pandas_df.where(pandas_cond_df, pandas_other)
modin_result = modin_df.where(modin_cond_df, modin_other)
df_equals(modin_result, pandas_result)

other = pandas_df.loc[3]
pandas_result = pandas_df.where(pandas_cond_df, other, axis=1)
modin_result = modin_df.where(modin_cond_df, other, axis=1)
Expand All @@ -554,6 +564,30 @@ def test_where():
assert all((to_pandas(modin_result) == pandas_result).all())


def test_where_different_axis_order():
# Test `where` when `cond`, `df`, and `other` each have columns and index
# in different orders.
data = test_data["float_nan_data"]
pandas_df = pandas.DataFrame(data)
pandas_cond_df = pandas_df % 5 < 2
pandas_cond_df = pandas_cond_df.reindex(
columns=pandas_df.columns[::-1], index=pandas_df.index[::-1]
)
pandas_other_df = -pandas_df
pandas_other_df = pandas_other_df.reindex(
columns=pandas_df.columns[-1:].append(pandas_df.columns[:-1]),
index=pandas_df.index[-1:].append(pandas_df.index[:-1]),
)

modin_df = pd.DataFrame(pandas_df)
modin_cond_df = pd.DataFrame(pandas_cond_df)
modin_other_df = pd.DataFrame(pandas_other_df)

pandas_result = pandas_df.where(pandas_cond_df, pandas_other_df)
modin_result = modin_df.where(modin_cond_df, modin_other_df)
df_equals(modin_result, pandas_result)


@pytest.mark.parametrize("align_axis", ["index", "columns"])
@pytest.mark.parametrize("keep_shape", [False, True])
@pytest.mark.parametrize("keep_equal", [False, True])
Expand Down

0 comments on commit 2660aa3

Please sign in to comment.