diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py
index 308034ac7de..929b0aac106 100644
--- a/modin/core/storage_formats/base/query_compiler.py
+++ b/modin/core/storage_formats/base/query_compiler.py
@@ -2307,6 +2307,28 @@ def groupby_sum(
             drop=drop,
         )
 
+    @doc_utils.doc_groupby_method(result="mean", refer_to="mean")
+    def groupby_mean_numeric(
+        self,
+        by,
+        axis,
+        groupby_args,
+        map_args,
+        reduce_args=None,
+        numeric_only=True,
+        drop=False,
+    ):
+        return GroupByDefault.register(pandas.core.groupby.DataFrameGroupBy.mean)(
+            self,
+            by=by,
+            axis=axis,
+            groupby_args=groupby_args,
+            map_args=map_args,
+            reduce_args=reduce_args,
+            numeric_only=numeric_only,
+            drop=drop,
+        )
+
     @doc_utils.doc_groupby_method(
         action="get the number of elements",
         result="number of elements",
diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py
index c65c4cf8caa..72c116f5960 100644
--- a/modin/core/storage_formats/pandas/query_compiler.py
+++ b/modin/core/storage_formats/pandas/query_compiler.py
@@ -2491,6 +2491,44 @@ def _callable_func(self, func, axis, *args, **kwargs):
     groupby_prod = GroupByReduce.register("prod")
     groupby_sum = GroupByReduce.register("sum")
 
+    def _mean_agg_map(dfgb, **kwargs):
+        kwargs["min_count"] = 1
+        result = dfgb.sum(**kwargs)
+        divisor = dfgb.count()
+        divisor.set_axis(
+            ["__mean_agg_size_column__" + x for x in divisor.columns],
+            axis=1,
+            inplace=True,
+        )
+        result = pandas.concat([result, divisor], axis=1, copy=False)
+        return result
+
+    def _mean_agg_reduce(dfgb, **kwargs):
+        kwargs["min_count"] = 1
+        result = dfgb.sum(**kwargs)
+        dividend = result[
+            [x for x in result.columns if not x.startswith("__mean_agg_size_column__")]
+        ]
+        divisor = result[
+            [x for x in result.columns if x.startswith("__mean_agg_size_column__")]
+        ]
+        divisor.set_axis(
+            [x[len("__mean_agg_size_column__") :] for x in divisor.columns],
+            axis=1,
+            inplace=True,
+        )
+        # Exclude any "by" columns that could have been carried in under the
+        # "__mean_agg_size_column__" prefix: the map phase already removed them.
+        divisor = divisor[dividend.columns]
+        result = dividend.divide(divisor)
+        return result
+
+    groupby_mean_numeric = GroupByReduce.register(
+        _mean_agg_map,
+        _mean_agg_reduce,
+        default_to_pandas_func=lambda dfgb, **kwargs: dfgb.mean(**kwargs),
+    )
+
     def groupby_size(
         self, by, axis, groupby_args, map_args, reduce_args, numeric_only, drop
     ):
diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py
index c341599d221..c53aa432bcd 100644
--- a/modin/pandas/groupby.py
+++ b/modin/pandas/groupby.py
@@ -33,6 +33,7 @@
 )
 from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler
 from modin.core.dataframe.algebra.default2pandas.groupby import GroupBy
+from pandas.core.dtypes.common import is_datetime64_any_dtype
 from modin.config import IsExperimental
 from .series import Series
 from .utils import is_label
@@ -128,7 +129,46 @@ def sem(self, ddof=1):
         return self._default_to_pandas(lambda df: df.sem(ddof=ddof))
 
     def mean(self, *args, **kwargs):
-        return self._apply_agg_function(lambda df: df.mean(*args, **kwargs))
+        fallback = False
+        converted_columns = {}
+        numeric_only = kwargs.get("numeric_only", False)
+        if not numeric_only:
+            for col, dt in self._query_compiler.dtypes.items():
+                if is_datetime64_any_dtype(dt):
+                    if self._df[col].hasnans:
+                        fallback = True
+                        break
+                    else:
+                        converted_columns[col] = dt
+        if fallback:
+            # We cannot use the map-reduce approach here because non-numeric
+            # columns are present and requested to be processed. Map-reduce uses
+            # the "sum" aggregation, which unconditionally drops non-numeric
+            # columns no matter what arguments are specified, unlike "mean",
+            # which keeps some of them (in particular datetime types). We could
+            # work around that by converting datetime64 to int64, but it only
+            # works when the datetime columns are free of NaNs: NaNs converted
+            # to int64 produce wrong results, so we resort to the less efficient
+            # full-axis broadcast in _apply_agg_function.
+            result = self._apply_agg_function(lambda df: df.mean(*args, **kwargs))
+        else:
+            if len(converted_columns) > 0:
+                # Convert all datetime64 columns to int64 so pandas "sum" keeps them.
+                self._df = self._df.astype(
+                    {col: "int64" for col in converted_columns.keys()}
+                )
+                self._query_compiler = self._df._query_compiler
+            result = self._wrap_aggregation(
+                type(self._query_compiler).groupby_mean_numeric,
+                lambda df, **kwargs: df.mean(*args, **kwargs),
+                **kwargs,
+            )
+            if len(converted_columns) > 0:
+                # Convert the int64 columns back to their original datetime64 dtypes.
+                result = result.astype(
+                    {col: dt for col, dt in converted_columns.items()}
+                )
+        return result
 
     def any(self, **kwargs):
         return self._wrap_aggregation(
diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py
index 2e116c78ca4..d4086f944ff 100644
--- a/modin/pandas/test/test_groupby.py
+++ b/modin/pandas/test/test_groupby.py
@@ -1466,6 +1466,62 @@ def test_agg_exceptions(operation):
     eval_aggregation(*create_test_dfs(data), operation=operation)
 
 
+@pytest.mark.parametrize("numeric_only", [True, False])
+def test_mean_agg_different_types(numeric_only):
+    N = 200
+    fill_data = {
+        "date": [
+            np.datetime64("2000"),
+            np.datetime64("2010"),
+            np.datetime64("2011"),
+            np.datetime64("2011-06-15T00:00"),
+            np.datetime64("2009-01-01"),
+        ]
+        * (N // 5),
+        "int_only": [2000, 2010, 2011, 2012, 2009] * (N // 5),
+        "int_and_nan": [
+            2000,
+            2010,
+            2011,
+            None,
+            None,
+        ]
+        * (N // 5),
+        "float_only": [
+            2000.0,
+            2010.0,
+            2011.0,
+            2012.0,
+            2009.0,
+        ]
+        * (N // 5),
+        "float_and_nan": [
+            2000.0,
+            2010.0,
+            2011.0,
+            None,
+            None,
+        ]
+        * (N // 5),
+    }
+
+    data1 = {
+        "column_to_by": ["foo", "bar", "baz", "qux"] * (N // 4),
+    }
+
+    data2 = {
+        f"{key}{i}": value
+        for key, value in fill_data.items()
+        for i in range(N // len(fill_data))
+    }
+
+    data = {**data1, **data2}
+
+    eval_aggregation(
+        *create_test_dfs(data), operation="mean", numeric_only=numeric_only
+    )
+
+
 @pytest.mark.skip(
     "Pandas raises a ValueError on empty dictionary aggregation since 1.2.0"
    "It's unclear is that was made on purpose or it is a bug. That question"
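
As a sanity check of the decomposition (not part of the patch), here is a minimal plain-pandas sketch of the sum/count trick that the new _mean_agg_map/_mean_agg_reduce pair implements. The helper names mirror the diff conceptually; the two-slice "partitioning" at the bottom only simulates Modin's row partitions and is not Modin's actual execution path:

    import numpy as np
    import pandas as pd

    PREFIX = "__mean_agg_size_column__"


    def mean_map(df, by):
        # Map phase: per-partition group sums plus group sizes.
        # min_count=1 makes an all-NaN group sum to NaN instead of 0.
        grouped = df.groupby(by)
        sums = grouped.sum(min_count=1)
        counts = grouped.count()
        counts.columns = [PREFIX + c for c in counts.columns]
        return pd.concat([sums, counts], axis=1)


    def mean_reduce(mapped):
        # Reduce phase: combine the partial sums and counts, then divide.
        totals = mapped.groupby(level=0).sum(min_count=1)
        dividend = totals[[c for c in totals.columns if not c.startswith(PREFIX)]]
        divisor = totals[[c for c in totals.columns if c.startswith(PREFIX)]]
        divisor.columns = [c[len(PREFIX):] for c in divisor.columns]
        return dividend / divisor[dividend.columns]


    df = pd.DataFrame(
        {
            "key": ["a", "b", "a", "b", "a", "b"],
            "x": [1.0, 2.0, 3.0, np.nan, 5.0, 6.0],
        }
    )
    partitions = [df.iloc[:3], df.iloc[3:]]  # stand-in for Modin's row partitions
    mapped = pd.concat([mean_map(part, "key") for part in partitions])
    assert np.allclose(mean_reduce(mapped), df.groupby("key").mean())

The prefix keeps each column's count alongside its partial sum in a single frame, so the reduce phase can re-aggregate both with one more grouped "sum" before dividing; min_count=1 in both phases preserves NaN for groups whose values are all missing, mirroring what the patch does.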