Skip to content

Commit

Permalink
REFACTOR-#7294: Reduce access of methods _modin_frame methods from _q…
Browse files Browse the repository at this point in the history
…uery_compiler
  • Loading branch information
arunjose696 committed Jun 3, 2024
1 parent 002125b commit 9d286ba
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 73 deletions.
11 changes: 4 additions & 7 deletions modin/core/dataframe/algebra/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,15 @@ def maybe_compute_dtypes_common_cast(
The dtypes of the operands are supposed to be known.
"""
if not trigger_computations:
if not first._modin_frame.has_materialized_dtypes:
if not first.frame_has_materialized_dtypes:
return None

if (
isinstance(second, type(first))
and not second._modin_frame.has_materialized_dtypes
):
if isinstance(second, type(first)) and not second.frame_has_materialized_dtypes:
return None

dtypes_first = first._modin_frame.dtypes.to_dict()
dtypes_first = first.dtypes.to_dict()
if isinstance(second, type(first)):
dtypes_second = second._modin_frame.dtypes.to_dict()
dtypes_second = second.dtypes.to_dict()
columns_first = set(first.columns)
columns_second = set(second.columns)
common_columns = columns_first.intersection(columns_second)
Expand Down
2 changes: 1 addition & 1 deletion modin/core/dataframe/algebra/tree_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def caller(
_axis = kwargs.get("axis") if axis is None else axis

new_dtypes = None
if compute_dtypes and query_compiler._modin_frame.has_materialized_dtypes:
if compute_dtypes and query_compiler.frame_has_materialized_dtypes:
new_dtypes = str(compute_dtypes(query_compiler.dtypes, *args, **kwargs))

return query_compiler.__constructor__(
Expand Down
53 changes: 53 additions & 0 deletions modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4521,6 +4521,59 @@ def has_multiindex(self, axis=0):
assert axis == 1
return isinstance(self.columns, pandas.MultiIndex)

@property
def frame_has_materialized_dtypes(self) -> bool:
    """
    Check if the underlying dataframe has materialized dtypes.

    Returns
    -------
    bool
        The value of ``has_materialized_dtypes`` reported by ``self._modin_frame``.
    """
    return self._modin_frame.has_materialized_dtypes

def set_frame_dtypes_cache(self, dtypes):
    """
    Set dtypes cache for the underlying dataframe.

    Parameters
    ----------
    dtypes : pandas.Series, ModinDtypes, callable or None
        Value forwarded unchanged to ``self._modin_frame.set_dtypes_cache``.
    """
    self._modin_frame.set_dtypes_cache(dtypes)

def set_frame_index_cache(self, index):
    """
    Set index cache for the underlying dataframe.

    Parameters
    ----------
    index : sequence, callable or None
        Value forwarded unchanged to ``self._modin_frame.set_index_cache``.
    """
    self._modin_frame.set_index_cache(index)

@property
def frame_has_index_cache(self) -> bool:
    """
    Check if the index cache exists for the underlying dataframe.

    Returns
    -------
    bool
        The value of ``has_index_cache`` reported by ``self._modin_frame``.
    """
    return self._modin_frame.has_index_cache

@property
def frame_has_dtypes_cache(self) -> bool:
    """
    Check if the dtypes cache exists for the underlying dataframe.

    Returns
    -------
    bool
        The value of ``has_dtypes_cache`` reported by ``self._modin_frame``.
    """
    return self._modin_frame.has_dtypes_cache

def get_index_name(self, axis=0):
"""
Get index name of specified axis.
Expand Down
4 changes: 2 additions & 2 deletions modin/core/storage_formats/pandas/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ def corr_method(
np.repeat(pandas.api.types.pandas_dtype("float"), len(new_columns)),
index=new_columns,
)
elif numeric_only and qc._modin_frame.has_materialized_dtypes:
old_dtypes = qc._modin_frame.dtypes
elif numeric_only and qc.frame_has_materialized_dtypes:
old_dtypes = qc.dtypes

new_columns = old_dtypes[old_dtypes.map(is_numeric_dtype)].index
new_index = new_columns.copy()
Expand Down
21 changes: 8 additions & 13 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,10 +580,7 @@ def reindex(self, axis, labels, **kwargs):
new_index, indexer = (self.index, None) if axis else self.index.reindex(labels)
new_columns, _ = self.columns.reindex(labels) if axis else (self.columns, None)
new_dtypes = None
if (
self._modin_frame.has_materialized_dtypes
and kwargs.get("method", None) is None
):
if self.frame_has_materialized_dtypes and kwargs.get("method", None) is None:
# For columns, defining types is easier because we don't have to calculate the common
# type, since the entire column is filled. A simple `reindex` covers our needs.
# For rows, we can avoid calculating common types if we know that no new strings of
Expand Down Expand Up @@ -2650,8 +2647,8 @@ def fillna(df):
}
return df.fillna(value=func_dict, **kwargs)

