
Sort preserving merge (#362) #379

Merged (6 commits) on Jun 1, 2021

Conversation

@tustvold (Contributor) commented May 21, 2021

Closes #362.

Creating as a draft, as it currently builds on top of #378 and uses a partitioned SortExec as part of its tests.

This PR adds a SortPreservingMergeExec operator that allows merging multiple sorted partitions into a single sorted partition.

The main implementation is contained within SortPreservingMergeStream and SortKeyCursor:

SortKeyCursor provides the ability to compare the sort keys of the next row that each stream could yield, in order to determine which stream to take the next row from.

SortPreservingMergeStream maintains a SortKeyCursor for each stream and builds up a list of sorted indices identifying rows within these cursors. When it reads the last row of a RecordBatch, it fetches another from that input. Once it has accumulated `target_batch_size` row indices (or exhausted all input streams), it combines the relevant rows from the buffered RecordBatches into a single RecordBatch, drops any cursors it no longer needs, and yields the batch.
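The description above is essentially a k-way merge. Below is a simplified, hedged sketch of that strategy, using plain sorted slices in place of the per-partition RecordBatch streams and a plain Vec in place of the row-index accumulation; `target_batch_size` mirrors the name used above, everything else is illustrative rather than the PR's actual code.

```rust
/// Merge several already-sorted inputs into sorted output chunks of at most
/// `target_batch_size` elements (a toy stand-in for the RecordBatch case).
fn sort_preserving_merge<T: Ord + Copy>(
    partitions: &[&[T]],
    target_batch_size: usize,
) -> Vec<Vec<T>> {
    // One "cursor" (next-row position) per input partition.
    let mut cursors = vec![0usize; partitions.len()];
    let mut batches = Vec::new();
    let mut current = Vec::with_capacity(target_batch_size);

    loop {
        // Find the partition whose next value is smallest; ties keep the
        // earlier partition, preserving a stable order.
        let mut min: Option<(usize, T)> = None;
        for (idx, part) in partitions.iter().enumerate() {
            if let Some(&v) = part.get(cursors[idx]) {
                if min.map_or(true, |(_, m)| v < m) {
                    min = Some((idx, v));
                }
            }
        }
        match min {
            Some((idx, v)) => {
                cursors[idx] += 1;
                current.push(v);
                // Emit a batch once target_batch_size rows are accumulated.
                if current.len() == target_batch_size {
                    batches.push(std::mem::take(&mut current));
                }
            }
            // All inputs exhausted: flush whatever is left and stop.
            None => {
                if !current.is_empty() {
                    batches.push(current);
                }
                return batches;
            }
        }
    }
}
```

For example, merging `[1, 3, 5]` and `[2, 4]` with a `target_batch_size` of 3 yields the batches `[1, 2, 3]` and `[4, 5]`.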

@@ -99,11 +99,11 @@ impl ExecutionPlan for SortExec {

    /// Get the output partitioning of this plan
    fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(1)
+        self.input.output_partitioning()
@tustvold (author):
This is the change from #377

@alamb (Contributor) left a comment:

Thank you @tustvold -- I think this is the last missing physical operator we need in DataFusion to start enabling sort-based optimizations (e.g. sort-merge join, etc.)

I think this is pretty amazing work -- I am sure there will be more work to optimize this, but I like the overall structure and I think it is looking very cool.

I think we should let at least one other pair of eyes read it carefully, so I will hold off on clicking approve until that happens. But from what I can see at this point, this PR is basically ready to go.

@@ -113,3 +118,29 @@ fn build_file_list_recurse(
    }
    Ok(())
}

/// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender
pub(crate) fn spawn_execution(
Contributor comment:

this is a nice abstraction (and we can probably use it elsewhere)
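For readers skimming the thread, here is a minimal, self-contained sketch of the idea behind such a helper (the generics and the exact channel and stream types are illustrative, not the PR's signature): spawn a tokio task that drives a stream and forwards each item over an mpsc channel, stopping if the receiver is dropped.

```rust
use futures::{Stream, StreamExt};
use tokio::sync::mpsc::Sender;
use tokio::task::JoinHandle;

/// Drive `input` to completion on the tokio runtime, sending every item to
/// `output`. Returns the handle of the spawned task.
pub(crate) fn spawn_execution<T, S>(mut input: S, output: Sender<T>) -> JoinHandle<()>
where
    T: Send + 'static,
    S: Stream<Item = T> + Unpin + Send + 'static,
{
    tokio::spawn(async move {
        while let Some(item) = input.next().await {
            // A send error means the receiver was dropped; stop driving the stream.
            if output.send(item).await.is_err() {
                return;
            }
        }
    })
}
```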

        Partitioning::UnknownPartitioning(1)
    }

    fn required_child_distribution(&self) -> Distribution {
Contributor comment:

eventually (not as part of this PR) we should add something like required_child_sort_order so the operators can report on what sortedness they are assuming.
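Nothing like this exists in the PR; purely as a hypothetical illustration of the suggestion, such an API might let an operator describe the input ordering it assumes, alongside required_child_distribution. Every name in the sketch below is made up.

```rust
use arrow::compute::SortOptions;

/// Hypothetical description of one required sort key: which column the child
/// output must be ordered by, and its direction / null placement.
pub struct SortKeyRequirement {
    pub column: String,
    pub options: SortOptions,
}

/// Hypothetical extension: operators report the sortedness they are assuming.
pub trait SortOrderRequirements {
    /// The sort order required of child output, if any (None = no requirement).
    fn required_child_sort_order(&self) -> Option<Vec<SortKeyRequirement>> {
        None
    }
}
```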

            (true, false) => return Ok(Ordering::Less),
            (false, false) => {}
            (true, true) => {
                // TODO: Building the predicate each time is sub-optimal
Contributor comment:

I predict this line will be the bottleneck of this operator.

However, I feel like getting it in and working, and then optimizing as a follow-on, is the correct course of action in this case.
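For context on what that per-row comparison involves, here is a hedged, simplified sketch of a null-aware key comparison for one primitive sort column. It is a stand-in illustration, not the PR's SortKeyCursor code, which builds its comparison predicate dynamically (exactly what the TODO above flags as sub-optimal).

```rust
use std::cmp::Ordering;

use arrow::array::{Array, PrimitiveArray};
use arrow::compute::SortOptions;
use arrow::datatypes::ArrowPrimitiveType;

/// Compare one sort-key column of two rows, honouring SortOptions.
fn compare_key<T: ArrowPrimitiveType>(
    left: &PrimitiveArray<T>,
    left_row: usize,
    right: &PrimitiveArray<T>,
    right_row: usize,
    options: &SortOptions,
) -> Ordering
where
    T::Native: Ord,
{
    match (left.is_null(left_row), right.is_null(right_row)) {
        // Two nulls compare equal; the next sort key would break the tie.
        (true, true) => Ordering::Equal,
        // A single null sorts before or after values depending on nulls_first.
        (true, false) => {
            if options.nulls_first { Ordering::Less } else { Ordering::Greater }
        }
        (false, true) => {
            if options.nulls_first { Ordering::Greater } else { Ordering::Less }
        }
        // Both present: compare the values, reversing for descending keys.
        (false, false) => {
            let ord = left.value(left_row).cmp(&right.value(right_row));
            if options.descending { ord.reverse() } else { ord }
        }
    }
}
```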

datafusion/src/physical_plan/sort_preserving_merge.rs (outdated review thread, resolved)
"+---+---+-------------------------------+",
"| 1 | | 1970-01-01 00:00:00.000000008 |",
"| 1 | | 1970-01-01 00:00:00.000000008 |",
"| 2 | a | |",
Contributor comment:

In order to cover the nulls_first: false case for "c", I think you need several rows here with a tie for a and b, and both a null and a non-null value for c. I didn't see any such cases (though I may have missed them).

Perhaps adding a row like the following would be enough

                "| 7 | b | NULL |",

@tustvold (author):

The sort key is just b and c, so the lines

"| 7 | b | 1970-01-01 00:00:00.000000006 |",
"| 2 | b |                               |",

already test this?

        assert_eq!(basic, partition);
    }

    // Split the provided record batch into multiple batch_size record batches
Contributor comment:

This might be a function that we could add to RecordBatch itself? I can file a ticket to do so if you would like
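A hedged sketch of such a splitter (not the PR's exact helper): chunk a RecordBatch into pieces of at most `batch_size` rows using arrow's zero-copy RecordBatch::slice.

```rust
use arrow::record_batch::RecordBatch;

/// Split `batch` into record batches of at most `batch_size` rows each.
fn split_batch(batch: &RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
    assert!(batch_size > 0, "batch_size must be non-zero");
    let mut out = Vec::new();
    let mut offset = 0;
    while offset < batch.num_rows() {
        let len = batch_size.min(batch.num_rows() - offset);
        // slice() shares the underlying buffers, so this does not copy data.
        out.push(batch.slice(offset, len));
        offset += len;
    }
    out
}
```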

    }

    #[tokio::test]
    async fn test_partition_sort_streaming_input_output() {
Contributor comment:

I think this test covers the case where each input stream has more than one RecordBatch, right? (Each input partition has three record batches.)

Is there any value in another test that has input streams with differing numbers of input batches? (I am thinking of an input with 3 partitions: 0 record batches, 1 record batch, and "many", i.e. 2 or 3.)

@codecov-commenter commented May 24, 2021

Codecov Report

Merging #379 (7d3dbc5) into master (3593d1f) will increase coverage by 0.54%.
The diff coverage is 81.62%.


@@            Coverage Diff             @@
##           master     #379      +/-   ##
==========================================
+ Coverage   74.85%   75.39%   +0.54%     
==========================================
  Files         146      148       +2     
  Lines       24565    25242     +677     
==========================================
+ Hits        18387    19031     +644     
- Misses       6178     6211      +33     
Impacted Files                                              Coverage Δ
datafusion/src/physical_plan/mod.rs                         78.70% <ø> (-4.06%) ⬇️
datafusion/src/physical_plan/common.rs                      84.21% <77.77%> (-2.00%) ⬇️
...afusion/src/physical_plan/sort_preserving_merge.rs       81.66% <81.66%> (ø)
datafusion/src/physical_plan/merge.rs                       75.00% <100.00%> (+0.71%) ⬆️
datafusion/src/physical_plan/window_functions.rs            85.71% <0.00%> (-3.01%) ⬇️
datafusion/src/scalar.rs                                    56.19% <0.00%> (-2.13%) ⬇️
ballista/rust/client/src/context.rs                          0.00% <0.00%> (ø)
datafusion/src/physical_plan/expressions/mod.rs             71.42% <0.00%> (ø)
...fusion/src/physical_plan/expressions/row_number.rs       81.25% <0.00%> (ø)
... and 17 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

    /// if all cursors for all streams are exhausted
    fn next_stream_idx(&self) -> Result<Option<usize>> {
        let mut min_cursor: Option<(usize, &SortKeyCursor)> = None;
        for (idx, candidate) in self.cursors.iter().enumerate() {
@jhorstmann (Contributor):

For a larger number of partitions, storing the cursors in a BinaryHeap, sorted by their current item, would be beneficial.

A Rust implementation of that approach can be seen in this blog post and the first comment under it. I have implemented the same approach in Java before. I agree with @alamb, though, to make it work first and then optimize later.
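To make the suggestion concrete, here is a hedged, self-contained sketch of heap-based selection. `next_key` stands in for however a cursor would expose its next sort key; in the real operator that key is a multi-column arrow comparison rather than a simple Ord value, so this is illustrative only.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// One heap entry per stream: the stream's next sort key plus its index.
/// Ties on `key` fall back to the lower `stream_idx`, keeping the merge stable.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct HeapEntry<K: Ord> {
    key: K,
    stream_idx: usize,
}

/// Produce the order in which streams should be drained, one index per row.
/// `next_key(i)` returns the next key of stream `i`, or None when exhausted.
fn merge_indices<K: Ord>(
    mut next_key: impl FnMut(usize) -> Option<K>,
    num_streams: usize,
) -> Vec<usize> {
    // Seed the heap with the first key of every non-empty stream;
    // Reverse turns the max-heap into a min-heap.
    let mut heap: BinaryHeap<Reverse<HeapEntry<K>>> = (0..num_streams)
        .filter_map(|i| next_key(i).map(|key| Reverse(HeapEntry { key, stream_idx: i })))
        .collect();

    let mut order = Vec::new();
    while let Some(Reverse(entry)) = heap.pop() {
        order.push(entry.stream_idx);
        // Refill from the stream we just consumed, if it has more rows.
        if let Some(key) = next_key(entry.stream_idx) {
            heap.push(Reverse(HeapEntry { key, stream_idx: entry.stream_idx }));
        }
    }
    order
}
```

Popping from the heap is O(log n) in the number of partitions, versus the linear scan over cursors in next_stream_idx above.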

Contributor reply:

great suggestion @jhorstmann -- thank you -- I filed #416 so it is more visible

@alamb mentioned this pull request on May 24, 2021.
@tustvold marked this pull request as ready for review on May 26, 2021.
@tustvold (author):

Will rebase to remove merges

@alamb (Contributor) commented May 26, 2021:

This PR appears to need some rebasing / test fixing love:

https://github.com/apache/arrow-datafusion/pull/379/checks?check_run_id=2674096854



---- physical_plan::sort_preserving_merge::tests::test_partition_sort stdout ----
thread 'physical_plan::sort_preserving_merge::tests::test_partition_sort' panicked at 'called `Result::unwrap()` on an `Err` value: Internal("SortExec requires a single input partition")', datafusion/src/physical_plan/sort_preserving_merge.rs:627:47
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:

@tustvold (author):

Apologies - I stripped out the merge that fixed the logical conflict 🤦

Pushed a commit that fixes it 😄

@alamb (Contributor) left a comment:

I think this PR is ready -- thanks again @tustvold

What do you think @Dandandan / @andygrove ? Any objections to merging this (as a step towards a more sorted future in DataFusion)?


@alamb (Contributor) commented Jun 1, 2021:

I just fixed a merge conflict -- if the tests pass I plan to merge this PR in

Labels: datafusion (Changes in the datafusion crate), enhancement (New feature or request)
Merging this pull request may close: Add an Order Preserving merge operator (#362)
5 participants