FEAT-#7254: Support right merge/join #7226

Merged 22 commits on May 13, 2024
Changes from 21 commits
5 changes: 4 additions & 1 deletion modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -3321,8 +3321,11 @@ def _extract_partitions(self):
if self._partitions.size > 0:
return self._partitions
else:
dtypes = None
if self.has_materialized_dtypes:
dtypes = self.dtypes
return self._partition_mgr_cls.create_partition_from_metadata(
index=self.index, columns=self.columns
index=self.index, columns=self.columns, dtypes=dtypes
)

@lazy_metadata_decorator(apply_axis="both")
10 changes: 8 additions & 2 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
@@ -21,7 +21,7 @@
import warnings
from abc import ABC
from functools import wraps
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional

import numpy as np
import pandas
@@ -183,12 +183,16 @@ def preprocess_func(cls, map_func):
# END Abstract Methods

@classmethod
def create_partition_from_metadata(cls, **metadata):
def create_partition_from_metadata(
cls, dtypes: Optional[pandas.Series] = None, **metadata
):
"""
Create NumPy array of partitions that holds an empty dataframe with given metadata.

Parameters
----------
dtypes : pandas.Series, optional
Dtypes that will be used when calling `astype`.
**metadata : dict
Metadata that has to be wrapped in a partition.

@@ -198,6 +202,8 @@ def create_partition_from_metadata(cls, **metadata):
A NumPy 2D array of a single partition which contains the data.
"""
metadata_dataframe = pandas.DataFrame(**metadata)
if dtypes is not None:
metadata_dataframe = metadata_dataframe.astype(dtypes)
return np.array([[cls._partition_class.put(metadata_dataframe)]])

@classmethod
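A standalone pandas sketch (values assumed for illustration) of what the new dtypes argument fixes: a frame built from index/columns metadata alone comes out all-object, so materialized dtypes have to be restored with astype, which is what create_partition_from_metadata now does.

import pandas as pd

df = pd.DataFrame(index=pd.RangeIndex(0), columns=pd.Index(["a", "b"]))
assert (df.dtypes == object).all()  # an empty metadata-only frame loses dtypes

dtypes = pd.Series({"a": "int64", "b": "float64"})
df = df.astype(dtypes)              # the new code path
assert df.dtypes["a"] == "int64" and df.dtypes["b"] == "float64"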
6 changes: 4 additions & 2 deletions modin/core/storage_formats/base/query_compiler.py
@@ -54,6 +54,8 @@
from . import doc_utils

if TYPE_CHECKING:
from typing_extensions import Self

Codecov / codecov/patch warning: added line modin/core/storage_formats/base/query_compiler.py#L57 was not covered by tests.

# TODO: should be ModinDataframe
# https://github.com/modin-project/modin/issues/7244
from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
@@ -158,7 +160,7 @@
else:
return obj

def default_to_pandas(self, pandas_op, *args, **kwargs):
def default_to_pandas(self, pandas_op, *args, **kwargs) -> Self:
"""
Do fallback to pandas for the passed function.

@@ -4467,7 +4469,7 @@
# END Abstract methods for QueryCompiler

@cached_property
def __constructor__(self) -> type[BaseQueryCompiler]:
def __constructor__(self) -> type[Self]:
"""
Get query compiler constructor.

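A minimal typing sketch (class names assumed, not Modin's real hierarchy) of why Self is imported under TYPE_CHECKING: annotating base-class methods with Self makes them type-check as returning the calling subclass rather than the base type.

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing_extensions import Self  # no runtime dependency


class BaseQC:
    def default_to_pandas(self, pandas_op, *args, **kwargs) -> Self:
        # The real method applies pandas_op and rewraps the result;
        # this sketch just returns self to keep the example runnable.
        return self


class PandasQC(BaseQC):
    pass


qc = PandasQC().default_to_pandas(len)  # type checkers infer PandasQC, not BaseQC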
49 changes: 41 additions & 8 deletions modin/core/storage_formats/pandas/merge.py
@@ -15,7 +15,7 @@

from __future__ import annotations

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional

import pandas
from pandas.core.dtypes.common import is_list_like
@@ -103,7 +103,7 @@ def func(left, right):
@classmethod
def row_axis_merge(
cls, left: PandasQueryCompiler, right: PandasQueryCompiler, kwargs: dict
):
) -> PandasQueryCompiler:
"""
Execute merge using row-axis implementation.

@@ -126,10 +126,25 @@ def row_axis_merge(
right_index = kwargs.get("right_index", False)
sort = kwargs.get("sort", False)

if how in ["left", "inner"] and left_index is False and right_index is False:
if (
(
how in ["left", "inner"]
or (how == "right" and right._modin_frame._partitions.size != 0)
Collaborator: same (see the matching question on the analogous join change below).
)
and left_index is False
and right_index is False
):
kwargs["sort"] = False

def should_keep_index(left, right):
reverted = False
if how == "right":
left, right = right, left
reverted = True

def should_keep_index(
left: PandasQueryCompiler,
right: PandasQueryCompiler,
) -> bool:
keep_index = False
if left_on is not None and right_on is not None:
keep_index = any(
Expand All @@ -144,8 +159,14 @@ def should_keep_index(left, right):
)
return keep_index

def map_func(left, right): # pragma: no cover
return pandas.merge(left, right, **kwargs)
def map_func(
left, right, kwargs=kwargs
) -> pandas.DataFrame: # pragma: no cover
if reverted:
df = pandas.merge(right, left, **kwargs)
else:
df = pandas.merge(left, right, **kwargs)
return df

# Want to ensure that these are python lists
if left_on is not None and right_on is not None:
@@ -156,7 +177,11 @@

right_to_broadcast = right._modin_frame.combine()
new_columns, new_dtypes = cls._compute_result_metadata(
left, right, on, left_on, right_on, kwargs.get("suffixes", ("_x", "_y"))
*((left, right) if not reverted else (right, left)),
on,
left_on,
right_on,
kwargs.get("suffixes", ("_x", "_y")),
)

# We rebalance when the ratio of the number of existing partitions to
@@ -226,7 +251,15 @@ def map_func(left, right): # pragma: no cover
return left.default_to_pandas(pandas.DataFrame.merge, right, **kwargs)

@classmethod
def _compute_result_metadata(cls, left, right, on, left_on, right_on, suffixes):
def _compute_result_metadata(
cls,
left: PandasQueryCompiler,
right: PandasQueryCompiler,
on,
left_on,
right_on,
suffixes,
) -> tuple[Optional[pandas.Index], Optional[ModinDtypes]]:
"""
Compute columns and dtypes metadata for the result of merge if possible.

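The swap above lets how="right" reuse the existing row-axis broadcast path: the operand that determines the result rows (the right frame) becomes the one that is partitioned, the other side is broadcast whole, and map_func restores the original argument order. A pure-pandas sketch (toy data, not Modin code) of why concatenating per-partition right merges reproduces the full merge:

import pandas as pd

left = pd.DataFrame({"key": [1, 2, 3, 4], "a": range(4)})
right = pd.DataFrame({"key": [2, 3, 5, 6], "b": range(4)})

expected = left.merge(right, on="key", how="right")

# Partition the right frame, broadcast the full left frame to every
# partition, then stack the per-partition results row-wise.
parts = [right.iloc[:2], right.iloc[2:]]
assembled = pd.concat(
    (left.merge(part, on="key", how="right") for part in parts),
    ignore_index=True,
)
assert expected.equals(assembled)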
35 changes: 24 additions & 11 deletions modin/core/storage_formats/pandas/query_compiler.py
@@ -526,33 +526,46 @@ def merge(self, right, **kwargs):
get_logger().info(message)
return MergeImpl.row_axis_merge(self, right, kwargs)

def join(self, right, **kwargs):
def join(self, right: PandasQueryCompiler, **kwargs) -> PandasQueryCompiler:
on = kwargs.get("on", None)
how = kwargs.get("how", "left")
sort = kwargs.get("sort", False)
left = self

if how in ["left", "inner"]:

def map_func(left, right, kwargs=kwargs): # pragma: no cover
return pandas.DataFrame.join(left, right, **kwargs)
if how in ["left", "inner"] or (
how == "right" and right._modin_frame._partitions.size != 0
Collaborator: What happens if left has a size equal to 0?

Collaborator (author): Empty dataframes are processed at a higher level (simply defaulted to pandas), and all of the implementation logic relies on the left operand having a non-empty set of partitions. Therefore, to avoid an error, we need to handle this situation as before.
):
reverted = False
if how == "right":
left, right = right, left
reverted = True

def map_func(
left, right, kwargs=kwargs
) -> pandas.DataFrame: # pragma: no cover
if reverted:
df = pandas.DataFrame.join(right, left, **kwargs)
else:
df = pandas.DataFrame.join(left, right, **kwargs)
return df

right_to_broadcast = right._modin_frame.combine()
new_self = self.__constructor__(
self._modin_frame.broadcast_apply_full_axis(
left = left.__constructor__(
left._modin_frame.broadcast_apply_full_axis(
axis=1,
func=map_func,
# We're going to explicitly change the shape across the 1-axis,
# so we want for partitioning to adapt as well
keep_partitioning=False,
num_splits=merge_partitioning(
self._modin_frame, right._modin_frame, axis=1
left._modin_frame, right._modin_frame, axis=1
),
other=right_to_broadcast,
)
)
return new_self.sort_rows_by_column_values(on) if sort else new_self
return left.sort_rows_by_column_values(on) if sort else left
else:
return self.default_to_pandas(pandas.DataFrame.join, right, **kwargs)
return left.default_to_pandas(pandas.DataFrame.join, right, **kwargs)

# END Inter-Data operations
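The join change mirrors the merge one: operands are swapped so the frame whose index drives the how="right" result is the partitioned one, and the empty-partition guard keeps the old default-to-pandas fallback, since (per the reply above) the broadcast path assumes the left operand has partitions. A pure-pandas sketch (toy data, not Modin code) of the partitioned right join:

import pandas as pd

left = pd.DataFrame({"a": [1, 2, 3]}, index=["x", "y", "z"])
right = pd.DataFrame({"b": [10, 20]}, index=["y", "w"])

expected = left.join(right, how="right")  # result rows follow right's index

# Partition right, broadcast left whole, join each piece, stack the results.
parts = [right.iloc[:1], right.iloc[1:]]
assembled = pd.concat(left.join(part, how="right") for part in parts)
assert expected.equals(assembled)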

@@ -588,7 +601,7 @@ def reindex(self, axis, labels, **kwargs):
)
return self.__constructor__(new_modin_frame)

def reset_index(self, **kwargs):
def reset_index(self, **kwargs) -> PandasQueryCompiler:
if self.lazy_execution:

def _reset(df, *axis_lengths, partition_idx): # pragma: no cover
@@ -1454,9 +1454,14 @@ def _join_by_index(self, other_modin_frames, how, sort, ignore_index):
condition=condition,
)

new_columns = Index.__new__(
Index, data=new_columns, dtype=new_columns_dtype
)
# in the case of heterogeneous data, using the `dtype` parameter of the
# `Index` constructor can lead to the following error:
# `ValueError: string values cannot be losslessly cast to int64`
# that's why we explicitly call astype below
new_columns = Index(new_columns)
if new_columns.dtype != new_columns_dtype and new_columns_dtype is not None:
# ValueError: string values cannot be losslessly cast to int64
new_columns = new_columns.astype(new_columns_dtype)
lhs = lhs.__constructor__(
dtypes=lhs._dtypes_for_exprs(exprs),
columns=new_columns,
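A small sketch (labels assumed for illustration) of the failure mode the comment describes and the workaround: build the Index with an inferred dtype first, then cast only when the requested dtype actually differs.

import pandas as pd

labels, target_dtype = [0, 1, 2], "int64"
idx = pd.Index(labels)                  # infer dtype instead of forcing it
if target_dtype is not None and idx.dtype != target_dtype:
    idx = idx.astype(target_dtype)      # cast only when needed

try:
    pd.Index(["a", 0], dtype="int64")   # heterogeneous labels
except (ValueError, TypeError) as err:
    # e.g. "string values cannot be losslessly cast to int64";
    # the exact message varies across pandas versions
    print(err)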
@@ -1347,13 +1347,13 @@ def build_row_idx_filter_expr(row_idx, row_col):
return row_col.eq(row_idx)

if is_range_like(row_idx):
start = row_idx[0]
stop = row_idx[-1]
start = row_idx.start
stop = row_idx.stop
step = row_idx.step
if step < 0:
start, stop = stop, start
step = -step
exprs = [row_col.ge(start), row_col.le(stop)]
exprs = [row_col.ge(start), row_col.cmp("<", stop)]
if step > 1:
mod = OpExpr("MOD", [row_col, LiteralExpr(step)], _get_dtype(int))
exprs.append(mod.eq(0))
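A plain-Python sketch of why the bounds changed: for a range-like index, item access and the raw attributes disagree at the upper end, and the exclusive .stop requires a strict comparison, hence cmp("<", stop) instead of le(stop).

r = range(0, 10, 3)   # matches rows 0, 3, 6, 9
assert r[-1] == 9     # last element: the old, inclusive upper bound
assert r.stop == 10   # exclusive bound: what the new code filters against

# The bounds-plus-step filter spelled out in plain Python
# (start is 0 here, matching the builder's plain MOD check):
rows = [i for i in range(12) if r.start <= i < r.stop and i % r.step == 0]
assert rows == list(r)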
65 changes: 44 additions & 21 deletions modin/tests/pandas/dataframe/test_join_sort.py
@@ -80,20 +80,20 @@ def test_combine(data):
"test_data, test_data2",
[
(
np.random.uniform(0, 100, size=(2**6, 2**6)),
np.random.uniform(0, 100, size=(2**7, 2**6)),
np.random.randint(0, 100, size=(64, 64)),
np.random.uniform(0, 100, size=(128, 64)),
),
(
np.random.uniform(0, 100, size=(2**7, 2**6)),
np.random.uniform(0, 100, size=(2**6, 2**6)),
np.random.randint(0, 100, size=(128, 64)),
np.random.randint(0, 100, size=(64, 64)),
),
(
np.random.uniform(0, 100, size=(2**6, 2**6)),
np.random.uniform(0, 100, size=(2**6, 2**7)),
np.random.randint(0, 100, size=(64, 64)),
np.random.randint(0, 100, size=(64, 128)),
),
(
np.random.uniform(0, 100, size=(2**6, 2**7)),
np.random.uniform(0, 100, size=(2**6, 2**6)),
np.random.randint(0, 100, size=(64, 128)),
np.random.randint(0, 100, size=(64, 64)),
),
],
)
@@ -122,8 +122,9 @@ def test_join(test_data, test_data2):
hows = ["inner", "left", "right", "outer"]
ons = ["col33", "col34"]
sorts = [False, True]
for i in range(4):
for j in range(2):
assert len(ons) == len(sorts), "the loop below is designed for this condition"
for i in range(len(hows)):
for j in range(len(ons)):
modin_result = modin_df.join(
modin_df2,
how=hows[i],
@@ -140,7 +141,13 @@
lsuffix="_caller",
rsuffix="_other",
)
df_equals(modin_result, pandas_result)
if sorts[j]:
Collaborator: I don't quite understand this comment; sorting is just called in df_equals_and_sort. Should we remove this comment?
Collaborator (author): I expanded the comment a little.
# sorting in `join` is implemented through the range partitioning technique,
# therefore the row order after it does not match pandas,
# so additional sorting is needed to get the same result as for pandas
df_equals_and_sort(modin_result, pandas_result)
else:
df_equals(modin_result, pandas_result)

frame_data = {
"col1": [0, 1, 2, 3],
Expand Down Expand Up @@ -174,6 +181,15 @@ def test_join(test_data, test_data2):
df_equals(modin_join, pandas_join)


@pytest.mark.parametrize("how", ["left", "inner", "right"])
def test_join_empty(how):
data = np.random.randint(0, 100, size=(64, 64))
eval_general(
*create_test_dfs(data),
lambda df: df.join(df.iloc[:0], on=1, how=how, lsuffix="_caller"),
Collaborator: Why do we not use the `on` parameter for a similar test with merge?
Collaborator (author): They exercise different execution paths: for the merge function we can omit this parameter so that a certain part of the code starts executing.
)


def test_join_cross_6786():
data = [[7, 8, 9], [10, 11, 12]]
modin_df, pandas_df = create_test_dfs(data, columns=["x", "y", "z"])
Expand Down Expand Up @@ -269,19 +285,25 @@ def test_merge(test_data, test_data2):
index=pandas.Index([i for i in range(1, test_data2.shape[0] + 1)], name="key"),
)

hows = ["left", "inner"]
hows = ["left", "inner", "right"]
ons = ["col33", ["col33", "col34"]]
sorts = [False, True]
for i in range(2):
for j in range(2):
assert len(ons) == len(sorts), "the loop below is designed for this condition"
for i in range(len(hows)):
for j in range(len(ons)):
modin_result = modin_df.merge(
modin_df2, how=hows[i], on=ons[j], sort=sorts[j]
)
pandas_result = pandas_df.merge(
pandas_df2, how=hows[i], on=ons[j], sort=sorts[j]
)
# sorting in `merge` is implemented through the range partitioning technique,
# therefore the row order after it does not match pandas,
# so additional sorting is needed to get the same result as for pandas
sort_if_range_partitioning(
modin_result, pandas_result, force=StorageFormat.get() == "Hdk"
modin_result,
pandas_result,
force=StorageFormat.get() == "Hdk" or sorts[j],
)

modin_result = modin_df.merge(
Expand All @@ -299,7 +321,9 @@ def test_merge(test_data, test_data2):
sort=sorts[j],
)
sort_if_range_partitioning(
modin_result, pandas_result, force=StorageFormat.get() == "Hdk"
modin_result,
pandas_result,
force=StorageFormat.get() == "Hdk" or sorts[j],
)

# Test for issue #1771
@@ -418,11 +442,10 @@ def test_merge(test_data, test_data2):
modin_df.merge("Non-valid type")


def test_merge_empty():
data = np.random.uniform(0, 100, size=(2**6, 2**6))
pandas_df = pandas.DataFrame(data)
modin_df = pd.DataFrame(data)
eval_general(modin_df, pandas_df, lambda df: df.merge(df.iloc[:0]))
@pytest.mark.parametrize("how", ["left", "inner", "right"])
def test_merge_empty(how):
data = np.random.randint(0, 100, size=(64, 64))
eval_general(*create_test_dfs(data), lambda df: df.merge(df.iloc[:0], how=how))


def test_merge_with_mi_columns():
Expand Down