diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index 83f98ff9aff6..355d699721a3 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -98,7 +98,7 @@ pub fn get_record_batch_at_indices( record_batch: &RecordBatch, indices: &PrimitiveArray, ) -> Result { - let new_columns = get_arrayref_at_indices(record_batch.columns(), indices)?; + let new_columns = take_arrays(record_batch.columns(), indices)?; RecordBatch::try_new_with_options( record_batch.schema(), new_columns, @@ -291,10 +291,7 @@ pub(crate) fn parse_identifiers(s: &str) -> Result> { } /// Construct a new [`Vec`] of [`ArrayRef`] from the rows of the `arrays` at the `indices`. -pub fn get_arrayref_at_indices( - arrays: &[ArrayRef], - indices: &PrimitiveArray, -) -> Result> { +pub fn take_arrays(arrays: &[ArrayRef], indices: &dyn Array) -> Result> { arrays .iter() .map(|array| { @@ -1023,8 +1020,9 @@ mod tests { vec![2, 4], ]; for row_indices in row_indices_vec { - let indices = PrimitiveArray::from_iter_values(row_indices.iter().cloned()); - let chunk = get_arrayref_at_indices(&arrays, &indices)?; + let indices: PrimitiveArray = + PrimitiveArray::from_iter_values(row_indices.iter().cloned()); + let chunk = take_arrays(&arrays, &indices)?; for (arr_orig, arr_chunk) in arrays.iter().zip(&chunk) { for (idx, orig_idx) in row_indices.iter().enumerate() { let res1 = ScalarValue::try_from_array(arr_orig, *orig_idx as usize)?; diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs index e60f68972074..fbbf4d303515 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs @@ -29,8 +29,7 @@ use arrow::{ datatypes::UInt32Type, }; use datafusion_common::{ - arrow_datafusion_err, utils::get_arrayref_at_indices, DataFusionError, Result, - ScalarValue, + arrow_datafusion_err, utils::take_arrays, DataFusionError, Result, ScalarValue, }; use datafusion_expr_common::accumulator::Accumulator; use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator}; @@ -239,7 +238,7 @@ impl GroupsAccumulatorAdapter { // reorder the values and opt_filter by batch_indices so that // all values for each group are contiguous, then invoke the // accumulator once per group with values - let values = get_arrayref_at_indices(values, &batch_indices)?; + let values = take_arrays(values, &batch_indices)?; let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?; // invoke each accumulator with the appropriate rows, first diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs index 30f5d5b07561..41ac7875795d 100644 --- a/datafusion/functions-aggregate/src/first_last.rs +++ b/datafusion/functions-aggregate/src/first_last.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use arrow::array::{ArrayRef, AsArray, BooleanArray}; use arrow::compute::{self, lexsort_to_indices, SortColumn}; use arrow::datatypes::{DataType, Field}; -use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx}; +use datafusion_common::utils::{compare_rows, get_row_at_idx, take_arrays}; use datafusion_common::{ arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue, }; @@ -310,7 +310,7 @@ impl Accumulator for FirstValueAccumulator { filtered_states } else { let indices = lexsort_to_indices(&sort_cols, None)?; - get_arrayref_at_indices(&filtered_states, &indices)? + take_arrays(&filtered_states, &indices)? }; if !ordered_states[0].is_empty() { let first_row = get_row_at_idx(&ordered_states, 0)?; @@ -613,7 +613,7 @@ impl Accumulator for LastValueAccumulator { filtered_states } else { let indices = lexsort_to_indices(&sort_cols, None)?; - get_arrayref_at_indices(&filtered_states, &indices)? + take_arrays(&filtered_states, &indices)? }; if !ordered_states[0].is_empty() { diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 70a63e71ad2f..649c05d52e8b 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -104,7 +104,7 @@ impl PartialSortExec { input: Arc, common_prefix_length: usize, ) -> Self { - assert!(common_prefix_length > 0); + debug_assert!(common_prefix_length > 0); let preserve_partitioning = false; let cache = Self::compute_properties(&input, expr.clone(), preserve_partitioning); Self { @@ -289,7 +289,7 @@ impl ExecutionPlan for PartialSortExec { // Make sure common prefix length is larger than 0 // Otherwise, we should use SortExec. - assert!(self.common_prefix_length > 0); + debug_assert!(self.common_prefix_length > 0); Ok(Box::pin(PartialSortStream { input, diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 64434e7a4a04..91816713c6c3 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -46,7 +46,7 @@ use arrow::record_batch::RecordBatch; use arrow::row::{RowConverter, SortField}; use arrow_array::{Array, RecordBatchOptions, UInt32Array}; use arrow_schema::DataType; -use datafusion_common::utils::get_arrayref_at_indices; +use datafusion_common::utils::take_arrays; use datafusion_common::{internal_err, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; @@ -617,7 +617,7 @@ pub fn sort_batch( lexsort_to_indices(&sort_columns, fetch)? }; - let columns = get_arrayref_at_indices(batch.columns(), &indices)?; + let columns = take_arrays(batch.columns(), &indices)?; let options = RecordBatchOptions::new().with_row_count(Some(indices.len())); Ok(RecordBatch::try_new_with_options( diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 001e134581c0..9510baab51fb 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -49,8 +49,8 @@ use arrow::{ use datafusion_common::hash_utils::create_hashes; use datafusion_common::stats::Precision; use datafusion_common::utils::{ - evaluate_partition_ranges, get_arrayref_at_indices, get_at_indices, - get_record_batch_at_indices, get_row_at_idx, + evaluate_partition_ranges, get_at_indices, get_record_batch_at_indices, + get_row_at_idx, take_arrays, }; use datafusion_common::{arrow_datafusion_err, exec_err, DataFusionError, Result}; use datafusion_execution::TaskContext; @@ -542,7 +542,7 @@ impl PartitionSearcher for LinearSearch { // We should emit columns according to row index ordering. let sorted_indices = sort_to_indices(&all_indices, None, None)?; // Construct new column according to row ordering. This fixes ordering - get_arrayref_at_indices(&new_columns, &sorted_indices).map(Some) + take_arrays(&new_columns, &sorted_indices).map(Some) } fn evaluate_partition_batches(