
fix: scalar udf function argument handling #2304

Merged · 13 commits into main · Dec 28, 2023
Conversation

@tychoish (Contributor)

In the discussion of #2998 it's clear that we've been handling scalar
udf arguments wrong when faced with a column rather than a single value.

The KDL functions are impacted by this, as are #2298, #2299, and #2301,
so I'll do some rebasing and re-home those PRs.

@scsmithr (Member) left a comment

This looks reasonable.

In the discussion of #2998 it's clear that we've been handling scalar
udf arguments wrong when faced with a column rather than a single value.

(Assuming you mean #2298.)

I read through that PR, but I'm unsure what exactly is being fixed here. Is there a comment or quick example you can link to?

match input.get(n) {
    Some(input) => match input {
        ColumnarValue::Scalar(scalar) => Some(scalar.clone()),
        ColumnarValue::Array(arr) => ScalarValue::try_from_array(arr, 0).ok(),
@tychoish (Contributor, Author) replied:

This is the bug in the previous implementation: if we got a column reference, we would only look at the first element in the column and ignore the other values.

The change that follows allows us to process all of the values in the column, using the (newly) provided closure.
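
For illustration, here's a minimal sketch of the corrected shape, assuming the get_nth_scalar_value/op signatures and the BuiltinError::MissingValueAtIndex variant quoted later in this thread, and assuming BuiltinError converts from DataFusion's error type; this is not the exact merged code:

use datafusion::logical_expr::ColumnarValue; // import paths vary by DataFusion version
use datafusion::scalar::ScalarValue;

fn get_nth_scalar_value(
    input: &[ColumnarValue],
    n: usize,
    op: &dyn Fn(ScalarValue) -> Result<ScalarValue, BuiltinError>,
) -> Result<ColumnarValue, BuiltinError> {
    match input.get(n) {
        // Scalar argument: apply the closure directly.
        Some(ColumnarValue::Scalar(scalar)) => Ok(ColumnarValue::Scalar(op(scalar.clone())?)),
        // Column argument: apply the closure to every row, not just row 0.
        Some(ColumnarValue::Array(arr)) => {
            let values = (0..arr.len())
                .map(|idx| op(ScalarValue::try_from_array(arr, idx)?))
                .collect::<Result<Vec<_>, BuiltinError>>()?;
            Ok(ColumnarValue::Array(ScalarValue::iter_to_array(values)?))
        }
        None => Err(BuiltinError::MissingValueAtIndex(n)),
    }
}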

A contributor replied:

Can't this simply be:

ColumnarValue::Array(arr) => Ok(ColumnarValue::Array(ScalarValue::iter_to_array(
    // collect first: iter_to_array takes ScalarValue items, not Results
    (0..arr.len())
        .map(|idx| -> Result<ScalarValue, ExtensionError> {
            Ok(op(ScalarValue::try_from_array(arr, idx)?)?)
        })
        .collect::<Result<Vec<_>, _>>()?,
)?)),

@tychoish (Contributor, Author)

Just for better bookkeeping: this resolves the issue described here: #2298 (comment)

@tychoish tychoish enabled auto-merge (squash) December 27, 2023 16:26
@tychoish tychoish disabled auto-merge December 27, 2023 18:01
@tychoish tychoish enabled auto-merge (squash) December 27, 2023 18:01
@universalmind303 (Contributor) left a comment

The approach here is still fundamentally unsound. A scalar UDF shouldn't operate on ScalarValues; it should operate on a ColumnarValue/ArrayRef. There is significant unnecessary overhead in converting an ArrayRef to a [ScalarValue].

The make_scalar_function helper provided by DataFusion converts a ScalarValue to an ArrayRef with one item. We should do the same: the conversion from a single ScalarValue to an ArrayRef is minimal compared to the other way around.
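
As a usage sketch (the add_one kernel is hypothetical, and the make_scalar_function import path varies across DataFusion versions):

use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::error::Result;
use datafusion::physical_expr::functions::make_scalar_function;

fn main() {
    // A hypothetical ArrayRef-based kernel: add one to every element.
    let add_one = |args: &[ArrayRef]| -> Result<ArrayRef> {
        let col = args[0].as_any().downcast_ref::<Int64Array>().unwrap();
        let out: Int64Array = col.iter().map(|v| v.map(|x| x + 1)).collect();
        Ok(Arc::new(out) as ArrayRef)
    };
    // The wrapper expands any ColumnarValue::Scalar argument into a
    // one-element array before invoking the kernel.
    let _fun = make_scalar_function(add_one);
}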

Additionally, the code as written is still performing two loops in a performance-critical operation when just one is needed.

It appears that there is some hangup on using ScalarValue where we shouldn't be using it at all.

Regardless of how we handle the processing/loop, if a function is written to handle scalar data we have to convert the data to scalar at some point.

This is not the case; we should operate on the ArrayRef, which allows us to downcast to the concrete Array type without having to convert to ScalarValue.
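
For contrast, a sketch of the array-native style described here; ascii_upper is a hypothetical UDF body, not code from this PR:

use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, StringArray};

// Downcast the ArrayRef once to the concrete array type, then make a
// single pass with no per-row ScalarValue allocation.
fn ascii_upper(arr: &ArrayRef) -> ArrayRef {
    let input = arr
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("expected a Utf8 array");
    let out: StringArray = input
        .iter()
        .map(|opt| opt.map(str::to_ascii_uppercase))
        .collect();
    Arc::new(out)
}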

@scsmithr (Member)

The approach here is still fundamentally unsound.

Cory, is what we have here unsound in terms of correctness, or is the worry only about performance?

If it's just performance, I think optimizing this stuff is a candidate for a follow-up PR, since the current PR as-is fixes a pretty substantial bug. I would argue that getting the fix in, writing tests, and then optimizing it to use the column values seems like the most reasonable order for this. And looking at how we optimize this could uncover some other areas that can be made faster too.

@universalmind303 (Contributor) commented Dec 27, 2023

The approach here is still fundamentally unsound.

Cory, is what we have here unsound in terms of correctness, or is the worry only about performance?

If it's just performance, I think optimizing this stuff is a candidate for a follow-up PR, since the current PR as-is fixes a pretty substantial bug. I would argue that getting the fix in, writing tests, and then optimizing it to use the column values seems like the most reasonable order for this. And looking at how we optimize this could uncover some other areas that can be made faster too.

I think as it stands, the "correctness" is not in question as it produces the correct output, and the tests pass.

If the goal is just to fix the bug, I'd be fine doing it one-off for those functions instead of this approach, which is laying a foundation for other UDFs to build off of. Since this is a highly performance-sensitive area, the performance implications are too great to ignore, so I can't approve this.

Other than avoiding extra boilerplate, I don't see any reason to go with this approach instead of the one I suggested, which also aligns with the approach DataFusion recommends (via the df examples).

Like I mentioned in another comment, I'd be fine with some zero-cost, or near-zero-cost, abstractions, but these abstractions don't justify the added cost.

@universalmind303 (Contributor) commented Dec 27, 2023

If we really want UDFs to operate on an iterator of ScalarValues instead of an ArrayRef, I'm totally fine with that as long as we do it in a near-zero-cost way. The ScalarValue enum itself doesn't introduce much overhead, so we could just pass the return type to get_nth_scalar_value and build the arrays based on the return type with a single pass and minimal allocations.

That way op: &dyn Fn(ScalarValue) -> Result<ScalarValue, BuiltinError> still retains the same signature, and our cost of abstraction is much lower.

fn get_nth_scalar_value(
    input: &[ColumnarValue],
    n: usize,
    op: &dyn Fn(ScalarValue) -> Result<ScalarValue, BuiltinError>,
    return_type: DataType,
) -> Result<ColumnarValue, BuiltinError> {
    match input.get(n) {
        Some(input) => match input {
            ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(op(scalar.clone())?)),
            ColumnarValue::Array(arr) => {
                let arr = match arr.data_type() {
                    DataType::Null => new_null_array(&DataType::Null, arr.len()),
                    DataType::Utf8 => {
                        let arr = arr
                            .as_any()
                            .downcast_ref::<datafusion::arrow::array::StringArray>()
                            .unwrap();
                        match return_type {
                            DataType::Boolean => {
                                let mut builder =
                                    datafusion::arrow::array::BooleanBuilder::with_capacity(
                                        arr.len(),
                                    );
                                for idx in 0..arr.len() {
                                    // I'd prefer `unsafe { arr.value_unchecked(idx) }` here, since
                                    // `idx` can never be out of bounds within this loop.
                                    let scalar: ScalarValue = arr.value(idx).into();
                                    match op(scalar)? {
                                        ScalarValue::Boolean(Some(v)) => builder.append_value(v),
                                        ScalarValue::Boolean(None) => builder.append_null(),
                                        _ => return Err(anyhow!("actual return type didn't match expected")),
                                    }
                                }
                                Arc::new(builder.finish())
                            }
                            _ => todo!("handle all return types"),
                        }
                    }
                    _ => todo!("handle all input types"),
                };

                Ok(ColumnarValue::Array(arr))
            }
        },
        None => Err(BuiltinError::MissingValueAtIndex(n)),
    }
}

We'd probably want to use some macros here as it is a ton of boilerplate writing this out for every input/output combination.
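
Something like this sketch, where each (input type, return type) pair expands to one builder loop; the names mirror the snippet above, and the error arm assumes the same anyhow!-based conversion:

// Hypothetical macro: one downcast + one builder loop per supported
// (input array type, return type) combination.
macro_rules! op_loop {
    ($arr:expr, $op:expr, $input_ty:ty, $builder_ty:ty, $variant:path) => {{
        let arr = $arr.as_any().downcast_ref::<$input_ty>().unwrap();
        let mut builder = <$builder_ty>::with_capacity(arr.len());
        for idx in 0..arr.len() {
            match $op(ScalarValue::from(arr.value(idx)))? {
                $variant(Some(v)) => builder.append_value(v),
                $variant(None) => builder.append_null(),
                _ => return Err(anyhow!("actual return type didn't match expected")),
            }
        }
        Arc::new(builder.finish()) as ArrayRef
    }};
}

// The big match in get_nth_scalar_value then collapses to one arm per
// supported combination:
let arr = match (arr.data_type(), &return_type) {
    (DataType::Utf8, DataType::Boolean) => {
        op_loop!(arr, op, StringArray, BooleanBuilder, ScalarValue::Boolean)
    }
    _ => todo!("handle remaining input/return type combinations"),
};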

@tychoish (Contributor, Author)

I think as it stands, the "correctness" is not in question as it produces the correct output, and the tests pass.

You spotted a bug in my initial PR that does impact correctness; no one had written tests. That doesn't make it not a bug.

The code as written is still performing two loops in a performance-critical operation when just one is needed.

#2314 addresses this and removes the need for the extra loop and the extra copy of the data in working memory.


While I think this function is fine for many cases, it's fair that we shouldn't use it for most of the scalar UDFs that we write, but there are a bunch (in theory) where this model won't be the bottleneck. I think that certainly holds true for the string-handling functions; definitely less so for others.

@tychoish (Contributor, Author) commented Dec 28, 2023

Given that the compiler can't vectorize fallible operations, and that we've merged #2314, I don't think there are blockers to merging this.

@tychoish tychoish merged commit 2bd03aa into main Dec 28, 2023
13 checks passed
@tychoish tychoish deleted the tycho/scalar-value-handling branch December 28, 2023 17:36