-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Common SQLCheckOperators Various Functionality Update #25164
Common SQLCheckOperators Various Functionality Update #25164
Conversation
Commit adds a WHERE clause to the sql statement that allows for arbitrary batching in a given table.
When multiple table checks are given to the SQLTableCheckOperator and at least one is not a fully aggregate statement, a GROUP BY clause was previously needed. This commit updates the operator to use the get_pandas_df() method instead of _get_first() to return a pandas dataframe object that contains the check names and check results from the new style of query. The new style of query uses UNION ALL to run each test as its own SELECT statement, bypassing the need to do a GROUP BY.
Changed name of method from _get_failed_tests to _get_failed_checks to better match naming, and updated logic of the method to include an optional column param. The query in the column check operator is removed from the failed test exception message, as it was only ever showing the last query, instead of the relevant one(s). This is replaced by the column, which will be more useful in debugging.
Some tests are failing |
Looking into this today, seems a test is missing a context parameter. |
Without a table alias, the query does not run on Postgres and other databases. The alias is arbitrary and used only for proper query execution.
Commit adds a WHERE clause to the sql statement that allows for arbitrary batching in a given table.
When multiple table checks are given to the SQLTableCheckOperator and at least one is not a fully aggregate statement, a GROUP BY clause was previously needed. This commit updates the operator to use the get_pandas_df() method instead of _get_first() to return a pandas dataframe object that contains the check names and check results from the new style of query. The new style of query uses UNION ALL to run each test as its own SELECT statement, bypassing the need to do a GROUP BY.
Changed name of method from _get_failed_tests to _get_failed_checks to better match naming, and updated logic of the method to include an optional column param. The query in the column check operator is removed from the failed test exception message, as it was only ever showing the last query, instead of the relevant one(s). This is replaced by the column, which will be more useful in debugging.
Without a table alias, the query does not run on Postgres and other databases. The alias is arbitrary and used only for proper query execution.
554e8ba
to
5645b5d
Compare
…thub.com:denimalpaca/airflow into sql_check_operators_various_functionality_update
The table alias should be in the self.sql query build statement as that is where the table it needs to alias is defined.
Commit adds a WHERE clause to the sql statement that allows for arbitrary batching in a given table.
When multiple table checks are given to the SQLTableCheckOperator and at least one is not a fully aggregate statement, a GROUP BY clause was previously needed. This commit updates the operator to use the get_pandas_df() method instead of _get_first() to return a pandas dataframe object that contains the check names and check results from the new style of query. The new style of query uses UNION ALL to run each test as its own SELECT statement, bypassing the need to do a GROUP BY.
Changed name of method from _get_failed_tests to _get_failed_checks to better match naming, and updated logic of the method to include an optional column param. The query in the column check operator is removed from the failed test exception message, as it was only ever showing the last query, instead of the relevant one(s). This is replaced by the column, which will be more useful in debugging.
Without a table alias, the query does not run on Postgres and other databases. The alias is arbitrary and used only for proper query execution.
5645b5d
to
987f6c2
Compare
…thub.com:denimalpaca/airflow into sql_check_operators_various_functionality_update
Fixed bug in test where the dataframe column names did not match the operator's expected dataframe column names. Added more info to the SQLColumnCheckOperator's batch arg. Fixed the location of table aliasing in SQLTableCheckOperator.
Gives a clearer name to the parameter and adds templating to the SQLTableCheckOperator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now it looks great! Thanks @denimalpaca for being so responsive to our comments :)
for check_name, value in self.checks.items() | ||
] | ||
) | ||
partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else "" | ||
self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) " | ||
f"AS check_table {partition_clause_statement};" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whoops. Missing +
here. This string is just a literal, not part of self.sql
(and thus the entire partition_clause_statement
isn't used!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just got back from vacation, I see this got merged without a fix here. I can open a separate PR to address it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. Ash (nick-name Hawk-Eye
) noticed it after we merged and started voting on it. I decided to continue to release as both operators were new but I would love to get it fixed for 1.2.0.
I hope you had good vacation @denimalpaca :D
|
||
self.sql = f"SELECT {check_mins_sql} FROM (SELECT {checks_sql} FROM {self.table});" | ||
records = hook.get_first(self.sql) | ||
records = hook.get_pandas_df(self.sql) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did we change from getting records to getting this as a pandas dataframe? This now places a hard requirement on using pandas for this operator, where as previously pandas was almost entirely optional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, this was changed because with hook.get_first
, there was an issue with how the SQL was being written that caused only fully aggregated checks to be returned, unless the syntax of the SQL query was changed, but that would require either a fetch_all
or get_pandas_df
call as the new SQL needs to returned multiple lines. It seemed much easier and possibly more efficient to use pandas here, but if a fetch_all
seems more reasonable this can be changed. Happy to explain more about the specific issue if curious.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd be curious to hear more about it :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To expand a bit more, a check like col_a + col_b >= col_c
would not work when there were multiple checks in the operator as the previous SELECT
statement would then fail and require a GROUP BY
clause iirc. So the check would either have to be in its own operator, or be amended like so: SUM(col_a) + SUM(col_b) >= SUM(col_c)
which isn't quite the same check. So the query needed to be updated, and the one that I wound up using returns multiple rows. So get_first
is no longer useful, and in the moment of writing it seemed that handling things with a pandas dataframe might be easier in the long term, if more complicated uses of the pulled in data were implemented. But as of now I see how it's unneeded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks.... Is the operator easy to understand by the users? I am afraid this is something we need good and clear howto and examples for it because people will have hard time using it and rais too many questions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally most users won't need to learn about why .fetch_all
is being used instead of .get_first
🙃 . I'm working with some users of the operator right now and seeing what's complicated to make sure the docs are robust. I also have a working example DAG showing how to use the operator (with several more planned) here.
The SQLCheckOperators under the Common provider have some issues since their initial release, including:
SQLTableCheckOperator
that forces users to use only fully-aggregated checksSQLColumnCheckOperator
that only ever returns the final SQL query builtThis PR remedies these issues.
closes: #25163