Fix `DataprocCreateBatchOperator` with `result_retry` raises `AttributeError` (#39462)
* Add (optional) minimum dependency between dbt-cloud and OpenLineage provider

Since the change #38033 was merged, `airflow-providers-dbt-cloud>=1.7.0` depends on `airflow-providers-openlineage>=1.7.0`. However, this dependency was not declared anywhere. This is the error users face if they use `airflow-providers-dbt-cloud>=1.7.0` together with `airflow-providers-openlineage<1.7.0`:

```
[2024-05-01, 10:17:39 UTC] {base.py:147} ERROR - OpenLineage provider method failed to import OpenLineage integration. This should not happen.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/openlineage/extractors/base.py", line 137, in _get_openlineage_facets
    facets: OperatorLineage = get_facets_method(*args)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/operators/dbt.py", line 249, in get_openlineage_facets_on_complete
    return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance)
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/utils/openlineage.py", line 50, in generate_openlineage_events_from_dbt_cloud_run
    from airflow.providers.openlineage.conf import namespace
ModuleNotFoundError: No module named 'airflow.providers.openlineage.conf'
```

Given that the dependency between the two providers is optional, this PR introduces additional-extras to the dbt provider, solving the dependency issue for users who install the provider with the new extra.
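Not part of this commit, just an illustration of the version constraint described above: a minimal sketch, assuming the `packaging` library is available, that fails fast when the installed OpenLineage provider is missing or older than 1.7.0. The helper name `ensure_openlineage_compatible` is hypothetical.

```
from importlib.metadata import PackageNotFoundError, version

from packaging.version import Version


def ensure_openlineage_compatible(minimum: str = "1.7.0") -> None:
    """Raise early if dbt Cloud lineage would hit the ModuleNotFoundError above."""
    try:
        installed = version("apache-airflow-providers-openlineage")
    except PackageNotFoundError:
        raise RuntimeError(
            "apache-airflow-providers-openlineage is not installed; "
            f"dbt Cloud OpenLineage support needs >={minimum}"
        )
    if Version(installed) < Version(minimum):
        raise RuntimeError(
            f"apache-airflow-providers-openlineage=={installed} is too old; "
            f"dbt Cloud OpenLineage support needs >={minimum}"
        )
```

Calling such a check from a DAG file surfaces the version mismatch at parse time instead of deep inside the OpenLineage listener, as in the traceback above.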
* Fix `DataprocCreateBatchOperator` with `result_retry` raises `AttributeError`

Closes: #39394

When trying to run the `example_dataproc_batch.py` DAG locally, some of the tasks failed, including:

```
create_batch_2 = DataprocCreateBatchOperator(
    task_id="create_batch_2",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_2,
    result_retry=AsyncRetry(maximum=10.0, initial=10.0, multiplier=1.0),
)
```

with the error:

```
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 434, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/dataproc.py", line 2537, in execute
    result = hook.wait_for_operation(
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 266, in wait_for_operation
    error = operation.exception(timeout=timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py", line 282, in exception
    self._blocking_poll(timeout=timeout)
  File "/usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py", line 137, in _blocking_poll
    polling(self._done_or_raise)(retry=retry)
  File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 372, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/retry.py", line 207, in retry_target
    result = target()
             ^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py", line 119, in _done_or_raise
    if not self.done(retry=retry):
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/google/api_core/operation.py", line 174, in done
    self._refresh_and_update(retry)
  File "/usr/local/lib/python3.11/site-packages/google/api_core/operation.py", line 161, in _refresh_and_update
    if not self._operation.done:
           ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'coroutine' object has no attribute 'done'
```

This was due to an issue in the dependency `google-api-core==2.18.0`. The DAG works when running with either 2.17.0 or 2.19.0.
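For illustration only (not the change made by this commit): a sketch of a parse-time guard, assuming the `packaging` library is installed, that warns when the known-bad `google-api-core` 2.18.0 release mentioned above is present in the environment.

```
import warnings
from importlib.metadata import version

from packaging.version import Version

# 2.18.0 is the release the commit message identifies as breaking
# DataprocCreateBatchOperator when result_retry is passed.
BROKEN_API_CORE = Version("2.18.0")

if Version(version("google-api-core")) == BROKEN_API_CORE:
    warnings.warn(
        "google-api-core==2.18.0 breaks DataprocCreateBatchOperator with "
        "result_retry; use 2.17.0 or >=2.19.0 instead."
    )
```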