From e784be0752292aed4c89a9a1ef2f957e14d4d0d9 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 18 Oct 2023 12:16:40 +0100 Subject: [PATCH 1/5] Decouple cursor storage --- datafusion/physical-plan/src/sorts/cursor.rs | 281 +++++++++---------- datafusion/physical-plan/src/sorts/merge.rs | 14 +- datafusion/physical-plan/src/sorts/stream.rs | 14 +- 3 files changed, 144 insertions(+), 165 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index 1ca41d4fe21c..d362aad1a561 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -20,124 +20,118 @@ use std::cmp::Ordering; use arrow::buffer::ScalarBuffer; use arrow::compute::SortOptions; use arrow::datatypes::ArrowNativeTypeOp; -use arrow::row::{Row, Rows}; +use arrow::row::Rows; use arrow_array::types::ByteArrayType; -use arrow_array::{Array, ArrowPrimitiveType, GenericByteArray, PrimitiveArray}; +use arrow_array::{ + Array, ArrowPrimitiveType, GenericByteArray, OffsetSizeTrait, PrimitiveArray, +}; +use arrow_buffer::{Buffer, OffsetBuffer}; use datafusion_execution::memory_pool::MemoryReservation; -/// A [`Cursor`] for [`Rows`] -pub struct RowCursor { - cur_row: usize, - num_rows: usize, +/// A comparable collection of values for use with [`Cursor`] +pub trait CursorValues { + fn len(&self) -> usize; - rows: Rows, + fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool; - /// Tracks for the memory used by in the `Rows` of this - /// cursor. Freed on drop - #[allow(dead_code)] - reservation: MemoryReservation, + fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering; } -impl std::fmt::Debug for RowCursor { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("SortKeyCursor") - .field("cur_row", &self.cur_row) - .field("num_rows", &self.num_rows) - .finish() - } +/// A comparable cursor, used by sort operations +#[derive(Debug)] +pub struct Cursor { + offset: usize, + values: T, } -impl RowCursor { - /// Create a new SortKeyCursor from `rows` and a `reservation` - /// that tracks its memory. There must be at least one row - /// - /// Panics if the reservation is not for exactly `rows.size()` - /// bytes or if `rows` is empty. - pub fn new(rows: Rows, reservation: MemoryReservation) -> Self { - assert_eq!( - rows.size(), - reservation.size(), - "memory reservation mismatch" - ); - assert!(rows.num_rows() > 0); - Self { - cur_row: 0, - num_rows: rows.num_rows(), - rows, - reservation, - } +impl Cursor { + /// Create a [`Cursor`] from the given [`CursorValues`] + pub fn new(values: T) -> Self { + Self { offset: 0, values } } - /// Returns the current row - fn current(&self) -> Row<'_> { - self.rows.row(self.cur_row) + /// Returns true if there are no more rows in this cursor + pub fn is_finished(&self) -> bool { + self.offset == self.values.len() + } + + /// Advance the cursor, returning the previous row index + pub fn advance(&mut self) -> usize { + let t = self.offset; + self.offset += 1; + t } } -impl PartialEq for RowCursor { +impl PartialEq for Cursor { fn eq(&self, other: &Self) -> bool { - self.current() == other.current() + T::eq(&self.values, self.offset, &other.values, other.offset) } } -impl Eq for RowCursor {} +impl Eq for Cursor {} -impl PartialOrd for RowCursor { +impl PartialOrd for Cursor { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl Ord for RowCursor { +impl Ord for Cursor { fn cmp(&self, other: &Self) -> Ordering { - self.current().cmp(&other.current()) + T::compare(&self.values, self.offset, &other.values, other.offset) } } -/// A cursor into a sorted batch of rows. -/// -/// Each cursor must have at least one row so `advance` can be called at least -/// once prior to calling `is_finished`. -pub trait Cursor: Ord { - /// Returns true if there are no more rows in this cursor - fn is_finished(&self) -> bool; +/// Implements [`CursorValues`] for [`Rows`] +#[derive(Debug)] +pub struct CursorRows { + rows: Rows, - /// Advance the cursor, returning the previous row index - fn advance(&mut self) -> usize; + /// Tracks for the memory used by in the `Rows` of this + /// cursor. Freed on drop + #[allow(dead_code)] + reservation: MemoryReservation, +} + +impl CursorRows { + /// Create a new [`CursorRows`] from `rows` and a `reservation` + /// that tracks its memory. There must be at least one row + /// + /// Panics if the reservation is not for exactly `rows.size()` + /// bytes or if `rows` is empty. + pub fn new(rows: Rows, reservation: MemoryReservation) -> Self { + assert_eq!( + rows.size(), + reservation.size(), + "memory reservation mismatch" + ); + assert!(rows.num_rows() > 0); + Self { rows, reservation } + } } -impl Cursor for RowCursor { - #[inline] - fn is_finished(&self) -> bool { - self.num_rows == self.cur_row +impl CursorValues for CursorRows { + fn len(&self) -> usize { + self.rows.num_rows() } - #[inline] - fn advance(&mut self) -> usize { - let t = self.cur_row; - self.cur_row += 1; - t + fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool { + l.rows.row(l_idx) == r.rows.row(r_idx) + } + + fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering { + l.rows.row(l_idx).cmp(&r.rows.row(r_idx)) } } /// An [`Array`] that can be converted into [`FieldValues`] pub trait FieldArray: Array + 'static { - type Values: FieldValues; + type Values: CursorValues; fn values(&self) -> Self::Values; } -/// A comparable set of non-nullable values -pub trait FieldValues { - type Value: ?Sized; - - fn len(&self) -> usize; - - fn compare(a: &Self::Value, b: &Self::Value) -> Ordering; - - fn value(&self, idx: usize) -> &Self::Value; -} - impl FieldArray for PrimitiveArray { type Values = PrimitiveValues; @@ -149,70 +143,73 @@ impl FieldArray for PrimitiveArray { #[derive(Debug)] pub struct PrimitiveValues(ScalarBuffer); -impl FieldValues for PrimitiveValues { - type Value = T; - +impl CursorValues for PrimitiveValues { fn len(&self) -> usize { self.0.len() } - #[inline] - fn compare(a: &Self::Value, b: &Self::Value) -> Ordering { - T::compare(*a, *b) + fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool { + l.0[l_idx].is_eq(r.0[r_idx]) } - #[inline] - fn value(&self, idx: usize) -> &Self::Value { - &self.0[idx] + fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering { + l.0[l_idx].compare(r.0[r_idx]) } } -impl FieldArray for GenericByteArray { - type Values = Self; +pub struct ByteArrayValues { + offsets: OffsetBuffer, + values: Buffer, +} - fn values(&self) -> Self::Values { - // Once https://github.com/apache/arrow-rs/pull/4048 is released - // Could potentially destructure array into buffers to reduce codegen, - // in a similar vein to what is done for PrimitiveArray - self.clone() +impl ByteArrayValues { + fn value(&self, idx: usize) -> &[u8] { + let end = self.offsets[idx + 1].as_usize(); + let start = self.offsets[idx].as_usize(); + // Safety: offsets are valid + unsafe { self.values.get_unchecked(start..end) } } } -impl FieldValues for GenericByteArray { - type Value = T::Native; - +impl CursorValues for ByteArrayValues { fn len(&self) -> usize { - Array::len(self) + self.offsets.len() - 1 } - #[inline] - fn compare(a: &Self::Value, b: &Self::Value) -> Ordering { - let a: &[u8] = a.as_ref(); - let b: &[u8] = b.as_ref(); - a.cmp(b) + fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool { + l.value(l_idx) == r.value(r_idx) } - #[inline] - fn value(&self, idx: usize) -> &Self::Value { - self.value(idx) + fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering { + l.value(l_idx).cmp(r.value(r_idx)) } } -/// A cursor over sorted, nullable [`FieldValues`] +impl FieldArray for GenericByteArray { + type Values = ByteArrayValues; + + fn values(&self) -> Self::Values { + ByteArrayValues { + offsets: self.offsets().clone(), + values: self.values().clone(), + } + } +} + +/// A collection of sorted, nullable [`FieldArray`] /// /// Note: comparing cursors with different `SortOptions` will yield an arbitrary ordering #[derive(Debug)] -pub struct FieldCursor { +pub struct ArrayValues { values: T, - offset: usize, // If nulls first, the first non-null index // Otherwise, the first null index null_threshold: usize, options: SortOptions, } -impl FieldCursor { - /// Create a new [`FieldCursor`] from the provided `values` sorted according +impl ArrayValues { + /// Create a new [`ArrayValues`] from the provided `values` sorted according /// to `options`. /// /// Panics if the array is empty @@ -225,67 +222,48 @@ impl FieldCursor { Self { values: array.values(), - offset: 0, null_threshold, options, } } - fn is_null(&self) -> bool { - (self.offset < self.null_threshold) == self.options.nulls_first + fn is_null(&self, idx: usize) -> bool { + (idx < self.null_threshold) == self.options.nulls_first } } -impl PartialEq for FieldCursor { - fn eq(&self, other: &Self) -> bool { - self.cmp(other).is_eq() +impl CursorValues for ArrayValues { + fn len(&self) -> usize { + self.values.len() } -} -impl Eq for FieldCursor {} -impl PartialOrd for FieldCursor { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) + fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool { + match (l.is_null(l_idx), r.is_null(r_idx)) { + (true, true) => true, + (false, false) => T::eq(&l.values, l_idx, &r.values, r_idx), + _ => false, + } } -} -impl Ord for FieldCursor { - fn cmp(&self, other: &Self) -> Ordering { - match (self.is_null(), other.is_null()) { + fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering { + match (l.is_null(l_idx), r.is_null(r_idx)) { (true, true) => Ordering::Equal, - (true, false) => match self.options.nulls_first { + (true, false) => match l.options.nulls_first { true => Ordering::Less, false => Ordering::Greater, }, - (false, true) => match self.options.nulls_first { + (false, true) => match l.options.nulls_first { true => Ordering::Greater, false => Ordering::Less, }, - (false, false) => { - let s_v = self.values.value(self.offset); - let o_v = other.values.value(other.offset); - - match self.options.descending { - true => T::compare(o_v, s_v), - false => T::compare(s_v, o_v), - } - } + (false, false) => match l.options.descending { + true => T::compare(&r.values, r_idx, &l.values, l_idx), + false => T::compare(&l.values, l_idx, &r.values, r_idx), + }, } } } -impl Cursor for FieldCursor { - fn is_finished(&self) -> bool { - self.offset == self.values.len() - } - - fn advance(&mut self) -> usize { - let t = self.offset; - self.offset += 1; - t - } -} - #[cfg(test)] mod tests { use super::*; @@ -294,18 +272,19 @@ mod tests { options: SortOptions, values: ScalarBuffer, null_count: usize, - ) -> FieldCursor> { + ) -> Cursor>> { let null_threshold = match options.nulls_first { true => null_count, false => values.len() - null_count, }; - FieldCursor { - offset: 0, + let values = ArrayValues { values: PrimitiveValues(values), null_threshold, options, - } + }; + + Cursor::new(values) } #[test] diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index e60baf2cd806..1ec6cbd0fe8e 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -20,7 +20,7 @@ use crate::metrics::BaselineMetrics; use crate::sorts::builder::BatchBuilder; -use crate::sorts::cursor::Cursor; +use crate::sorts::cursor::{Cursor, CursorValues}; use crate::sorts::stream::PartitionedStream; use crate::RecordBatchStream; use arrow::datatypes::SchemaRef; @@ -35,7 +35,7 @@ use std::task::{ready, Context, Poll}; type CursorStream = Box>>; #[derive(Debug)] -pub(crate) struct SortPreservingMergeStream { +pub(crate) struct SortPreservingMergeStream { in_progress: BatchBuilder, /// The sorted input streams to merge together @@ -89,7 +89,7 @@ pub(crate) struct SortPreservingMergeStream { batch_size: usize, /// Vector that holds cursors for each non-exhausted input partition - cursors: Vec>, + cursors: Vec>>, /// Optional number of rows to fetch fetch: Option, @@ -98,7 +98,7 @@ pub(crate) struct SortPreservingMergeStream { produced: usize, } -impl SortPreservingMergeStream { +impl SortPreservingMergeStream { pub(crate) fn new( streams: CursorStream, schema: SchemaRef, @@ -140,7 +140,7 @@ impl SortPreservingMergeStream { None => Poll::Ready(Ok(())), Some(Err(e)) => Poll::Ready(Err(e)), Some(Ok((cursor, batch))) => { - self.cursors[idx] = Some(cursor); + self.cursors[idx] = Some(Cursor::new(cursor)); Poll::Ready(self.in_progress.push_batch(idx, batch)) } } @@ -310,7 +310,7 @@ impl SortPreservingMergeStream { } } -impl Stream for SortPreservingMergeStream { +impl Stream for SortPreservingMergeStream { type Item = Result; fn poll_next( @@ -322,7 +322,7 @@ impl Stream for SortPreservingMergeStream { } } -impl RecordBatchStream for SortPreservingMergeStream { +impl RecordBatchStream for SortPreservingMergeStream { fn schema(&self) -> SchemaRef { self.in_progress.schema().clone() } diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index a7f9e7380c47..321d6e63cdc1 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::sorts::cursor::{FieldArray, FieldCursor, RowCursor}; +use crate::sorts::cursor::{ArrayValues, CursorRows, FieldArray}; use crate::SendableRecordBatchStream; use crate::{PhysicalExpr, PhysicalSortExpr}; use arrow::array::Array; @@ -114,7 +114,7 @@ impl RowCursorStream { }) } - fn convert_batch(&mut self, batch: &RecordBatch) -> Result { + fn convert_batch(&mut self, batch: &RecordBatch) -> Result { let cols = self .column_expressions .iter() @@ -127,12 +127,12 @@ impl RowCursorStream { // track the memory in the newly created Rows. let mut rows_reservation = self.reservation.new_empty(); rows_reservation.try_grow(rows.size())?; - Ok(RowCursor::new(rows, rows_reservation)) + Ok(CursorRows::new(rows, rows_reservation)) } } impl PartitionedStream for RowCursorStream { - type Output = Result<(RowCursor, RecordBatch)>; + type Output = Result<(CursorRows, RecordBatch)>; fn partitions(&self) -> usize { self.streams.0.len() @@ -179,16 +179,16 @@ impl FieldCursorStream { } } - fn convert_batch(&mut self, batch: &RecordBatch) -> Result> { + fn convert_batch(&mut self, batch: &RecordBatch) -> Result> { let value = self.sort.expr.evaluate(batch)?; let array = value.into_array(batch.num_rows()); let array = array.as_any().downcast_ref::().expect("field values"); - Ok(FieldCursor::new(self.sort.options, array)) + Ok(ArrayValues::new(self.sort.options, array)) } } impl PartitionedStream for FieldCursorStream { - type Output = Result<(FieldCursor, RecordBatch)>; + type Output = Result<(ArrayValues, RecordBatch)>; fn partitions(&self) -> usize { self.streams.0.len() From 22961b219c63f9315e03f1d9d70d5da80d9a055a Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 18 Oct 2023 12:53:56 +0100 Subject: [PATCH 2/5] Fix doc --- datafusion/physical-plan/src/sorts/cursor.rs | 10 +++++----- datafusion/physical-plan/src/sorts/stream.rs | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index d362aad1a561..f63e41d3c254 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -85,7 +85,7 @@ impl Ord for Cursor { /// Implements [`CursorValues`] for [`Rows`] #[derive(Debug)] -pub struct CursorRows { +pub struct RowValues { rows: Rows, /// Tracks for the memory used by in the `Rows` of this @@ -94,8 +94,8 @@ pub struct CursorRows { reservation: MemoryReservation, } -impl CursorRows { - /// Create a new [`CursorRows`] from `rows` and a `reservation` +impl RowValues { + /// Create a new [`RowValues`] from `rows` and a `reservation` /// that tracks its memory. There must be at least one row /// /// Panics if the reservation is not for exactly `rows.size()` @@ -111,7 +111,7 @@ impl CursorRows { } } -impl CursorValues for CursorRows { +impl CursorValues for RowValues { fn len(&self) -> usize { self.rows.num_rows() } @@ -125,7 +125,7 @@ impl CursorValues for CursorRows { } } -/// An [`Array`] that can be converted into [`FieldValues`] +/// An [`Array`] that can be converted into [`CursorValues`] pub trait FieldArray: Array + 'static { type Values: CursorValues; diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 321d6e63cdc1..2a5bbaeea6b0 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::sorts::cursor::{ArrayValues, CursorRows, FieldArray}; +use crate::sorts::cursor::{ArrayValues, RowValues, FieldArray}; use crate::SendableRecordBatchStream; use crate::{PhysicalExpr, PhysicalSortExpr}; use arrow::array::Array; @@ -76,7 +76,7 @@ impl FusedStreams { } /// A [`PartitionedStream`] that wraps a set of [`SendableRecordBatchStream`] -/// and computes [`RowCursor`] based on the provided [`PhysicalSortExpr`] +/// and computes [`RowValues`] based on the provided [`PhysicalSortExpr`] #[derive(Debug)] pub struct RowCursorStream { /// Converter to convert output of physical expressions @@ -114,7 +114,7 @@ impl RowCursorStream { }) } - fn convert_batch(&mut self, batch: &RecordBatch) -> Result { + fn convert_batch(&mut self, batch: &RecordBatch) -> Result { let cols = self .column_expressions .iter() @@ -127,12 +127,12 @@ impl RowCursorStream { // track the memory in the newly created Rows. let mut rows_reservation = self.reservation.new_empty(); rows_reservation.try_grow(rows.size())?; - Ok(CursorRows::new(rows, rows_reservation)) + Ok(RowValues::new(rows, rows_reservation)) } } impl PartitionedStream for RowCursorStream { - type Output = Result<(CursorRows, RecordBatch)>; + type Output = Result<(RowValues, RecordBatch)>; fn partitions(&self) -> usize { self.streams.0.len() From 26266d90b6c452317d0a2ad60adb6a6743063d62 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 18 Oct 2023 12:57:51 +0100 Subject: [PATCH 3/5] Format --- datafusion/physical-plan/src/sorts/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 2a5bbaeea6b0..1535765d86a3 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::sorts::cursor::{ArrayValues, RowValues, FieldArray}; +use crate::sorts::cursor::{ArrayValues, FieldArray, RowValues}; use crate::SendableRecordBatchStream; use crate::{PhysicalExpr, PhysicalSortExpr}; use arrow::array::Array; From eb19ff0e50315b10a330bf44378be25124743544 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 19 Oct 2023 19:34:26 +0100 Subject: [PATCH 4/5] Review feedback --- datafusion/physical-plan/src/sorts/cursor.rs | 46 +++++++++++++++++--- datafusion/physical-plan/src/sorts/merge.rs | 2 +- datafusion/physical-plan/src/sorts/stream.rs | 10 ++--- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index f63e41d3c254..3c3989cc1a52 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -29,15 +29,49 @@ use arrow_buffer::{Buffer, OffsetBuffer}; use datafusion_execution::memory_pool::MemoryReservation; /// A comparable collection of values for use with [`Cursor`] +/// +/// This is a trait as there are several specialized implementations, such as for +/// single columns or for normalized multi column keys ([`Rows`]) pub trait CursorValues { fn len(&self) -> usize; + /// Returns true if `l[l_idx] == r[r_idx]` fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool; + /// Returns comparison of `l[l_idx]` and `r[r_idx]` fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering; } /// A comparable cursor, used by sort operations +/// +/// A `Cursor` is a pointer into a collection of rows, stored in +/// [`CursorValues`] +/// +/// ```text +/// +/// ┌───────────────────────┐ +/// │ │ ┌──────────────────────┐ +/// │ ┌─────────┐ ┌─────┐ │ ─ ─ ─ ─│ Cursor │ +/// │ │ 1 │ │ A │ │ │ └──────────────────────┘ +/// │ ├─────────┤ ├─────┤ │ +/// │ │ 2 │ │ A │◀─ ┼ ─ ┘ Cursor tracks an +/// │ └─────────┘ └─────┘ │ offset within a +/// │ ... ... │ CursorValues +/// │ │ +/// │ ┌─────────┐ ┌─────┐ │ +/// │ │ 3 │ │ E │ │ +/// │ └─────────┘ └─────┘ │ +/// │ │ +/// │ CursorValues │ +/// └───────────────────────┘ +/// +/// +/// Store logical rows using +/// one of several formats, +/// with specialized +/// implementations +/// depending on the column +/// types #[derive(Debug)] pub struct Cursor { offset: usize, @@ -84,6 +118,8 @@ impl Ord for Cursor { } /// Implements [`CursorValues`] for [`Rows`] +/// +/// Used for sorting when there are multiple columns in the sort key #[derive(Debug)] pub struct RowValues { rows: Rows, @@ -126,13 +162,13 @@ impl CursorValues for RowValues { } /// An [`Array`] that can be converted into [`CursorValues`] -pub trait FieldArray: Array + 'static { +pub trait CursorArray: Array + 'static { type Values: CursorValues; fn values(&self) -> Self::Values; } -impl FieldArray for PrimitiveArray { +impl CursorArray for PrimitiveArray { type Values = PrimitiveValues; fn values(&self) -> Self::Values { @@ -185,7 +221,7 @@ impl CursorValues for ByteArrayValues { } } -impl FieldArray for GenericByteArray { +impl CursorArray for GenericByteArray { type Values = ByteArrayValues; fn values(&self) -> Self::Values { @@ -196,7 +232,7 @@ impl FieldArray for GenericByteArray { } } -/// A collection of sorted, nullable [`FieldArray`] +/// A collection of sorted, nullable [`CursorValues`] /// /// Note: comparing cursors with different `SortOptions` will yield an arbitrary ordering #[derive(Debug)] @@ -213,7 +249,7 @@ impl ArrayValues { /// to `options`. /// /// Panics if the array is empty - pub fn new>(options: SortOptions, array: &A) -> Self { + pub fn new>(options: SortOptions, array: &A) -> Self { assert!(array.len() > 0, "Empty array passed to FieldCursor"); let null_threshold = match options.nulls_first { true => array.null_count(), diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index 1ec6cbd0fe8e..422ff3aebdb3 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -88,7 +88,7 @@ pub(crate) struct SortPreservingMergeStream { /// target batch size batch_size: usize, - /// Vector that holds cursors for each non-exhausted input partition + /// Cursors for each input partition. `None` means the input is exhausted cursors: Vec>>, /// Optional number of rows to fetch diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 1535765d86a3..4cabdc6e178c 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::sorts::cursor::{ArrayValues, FieldArray, RowValues}; +use crate::sorts::cursor::{ArrayValues, CursorArray, RowValues}; use crate::SendableRecordBatchStream; use crate::{PhysicalExpr, PhysicalSortExpr}; use arrow::array::Array; @@ -153,7 +153,7 @@ impl PartitionedStream for RowCursorStream { } /// Specialized stream for sorts on single primitive columns -pub struct FieldCursorStream { +pub struct FieldCursorStream { /// The physical expressions to sort by sort: PhysicalSortExpr, /// Input streams @@ -161,7 +161,7 @@ pub struct FieldCursorStream { phantom: PhantomData T>, } -impl std::fmt::Debug for FieldCursorStream { +impl std::fmt::Debug for FieldCursorStream { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PrimitiveCursorStream") .field("num_streams", &self.streams) @@ -169,7 +169,7 @@ impl std::fmt::Debug for FieldCursorStream { } } -impl FieldCursorStream { +impl FieldCursorStream { pub fn new(sort: PhysicalSortExpr, streams: Vec) -> Self { let streams = streams.into_iter().map(|s| s.fuse()).collect(); Self { @@ -187,7 +187,7 @@ impl FieldCursorStream { } } -impl PartitionedStream for FieldCursorStream { +impl PartitionedStream for FieldCursorStream { type Output = Result<(ArrayValues, RecordBatch)>; fn partitions(&self) -> usize { From 611c25c8fc39ab1e14a4847d7c66c51f8c2850f1 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 19 Oct 2023 19:43:40 +0100 Subject: [PATCH 5/5] Tweak ByteArrayValues::value --- datafusion/physical-plan/src/sorts/cursor.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index 3c3989cc1a52..df90c97faf68 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -200,10 +200,13 @@ pub struct ByteArrayValues { impl ByteArrayValues { fn value(&self, idx: usize) -> &[u8] { - let end = self.offsets[idx + 1].as_usize(); - let start = self.offsets[idx].as_usize(); - // Safety: offsets are valid - unsafe { self.values.get_unchecked(start..end) } + assert!(idx < self.len()); + // Safety: offsets are valid and checked bounds above + unsafe { + let start = self.offsets.get_unchecked(idx).as_usize(); + let end = self.offsets.get_unchecked(idx + 1).as_usize(); + self.values.get_unchecked(start..end) + } } }