Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Improve materialisation performance of SortPreservingMergeExec #691

Merged
merged 4 commits into from
Jul 8, 2021

Conversation

e-dard
Copy link
Contributor

@e-dard e-dard commented Jul 6, 2021

This PR is part of work I am doing in support of #655. I expect to land some further improvements to this operator, so I don't know if you want to close out #655 with this PR and I can make a new ticket each time, or just keep #655 open.

Rationale

I work on a project (InfluxDB IOx) that exectues an in-memory compaction of collections of record batches using DataFusion. It is very common for input record batches to overlap with each other with respect to some sort key and we are seeing the SortPreservingMergeExec operator eating up a lot of CPU time.

One of the places where it's clear that we can improve performance is when materialising output record batches. The current logic of the operator uses the MutableArray API to copy over rows from operator inputs to the merged output. The extend() call is being called for every single input row, however the API supports batched copies wherein you can specify a start and end row index.

This PR:

  • c352fa3 Adds benchmarks to cover the execution of the SortPreservingMergeExec under some scenarios;
  • d7f76b3 Implements logic to coalesce calls the MutableArray.extend over all contiuguous rows from the same input.
  • 145a5b8 increases test coverage of coalescing logic.

Benchmarks

I have included benchmarks that evaluate performance on two input record batches in a number of scenarios:

  • interleave: the record batches both contain the same data, therefore one row at a time from each input is pushed to the output. This scenarios is the one where this PR will have the least improvement because there isn't much to coalesce.
  • merge_batches_some_overlap: the record batches contain similar data, however the rows need to be merged in a way where for n output rows in a batch approximately 1/3 rows first come from input A all at once, then 1/3 of the rows are effectively the same in both inputs resulting in many calls to extend then a 1/3 of the rows comes from input B.
  • merge_batches_no_overlap: in these cases both record batches contain data for the same sort keys but neither of the rows overlap for each unique sort key. That is, for a given unique sort key all rows from input A need to be sent to the output, and then all rows from input B need to be sent to the output.
  • merge_batches_small_into_large: in this case the two inputs share similar data but one input is significantly smaller than the other. This is a particularly realistic scenario for our use-case where we are compacting smaller record batches into larger ones.

Here are the results using critcmp

group                               master                                 pr
-----                               ------                                 --
interleave_batches                  1.04   637.5±51.84µs        ? ?/sec    1.00   615.5±12.13µs
merge_batches_no_overlap_large      1.12    454.9±2.90µs        ? ?/sec    1.00   404.9±10.94µs
merge_batches_no_overlap_small      1.14    485.1±6.67µs        ? ?/sec    1.00    425.7±9.33µs 
merge_batches_small_into_large      1.14    263.0±8.85µs        ? ?/sec    1.00    229.7±5.23µs  
merge_batches_some_overlap_large    1.05    532.5±8.33µs        ? ?/sec    1.00   508.3±14.24µs
merge_batches_some_overlap_small    1.06   546.9±12.82µs        ? ?/sec    1.00   516.9±13.20µs

The benchmarks show that in the case where this PR is likely to make the least difference, the PR's SortPreservingMergeExec is now about ~3% faster.

In the case where the PR is likely to make the most difference (when for each set of rows that need to be merged in order, you can just call extend in input A and then on input B) the PR's SortPreservingMergeExec is now about 12% faster.

Note: critcmp reports how much slower things are on master hence the slightly different numbers in the benchmark results.

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Jul 6, 2021
@alamb
Copy link
Contributor

alamb commented Jul 6, 2021

I think keeping #655 open is fine

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @e-dard -- I have reviewed the tests and code carefully 👍

current_rows_per_sort_key += 1;
}

col_a.sort();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This data is only sorted on a correct? In other words, the SortPreservingMerge operator will only see sorted input data if the sort key is ["a"], but the benchmark is using ["a", "b", "c", "d"] as the sort key. Or perhaps I am mis reading this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I sorted the columns independently because it was a bit simpler than invoking a lex sort on them and technically the rows are still sorted by a, b, c, d, it's just that the values for each column in the sort key change in unison.

For example to create two inputs where the sort keys don't overlap the test data might look like:

Input A:

