Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add on_kill() to kill Trino query if Airflow task is killed #24559

Merged
merged 1 commit into from
Jun 20, 2022

Conversation

phanikumv
Copy link
Contributor

@phanikumv phanikumv commented Jun 20, 2022

This PR is a follow up to PR #24415.
It adds on_kill method to the TrinoOperator to kill Trino query if Airflow task is killed

cc @kaxil @eladkal


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragement file, named {pr_number}.significant.rst, in newsfragments.

Copy link
Member

@potiuk potiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Jun 20, 2022
@github-actions
Copy link

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.


def on_kill(self) -> None:
if self.hook is not None and isinstance(self.hook, TrinoHook):
query_id = "'" + self.hook.query_id + "'"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the following work?

Suggested change
query_id = "'" + self.hook.query_id + "'"
query_id = f"'{self.hook.query_id}'"

@@ -79,3 +81,18 @@ def execute(self, context: 'Context') -> None:
self.hook.run(
sql=self.sql, autocommit=self.autocommit, parameters=self.parameters, handler=self.handler
)

def on_kill(self) -> None:
if self.hook is not None and isinstance(self.hook, TrinoHook):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The following might be sufficient

Suggested change
if self.hook is not None and isinstance(self.hook, TrinoHook):
if isinstance(self.hook, TrinoHook):

@kaxil kaxil merged commit 4f4f37c into apache:main Jun 20, 2022
@kaxil kaxil deleted the trino_review branch June 20, 2022 18:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:providers full tests needed We need to run full set of tests for this PR to merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants