Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#554: Lead/lag window function with offset and default value arguments #687

Merged
merged 1 commit into from
Jul 14, 2021

Conversation

jgoday
Copy link
Contributor

@jgoday jgoday commented Jul 5, 2021

Which issue does this PR close?

#Closes #554.

Rationale for this change

Implement offset and default value optional arguments in lead/lag window functions.

What changes are included in this PR?

Changes in lead/lag pub fn signatures.

Are there any user-facing changes?

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Jul 5, 2021
@jgoday
Copy link
Contributor Author

jgoday commented Jul 5, 2021

I have a few doubts:

  1. Lag/Lead signatures has to be changed to support 2nd/3nd optional arguments, is this correct or am I misunderstanding the use of Signature::OneOf ?
Signature::OneOf(vec![
    Signature::Any(1),
    Signature::Any(2),
    Signature::Any(3),
])
  1. If previous point is correct, then evaluation of Signature::OneOf in type_coercion::get_valid_types has to be changed to something like this
Signature::OneOf(types) => {
    let mut r = vec![];
    for s in types {
        // we must ensure that one of the signatures is valid against the current_types
        if let Ok(valid_types) = get_valid_types(s, current_types) {
            r.extend(valid_types);
        }
    }
    r
}

original code is currently forcing all options within Signature::OneOf to be valid

Signature::OneOf(types) => {
    let mut r = vec![];
    for s in types {
        r.extend(get_valid_types(s, current_types)?);
    }
    r
}
  1. I'd like to test create_built_in_window_expr with the optional arguments on and off, to match against the lead/lag signatures,
    but How can I test the resulting WindowShift struct?
    Right now, I can only try that the presence or absence of this optional arguments don't panic in test_create_window_exp_lead_no_args/test_create_window_exp_lead_with_args

  2. Had to copy and change the original arrow::compute::kernels::window::shift to allow filling the start/end nulls with the default value (this new fn is called shift_with_default_value inside lead_lag crate)

@alamb
Copy link
Contributor

alamb commented Jul 6, 2021

FYI @jimexist -- do you have some time to review this code?

}

