Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Cleanup BuiltinScalarFunction's phys-expr creation #8114

Merged
merged 1 commit into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions datafusion/physical-expr/src/datetime_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

//! DateTime expressions

use crate::datetime_expressions;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
use crate::datetime_expressions;

It seems that this use may not be required 🤔.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 but clippy did not flag it

use crate::expressions::cast_column;
use arrow::array::Float64Builder;
use arrow::compute::cast;
use arrow::{
Expand Down Expand Up @@ -954,6 +956,154 @@ where
Ok(b.finish())
}

/// to_timestammp() SQL function implementation
pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 1 {
return internal_err!(
"to_timestamp function requires 1 arguments, got {}",
args.len()
);
}

match args[0].data_type() {
DataType::Int64 => {
cast_column(&args[0], &DataType::Timestamp(TimeUnit::Second, None), None)
}
DataType::Timestamp(_, None) => cast_column(
&args[0],
&DataType::Timestamp(TimeUnit::Nanosecond, None),
None,
),
DataType::Utf8 => datetime_expressions::to_timestamp(args),
other => {
internal_err!(
"Unsupported data type {:?} for function to_timestamp",
other
)
}
}
}

/// to_timestamp_millis() SQL function implementation
pub fn to_timestamp_millis_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 1 {
return internal_err!(
"to_timestamp_millis function requires 1 argument, got {}",
args.len()
);
}

match args[0].data_type() {
DataType::Int64 | DataType::Timestamp(_, None) => cast_column(
&args[0],
&DataType::Timestamp(TimeUnit::Millisecond, None),
None,
),
DataType::Utf8 => datetime_expressions::to_timestamp_millis(args),
other => {
internal_err!(
"Unsupported data type {:?} for function to_timestamp_millis",
other
)
}
}
}

/// to_timestamp_micros() SQL function implementation
pub fn to_timestamp_micros_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 1 {
return internal_err!(
"to_timestamp_micros function requires 1 argument, got {}",
args.len()
);
}

match args[0].data_type() {
DataType::Int64 | DataType::Timestamp(_, None) => cast_column(
&args[0],
&DataType::Timestamp(TimeUnit::Microsecond, None),
None,
),
DataType::Utf8 => datetime_expressions::to_timestamp_micros(args),
other => {
internal_err!(
"Unsupported data type {:?} for function to_timestamp_micros",
other
)
}
}
}

/// to_timestamp_nanos() SQL function implementation
pub fn to_timestamp_nanos_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 1 {
return internal_err!(
"to_timestamp_nanos function requires 1 argument, got {}",
args.len()
);
}

match args[0].data_type() {
DataType::Int64 | DataType::Timestamp(_, None) => cast_column(
&args[0],
&DataType::Timestamp(TimeUnit::Nanosecond, None),
None,
),
DataType::Utf8 => datetime_expressions::to_timestamp_nanos(args),
other => {
internal_err!(
"Unsupported data type {:?} for function to_timestamp_nanos",
other
)
}
}
}

/// to_timestamp_seconds() SQL function implementation
pub fn to_timestamp_seconds_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 1 {
return internal_err!(
"to_timestamp_seconds function requires 1 argument, got {}",
args.len()
);
}

match args[0].data_type() {
DataType::Int64 | DataType::Timestamp(_, None) => {
cast_column(&args[0], &DataType::Timestamp(TimeUnit::Second, None), None)
}
DataType::Utf8 => datetime_expressions::to_timestamp_seconds(args),
other => {
internal_err!(
"Unsupported data type {:?} for function to_timestamp_seconds",
other
)
}
}
}

