Create physical scalar expression in functions modules from string (name) #9892

Open
viirya opened this issue Mar 31, 2024 · 16 comments
Labels: enhancement (New feature or request)

Comments

@viirya (Member) commented Mar 31, 2024

Is your feature request related to a problem or challenge?

Create physical scalar expression in functions modules from string

Currently, more and more built-in scalar functions are being moved to the functions modules, e.g. #9435. This avoids a long enum of all built-in scalar functions, which is hard to maintain. But in Comet, we rely on the ability to create a physical scalar expression from a string (e.g., datepart).

Previously this was easy: we just called BuiltinScalarFunction::from_str to get a BuiltinScalarFunction. But now I don't see a similarly convenient function to do that.

FunctionRegistry provides udf, which can return a reference to a ScalarUDF, but it requires that these UDFs be registered. As we don't know which UDFs will be used, we would need to register all built-in UDFs in the registry. The flaw is that this creates ScalarUDFs for all built-in UDFs even if they are not actually used in the queries.
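
(For illustration, a minimal sketch of that lookup path, assuming the FunctionRegistry trait's udf method; module paths may differ between DataFusion versions.)

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::execution::FunctionRegistry;
    use datafusion_expr::ScalarUDF;

    /// Look up an already-registered ScalarUDF by name. `udf` returns an
    /// error if the name was never registered, which is why every built-in
    /// UDF has to be registered up front for this to work.
    fn lookup_registered_udf(
        registry: &dyn FunctionRegistry,
        name: &str,
    ) -> Result<Arc<ScalarUDF>> {
        registry.udf(name)
    }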

I think we still need an approach that can simply create a physical scalar expression in the functions modules from a string, so that we can create the corresponding ScalarUDF on demand.

Another approach might be to avoid creating ScalarUDFs when registering built-in scalar functions.

Avoid creating ScalarUDFs before they are actually used when registering in FunctionRegistry

Actually, I am also wondering whether it is necessary to create and register all of these ScalarUDFs in DataFusion's FunctionRegistry before they are actually used.

For example, Spark's FunctionRegistry registers expression builders instead of creating actual expressions when registering built-in expressions. A built-in expression is created only if it is actually used by a query.
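
(As a rough Rust sketch of that builder-style idea; the names below are hypothetical and not existing DataFusion or Spark APIs.)

    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion_expr::ScalarUDF;

    /// Hypothetical builder-style registry: it stores factory functions
    /// rather than ScalarUDF instances, so a UDF is only constructed when a
    /// query actually asks for it by name.
    struct LazyFunctionRegistry {
        builders: HashMap<String, fn() -> Arc<ScalarUDF>>,
    }

    impl LazyFunctionRegistry {
        fn get(&self, name: &str) -> Option<Arc<ScalarUDF>> {
            // The ScalarUDF is built only when this is called
            self.builders.get(name).map(|build| build())
        }
    }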

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

@viirya added the enhancement (New feature or request) label Mar 31, 2024
@alamb (Contributor) commented Apr 1, 2024

I think we still need an approach that can simply create a physical scalar expression in the functions modules from a string, so that we can create the corresponding ScalarUDF on demand.

I think you can do this via one of these functions (I can't link to the rustdocs as 37 hasn't been released yet and the functions are generated via a macro):

[screenshot: list of the generated per-function accessors]

@viirya (Member, Author) commented Apr 1, 2024

I think you can do this via one of these functions (I can't link to the rustdocs as 37 hasn't been released yet and the functions are generated via a macro):

Hmm, I think you mean that the expr_fn module exports individual scalar functions. But to get the corresponding function from a string of the scalar function's name, we would need to write a function like:

fn string_to_scalar_udf(udf_name: &str) -> Arc<ScalarUDF> {
  match udf_name {
    "ascii" => datafusion_functions::string::ascii(),
    // ... one arm per built-in scalar function
    _ => unimplemented!("unknown scalar function: {udf_name}"),
  }
}

It is doable, though it means we would need to maintain a long list of match arms. I'm also wondering whether there is a better built-in solution in DataFusion.

@Omega359 (Contributor) commented Apr 1, 2024

The names of the functions would be in the list of UDFs available in the SessionState:

    /// Scalar functions that are registered with the context
    scalar_functions: HashMap<String, Arc<ScalarUDF>>,
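
(For illustration, a lookup against that map might look roughly like this; scalar_functions() is the accessor on SessionState, and the exact import path may differ by version.)

    use std::sync::Arc;

    use datafusion::execution::context::SessionState;
    use datafusion_expr::ScalarUDF;

    // Resolve a ScalarUDF by name from the SessionState map. The UDF must
    // already have been registered (e.g. by registering all built-ins).
    fn udf_by_name(state: &SessionState, name: &str) -> Option<Arc<ScalarUDF>> {
        state.scalar_functions().get(name).cloned()
    }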

@viirya (Member, Author) commented Apr 1, 2024

Oh, scalar_functions is good to use for that. We can get a particular ScalarUDF from it without matching on scalar UDF names outside DataFusion. Currently we use SessionState.udf to retrieve the required ScalarUDF.

Although it still doesn't create the ScalarUDF on demand.

As I described in the issue description, for queries which don't use most of these scalar UDFs, it seems unnecessary to create and hold all of these ScalarUDFs in the registry.

I think a better approach would be to create a ScalarUDF only once it is actually needed. This is how Spark handles its function registry.

@Omega359 (Contributor) commented Apr 1, 2024

Ah, I misread the description, apologies. It's an interesting idea. I don't see an easy solution right now, especially since the UDFs are held in a singleton once called (see https://github.com/apache/arrow-datafusion/blob/d8d521ac8b90002fa0ba1f91456051a9775ae193/datafusion/functions/src/macros.rs#L66), so any memory savings from not creating the UDF would evaporate after the first use.

I think it may be possible with a change to make_udf_function to remove the singleton code; however, I'm not sure of the wisdom of that.

@alamb (Contributor) commented Apr 1, 2024

It is doable, though it means we would need to maintain a long list of match arms. I'm also wondering whether there is a better built-in solution in DataFusion.

I think adding a function that does this matching to DataFusion seems like a good idea to me. You could probably write a test to ensure it stays in sync with the list of functions pretty easily.

I think a better approach would be to create a ScalarUDF only once it is actually needed. This is how Spark handles its function registry.

What is the concern about creating a bunch of ScalarUDFs that don't get used on process start? Is it to reduce process startup time / memory overhead?

@viirya (Member, Author) commented Apr 1, 2024

What is the concern about creating a bunch of ScalarUDFs that don't get used on process start? Is it to reduce process startup time / memory overhead?

Yeah, I suppose these ScalarUDFs occupy additional resources, as they are not free to create and hold. The number of built-in scalar functions is fairly large and keeps growing. If we can make their creation lazy, it would be better.

@alamb (Contributor) commented Apr 2, 2024

BTW, while updating InfluxDB we ended up with a bunch of the following in our code. I think the API @viirya is describing on this ticket would help us too:

    let date_bin = datafusion::functions::datetime::functions()
        .iter()
        .find(|fun| fun.name().eq("date_bin"))
        .expect("should have date_bin UDF")
        .to_owned();

@alamb changed the title from "Create physical scalar expression in functions modules from string" to "Create physical scalar expression in functions modules from string (name)" on Apr 2, 2024
@jayzhan211 (Contributor) commented Apr 5, 2024

How about we introduce pub type ScalarFunctionImpl = Arc<dyn Fn() -> std::sync::Arc<datafusion_expr::ScalarUDF>>; so we can register the closure first and create the UDF on demand?

If I understand correctly, it is only at the point where we call the closure that this function runs:

            pub fn $SCALAR_UDF_FN() -> std::sync::Arc<datafusion_expr::ScalarUDF> {
                [< STATIC_ $UDF >]
                    .get_or_init(|| {
                        std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl(
                            <$UDF>::new(),
                        ))
                    })
                    .clone()
            }

https://users.rust-lang.org/t/cost-of-creating-closure-vs-cost-of-creating-struct/33101/5
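
(For context, a minimal standalone sketch of the OnceLock::get_or_init pattern the macro above relies on; the names here are illustrative.)

    use std::sync::OnceLock;

    // get_or_init runs the initializer closure only on the first call; every
    // later call returns the cached value without running the closure again.
    fn cached_value() -> &'static String {
        static CACHE: OnceLock<String> = OnceLock::new();
        CACHE.get_or_init(|| {
            println!("initializing once");
            String::from("expensive to build")
        })
    }

    fn main() {
        let first = cached_value(); // prints "initializing once"
        let second = cached_value(); // no print, returns the cached value
        assert!(std::ptr::eq(first, second));
    }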

@Omega359 (Contributor) commented Apr 5, 2024

Note that a name -> function mapping is already used internally within scalar_function.rs:

https://github.com/apache/arrow-datafusion/blob/2dad90425bacb98a3c2a4214faad53850c93104e/datafusion/physical-expr/src/scalar_function.rs#L149

Having a good name -> fn lookup mechanism would help here, I think.

@alamb (Contributor) commented Apr 5, 2024

How about we introduce pub type ScalarFunctionImpl = Arc<dyn Fn() -> std::sync::Arc<datafusion_expr::ScalarUDF>>; so we can register the closure first and create the UDF on demand?

I was thinking maybe we can even avoid new types / macros with a function like this:

fn get_udf(name: &str) -> Option<Arc<ScalarUDF>> {
  // build a hash table of deferred factories that create functions on demand
  // (note this hash table would probably be built in a static OnceLock or something)
  let functions: HashMap<&str, Box<dyn Fn() -> Arc<ScalarUDF>>> = [
    ("abs", Box::new(|| crate::math::abs()) as _),
    ("sin", Box::new(|| crate::math::sin()) as _),
    ("date_bin", Box::new(|| crate::datetime::date_bin()) as _),
    // ...
  ].into_iter().collect();

  functions.get(name).map(|factory| factory())
}

Then we could ensure this stays in sync with a test, something like:

for expected_function in datafusion_functions::all_functions() {
  assert!(get_udf(expected_function.name()).is_some())
}

@jayzhan211 (Contributor) commented Apr 6, 2024

I can work on this.
I plan to introduce get_udf to replace register_all. We can just call get_udf when we need one.

@alamb (Contributor) commented Apr 6, 2024

I can work on this. I plan to introduce get_udf to replace register_all. We can just call get_udf when we need one.

Thanks @jayzhan211

I think both patterns are needed -- one that gets all the UDFs (what register_all currently does), as well as a "get me just a single UDF".

@jayzhan211 (Contributor) commented:

    fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
        self.state.scalar_functions().get(name).cloned()
    }

I was considering replacing that get, which is why I think register_all may not be needed. Let me try it out and see 👀.

@jayzhan211 (Contributor) commented Apr 6, 2024

I drafted the idea here. I think the old functions can be deprecated, but that would be an API design change; should we deprecate them?

use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

use datafusion_expr::ScalarUDF;

pub type ScalarFactory = Box<dyn Fn() -> Arc<ScalarUDF> + Send + Sync>;

/// HashMap singleton for UDFs
///
/// Replaces register_all with our built-in functions
/// Replaces `scalar_functions: HashMap<String, Arc<ScalarUDF>>` in SessionState
pub fn scalar_functions() -> &'static Mutex<HashMap<String, ScalarFactory>> {
    static FUNCTIONS: OnceLock<Mutex<HashMap<String, ScalarFactory>>> = OnceLock::new();
    FUNCTIONS.get_or_init(|| {
        let mut functions = HashMap::new();
        functions.insert(
            String::from("array_to_string"),
            Box::new(string::array_to_string_udf) as _,
        );
        // TODO: Add more built-in functions here
        Mutex::new(functions)
    })
}

// Get a UDF by name
//
// Replaces:
// fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
//     self.state.scalar_functions().get(name).cloned()
// }
pub fn get_udf(name: &str) -> Option<Arc<ScalarUDF>> {
    scalar_functions().lock().unwrap().get(name).map(|f| f())
}

/// Register a single new UDF, so users can register their own functions
///
/// Replaces the old register_udf
pub fn register_udf(name: &str, udf: ScalarFactory) -> Option<ScalarFactory> {
    scalar_functions().lock().unwrap().insert(name.to_string(), udf)
}
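
For illustration, a hypothetical use of the draft above; the ScalarUDF is only constructed when the factory closure is invoked inside get_udf:

    fn example() {
        // The ScalarUDF for "array_to_string" is built lazily, on this first request
        if let Some(udf) = get_udf("array_to_string") {
            println!("resolved UDF: {}", udf.name());
        }
        // Unknown names simply return None; no long match statement needed
        assert!(get_udf("no_such_function").is_none());
    }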

@alamb (Contributor) commented Apr 6, 2024

/// Register a single new UDF, so users can register their own functions

I am not sure about this -- I think it would be better if the data in the module was static / not mutable at runtime. If users want to register their own functions, they can do so in a FunctionRegistry

I drafted the idea here. I think the old functions can be deprecated, but that would be an API design change; should we deprecate them?

How about we try what having both APIs would look like? Maybe it is too much duplication, but I bet it would be pretty minimal
