Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REFACTOR-#7294: Reduce access of methods _modin_frame methods from query compiler objects #7297

Merged
merged 1 commit into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions modin/core/dataframe/algebra/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,15 @@ def maybe_compute_dtypes_common_cast(
The dtypes of the operands are supposed to be known.
"""
if not trigger_computations:
if not first._modin_frame.has_materialized_dtypes:
if not first.frame_has_materialized_dtypes:
return None

if (
isinstance(second, type(first))
and not second._modin_frame.has_materialized_dtypes
):
if isinstance(second, type(first)) and not second.frame_has_materialized_dtypes:
return None

dtypes_first = first._modin_frame.dtypes.to_dict()
dtypes_first = first.dtypes.to_dict()
if isinstance(second, type(first)):
dtypes_second = second._modin_frame.dtypes.to_dict()
dtypes_second = second.dtypes.to_dict()
columns_first = set(first.columns)
columns_second = set(second.columns)
common_columns = columns_first.intersection(columns_second)
Expand Down
2 changes: 1 addition & 1 deletion modin/core/dataframe/algebra/tree_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def caller(
_axis = kwargs.get("axis") if axis is None else axis

new_dtypes = None
if compute_dtypes and query_compiler._modin_frame.has_materialized_dtypes:
if compute_dtypes and query_compiler.frame_has_materialized_dtypes:
new_dtypes = str(compute_dtypes(query_compiler.dtypes, *args, **kwargs))

return query_compiler.__constructor__(
Expand Down
53 changes: 53 additions & 0 deletions modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4521,6 +4521,59 @@ def has_multiindex(self, axis=0):
assert axis == 1
return isinstance(self.columns, pandas.MultiIndex)

@property
def frame_has_materialized_dtypes(self) -> bool:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@anmyachev, @arunjose696, why didn't we add similar methods for operating on columns?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently the relevant code was simply not touched when working on the small query compiler. There is no reason not to do this for columns.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would propose to do this for consistency. @arunjose696, would you take on this?

"""
Check if the undelying dataframe has materialized dtypes.

Returns
-------
bool
"""
return self._modin_frame.has_materialized_dtypes

def set_frame_dtypes_cache(self, dtypes):
"""
Set dtypes cache for the underlying dataframe frame.

Parameters
----------
dtypes : pandas.Series, ModinDtypes, callable or None
"""
self._modin_frame.set_dtypes_cache(dtypes)

def set_frame_index_cache(self, index):
"""
Set index cache for underlying dataframe.

Parameters
----------
index : sequence, callable or None
"""
self._modin_frame.set_index_cache(index)

@property
def frame_has_index_cache(self):
"""
Check if the index cache exists for underlying dataframe.

Returns
-------
bool
"""
return self._modin_frame.has_index_cache

@property
def frame_has_dtypes_cache(self) -> bool:
"""
Check if the dtypes cache exists for the underlying dataframe.

Returns
-------
bool
"""
return self._modin_frame.has_dtypes_cache

def get_index_name(self, axis=0):
"""
Get index name of specified axis.
Expand Down
4 changes: 2 additions & 2 deletions modin/core/storage_formats/pandas/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ def corr_method(
np.repeat(pandas.api.types.pandas_dtype("float"), len(new_columns)),
index=new_columns,
)
elif numeric_only and qc._modin_frame.has_materialized_dtypes:
old_dtypes = qc._modin_frame.dtypes
elif numeric_only and qc.frame_has_materialized_dtypes:
old_dtypes = qc.dtypes

new_columns = old_dtypes[old_dtypes.map(is_numeric_dtype)].index
new_index = new_columns.copy()
Expand Down
21 changes: 8 additions & 13 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,10 +580,7 @@ def reindex(self, axis, labels, **kwargs):
new_index, indexer = (self.index, None) if axis else self.index.reindex(labels)
new_columns, _ = self.columns.reindex(labels) if axis else (self.columns, None)
new_dtypes = None
if (
self._modin_frame.has_materialized_dtypes
and kwargs.get("method", None) is None
):
if self.frame_has_materialized_dtypes and kwargs.get("method", None) is None:
# For columns, defining types is easier because we don't have to calculate the common
# type, since the entire column is filled. A simple `reindex` covers our needs.
# For rows, we can avoid calculating common types if we know that no new strings of
Expand Down Expand Up @@ -2650,8 +2647,8 @@ def fillna(df):
}
return df.fillna(value=func_dict, **kwargs)

