Skip to content

Commit

Permalink
FEAT-#7337: Using dynamic partitioning in broadcast_apply (#7338)
Browse files Browse the repository at this point in the history
Signed-off-by: Kirill Suvorov <kirill.suvorov@intel.com>
  • Loading branch information
Retribution98 authored Aug 26, 2024
1 parent da01571 commit 8249915
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 9 deletions.
8 changes: 5 additions & 3 deletions modin/core/dataframe/algebra/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,9 +655,11 @@ def aggregate_on_dict(grp_obj, *args, **kwargs):
)

native_res_part = [] if native_agg_res is None else [native_agg_res]
result = pandas.concat(
[*native_res_part, *custom_results], axis=1, copy=False
)
parts = [*native_res_part, *custom_results]
if parts:
result = pandas.concat(parts, axis=1, copy=False)
else:
result = pandas.DataFrame(columns=result_columns)

# The order is naturally preserved if there's no custom aggregations
if preserve_aggregation_order and len(custom_aggs):
Expand Down
59 changes: 57 additions & 2 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ def get_partitions(index):

@classmethod
@wait_computations_if_benchmark_mode
def broadcast_apply(cls, axis, apply_func, left, right):
def base_broadcast_apply(cls, axis, apply_func, left, right):
"""
Broadcast the `right` partitions to `left` and apply `apply_func` function.
Expand Down Expand Up @@ -504,6 +504,7 @@ def broadcast_axis_partitions(
keep_partitioning=False,
num_splits=None,
apply_indices=None,
broadcast_all=True,
enumerate_partitions=False,
lengths=None,
apply_func_args=None,
Expand Down Expand Up @@ -532,6 +533,8 @@ def broadcast_axis_partitions(
then the number of splits is preserved.
apply_indices : list of ints, default: None
Indices of `axis ^ 1` to apply function over.
broadcast_all : bool, default: True
Whether or not to pass all right axis partitions to each of the left axis partitions.
enumerate_partitions : bool, default: False
Whether or not to pass partition index into `apply_func`.
Note that `apply_func` must be able to accept `partition_idx` kwarg.
Expand Down Expand Up @@ -578,7 +581,6 @@ def broadcast_axis_partitions(
# load-balance the data as well.
kw = {
"num_splits": num_splits,
"other_axis_partition": right_partitions,
"maintain_partitioning": keep_partitioning,
}
if lengths:
Expand All @@ -593,6 +595,9 @@ def broadcast_axis_partitions(
left_partitions[i].apply(
preprocessed_map_func,
*(apply_func_args if apply_func_args else []),
other_axis_partition=(
right_partitions if broadcast_all else right_partitions[i]
),
**kw,
**({"partition_idx": idx} if enumerate_partitions else {}),
**kwargs,
Expand Down Expand Up @@ -648,6 +653,56 @@ def base_map_partitions(
]
)

@classmethod
@wait_computations_if_benchmark_mode
def broadcast_apply(
    cls,
    axis,
    apply_func,
    left,
    right,
):
    """
    Broadcast the `right` partitions to `left` and apply `apply_func` function using different approaches to achieve the best performance.

    Parameters
    ----------
    axis : {0, 1}
        Axis to apply and broadcast over.
    apply_func : callable
        Function to apply.
    left : np.ndarray
        NumPy array of left partitions.
    right : np.ndarray
        NumPy array of right partitions.

    Returns
    -------
    np.ndarray
        NumPy array of result partition objects.
    """
    if DynamicPartitioning.get():
        # Axis-wise broadcast.
        # NOTE: the dynamic partitioning behavior of `broadcast_apply` differs
        # from that of `map_partitions`, since the columnar approach for
        # `broadcast_apply` results in slowdown.
        return cls.broadcast_axis_partitions(
            axis=axis ^ 1,
            left=left,
            right=right,
            apply_func=apply_func,
            broadcast_all=False,
            keep_partitioning=True,
        )
    # Default path: block-wise broadcast.
    return cls.base_broadcast_apply(
        axis,
        apply_func,
        left,
        right,
    )

@classmethod
@wait_computations_if_benchmark_mode
def map_partitions(
Expand Down
4 changes: 1 addition & 3 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3157,9 +3157,7 @@ def dropna(self, **kwargs):
lib.no_default,
None,
)
# FIXME: this is a naive workaround for this problem: https://github.com/modin-project/modin/issues/5394
# if there are too many partitions then all non-full-axis implementations start acting very badly.
# The here threshold is pretty random though it works fine on simple scenarios
# The map reduce approach works well for frames with few columnar partitions
processable_amount_of_partitions = (
self._modin_frame.num_parts < CpuCount.get() * 32
)
Expand Down
33 changes: 32 additions & 1 deletion modin/tests/pandas/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
import pytest

import modin.pandas as pd
from modin.config import IsRayCluster, NPartitions, RangePartitioning, StorageFormat
from modin.config import (
IsRayCluster,
NPartitions,
RangePartitioning,
StorageFormat,
context,
)
from modin.core.dataframe.algebra.default2pandas.groupby import GroupBy
from modin.core.dataframe.pandas.partitioning.axis_partition import (
PandasDataframeAxisPartition,
Expand Down Expand Up @@ -2431,6 +2437,31 @@ def test_multi_column_groupby_different_partitions(
)


def test_empty_partitions_after_groupby():
    """Check groupby-aggregation correctness when some result partitions are empty."""
    data = test_data_values[0]
    column_names = list(data.keys())
    # Aggregate only two of the columns so that other partitions end up empty.
    aggregation = {
        column_names[1]: "sum",
        column_names[-1]: "sum",
    }

    def apply_aggregation(grp):
        return grp.agg(aggregation)

    md_df, pd_df = create_test_dfs(data)
    grouping_column = pd_df.columns[0]

    with context(DynamicPartitioning=True):
        md_grp = md_df.groupby(grouping_column)
        pd_grp = pd_df.groupby(grouping_column)
        eval_general(
            md_grp,
            pd_grp,
            apply_aggregation,
        )


@pytest.mark.parametrize(
"by",
[
Expand Down

0 comments on commit 8249915

Please sign in to comment.