Skip to content

Commit

Permalink
PERF-#7230: Don't preserve bad partition for 'merge' (#7229)
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
  • Loading branch information
anmyachev authored Apr 30, 2024
1 parent c48bb30 commit 747f4de
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 1 deletion.
22 changes: 21 additions & 1 deletion modin/core/storage_formats/pandas/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,23 @@

"""Contains implementations for Merge/Join."""

from __future__ import annotations

from typing import TYPE_CHECKING

import pandas
from pandas.core.dtypes.common import is_list_like
from pandas.errors import MergeError

from modin.config import MinPartitionSize, NPartitions
from modin.core.dataframe.base.dataframe.utils import join_columns
from modin.core.dataframe.pandas.metadata import ModinDtypes

from .utils import merge_partitioning

if TYPE_CHECKING:
from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler


# TODO: add methods for 'join' here
class MergeImpl:
Expand Down Expand Up @@ -93,7 +101,9 @@ def func(left, right):
).reset_index(drop=True)

@classmethod
def row_axis_merge(cls, left, right, kwargs):
def row_axis_merge(
cls, left: PandasQueryCompiler, right: PandasQueryCompiler, kwargs: dict
):
"""
Execute merge using row-axis implementation.
Expand Down Expand Up @@ -164,6 +174,16 @@ def map_func(
left, right, on, left_on, right_on, kwargs.get("suffixes", ("_x", "_y"))
)

# We rebalance when the ratio of the number of existing partitions to
# the ideal number of partitions is smaller than this threshold. The
# threshold is a heuristic that may need to be tuned for performance.
if (
left._modin_frame._partitions.shape[0] < 0.3 * NPartitions.get()
# to avoid empty partitions after repartition; can materialize index
and len(left._modin_frame) > NPartitions.get() * MinPartitionSize.get()
):
left = left.repartition(axis=0)

new_left = left.__constructor__(
left._modin_frame.broadcast_apply_full_axis(
axis=1,
Expand Down
35 changes: 35 additions & 0 deletions modin/tests/core/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,41 @@ def test_merge_partitioning(
)


def test_merge_with_bad_partitioning():
# https://github.com/modin-project/modin/pull/7229

left_partitioning = [256]
right_partitioning = [32, 32, 32, 32]

left_df = pandas.DataFrame(
[np.arange(sum(left_partitioning)) for _ in range(sum(left_partitioning))]
)
right_df = pandas.DataFrame(
[np.arange(sum(right_partitioning)) for _ in range(sum(right_partitioning))]
)

left = construct_modin_df_by_scheme(
left_df, {"row_lengths": left_partitioning, "column_widths": left_partitioning}
)
right = construct_modin_df_by_scheme(
right_df,
{"row_lengths": right_partitioning, "column_widths": right_partitioning},
)

left_frame = left._query_compiler._modin_frame
right_frame = right._query_compiler._modin_frame
assert left_frame.row_lengths == left_frame.column_widths == left_partitioning
assert right_frame.row_lengths == right_frame.column_widths == right_partitioning

# just a dummy value
return_value = pd.DataFrame([1, 2, 3, 4])._query_compiler
with mock.patch.object(
left._query_compiler, "repartition", return_value=return_value
) as repartition:
_ = left.merge(right)
repartition.assert_called_once_with(axis=0)


def test_groupby_with_empty_partition():
# see #5461 for details
md_df = construct_modin_df_by_scheme(
Expand Down

0 comments on commit 747f4de

Please sign in to comment.