Commit

add docs and comments
Jiayu Liu committed May 15, 2021
1 parent b44238d commit efcdd4a
Showing 2 changed files with 38 additions and 8 deletions.
39 changes: 31 additions & 8 deletions datafusion/src/physical_plan/functions.rs
@@ -43,13 +43,14 @@ use crate::{
scalar::ScalarValue,
};
use arrow::{
array::ArrayRef,
array::{ArrayRef, NullArray},
compute::kernels::length::{bit_length, length},
datatypes::TimeUnit,
datatypes::{DataType, Field, Int32Type, Int64Type, Schema},
record_batch::RecordBatch,
};
use fmt::{Debug, Formatter};
use std::convert::From;
use std::{any::Any, fmt, str::FromStr, sync::Arc};

/// A function's signature, which defines the function's supported argument types.
@@ -75,6 +76,13 @@ pub enum Signature {
}

/// Scalar function
///
/// The `Fn` param is the wrapped function, but be aware that the function is
/// passed a slice of columnar values (either scalar or array), with the
/// exception of zero-parameter functions, which receive a single-element vec.
/// In that case the single element is a null array whose length indicates the
/// batch's row count (so that a generative zero-argument function knows how
/// large the result array should be).
pub type ScalarFunctionImplementation =
Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
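
For illustration only (a sketch, not part of this commit): a hypothetical zero-argument ScalarFunctionImplementation following the convention above could read the row count out of the single NullArray argument like this; the name fill_42 and the constant output are invented.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use datafusion::error::Result;
use datafusion::physical_plan::ColumnarValue;

// Hypothetical zero-argument function: it receives a one-element slice holding
// a NullArray whose only purpose is to convey the batch's row count.
fn fill_42(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    let num_rows = match &args[0] {
        ColumnarValue::Array(array) => array.len(),
        ColumnarValue::Scalar(_) => 1,
    };
    let values: ArrayRef = Arc::new(Int64Array::from(vec![42_i64; num_rows]));
    Ok(ColumnarValue::Array(values))
}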

@@ -1358,6 +1366,17 @@ impl fmt::Display for ScalarFunctionExpr {
}
}

/// A null columnar value is implemented as a null array whose length conveys
/// the batch's num_rows
type NullColumnarValue = ColumnarValue;

impl From<&RecordBatch> for NullColumnarValue {
fn from(batch: &RecordBatch) -> Self {
let num_rows = batch.num_rows();
ColumnarValue::Array(Arc::new(NullArray::new(num_rows)))
}
}

impl PhysicalExpr for ScalarFunctionExpr {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
@@ -1373,20 +1392,24 @@ impl PhysicalExpr for ScalarFunctionExpr {
}

fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
// evaluate the arguments
let inputs = self
.args
.iter()
.map(|e| e.evaluate(batch))
.collect::<Result<Vec<_>>>()?;
// evaluate the arguments; if there are no arguments, we instead pass in a
// null array indicating the batch size (as a convention)
let inputs = match self.args.len() {
0 => vec![NullColumnarValue::from(batch)],
_ => self
.args
.iter()
.map(|e| e.evaluate(batch))
.collect::<Result<Vec<_>>>()?,
};

// evaluate the function
let fun = self.fun.as_ref();
(fun)(&inputs)
}
}

/// decorates a function to handle [`ScalarValue`]s by coverting them to arrays before calling the function
/// decorates a function to handle [`ScalarValue`]s by converting them to arrays before calling the function
/// and vice-versa after evaluation.
pub fn make_scalar_function<F>(inner: F) -> ScalarFunctionImplementation
where
7 changes: 7 additions & 0 deletions datafusion/src/physical_plan/udf.rs
@@ -43,6 +43,13 @@ pub struct ScalarUDF {
/// Return type
pub return_type: ReturnTypeFunction,
/// actual implementation
///
/// The `fun` param is the wrapped function, but be aware that the function is
/// passed a slice of columnar values (either scalar or array), with the
/// exception of zero-parameter functions, which receive a single-element vec.
/// In that case the single element is a null array whose length indicates the
/// batch's row count (so that a generative zero-argument function knows how
/// large the result array should be).
pub fun: ScalarFunctionImplementation,
}
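
Purely as a hedged illustration (not part of this diff): such an implementation could be wrapped into a ScalarUDF with an empty Exact signature. The helper name make_fill_42_udf is invented, and the call assumes a ScalarUDF::new constructor that takes references, as elsewhere in this codebase.

use std::sync::Arc;

use arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::physical_plan::functions::{ScalarFunctionImplementation, Signature};
use datafusion::physical_plan::udf::ScalarUDF;

// Hypothetical: wrap a zero-argument implementation (such as the fill_42
// sketch above) into a ScalarUDF. The empty Exact signature declares that
// the function takes no arguments, so evaluate() will hand it the NullArray.
fn make_fill_42_udf(fun: ScalarFunctionImplementation) -> ScalarUDF {
    let return_type: Arc<dyn Fn(&[DataType]) -> Result<Arc<DataType>> + Send + Sync> =
        Arc::new(|_: &[DataType]| Ok(Arc::new(DataType::Int64)));
    ScalarUDF::new("fill_42", &Signature::Exact(vec![]), &return_type, &fun)
}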

