Minor: Assert streaming_merge has non empty sort exprs (#7795)
* Minor: Assert streaming_merge has non empty sort exprs

* clippy

* Add test

* Apply suggestions from code review

Co-authored-by: jakevin <jakevingoo@gmail.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>

---------

Co-authored-by: jakevin <jakevingoo@gmail.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
3 people authored Oct 16, 2023
1 parent e84b999 commit 7aa6b36
Showing 3 changed files with 39 additions and 7 deletions.
18 changes: 13 additions & 5 deletions datafusion/physical-plan/src/sorts/cursor.rs
@@ -48,16 +48,17 @@ impl std::fmt::Debug for RowCursor {
 
 impl RowCursor {
     /// Create a new SortKeyCursor from `rows` and a `reservation`
-    /// that tracks its memory.
+    /// that tracks its memory. There must be at least one row
     ///
-    /// Panic's if the reservation is not for exactly `rows.size()`
-    /// bytes
+    /// Panics if the reservation is not for exactly `rows.size()`
+    /// bytes or if `rows` is empty.
     pub fn new(rows: Rows, reservation: MemoryReservation) -> Self {
         assert_eq!(
             rows.size(),
             reservation.size(),
             "memory reservation mismatch"
         );
+        assert!(rows.num_rows() > 0);
         Self {
             cur_row: 0,
             num_rows: rows.num_rows(),
@@ -92,7 +93,10 @@ impl Ord for RowCursor {
     }
 }
 
-/// A cursor into a sorted batch of rows
+/// A cursor into a sorted batch of rows.
+///
+/// Each cursor must have at least one row so `advance` can be called at least
+/// once prior to calling `is_finished`.
 pub trait Cursor: Ord {
     /// Returns true if there are no more rows in this cursor
     fn is_finished(&self) -> bool;
@@ -207,8 +211,12 @@ pub struct FieldCursor<T: FieldValues> {
 }
 
 impl<T: FieldValues> FieldCursor<T> {
-    /// Create a new [`FieldCursor`] from the provided `values` sorted according to `options`
+    /// Create a new [`FieldCursor`] from the provided `values` sorted according
+    /// to `options`.
+    ///
+    /// Panics if the array is empty
     pub fn new<A: FieldArray<Values = T>>(options: SortOptions, array: &A) -> Self {
+        assert!(array.len() > 0, "Empty array passed to FieldCursor");
         let null_threshold = match options.nulls_first {
             true => array.null_count(),
             false => array.len() - array.null_count(),
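The doc comments above state the invariant these assertions enforce: a cursor is always built from at least one row, so `advance` can be called before `is_finished`. A minimal standalone sketch of the same constructor-invariant pattern (the `NonEmptyCursor` type below is hypothetical and not part of DataFusion):

    /// A hypothetical cursor over a non-empty, pre-sorted buffer of values.
    struct NonEmptyCursor {
        values: Vec<i32>,
        pos: usize,
    }

    impl NonEmptyCursor {
        /// Panics if `values` is empty, mirroring the assertions this commit
        /// adds to `RowCursor::new` and `FieldCursor::new`.
        fn new(values: Vec<i32>) -> Self {
            assert!(!values.is_empty(), "cursor requires at least one value");
            Self { values, pos: 0 }
        }

        /// Returns the current position and moves to the next one; safe to call
        /// at least once because the constructor guaranteed a non-empty buffer.
        fn advance(&mut self) -> usize {
            let current = self.pos;
            self.pos += 1;
            current
        }

        fn is_finished(&self) -> bool {
            self.pos >= self.values.len()
        }
    }

    fn main() {
        let mut cursor = NonEmptyCursor::new(vec![1, 2, 3]);
        while !cursor.is_finished() {
            cursor.advance();
        }
        // NonEmptyCursor::new(vec![]) would panic, just like the new asserts.
    }

Checking the invariant once in the constructor keeps it in one place instead of re-validating it in every comparison and advance call.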
21 changes: 20 additions & 1 deletion datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -290,7 +290,7 @@ mod tests {
     use crate::test::{self, assert_is_pending, make_partition};
     use crate::{collect, common};
     use arrow::array::{Int32Array, StringArray, TimestampNanosecondArray};
-    use datafusion_common::assert_batches_eq;
+    use datafusion_common::{assert_batches_eq, assert_contains};
 
     use super::*;
 
@@ -342,6 +342,25 @@ mod tests {
         .await;
     }
 
+    #[tokio::test]
+    async fn test_merge_no_exprs() {
+        let task_ctx = Arc::new(TaskContext::default());
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 9, 3]));
+        let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+
+        let schema = batch.schema();
+        let sort = vec![]; // no sort expressions
+        let exec = MemoryExec::try_new(&[vec![batch.clone()], vec![batch]], schema, None)
+            .unwrap();
+        let merge = Arc::new(SortPreservingMergeExec::new(sort, Arc::new(exec)));
+
+        let res = collect(merge, task_ctx).await.unwrap_err();
+        assert_contains!(
+            res.to_string(),
+            "Internal error: Sort expressions cannot be empty for streaming merge"
+        );
+    }
+
     #[tokio::test]
     async fn test_merge_some_overlap() {
         let task_ctx = Arc::new(TaskContext::default());
7 changes: 6 additions & 1 deletion datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -26,7 +26,7 @@ use crate::sorts::{
 use crate::{PhysicalSortExpr, SendableRecordBatchStream};
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow_array::*;
-use datafusion_common::Result;
+use datafusion_common::{internal_err, DataFusionError, Result};
 use datafusion_execution::memory_pool::MemoryReservation;
 
 macro_rules! primitive_merge_helper {
@@ -60,6 +60,11 @@ pub fn streaming_merge(
     fetch: Option<usize>,
     reservation: MemoryReservation,
 ) -> Result<SendableRecordBatchStream> {
+    // If there are no sort expressions, preserving the order
+    // doesn't mean anything (and can result in infinite loops)
+    if expressions.is_empty() {
+        return internal_err!("Sort expressions cannot be empty for streaming merge");
+    }
     // Special case single column comparisons with optimized cursor implementations
     if expressions.len() == 1 {
         let sort = expressions[0].clone();
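The early return above turns a "preserve the order" request with zero sort expressions into an explicit error instead of an infinite loop. A minimal standalone sketch of the same guard pattern (the names below are hypothetical; in the real code `internal_err!` produces the `DataFusionError::Internal` variant whose "Internal error: ..." message the new test matches on):

    #[derive(Debug)]
    enum MergeError {
        /// Plays the role of DataFusionError::Internal in this sketch.
        Internal(String),
    }

    /// Hypothetical stand-in for `streaming_merge`: refuse to merge when no sort
    /// expressions are given, instead of looping or silently dropping the order.
    fn streaming_merge_sketch(sort_exprs: &[&str], inputs: usize) -> Result<String, MergeError> {
        if sort_exprs.is_empty() {
            return Err(MergeError::Internal(
                "Sort expressions cannot be empty for streaming merge".to_string(),
            ));
        }
        Ok(format!("merging {inputs} inputs on {sort_exprs:?}"))
    }

    fn main() {
        // With sort expressions the merge proceeds.
        assert!(streaming_merge_sketch(&["a ASC"], 2).is_ok());

        // Without them the caller sees an internal error, which is the behaviour
        // the new test_merge_no_exprs test checks through SortPreservingMergeExec.
        let err = streaming_merge_sketch(&[], 2).unwrap_err();
        println!("{err:?}");
    }

Failing fast at this boundary also protects every caller of streaming_merge, not just SortPreservingMergeExec.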