/// from_unixtime() SQL function implementation
pub fn from_unixtime_invoke(args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 1 {
return internal_err!(
"from_unixtime function requires 1 argument, got {}",
args.len()
);
}

match args[0].data_type() {
DataType::Int64 => {
cast_column(&args[0], &DataType::Timestamp(TimeUnit::Second, None), None)
}
other => {
internal_err!(
"Unsupported data type {:?} for function from_unixtime",
other
)
}
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
184 changes: 38 additions & 146 deletions datafusion/physical-expr/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,12 @@ use crate::execution_props::ExecutionProps;
use crate::sort_properties::SortProperties;
use crate::{
array_expressions, conditional_expressions, datetime_expressions,
expressions::{cast_column, nullif_func},
math_expressions, string_expressions, struct_expressions, PhysicalExpr,
ScalarFunctionExpr,
expressions::nullif_func, math_expressions, string_expressions, struct_expressions,
PhysicalExpr, ScalarFunctionExpr,
};
use arrow::{
array::ArrayRef,
compute::kernels::length::{bit_length, length},
datatypes::TimeUnit,
datatypes::{DataType, Int32Type, Int64Type, Schema},
};
use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue};
Expand Down Expand Up @@ -71,143 +69,8 @@ pub fn create_physical_expr(

let data_type = fun.return_type(&input_expr_types)?;

let fun_expr: ScalarFunctionImplementation = match fun {
// These functions need args and input schema to pick an implementation
// Unlike the string functions, which actually figure out the function to use with each array,
// here we return either a cast fn or string timestamp translation based on the expression data type
// so we don't have to pay a per-array/batch cost.
BuiltinScalarFunction::ToTimestamp => {
Arc::new(match input_phy_exprs[0].data_type(input_schema) {
Ok(DataType::Int64) => |col_values: &[ColumnarValue]| {
cast_column(
&col_values[0],
&DataType::Timestamp(TimeUnit::Second, None),
None,
)
},
Ok(DataType::Timestamp(_, None)) => |col_values: &[ColumnarValue]| {
cast_column(
&col_values[0],
&DataType::Timestamp(TimeUnit::Nanosecond, None),
None,
)
},
Ok(DataType::Utf8) => datetime_expressions::to_timestamp,
other => {
return internal_err!(
"Unsupported data type {other:?} for function to_timestamp"
);
}
})
}
BuiltinScalarFunction::ToTimestampMillis => {
Arc::new(match input_phy_exprs[0].data_type(input_schema) {
Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
|col_values: &[ColumnarValue]| {
cast_column(
&col_values[0],
&DataType::Timestamp(TimeUnit::Millisecond, None),
None,
)
}
}
Ok(DataType::Utf8) => datetime_expressions::to_timestamp_millis,
other => {
return internal_err!(
"Unsupported data type {other:?} for function to_timestamp_millis"
);
}
})
}
BuiltinScalarFunction::ToTimestampMicros => {
Arc::new(match input_phy_exprs[0].data_type(input_schema) {
Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
|col_values: &[ColumnarValue]| {
cast_column(
&col_values[0],
&DataType::Timestamp(TimeUnit::Microsecond, None),
None,
)
}
}
Ok(DataType::Utf8) => datetime_expressions::to_timestamp_micros,
other => {
return internal_err!(
"Unsupported data type {other:?} for function to_timestamp_micros"
);
}
})
}
BuiltinScalarFunction::ToTimestampNanos => {
Arc::new(match input_phy_exprs[0].data_type(input_schema) {
Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
|col_values: &[ColumnarValue]| {
cast_column(
&col_values[0],
&DataType::Timestamp(TimeUnit::Nanosecond, None),
None,
)
}
}
Ok(DataType::Utf8) => datetime_expressions::to_timestamp_nanos,
other => {
return internal_err!(
"Unsupported data type {other:?} for function to_timestamp_nanos"
);
}
})
}
BuiltinScalarFunction::ToTimestampSeconds => Arc::new({
match input_phy_exprs[0].data_type(input_schema) {
Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => {
|col_values: &[ColumnarValue]| {
cast_column(
&col_values[0],
&DataType::Timestamp(TimeUnit::Second, None),
None,
)
}
}
Ok(DataType::Utf8) => datetime_expressions::to_timestamp_seconds,
other => {
return internal_err!(
"Unsupported data type {other:?} for function to_timestamp_seconds"
);
}
}
}),
BuiltinScalarFunction::FromUnixtime => Arc::new({
match input_phy_exprs[0].data_type(input_schema) {
Ok(DataType::Int64) => |col_values: &[ColumnarValue]| {
cast_column(
&col_values[0],
&DataType::Timestamp(TimeUnit::Second, None),
None,
)
},
other => {
return internal_err!(
"Unsupported data type {other:?} for function from_unixtime"
);
}
}
}),
BuiltinScalarFunction::ArrowTypeof => {
let input_data_type = input_phy_exprs[0].data_type(input_schema)?;
Arc::new(move |_| {
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(format!(
"{input_data_type}"
)))))
})
}
BuiltinScalarFunction::Abs => {
let input_data_type = input_phy_exprs[0].data_type(input_schema)?;
let abs_fun = math_expressions::create_abs_function(&input_data_type)?;
Arc::new(move |args| make_scalar_function(abs_fun)(args))
}
// These don't need args and input schema
_ => create_physical_fun(fun, execution_props)?,
};
let fun_expr: ScalarFunctionImplementation =
create_physical_fun(fun, execution_props)?;

let monotonicity = fun.monotonicity();

Expand Down Expand Up @@ -397,6 +260,9 @@ pub fn create_physical_fun(
) -> Result<ScalarFunctionImplementation> {
Ok(match fun {
// math functions
BuiltinScalarFunction::Abs => {
Arc::new(|args| make_scalar_function(math_expressions::abs_invoke)(args))
}
BuiltinScalarFunction::Acos => Arc::new(math_expressions::acos),
BuiltinScalarFunction::Asin => Arc::new(math_expressions::asin),
BuiltinScalarFunction::Atan => Arc::new(math_expressions::atan),
Expand Down Expand Up @@ -625,6 +491,24 @@ pub fn create_physical_fun(
execution_props.query_execution_start_time,
))
}
BuiltinScalarFunction::ToTimestamp => {
Arc::new(datetime_expressions::to_timestamp_invoke)
}
BuiltinScalarFunction::ToTimestampMillis => {
Arc::new(datetime_expressions::to_timestamp_millis_invoke)
}
BuiltinScalarFunction::ToTimestampMicros => {
Arc::new(datetime_expressions::to_timestamp_micros_invoke)
}
BuiltinScalarFunction::ToTimestampNanos => {
Arc::new(datetime_expressions::to_timestamp_nanos_invoke)
}
BuiltinScalarFunction::ToTimestampSeconds => {
Arc::new(datetime_expressions::to_timestamp_seconds_invoke)
}
BuiltinScalarFunction::FromUnixtime => {
Arc::new(datetime_expressions::from_unixtime_invoke)
}
BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() {
DataType::Utf8 => {
make_scalar_function(string_expressions::initcap::<i32>)(args)
Expand Down Expand Up @@ -927,11 +811,19 @@ pub fn create_physical_fun(
}),
BuiltinScalarFunction::Upper => Arc::new(string_expressions::upper),
BuiltinScalarFunction::Uuid => Arc::new(string_expressions::uuid),
_ => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 for removing the catchall

return internal_err!(
"create_physical_fun: Unsupported scalar function {fun:?}"
);
}
BuiltinScalarFunction::ArrowTypeof => Arc::new(move |args| {
if args.len() != 1 {
return internal_err!(
"arrow_typeof function requires 1 arguments, got {}",
args.len()
);
}

let input_data_type = args[0].data_type();
Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(format!(
"{input_data_type}"
)))))
}),
})
}

Expand Down
Loading