From ce089f4f267764b1d3ab5565d382230b0f6213de Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Wed, 12 May 2021 18:14:59 +0800 Subject: [PATCH 1/2] add docs and comments --- datafusion/src/physical_plan/functions.rs | 39 ++++++++++++++++++----- datafusion/src/physical_plan/udf.rs | 7 ++++ 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs index 2e053a80976b..b495ec40b79c 100644 --- a/datafusion/src/physical_plan/functions.rs +++ b/datafusion/src/physical_plan/functions.rs @@ -44,13 +44,14 @@ use crate::{ scalar::ScalarValue, }; use arrow::{ - array::ArrayRef, + array::{ArrayRef, NullArray}, compute::kernels::length::{bit_length, length}, datatypes::TimeUnit, datatypes::{DataType, Field, Int32Type, Int64Type, Schema}, record_batch::RecordBatch, }; use fmt::{Debug, Formatter}; +use std::convert::From; use std::{any::Any, fmt, str::FromStr, sync::Arc}; /// A function's signature, which defines the function's supported argument types. @@ -76,6 +77,13 @@ pub enum Signature { } /// Scalar function +/// +/// The Fn param is the wrapped function but be aware that the function will +/// be passed with the slice / vec of columnar values (either scalar or array) +/// with the exception of zero param function, where a singular element vec +/// will be passed. In that case the single element is a null array to indicate +/// the batch's row count (so that the generative zero-argument function can know +/// the result array size). pub type ScalarFunctionImplementation = Arc Result + Send + Sync>; @@ -1371,6 +1379,17 @@ impl fmt::Display for ScalarFunctionExpr { } } +/// null columnar values are implemented as a null array in order to pass batch +/// num_rows +type NullColumnarValue = ColumnarValue; + +impl From<&RecordBatch> for NullColumnarValue { + fn from(batch: &RecordBatch) -> Self { + let num_rows = batch.num_rows(); + ColumnarValue::Array(Arc::new(NullArray::new(num_rows))) + } +} + impl PhysicalExpr for ScalarFunctionExpr { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { @@ -1386,12 +1405,16 @@ impl PhysicalExpr for ScalarFunctionExpr { } fn evaluate(&self, batch: &RecordBatch) -> Result { - // evaluate the arguments - let inputs = self - .args - .iter() - .map(|e| e.evaluate(batch)) - .collect::>>()?; + // evaluate the arguments, if there are no arguments we'll instead pass in a null array + // indicating the batch size (as a convention) + let inputs = match self.args.len() { + 0 => vec![NullColumnarValue::from(batch)], + _ => self + .args + .iter() + .map(|e| e.evaluate(batch)) + .collect::>>()?, + }; // evaluate the function let fun = self.fun.as_ref(); @@ -1399,7 +1422,7 @@ impl PhysicalExpr for ScalarFunctionExpr { } } -/// decorates a function to handle [`ScalarValue`]s by coverting them to arrays before calling the function +/// decorates a function to handle [`ScalarValue`]s by converting them to arrays before calling the function /// and vice-versa after evaluation. pub fn make_scalar_function(inner: F) -> ScalarFunctionImplementation where diff --git a/datafusion/src/physical_plan/udf.rs b/datafusion/src/physical_plan/udf.rs index 9189da47bd6f..a79c0a8a3605 100644 --- a/datafusion/src/physical_plan/udf.rs +++ b/datafusion/src/physical_plan/udf.rs @@ -43,6 +43,13 @@ pub struct ScalarUDF { /// Return type pub return_type: ReturnTypeFunction, /// actual implementation + /// + /// The fn param is the wrapped function but be aware that the function will + /// be passed with the slice / vec of columnar values (either scalar or array) + /// with the exception of zero param function, where a singular element vec + /// will be passed. In that case the single element is a null array to indicate + /// the batch's row count (so that the generative zero-argument function can know + /// the result array size). pub fun: ScalarFunctionImplementation, } From fe4a6dfd4fad737c0280eddf053de238ad8d929e Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sun, 16 May 2021 10:11:52 +0800 Subject: [PATCH 2/2] use supports_zero_argument --- datafusion/src/physical_plan/functions.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/datafusion/src/physical_plan/functions.rs b/datafusion/src/physical_plan/functions.rs index b495ec40b79c..c0c915f29a72 100644 --- a/datafusion/src/physical_plan/functions.rs +++ b/datafusion/src/physical_plan/functions.rs @@ -215,6 +215,14 @@ pub enum BuiltinScalarFunction { RegexpMatch, } +impl BuiltinScalarFunction { + /// an allowlist of functions to take zero arguments, so that they will get special treatment + /// while executing. + fn supports_zero_argument(&self) -> bool { + matches!(self, BuiltinScalarFunction::Now) + } +} + impl fmt::Display for BuiltinScalarFunction { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { // lowercase of the debug. @@ -1407,8 +1415,10 @@ impl PhysicalExpr for ScalarFunctionExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { // evaluate the arguments, if there are no arguments we'll instead pass in a null array // indicating the batch size (as a convention) - let inputs = match self.args.len() { - 0 => vec![NullColumnarValue::from(batch)], + let inputs = match (self.args.len(), self.name.parse::()) { + (0, Ok(scalar_fun)) if scalar_fun.supports_zero_argument() => { + vec![NullColumnarValue::from(batch)] + } _ => self .args .iter()