From 2ef2e0bc8620b3612f38459320ad43cd8a8b1818 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Fri, 8 Oct 2021 12:46:50 -0500 Subject: [PATCH 01/11] FIX-#3585: Attempt to accelerate mean aggregation Signed-off-by: Gregory Shimansky --- .../storage_formats/pandas/query_compiler.py | 18 ++++++++ modin/pandas/groupby.py | 42 ++++++++++++++++++- modin/pandas/test/test_groupby.py | 31 ++++++++++++++ 3 files changed, 90 insertions(+), 1 deletion(-) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index c65c4cf8caa..63dacc0b867 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -2491,6 +2491,24 @@ def _callable_func(self, func, axis, *args, **kwargs): groupby_prod = GroupByReduce.register("prod") groupby_sum = GroupByReduce.register("sum") + def _mean_agg_map(dfgb, **kwargs): + kwargs["min_count"] = 1 + result = dfgb.sum(**kwargs) + result["__mean_agg_size_column__"] = dfgb.size() + return result + + def _mean_agg_reduce(dfgb, **kwargs): + kwargs["min_count"] = 1 + result = dfgb.sum(**kwargs) + result = result.div(result["__mean_agg_size_column__"], axis=0) + return result.drop("__mean_agg_size_column__", axis=1) + + groupby_mean = GroupbyReduceFunction.register( + _mean_agg_map, + _mean_agg_reduce, + default_to_pandas_func=lambda dfgb, **kwargs: dfgb.mean(**kwargs), + ) + def groupby_size( self, by, axis, groupby_args, map_args, reduce_args, numeric_only, drop ): diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index c341599d221..56baf4d878a 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -33,6 +33,7 @@ ) from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler from modin.core.dataframe.algebra.default2pandas.groupby import GroupBy +from pandas.core.dtypes.common import is_datetime64_any_dtype from modin.config import IsExperimental from .series import Series from .utils import is_label @@ -128,7 +129,46 @@ def sem(self, ddof=1): return self._default_to_pandas(lambda df: df.sem(ddof=ddof)) def mean(self, *args, **kwargs): - return self._apply_agg_function(lambda df: df.mean(*args, **kwargs)) + fallback = False + converted_columns = {} + if not kwargs.get("numeric_only", False): + for col, dt in self._df.dtypes.items(): + if is_datetime64_any_dtype(dt): + if self._df[col].hasnans: + fallback = True + break + else: + converted_columns[col] = dt + if fallback: + # We cannot use map-reduce approach because non-numeric columns are present and + # are requested to be processed. It happens because in map-reduce we use "sum" + # aggregation which always drops non-numeric columns unconditionally, no matter + # what arguments are specified. It is different from how "mean" handles non-numeric + # columns (in particular datetime types). We could use a workaround to convert + # datetime to int64 but it works only when NaNs aren't present in datetime columns. + # NaNs converted to int64 produce wrong results and have to be handled differently, + # so we have to resort to less efficient approach of broadcast full axis in + # _apply_agg_function. + result = self._apply_agg_function(lambda df: df.mean(*args, **kwargs)) + else: + if len(converted_columns) > 0: + # Convert all datetime64 types to int64 to allow pandas "sum" to work. + self._df = self._df.astype( + {col: "int64" for col in converted_columns.keys()} + ) + self._query_compiler = self._df._query_compiler + result = self._wrap_aggregation( + type(self._query_compiler).groupby_mean, + lambda df, **kwargs: df.mean(*args, **kwargs), + numeric_only=False, + **kwargs, + ) + if len(converted_columns) > 0: + # Convert int64 types back to datetime64 types in result. + result = result.astype( + {col: dt for col, dt in converted_columns.items()} + ) + return result def any(self, **kwargs): return self._wrap_aggregation( diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 2e116c78ca4..84facacc7c1 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -1466,6 +1466,37 @@ def test_agg_exceptions(operation): eval_aggregation(*create_test_dfs(data), operation=operation) +def test_mean_agg_datetime(): + N = 200 + fill_data = [ + ( + "date_column", + [ + np.datetime64("2000"), + np.datetime64("2010"), + np.datetime64("2011"), + np.datetime64("2011-06-15T00:00"), + np.datetime64("2009-01-01"), + ] + * (N // 5), + ), + ] + + data1 = { + "column_to_by": ["foo", "bar", "baz", "qux"] * (N // 4), + } + + data2 = { + f"{key}{i}": value + for key, value in fill_data + for i in range(N // len(fill_data)) + } + + data = {**data1, **data2} + + eval_aggregation(*create_test_dfs(data), operation="mean") + + @pytest.mark.skip( "Pandas raises a ValueError on empty dictionary aggregation since 1.2.0" "It's unclear is that was made on purpose or it is a bug. That question" From 3f689a7ac6c23d4486235cafa264d96fb8c81d86 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Wed, 20 Oct 2021 19:30:42 -0500 Subject: [PATCH 02/11] Added groupby_mean to the base query compiler Signed-off-by: Gregory Shimansky --- .../storage_formats/base/query_compiler.py | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index 308034ac7de..dce888ae29c 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -2307,6 +2307,28 @@ def groupby_sum( drop=drop, ) + @doc_utils.doc_groupby_method(result="mean", refer_to="mean") + def groupby_mean( + self, + by, + axis, + groupby_args, + map_args, + reduce_args=None, + numeric_only=True, + drop=False, + ): + return GroupByDefault.register(pandas.core.groupby.DataFrameGroupBy.mean)( + self, + by=by, + axis=axis, + groupby_args=groupby_args, + map_args=map_args, + reduce_args=reduce_args, + numeric_only=numeric_only, + drop=drop, + ) + @doc_utils.doc_groupby_method( action="get the number of elements", result="number of elements", From a4e1e41a2a8278e70cf863245543ef09371c7354 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Thu, 21 Oct 2021 12:30:39 -0500 Subject: [PATCH 03/11] Fixed dtypes for Series Signed-off-by: Gregory Shimansky --- modin/pandas/groupby.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index 56baf4d878a..72471e32e23 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -132,7 +132,7 @@ def mean(self, *args, **kwargs): fallback = False converted_columns = {} if not kwargs.get("numeric_only", False): - for col, dt in self._df.dtypes.items(): + for col, dt in self._query_compiler.dtypes.items(): if is_datetime64_any_dtype(dt): if self._df[col].hasnans: fallback = True From 6078771cd487e5794d33d226263be6758e04a361 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Fri, 5 Nov 2021 17:13:02 -0500 Subject: [PATCH 04/11] Rebase to latest master Signed-off-by: Gregory Shimansky --- modin/core/storage_formats/pandas/query_compiler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 63dacc0b867..bf7a5d4644e 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -2503,7 +2503,7 @@ def _mean_agg_reduce(dfgb, **kwargs): result = result.div(result["__mean_agg_size_column__"], axis=0) return result.drop("__mean_agg_size_column__", axis=1) - groupby_mean = GroupbyReduceFunction.register( + groupby_mean = GroupByReduce.register( _mean_agg_map, _mean_agg_reduce, default_to_pandas_func=lambda dfgb, **kwargs: dfgb.mean(**kwargs), From dfb4e603dd1fd6d7724298d34c1f79141a75cbba Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Fri, 5 Nov 2021 17:13:17 -0500 Subject: [PATCH 05/11] Improved test to test integers/floats with NaNs Signed-off-by: Gregory Shimansky --- modin/pandas/test/test_groupby.py | 43 ++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 84facacc7c1..0c7a8bb071a 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -1466,21 +1466,34 @@ def test_agg_exceptions(operation): eval_aggregation(*create_test_dfs(data), operation=operation) -def test_mean_agg_datetime(): +def test_mean_agg_different_types(): N = 200 - fill_data = [ - ( - "date_column", - [ - np.datetime64("2000"), - np.datetime64("2010"), - np.datetime64("2011"), - np.datetime64("2011-06-15T00:00"), - np.datetime64("2009-01-01"), - ] - * (N // 5), - ), - ] + fill_data = { + "date": [ + np.datetime64("2000"), + np.datetime64("2010"), + np.datetime64("2011"), + np.datetime64("2011-06-15T00:00"), + np.datetime64("2009-01-01"), + ] + * (N // 5), + "int_and_nan": [ + 2000, + 2010, + 2011, + None, + None, + ] + * (N // 5), + "float_and_nan": [ + 2000.0, + 2010.0, + 2011.0, + None, + None, + ] + * (N // 5), + } data1 = { "column_to_by": ["foo", "bar", "baz", "qux"] * (N // 4), @@ -1488,7 +1501,7 @@ def test_mean_agg_datetime(): data2 = { f"{key}{i}": value - for key, value in fill_data + for key, value in fill_data.items() for i in range(N // len(fill_data)) } From e239ae0422ed91baf471d74aee85994abc61c103 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Fri, 5 Nov 2021 17:16:25 -0500 Subject: [PATCH 06/11] Added pure int and float columns for reference Signed-off-by: Gregory Shimansky --- modin/pandas/test/test_groupby.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 0c7a8bb071a..a11f31705b5 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -1477,6 +1477,14 @@ def test_mean_agg_different_types(): np.datetime64("2009-01-01"), ] * (N // 5), + "int_only": [ + 2000, + 2010, + 2011, + 2012, + 2009 + ] + * (N // 5), "int_and_nan": [ 2000, 2010, @@ -1485,6 +1493,14 @@ def test_mean_agg_different_types(): None, ] * (N // 5), + "float_only": [ + 2000.0, + 2010.0, + 2011.0, + 2012.0, + 2009.0, + ] + * (N // 5), "float_and_nan": [ 2000.0, 2010.0, From 2f1d0fb9af1b87cebbb08a3db4512ea328e68e06 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Mon, 8 Nov 2021 16:50:01 -0600 Subject: [PATCH 07/11] Implemented mean using "count" to count NaNs correctly Signed-off-by: Gregory Shimansky --- modin/core/storage_formats/pandas/query_compiler.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index bf7a5d4644e..2d76d9e0fbd 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -2494,14 +2494,19 @@ def _callable_func(self, func, axis, *args, **kwargs): def _mean_agg_map(dfgb, **kwargs): kwargs["min_count"] = 1 result = dfgb.sum(**kwargs) - result["__mean_agg_size_column__"] = dfgb.size() + divisor = dfgb.count() + divisor.set_axis(["__mean_agg_size_column__" + x for x in divisor.columns], axis=1, inplace=True) + result = pandas.concat([result, divisor], axis=1, copy=False) return result def _mean_agg_reduce(dfgb, **kwargs): kwargs["min_count"] = 1 result = dfgb.sum(**kwargs) - result = result.div(result["__mean_agg_size_column__"], axis=0) - return result.drop("__mean_agg_size_column__", axis=1) + divirgent = result[[x for x in result.columns if not x.startswith("__mean_agg_size_column__")]] + divisor = result[[x for x in result.columns if x.startswith("__mean_agg_size_column__")]] + divisor.set_axis([x[len("__mean_agg_size_column__"):] for x in divisor.columns], axis=1, inplace=True) + result = divirgent.divide(divisor) + return result groupby_mean = GroupByReduce.register( _mean_agg_map, From 17801014220255c68c16f7cb47116aeb26f79104 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Tue, 9 Nov 2021 15:43:30 -0600 Subject: [PATCH 08/11] Added numeric_only to test and function Signed-off-by: Gregory Shimansky --- modin/pandas/groupby.py | 4 ++-- modin/pandas/test/test_groupby.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index 72471e32e23..8460948e381 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -131,7 +131,8 @@ def sem(self, ddof=1): def mean(self, *args, **kwargs): fallback = False converted_columns = {} - if not kwargs.get("numeric_only", False): + numeric_only=kwargs.get("numeric_only", False) + if not numeric_only: for col, dt in self._query_compiler.dtypes.items(): if is_datetime64_any_dtype(dt): if self._df[col].hasnans: @@ -160,7 +161,6 @@ def mean(self, *args, **kwargs): result = self._wrap_aggregation( type(self._query_compiler).groupby_mean, lambda df, **kwargs: df.mean(*args, **kwargs), - numeric_only=False, **kwargs, ) if len(converted_columns) > 0: diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index a11f31705b5..7bc9d68352b 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -1465,8 +1465,8 @@ def test_agg_exceptions(operation): eval_aggregation(*create_test_dfs(data), operation=operation) - -def test_mean_agg_different_types(): +@pytest.mark.parametrize("numeric_only", [True, False]) +def test_mean_agg_different_types(numeric_only): N = 200 fill_data = { "date": [ @@ -1523,7 +1523,7 @@ def test_mean_agg_different_types(): data = {**data1, **data2} - eval_aggregation(*create_test_dfs(data), operation="mean") + eval_aggregation(*create_test_dfs(data), operation="mean", numeric_only=numeric_only) @pytest.mark.skip( From 2cc6bda12ec93191b10508e9c21d67e666ba206a Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Wed, 10 Nov 2021 17:13:18 -0600 Subject: [PATCH 09/11] Remove undesired "by" columns in reduce phase Signed-off-by: Gregory Shimansky --- modin/core/storage_formats/pandas/query_compiler.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 2d76d9e0fbd..21e8260ea9c 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -2505,6 +2505,9 @@ def _mean_agg_reduce(dfgb, **kwargs): divirgent = result[[x for x in result.columns if not x.startswith("__mean_agg_size_column__")]] divisor = result[[x for x in result.columns if x.startswith("__mean_agg_size_column__")]] divisor.set_axis([x[len("__mean_agg_size_column__"):] for x in divisor.columns], axis=1, inplace=True) + # Following line makes sure we exclude any "by" columns that could be carried in via "__mean_agg_size_column__" + # while they shouldn't be present and was removed since map phase was done. + divisor = divisor[divirgent.columns] result = divirgent.divide(divisor) return result From a98d196cf55e90694fb6b2a7f354e35e7d123615 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Wed, 10 Nov 2021 17:39:31 -0600 Subject: [PATCH 10/11] Renamed map-reduce version of groupby_mean into groupby_mean_numeric because it doesn't work with non-numeric types. Signed-off-by: Gregory Shimansky --- modin/core/storage_formats/base/query_compiler.py | 2 +- modin/core/storage_formats/pandas/query_compiler.py | 2 +- modin/pandas/groupby.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index dce888ae29c..929b0aac106 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -2308,7 +2308,7 @@ def groupby_sum( ) @doc_utils.doc_groupby_method(result="mean", refer_to="mean") - def groupby_mean( + def groupby_mean_numeric( self, by, axis, diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 21e8260ea9c..ed46c2d8679 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -2511,7 +2511,7 @@ def _mean_agg_reduce(dfgb, **kwargs): result = divirgent.divide(divisor) return result - groupby_mean = GroupByReduce.register( + groupby_mean_numeric = GroupByReduce.register( _mean_agg_map, _mean_agg_reduce, default_to_pandas_func=lambda dfgb, **kwargs: dfgb.mean(**kwargs), diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index 8460948e381..f4c6c8f0e04 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -159,7 +159,7 @@ def mean(self, *args, **kwargs): ) self._query_compiler = self._df._query_compiler result = self._wrap_aggregation( - type(self._query_compiler).groupby_mean, + type(self._query_compiler).groupby_mean_numeric, lambda df, **kwargs: df.mean(*args, **kwargs), **kwargs, ) From b84ccf1d3ba7592150fb321c5a04dc35aaec8ff2 Mon Sep 17 00:00:00 2001 From: Gregory Shimansky Date: Wed, 10 Nov 2021 17:44:38 -0600 Subject: [PATCH 11/11] Formatting fixes Signed-off-by: Gregory Shimansky --- .../storage_formats/pandas/query_compiler.py | 20 +++++++++++++++---- modin/pandas/groupby.py | 2 +- modin/pandas/test/test_groupby.py | 14 +++++-------- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index ed46c2d8679..72c116f5960 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -2495,16 +2495,28 @@ def _mean_agg_map(dfgb, **kwargs): kwargs["min_count"] = 1 result = dfgb.sum(**kwargs) divisor = dfgb.count() - divisor.set_axis(["__mean_agg_size_column__" + x for x in divisor.columns], axis=1, inplace=True) + divisor.set_axis( + ["__mean_agg_size_column__" + x for x in divisor.columns], + axis=1, + inplace=True, + ) result = pandas.concat([result, divisor], axis=1, copy=False) return result def _mean_agg_reduce(dfgb, **kwargs): kwargs["min_count"] = 1 result = dfgb.sum(**kwargs) - divirgent = result[[x for x in result.columns if not x.startswith("__mean_agg_size_column__")]] - divisor = result[[x for x in result.columns if x.startswith("__mean_agg_size_column__")]] - divisor.set_axis([x[len("__mean_agg_size_column__"):] for x in divisor.columns], axis=1, inplace=True) + divirgent = result[ + [x for x in result.columns if not x.startswith("__mean_agg_size_column__")] + ] + divisor = result[ + [x for x in result.columns if x.startswith("__mean_agg_size_column__")] + ] + divisor.set_axis( + [x[len("__mean_agg_size_column__") :] for x in divisor.columns], + axis=1, + inplace=True, + ) # Following line makes sure we exclude any "by" columns that could be carried in via "__mean_agg_size_column__" # while they shouldn't be present and was removed since map phase was done. divisor = divisor[divirgent.columns] diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index f4c6c8f0e04..c53aa432bcd 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -131,7 +131,7 @@ def sem(self, ddof=1): def mean(self, *args, **kwargs): fallback = False converted_columns = {} - numeric_only=kwargs.get("numeric_only", False) + numeric_only = kwargs.get("numeric_only", False) if not numeric_only: for col, dt in self._query_compiler.dtypes.items(): if is_datetime64_any_dtype(dt): diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 7bc9d68352b..d4086f944ff 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -1465,6 +1465,7 @@ def test_agg_exceptions(operation): eval_aggregation(*create_test_dfs(data), operation=operation) + @pytest.mark.parametrize("numeric_only", [True, False]) def test_mean_agg_different_types(numeric_only): N = 200 @@ -1477,14 +1478,7 @@ def test_mean_agg_different_types(numeric_only): np.datetime64("2009-01-01"), ] * (N // 5), - "int_only": [ - 2000, - 2010, - 2011, - 2012, - 2009 - ] - * (N // 5), + "int_only": [2000, 2010, 2011, 2012, 2009] * (N // 5), "int_and_nan": [ 2000, 2010, @@ -1523,7 +1517,9 @@ def test_mean_agg_different_types(numeric_only): data = {**data1, **data2} - eval_aggregation(*create_test_dfs(data), operation="mean", numeric_only=numeric_only) + eval_aggregation( + *create_test_dfs(data), operation="mean", numeric_only=numeric_only + ) @pytest.mark.skip(