Skip to content

Commit

Permalink
PERF-#7299: Avoid using synchronize_labels for combine function (#…
Browse files Browse the repository at this point in the history
…7300)

Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
  • Loading branch information
anmyachev committed Jun 7, 2024
1 parent dea0003 commit af5ed06
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 8 deletions.
11 changes: 9 additions & 2 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2891,7 +2891,15 @@ def combine(self) -> PandasDataframe:
PandasDataframe
A single partition PandasDataframe.
"""
partitions = self._partition_mgr_cls.combine(self._partitions)
new_index = None
new_columns = None
if self._deferred_index:
new_index = self.index
if self._deferred_column:
new_columns = self.columns
partitions = self._partition_mgr_cls.combine(
self._partitions, new_index, new_columns
)
result = self.__constructor__(
partitions,
index=self.copy_index_cache(),
Expand All @@ -2909,7 +2917,6 @@ def combine(self) -> PandasDataframe:
dtypes=self.copy_dtypes_cache(),
pandas_backend=self._pandas_backend,
)
result.synchronize_labels()
return result

@lazy_metadata_decorator(apply_axis="both")
Expand Down
16 changes: 13 additions & 3 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1261,27 +1261,37 @@ def _apply_func_to_list_of_partitions(cls, func, partitions, **kwargs):
return [obj.apply(preprocessed_func, **kwargs) for obj in partitions]

@classmethod
def combine(cls, partitions):
def combine(cls, partitions, new_index=None, new_columns=None):
"""
Convert a NumPy 2D array of partitions to a NumPy 2D array of a single partition.
Parameters
----------
partitions : np.ndarray
The partitions which have to be converted to a single partition.
new_index : pandas.Index, optional
Index for propagation into internal partitions.
Optimization allowing to do this in one remote kernel.
new_columns : pandas.Index, optional
Columns for propagation into internal partitions.
Optimization allowing to do this in one remote kernel.
Returns
-------
np.ndarray
A NumPy 2D array of a single partition.
"""
if partitions.size <= 1:
if partitions.size <= 1 and new_index is None and new_columns is None:
return partitions

def to_pandas_remote(df, partition_shape, *dfs):
"""Copy of ``cls.to_pandas()`` method adapted for a remote function."""
return create_pandas_df_from_partitions(
(df,) + dfs, partition_shape, called_from_remote=True
(df,) + dfs,
partition_shape,
called_from_remote=True,
new_index=new_index,
new_columns=new_columns,
)

preprocessed_func = cls.preprocess_func(to_pandas_remote)
Expand Down
23 changes: 20 additions & 3 deletions modin/core/dataframe/pandas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ def concatenate(dfs, copy=True):


def create_pandas_df_from_partitions(
partition_data, partition_shape, called_from_remote=False
partition_data,
partition_shape,
called_from_remote=False,
new_index=None,
new_columns=None,
):
"""
Convert partition data of multiple dataframes to a single dataframe.
Expand All @@ -85,6 +89,12 @@ def create_pandas_df_from_partitions(
Shape of the partitions NumPy array.
called_from_remote : bool, default: False
Flag used to check if explicit copy should be done in concat.
new_index : pandas.Index, optional
Index for propagation into internal partitions.
Optimization allowing to do this in one remote kernel.
new_columns : pandas.Index, optional
Columns for propagation into internal partitions.
Optimization allowing to do this in one remote kernel.
Returns
-------
Expand Down Expand Up @@ -127,6 +137,13 @@ def is_part_empty(part):
del partition_data

if len(df_rows) == 0:
return pandas.DataFrame()
res = pandas.DataFrame()
else:
return concatenate(df_rows, copy=not called_from_remote)
res = concatenate(df_rows, copy=not called_from_remote)

if new_index is not None:
res.index = new_index
if new_columns is not None:
res.columns = new_columns

return res

0 comments on commit af5ed06

Please sign in to comment.