if self._modin_frame.has_materialized_dtypes:
dtypes = self._modin_frame.dtypes
if self.frame_has_materialized_dtypes:
dtypes = self.dtypes
value_dtypes = pandas.DataFrame(
{k: [v] for (k, v) in value.items()}
).dtypes
Expand All @@ -2663,12 +2660,10 @@ def fillna(df):
new_dtypes = dtypes

else:
if self._modin_frame.has_materialized_dtypes:
if self.frame_has_materialized_dtypes:
dtype = pandas.Series(value).dtype
if all(
find_common_type([t, dtype]) == t for t in self._modin_frame.dtypes
):
new_dtypes = self._modin_frame.dtypes
if all(find_common_type([t, dtype]) == t for t in self.dtypes):
new_dtypes = self.dtypes

def fillna(df):
return df.fillna(value=value, **kwargs)
Expand Down Expand Up @@ -2898,7 +2893,7 @@ def _set_item(df, row_loc): # pragma: no cover
df.loc[row_loc.squeeze(axis=1), col_loc] = item
return df

if self._modin_frame.has_materialized_dtypes and is_scalar(item):
if self.frame_has_materialized_dtypes and is_scalar(item):
new_dtypes = self.dtypes.copy()
old_dtypes = new_dtypes[col_loc]
item_type = extract_dtype(item)
Expand Down Expand Up @@ -4607,7 +4602,7 @@ def iloc_mut(partition, row_internal_indices, col_internal_indices, item):
# compute dtypes only if assigning entire columns
isinstance(row_numeric_index, slice)
and row_numeric_index == slice(None)
and self._modin_frame.has_materialized_dtypes
and self.frame_has_materialized_dtypes
):
new_dtypes = self.dtypes.copy()
new_dtypes.iloc[col_numeric_index] = broadcasted_dtypes.values
Expand Down
4 changes: 2 additions & 2 deletions modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1070,8 +1070,8 @@ def astype(

if not copy:
# If the new types match the old ones, then copying can be avoided
if self._query_compiler._modin_frame.has_materialized_dtypes:
frame_dtypes = self._query_compiler._modin_frame.dtypes
if self._query_compiler.frame_has_materialized_dtypes:
frame_dtypes = self._query_compiler.dtypes
if isinstance(dtype, dict):
for col in dtype:
if dtype[col] != frame_dtypes[col]:
Expand Down
66 changes: 33 additions & 33 deletions modin/tests/core/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -1138,14 +1138,14 @@ def test_binary_op_preserve_dtypes():
def setup_cache(df, has_cache=True):
if has_cache:
_ = df.dtypes
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes
else:
df._query_compiler._modin_frame.set_dtypes_cache(None)
assert not df._query_compiler._modin_frame.has_materialized_dtypes
df._query_compiler.set_frame_dtypes_cache(None)
assert not df._query_compiler.frame_has_materialized_dtypes
return df

def assert_cache(df, has_cache=True):
assert not (has_cache ^ df._query_compiler._modin_frame.has_materialized_dtypes)
assert not (has_cache ^ df._query_compiler.frame_has_materialized_dtypes)

# Check when `other` is a non-distributed object
assert_cache(setup_cache(df) + 2.0)
Expand Down Expand Up @@ -1179,7 +1179,7 @@ def remove_cache(df, axis):
if axis:
df._query_compiler._modin_frame.set_columns_cache(None)
else:
df._query_compiler._modin_frame.set_index_cache(None)
df._query_compiler.set_frame_index_cache(None)
assert_no_cache(df, axis)
return df

Expand All @@ -1195,30 +1195,30 @@ def test_setitem_bool_preserve_dtypes():
df = pd.DataFrame({"a": [1, 1, 2, 2], "b": [3, 4, 5, 6]})
indexer = pd.Series([True, False, True, False])

assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes

# slice(None) as a col_loc
df.loc[indexer] = 2.0
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes

# list as a col_loc
df.loc[indexer, ["a", "b"]] = 2.0
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes

# scalar as a col_loc
df.loc[indexer, "a"] = 2.0
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes


def test_setitem_unhashable_preserve_dtypes():
df = pd.DataFrame([[1, 2, 3, 4], [5, 6, 7, 8]])
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes

df2 = pd.DataFrame([[9, 9], [5, 5]])
assert df2._query_compiler._modin_frame.has_materialized_dtypes
assert df2._query_compiler.frame_has_materialized_dtypes

df[[1, 2]] = df2
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes


@pytest.mark.parametrize("modify_config", [{RangePartitioning: True}], indirect=True)
Expand Down Expand Up @@ -1246,7 +1246,7 @@ def test_reindex_preserve_dtypes(kwargs):
df = pd.DataFrame({"a": [1, 1, 2, 2], "b": [3, 4, 5, 6]})

reindexed_df = df.reindex(**kwargs)
assert reindexed_df._query_compiler._modin_frame.has_materialized_dtypes
assert reindexed_df._query_compiler.frame_has_materialized_dtypes


class TestModinIndexIds:
Expand Down Expand Up @@ -2039,7 +2039,7 @@ def test_concat_axis_1(
)
# setting columns cache to 'None', in order to prevent completing 'dtypes' with the materialized columns
md_df._query_compiler._modin_frame.set_columns_cache(None)
md_df._query_compiler._modin_frame.set_dtypes_cache(
md_df._query_compiler.set_frame_dtypes_cache(
ModinDtypes(
DtypesDescriptor(
known_dtypes,
Expand Down Expand Up @@ -2100,7 +2100,7 @@ def test_update_parent(self):

# 'df2' will have a 'DtypesDescriptor' with unknown dtypes for a column 'c'
df2 = pd.DataFrame({"c": [2, 3, 4]})
df2._query_compiler._modin_frame.set_dtypes_cache(None)
df2._query_compiler.set_frame_dtypes_cache(None)
dtypes_cache = df2._query_compiler._modin_frame._dtypes
assert isinstance(
dtypes_cache._value, DtypesDescriptor
Expand Down Expand Up @@ -2226,7 +2226,7 @@ def test_set_index_with_dupl_labels(self):
"""Verify that setting duplicated columns doesn't propagate any errors to a user."""
df = pd.DataFrame({"a": [1, 2, 3, 4], "b": [3.5, 4.4, 5.5, 6.6]})
# making sure that dtypes are represented by an unmaterialized dtypes-descriptor
df._query_compiler._modin_frame.set_dtypes_cache(None)
df._query_compiler.set_frame_dtypes_cache(None)

df.columns = ["a", "a"]
assert df.dtypes.equals(
Expand All @@ -2252,8 +2252,8 @@ def test_concat_mi(self):
)

# Drop actual dtypes in order to use partially-known dtypes
md_df1._query_compiler._modin_frame.set_dtypes_cache(None)
md_df2._query_compiler._modin_frame.set_dtypes_cache(None)
md_df1._query_compiler.set_frame_dtypes_cache(None)
md_df2._query_compiler.set_frame_dtypes_cache(None)

md_res = pd.concat([md_df1, md_df2], axis=1)
pd_res = pandas.concat([pd_df1, pd_df2], axis=1)
Expand Down Expand Up @@ -2282,9 +2282,9 @@ def test_preserve_dtypes_setitem(self, self_dtype, value, value_dtype):
with mock.patch.object(PandasDataframe, "_compute_dtypes") as patch:
df = pd.DataFrame({"a": [1, 2], "b": [3, 4], "c": [3, 4]})
if self_dtype == "materialized":
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes
elif self_dtype == "partial":
df._query_compiler._modin_frame.set_dtypes_cache(
df._query_compiler.set_frame_dtypes_cache(
ModinDtypes(
DtypesDescriptor(
{"a": np.dtype("int64")},
Expand All @@ -2293,7 +2293,7 @@ def test_preserve_dtypes_setitem(self, self_dtype, value, value_dtype):
)
)
elif self_dtype == "unknown":
df._query_compiler._modin_frame.set_dtypes_cache(None)
df._query_compiler.set_frame_dtypes_cache(None)
else:
raise NotImplementedError(self_dtype)

Expand All @@ -2304,7 +2304,7 @@ def test_preserve_dtypes_setitem(self, self_dtype, value, value_dtype):
[np.dtype("int64"), value_dtype, np.dtype("int64")],
index=["a", "b", "c"],
)
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes
assert df.dtypes.equals(result_dtype)
elif self_dtype == "partial":
result_dtype = DtypesDescriptor(
Expand Down Expand Up @@ -2339,17 +2339,17 @@ def test_preserve_dtypes_insert(self, self_dtype, value, value_dtype):
with mock.patch.object(PandasDataframe, "_compute_dtypes") as patch:
df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
if self_dtype == "materialized":
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes
elif self_dtype == "partial":
df._query_compiler._modin_frame.set_dtypes_cache(
df._query_compiler.set_frame_dtypes_cache(
ModinDtypes(
DtypesDescriptor(
{"a": np.dtype("int64")}, cols_with_unknown_dtypes=["b"]
)
)
)
elif self_dtype == "unknown":
df._query_compiler._modin_frame.set_dtypes_cache(None)
df._query_compiler.set_frame_dtypes_cache(None)
else:
raise NotImplementedError(self_dtype)

Expand All @@ -2360,7 +2360,7 @@ def test_preserve_dtypes_insert(self, self_dtype, value, value_dtype):
[value_dtype, np.dtype("int64"), np.dtype("int64")],
index=["c", "a", "b"],
)
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes
assert df.dtypes.equals(result_dtype)
elif self_dtype == "partial":
result_dtype = DtypesDescriptor(
Expand Down Expand Up @@ -2390,7 +2390,7 @@ def test_get_dummies_case(self):
cols = [col for col in res.columns if col != "items"]
res[cols] = res[cols] / res[cols].mean()

assert res._query_compiler._modin_frame.has_materialized_dtypes
assert res._query_compiler.frame_has_materialized_dtypes

patch.assert_not_called()

Expand All @@ -2403,21 +2403,21 @@ def test_preserve_dtypes_reset_index(self, drop, has_materialized_index):
if has_materialized_index:
assert df._query_compiler._modin_frame.has_materialized_index
else:
df._query_compiler._modin_frame.set_index_cache(None)
df._query_compiler.set_frame_index_cache(None)
assert not df._query_compiler._modin_frame.has_materialized_index
assert df._query_compiler._modin_frame.has_materialized_dtypes
assert df._query_compiler.frame_has_materialized_dtypes

res = df.reset_index(drop=drop)
if drop:
# we dropped the index, so columns and dtypes shouldn't change
assert res._query_compiler._modin_frame.has_materialized_dtypes
assert res._query_compiler.frame_has_materialized_dtypes
assert res.dtypes.equals(df.dtypes)
else:
if has_materialized_index:
# we should have inserted index dtype into the descriptor,
# and since both of them are materialized, the result should be
# materialized too
assert res._query_compiler._modin_frame.has_materialized_dtypes
assert res._query_compiler.frame_has_materialized_dtypes
assert res.dtypes.equals(
pandas.Series(
[np.dtype("int64"), np.dtype("int64")], index=["index", "a"]
Expand All @@ -2436,7 +2436,7 @@ def test_preserve_dtypes_reset_index(self, drop, has_materialized_index):

# case 2: 'df' has partial dtype by default
df = pd.DataFrame({"a": [1, 2, 3], "b": [3, 4, 5]})
df._query_compiler._modin_frame.set_dtypes_cache(
df._query_compiler.set_frame_dtypes_cache(
ModinDtypes(
DtypesDescriptor(
{"a": np.dtype("int64")}, cols_with_unknown_dtypes=["b"]
Expand All @@ -2446,7 +2446,7 @@ def test_preserve_dtypes_reset_index(self, drop, has_materialized_index):
if has_materialized_index:
assert df._query_compiler._modin_frame.has_materialized_index
else:
df._query_compiler._modin_frame.set_index_cache(None)
df._query_compiler.set_frame_index_cache(None)
assert not df._query_compiler._modin_frame.has_materialized_index

res = df.reset_index(drop=drop)
Expand Down
Loading

0 comments on commit 9d286ba

Please sign in to comment.