PERF-#5369: GroupBy.skew implementation via MapReduce pattern #5318

Merged 9 commits on Dec 14, 2022

Conversation

@dchigarev (Collaborator) commented on Dec 2, 2022

What do these changes do?

Here are some performance comparisons of .groupby().skew() between the current master and the new MapReduce implementation. The comparison was run via ASV with our existing groupby scenarios. CPU: 2x 28 Cores (56 threads) Xeon Platinum 8276L @ 2.20 GHz.

NCPUS=112
       before           after         ratio
     [a77a6464]       [05027610]
     <master>         <skew_impl>
+      1.46±0.04s        7.03±0.3s     4.83  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([5000, 5000], ngroups=100)
+      1.85±0.06s       7.59±0.03s     4.09  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([5000, 5000], ngroups=100, by_ncols=6)
+       3.53±0.1s        7.11±0.1s     2.02  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([5000, 5000], 'huge_amount_groups')
+      4.18±0.05s        8.30±0.4s     1.98  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([5000, 5000], 'huge_amount_groups', by_ncols=6)
-       4.38±0.1s       3.04±0.07s     0.69  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([1_000_000, 256], 'huge_amount_groups', by_ncols=6)
-       3.86±0.2s       2.60±0.02s     0.67  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([1_000_000, 256], 'huge_amount_groups')
-      1.91±0.01s       1.24±0.04s     0.65  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([1_000_000, 256], ngroups=100, by_ncols=6)
-       1.66±0.1s       1.04±0.02s     0.63  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([1_000_000, 256], ngroups=100)
-      3.33±0.07s       1.20±0.03s     0.36  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([1_000_000, 32], 'huge_amount_groups', by_ncols=6)
-      1.10±0.03s          388±7ms     0.35  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([1_000_000, 32], ngroups=100, by_ncols=6)
-        939±20ms         308±10ms     0.33  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([1_000_000, 32], ngroups=100)
-      3.06±0.05s         982±70ms     0.32  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([1_000_000, 32], 'huge_amount_groups')
-       10.0±0.2s       1.93±0.07s     0.19  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([10_000_000, 32], 'huge_amount_groups')
-       12.2±0.3s        2.02±0.1s     0.17  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([10_000_000, 32], 'huge_amount_groups', by_ncols=6)
-       12.7±0.2s       1.14±0.06s     0.09  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([10_000_000, 32], ngroups=100, by_ncols=6)
-       12.0±0.8s       1.07±0.04s     0.09  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([10_000_000, 32], ngroups=100)
NCPUS=16
       before           after         ratio
     [a77a6464]       [bb932f4d]
       <master>       <skew_impl>
+        202±20ms         504±20ms     2.49  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([5000, 5000], ngroups=100, by_ncols=6)
+        217±20ms         424±20ms     1.95  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([5000, 5000], ngroups=100)
        1.61±0.1s       1.48±0.01s     0.92  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([1_000_000, 256], ngroups=100, by_ncols=6)
-      1.42±0.04s       1.28±0.01s     0.91  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([1_000_000, 256], ngroups=100)
-      1.66±0.04s         906±50ms     0.55  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([5000, 5000], 'huge_amount_groups', by_ncols=6)
-      1.67±0.03s         832±30ms     0.50  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([5000, 5000], 'huge_amount_groups')
-      3.47±0.06s       1.68±0.04s     0.48  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([1_000_000, 256], 'huge_amount_groups')
-       3.93±0.1s       1.79±0.01s     0.46  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([1_000_000, 256], 'huge_amount_groups', by_ncols=6)
-        782±10ms         219±30ms     0.28  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([1_000_000, 32], ngroups=100)
-        888±40ms         226±20ms     0.26  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([1_000_000, 32], ngroups=100, by_ncols=6)
-       9.56±0.3s        1.76±0.1s     0.18  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([10_000_000, 32], 'huge_amount_groups')
-       10.5±0.3s        1.79±0.2s     0.17  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([10_000_000, 32], ngroups=100, by_ncols=6)
-       11.1±0.1s       1.86±0.07s     0.17  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([10_000_000, 32], 'huge_amount_groups', by_ncols=6)
-       11.8±0.2s        1.68±0.1s     0.14  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([10_000_000, 32], ngroups=100)
-      2.60±0.08s         345±20ms     0.13  benchmarks.TimeGroupByDefaultAggregations.time_groupby_skew([1_000_000, 32], 'huge_amount_groups')
-      2.91±0.06s         361±10ms     0.12  benchmarks.TimeGroupByMultiColumn.time_groupby_agg_skew([1_000_000, 32], 'huge_amount_groups', by_ncols=6)
How to run this?

1. Add an ASV benchmark for a skew function by applying the following patch:
From bb932f4d3eb58f2c83155aaddcd1d538a5eb879e Mon Sep 17 00:00:00 2001
From: Dmitry Chigarev <dmitry.chigarev@intel.com>
Date: Tue, 6 Dec 2022 15:23:17 -0600
Subject: [PATCH] Skew benchmarks

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
---
 asv_bench/benchmarks/benchmarks.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/asv_bench/benchmarks/benchmarks.py b/asv_bench/benchmarks/benchmarks.py
index 3ec5d197..ff8b11d8 100644
--- a/asv_bench/benchmarks/benchmarks.py
+++ b/asv_bench/benchmarks/benchmarks.py
@@ -54,6 +54,7 @@ class BaseTimeGroupBy:
 
 
 class TimeGroupByMultiColumn(BaseTimeGroupBy):
+    timeout = 720
     param_names = ["shape", "ngroups", "groupby_ncols"]
     params = [
         get_benchmark_shapes("TimeGroupByMultiColumn"),
@@ -66,9 +67,13 @@ class TimeGroupByMultiColumn(BaseTimeGroupBy):
 
     def time_groupby_agg_mean(self, *args, **kwargs):
         execute(self.df.groupby(by=self.groupby_columns).apply(lambda df: df.mean()))
+    
+    def time_groupby_agg_skew(self, *args, **kwargs):
+        execute(self.df.groupby(by=self.groupby_columns).skew())
 
 
 class TimeGroupByDefaultAggregations(BaseTimeGroupBy):
+    timeout = 720
     param_names = ["shape", "ngroups"]
     params = [
         get_benchmark_shapes("TimeGroupByDefaultAggregations"),
@@ -86,6 +91,9 @@ class TimeGroupByDefaultAggregations(BaseTimeGroupBy):
 
     def time_groupby_mean(self, *args, **kwargs):
         execute(self.df.groupby(by=self.groupby_columns).mean())
+    
+    def time_groupby_skew(self, *args, **kwargs):
+        execute(self.df.groupby(by=self.groupby_columns).skew())
 
 
 class TimeGroupByDictionaryAggregation(BaseTimeGroupBy):
-- 
2.25.1
2. Specify custom data shapes for groupby benchmarks by creating the following JSON file:
{
    "TimeGroupByDefaultAggregations": [[1000000, 32], [10000000, 32], [5000, 5000], [1000000, 256]],
    "TimeGroupByMultiColumn": [[1000000, 32], [10000000, 32], [5000, 5000], [1000000, 256]]
}
3. Run ASV with the following command, substituting $ASV_CONFIG_PATH with the path to the JSON file created in the previous step:
MODIN_TEST_DATASET_SIZE="Big" MODIN_ASV_DATASIZE_CONFIG=$ASV_CONFIG_PATH asv continuous origin/master skew_impl --launch-method=spawn -b TimeGroupByDefaultAggregations.time_groupby_skew -b TimeGroupByMultiColumn.time_groupby_agg_skew --no-only-changed -a repeat=5

Square-like frames are something of an anti-pattern for the Map and MapReduce implementations in Modin (see the optimization notes), so the new implementation is slower than the previous one in these cases. Filed an issue to resolve this problem in general (#5394).
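For intuition, here is a minimal sketch of what a MapReduce decomposition of groupby skew can look like, written in plain pandas/NumPy over manually split row partitions (the function names and intermediate labels are illustrative only, not Modin's actual kernels). The map phase reduces each partition to per-group counts and the first three power sums; the reduce phase sums those partials across partitions and evaluates the bias-corrected Fisher-Pearson skewness that pandas computes:

import numpy as np
import pandas as pd

def skew_map(partition, by):
    # Per-partition pre-aggregation: per-group count and the first three
    # power sums are enough to reconstruct skewness later.
    grouped = partition.groupby(by)
    return pd.concat(
        {
            "count": grouped.count(),
            "pow1": grouped.sum(),
            "pow2": grouped.agg(lambda col: (col**2).sum()),
            "pow3": grouped.agg(lambda col: (col**3).sum()),
        },
        axis=1,
    )

def skew_reduce(mapped_parts):
    # Combine the partial sums across partitions, then compute the
    # bias-corrected (Fisher-Pearson) skewness per group.
    total = pd.concat(mapped_parts).groupby(level=0).sum()
    n = total["count"]
    mean = total["pow1"] / n
    m2 = total["pow2"] / n - mean**2  # 2nd central moment
    m3 = total["pow3"] / n - 3 * mean * total["pow2"] / n + 2 * mean**3  # 3rd central moment
    return np.sqrt(n * (n - 1)) / (n - 2) * m3 / m2**1.5

# Toy check against pandas' own groupby skew, with the frame split into two row partitions.
df = pd.DataFrame({"key": list("ababab"), "x": [1.0, 5.0, 2.0, 7.0, 9.0, 3.0]})
parts = [df.iloc[:3], df.iloc[3:]]
result = skew_reduce([skew_map(p, "key") for p in parts])
assert np.allclose(result["x"], df.groupby("key").skew()["x"])

The heavy work runs independently per partition, and the reduce step only sees a tiny pre-aggregated frame per group, which is why the long, narrow shapes in the tables above benefit the most.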

  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves "Implement groupby.skew() via GroupbyReduce pattern" #5369
  • tests added: existing tests for .skew() are passing
  • module layout described at docs/development/architecture.rst is up-to-date

@dchigarev changed the title from "PERF-#0000 GroupBy.skew implementation via MapReduce pattern" to "PERF-#0000: GroupBy.skew implementation via MapReduce pattern" on Dec 2, 2022
@dchigarev changed the title from "PERF-#0000: GroupBy.skew implementation via MapReduce pattern" to "PERF-#5369: GroupBy.skew implementation via MapReduce pattern" on Dec 6, 2022
Comment on lines 130 to 133
# Other is a broadcasted partition that represents 'by' data to group on.
# If 'drop' then the 'by' data came from the 'self' frame, thus
# inserting missed columns to the partition to group on them.
if drop or isinstance(other := other.squeeze(axis=1), pandas.DataFrame):
dchigarev (Collaborator, Author):

This was previously handled incorrectly, which caused the 'by' columns to be aggregated (they were dropped afterward, so there was no effect on the result). The 'skew' aggregation doesn't tolerate aggregating non-numeric ('by') columns, so this correction is required for proper behavior.

Collaborator:

Why was squeeze(axis=axis ^ 1) changed to squeeze(axis=1)?

dchigarev (Collaborator, Author):

My bad, reverted to axis ^ 1.

Tests did not catch this because we only support groupby along axis 0 for now.
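For context, axis ^ 1 simply flips between the two pandas axes, so the broadcasted 'by' partition is squeezed along the axis orthogonal to the groupby axis (a quick illustration in plain Python):

# XOR with 1 toggles between the two pandas axes: 0 ^ 1 == 1 and 1 ^ 1 == 0.
for axis in (0, 1):
    print(axis, "->", axis ^ 1)  # prints "0 -> 1" and "1 -> 0"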

- numeric_only=True,
+ numeric_only=NumericOnly.TRUE_EXCL_NUMERIC_CATEGORIES,
dchigarev (Collaborator, Author):

Previously, the native pandas .skew() implementation called by qc.groupby_agg was actually dropping unsuitable categorical columns; now we need to drop them manually.
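A rough, hypothetical illustration of what dropping such columns manually can look like in plain pandas (not the PR's actual helper): keep genuinely numeric columns and filter out categorical ones before handing the frame to the skew kernel.

import pandas as pd

df = pd.DataFrame(
    {
        "x": [1.0, 2.0, 3.0],
        "cat_num": pd.Categorical([1, 2, 3]),        # categorical with numeric categories
        "cat_str": pd.Categorical(["u", "v", "w"]),  # categorical with non-numeric categories
    }
)
# Categorical dtypes are not considered numeric, so both categorical columns are dropped.
numeric_cols = [col for col, dtype in df.dtypes.items() if pd.api.types.is_numeric_dtype(dtype)]
print(df[numeric_cols].columns.tolist())  # ['x']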

@@ -380,5 +381,48 @@ def _doc_binary_op(operation, bin_op, left="Series", right="right", returns="Ser
return doc_op


class NumericOnly(IntEnum): # noqa: PR01
dchigarev (Collaborator, Author):

There's possibly more than one method that doesn't tolerate numeric categories, so I decided to add this enum.
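The enum body itself is not quoted in this thread; a hypothetical sketch of what such an IntEnum could look like and how its members compare as plain ints (the member values below are assumptions for illustration):

from enum import IntEnum

class NumericOnly(IntEnum):
    # Hypothetical member values; the PR's actual definitions are not shown here.
    FALSE = 0
    TRUE = 1
    TRUE_EXCL_NUMERIC_CATEGORIES = 2

# IntEnum members behave like plain ints, which is what the '>' check discussed
# further down relies on.
numeric_only = NumericOnly.TRUE_EXCL_NUMERIC_CATEGORIES
assert numeric_only > NumericOnly.FALSE
assert int(NumericOnly.TRUE) == 1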

@dchigarev dchigarev marked this pull request as ready for review December 6, 2022 23:29
@dchigarev dchigarev requested a review from a team as a code owner December 6, 2022 23:29
@vnlitvinov (Collaborator):

Square-like frames are something of an anti-pattern for the Map and MapReduce implementations in Modin (see the optimization notes), so the new implementation is slower than the previous one in these cases.

I wonder if it's possible to use the old implementation for mostly square dataframes so as not to introduce such a speed loss...
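A hypothetical sketch of the kind of shape-based dispatch being suggested (the threshold and names are made up, and nothing like this is part of the PR):

def should_use_map_reduce(shape, square_ratio_threshold=0.01):
    # Hypothetical heuristic: treat frames whose column count is a noticeable
    # fraction of the row count as "square-like" and route them to the previous
    # full-column implementation instead of the MapReduce kernel.
    nrows, ncols = shape
    return ncols / max(nrows, 1) <= square_ratio_threshold

print(should_use_map_reduce((10_000_000, 32)))  # True: long frame, MapReduce path
print(should_use_map_reduce((5000, 5000)))      # False: square-like, old path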

)

- if numeric_only and self.ndim == 2:
+ if numeric_only > NumericOnly.FALSE and self.ndim == 2:
Collaborator:

What?.. I don't think one should really be comparing enums on anything but (in)equality.

If you want to check on classes of enum values, I'd suggest using IntFlag instead.

dchigarev (Collaborator, Author):

I thought the whole point of IntEnum is to work with its members like integers, e.g. if day > DayEnum.TUESDAY: ... or, as in our case with 'numeric only', if tolerance_level > ToleranceEnum.LEVEL2: ....

I've changed the IntEnum to IntFlag as you suggested, though I don't see much difference between them for our use case (even their docstrings in the Python docs are almost identical).

Collaborator:

This is not what I meant... an IntFlag enum should look like:

class NumericOnly(IntFlag):
    AUTO = 0b000
    FALSE = 0b001
    TRUE = 0b010
    TRUE_EXCL_NUMERIC_CATEGORIES = 0b011

(I've stated the fields in bit notation for readability), and then you check stuff like

if numeric_only & NumericOnly.TRUE:
    # do things

dchigarev (Collaborator, Author):

Do we really want this? It seems much more complicated than a simple > check. I don't see why this is a bad approach, since the IntEnum API allows it.

dchigarev (Collaborator, Author):

Reverted the changes introducing the NumericOnly enum, as it turned out to be something of a blocker.

Collaborator:

Not everything which is technically allowed should be done. 🙃

@dchigarev (Collaborator, Author) commented on Dec 8, 2022:

@vnlitvinov

I wonder if it's possible to use the old implementation for mostly square dataframes so as not to introduce such a speed loss...

I've created a separate tracker (#5394), since this performance issue affects every Map/MapReduce function and there should be a general solution for it.

vnlitvinov previously approved these changes on Dec 12, 2022

@vnlitvinov (Collaborator) left a comment:

LGTM!

@anmyachev (Collaborator) left a comment:

LGTM!
