diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index bb8720cb8d00..3b61e7f48d59 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -17,6 +17,8 @@ //! DateTime expressions +use crate::datetime_expressions; +use crate::expressions::cast_column; use arrow::array::Float64Builder; use arrow::compute::cast; use arrow::{ @@ -954,6 +956,154 @@ where Ok(b.finish()) } +/// to_timestamp() SQL function implementation +pub fn to_timestamp_invoke(args: &[ColumnarValue]) -> Result { + if args.len() != 1 { + return internal_err!( + "to_timestamp function requires 1 arguments, got {}", + args.len() + ); + } + + match args[0].data_type() { + DataType::Int64 => { + cast_column(&args[0], &DataType::Timestamp(TimeUnit::Second, None), None) + } + DataType::Timestamp(_, None) => cast_column( + &args[0], + &DataType::Timestamp(TimeUnit::Nanosecond, None), + None, + ), + DataType::Utf8 => datetime_expressions::to_timestamp(args), + other => { + internal_err!( + "Unsupported data type {:?} for function to_timestamp", + other + ) + } + } +} + +/// to_timestamp_millis() SQL function implementation +pub fn to_timestamp_millis_invoke(args: &[ColumnarValue]) -> Result { + if args.len() != 1 { + return internal_err!( + "to_timestamp_millis function requires 1 argument, got {}", + args.len() + ); + } + + match args[0].data_type() { + DataType::Int64 | DataType::Timestamp(_, None) => cast_column( + &args[0], + &DataType::Timestamp(TimeUnit::Millisecond, None), + None, + ), + DataType::Utf8 => datetime_expressions::to_timestamp_millis(args), + other => { + internal_err!( + "Unsupported data type {:?} for function to_timestamp_millis", + other + ) + } + } +} + +/// to_timestamp_micros() SQL function implementation +pub fn to_timestamp_micros_invoke(args: &[ColumnarValue]) -> Result { + if args.len() != 1 { + return internal_err!( + 
"to_timestamp_micros function requires 1 argument, got {}", + args.len() + ); + } + + match args[0].data_type() { + DataType::Int64 | DataType::Timestamp(_, None) => cast_column( + &args[0], + &DataType::Timestamp(TimeUnit::Microsecond, None), + None, + ), + DataType::Utf8 => datetime_expressions::to_timestamp_micros(args), + other => { + internal_err!( + "Unsupported data type {:?} for function to_timestamp_micros", + other + ) + } + } +} + +/// to_timestamp_nanos() SQL function implementation +pub fn to_timestamp_nanos_invoke(args: &[ColumnarValue]) -> Result { + if args.len() != 1 { + return internal_err!( + "to_timestamp_nanos function requires 1 argument, got {}", + args.len() + ); + } + + match args[0].data_type() { + DataType::Int64 | DataType::Timestamp(_, None) => cast_column( + &args[0], + &DataType::Timestamp(TimeUnit::Nanosecond, None), + None, + ), + DataType::Utf8 => datetime_expressions::to_timestamp_nanos(args), + other => { + internal_err!( + "Unsupported data type {:?} for function to_timestamp_nanos", + other + ) + } + } +} + +/// to_timestamp_seconds() SQL function implementation +pub fn to_timestamp_seconds_invoke(args: &[ColumnarValue]) -> Result { + if args.len() != 1 { + return internal_err!( + "to_timestamp_seconds function requires 1 argument, got {}", + args.len() + ); + } + + match args[0].data_type() { + DataType::Int64 | DataType::Timestamp(_, None) => { + cast_column(&args[0], &DataType::Timestamp(TimeUnit::Second, None), None) + } + DataType::Utf8 => datetime_expressions::to_timestamp_seconds(args), + other => { + internal_err!( + "Unsupported data type {:?} for function to_timestamp_seconds", + other + ) + } + } +} + +/// from_unixtime() SQL function implementation +pub fn from_unixtime_invoke(args: &[ColumnarValue]) -> Result { + if args.len() != 1 { + return internal_err!( + "from_unixtime function requires 1 argument, got {}", + args.len() + ); + } + + match args[0].data_type() { + DataType::Int64 => { + cast_column(&args[0], 
&DataType::Timestamp(TimeUnit::Second, None), None) + } + other => { + internal_err!( + "Unsupported data type {:?} for function from_unixtime", + other + ) + } + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index f14bad093ac7..088bac100978 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -34,14 +34,12 @@ use crate::execution_props::ExecutionProps; use crate::sort_properties::SortProperties; use crate::{ array_expressions, conditional_expressions, datetime_expressions, - expressions::{cast_column, nullif_func}, - math_expressions, string_expressions, struct_expressions, PhysicalExpr, - ScalarFunctionExpr, + expressions::nullif_func, math_expressions, string_expressions, struct_expressions, + PhysicalExpr, ScalarFunctionExpr, }; use arrow::{ array::ArrayRef, compute::kernels::length::{bit_length, length}, - datatypes::TimeUnit, datatypes::{DataType, Int32Type, Int64Type, Schema}, }; use datafusion_common::{internal_err, DataFusionError, Result, ScalarValue}; @@ -71,143 +69,8 @@ pub fn create_physical_expr( let data_type = fun.return_type(&input_expr_types)?; - let fun_expr: ScalarFunctionImplementation = match fun { - // These functions need args and input schema to pick an implementation - // Unlike the string functions, which actually figure out the function to use with each array, - // here we return either a cast fn or string timestamp translation based on the expression data type - // so we don't have to pay a per-array/batch cost. 
- BuiltinScalarFunction::ToTimestamp => { - Arc::new(match input_phy_exprs[0].data_type(input_schema) { - Ok(DataType::Int64) => |col_values: &[ColumnarValue]| { - cast_column( - &col_values[0], - &DataType::Timestamp(TimeUnit::Second, None), - None, - ) - }, - Ok(DataType::Timestamp(_, None)) => |col_values: &[ColumnarValue]| { - cast_column( - &col_values[0], - &DataType::Timestamp(TimeUnit::Nanosecond, None), - None, - ) - }, - Ok(DataType::Utf8) => datetime_expressions::to_timestamp, - other => { - return internal_err!( - "Unsupported data type {other:?} for function to_timestamp" - ); - } - }) - } - BuiltinScalarFunction::ToTimestampMillis => { - Arc::new(match input_phy_exprs[0].data_type(input_schema) { - Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => { - |col_values: &[ColumnarValue]| { - cast_column( - &col_values[0], - &DataType::Timestamp(TimeUnit::Millisecond, None), - None, - ) - } - } - Ok(DataType::Utf8) => datetime_expressions::to_timestamp_millis, - other => { - return internal_err!( - "Unsupported data type {other:?} for function to_timestamp_millis" - ); - } - }) - } - BuiltinScalarFunction::ToTimestampMicros => { - Arc::new(match input_phy_exprs[0].data_type(input_schema) { - Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => { - |col_values: &[ColumnarValue]| { - cast_column( - &col_values[0], - &DataType::Timestamp(TimeUnit::Microsecond, None), - None, - ) - } - } - Ok(DataType::Utf8) => datetime_expressions::to_timestamp_micros, - other => { - return internal_err!( - "Unsupported data type {other:?} for function to_timestamp_micros" - ); - } - }) - } - BuiltinScalarFunction::ToTimestampNanos => { - Arc::new(match input_phy_exprs[0].data_type(input_schema) { - Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => { - |col_values: &[ColumnarValue]| { - cast_column( - &col_values[0], - &DataType::Timestamp(TimeUnit::Nanosecond, None), - None, - ) - } - } - Ok(DataType::Utf8) => datetime_expressions::to_timestamp_nanos, - 
other => { - return internal_err!( - "Unsupported data type {other:?} for function to_timestamp_nanos" - ); - } - }) - } - BuiltinScalarFunction::ToTimestampSeconds => Arc::new({ - match input_phy_exprs[0].data_type(input_schema) { - Ok(DataType::Int64) | Ok(DataType::Timestamp(_, None)) => { - |col_values: &[ColumnarValue]| { - cast_column( - &col_values[0], - &DataType::Timestamp(TimeUnit::Second, None), - None, - ) - } - } - Ok(DataType::Utf8) => datetime_expressions::to_timestamp_seconds, - other => { - return internal_err!( - "Unsupported data type {other:?} for function to_timestamp_seconds" - ); - } - } - }), - BuiltinScalarFunction::FromUnixtime => Arc::new({ - match input_phy_exprs[0].data_type(input_schema) { - Ok(DataType::Int64) => |col_values: &[ColumnarValue]| { - cast_column( - &col_values[0], - &DataType::Timestamp(TimeUnit::Second, None), - None, - ) - }, - other => { - return internal_err!( - "Unsupported data type {other:?} for function from_unixtime" - ); - } - } - }), - BuiltinScalarFunction::ArrowTypeof => { - let input_data_type = input_phy_exprs[0].data_type(input_schema)?; - Arc::new(move |_| { - Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(format!( - "{input_data_type}" - ))))) - }) - } - BuiltinScalarFunction::Abs => { - let input_data_type = input_phy_exprs[0].data_type(input_schema)?; - let abs_fun = math_expressions::create_abs_function(&input_data_type)?; - Arc::new(move |args| make_scalar_function(abs_fun)(args)) - } - // These don't need args and input schema - _ => create_physical_fun(fun, execution_props)?, - }; + let fun_expr: ScalarFunctionImplementation = + create_physical_fun(fun, execution_props)?; let monotonicity = fun.monotonicity(); @@ -397,6 +260,9 @@ pub fn create_physical_fun( ) -> Result { Ok(match fun { // math functions + BuiltinScalarFunction::Abs => { + Arc::new(|args| make_scalar_function(math_expressions::abs_invoke)(args)) + } BuiltinScalarFunction::Acos => Arc::new(math_expressions::acos), 
BuiltinScalarFunction::Asin => Arc::new(math_expressions::asin), BuiltinScalarFunction::Atan => Arc::new(math_expressions::atan), @@ -625,6 +491,24 @@ pub fn create_physical_fun( execution_props.query_execution_start_time, )) } + BuiltinScalarFunction::ToTimestamp => { + Arc::new(datetime_expressions::to_timestamp_invoke) + } + BuiltinScalarFunction::ToTimestampMillis => { + Arc::new(datetime_expressions::to_timestamp_millis_invoke) + } + BuiltinScalarFunction::ToTimestampMicros => { + Arc::new(datetime_expressions::to_timestamp_micros_invoke) + } + BuiltinScalarFunction::ToTimestampNanos => { + Arc::new(datetime_expressions::to_timestamp_nanos_invoke) + } + BuiltinScalarFunction::ToTimestampSeconds => { + Arc::new(datetime_expressions::to_timestamp_seconds_invoke) + } + BuiltinScalarFunction::FromUnixtime => { + Arc::new(datetime_expressions::from_unixtime_invoke) + } BuiltinScalarFunction::InitCap => Arc::new(|args| match args[0].data_type() { DataType::Utf8 => { make_scalar_function(string_expressions::initcap::)(args) @@ -927,11 +811,19 @@ pub fn create_physical_fun( }), BuiltinScalarFunction::Upper => Arc::new(string_expressions::upper), BuiltinScalarFunction::Uuid => Arc::new(string_expressions::uuid), - _ => { - return internal_err!( - "create_physical_fun: Unsupported scalar function {fun:?}" - ); - } + BuiltinScalarFunction::ArrowTypeof => Arc::new(move |args| { + if args.len() != 1 { + return internal_err!( + "arrow_typeof function requires 1 arguments, got {}", + args.len() + ); + } + + let input_data_type = args[0].data_type(); + Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(format!( + "{input_data_type}" + ))))) + }), }) } diff --git a/datafusion/physical-expr/src/math_expressions.rs b/datafusion/physical-expr/src/math_expressions.rs index 96f611e2b7b4..0b7bc34014f9 100644 --- a/datafusion/physical-expr/src/math_expressions.rs +++ b/datafusion/physical-expr/src/math_expressions.rs @@ -743,6 +743,18 @@ pub(super) fn create_abs_function( } } +/// abs() 
SQL function implementation +pub fn abs_invoke(args: &[ArrayRef]) -> Result { + if args.len() != 1 { + return internal_err!("abs function requires 1 argument, got {}", args.len()); + } + + let input_data_type = args[0].data_type(); + let abs_fun = create_abs_function(input_data_type)?; + + abs_fun(args) +} + #[cfg(test)] mod tests {