Fixing JdbcOperator non-SELECT statement run #25412

Merged 2 commits on Aug 7, 2022

@kazanzhy (Contributor) commented Jul 29, 2022

Closes: #25388

I introduced a nasty bug in #23817 while closing #19313: that change made JdbcOperator pass a fetchall handler to JdbcHook.
As a result, an error is raised when running non-SELECT statements, as described in #25388.

My main question for other maintainers is which solution is better:

  • adding a fetch_results: bool parameter
  • giving users the ability to pass their own handlers

Either way, for compatibility and consistency we have to decide on the default behavior of SQL operators with respect to returning results.

My preference is to add a handler parameter and not push to XCom by default, like:
handler: Optional[Callable] = None
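
A minimal sketch of the opt-in handler idea, using the standard-library sqlite3 module as a stand-in for a JDBC connection. The run function here is a hypothetical helper written for illustration, not the Airflow hook API.

```python
import sqlite3
from typing import Any, Callable, Optional


def run(
    conn: sqlite3.Connection,
    sql: str,
    handler: Optional[Callable[[Any], Any]] = None,
) -> Any:
    """Execute one statement; fetch only if the caller supplied a handler."""
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        # With handler=None nothing is fetched, so non-SELECT statements
        # never reach fetchall().
        result = handler(cursor) if handler is not None else None
        conn.commit()
        return result
    finally:
        cursor.close()


conn = sqlite3.connect(":memory:")
run(conn, "CREATE TABLE t (x INTEGER)")
run(conn, "INSERT INTO t VALUES (1)")
rows = run(conn, "SELECT x FROM t", handler=lambda cur: cur.fetchall())
print(rows)
```

With None as the default, results are returned only when the caller asks for them, which is the "no XCom push by default" behavior proposed above.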

@kazanzhy (Contributor Author)

Hi @cdabella @nkyuray
The changes that fixed #19313 will probably be rolled back, but with some added flexibility.

@kazanzhy (Contributor Author)

@akashgangulyhf this PR addresses #25388. Please join the discussion.

@josh-fell (Contributor)

> My main question for other maintainers is which solution is better:
>
>   • adding a fetch_results: bool parameter
>   • giving users the ability to pass their own handlers

It might not be a question of which is better; why not do both? Let users control whether to fetch results from the query they execute, and give them the option to customize how those results are handled (even with fetch_all_handler as the default).

         self.hook = None

     def execute(self, context: 'Context'):
         self.log.info('Executing: %s', self.sql)
         hook = JdbcHook(jdbc_conn_id=self.jdbc_conn_id)
-        return hook.run(self.sql, self.autocommit, parameters=self.parameters, handler=fetch_all_handler)
+        if self.fetch_results:

Contributor (review comment)

Why do we need this parameter?
This is an operator, not a hook, so it looks like fetch_results = do_xcom_push?
Did I miss something?

@kazanzhy (Contributor Author), Aug 1, 2022

No, you're right. I missed that no additional parameter is needed, since do_xcom_push is already present.
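
The conclusion of this exchange can be sketched as follows: do_xcom_push doubles as the "fetch results" switch, and the handler stays overridable. SketchSqlOperator and fetch_all are hypothetical names, and sqlite3 stands in for the JDBC hook; this is an illustration under those assumptions, not the merged JdbcOperator code.

```python
import sqlite3
from typing import Any, Callable, Optional


def fetch_all(cursor: Any) -> Optional[list]:
    """Stand-in handler: fetch rows only when the statement produced a result set."""
    return cursor.fetchall() if cursor.description is not None else None


class SketchSqlOperator:
    """Hypothetical operator-like class; not the real JdbcOperator."""

    def __init__(
        self,
        sql: str,
        do_xcom_push: bool = True,
        handler: Callable[[Any], Any] = fetch_all,
    ) -> None:
        self.sql = sql
        self.do_xcom_push = do_xcom_push
        self.handler = handler

    def execute(self, conn: sqlite3.Connection) -> Any:
        cursor = conn.cursor()
        try:
            cursor.execute(self.sql)
            # If the return value is not going to be pushed anywhere,
            # skip the handler entirely instead of adding a second flag.
            result = self.handler(cursor) if self.do_xcom_push else None
            conn.commit()
            return result
        finally:
            cursor.close()
```

In this sketch, SketchSqlOperator("SELECT x FROM t", do_xcom_push=False) runs the query but returns None, and passing handler=lambda c: c.fetchone() swaps in a custom handler without touching the class.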

@@ -57,16 +57,21 @@ def __init__(
         jdbc_conn_id: str = 'jdbc_default',
         autocommit: bool = False,
         parameters: Optional[Union[Iterable, Mapping]] = None,
+        handler: Callable = fetch_all_handler,

@uranusjr (Member), Aug 2, 2022

Suggested change
-        handler: Callable = fetch_all_handler,
+        handler: Callable[[CursorResult], Any] = fetch_all_handler,

(plus needed imports)

Contributor (review comment)

I don't think we should use this type hint here, since the fetchall handler receives a DB-API 2 compliant cursor, not a SQLAlchemy object (DbApiHook.run does not use SQLAlchemy).

Does the CursorResult import come from jaydebeapi?

@uranusjr (Member), Aug 2, 2022

It should probably be Callable[[Any], Any] then? The goal is to make sure the handler accepts exactly one argument, and returns something.

@kazanzhy (Contributor Author), Aug 2, 2022

Yes, Callable[[Any], Any] should be enough. Initially, I was thinking of something like:

from airflow.typing_compat import Protocol

class Cursor(Protocol):
    description: Optional[Tuple]
    fetchall: Callable[[], List[Tuple]]

handler: Callable[[Cursor], Optional[List[Tuple]]]
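
For comparison, a self-contained sketch of the two typing options discussed here, using typing.Protocol from the standard library (Python 3.8+) rather than airflow.typing_compat. The names CursorLike, StrictHandler, LooseHandler, and fetch_all are assumptions made for this example.

```python
import sqlite3
from typing import Any, Callable, List, Optional, Protocol, Sequence


class CursorLike(Protocol):
    """Structural type for the two cursor members a fetch-all handler needs."""

    @property
    def description(self) -> Optional[Sequence[Any]]: ...

    def fetchall(self) -> List[Any]: ...


# Strict variant: the handler must accept a cursor-like object.
StrictHandler = Callable[[CursorLike], Optional[List[Any]]]
# Loose variant from the discussion: exactly one argument in, anything out.
LooseHandler = Callable[[Any], Any]


def fetch_all(cursor: CursorLike) -> Optional[List[Any]]:
    """Return all rows, or None when the statement produced no result set."""
    return cursor.fetchall() if cursor.description is not None else None


# The same function satisfies both aliases for a static type checker.
strict: StrictHandler = fetch_all
loose: LooseHandler = fetch_all

cur = sqlite3.connect(":memory:").cursor()
cur.execute("SELECT 1")
print(loose(cur))
```

The protocol is checked structurally, so no driver (jaydebeapi, sqlite3, or otherwise) has to inherit from it; the looser alias only pins down the arity, which is the stated goal in this thread.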

@potiuk (Member) commented Aug 7, 2022

Some intermittent issues. Re-running.

@FanatoniQ (Contributor)

@potiuk @kazanzhy Didn't we fix #25388 with #25430 ? fetch_all_handler should be fixed now
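
To illustrate why an unconditional fetchall can fail and how a handler can guard against it, here is a sketch built on a tiny stub cursor. It relies on two points from the DB-API 2.0 specification (PEP 249): description is None for operations that return no rows, and fetching without a result set raises an error. StrictCursor, naive_fetch_all, and guarded_fetch_all are hypothetical names, and the guard is an assumption about one reasonable approach, not a quote of the change in #25430.

```python
from typing import Any, List, Optional, Tuple


class StrictCursor:
    """Tiny stand-in for a strict DB-API 2.0 cursor (illustration only)."""

    def __init__(self) -> None:
        self.description: Optional[Tuple] = None
        self._rows: List[Tuple] = []

    def execute(self, sql: str) -> None:
        if sql.lstrip().upper().startswith("SELECT"):
            # One column; PEP 249 describes each column as a 7-item sequence.
            self.description = (("x", None, None, None, None, None, None),)
            self._rows = [(1,)]
        else:
            self.description = None
            self._rows = []

    def fetchall(self) -> List[Tuple]:
        # Strict drivers raise when there is no result set to fetch from.
        if self.description is None:
            raise RuntimeError("no result set")
        return self._rows


def naive_fetch_all(cursor: Any) -> List[Tuple]:
    """Always fetches; breaks on INSERT/UPDATE/DDL with strict drivers."""
    return cursor.fetchall()


def guarded_fetch_all(cursor: Any) -> Optional[List[Tuple]]:
    """Fetches only when the last statement produced a result set."""
    if cursor.description is not None:
        return cursor.fetchall()
    return None
```

With the stub above, naive_fetch_all raises after an INSERT, while guarded_fetch_all returns None and still returns rows after a SELECT.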

@potiuk (Member) commented Aug 7, 2022

Yep. I think it's not technically needed now: the effect is the same, though the difference is that it allows overriding the handler in JDBC, which might be a useful thing.

@potiuk merged commit 1708da9 into apache:main on Aug 7, 2022

@FanatoniQ (Contributor)

> Yep. I think it's not technically needed now: the effect is the same, though the difference is that it allows overriding the handler in JDBC, which might be a useful thing.

Ok I agree.

Quick question: shouldn't this handler parameter, together with the feature of SQL operators returning the query result (used alongside do_xcom_push), be implemented in a common parent SQL class? The other day I found that the run methods of MsSqlOperator and MySqlOperator do not return the result...

Is that planned ? I am asking after seeing this commit's message: "move all old sql operator...": acab8f5
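
A rough sketch of what such a shared parent class could look like, so that the handler and do_xcom_push logic lives in one place and subclasses supply only the connection. BaseSqlSketchOperator, SqliteSketchOperator, and get_connection are hypothetical names invented for this example; they are not Airflow classes and do not describe the refactor referenced in this thread.

```python
import sqlite3
from abc import ABC, abstractmethod
from typing import Any, Callable, Optional


def fetch_all(cursor: Any) -> Optional[list]:
    """Fetch rows only when the statement produced a result set."""
    return cursor.fetchall() if cursor.description is not None else None


class BaseSqlSketchOperator(ABC):
    """Shared execute logic: run SQL, optionally hand the cursor to a handler."""

    def __init__(
        self,
        sql: str,
        do_xcom_push: bool = True,
        handler: Callable[[Any], Any] = fetch_all,
    ) -> None:
        self.sql = sql
        self.do_xcom_push = do_xcom_push
        self.handler = handler

    @abstractmethod
    def get_connection(self) -> Any:
        """Return a DB-API 2.0 connection; the only backend-specific part."""

    def execute(self) -> Any:
        conn = self.get_connection()
        cursor = conn.cursor()
        try:
            cursor.execute(self.sql)
            result = self.handler(cursor) if self.do_xcom_push else None
            conn.commit()
            return result
        finally:
            cursor.close()


class SqliteSketchOperator(BaseSqlSketchOperator):
    """Example subclass: only the connection differs per backend."""

    def __init__(self, sql: str, conn: sqlite3.Connection, **kwargs: Any) -> None:
        super().__init__(sql, **kwargs)
        self._conn = conn

    def get_connection(self) -> sqlite3.Connection:
        return self._conn
```

Every subclass then returns results the same way, which addresses the inconsistency raised in the question above.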

@potiuk (Member) commented Aug 7, 2022

> Is that planned ? I am asking after seeing this commit's message: "move all old sql operator...": acab8f5

Maybe - if you want to add such refactor - feel free :)

@kazanzhy (Contributor Author) commented Aug 7, 2022

@FanatoniQ I'm already working on #25259.
It might answer your question :)
