From be6ba6def73718c9191023a16e3641755f8be29e Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Thu, 17 Jun 2021 17:04:18 +0800 Subject: [PATCH 1/5] use vec for concat --- .../physical_plan/expressions/nth_value.rs | 23 ++++--- .../physical_plan/expressions/row_number.rs | 13 ++-- .../src/physical_plan/window_functions.rs | 9 ++- datafusion/src/physical_plan/windows.rs | 61 ++++++++----------- 4 files changed, 55 insertions(+), 51 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/nth_value.rs b/datafusion/src/physical_plan/expressions/nth_value.rs index 577c19b54ade..39fa33b1e79a 100644 --- a/datafusion/src/physical_plan/expressions/nth_value.rs +++ b/datafusion/src/physical_plan/expressions/nth_value.rs @@ -20,9 +20,11 @@ use crate::error::{DataFusionError, Result}; use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr}; use crate::scalar::ScalarValue; -use arrow::array::{new_empty_array, new_null_array, ArrayRef}; +use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field}; use std::any::Any; +use std::convert::TryFrom; +use std::iter; use std::sync::Arc; /// nth_value kind @@ -111,7 +113,11 @@ impl BuiltInWindowFunctionExpr for NthValue { &self.name } - fn evaluate(&self, num_rows: usize, values: &[ArrayRef]) -> Result { + fn evaluate( + &self, + num_rows: usize, + values: &[ArrayRef], + ) -> Result>> { if values.is_empty() { return Err(DataFusionError::Execution(format!( "No arguments supplied to {}", @@ -128,19 +134,20 @@ impl BuiltInWindowFunctionExpr for NthValue { ))); } if num_rows == 0 { - return Ok(new_empty_array(value.data_type())); + return Ok(Box::new(iter::empty())); } let index: usize = match self.kind { NthValueKind::First => 0, NthValueKind::Last => (num_rows as usize) - 1, NthValueKind::Nth(n) => (n as usize) - 1, }; - Ok(if index >= num_rows { - new_null_array(value.data_type(), num_rows) + let scalar = if index >= num_rows { + let data_type: &DataType = value.data_type(); + ScalarValue::try_from(data_type) } else { - let value = ScalarValue::try_from_array(value, index)?; - value.to_array_of_size(num_rows) - }) + ScalarValue::try_from_array(value, index) + }?; + Ok(Box::new(iter::repeat(scalar).take(num_rows))) } } diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs index 0444ee971f40..9557c0be8c70 100644 --- a/datafusion/src/physical_plan/expressions/row_number.rs +++ b/datafusion/src/physical_plan/expressions/row_number.rs @@ -19,7 +19,8 @@ use crate::error::Result; use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr}; -use arrow::array::{ArrayRef, UInt64Array}; +use crate::scalar::ScalarValue; +use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field}; use std::any::Any; use std::sync::Arc; @@ -57,10 +58,12 @@ impl BuiltInWindowFunctionExpr for RowNumber { self.name.as_str() } - fn evaluate(&self, num_rows: usize, _values: &[ArrayRef]) -> Result { - Ok(Arc::new(UInt64Array::from_iter_values( - (1..num_rows + 1).map(|i| i as u64), - ))) + fn evaluate( + &self, + num_rows: usize, + _values: &[ArrayRef], + ) -> Result>> { + Ok(Box::new((1..(num_rows as u64) + 1).map(|i| i.into()))) } } diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs index 4f56aa7d3826..b19423c0fa49 100644 --- a/datafusion/src/physical_plan/window_functions.rs +++ b/datafusion/src/physical_plan/window_functions.rs @@ -20,13 +20,14 @@ //! //! see also https://www.postgresql.org/docs/current/functions-window.html -use crate::arrow::array::ArrayRef; use crate::arrow::datatypes::Field; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ aggregates, aggregates::AggregateFunction, functions::Signature, type_coercion::data_types, PhysicalExpr, }; +use crate::scalar::ScalarValue; +use arrow::array::ArrayRef; use arrow::datatypes::DataType; use std::any::Any; use std::sync::Arc; @@ -231,7 +232,11 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { } /// Evaluate the built-in window function against the number of rows and the arguments - fn evaluate(&self, num_rows: usize, values: &[ArrayRef]) -> Result; + fn evaluate( + &self, + num_rows: usize, + values: &[ArrayRef], + ) -> Result>>; } #[cfg(test)] diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs index 2f539057c82f..8a8c715b29fe 100644 --- a/datafusion/src/physical_plan/windows.rs +++ b/datafusion/src/physical_plan/windows.rs @@ -29,7 +29,7 @@ use crate::physical_plan::{ Accumulator, AggregateExpr, Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, WindowExpr, }; -use arrow::compute::concat; +use crate::scalar::ScalarValue; use arrow::{ array::ArrayRef, datatypes::{Field, Schema, SchemaRef}, @@ -42,6 +42,7 @@ use futures::Future; use pin_project_lite::pin_project; use std::any::Any; use std::convert::TryInto; +use std::iter; use std::ops::Range; use std::pin::Pin; use std::sync::Arc; @@ -176,22 +177,17 @@ impl WindowExpr for BuiltInWindowExpr { batch.num_rows(), &self.partition_columns(batch)?, )?; - let results = partition_points - .iter() - .map(|partition_range| { - let start = partition_range.start; - let len = partition_range.end - start; - let values = values - .iter() - .map(|arr| arr.slice(start, len)) - .collect::>(); - self.window.evaluate(len, &values) - }) - .collect::>>()? - .into_iter() - .collect::>(); - let results = results.iter().map(|i| i.as_ref()).collect::>(); - concat(&results).map_err(DataFusionError::ArrowError) + let mut results = Vec::with_capacity(batch.num_rows()); + for partition_range in partition_points { + let start = partition_range.start; + let len = partition_range.end - start; + let values = values + .iter() + .map(|arr| arr.slice(start, len)) + .collect::>(); + results.extend(self.window.evaluate(len, &values)?); + } + ScalarValue::iter_to_array(results.into_iter()) } } @@ -244,23 +240,16 @@ impl AggregateWindowExpr { let sort_partition_points = self.evaluate_partition_points(num_rows, &self.sort_columns(batch)?)?; let values = self.evaluate_args(batch)?; - let results = partition_points - .iter() - .map(|partition_range| { - let sort_partition_points = - find_ranges_in_range(partition_range, &sort_partition_points); - let mut window_accumulators = self.create_accumulator()?; - sort_partition_points - .iter() - .map(|range| window_accumulators.scan_peers(&values, range)) - .collect::>>() - }) - .collect::>>>()? - .into_iter() - .flatten() - .collect::>(); - let results = results.iter().map(|i| i.as_ref()).collect::>(); - concat(&results).map_err(DataFusionError::ArrowError) + let mut result = Vec::with_capacity(num_rows); + for partition_range in partition_points { + let sort_partition_points = + find_ranges_in_range(&partition_range, &sort_partition_points); + let mut window_accumulators = self.create_accumulator()?; + for range in sort_partition_points { + result.extend(window_accumulators.scan_peers(&values, range)?); + } + } + ScalarValue::iter_to_array(result.into_iter()) } fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result { @@ -328,7 +317,7 @@ impl AggregateWindowAccumulator { &mut self, values: &[ArrayRef], value_range: &Range, - ) -> Result { + ) -> Result> { if value_range.is_empty() { return Err(DataFusionError::Internal( "Value range cannot be empty".to_owned(), @@ -341,7 +330,7 @@ impl AggregateWindowAccumulator { .collect::>(); self.accumulator.update_batch(&values)?; let value = self.accumulator.evaluate()?; - Ok(value.to_array_of_size(len)) + Ok(iter::repeat(value).take(len)) } } From eb2898da5e8a470acbab3bf769dbffb17f9cf2d6 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Thu, 17 Jun 2021 20:08:36 +0800 Subject: [PATCH 2/5] change to iterator style --- .../physical_plan/expressions/nth_value.rs | 3 +- .../physical_plan/expressions/row_number.rs | 6 ++- datafusion/src/physical_plan/windows.rs | 52 ++++++++++++------- 3 files changed, 38 insertions(+), 23 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/nth_value.rs b/datafusion/src/physical_plan/expressions/nth_value.rs index 39fa33b1e79a..4bf372e640e1 100644 --- a/datafusion/src/physical_plan/expressions/nth_value.rs +++ b/datafusion/src/physical_plan/expressions/nth_value.rs @@ -164,7 +164,8 @@ mod tests { let values = vec![arr]; let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]); let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?; - let result = expr.evaluate(batch.num_rows(), &values)?; + let result = + ScalarValue::iter_to_array(expr.evaluate(batch.num_rows(), &values)?)?; let result = result.as_any().downcast_ref::().unwrap(); let result = result.values(); assert_eq!(expected, result); diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs index 9557c0be8c70..047dfb0f5979 100644 --- a/datafusion/src/physical_plan/expressions/row_number.rs +++ b/datafusion/src/physical_plan/expressions/row_number.rs @@ -82,7 +82,8 @@ mod tests { let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]); let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?; let row_number = RowNumber::new("row_number".to_owned()); - let result = row_number.evaluate(batch.num_rows(), &[])?; + let result = + ScalarValue::iter_to_array(row_number.evaluate(batch.num_rows(), &[])?)?; let result = result.as_any().downcast_ref::().unwrap(); let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); @@ -97,7 +98,8 @@ mod tests { let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]); let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?; let row_number = RowNumber::new("row_number".to_owned()); - let result = row_number.evaluate(batch.num_rows(), &[])?; + let result = + ScalarValue::iter_to_array(row_number.evaluate(batch.num_rows(), &[])?)?; let result = result.as_any().downcast_ref::().unwrap(); let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs index 8a8c715b29fe..2969b111e5c1 100644 --- a/datafusion/src/physical_plan/windows.rs +++ b/datafusion/src/physical_plan/windows.rs @@ -177,17 +177,22 @@ impl WindowExpr for BuiltInWindowExpr { batch.num_rows(), &self.partition_columns(batch)?, )?; - let mut results = Vec::with_capacity(batch.num_rows()); - for partition_range in partition_points { - let start = partition_range.start; - let len = partition_range.end - start; - let values = values + ScalarValue::iter_to_array( + partition_points .iter() - .map(|arr| arr.slice(start, len)) - .collect::>(); - results.extend(self.window.evaluate(len, &values)?); - } - ScalarValue::iter_to_array(results.into_iter()) + .map(|partition_range| { + let start = partition_range.start; + let len = partition_range.end - start; + let values = values + .iter() + .map(|arr| arr.slice(start, len)) + .collect::>(); + self.window.evaluate(len, &values) + }) + .collect::>>()? + .into_iter() + .flatten(), + ) } } @@ -240,16 +245,23 @@ impl AggregateWindowExpr { let sort_partition_points = self.evaluate_partition_points(num_rows, &self.sort_columns(batch)?)?; let values = self.evaluate_args(batch)?; - let mut result = Vec::with_capacity(num_rows); - for partition_range in partition_points { - let sort_partition_points = - find_ranges_in_range(&partition_range, &sort_partition_points); - let mut window_accumulators = self.create_accumulator()?; - for range in sort_partition_points { - result.extend(window_accumulators.scan_peers(&values, range)?); - } - } - ScalarValue::iter_to_array(result.into_iter()) + ScalarValue::iter_to_array( + partition_points + .iter() + .map::, _>(|partition_range| { + let sort_partition_points = + find_ranges_in_range(partition_range, &sort_partition_points); + let mut window_accumulators = self.create_accumulator()?; + sort_partition_points + .iter() + .map(|range| window_accumulators.scan_peers(&values, range)) + .collect::>>() + }) + .collect::>>()? + .into_iter() + .flatten() + .flatten(), + ) } fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result { From 055941fc22d1d188a28ccc032b2600421727830d Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Thu, 17 Jun 2021 20:25:09 +0800 Subject: [PATCH 3/5] Revert "change to iterator style" This reverts commit 284f0a358663177d1003d8fb8afd283389e8566b. --- .../physical_plan/expressions/nth_value.rs | 3 +- .../physical_plan/expressions/row_number.rs | 6 +-- datafusion/src/physical_plan/windows.rs | 52 +++++++------------ 3 files changed, 23 insertions(+), 38 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/nth_value.rs b/datafusion/src/physical_plan/expressions/nth_value.rs index 4bf372e640e1..39fa33b1e79a 100644 --- a/datafusion/src/physical_plan/expressions/nth_value.rs +++ b/datafusion/src/physical_plan/expressions/nth_value.rs @@ -164,8 +164,7 @@ mod tests { let values = vec![arr]; let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]); let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?; - let result = - ScalarValue::iter_to_array(expr.evaluate(batch.num_rows(), &values)?)?; + let result = expr.evaluate(batch.num_rows(), &values)?; let result = result.as_any().downcast_ref::().unwrap(); let result = result.values(); assert_eq!(expected, result); diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs index 047dfb0f5979..9557c0be8c70 100644 --- a/datafusion/src/physical_plan/expressions/row_number.rs +++ b/datafusion/src/physical_plan/expressions/row_number.rs @@ -82,8 +82,7 @@ mod tests { let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]); let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?; let row_number = RowNumber::new("row_number".to_owned()); - let result = - ScalarValue::iter_to_array(row_number.evaluate(batch.num_rows(), &[])?)?; + let result = row_number.evaluate(batch.num_rows(), &[])?; let result = result.as_any().downcast_ref::().unwrap(); let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); @@ -98,8 +97,7 @@ mod tests { let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]); let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?; let row_number = RowNumber::new("row_number".to_owned()); - let result = - ScalarValue::iter_to_array(row_number.evaluate(batch.num_rows(), &[])?)?; + let result = row_number.evaluate(batch.num_rows(), &[])?; let result = result.as_any().downcast_ref::().unwrap(); let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs index 2969b111e5c1..8a8c715b29fe 100644 --- a/datafusion/src/physical_plan/windows.rs +++ b/datafusion/src/physical_plan/windows.rs @@ -177,22 +177,17 @@ impl WindowExpr for BuiltInWindowExpr { batch.num_rows(), &self.partition_columns(batch)?, )?; - ScalarValue::iter_to_array( - partition_points + let mut results = Vec::with_capacity(batch.num_rows()); + for partition_range in partition_points { + let start = partition_range.start; + let len = partition_range.end - start; + let values = values .iter() - .map(|partition_range| { - let start = partition_range.start; - let len = partition_range.end - start; - let values = values - .iter() - .map(|arr| arr.slice(start, len)) - .collect::>(); - self.window.evaluate(len, &values) - }) - .collect::>>()? - .into_iter() - .flatten(), - ) + .map(|arr| arr.slice(start, len)) + .collect::>(); + results.extend(self.window.evaluate(len, &values)?); + } + ScalarValue::iter_to_array(results.into_iter()) } } @@ -245,23 +240,16 @@ impl AggregateWindowExpr { let sort_partition_points = self.evaluate_partition_points(num_rows, &self.sort_columns(batch)?)?; let values = self.evaluate_args(batch)?; - ScalarValue::iter_to_array( - partition_points - .iter() - .map::, _>(|partition_range| { - let sort_partition_points = - find_ranges_in_range(partition_range, &sort_partition_points); - let mut window_accumulators = self.create_accumulator()?; - sort_partition_points - .iter() - .map(|range| window_accumulators.scan_peers(&values, range)) - .collect::>>() - }) - .collect::>>()? - .into_iter() - .flatten() - .flatten(), - ) + let mut result = Vec::with_capacity(num_rows); + for partition_range in partition_points { + let sort_partition_points = + find_ranges_in_range(&partition_range, &sort_partition_points); + let mut window_accumulators = self.create_accumulator()?; + for range in sort_partition_points { + result.extend(window_accumulators.scan_peers(&values, range)?); + } + } + ScalarValue::iter_to_array(result.into_iter()) } fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result { From 5fc1e40cae84caaa8fa00f8eacd95c2d436c08d2 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Thu, 17 Jun 2021 20:36:47 +0800 Subject: [PATCH 4/5] Revert "Revert "change to iterator style"" This reverts commit 3cc8c6b559164b1823c4c6b74f885bd6941d6095. --- .../physical_plan/expressions/nth_value.rs | 3 +- .../physical_plan/expressions/row_number.rs | 6 ++- datafusion/src/physical_plan/windows.rs | 52 ++++++++++++------- 3 files changed, 38 insertions(+), 23 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/nth_value.rs b/datafusion/src/physical_plan/expressions/nth_value.rs index 39fa33b1e79a..4bf372e640e1 100644 --- a/datafusion/src/physical_plan/expressions/nth_value.rs +++ b/datafusion/src/physical_plan/expressions/nth_value.rs @@ -164,7 +164,8 @@ mod tests { let values = vec![arr]; let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]); let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?; - let result = expr.evaluate(batch.num_rows(), &values)?; + let result = + ScalarValue::iter_to_array(expr.evaluate(batch.num_rows(), &values)?)?; let result = result.as_any().downcast_ref::().unwrap(); let result = result.values(); assert_eq!(expected, result); diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs index 9557c0be8c70..047dfb0f5979 100644 --- a/datafusion/src/physical_plan/expressions/row_number.rs +++ b/datafusion/src/physical_plan/expressions/row_number.rs @@ -82,7 +82,8 @@ mod tests { let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]); let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?; let row_number = RowNumber::new("row_number".to_owned()); - let result = row_number.evaluate(batch.num_rows(), &[])?; + let result = + ScalarValue::iter_to_array(row_number.evaluate(batch.num_rows(), &[])?)?; let result = result.as_any().downcast_ref::().unwrap(); let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); @@ -97,7 +98,8 @@ mod tests { let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]); let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?; let row_number = RowNumber::new("row_number".to_owned()); - let result = row_number.evaluate(batch.num_rows(), &[])?; + let result = + ScalarValue::iter_to_array(row_number.evaluate(batch.num_rows(), &[])?)?; let result = result.as_any().downcast_ref::().unwrap(); let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs index 8a8c715b29fe..2969b111e5c1 100644 --- a/datafusion/src/physical_plan/windows.rs +++ b/datafusion/src/physical_plan/windows.rs @@ -177,17 +177,22 @@ impl WindowExpr for BuiltInWindowExpr { batch.num_rows(), &self.partition_columns(batch)?, )?; - let mut results = Vec::with_capacity(batch.num_rows()); - for partition_range in partition_points { - let start = partition_range.start; - let len = partition_range.end - start; - let values = values + ScalarValue::iter_to_array( + partition_points .iter() - .map(|arr| arr.slice(start, len)) - .collect::>(); - results.extend(self.window.evaluate(len, &values)?); - } - ScalarValue::iter_to_array(results.into_iter()) + .map(|partition_range| { + let start = partition_range.start; + let len = partition_range.end - start; + let values = values + .iter() + .map(|arr| arr.slice(start, len)) + .collect::>(); + self.window.evaluate(len, &values) + }) + .collect::>>()? + .into_iter() + .flatten(), + ) } } @@ -240,16 +245,23 @@ impl AggregateWindowExpr { let sort_partition_points = self.evaluate_partition_points(num_rows, &self.sort_columns(batch)?)?; let values = self.evaluate_args(batch)?; - let mut result = Vec::with_capacity(num_rows); - for partition_range in partition_points { - let sort_partition_points = - find_ranges_in_range(&partition_range, &sort_partition_points); - let mut window_accumulators = self.create_accumulator()?; - for range in sort_partition_points { - result.extend(window_accumulators.scan_peers(&values, range)?); - } - } - ScalarValue::iter_to_array(result.into_iter()) + ScalarValue::iter_to_array( + partition_points + .iter() + .map::, _>(|partition_range| { + let sort_partition_points = + find_ranges_in_range(partition_range, &sort_partition_points); + let mut window_accumulators = self.create_accumulator()?; + sort_partition_points + .iter() + .map(|range| window_accumulators.scan_peers(&values, range)) + .collect::>>() + }) + .collect::>>()? + .into_iter() + .flatten() + .flatten(), + ) } fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result { From 47645e7f25f2c2bf89f70a6cb323e9e8437d7afc Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Tue, 22 Jun 2021 18:57:58 +0800 Subject: [PATCH 5/5] change to use concat in partitions --- .../physical_plan/expressions/nth_value.rs | 28 +++---- .../physical_plan/expressions/row_number.rs | 19 ++--- .../src/physical_plan/window_functions.rs | 7 +- datafusion/src/physical_plan/windows.rs | 73 ++++++++++--------- 4 files changed, 58 insertions(+), 69 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/nth_value.rs b/datafusion/src/physical_plan/expressions/nth_value.rs index 4bf372e640e1..cb03fde3ae91 100644 --- a/datafusion/src/physical_plan/expressions/nth_value.rs +++ b/datafusion/src/physical_plan/expressions/nth_value.rs @@ -20,11 +20,9 @@ use crate::error::{DataFusionError, Result}; use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr}; use crate::scalar::ScalarValue; -use arrow::array::ArrayRef; +use arrow::array::{new_null_array, ArrayRef}; use arrow::datatypes::{DataType, Field}; use std::any::Any; -use std::convert::TryFrom; -use std::iter; use std::sync::Arc; /// nth_value kind @@ -113,11 +111,7 @@ impl BuiltInWindowFunctionExpr for NthValue { &self.name } - fn evaluate( - &self, - num_rows: usize, - values: &[ArrayRef], - ) -> Result>> { + fn evaluate(&self, num_rows: usize, values: &[ArrayRef]) -> Result { if values.is_empty() { return Err(DataFusionError::Execution(format!( "No arguments supplied to {}", @@ -133,21 +127,20 @@ impl BuiltInWindowFunctionExpr for NthValue { value.len() ))); } - if num_rows == 0 { - return Ok(Box::new(iter::empty())); - } + assert!(num_rows > 0, "Impossibly got empty values"); let index: usize = match self.kind { NthValueKind::First => 0, NthValueKind::Last => (num_rows as usize) - 1, NthValueKind::Nth(n) => (n as usize) - 1, }; - let scalar = if index >= num_rows { + + Ok(if index >= num_rows { let data_type: &DataType = value.data_type(); - ScalarValue::try_from(data_type) + new_null_array(data_type, num_rows) } else { - ScalarValue::try_from_array(value, index) - }?; - Ok(Box::new(iter::repeat(scalar).take(num_rows))) + let scalar = ScalarValue::try_from_array(value, num_rows)?; + scalar.to_array_of_size(num_rows) + }) } } @@ -164,8 +157,7 @@ mod tests { let values = vec![arr]; let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]); let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?; - let result = - ScalarValue::iter_to_array(expr.evaluate(batch.num_rows(), &values)?)?; + let result = expr.evaluate(batch.num_rows(), &values)?; let result = result.as_any().downcast_ref::().unwrap(); let result = result.values(); assert_eq!(expected, result); diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs index 047dfb0f5979..7c4d0324e830 100644 --- a/datafusion/src/physical_plan/expressions/row_number.rs +++ b/datafusion/src/physical_plan/expressions/row_number.rs @@ -19,8 +19,7 @@ use crate::error::Result; use crate::physical_plan::{window_functions::BuiltInWindowFunctionExpr, PhysicalExpr}; -use crate::scalar::ScalarValue; -use arrow::array::ArrayRef; +use arrow::array::{ArrayRef, UInt64Array}; use arrow::datatypes::{DataType, Field}; use std::any::Any; use std::sync::Arc; @@ -58,12 +57,10 @@ impl BuiltInWindowFunctionExpr for RowNumber { self.name.as_str() } - fn evaluate( - &self, - num_rows: usize, - _values: &[ArrayRef], - ) -> Result>> { - Ok(Box::new((1..(num_rows as u64) + 1).map(|i| i.into()))) + fn evaluate(&self, num_rows: usize, _values: &[ArrayRef]) -> Result { + Ok(Arc::new(UInt64Array::from_iter_values( + 1..(num_rows as u64) + 1, + ))) } } @@ -82,8 +79,7 @@ mod tests { let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]); let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?; let row_number = RowNumber::new("row_number".to_owned()); - let result = - ScalarValue::iter_to_array(row_number.evaluate(batch.num_rows(), &[])?)?; + let result = row_number.evaluate(batch.num_rows(), &[])?; let result = result.as_any().downcast_ref::().unwrap(); let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); @@ -98,8 +94,7 @@ mod tests { let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]); let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?; let row_number = RowNumber::new("row_number".to_owned()); - let result = - ScalarValue::iter_to_array(row_number.evaluate(batch.num_rows(), &[])?)?; + let result = row_number.evaluate(batch.num_rows(), &[])?; let result = result.as_any().downcast_ref::().unwrap(); let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], result); diff --git a/datafusion/src/physical_plan/window_functions.rs b/datafusion/src/physical_plan/window_functions.rs index b19423c0fa49..366f59a3a405 100644 --- a/datafusion/src/physical_plan/window_functions.rs +++ b/datafusion/src/physical_plan/window_functions.rs @@ -26,7 +26,6 @@ use crate::physical_plan::{ aggregates, aggregates::AggregateFunction, functions::Signature, type_coercion::data_types, PhysicalExpr, }; -use crate::scalar::ScalarValue; use arrow::array::ArrayRef; use arrow::datatypes::DataType; use std::any::Any; @@ -232,11 +231,7 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { } /// Evaluate the built-in window function against the number of rows and the arguments - fn evaluate( - &self, - num_rows: usize, - values: &[ArrayRef], - ) -> Result>>; + fn evaluate(&self, num_rows: usize, values: &[ArrayRef]) -> Result; } #[cfg(test)] diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs index 2969b111e5c1..a5f23374a53b 100644 --- a/datafusion/src/physical_plan/windows.rs +++ b/datafusion/src/physical_plan/windows.rs @@ -30,6 +30,7 @@ use crate::physical_plan::{ RecordBatchStream, SendableRecordBatchStream, WindowExpr, }; use crate::scalar::ScalarValue; +use arrow::compute::concat; use arrow::{ array::ArrayRef, datatypes::{Field, Schema, SchemaRef}, @@ -177,22 +178,24 @@ impl WindowExpr for BuiltInWindowExpr { batch.num_rows(), &self.partition_columns(batch)?, )?; - ScalarValue::iter_to_array( - partition_points - .iter() - .map(|partition_range| { - let start = partition_range.start; - let len = partition_range.end - start; - let values = values - .iter() - .map(|arr| arr.slice(start, len)) - .collect::>(); - self.window.evaluate(len, &values) - }) - .collect::>>()? - .into_iter() - .flatten(), - ) + let results = partition_points + .iter() + .map(|partition_range| { + let start = partition_range.start; + let len = partition_range.end - start; + let values = values + .iter() + .map(|arr| arr.slice(start, len)) + .collect::>(); + self.window.evaluate(len, &values) + }) + .collect::>>()?; + if results.len() == 1 { + Ok(results[0].clone()) + } else { + let results = results.iter().map(|i| i.as_ref()).collect::>(); + concat(&results).map_err(DataFusionError::ArrowError) + } } } @@ -245,23 +248,27 @@ impl AggregateWindowExpr { let sort_partition_points = self.evaluate_partition_points(num_rows, &self.sort_columns(batch)?)?; let values = self.evaluate_args(batch)?; - ScalarValue::iter_to_array( - partition_points - .iter() - .map::, _>(|partition_range| { - let sort_partition_points = - find_ranges_in_range(partition_range, &sort_partition_points); - let mut window_accumulators = self.create_accumulator()?; - sort_partition_points - .iter() - .map(|range| window_accumulators.scan_peers(&values, range)) - .collect::>>() - }) - .collect::>>()? - .into_iter() - .flatten() - .flatten(), - ) + let results = partition_points + .iter() + .map::, _>(|partition_range| { + let sort_partition_points = + find_ranges_in_range(partition_range, &sort_partition_points); + let mut window_accumulators = self.create_accumulator()?; + let result = sort_partition_points + .iter() + .map(|range| window_accumulators.scan_peers(&values, range)) + .collect::>>()? + .into_iter() + .flatten(); + ScalarValue::iter_to_array(result) + }) + .collect::>>()?; + if results.len() == 1 { + Ok(results[0].clone()) + } else { + let results = results.iter().map(|i| i.as_ref()).collect::>(); + concat(&results).map_err(DataFusionError::ArrowError) + } } fn group_based_evaluate(&self, _batch: &RecordBatch) -> Result {