+------+------+------+----+
| a    | b    | c    | d  |
+------+------+------+----+
| a-0  | b-0  | c-0  | 0  |
| a-0  | b-0  | c-0  | 1  |
| a-0  | b-0  | c-0  | 2  |
| a-0  | b-0  | c-0  | 3  |
| a-0  | b-0  | c-0  | 4  |
| a-0  | b-0  | c-0  | 5  |
| a-0  | b-0  | c-0  | 6  |
| a-0  | b-0  | c-0  | 7  |
| a-0  | b-0  | c-0  | 8  |
| a-0  | b-0  | c-0  | 9  |
| a-10 | b-10 | c-10 | 10 |
| a-10 | b-10 | c-10 | 11 |
| a-10 | b-10 | c-10 | 12 |
| a-10 | b-10 | c-10 | 13 |
| a-10 | b-10 | c-10 | 14 |
| a-10 | b-10 | c-10 | 15 |
| a-10 | b-10 | c-10 | 16 |
| a-10 | b-10 | c-10 | 17 |
| a-10 | b-10 | c-10 | 18 |
| a-10 | b-10 | c-10 | 19 |

Then using a batch_offset = 10 the second input would be generated as:

+------+------+------+----+
| a    | b    | c    | d  |
+------+------+------+----+
| a-0   | b-0   | c-0   | 5    |
| a-0   | b-0   | c-0   | 6    |
| a-0   | b-0   | c-0   | 7    |
| a-0   | b-0   | c-0   | 8    |
| a-0   | b-0   | c-0   | 9    |
| a-0   | b-0   | c-0   | 10   |
| a-0   | b-0   | c-0   | 11   |
| a-0   | b-0   | c-0   | 12   |
| a-0   | b-0   | c-0   | 13   |
| a-0   | b-0   | c-0   | 14   |
| a-10  | b-10  | c-10  | 15   |
| a-10  | b-10  | c-10  | 16   |
| a-10  | b-10  | c-10  | 17   |
| a-10  | b-10  | c-10  | 18   |
| a-10  | b-10  | c-10  | 19   |
| a-10  | b-10  | c-10  | 20   |
| a-10  | b-10  | c-10  | 21   |
| a-10  | b-10  | c-10  | 22   |
| a-10  | b-10  | c-10  | 23   |
| a-10  | b-10  | c-10  | 24   |

In this case the operator would take the first five rows from input A then have to alternate taking the next five rows from input A and input B (for rows where d = [5, 9]. Then finally take all the rows from input B.

Then it would move back to input A for the row containing | a-10 | b-10 | c-10 | 10 | and do the same thing again.

The idea here is that in main right now merging the above data would involve (I think) 40 calls to extend() but this PR should make it so it only involves 2 * (1 + 10 + 1) = 24 calls.

.collect::<Vec<_>>();

let exec = MemoryExec::try_new(
&batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),
Copy link
Contributor

@Dandandan Dandandan Jul 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this have the same amount of partitions as batches here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a more realistic scenario, we should have some 8-100 partitions with many batches each

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want me to add more benchmarks in this PR? Or alternatively I could extend the benchmarks if I find any other performance improvements in a future one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point I tried to make is that a test with as many partitions as batches is not a normal workload. So I would change the benchmark here to e.g. test something like 8 partitions with 100 batches each (or something like this).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But currently I am seeing only one or two batches in the benchmark? So for now it's not a big deal, this could be covered in a future PR.

The `SortPreservingMergeStream` operator merges two streams together by creating an output record batch that is build from the contents of the input. Previously each row of input would be pushed into the output sink even if though the API supports pushing batches of rows.

This commit implements the logic to push batches of rows from inputs where possible.

Performance benchmarks show an improvement of between 3-12%.

```
group                               master                                 pr
-----                               ------                                 --
interleave_batches                  1.04   637.5±51.84µs        ? ?/sec    1.00   615.5±12.13µs        ? ?/sec
merge_batches_no_overlap_large      1.12    454.9±2.90µs        ? ?/sec    1.00   404.9±10.94µs        ? ?/sec
merge_batches_no_overlap_small      1.14    485.1±6.67µs        ? ?/sec    1.00    425.7±9.33µs        ? ?/sec
merge_batches_small_into_large      1.14    263.0±8.85µs        ? ?/sec    1.00    229.7±5.23µs        ? ?/sec
merge_batches_some_overlap_large    1.05    532.5±8.33µs        ? ?/sec    1.00   508.3±14.24µs        ? ?/sec
merge_batches_some_overlap_small    1.06   546.9±12.82µs        ? ?/sec    1.00   516.9±13.20µs        ? ?/sec
```
@e-dard e-dard force-pushed the er/perf/sort_preserve_merge branch from 9d31dde to 5f905c2 Compare July 7, 2021 16:19
@alamb
Copy link
Contributor

alamb commented Jul 7, 2021

Let us know if you would like any more changes to this PR @Dandandan -- I think it is ready to go from my end

@alamb alamb merged commit 024bd89 into apache:master Jul 8, 2021
@houqp houqp added the performance Make DataFusion faster label Jul 31, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate performance Make DataFusion faster
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants