Skip to content

Commit

Permalink
Create unicode module in datafusion/functions/src/unicode and unicode…
Browse files Browse the repository at this point in the history
…_expressions feature flag, move char_length function (apache#9825)

* Fix to_timestamp benchmark

* Remove reference to simd and nightly build as simd is no longer an available feature in DataFusion and building with nightly may not be a good recommendation when getting started.

* Fixed missing trim() function.

* Create unicode module in datafusion/functions/src/unicode and unicode_expressions feature flag, move char_length function
  • Loading branch information
Omega359 authored and Lordworms committed Apr 1, 2024
1 parent b213763 commit 5c267cd
Show file tree
Hide file tree
Showing 36 changed files with 484 additions and 318 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ unicode_expressions = [
"datafusion-physical-expr/unicode_expressions",
"datafusion-optimizer/unicode_expressions",
"datafusion-sql/unicode_expressions",
"datafusion-functions/unicode_expressions",
]

[dependencies]
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/dataframe/dataframe_functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use datafusion::assert_batches_eq;
use datafusion_common::DFSchema;
use datafusion_expr::expr::Alias;
use datafusion_expr::{approx_median, cast, ExprSchemable};
use datafusion_functions::unicode::expr_fn::character_length;

fn test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Expand Down
14 changes: 1 addition & 13 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ pub enum BuiltinScalarFunction {
Cot,

// string functions
/// character_length
CharacterLength,
/// concat
Concat,
/// concat_ws
Expand Down Expand Up @@ -218,7 +216,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Cbrt => Volatility::Immutable,
BuiltinScalarFunction::Cot => Volatility::Immutable,
BuiltinScalarFunction::Trunc => Volatility::Immutable,
BuiltinScalarFunction::CharacterLength => Volatility::Immutable,
BuiltinScalarFunction::Concat => Volatility::Immutable,
BuiltinScalarFunction::ConcatWithSeparator => Volatility::Immutable,
BuiltinScalarFunction::EndsWith => Volatility::Immutable,
Expand Down Expand Up @@ -257,9 +254,6 @@ impl BuiltinScalarFunction {
// the return type of the built in function.
// Some built-in functions' return type depends on the incoming type.
match self {
BuiltinScalarFunction::CharacterLength => {
utf8_to_int_type(&input_expr_types[0], "character_length")
}
BuiltinScalarFunction::Coalesce => {
// COALESCE has multiple args and they might get coerced, get a preview of this
let coerced_types = data_types(input_expr_types, &self.signature());
Expand Down Expand Up @@ -367,9 +361,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Coalesce => {
Signature::variadic_equal(self.volatility())
}
BuiltinScalarFunction::CharacterLength
| BuiltinScalarFunction::InitCap
| BuiltinScalarFunction::Reverse => {
BuiltinScalarFunction::InitCap | BuiltinScalarFunction::Reverse => {
Signature::uniform(1, vec![Utf8, LargeUtf8], self.volatility())
}
BuiltinScalarFunction::Lpad | BuiltinScalarFunction::Rpad => {
Expand Down Expand Up @@ -584,10 +576,6 @@ impl BuiltinScalarFunction {
// conditional functions
BuiltinScalarFunction::Coalesce => &["coalesce"],

// string functions
BuiltinScalarFunction::CharacterLength => {
&["character_length", "char_length", "length"]
}
BuiltinScalarFunction::Concat => &["concat"],
BuiltinScalarFunction::ConcatWithSeparator => &["concat_ws"],
BuiltinScalarFunction::EndsWith => &["ends_with"],
Expand Down
8 changes: 0 additions & 8 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,13 +577,6 @@ scalar_expr!(Power, power, base exponent, "`base` raised to the power of `expone
scalar_expr!(Atan2, atan2, y x, "inverse tangent of a division given in the argument");
scalar_expr!(Log, log, base x, "logarithm of a `x` for a particular `base`");

// string functions
scalar_expr!(
CharacterLength,
character_length,
string,
"the number of characters in the `string`"
);
scalar_expr!(InitCap, initcap, string, "converts the first letter of each word in `string` in uppercase and the remaining characters in lowercase");
scalar_expr!(Left, left, string n, "returns the first `n` characters in the `string`");
scalar_expr!(Reverse, reverse, string, "reverses the `string`");
Expand Down Expand Up @@ -1032,7 +1025,6 @@ mod test {
test_scalar_expr!(Nanvl, nanvl, x, y);
test_scalar_expr!(Iszero, iszero, input);

test_scalar_expr!(CharacterLength, character_length, string);
test_scalar_expr!(Gcd, gcd, arg_1, arg_2);
test_scalar_expr!(Lcm, lcm, arg_1, arg_2);
test_scalar_expr!(InitCap, initcap, string);
Expand Down
4 changes: 4 additions & 0 deletions datafusion/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ default = [
"regex_expressions",
"crypto_expressions",
"string_expressions",
"unicode_expressions",
]
# enable encode/decode functions
encoding_expressions = ["base64", "hex"]
Expand All @@ -52,6 +53,8 @@ math_expressions = []
regex_expressions = ["regex"]
# enable string functions
string_expressions = []
# enable unicode functions
unicode_expressions = ["unicode-segmentation"]

[lib]
name = "datafusion_functions"
Expand All @@ -75,6 +78,7 @@ log = { workspace = true }
md-5 = { version = "^0.10.0", optional = true }
regex = { version = "1.8", optional = true }
sha2 = { version = "^0.10.1", optional = true }
unicode-segmentation = { version = "^1.7.1", optional = true }
uuid = { version = "1.7", features = ["v4"] }

[dev-dependencies]
Expand Down
9 changes: 9 additions & 0 deletions datafusion/functions/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ make_stub_package!(regex, "regex_expressions");
pub mod crypto;
make_stub_package!(crypto, "crypto_expressions");

#[cfg(feature = "unicode_expressions")]
pub mod unicode;
make_stub_package!(unicode, "unicode_expressions");

mod utils;

/// Fluent-style API for creating `Expr`s
pub mod expr_fn {
#[cfg(feature = "core_expressions")]
Expand All @@ -140,6 +146,8 @@ pub mod expr_fn {
pub use super::regex::expr_fn::*;
#[cfg(feature = "string_expressions")]
pub use super::string::expr_fn::*;
#[cfg(feature = "unicode_expressions")]
pub use super::unicode::expr_fn::*;
}

/// Registers all enabled packages with a [`FunctionRegistry`]
Expand All @@ -151,6 +159,7 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
.chain(math::functions())
.chain(regex::functions())
.chain(crypto::functions())
.chain(unicode::functions())
.chain(string::functions());

all_functions.try_for_each(|udf| {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions/src/string/ascii.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::string::common::make_scalar_function;
use crate::utils::make_scalar_function;
use arrow::array::Int32Array;
use arrow::array::{ArrayRef, OffsetSizeTrait};
use arrow::datatypes::DataType;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/functions/src/string/bit_length.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
// specific language governing permissions and limitations
// under the License.

use arrow::compute::kernels::length::bit_length;
use std::any::Any;

use arrow::compute::kernels::length::bit_length;
use arrow::datatypes::DataType;

use datafusion_common::{exec_err, Result, ScalarValue};
use datafusion_expr::{ColumnarValue, Volatility};
use datafusion_expr::{ScalarUDFImpl, Signature};

use crate::string::common::*;
use crate::utils::utf8_to_int_type;

#[derive(Debug)]
pub(super) struct BitLengthFunc {
Expand Down
1 change: 1 addition & 0 deletions datafusion/functions/src/string/btrim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use datafusion_expr::{ColumnarValue, Volatility};
use datafusion_expr::{ScalarUDFImpl, Signature};

use crate::string::common::*;
use crate::utils::{make_scalar_function, utf8_to_str_type};

/// Returns the longest string with leading and trailing characters removed. If the characters are not specified, whitespace is removed.
/// btrim('xyxtrimyyx', 'xyz') = 'trim'
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions/src/string/chr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datafusion_common::{exec_err, Result};
use datafusion_expr::{ColumnarValue, Volatility};
use datafusion_expr::{ScalarUDFImpl, Signature};

use crate::string::common::*;
use crate::utils::make_scalar_function;

/// Returns the character with the given code. chr(0) is disallowed because text data types cannot store that character.
/// chr(65) = 'A'
Expand Down
158 changes: 1 addition & 157 deletions datafusion/functions/src/string/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ use arrow::datatypes::DataType;
use datafusion_common::cast::as_generic_string_array;
use datafusion_common::Result;
use datafusion_common::{exec_err, ScalarValue};
use datafusion_expr::{ColumnarValue, ScalarFunctionImplementation};
use datafusion_physical_expr::functions::Hint;
use datafusion_expr::ColumnarValue;

pub(crate) enum TrimType {
Left,
Expand Down Expand Up @@ -98,52 +97,6 @@ pub(crate) fn general_trim<T: OffsetSizeTrait>(
}
}

/// Creates a function to identify the optimal return type of a string function given
/// the type of its first argument.
///
/// If the input type is `LargeUtf8` or `LargeBinary` the return type is
/// `$largeUtf8Type`,
///
/// If the input type is `Utf8` or `Binary` the return type is `$utf8Type`,
macro_rules! get_optimal_return_type {
($FUNC:ident, $largeUtf8Type:expr, $utf8Type:expr) => {
pub(crate) fn $FUNC(arg_type: &DataType, name: &str) -> Result<DataType> {
Ok(match arg_type {
// LargeBinary inputs are automatically coerced to Utf8
DataType::LargeUtf8 | DataType::LargeBinary => $largeUtf8Type,
// Binary inputs are automatically coerced to Utf8
DataType::Utf8 | DataType::Binary => $utf8Type,
DataType::Null => DataType::Null,
DataType::Dictionary(_, value_type) => match **value_type {
DataType::LargeUtf8 | DataType::LargeBinary => $largeUtf8Type,
DataType::Utf8 | DataType::Binary => $utf8Type,
DataType::Null => DataType::Null,
_ => {
return datafusion_common::exec_err!(
"The {} function can only accept strings, but got {:?}.",
name.to_uppercase(),
**value_type
);
}
},
data_type => {
return datafusion_common::exec_err!(
"The {} function can only accept strings, but got {:?}.",
name.to_uppercase(),
data_type
);
}
})
}
};
}

// `utf8_to_str_type`: returns either a Utf8 or LargeUtf8 based on the input type size.
get_optimal_return_type!(utf8_to_str_type, DataType::LargeUtf8, DataType::Utf8);

// `utf8_to_int_type`: returns either a Int32 or Int64 based on the input type size.
get_optimal_return_type!(utf8_to_int_type, DataType::Int64, DataType::Int32);

/// applies a unary expression to `args[0]` that is expected to be downcastable to
/// a `GenericStringArray` and returns a `GenericStringArray` (which may have a different offset)
/// # Errors
Expand Down Expand Up @@ -221,112 +174,3 @@ where
},
}
}

pub(super) fn make_scalar_function<F>(
inner: F,
hints: Vec<Hint>,
) -> ScalarFunctionImplementation
where
F: Fn(&[ArrayRef]) -> Result<ArrayRef> + Sync + Send + 'static,
{
Arc::new(move |args: &[ColumnarValue]| {
// first, identify if any of the arguments is an Array. If yes, store its `len`,
// as any scalar will need to be converted to an array of len `len`.
let len = args
.iter()
.fold(Option::<usize>::None, |acc, arg| match arg {
ColumnarValue::Scalar(_) => acc,
ColumnarValue::Array(a) => Some(a.len()),
});

let is_scalar = len.is_none();

let inferred_length = len.unwrap_or(1);
let args = args
.iter()
.zip(hints.iter().chain(std::iter::repeat(&Hint::Pad)))
.map(|(arg, hint)| {
// Decide on the length to expand this scalar to depending
// on the given hints.
let expansion_len = match hint {
Hint::AcceptsSingular => 1,
Hint::Pad => inferred_length,
};
arg.clone().into_array(expansion_len)
})
.collect::<Result<Vec<_>>>()?;

let result = (inner)(&args);
if is_scalar {
// If all inputs are scalar, keeps output as scalar
let result = result.and_then(|arr| ScalarValue::try_from_array(&arr, 0));
result.map(ColumnarValue::Scalar)
} else {
result.map(ColumnarValue::Array)
}
})
}

#[cfg(test)]
pub mod test {
/// $FUNC ScalarUDFImpl to test
/// $ARGS arguments (vec) to pass to function
/// $EXPECTED a Result<ColumnarValue>
/// $EXPECTED_TYPE is the expected value type
/// $EXPECTED_DATA_TYPE is the expected result type
/// $ARRAY_TYPE is the column type after function applied
macro_rules! test_function {
($FUNC:expr, $ARGS:expr, $EXPECTED:expr, $EXPECTED_TYPE:ty, $EXPECTED_DATA_TYPE:expr, $ARRAY_TYPE:ident) => {
let expected: Result<Option<$EXPECTED_TYPE>> = $EXPECTED;
let func = $FUNC;

let type_array = $ARGS.iter().map(|arg| arg.data_type()).collect::<Vec<_>>();
let return_type = func.return_type(&type_array);

match expected {
Ok(expected) => {
assert_eq!(return_type.is_ok(), true);
assert_eq!(return_type.unwrap(), $EXPECTED_DATA_TYPE);

let result = func.invoke($ARGS);
assert_eq!(result.is_ok(), true);

let len = $ARGS
.iter()
.fold(Option::<usize>::None, |acc, arg| match arg {
ColumnarValue::Scalar(_) => acc,
ColumnarValue::Array(a) => Some(a.len()),
});
let inferred_length = len.unwrap_or(1);
let result = result.unwrap().clone().into_array(inferred_length).expect("Failed to convert to array");
let result = result.as_any().downcast_ref::<$ARRAY_TYPE>().expect("Failed to convert to type");

// value is correct
match expected {
Some(v) => assert_eq!(result.value(0), v),
None => assert!(result.is_null(0)),
};
}
Err(expected_error) => {
if return_type.is_err() {
match return_type {
Ok(_) => assert!(false, "expected error"),
Err(error) => { datafusion_common::assert_contains!(expected_error.strip_backtrace(), error.strip_backtrace()); }
}
}
else {
// invoke is expected error - cannot use .expect_err() due to Debug not being implemented
match func.invoke($ARGS) {
Ok(_) => assert!(false, "expected error"),
Err(error) => {
assert!(expected_error.strip_backtrace().starts_with(&error.strip_backtrace()));
}
}
}
}
};
};
}

pub(crate) use test_function;
}
3 changes: 1 addition & 2 deletions datafusion/functions/src/string/levenshtein.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array, Int64Array, OffsetSizeTrait};
use arrow::datatypes::DataType;

use crate::utils::{make_scalar_function, utf8_to_int_type};
use datafusion_common::cast::as_generic_string_array;
use datafusion_common::utils::datafusion_strsim;
use datafusion_common::{exec_err, Result};
use datafusion_expr::ColumnarValue;
use datafusion_expr::TypeSignature::*;
use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};

use crate::string::common::{make_scalar_function, utf8_to_int_type};

#[derive(Debug)]
pub(super) struct LevenshteinFunc {
signature: Signature,
Expand Down
Loading

0 comments on commit 5c267cd

Please sign in to comment.