Attempt to accelerate mean aggregation #3586

Closed
22 changes: 22 additions & 0 deletions modin/core/storage_formats/base/query_compiler.py
@@ -2307,6 +2307,28 @@ def groupby_sum(
            drop=drop,
        )

    @doc_utils.doc_groupby_method(result="mean", refer_to="mean")
    def groupby_mean_numeric(
        self,
        by,
        axis,
        groupby_args,
        map_args,
        reduce_args=None,
        numeric_only=True,
        drop=False,
    ):
        return GroupByDefault.register(pandas.core.groupby.DataFrameGroupBy.mean)(
            self,
            by=by,
            axis=axis,
            groupby_args=groupby_args,
            map_args=map_args,
            reduce_args=reduce_args,
            numeric_only=numeric_only,
            drop=drop,
        )

    @doc_utils.doc_groupby_method(
        action="get the number of elements",
        result="number of elements",
38 changes: 38 additions & 0 deletions modin/core/storage_formats/pandas/query_compiler.py
@@ -2491,6 +2491,44 @@ def _callable_func(self, func, axis, *args, **kwargs):
    groupby_prod = GroupByReduce.register("prod")
    groupby_sum = GroupByReduce.register("sum")

    def _mean_agg_map(dfgb, **kwargs):
        kwargs["min_count"] = 1
        result = dfgb.sum(**kwargs)
        divisor = dfgb.count()
        divisor.set_axis(
            ["__mean_agg_size_column__" + x for x in divisor.columns],
Review comment (Collaborator):

this breaks on MultiIndex columns in divisor:

import modin.pandas as pd

df = pd.DataFrame(
    {
        ("foo", "a"): [1, 1, 2],
        ("foo", "b"): [3, 4, 5],
        ("bar", "a"): [10, 20, 30],
        ("bar", "b"): [40, 50, 60],
    }
)

print(df.groupby(("foo", "a")).mean())
Error traceback
  File "/localdisk1/dchigare/repos/modin_bp/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py", line 479, in apply_list_of_funcs
    partition = func(partition, *args, **kwargs)
  File "/localdisk1/dchigare/repos/modin_bp/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py", line 169, in map_func
    return apply_func(df, **{other_name: other})
  File "/localdisk1/dchigare/repos/modin_bp/modin/core/dataframe/algebra/groupby.py", line 415, in _map
    result = wrapper(df, other)
  File "/localdisk1/dchigare/repos/modin_bp/modin/core/dataframe/algebra/groupby.py", line 402, in wrapper
    return cls.map(
  File "/localdisk1/dchigare/repos/modin_bp/modin/core/dataframe/algebra/groupby.py", line 139, in map
    result = apply_func(
  File "/localdisk1/dchigare/repos/modin_bp/modin/core/storage_formats/pandas/query_compiler.py", line 2499, in _mean_agg_map
    ["__mean_agg_size_column__" + x for x in divisor.columns],
  File "/localdisk1/dchigare/repos/modin_bp/modin/core/storage_formats/pandas/query_compiler.py", line 2499, in <listcomp>
    ["__mean_agg_size_column__" + x for x in divisor.columns],
TypeError: can only concatenate str (not "tuple") to str

I would recommend using add_prefix here instead of manual renaming: divisor.add_prefix(prefix).

Review comment (Collaborator):

Also, I would like the prefix to be stored in a variable to simplify its usage

            axis=1,
            inplace=True,
        )
        result = pandas.concat([result, divisor], axis=1, copy=False)
        return result
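
Taking both review suggestions above together, a minimal sketch of what the renaming could look like, with the prefix stored in a variable and an explicit branch for MultiIndex columns (illustrative only; prefix_columns and the constant name are hypothetical, and plain pandas is assumed):

import pandas as pd

MEAN_AGG_PREFIX = "__mean_agg_size_column__"  # hypothetical name for the stored prefix

def prefix_columns(df):
    # add_prefix covers the flat-Index case; for MultiIndex columns we
    # prefix only the first level so the tuple labels stay tuples.
    if isinstance(df.columns, pd.MultiIndex):
        return df.set_axis(
            pd.MultiIndex.from_tuples(
                [(MEAN_AGG_PREFIX + first, *rest) for first, *rest in df.columns]
            ),
            axis=1,
        )
    return df.add_prefix(MEAN_AGG_PREFIX)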

    def _mean_agg_reduce(dfgb, **kwargs):
        kwargs["min_count"] = 1
        result = dfgb.sum(**kwargs)
        divirgent = result[
            [x for x in result.columns if not x.startswith("__mean_agg_size_column__")]
Review comment (Collaborator):

in the case of MultiIndex columns, x is a tuple here; we probably need an if-else check for this

Reply (Collaborator Author @gshimansky, Jan 25, 2022):

Thinking again about this, probably the best approach would be to add a new MultiIndex level to distinguish between the original columns and the result of the .count() operation. What do you think? I am not sure whether to do it this way in all cases or only in the case of MultiIndex columns.

Reply (Collaborator):

mean is df.sum() / df.count() (or df.groupby.sum() / df.groupby.count()), and size is just len, so I don't think it would be correct. Each column has an independent count.

Reply (Collaborator Author):

Yes, it should be count() instead of size() in my comment above (I edited it for clarity). The comment itself was not about using size(); it was about distinguishing the new columns from the original ones without adding a string prefix to their names in the MultiIndex case.

Reply (Collaborator):

I see, I have a prototype of this here: https://github.com/modin-project/modin/pull/1902/files
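
A hedged sketch of the extra-level idea from this thread, in plain pandas: tag the sum and count blocks with a new outer column level instead of a string prefix, so existing labels, including MultiIndex tuples, survive untouched (mean_map/mean_reduce are illustrative names; by-column bookkeeping is omitted):

import pandas as pd

def mean_map(dfgb, **kwargs):
    # Emit partial sums and counts side by side, tagged by an extra
    # outer column level rather than a name prefix.
    sums = dfgb.sum(min_count=1, **kwargs)
    counts = dfgb.count()
    return pd.concat({"sum": sums, "count": counts}, axis=1)

def mean_reduce(dfgb, **kwargs):
    # Combine the partials, then divide the tagged blocks elementwise.
    totals = dfgb.sum(min_count=1, **kwargs)
    return totals["sum"].divide(totals["count"])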

        ]
        divisor = result[
            [x for x in result.columns if x.startswith("__mean_agg_size_column__")]
        ]
        divisor.set_axis(
            [x[len("__mean_agg_size_column__") :] for x in divisor.columns],
            axis=1,
            inplace=True,
        )
Comment on lines +2515 to +2519
Review comment (Collaborator):

The order of columns in the dividend and divisor frames should be the same, so can we just do divisor.columns = divirgent.columns.copy()?

Reply (Collaborator Author):

Except that divisor and divirgent have different sets of columns, because divisor also carries the "by" columns prefixed with "__mean_agg_size_column__". The following line (with the comment above it) makes sure they are excluded.

        # The following line makes sure we exclude any "by" columns that could be
        # carried in via the "__mean_agg_size_column__" prefix; they shouldn't be
        # present because they were removed once the map phase was done.
        divisor = divisor[divirgent.columns]
        result = divirgent.divide(divisor)
        return result

    groupby_mean_numeric = GroupByReduce.register(
        _mean_agg_map,
        _mean_agg_reduce,
        default_to_pandas_func=lambda dfgb, **kwargs: dfgb.mean(**kwargs),
    )
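
For intuition, a plain-pandas sanity check of the decomposition this registration relies on: per-partition sums and counts, re-aggregated and divided, reproduce a direct groupby mean (partitioning is simulated with iloc slices):

import numpy as np
import pandas as pd

df = pd.DataFrame({"k": list("abab"), "v": [1.0, 2.0, np.nan, 4.0]})
parts = [df.iloc[:2], df.iloc[2:]]  # stand-ins for two row partitions

# Map phase: partial sums and counts per partition.
mapped = pd.concat(p.groupby("k")["v"].agg(["sum", "count"]) for p in parts)
# Reduce phase: combine the partials, then divide.
reduced = mapped.groupby(level=0).sum(min_count=1)
mean_mr = reduced["sum"] / reduced["count"]

assert np.allclose(mean_mr, df.groupby("k")["v"].mean())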

    def groupby_size(
        self, by, axis, groupby_args, map_args, reduce_args, numeric_only, drop
    ):
42 changes: 41 additions & 1 deletion modin/pandas/groupby.py
@@ -33,6 +33,7 @@
)
from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler
from modin.core.dataframe.algebra.default2pandas.groupby import GroupBy
from pandas.core.dtypes.common import is_datetime64_any_dtype
from modin.config import IsExperimental
from .series import Series
from .utils import is_label
@@ -128,7 +129,46 @@ def sem(self, ddof=1):
        return self._default_to_pandas(lambda df: df.sem(ddof=ddof))

    def mean(self, *args, **kwargs):
-       return self._apply_agg_function(lambda df: df.mean(*args, **kwargs))
        fallback = False
        converted_columns = {}
        numeric_only = kwargs.get("numeric_only", False)
        if not numeric_only:
            for col, dt in self._query_compiler.dtypes.items():
                if is_datetime64_any_dtype(dt):
Review comment (Collaborator):

will timedelta64 be affected too?
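
If timedelta64 turns out to need the same treatment, the dtype check could be widened; a hedged sketch (is_timedelta64_dtype is an existing pandas helper, needs_int64_conversion is a hypothetical name):

from pandas.core.dtypes.common import is_datetime64_any_dtype, is_timedelta64_dtype

def needs_int64_conversion(dt):
    # Both datetime64 and timedelta64 values are int64 nanosecond counts
    # under the hood, so the same convert-sum-divide trick would apply.
    return is_datetime64_any_dtype(dt) or is_timedelta64_dtype(dt)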

                    if self._df[col].hasnans:
                        fallback = True
                        break
                    else:
                        converted_columns[col] = dt
Comment on lines +136 to +142
Review comment (Collaborator @dchigarev, Nov 12, 2021):

in order to minimize backend-API calls I would replace this for-loop with something like this:

datetime_cols = self._df[[col for col, dt in self._df.dtypes.items() if is_datetime64_any_dtype(dt)]]
has_nans_mask = datetime_cols.isna().any()

I've compared these two approaches, and it appeared that the proposed one is almost always faster:

Micro-benchmark and its results
import modin.pandas as pd
import numpy as np
import timeit


def is_dtype_to_pick(dtype):
    return dtype == "float"


def fn1(df):
    to_convert = {}
    fallback = False
    for col, dt in df.dtypes.items():
        if is_dtype_to_pick(dt):
            if df[col].hasnans:
                fallback = True
                break
            else:
                to_convert[col] = dt


def fn2(df):
    cols_to_select = [col for col, dt in df.dtypes.items() if is_dtype_to_pick(dt)]
    if len(cols_to_select) == 0:
        return
    selected_df = df[cols_to_select]
    fallback = selected_df.isna().any().any()
    if not fallback:
        to_convert = selected_df.dtypes.items()


NROWS = 1_000_000
NCOLS = 10

df_no_floats = pd.DataFrame({f"col{i}": np.arange(NROWS) for i in range(NCOLS)})
df_has_half_float_no_nans = df_no_floats.astype({col: "float" for col in df_no_floats.columns[:len(df_no_floats.columns) // 2]})

df_has_half_float_nans_first = df_has_half_float_no_nans.copy()
df_has_half_float_nans_first.loc[0, df_has_half_float_nans_first.select_dtypes("float").columns[0]] = np.nan

df_has_half_float_nans_last = df_has_half_float_no_nans.copy()
df_has_half_float_nans_last.loc[0, df_has_half_float_nans_last.select_dtypes("float").columns[-1]] = np.nan

NRUNS = 5

print("df_no_float:")
print("for-loop:", timeit.timeit(lambda: fn1(df_no_floats), number=NRUNS))
print("non-for-loop:", timeit.timeit(lambda: fn2(df_no_floats), number=NRUNS))

print("\ndf_has_half_float_no_nans:")
print("for-loop:", timeit.timeit(lambda: fn1(df_has_half_float_no_nans), number=NRUNS))
print("non-for-loop:", timeit.timeit(lambda: fn2(df_has_half_float_no_nans), number=NRUNS))

print("\ndf_has_half_float_nans_first:")
print("for-loop:", timeit.timeit(lambda: fn1(df_has_half_float_nans_first), number=NRUNS))
print("non-for-loop:", timeit.timeit(lambda: fn2(df_has_half_float_nans_first), number=NRUNS))

print("\ndf_has_half_float_nans_last:")
print("for-loop:", timeit.timeit(lambda: fn1(df_has_half_float_nans_last), number=NRUNS))
print("non-for-loop:", timeit.timeit(lambda: fn2(df_has_half_float_nans_last), number=NRUNS))

Results (each number is the total over all 5 runs, which is how timeit reports timings):

df_no_float:
for-loop: 8.46749753691256e-05
non-for-loop: 6.431201472878456e-05

df_has_half_float_no_nans:
for-loop: 5.488216175988782
non-for-loop: 1.856114284018986

df_has_half_float_nans_first:
for-loop: 1.3251252140034921
non-for-loop: 1.966320100997109

df_has_half_float_nans_last:
for-loop: 5.407154903979972
non-for-loop: 1.9547889120294712

Reply (Collaborator):

the datetime_cols = ... here would make a copy in pandas (not sure about modin). I suggest select_dtypes
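
A small sketch of the select_dtypes variant in plain pandas (Modin mirrors this API; whether it avoids the copy in Modin is the open question above):

import pandas as pd

df = pd.DataFrame({"d": pd.to_datetime(["2000-01-01", None]), "x": [1, 2]})
datetime_cols = df.select_dtypes(include="datetime")  # one call, no explicit loop
has_nans = bool(datetime_cols.isna().any().any())
print(has_nans)  # True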

        if fallback:
Review comment (Collaborator):

I suggest moving this dispatching to the QueryCompiler level. Calling groupby functions via _wrap_aggregation doesn't force us to use the MapReduce implementation; it just means that the backend's implementation flow deviates from a simple groupby().aggregate(func_name), which is why the special method was introduced at the QC level.

Or, instead of this, to make things clear, we should rename groupby_mean to groupby_numeric_mean, so that a groupby_mean which sometimes fails for DateTime cols isn't confusing.

Reply (Collaborator Author):

I think renaming it to groupby_mean_numeric is the best approach. Otherwise it is not clear how to handle the mean aggregation in the query compiler separately from all other kinds of aggregations.

            # We cannot use the map-reduce approach because non-numeric columns are
            # present and are requested to be processed. This happens because in
            # map-reduce we use the "sum" aggregation, which always drops non-numeric
            # columns unconditionally, no matter what arguments are specified. That is
            # different from how "mean" handles non-numeric columns (in particular
            # datetime types). We could use a workaround to convert datetime to int64,
            # but it works only when NaNs aren't present in datetime columns. NaNs
            # converted to int64 produce wrong results and have to be handled
            # differently, so we have to resort to the less efficient approach of a
            # full-axis broadcast in
Comment on lines +150 to +151
Review comment (Collaborator):

why do NaNs converted to int produce wrong results? As far as I know, we can only convert NaN to float. Does the backward conversion of float -> int -> datetime cause precision loss and hence incorrect results?

Reply (Collaborator Author):

Consider the following example:

df = pd.DataFrame({"a": [1, 1, 2, 2], "b": [np.datetime64("2001-01-01"), np.NaN, np.datetime64("2001-01-01"), np.datetime64("2002-01-01")]})

Here, if we convert column "b" to int64 with df.astype({"b": "int64"}), you can see that the NaN in column "b" is converted into the value -9223372036854775808, which corrupts the final sum of values. If NaN were converted into zero, we could use this approach, because NaN values would effectively be ignored. We cannot convert into float64 because pandas gives an error.
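
A small plain-pandas demonstration of the sentinel problem described above (.view is used here because astype may refuse this conversion outright on some pandas versions):

import numpy as np
import pandas as pd

s = pd.Series([np.datetime64("2001-01-01"), pd.NaT])
print(s.view("int64"))
# 0    978307200000000000
# 1   -9223372036854775808   <- NaT becomes np.iinfo("int64").min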

Reply (Collaborator):

what would it take to make this case (or just all datetimelike cases) just use the pandas implementation? https://github.com/pandas-dev/pandas/blob/main/pandas/_libs/groupby.pyx#L721

            # _apply_agg_function.
            result = self._apply_agg_function(lambda df: df.mean(*args, **kwargs))
        else:
            if len(converted_columns) > 0:
                # Convert all datetime64 types to int64 to allow pandas "sum" to work.
                self._df = self._df.astype(
                    {col: "int64" for col in converted_columns.keys()}
                )
Comment on lines +156 to +159
Review comment (Collaborator):

I still think that it's pandas-backend specific because of its MapReduce implementation and so should be moved to the QC. Let's wait for other reviewers' opinions though.

Reply (Collaborator):

in pandas this astype would make a copy, so we'd do .view('i8') instead. is that different in modin?

Reply (Collaborator):

> in pandas this astype would make a copy, so we'd do .view('i8') instead. is that different in modin?

No, astype creates a copy in Modin too. Unfortunately, in Modin, Series.view does not have the same semantics as in pandas. While it creates a dataframe with the correct type, the new series doesn't share the same underlying values as the original one. Not only that, Modin's view also creates a copy of all the data under the hood. I've created #4650 to track the differences in behavior.

            self._query_compiler = self._df._query_compiler
            result = self._wrap_aggregation(
                type(self._query_compiler).groupby_mean_numeric,
                lambda df, **kwargs: df.mean(*args, **kwargs),
                **kwargs,
            )
            if len(converted_columns) > 0:
                # Convert int64 types back to datetime64 types in result.
                result = result.astype(
                    {col: dt for col, dt in converted_columns.items()}
                )
        return result
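
For intuition, a plain-pandas round-trip of the datetime trick used above: cast to int64 nanoseconds, take the groupby mean, and cast back (illustrative; round() is needed because the mean comes back as float):

import pandas as pd

df = pd.DataFrame({
    "k": ["a", "a", "b"],
    "d": pd.to_datetime(["2000-01-01", "2000-01-03", "2000-01-01"]),
})
as_int = df.assign(d=df["d"].astype("int64"))
mean_ns = as_int.groupby("k").mean().round().astype("int64")
print(mean_ns.astype("datetime64[ns]"))
# group "a" -> 2000-01-02, the elementwise mean of the two dates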

    def any(self, **kwargs):
        return self._wrap_aggregation(
56 changes: 56 additions & 0 deletions modin/pandas/test/test_groupby.py
@@ -1466,6 +1466,62 @@ def test_agg_exceptions(operation):
    eval_aggregation(*create_test_dfs(data), operation=operation)


@pytest.mark.parametrize("numeric_only", [True, False])
def test_mean_agg_different_types(numeric_only):
Review comment (Collaborator):

let's also add a mean test to test_groupby_multiindex to check whether our intermediate column renaming works with MultiIndex columns (a hedged sketch follows this test)

    N = 200
    fill_data = {
        "date": [
            np.datetime64("2000"),
            np.datetime64("2010"),
            np.datetime64("2011"),
            np.datetime64("2011-06-15T00:00"),
            np.datetime64("2009-01-01"),
        ]
        * (N // 5),
        "int_only": [2000, 2010, 2011, 2012, 2009] * (N // 5),
        "int_and_nan": [
            2000,
            2010,
            2011,
            None,
            None,
        ]
        * (N // 5),
        "float_only": [
            2000.0,
            2010.0,
            2011.0,
            2012.0,
            2009.0,
        ]
        * (N // 5),
        "float_and_nan": [
            2000.0,
            2010.0,
            2011.0,
            None,
            None,
        ]
        * (N // 5),
    }

    data1 = {
        "column_to_by": ["foo", "bar", "baz", "qux"] * (N // 4),
    }

    data2 = {
        f"{key}{i}": value
        for key, value in fill_data.items()
        for i in range(N // len(fill_data))
    }

    data = {**data1, **data2}

    eval_aggregation(
        *create_test_dfs(data), operation="mean", numeric_only=numeric_only
    )
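
A hedged sketch of the MultiIndex-columns test requested in the review comment above (the test name is hypothetical; create_test_dfs and df_equals are the existing helpers from modin.pandas.test.utils):

def test_mean_with_multiindex_columns():
    data = {
        ("foo", "a"): [1, 1, 2, 2],
        ("foo", "b"): [3.0, 4.0, 5.0, 6.0],
        ("bar", "b"): [10.0, 20.0, 30.0, 40.0],
    }
    modin_df, pandas_df = create_test_dfs(data)
    df_equals(
        modin_df.groupby(("foo", "a")).mean(),
        pandas_df.groupby(("foo", "a")).mean(),
    )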


@pytest.mark.skip(
    "Pandas raises a ValueError on empty dictionary aggregation since 1.2.0"
    "It's unclear is that was made on purpose or it is a bug. That question"