Attempt to accelerate mean aggregation #3586
Conversation
Codecov Report
@@ Coverage Diff @@
## master #3586 +/- ##
==========================================
+ Coverage 85.34% 87.69% +2.34%
==========================================
Files 195 195
Lines 16196 16816 +620
==========================================
+ Hits 13822 14746 +924
+ Misses 2374 2070 -304
Continue to review full report at Codecov.
This pull request introduces 2 alerts when merging 38939a8bace35169adb767c0b31adf61e215ad74 into dd8b423 - view on LGTM.com

This pull request introduces 2 alerts when merging 841257cf5730fa2f858fa8b458616870f931cc91 into 17ad1f0 - view on LGTM.com
def _mean_agg_map(dfgb, **kwargs):
    kwargs["min_count"] = 1
    result = dfgb.sum(**kwargs)
    result["__mean_agg_size_column__"] = dfgb.size()
We probably need to count each column's own size. There is no fallback for NaN numeric columns at the front-end, so I suppose this implementation should be able to handle cases like the following, where the number of NaN values differs between columns:
import modin.pandas as pd
import numpy as np
df = pd.DataFrame({"a": [1, 1, 2, 2], "b": [10, np.nan, np.nan, np.nan], "c": [1, 2, 3, 4]})
print(f"modin:\n{df.groupby('a').mean()}\n")
print(f"pandas:\n{df._to_pandas().groupby('a').mean()}")
Output for this branch
modin:
b c
a
1 5.0 1.5
2 NaN 3.5
pandas:
b c
a
1 10.0 1.5
2 NaN 3.5
I suggest we use dfgb.count() instead of size(): this way we get a per-column divisor and can concatenate it with the sum result (for group 1 above, dividing the NaN-skipping sum 10 by the group size 2 is what produces 5.0 instead of 10.0). We would just need to add some prefix to the divisor frame's columns:
col1 col2 col3 divisor_col1 divisor_col2 divisor_col3
group1 10 30 4 3 5 3
group2 20 40 2 4 2 4
As an optimization, we may detect which columns contain no NaNs, so that a single common divisor can be used for all of them:
nan_cols = tuple(
    name for name, is_nan_col in df.isna().any().iteritems() if is_nan_col
)
...
def map_phase():
    result = dfgb.sum()
    nans_divisor = dfgb[nan_cols].count()
    no_nans_divisor = dfgb[any_non_nan_col].count()
    map_phase_result = pandas.concat(
        [result, add_some_prefixes(nans_divisor, no_nans_divisor)],
        axis=1
    )
    # Possible result (saved one 'count' call):
    #          col1  col2  col3  divisor_for_non_nan_cols  divisor_col2
    # group1     10    30     4                         3             5
    # group2     20    40     2                         4             2
However, computing the NaN columns may take more time than simply calling count() for every column; this needs to be benchmarked.
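To make the map/reduce split concrete, here is a hedged, self-contained sketch of the sum/count scheme this thread converges on, written in plain pandas. The prefix string follows the PR; mean_map, mean_reduce, and the two-partition split are illustrative names only, not Modin's actual implementation.

import numpy as np
import pandas as pd

PREFIX = "__mean_agg_size_column__"

def mean_map(dfgb):
    # Per-partition partials: NaN-skipping sums plus per-column non-NaN counts.
    return pd.concat([dfgb.sum(min_count=1), dfgb.count().add_prefix(PREFIX)], axis=1)

def mean_reduce(partial_gb):
    # Partials from all partitions are summed, then the sums are divided by the counts.
    totals = partial_gb.sum(min_count=1)
    sums = totals[[c for c in totals.columns if not c.startswith(PREFIX)]]
    counts = totals[[c for c in totals.columns if c.startswith(PREFIX)]].set_axis(sums.columns, axis=1)
    return sums / counts

df = pd.DataFrame({"a": [1, 1, 2, 2], "b": [10, np.nan, np.nan, np.nan], "c": [1, 2, 3, 4]})
parts = [df.iloc[:2], df.iloc[2:]]  # pretend these are two partitions
partials = pd.concat([mean_map(p.groupby("a")) for p in parts])
print(mean_reduce(partials.groupby(level=0)))  # matches df.groupby("a").mean()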
            break
        else:
            converted_columns[col] = dt
if fallback:
I suggest moving this dispatching to the QueryCompiler level. Calling groupby functions via _wrap_aggregation doesn't force us to use the MapReduce implementation; it just means that the backend's implementation flow deviates from a simple groupby().aggregate(func_name), which is why the special method was introduced at the QC level.

Or instead of this, to make things clear, we could rename groupby_mean to groupby_numeric_mean, so that a groupby_mean which sometimes fails for DateTime columns isn't confusing.
I think renaming it to groupby_mean_numeric is the best approach. Otherwise it is not clear how to handle the mean aggregation in the query compiler separately from all other kinds of aggregations.
This pull request introduces 2 alerts when merging 37b8f939377e1d318d9c4ed1d259d94d82f8abc5 into 8acad95 - view on LGTM.com
Signed-off-by: Gregory Shimansky <gregory.shimansky@intel.com>
into groupby_mean_numeric because it doesn't work with non-numeric types. Signed-off-by: Gregory Shimansky <gregory.shimansky@intel.com>
This pull request introduces 2 alerts when merging b84ccf1 into d81b118 - view on LGTM.com
result = dfgb.sum(**kwargs)
divisor = dfgb.count()
divisor.set_axis(
    ["__mean_agg_size_column__" + x for x in divisor.columns],
This breaks on MultiIndex columns in divisor:
import modin.pandas as pd

df = pd.DataFrame(
    {
        ("foo", "a"): [1, 1, 2],
        ("foo", "b"): [3, 4, 5],
        ("bar", "a"): [10, 20, 30],
        ("bar", "b"): [40, 50, 60],
    }
)
print(df.groupby(("foo", "a")).mean())
Error traceback
File "/localdisk1/dchigare/repos/modin_bp/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py", line 479, in apply_list_of_funcs
partition = func(partition, *args, **kwargs)
File "/localdisk1/dchigare/repos/modin_bp/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py", line 169, in map_func
return apply_func(df, **{other_name: other})
File "/localdisk1/dchigare/repos/modin_bp/modin/core/dataframe/algebra/groupby.py", line 415, in _map
result = wrapper(df, other)
File "/localdisk1/dchigare/repos/modin_bp/modin/core/dataframe/algebra/groupby.py", line 402, in wrapper
return cls.map(
File "/localdisk1/dchigare/repos/modin_bp/modin/core/dataframe/algebra/groupby.py", line 139, in map
result = apply_func(
File "/localdisk1/dchigare/repos/modin_bp/modin/core/storage_formats/pandas/query_compiler.py", line 2499, in _mean_agg_map
["__mean_agg_size_column__" + x for x in divisor.columns],
File "/localdisk1/dchigare/repos/modin_bp/modin/core/storage_formats/pandas/query_compiler.py", line 2499, in <listcomp>
["__mean_agg_size_column__" + x for x in divisor.columns],
TypeError: can only concatenate str (not "tuple") to str
I would recommend using add_prefix here instead of manual renaming: divisor.add_prefix(prefix).
Also, I would like the prefix to be stored in a variable to simplify its usage.
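A small illustration of both suggestions combined, with the marker kept in one constant (the name mirrors the string used in the PR, but the snippet itself is only a sketch):

import pandas as pd

MEAN_AGG_PREFIX = "__mean_agg_size_column__"

divisor = pd.DataFrame({"b": [1, 0], "c": [2, 2]}, index=pd.Index([1, 2], name="a"))

# add_prefix avoids the manual string concatenation over divisor.columns
# that raised TypeError for tuple (MultiIndex) labels.
prefixed = divisor.add_prefix(MEAN_AGG_PREFIX)
print(list(prefixed.columns))
# ['__mean_agg_size_column__b', '__mean_agg_size_column__c']

# The reduce side can strip the same constant back off.
restored = prefixed.set_axis([c[len(MEAN_AGG_PREFIX):] for c in prefixed.columns], axis=1)
print(list(restored.columns))  # ['b', 'c']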
kwargs["min_count"] = 1
result = dfgb.sum(**kwargs)
divirgent = result[
    [x for x in result.columns if not x.startswith("__mean_agg_size_column__")]
In the case of a MultiIndex, x is a tuple here; we probably need an if-else check for this.
Thinking about this again, probably the best approach would be to add a new MultiIndex level that distinguishes the original columns from the result of the .count() operation. What do you think? I am not sure whether to do it this way in all cases or only in the case of MultiIndex columns.
mean is df.sum() / df.count() (or df.groupby(...).sum() / df.groupby(...).count()), and size is just len, so I don't think that would be correct. Each column has an independent count.
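A quick illustration of the difference, reusing the toy frame from earlier in the thread (plain pandas, just to show the semantics):

import numpy as np
import pandas as pd

df = pd.DataFrame({"a": [1, 1, 2, 2], "b": [10, np.nan, np.nan, np.nan], "c": [1, 2, 3, 4]})
g = df.groupby("a")

print(g.size())             # rows per group: 2 and 2 (one number per group)
print(g.count())            # non-NaN values per group and per column
print(g.sum() / g.count())  # matches g.mean(), unlike dividing by size()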
Yes, it should be count() instead of size() in my comment above (I edited it for clarity). The comment itself was not about using size(); it was about distinguishing the new columns from the original ones without adding a string prefix to their names in the MultiIndex case.
I see, I have a prototype of this here: https://github.com/modin-project/modin/pull/1902/files
result = dfgb.sum(**kwargs)
divisor = dfgb.count()
divisor.set_axis(
    ["__mean_agg_size_column__" + x for x in divisor.columns],
Also, I would like the prefix to be stored in a variable to simplify its usage.
divisor.set_axis(
    [x[len("__mean_agg_size_column__") :] for x in divisor.columns],
    axis=1,
    inplace=True,
)
The order of columns in the dividend and divisor frames should be the same, so can we just do divisor.columns = divirgent.columns.copy()?
Except divisor and divirgent have different sets of columns because divisor also has "by" columns prefixed with "mean_agg_size_column". The following line (with comment above it) makes sure that they are excluded.
# NaNs converted to int64 produce wrong results and have to be handled differently,
# so we have to resort to less efficient approach of broadcast full axis in
Why do NaNs converted to int produce wrong results? As far as I know, we can convert NaN to float only. Does the backward conversion float -> int -> datetime cause precision loss and therefore incorrect results?
Consider the following example:

df = pd.DataFrame({"a": [1, 1, 2, 2], "b": [np.datetime64("2001-01-01"), np.NaN, np.datetime64("2001-01-01"), np.datetime64("2002-01-01")]})

Here, if we convert column "b" to int64 with df.astype({"b": "int64"}), you can see that the NaN in column "b" is converted into the value -9223372036854775808, which affects the final sum of values. If NaN were converted into zero, we could use this approach because the NaN values would effectively be ignored. We cannot convert into float64 because pandas gives an error.
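For reference, the sentinel can be seen even at the NumPy level (a hedged aside: pandas' exact astype behavior for datetime columns containing NaT has varied across versions, but the underlying representation is the same):

import numpy as np

arr = np.array(["2001-01-01", "NaT"], dtype="datetime64[ns]")
print(arr.astype("int64"))
# [  978307200000000000 -9223372036854775808]   <- NaT becomes iinfo(int64).min
print(arr.astype("int64").sum())  # a huge negative number instead of the sum of the valid dates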
What would it take to make this case (or just all datetime-like cases) use the pandas implementation? https://github.com/pandas-dev/pandas/blob/main/pandas/_libs/groupby.pyx#L721
# Convert all datetime64 types to int64 to allow pandas "sum" to work.
self._df = self._df.astype(
    {col: "int64" for col in converted_columns.keys()}
)
I still think that it's pandas-backend specific because of its MapReduce implementation and so should be moved to the QC. Let's wait for other reviewers' opinions though.
In pandas this astype would make a copy, so we'd do .view('i8') instead. Is that different in Modin?
> in pandas this astype would make a copy, so we'd do .view('i8') instead. is that different in modin?

No, astype creates a copy in Modin, too. Unfortunately, in Modin, Series.view does not have the same semantics as in pandas. While it creates a dataframe with the correct type, the new series doesn't share the same underlying values as the original one. Not only that, Modin's view also creates a copy of all the data under the hood. I've created #4650 to track the differences in behavior.
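A quick plain-pandas check of the copy-vs-view behavior being discussed (only a sketch; note that newer pandas versions have been deprecating Series.view):

import numpy as np
import pandas as pd

s = pd.Series(pd.to_datetime(["2001-01-01", "2002-01-01"]))
as_view = s.view("i8")    # reinterprets the same buffer in pandas
as_copy = s.astype("i8")  # materializes a new buffer

print(np.shares_memory(s.to_numpy(), as_view.to_numpy()))  # True in pandas
print(np.shares_memory(s.to_numpy(), as_copy.to_numpy()))  # False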
@@ -1466,6 +1466,62 @@ def test_agg_exceptions(operation):
    eval_aggregation(*create_test_dfs(data), operation=operation)


@pytest.mark.parametrize("numeric_only", [True, False])
def test_mean_agg_different_types(numeric_only):
Let's also add a mean test to test_groupby_multiindex to check whether our intermediate column renaming works with MultiIndex columns.
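A hedged sketch of the kind of check being asked for, written as a standalone comparison rather than as the actual test_groupby_multiindex body (the real test would use the module's existing utilities):

import numpy as np
import pandas
import modin.pandas as pd

data = {
    ("foo", "a"): [1, 1, 2, 2],
    ("foo", "b"): [10, np.nan, 30, 40],
    ("bar", "c"): [1, 2, 3, 4],
}
md_res = pd.DataFrame(data).groupby(("foo", "a")).mean()
pd_res = pandas.DataFrame(data).groupby(("foo", "a")).mean()
assert md_res._to_pandas().equals(pd_res)  # exercises the intermediate-column renaming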
@gshimansky is checking for non-NaN columns in order to use a common divisor computed via …
for col, dt in self._query_compiler.dtypes.items():
    if is_datetime64_any_dtype(dt):
        if self._df[col].hasnans:
            fallback = True
            break
        else:
            converted_columns[col] = dt
In order to minimize backend-API calls, I would replace this for-loop with something like this:
datetime_cols = self._df[[col for col, dt in self._df.dtypes.items() if is_datetime64_any_dtype(dt)]]
has_nans_mask = datetime_cols.isna().any()
I've compared these two approaches, and it appears that the proposed one is almost always faster:
Micro-benchmark and its results
import modin.pandas as pd
import numpy as np
import timeit

def is_dtype_to_pick(dtype):
    return dtype == "float"

def fn1(df):
    to_convert = {}
    fallback = False
    for col, dt in df.dtypes.items():
        if is_dtype_to_pick(dt):
            if df[col].hasnans:
                fallback = True
                break
            else:
                to_convert[col] = dt

def fn2(df):
    cols_to_select = [col for col, dt in df.dtypes.items() if is_dtype_to_pick(dt)]
    if len(cols_to_select) == 0:
        return
    selected_df = df[cols_to_select]
    fallback = selected_df.isna().any().any()
    if not fallback:
        to_convert = selected_df.dtypes.items()

NROWS = 1_000_000
NCOLS = 10

df_no_floats = pd.DataFrame({f"col{i}": np.arange(NROWS) for i in range(NCOLS)})
df_has_half_float_no_nans = df_no_floats.astype(
    {col: "float" for col in df_no_floats.columns[: len(df_no_floats.columns) // 2]}
)
df_has_half_float_nans_first = df_has_half_float_no_nans.copy()
df_has_half_float_nans_first.loc[0, df_has_half_float_nans_first.select_dtypes("float").columns[0]] = np.nan
df_has_half_float_nans_last = df_has_half_float_no_nans.copy()
df_has_half_float_nans_first.loc[0, df_has_half_float_nans_first.select_dtypes("float").columns[-1]] = np.nan

NRUNS = 5

print("df_no_float:")
print("for-loop:", timeit.timeit(lambda: fn1(df_no_floats), number=NRUNS))
print("non-for-loop:", timeit.timeit(lambda: fn2(df_no_floats), number=NRUNS))

print("\ndf_has_half_float_no_nans:")
print("for-loop:", timeit.timeit(lambda: fn1(df_has_half_float_no_nans), number=NRUNS))
print("non-for-loop:", timeit.timeit(lambda: fn2(df_has_half_float_no_nans), number=NRUNS))

print("\ndf_has_half_float_nans_first:")
print("for-loop:", timeit.timeit(lambda: fn1(df_has_half_float_nans_first), number=NRUNS))
print("non-for-loop:", timeit.timeit(lambda: fn2(df_has_half_float_nans_first), number=NRUNS))

print("\ndf_has_half_float_nans_last:")
print("for-loop:", timeit.timeit(lambda: fn1(df_has_half_float_nans_last), number=NRUNS))
print("non-for-loop:", timeit.timeit(lambda: fn2(df_has_half_float_nans_last), number=NRUNS))
Results (each number is the sum over all 5 runs; this is how timeit measures things):
df_no_float:
for-loop: 8.46749753691256e-05
non-for-loop: 6.431201472878456e-05
df_has_half_float_no_nans:
for-loop: 5.488216175988782
non-for-loop: 1.856114284018986
df_has_half_float_nans_first:
for-loop: 1.3251252140034921
non-for-loop: 1.966320100997109
df_has_half_float_nans_last:
for-loop: 5.407154903979972
non-for-loop: 1.9547889120294712
The datetime_cols = ... here would make a copy in pandas (not sure about Modin). I suggest select_dtypes.
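A sketch of that suggestion in plain pandas (whether it avoids the copy depends on the pandas version, so the comment below is hedged):

import pandas as pd

df = pd.DataFrame(
    {
        "a": [1, 2, 3],
        "b": pd.to_datetime(["2001-01-01", None, "2002-01-01"]),
        "c": pd.to_datetime(["2001-01-01", "2001-06-01", "2002-01-01"]),
    }
)

# select_dtypes replaces the explicit list comprehension over dtypes;
# it may or may not avoid a data copy depending on the pandas version.
datetime_cols = df.select_dtypes(include=["datetime", "datetimetz"])
print(datetime_cols.isna().any())
# b     True
# c    False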
@gshimansky, what is the status here?
No progress so far. Handling all corner cases introduces too much complexity and slows down the overall performance. I am not sure that I should continue to push this PR.
IIRC, @mvashishtha was digging into it. Would you be able to continue this work? Also, I think @dchigarev is always glad to look at groupby 😄
@YarShev I believe @naren-ponder was looking at this. Do you have any updates about it?
I think we should default to pandas in such cases then. As far as I understand, MultiIndex handling is the hardest thing here; we can just call …
MultiIndex doesn't look like a big problem to me. In a comment above I suggested adding an extra layer of MultiIndex which would contain two kinds of columns: the original df columns and the output of count().

What is slowing down the algorithm is the handling of NaNs, because …
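A minimal sketch of the extra-MultiIndex-level idea using pandas' concat keys, which works whether or not the original columns are already a MultiIndex (illustrative only):

import numpy as np
import pandas as pd

df = pd.DataFrame({"a": [1, 1, 2, 2], "b": [10, np.nan, np.nan, np.nan], "c": [1, 2, 3, 4]})
g = df.groupby("a")

# Tag sums and counts with a new top level instead of renaming columns with a string prefix.
partial = pd.concat([g.sum(min_count=1), g.count()], axis=1, keys=["sum", "count"])
print(partial)

# The reduce phase can then select the two halves by the new level and divide.
print(partial["sum"] / partial["count"])  # matches g.mean()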
numeric_only = kwargs.get("numeric_only", False)
if not numeric_only:
    for col, dt in self._query_compiler.dtypes.items():
        if is_datetime64_any_dtype(dt):
Will timedelta64 be affected too?
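For what it's worth (an aside, not from the PR discussion), timedelta64 uses the same NaT sentinel at the NumPy level, so the same concern would seem to apply:

import numpy as np

td = np.array([np.timedelta64(1, "D"), np.timedelta64("NaT")], dtype="timedelta64[ns]")
print(td.astype("int64"))
# [       86400000000000 -9223372036854775808]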
@dchigarev, would you take a look at this PR? What should we do next, continue or close it?
We already did the same changes in #4591.
This PR accelerates mean aggregation on the NYC Taxi benchmark from 55 seconds to 6 seconds on my workstation.
What do these changes do?
- flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
- black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
- git commit -s
- docs/developer/architecture.rst is up-to-date