if self._modin_frame.has_materialized_dtypes:
dtypes = self._modin_frame.dtypes
if self.frame_has_materialized_dtypes:
dtypes = self.dtypes
value_dtypes = pandas.DataFrame(
{k: [v] for (k, v) in value.items()}
).dtypes
Expand All @@ -2663,12 +2660,10 @@ def fillna(df):
new_dtypes = dtypes

else:
if self._modin_frame.has_materialized_dtypes:
if self.frame_has_materialized_dtypes:
dtype = pandas.Series(value).dtype
if all(
find_common_type([t, dtype]) == t for t in self._modin_frame.dtypes
):
new_dtypes = self._modin_frame.dtypes
if all(find_common_type([t, dtype]) == t for t in self.dtypes):
new_dtypes = self.dtypes

def fillna(df):
return df.fillna(value=value, **kwargs)
Expand Down Expand Up @@ -2898,7 +2893,7 @@ def _set_item(df, row_loc): # pragma: no cover
df.loc[row_loc.squeeze(axis=1), col_loc] = item
return df

if self._modin_frame.has_materialized_dtypes and is_scalar(item):
if self.frame_has_materialized_dtypes and is_scalar(item):
new_dtypes = self.dtypes.copy()
old_dtypes = new_dtypes[col_loc]
item_type = extract_dtype(item)
Expand Down Expand Up @@ -4607,7 +4602,7 @@ def iloc_mut(partition, row_internal_indices, col_internal_indices, item):
# compute dtypes only if assigning entire columns
isinstance(row_numeric_index, slice)
and row_numeric_index == slice(None)
and self._modin_frame.has_materialized_dtypes
and self.frame_has_materialized_dtypes
):
new_dtypes = self.dtypes.copy()
new_dtypes.iloc[col_numeric_index] = broadcasted_dtypes.values
Expand Down
4 changes: 2 additions & 2 deletions modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1070,8 +1070,8 @@ def astype(

if not copy:
# If the new types match the old ones, then copying can be avoided
if self._query_compiler._modin_frame.has_materialized_dtypes:
frame_dtypes = self._query_compiler._modin_frame.dtypes
if self._query_compiler.frame_has_materialized_dtypes:
frame_dtypes = self._query_compiler.dtypes
if isinstance(dtype, dict):
for col in dtype:
if dtype[col] != frame_dtypes[col]:
Expand Down
66 changes: 33 additions & 33 deletions modin/tests/core/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -1138,14 +1138,14 @@ def test_binary_op_preserve_dtypes():
def setup_cache(df, has_cache=True):
if has_cache:
_ = df.dtypes
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes
else:
df._query_compiler._modin_frame.set_dtypes_cache(None)
assert not df._query_compiler._modin_frame.has_materialized_dtypes
df._query_compiler.set_frame_dtypes_cache(None)
assert not df._query_compiler.frame_has_materialized_dtypes
return df

def assert_cache(df, has_cache=True):
assert not (has_cache ^ df._query_compiler._modin_frame.has_materialized_dtypes)
assert not (has_cache ^ df._query_compiler.frame_has_materialized_dtypes)

# Check when `other` is a non-distributed object
assert_cache(setup_cache(df) + 2.0)
Expand Down Expand Up @@ -1179,7 +1179,7 @@ def remove_cache(df, axis):
if axis:
df._query_compiler._modin_frame.set_columns_cache(None)
else:
df._query_compiler._modin_frame.set_index_cache(None)
df._query_compiler.set_frame_index_cache(None)
assert_no_cache(df, axis)
return df

Expand All @@ -1195,30 +1195,30 @@ def test_setitem_bool_preserve_dtypes():
df = pd.DataFrame({"a": [1, 1, 2, 2], "b": [3, 4, 5, 6]})
indexer = pd.Series([True, False, True, False])

assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes

# slice(None) as a col_loc
df.loc[indexer] = 2.0
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes

# list as a col_loc
df.loc[indexer, ["a", "b"]] = 2.0
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes

# scalar as a col_loc
df.loc[indexer, "a"] = 2.0
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes


def test_setitem_unhashable_preserve_dtypes():
df = pd.DataFrame([[1, 2, 3, 4], [5, 6, 7, 8]])
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes

df2 = pd.DataFrame([[9, 9], [5, 5]])
assert df2._query_compiler._modin_frame.has_materialized_dtypes
assert df2._query_compiler.frame_has_materialized_dtypes

df[[1, 2]] = df2
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes


@pytest.mark.parametrize("modify_config", [{RangePartitioning: True}], indirect=True)
Expand Down Expand Up @@ -1246,7 +1246,7 @@ def test_reindex_preserve_dtypes(kwargs):
df = pd.DataFrame({"a": [1, 1, 2, 2], "b": [3, 4, 5, 6]})

reindexed_df = df.reindex(**kwargs)
assert reindexed_df._query_compiler._modin_frame.has_materialized_dtypes
assert reindexed_df._query_compiler.frame_has_materialized_dtypes


class TestModinIndexIds:
Expand Down Expand Up @@ -2039,7 +2039,7 @@ def test_concat_axis_1(
)
# setting columns cache to 'None', in order to prevent completing 'dtypes' with the materialized columns
md_df._query_compiler._modin_frame.set_columns_cache(None)
md_df._query_compiler._modin_frame.set_dtypes_cache(
md_df._query_compiler.set_frame_dtypes_cache(
ModinDtypes(
DtypesDescriptor(
known_dtypes,
Expand Down Expand Up @@ -2100,7 +2100,7 @@ def test_update_parent(self):

# 'df2' will have a 'DtypesDescriptor' with unknown dtypes for a column 'c'
df2 = pd.DataFrame({"c": [2, 3, 4]})
df2._query_compiler._modin_frame.set_dtypes_cache(None)
df2._query_compiler.set_frame_dtypes_cache(None)
dtypes_cache = df2._query_compiler._modin_frame._dtypes
assert isinstance(
dtypes_cache._value, DtypesDescriptor
Expand Down Expand Up @@ -2226,7 +2226,7 @@ def test_set_index_with_dupl_labels(self):
"""Verify that setting duplicated columns doesn't propagate any errors to a user."""
df = pd.DataFrame({"a": [1, 2, 3, 4], "b": [3.5, 4.4, 5.5, 6.6]})
# making sure that dtypes are represented by an unmaterialized dtypes-descriptor
df._query_compiler._modin_frame.set_dtypes_cache(None)
df._query_compiler.set_frame_dtypes_cache(None)

df.columns = ["a", "a"]
assert df.dtypes.equals(
Expand All @@ -2252,8 +2252,8 @@ def test_concat_mi(self):
)

# Drop actual dtypes in order to use partially-known dtypes
md_df1._query_compiler._modin_frame.set_dtypes_cache(None)
md_df2._query_compiler._modin_frame.set_dtypes_cache(None)
md_df1._query_compiler.set_frame_dtypes_cache(None)
md_df2._query_compiler.set_frame_dtypes_cache(None)

md_res = pd.concat([md_df1, md_df2], axis=1)
pd_res = pandas.concat([pd_df1, pd_df2], axis=1)
Expand Down Expand Up @@ -2282,9 +2282,9 @@ def test_preserve_dtypes_setitem(self, self_dtype, value, value_dtype):
with mock.patch.object(PandasDataframe, "_compute_dtypes") as patch:
df = pd.DataFrame({"a": [1, 2], "b": [3, 4], "c": [3, 4]})
if self_dtype == "materialized":
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes
elif self_dtype == "partial":
df._query_compiler._modin_frame.set_dtypes_cache(
df._query_compiler.set_frame_dtypes_cache(
ModinDtypes(
DtypesDescriptor(
{"a": np.dtype("int64")},
Expand All @@ -2293,7 +2293,7 @@ def test_preserve_dtypes_setitem(self, self_dtype, value, value_dtype):
)
)
elif self_dtype == "unknown":
df._query_compiler._modin_frame.set_dtypes_cache(None)
df._query_compiler.set_frame_dtypes_cache(None)
else:
raise NotImplementedError(self_dtype)

Expand All @@ -2304,7 +2304,7 @@ def test_preserve_dtypes_setitem(self, self_dtype, value, value_dtype):
[np.dtype("int64"), value_dtype, np.dtype("int64")],
index=["a", "b", "c"],
)
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes
assert df.dtypes.equals(result_dtype)
elif self_dtype == "partial":
result_dtype = DtypesDescriptor(
Expand Down Expand Up @@ -2339,17 +2339,17 @@ def test_preserve_dtypes_insert(self, self_dtype, value, value_dtype):
with mock.patch.object(PandasDataframe, "_compute_dtypes") as patch:
df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
if self_dtype == "materialized":
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes
elif self_dtype == "partial":
df._query_compiler._modin_frame.set_dtypes_cache(
df._query_compiler.set_frame_dtypes_cache(
ModinDtypes(
DtypesDescriptor(
{"a": np.dtype("int64")}, cols_with_unknown_dtypes=["b"]
)
)
)
elif self_dtype == "unknown":
df._query_compiler._modin_frame.set_dtypes_cache(None)
df._query_compiler.set_frame_dtypes_cache(None)
else:
raise NotImplementedError(self_dtype)

Expand All @@ -2360,7 +2360,7 @@ def test_preserve_dtypes_insert(self, self_dtype, value, value_dtype):
[value_dtype, np.dtype("int64"), np.dtype("int64")],
index=["c", "a", "b"],
)
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes
assert df.dtypes.equals(result_dtype)
elif self_dtype == "partial":
result_dtype = DtypesDescriptor(
Expand Down Expand Up @@ -2390,7 +2390,7 @@ def test_get_dummies_case(self):
cols = [col for col in res.columns if col != "items"]
res[cols] = res[cols] / res[cols].mean()

assert res._query_compiler._modin_frame.has_materialized_dtypes
assert res._query_compiler.frame_has_materialized_dtypes

patch.assert_not_called()

Expand All @@ -2403,21 +2403,21 @@ def test_preserve_dtypes_reset_index(self, drop, has_materialized_index):
if has_materialized_index:
assert df._query_compiler._modin_frame.has_materialized_index
else:
df._query_compiler._modin_frame.set_index_cache(None)
df._query_compiler.set_frame_index_cache(None)
assert not df._query_compiler._modin_frame.has_materialized_index
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes

res = df.reset_index(drop=drop)
if drop:
# we droped the index, so columns and dtypes shouldn't change
assert res._query_compiler._modin_frame.has_materialized_dtypes
assert res._query_compiler.frame_has_materialized_dtypes
assert res.dtypes.equals(df.dtypes)
else:
if has_materialized_index:
# we should have inserted index dtype into the descriptor,
# and since both of them are materialized, the result should be
# materialized too
assert res._query_compiler._modin_frame.has_materialized_dtypes
assert res._query_compiler.frame_has_materialized_dtypes
assert res.dtypes.equals(
pandas.Series(
[np.dtype("int64"), np.dtype("int64")], index=["index", "a"]
Expand All @@ -2436,7 +2436,7 @@ def test_preserve_dtypes_reset_index(self, drop, has_materialized_index):

# case 2: 'df' has partial dtype by default
df = pd.DataFrame({"a": [1, 2, 3], "b": [3, 4, 5]})
df._query_compiler._modin_frame.set_dtypes_cache(
df._query_compiler.set_frame_dtypes_cache(
ModinDtypes(
DtypesDescriptor(
{"a": np.dtype("int64")}, cols_with_unknown_dtypes=["b"]
Expand All @@ -2446,7 +2446,7 @@ def test_preserve_dtypes_reset_index(self, drop, has_materialized_index):
if has_materialized_index:
assert df._query_compiler._modin_frame.has_materialized_index
else:
df._query_compiler._modin_frame.set_index_cache(None)
df._query_compiler.set_frame_index_cache(None)
assert not df._query_compiler._modin_frame.has_materialized_index

res = df.reset_index(drop=drop)
Expand Down
Loading
Loading