Skip to content

Commit

Permalink
Make SortPreservingMergeStream stable on input stream order (#1687)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Jan 27, 2022
1 parent 2266474 commit 63d24bf
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 1 deletion.
4 changes: 3 additions & 1 deletion datafusion/src/physical_plan/sorts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ impl SortKeyCursor {
}
}

Ok(Ordering::Equal)
// Break ties using stream_idx to ensure a predictable
// ordering of rows when comparing equal streams.
Ok(self.stream_idx.cmp(&other.stream_idx))
}

/// Initialize a collection of comparators for comparing
Expand Down
100 changes: 100 additions & 0 deletions datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,27 @@ use crate::physical_plan::{
/// provided each partition of the input plan is sorted with respect to
/// these sort expressions, this operator will yield a single partition
/// that is also sorted with respect to them
///
/// ```text
/// ┌─────────────────────────┐
/// │ ┌───┬───┬───┬───┐ │
/// │ │ A │ B │ C │ D │ ... │──┐
/// │ └───┴───┴───┴───┘ │ │
/// └─────────────────────────┘ │ ┌───────────────────┐ ┌───────────────────────────────┐
/// Stream 1 │ │ │ │ ┌───┬───╦═══╦───┬───╦═══╗ │
/// ├─▶│SortPreservingMerge│───▶│ │ A │ B ║ B ║ C │ D ║ E ║ ... │
/// │ │ │ │ └───┴─▲─╩═══╩───┴───╩═══╝ │
/// ┌─────────────────────────┐ │ └───────────────────┘ └─┬─────┴───────────────────────┘
/// │ ╔═══╦═══╗ │ │
/// │ ║ B ║ E ║ ... │──┘ │
/// │ ╚═══╩═══╝ │ Note Stable Sort: the merged stream
/// └─────────────────────────┘ places equal rows from stream 1
/// Stream 2
///
///
/// Input Streams Output stream
/// (sorted) (sorted)
/// ```
#[derive(Debug)]
pub struct SortPreservingMergeExec {
/// Input plan
Expand Down Expand Up @@ -1361,4 +1382,83 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_stable_sort() {
let runtime = Arc::new(RuntimeEnv::default());

// Create record batches like:
// batch_number |value
// -------------+------
// 1 | A
// 1 | B
//
// Ensure that the output is in the same order the batches were fed
let partitions: Vec<Vec<RecordBatch>> = (0..10)
.map(|batch_number| {
let batch_number: Int32Array =
vec![Some(batch_number), Some(batch_number)]
.into_iter()
.collect();
let value: StringArray = vec![Some("A"), Some("B")].into_iter().collect();

let batch = RecordBatch::try_from_iter(vec![
("batch_number", Arc::new(batch_number) as ArrayRef),
("value", Arc::new(value) as ArrayRef),
])
.unwrap();

vec![batch]
})
.collect();

let schema = partitions[0][0].schema();

let sort = vec![PhysicalSortExpr {
expr: col("value", &schema).unwrap(),
options: SortOptions {
descending: false,
nulls_first: true,
},
}];

let exec = MemoryExec::try_new(&partitions, schema, None).unwrap();
let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));

let collected = collect(merge, runtime).await.unwrap();
assert_eq!(collected.len(), 1);

// Expect the data to be sorted first by "batch_number" (because
// that was the order it was fed in, even though only "value"
// is in the sort key)
assert_batches_eq!(
&[
"+--------------+-------+",
"| batch_number | value |",
"+--------------+-------+",
"| 0 | A |",
"| 1 | A |",
"| 2 | A |",
"| 3 | A |",
"| 4 | A |",
"| 5 | A |",
"| 6 | A |",
"| 7 | A |",
"| 8 | A |",
"| 9 | A |",
"| 0 | B |",
"| 1 | B |",
"| 2 | B |",
"| 3 | B |",
"| 4 | B |",
"| 5 | B |",
"| 6 | B |",
"| 7 | B |",
"| 8 | B |",
"| 9 | B |",
"+--------------+-------+",
],
collected.as_slice()
);
}
}

0 comments on commit 63d24bf

Please sign in to comment.