impl PartitionEvaluator for WindowShiftEvaluator {
fn evaluate_partition(&self, partition: Range<usize>) -> Result<ArrayRef> {
let value = &self.values[0];
let value = value.slice(partition.start, partition.end - partition.start);
shift(value.as_ref(), self.shift_offset).map_err(DataFusionError::ArrowError)
if let Some(default_value) = self.default_value.clone() {
shift_with_default_value(&value, self.shift_offset, default_value)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in both branches you can just call this shift_with_default_value. if the default value isn't present, you can unwrap or provide a null scalar value

Copy link
Contributor Author

@jgoday jgoday Jul 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the conditional statement and changed to one single call to shift_with_default_value with the scalar_value as an option

@@ -131,7 +131,10 @@ fn get_valid_types(
Signature::OneOf(types) => {
let mut r = vec![];
for s in types {
r.extend(get_valid_types(s, current_types)?);
if let Ok(valid_types) = get_valid_types(s, current_types) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I have misunderstood it, but in the original code, if one of the signatures within OneOf fails then the whole get_valid_types returns an error, isn't it ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no you were totally correct in that the original code didn't work. i wasn't seeing this the first round.

i was thinking that you'd need to remove the commented out line below. also do you mind adding a unit test for this case?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can also use filter_map and then take first

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks ! changed to filter_map.

Copy link
Member

@jimexist jimexist left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change looks generally okay, thanks for the effort!

I would suggest: replacing branching with one call and also add a case in the integration test folder, esp.:

  1. out of bounds shift offset,
  2. negative bounds
  3. default value being null

@jimexist
Copy link
Member

jimexist commented Jul 6, 2021

Lag/Lead signatures has to be changed to support 2nd/3nd optional arguments, is this correct or am I misunderstanding the use of Signature::OneOf ?

that part was confusing to me as well. i believe a most elegant way is to create support for heterogenous support for function arguments

I'd like to test create_built_in_window_expr with the optional arguments on and off, to match against the lead/lag signatures,

i'm okay with integration tests that cover these

Had to copy and change the original arrow::compute::kernels::window::shift to allow filling the start/end nulls with the default value (this new fn is called shift_with_default_value inside lead_lag crate)

changing arrow-rs repo is one way to achieve it although it'll needs two more weeks to be released. i'm okay with keeping the copied and modified version here.

@jgoday
Copy link
Contributor Author

jgoday commented Jul 6, 2021

this change looks generally okay, thanks for the effort!

I would suggest: replacing branching with one call and also add a case in the integration test folder, esp.:

1. out of bounds shift offset,

2. negative bounds

3. default value being null

Thank you @jimexist.
I have followed your advices and changed the type_coercion::get_valid_types to use filter_map and switch the two conditional branches to one single call to shift_with_default_value. I will try to look the integration tests later today.

Copy link
Member

@jimexist jimexist left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implementation looks great, 👍 thanks!

would be great to add integration test coverage as well before merge

default_value: Option<ScalarValue>,
}

fn shift_with_default_value(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can add a comment of todo to push this upstream to arrow-rs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a TODO comment

let shift_offset = coerced_args
.get(1)
.map(|v| {
v.as_any()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

feel like you can have a helper function to extract argument as Result<Literal>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, created get_scalar_value_from_args helper function.

@jgoday
Copy link
Contributor Author

jgoday commented Jul 8, 2021

@jimexist I was trying to execute the integration tests manually,
executing test_psql_parity.py with the following sql

SELECT
  c8,
  LEAD(c8) OVER () next_c8,
  LEAD(c8, 10, 10) OVER() next_10_c8,
  LEAD(c8, 100, 10) OVER() next_overflow_c8,
  LAG(c8) OVER() prev_c8,
  LAG(c8, -2, 0) OVER() AS prev_2_c8,
  LAG(c8, -200, 10) OVER() AS prev_overflow_c8

FROM test
ORDER BY c8;

There was some errors: type coercing problems between default_value and the slice data_type
and between lead postgres (positive) and datafusion/shifting (negative) arguments.
After fixed it in the latest commit, previous example works fine.

But if I try to test agains some custom partition

SELECT
  LAG(c8) OVER(PARTITION BY c9 ORDER BY c9) AS prev_c8

FROM test
ORDER BY c8;

datafusion throws an error

Empty DataFrame
Columns: [Plan("Projections require unique expression names but the expression \"LAG(#test.c8) AS prev_c8\" at position 4 and \"LAG(#test.c8) PARTITION BY [#test.c9] ORDER BY [#test.c9 ASC NULLS FIRST] AS prev_c8\" at position 7 have the same name. Consider aliasing (\"AS\") one of them.")]

I will try to look at this later today if I can ...

@jimexist
Copy link
Member

jimexist commented Jul 9, 2021

if the first example works then it's fine. the second example might fail because of different reasons not related to this change.

@jgoday
Copy link
Contributor Author

jgoday commented Jul 9, 2021

if the first example works then it's fine. the second example might fail because of different reasons not related to this change.

Two more questions/doubts ?

  • Would it be better to have a different sql for each case (lead/lag out of bounds/negative/default being null) or is it ok to test all combinations in a single sql?
  • Would it be more appropriate to squash all into a single commit?

@alamb
Copy link
Contributor

alamb commented Jul 12, 2021

@jimexist do you think this PR is ready? Do you need help reviewing ?

@jimexist
Copy link
Member

@jimexist do you think this PR is ready? Do you need help reviewing ?

looks okay after rebasing.

@jgoday jgoday force-pushed the lead_lag_optional_arguments branch from 34475c7 to 10aedf9 Compare July 14, 2021 16:17
@jgoday jgoday force-pushed the lead_lag_optional_arguments branch from 10aedf9 to 6d5e1f2 Compare July 14, 2021 16:50
@jgoday
Copy link
Contributor Author

jgoday commented Jul 14, 2021

@jimexist do you think this PR is ready? Do you need help reviewing ?

looks okay after rebasing.

Hi @jimexist, just made the rebase from master

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me . Thank you @jgoday and @jimexist

Some(ScalarValue::Int32(Some(100))),
),
vec![
Some(100),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@alamb alamb merged commit 002ca5d into apache:master Jul 14, 2021
@houqp houqp added the enhancement New feature or request label Jul 31, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

implement lead and lag with 2nd and 3rd argument
4 participants