Rename function (#12654)
akurmustafa authored Sep 28, 2024
1 parent 4df83f5 commit 689500f
Showing 6 changed files with 17 additions and 20 deletions.
12 changes: 5 additions & 7 deletions datafusion/common/src/utils/mod.rs
@@ -98,7 +98,7 @@ pub fn get_record_batch_at_indices(
     record_batch: &RecordBatch,
     indices: &PrimitiveArray<UInt32Type>,
 ) -> Result<RecordBatch> {
-    let new_columns = get_arrayref_at_indices(record_batch.columns(), indices)?;
+    let new_columns = take_arrays(record_batch.columns(), indices)?;
     RecordBatch::try_new_with_options(
         record_batch.schema(),
         new_columns,
@@ -291,10 +291,7 @@ pub(crate) fn parse_identifiers(s: &str) -> Result<Vec<Ident>> {
 }
 
 /// Construct a new [`Vec`] of [`ArrayRef`] from the rows of the `arrays` at the `indices`.
-pub fn get_arrayref_at_indices(
-    arrays: &[ArrayRef],
-    indices: &PrimitiveArray<UInt32Type>,
-) -> Result<Vec<ArrayRef>> {
+pub fn take_arrays(arrays: &[ArrayRef], indices: &dyn Array) -> Result<Vec<ArrayRef>> {
     arrays
         .iter()
         .map(|array| {
@@ -1023,8 +1020,9 @@ mod tests {
             vec![2, 4],
         ];
         for row_indices in row_indices_vec {
-            let indices = PrimitiveArray::from_iter_values(row_indices.iter().cloned());
-            let chunk = get_arrayref_at_indices(&arrays, &indices)?;
+            let indices: PrimitiveArray<UInt32Type> =
+                PrimitiveArray::from_iter_values(row_indices.iter().cloned());
+            let chunk = take_arrays(&arrays, &indices)?;
             for (arr_orig, arr_chunk) in arrays.iter().zip(&chunk) {
                 for (idx, orig_idx) in row_indices.iter().enumerate() {
                     let res1 = ScalarValue::try_from_array(arr_orig, *orig_idx as usize)?;
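For orientation, below is a minimal usage sketch of the renamed helper as exported by datafusion_common at this commit; note that the diff above also widens the indices parameter from &PrimitiveArray<UInt32Type> to &dyn Array, matching Arrow's take kernel. The column contents here are illustrative only.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int64Array, StringArray, UInt32Array};
use datafusion_common::utils::take_arrays;
use datafusion_common::Result;

fn main() -> Result<()> {
    // Two columns of equal length, as found in a RecordBatch.
    let arrays: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![10, 20, 30, 40])),
        Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
    ];

    // Row indices to gather; any index array accepted by arrow's take kernel
    // can now be passed through the &dyn Array parameter.
    let indices = UInt32Array::from(vec![3, 0, 2]);

    // Gathers rows 3, 0 and 2 from every column, preserving column order.
    let taken = take_arrays(&arrays, &indices)?;
    assert_eq!(taken.len(), 2);
    assert_eq!(taken[0].len(), 3);
    Ok(())
}
```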
@@ -29,8 +29,7 @@ use arrow::{
     datatypes::UInt32Type,
 };
 use datafusion_common::{
-    arrow_datafusion_err, utils::get_arrayref_at_indices, DataFusionError, Result,
-    ScalarValue,
+    arrow_datafusion_err, utils::take_arrays, DataFusionError, Result, ScalarValue,
 };
 use datafusion_expr_common::accumulator::Accumulator;
 use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
@@ -239,7 +238,7 @@ impl GroupsAccumulatorAdapter {
         // reorder the values and opt_filter by batch_indices so that
         // all values for each group are contiguous, then invoke the
         // accumulator once per group with values
-        let values = get_arrayref_at_indices(values, &batch_indices)?;
+        let values = take_arrays(values, &batch_indices)?;
         let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?;
 
         // invoke each accumulator with the appropriate rows, first
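The comment in this hunk describes the adapter's trick: gather the input values through batch_indices so that each group's rows end up contiguous before the accumulator is invoked once per group. A rough sketch of that gather step using Arrow's take kernel directly; the group ids and data are made up and this is not the adapter's actual code.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Float64Array, UInt32Array};
use arrow::compute::take;
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    let values: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
    // Rows 0 and 2 belong to group 0, rows 1 and 3 to group 1.
    let group_ids = [0u32, 1, 0, 1];

    // Sort the row numbers by group id to obtain a permutation ("batch_indices").
    let mut order: Vec<u32> = (0..group_ids.len() as u32).collect();
    order.sort_by_key(|i| group_ids[*i as usize]);
    let batch_indices = UInt32Array::from(order);

    // After the gather, group 0's rows come first, then group 1's, so each
    // group can be handed to its accumulator as one contiguous slice.
    let reordered = take(values.as_ref(), &batch_indices, None)?;
    assert_eq!(reordered.len(), 4);
    Ok(())
}
```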
6 changes: 3 additions & 3 deletions datafusion/functions-aggregate/src/first_last.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
 use arrow::array::{ArrayRef, AsArray, BooleanArray};
 use arrow::compute::{self, lexsort_to_indices, SortColumn};
 use arrow::datatypes::{DataType, Field};
-use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx};
+use datafusion_common::utils::{compare_rows, get_row_at_idx, take_arrays};
 use datafusion_common::{
     arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
 };
@@ -310,7 +310,7 @@ impl Accumulator for FirstValueAccumulator {
             filtered_states
         } else {
             let indices = lexsort_to_indices(&sort_cols, None)?;
-            get_arrayref_at_indices(&filtered_states, &indices)?
+            take_arrays(&filtered_states, &indices)?
         };
         if !ordered_states[0].is_empty() {
             let first_row = get_row_at_idx(&ordered_states, 0)?;
@@ -613,7 +613,7 @@ impl Accumulator for LastValueAccumulator {
             filtered_states
         } else {
             let indices = lexsort_to_indices(&sort_cols, None)?;
-            get_arrayref_at_indices(&filtered_states, &indices)?
+            take_arrays(&filtered_states, &indices)?
         };
 
         if !ordered_states[0].is_empty() {
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -104,7 +104,7 @@ impl PartialSortExec {
         input: Arc<dyn ExecutionPlan>,
         common_prefix_length: usize,
     ) -> Self {
-        assert!(common_prefix_length > 0);
+        debug_assert!(common_prefix_length > 0);
         let preserve_partitioning = false;
         let cache = Self::compute_properties(&input, expr.clone(), preserve_partitioning);
         Self {
@@ -289,7 +289,7 @@ impl ExecutionPlan for PartialSortExec {
 
         // Make sure common prefix length is larger than 0
         // Otherwise, we should use SortExec.
-        assert!(self.common_prefix_length > 0);
+        debug_assert!(self.common_prefix_length > 0);
 
         Ok(Box::pin(PartialSortStream {
             input,
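The two hunks above relax a hard assert! into debug_assert!: the common_prefix_length > 0 precondition is still checked in builds with debug assertions enabled (dev and test profiles by default) but compiles to nothing in typical release builds. A tiny sketch of the difference, with a hypothetical function name standing in for the real constructor:

```rust
// Hypothetical stand-in for the PartialSortExec precondition; not DataFusion code.
fn new_partial_sort(common_prefix_length: usize) {
    // debug_assert! expands to a check only when debug assertions are enabled
    // (cfg!(debug_assertions)); in release builds it is a no-op, so the
    // precondition costs nothing on the execution path.
    debug_assert!(common_prefix_length > 0);

    // An assert! here would panic on violation in every build profile:
    // assert!(common_prefix_length > 0);
}

fn main() {
    new_partial_sort(1); // fine in all builds
    // new_partial_sort(0); // panics only when debug assertions are enabled
}
```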
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -46,7 +46,7 @@ use arrow::record_batch::RecordBatch;
 use arrow::row::{RowConverter, SortField};
 use arrow_array::{Array, RecordBatchOptions, UInt32Array};
 use arrow_schema::DataType;
-use datafusion_common::utils::get_arrayref_at_indices;
+use datafusion_common::utils::take_arrays;
 use datafusion_common::{internal_err, Result};
 use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -617,7 +617,7 @@ pub fn sort_batch(
         lexsort_to_indices(&sort_columns, fetch)?
     };
 
-    let columns = get_arrayref_at_indices(batch.columns(), &indices)?;
+    let columns = take_arrays(batch.columns(), &indices)?;
 
     let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
     Ok(RecordBatch::try_new_with_options(
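The sort_batch path shown above computes a permutation with lexsort_to_indices, gathers every column through it (now via take_arrays), and reassembles the batch with an explicit row count. A hedged sketch of that pattern using arrow directly; the schema and data are illustrative, not the DataFusion implementation.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array, RecordBatchOptions, StringArray};
use arrow::compute::{lexsort_to_indices, take, SortColumn};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("k", DataType::Int32, false),
        Field::new("v", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef,
            Arc::new(StringArray::from(vec!["c", "a", "b"])) as ArrayRef,
        ],
    )?;

    // Permutation that sorts the batch by column "k" (a fetch limit could be
    // passed instead of None, as sort_batch does).
    let sort_columns = vec![SortColumn {
        values: batch.column(0).clone(),
        options: None,
    }];
    let indices = lexsort_to_indices(&sort_columns, None)?;

    // Gather every column through the permutation; this per-column take is
    // what a helper like take_arrays performs in one call.
    let columns = batch
        .columns()
        .iter()
        .map(|c| take(c.as_ref(), &indices, None))
        .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;

    // Reassemble the batch, carrying the row count explicitly.
    let options = RecordBatchOptions::new().with_row_count(Some(indices.len()));
    let sorted = RecordBatch::try_new_with_options(schema, columns, &options)?;
    assert_eq!(sorted.num_rows(), 3);
    Ok(())
}
```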
@@ -49,8 +49,8 @@ use arrow::{
 use datafusion_common::hash_utils::create_hashes;
 use datafusion_common::stats::Precision;
 use datafusion_common::utils::{
-    evaluate_partition_ranges, get_arrayref_at_indices, get_at_indices,
-    get_record_batch_at_indices, get_row_at_idx,
+    evaluate_partition_ranges, get_at_indices, get_record_batch_at_indices,
+    get_row_at_idx, take_arrays,
 };
 use datafusion_common::{arrow_datafusion_err, exec_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
@@ -542,7 +542,7 @@ impl PartitionSearcher for LinearSearch {
         // We should emit columns according to row index ordering.
         let sorted_indices = sort_to_indices(&all_indices, None, None)?;
         // Construct new column according to row ordering. This fixes ordering
-        get_arrayref_at_indices(&new_columns, &sorted_indices).map(Some)
+        take_arrays(&new_columns, &sorted_indices).map(Some)
     }
 
     fn evaluate_partition_batches(
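In this LinearSearch path the permutation comes from sort_to_indices over a single array of row numbers, rather than lexsort_to_indices over several sort columns as in sort_batch above; the gathered columns are then emitted in row order. A brief sketch of that single-array variant with made-up data.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, StringArray, UInt32Array};
use arrow::compute::{sort_to_indices, take};
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Row numbers collected per partition, not necessarily in order.
    let all_indices = UInt32Array::from(vec![4, 1, 3]);
    // A column currently laid out in the same (unordered) arrangement.
    let new_column: ArrayRef = Arc::new(StringArray::from(vec!["e", "b", "d"]));

    // sort_to_indices sorts one array and returns the permutation that orders it.
    let sorted_indices = sort_to_indices(&all_indices, None, None)?;

    // Re-emit the column in ascending row-number order; take_arrays applies
    // the same gather to every output column at once.
    let ordered = take(new_column.as_ref(), &sorted_indices, None)?;
    assert_eq!(ordered.len(), 3);
    Ok(())
}
```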
