diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs
index 960d7c5d8e0d7..e09f325923a3e 100644
--- a/datafusion/src/physical_plan/functions.rs
+++ b/datafusion/src/physical_plan/functions.rs
@@ -43,13 +43,14 @@ use crate::{
     scalar::ScalarValue,
 };
 use arrow::{
-    array::ArrayRef,
+    array::{ArrayRef, NullArray},
     compute::kernels::length::{bit_length, length},
     datatypes::TimeUnit,
     datatypes::{DataType, Field, Int32Type, Int64Type, Schema},
     record_batch::RecordBatch,
 };
 use fmt::{Debug, Formatter};
+use std::convert::From;
 use std::{any::Any, fmt, str::FromStr, sync::Arc};
 
 /// A function's signature, which defines the function's supported argument types.
@@ -75,6 +76,13 @@ pub enum Signature {
 }
 
 /// Scalar function
+///
+/// The Fn param is the wrapped function, which is passed a slice of
+/// columnar values (either scalar or array). The one exception is a
+/// zero-argument function, which is passed a single-element vec
+/// instead; its single element is a null array whose length
+/// indicates the batch's row count (so that a generative zero-argument
+/// function knows the size of the result array).
 pub type ScalarFunctionImplementation =
     Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
 
@@ -1358,6 +1366,17 @@ impl fmt::Display for ScalarFunctionExpr {
     }
 }
 
+/// A null columnar value is implemented as a NullArray so that it can carry
+/// the batch's num_rows
+type NullColumnarValue = ColumnarValue;
+
+impl From<&RecordBatch> for NullColumnarValue {
+    fn from(batch: &RecordBatch) -> Self {
+        let num_rows = batch.num_rows();
+        ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
+    }
+}
+
 impl PhysicalExpr for ScalarFunctionExpr {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
@@ -1373,12 +1392,16 @@ impl PhysicalExpr for ScalarFunctionExpr {
     }
 
     fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
-        // evaluate the arguments
-        let inputs = self
-            .args
-            .iter()
-            .map(|e| e.evaluate(batch))
-            .collect::<Result<Vec<_>>>()?;
+        // evaluate the arguments; if there are no arguments we'll instead pass in a null array
+        // indicating the batch size (as a convention)
+        let inputs = match self.args.len() {
+            0 => vec![NullColumnarValue::from(batch)],
+            _ => self
+                .args
+                .iter()
+                .map(|e| e.evaluate(batch))
+                .collect::<Result<Vec<_>>>()?,
+        };
 
         // evaluate the function
         let fun = self.fun.as_ref();
@@ -1386,7 +1409,7 @@
     }
 }
 
-/// decorates a function to handle [`ScalarValue`]s by coverting them to arrays before calling the function
+/// decorates a function to handle [`ScalarValue`]s by converting them to arrays before calling the function
 /// and vice-versa after evaluation.
 pub fn make_scalar_function<F>(inner: F) -> ScalarFunctionImplementation
 where
diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion/src/physical_plan/udf.rs
index 9189da47bd6f8..a79c0a8a36059 100644
--- a/datafusion/src/physical_plan/udf.rs
+++ b/datafusion/src/physical_plan/udf.rs
@@ -43,6 +43,13 @@ pub struct ScalarUDF {
     /// Return type
     pub return_type: ReturnTypeFunction,
     /// actual implementation
+    ///
+    /// The fn param is the wrapped function, which is passed a slice of
+    /// columnar values (either scalar or array). The one exception is a
+    /// zero-argument function, which is passed a single-element vec
+    /// instead; its single element is a null array whose length
+    /// indicates the batch's row count (so that a generative zero-argument
+    /// function knows the size of the result array).
     pub fun: ScalarFunctionImplementation,
 }
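
For reference, here is a minimal sketch of a zero-argument scalar function written against the convention this patch introduces: the implementation receives a single-element slice holding a null array whose length is the batch's row count. The sketch is not part of the patch; the function name `forty_two`, the Float64 output type, and the `datafusion::physical_plan::ColumnarValue` / `datafusion::error::Result` import paths are assumptions based on the crate layout shown in this diff.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Float64Array};
use datafusion::error::Result;
use datafusion::physical_plan::ColumnarValue; // assumed re-export path

// Illustrative zero-argument function: with no declared arguments, `args`
// holds exactly one element, the NullArray placeholder produced by
// ScalarFunctionExpr::evaluate, whose length equals the batch's row count.
fn forty_two(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    let num_rows = match &args[0] {
        ColumnarValue::Array(array) => array.len(),
        ColumnarValue::Scalar(_) => 1,
    };

    // Produce one output value per input row.
    let values: ArrayRef = Arc::new(Float64Array::from(vec![42.0_f64; num_rows]));
    Ok(ColumnarValue::Array(values))
}
```

A `ScalarUDF` wrapping such a function would presumably declare an empty list of input types; without the row count carried by the null array, a generative function like this would have no way to size its result to the batch.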