Use NullArray to Pass row count to ScalarFunctions that take 0 arguments #328

Merged on May 16, 2021 (2 commits)
49 changes: 41 additions & 8 deletions datafusion/src/physical_plan/functions.rs
@@ -44,13 +44,14 @@ use crate::{
scalar::ScalarValue,
};
use arrow::{
array::ArrayRef,
array::{ArrayRef, NullArray},
compute::kernels::length::{bit_length, length},
datatypes::TimeUnit,
datatypes::{DataType, Field, Int32Type, Int64Type, Schema},
record_batch::RecordBatch,
};
use fmt::{Debug, Formatter};
use std::convert::From;
use std::{any::Any, fmt, str::FromStr, sync::Arc};

/// A function's signature, which defines the function's supported argument types.
@@ -76,6 +77,13 @@ pub enum Signature {
}

/// Scalar function
///
/// The Fn param is the wrapped function but be aware that the function will
/// be passed with the slice / vec of columnar values (either scalar or array)
/// with the exception of zero param function, where a singular element vec
/// will be passed. In that case the single element is a null array to indicate
/// the batch's row count (so that the generative zero-argument function can know
/// the result array size).
pub type ScalarFunctionImplementation =
Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;

@@ -207,6 +215,14 @@ pub enum BuiltinScalarFunction {
RegexpMatch,
}

impl BuiltinScalarFunction {
/// an allowlist of functions to take zero arguments, so that they will get special treatment
/// while executing.
fn supports_zero_argument(&self) -> bool {
Review comment (Member, Author): Had to add this whitelist because vararg funcs like `array` will not accept 0 args.

matches!(self, BuiltinScalarFunction::Now)
}
}

impl fmt::Display for BuiltinScalarFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// lowercase of the debug.
@@ -1371,6 +1387,17 @@ impl fmt::Display for ScalarFunctionExpr {
}
}

/// null columnar values are implemented as a null array in order to pass batch
/// num_rows
type NullColumnarValue = ColumnarValue;

impl From<&RecordBatch> for NullColumnarValue {
fn from(batch: &RecordBatch) -> Self {
let num_rows = batch.num_rows();
ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
}
}

impl PhysicalExpr for ScalarFunctionExpr {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -1386,20 +1413,26 @@ impl PhysicalExpr for ScalarFunctionExpr {
}

fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
// evaluate the arguments
let inputs = self
.args
.iter()
.map(|e| e.evaluate(batch))
.collect::<Result<Vec<_>>>()?;
// evaluate the arguments, if there are no arguments we'll instead pass in a null array
// indicating the batch size (as a convention)
let inputs = match (self.args.len(), self.name.parse::<BuiltinScalarFunction>()) {
(0, Ok(scalar_fun)) if scalar_fun.supports_zero_argument() => {
vec![NullColumnarValue::from(batch)]
}
_ => self
.args
.iter()
.map(|e| e.evaluate(batch))
.collect::<Result<Vec<_>>>()?,
};

// evaluate the function
let fun = self.fun.as_ref();
(fun)(&inputs)
}
}
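
The dispatch in the new `evaluate` can be read as a two-arm match: a call with no arguments whose name parses as an allowlisted builtin gets a single row-count placeholder, and everything else has its arguments evaluated as before. The following is a minimal std-only sketch of that branching, not the DataFusion code itself. `Builtin`, `Input`, and `build_inputs` are hypothetical stand-ins for `BuiltinScalarFunction`, `ColumnarValue`, and the body of `evaluate`, and plain `Vec<i64>` columns stand in for evaluated argument expressions.

```rust
use std::str::FromStr;

/// Hypothetical stand-in for `ColumnarValue`: either a row-count
/// placeholder (the role played by `NullArray`) or an evaluated column.
#[derive(Debug, PartialEq)]
enum Input {
    RowCount(usize),
    Column(Vec<i64>),
}

/// Hypothetical stand-in for `BuiltinScalarFunction`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Builtin {
    Now,
    Array,
}

impl FromStr for Builtin {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "now" => Ok(Builtin::Now),
            "array" => Ok(Builtin::Array),
            other => Err(format!("unknown builtin: {}", other)),
        }
    }
}

impl Builtin {
    /// Mirrors the allowlist in the diff: only `Now` takes zero arguments.
    fn supports_zero_argument(&self) -> bool {
        matches!(self, Builtin::Now)
    }
}

/// Builds the input vector the way the new `evaluate` does in this sketch:
/// zero args + allowlisted builtin => one row-count placeholder,
/// anything else => the (already evaluated) argument columns.
fn build_inputs(name: &str, args: &[Vec<i64>], num_rows: usize) -> Vec<Input> {
    match (args.len(), name.parse::<Builtin>()) {
        (0, Ok(fun)) if fun.supports_zero_argument() => {
            vec![Input::RowCount(num_rows)]
        }
        _ => args.iter().cloned().map(Input::Column).collect(),
    }
}
```

Under these assumptions, `build_inputs("now", &[], 4)` yields one `Input::RowCount(4)`, while a zero-argument call to `array`, or to any name that does not parse as a builtin, falls through to the second arm and yields an empty vector.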

/// decorates a function to handle [`ScalarValue`]s by coverting them to arrays before calling the function
/// decorates a function to handle [`ScalarValue`]s by converting them to arrays before calling the function
/// and vice-versa after evaluation.
pub fn make_scalar_function<F>(inner: F) -> ScalarFunctionImplementation
where
7 changes: 7 additions & 0 deletions datafusion/src/physical_plan/udf.rs
@@ -43,6 +43,13 @@ pub struct ScalarUDF {
/// Return type
pub return_type: ReturnTypeFunction,
/// actual implementation
///
/// The fn param is the wrapped function but be aware that the function will
/// be passed with the slice / vec of columnar values (either scalar or array)
/// with the exception of zero param function, where a singular element vec
/// will be passed. In that case the single element is a null array to indicate
/// the batch's row count (so that the generative zero-argument function can know
/// the result array size).
pub fun: ScalarFunctionImplementation,
}
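
To illustrate the convention described in the doc comment above, here is a rough std-only sketch of how a zero-argument implementation might read the row count from the single placeholder element. It does not use the real arrow types: `Array`, `ColumnarValue`, and `ScalarFunctionImplementation` below are simplified, hypothetical models (with `String` as the error type), and `row_index` is an invented generator function, not something from this PR.

```rust
use std::sync::Arc;

/// Toy stand-in for arrow arrays (hypothetical, not the arrow API).
#[derive(Debug, Clone, PartialEq)]
enum Array {
    /// Carries only a length, like a null array.
    Null(usize),
    Int64(Vec<i64>),
}

impl Array {
    fn len(&self) -> usize {
        match self {
            Array::Null(n) => *n,
            Array::Int64(values) => values.len(),
        }
    }
}

/// Toy stand-in for `ColumnarValue`.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum ColumnarValue {
    Array(Arc<Array>),
    Scalar(i64),
}

/// Same shape as the alias in the diff, with `String` as the error type.
type ScalarFunctionImplementation =
    Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue, String> + Send + Sync>;

/// Hypothetical zero-argument generator: returns the row indices 0..n,
/// where n is the length of the placeholder array it receives.
fn row_index() -> ScalarFunctionImplementation {
    Arc::new(|args: &[ColumnarValue]| match args {
        // Exactly one array element: use its length as the row count.
        [ColumnarValue::Array(placeholder)] => {
            let n = placeholder.len();
            let values: Vec<i64> = (0..n as i64).collect();
            Ok(ColumnarValue::Array(Arc::new(Array::Int64(values))))
        }
        _ => Err("expected a single placeholder array".to_string()),
    })
}
```

In this sketch the implementation never inspects the placeholder's contents, only its length, which is why a null array is enough to carry the batch's row count.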
