Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[OpenLineage] fix: disabled_for_operators now stops whole event emission #38033

Merged
merged 1 commit into from
Mar 25, 2024

Conversation

kacpermuda
Copy link
Contributor

@kacpermuda kacpermuda commented Mar 11, 2024

Thanks to @JDarDagran , we know that putting operator in [openlineage] disabled_for_operators only prevents additional metadata from being attached to the event, instead of whole emission being prevented. This PR fixes it.

Also, i moved all the OpenLineage configuration to a separate conf.py, so it's easier to manage and find, also to understand the precedence.

I also added some tests for my changes.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@kacpermuda kacpermuda force-pushed the ol/fix/disabled-for-operators branch from 1a1ac05 to eacdec9 Compare March 11, 2024 13:31
@kacpermuda kacpermuda force-pushed the ol/fix/disabled-for-operators branch 5 times, most recently from e87399c to d81e7bb Compare March 21, 2024 15:58
@kacpermuda kacpermuda marked this pull request as ready for review March 21, 2024 15:58
@kacpermuda kacpermuda force-pushed the ol/fix/disabled-for-operators branch from d81e7bb to c9e88a0 Compare March 21, 2024 15:59
Signed-off-by: Kacper Muda <mudakacper@gmail.com>
@kacpermuda kacpermuda force-pushed the ol/fix/disabled-for-operators branch from c9e88a0 to 65f5916 Compare March 22, 2024 12:45
@mobuchowski mobuchowski merged commit 9c4e333 into apache:main Mar 25, 2024
46 checks passed
@kacpermuda kacpermuda deleted the ol/fix/disabled-for-operators branch March 25, 2024 11:24
tatiana added a commit to astronomer/airflow that referenced this pull request May 7, 2024
…rovider

Since the change apache#38033 was merged, `airflow-providers-dbt-cloud>=1.7.0` depend on `airflow-providers-openlineage>=1.7.0`. However, since this dependency was not declared anywhere.

This is the error users face if they use `airflow-providers-dbt-cloud>=1.7.0` and `airflow-providers-openlineage<1.7.0`:
```
2024-05-01, 10:17:39 UTC] {base.py:147} ERROR - OpenLineage provider method failed to import OpenLineage integration. This should not happen.
Traceback (most recent call last):
  File /usr/local/lib/python3.9/site-packages/airflow/providers/openlineage/extractors/base.py, line 137, in _get_openlineage_facets
    facets: OperatorLineage = get_facets_method(*args)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/operators/dbt.py, line 249, in get_openlineage_facets_on_complete
    return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/utils/openlineage.py, line 50, in generate_openlineage_events_from_dbt_cloud_run
    from airflow.providers.openlineage.conf import namespace
ModuleNotFoundError: No module named 'airflow.providers.openlineage.conf'
```

Given that the dependency between both is optional, this PR introduces additional-extras to the dbt provider, solving the dependency issue for users who install using .
tatiana added a commit to astronomer/airflow that referenced this pull request May 7, 2024
…rovider

Since the change apache#38033 was merged, `airflow-providers-dbt-cloud>=1.7.0` depend on `airflow-providers-openlineage>=1.7.0`. However, since this dependency was not declared anywhere.

This is the error users face if they use `airflow-providers-dbt-cloud>=1.7.0` and `airflow-providers-openlineage<1.7.0`:
```
2024-05-01, 10:17:39 UTC] {base.py:147} ERROR - OpenLineage provider method failed to import OpenLineage integration. This should not happen.
Traceback (most recent call last):
  File /usr/local/lib/python3.9/site-packages/airflow/providers/openlineage/extractors/base.py, line 137, in _get_openlineage_facets
    facets: OperatorLineage = get_facets_method(*args)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/operators/dbt.py, line 249, in get_openlineage_facets_on_complete
    return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/utils/openlineage.py, line 50, in generate_openlineage_events_from_dbt_cloud_run
    from airflow.providers.openlineage.conf import namespace
ModuleNotFoundError: No module named 'airflow.providers.openlineage.conf'
```

Given that the dependency between both is optional, this PR introduces additional-extras to the dbt provider, solving the dependency issue for users who install using .
josh-fell pushed a commit that referenced this pull request May 7, 2024
…#39366)

* Add (optional) minimum dependency between dbt-cloud and OpenLineage provider

Since the change #38033 was merged, `airflow-providers-dbt-cloud>=1.7.0` depend on `airflow-providers-openlineage>=1.7.0`. However, since this dependency was not declared anywhere.

This is the error users face if they use `airflow-providers-dbt-cloud>=1.7.0` and `airflow-providers-openlineage<1.7.0`:
```
2024-05-01, 10:17:39 UTC] {base.py:147} ERROR - OpenLineage provider method failed to import OpenLineage integration. This should not happen.
Traceback (most recent call last):
  File /usr/local/lib/python3.9/site-packages/airflow/providers/openlineage/extractors/base.py, line 137, in _get_openlineage_facets
    facets: OperatorLineage = get_facets_method(*args)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/operators/dbt.py, line 249, in get_openlineage_facets_on_complete
    return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/utils/openlineage.py, line 50, in generate_openlineage_events_from_dbt_cloud_run
    from airflow.providers.openlineage.conf import namespace
ModuleNotFoundError: No module named 'airflow.providers.openlineage.conf'
```

Given that the dependency between both is optional, this PR introduces additional-extras to the dbt provider, solving the dependency issue for users who install using .

* Refactor dbt-cloud provider to raise a user-friendly exception if using incompatible openlineage provider
eladkal pushed a commit that referenced this pull request May 8, 2024
…teError` (#39462)

* Add (optional) minimum dependency between dbt-cloud and OpenLineage provider

Since the change #38033 was merged, `airflow-providers-dbt-cloud>=1.7.0` depend on `airflow-providers-openlineage>=1.7.0`. However, since this dependency was not declared anywhere.

This is the error users face if they use `airflow-providers-dbt-cloud>=1.7.0` and `airflow-providers-openlineage<1.7.0`:
```
2024-05-01, 10:17:39 UTC] {base.py:147} ERROR - OpenLineage provider method failed to import OpenLineage integration. This should not happen.
Traceback (most recent call last):
  File /usr/local/lib/python3.9/site-packages/airflow/providers/openlineage/extractors/base.py, line 137, in _get_openlineage_facets
    facets: OperatorLineage = get_facets_method(*args)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/operators/dbt.py, line 249, in get_openlineage_facets_on_complete
    return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/utils/openlineage.py, line 50, in generate_openlineage_events_from_dbt_cloud_run
    from airflow.providers.openlineage.conf import namespace
ModuleNotFoundError: No module named 'airflow.providers.openlineage.conf'
```

Given that the dependency between both is optional, this PR introduces additional-extras to the dbt provider, solving the dependency issue for users who install using .

* Fix `DataprocCreateBatchOperator` with `result_retry` raises `AttributeError`

Closes: #39394

When trying to run the `example_dataproc_batch.py` DAG locally,
some of the tasks failed, including:

```
    create_batch_2 = DataprocCreateBatchOperator(
        task_id=create_batch_2,
        project_id=PROJECT_ID,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID_2,
        result_retry=AsyncRetry(maximum=10.0, initial=10.0, multiplier=1.0),
    )
```

With the error:
```
Traceback (most recent call last):
  File /usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py, line 434, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/dataproc.py, line 2537, in execute
    result = hook.wait_for_operation(
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataproc.py, line 266, in wait_for_operation
    error = operation.exception(timeout=timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py, line 282, in exception
    self._blocking_poll(timeout=timeout)
  File /usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py, line 137, in _blocking_poll
    polling(self._done_or_raise)(retry=retry)
  File /usr/local/lib/python3.11/site-packages/google/api_core/retry.py, line 372, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/google/api_core/retry.py, line 207, in retry_target
    result = target()
             ^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py, line 119, in _done_or_raise
    if not self.done(retry=retry):
           ^^^^^^^^^^^^^^^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/google/api_core/operation.py, line 174, in done
    self._refresh_and_update(retry)
  File /usr/local/lib/python3.11/site-packages/google/api_core/operation.py, line 161, in _refresh_and_update
    if not self._operation.done:
           ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'coroutine' object has no attribute 'done'
```

This was due to an issue in the dependecy `google-api-core==2.18.0`. By either running with 2.17.0 or 2.19.0, the DAG works.
pateash pushed a commit to pateash/airflow that referenced this pull request May 13, 2024
…apache#39366)

* Add (optional) minimum dependency between dbt-cloud and OpenLineage provider

Since the change apache#38033 was merged, `airflow-providers-dbt-cloud>=1.7.0` depend on `airflow-providers-openlineage>=1.7.0`. However, since this dependency was not declared anywhere.

This is the error users face if they use `airflow-providers-dbt-cloud>=1.7.0` and `airflow-providers-openlineage<1.7.0`:
```
2024-05-01, 10:17:39 UTC] {base.py:147} ERROR - OpenLineage provider method failed to import OpenLineage integration. This should not happen.
Traceback (most recent call last):
  File /usr/local/lib/python3.9/site-packages/airflow/providers/openlineage/extractors/base.py, line 137, in _get_openlineage_facets
    facets: OperatorLineage = get_facets_method(*args)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/operators/dbt.py, line 249, in get_openlineage_facets_on_complete
    return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/utils/openlineage.py, line 50, in generate_openlineage_events_from_dbt_cloud_run
    from airflow.providers.openlineage.conf import namespace
ModuleNotFoundError: No module named 'airflow.providers.openlineage.conf'
```

Given that the dependency between both is optional, this PR introduces additional-extras to the dbt provider, solving the dependency issue for users who install using .

* Refactor dbt-cloud provider to raise a user-friendly exception if using incompatible openlineage provider
pateash pushed a commit to pateash/airflow that referenced this pull request May 13, 2024
…teError` (apache#39462)

* Add (optional) minimum dependency between dbt-cloud and OpenLineage provider

Since the change apache#38033 was merged, `airflow-providers-dbt-cloud>=1.7.0` depend on `airflow-providers-openlineage>=1.7.0`. However, since this dependency was not declared anywhere.

This is the error users face if they use `airflow-providers-dbt-cloud>=1.7.0` and `airflow-providers-openlineage<1.7.0`:
```
2024-05-01, 10:17:39 UTC] {base.py:147} ERROR - OpenLineage provider method failed to import OpenLineage integration. This should not happen.
Traceback (most recent call last):
  File /usr/local/lib/python3.9/site-packages/airflow/providers/openlineage/extractors/base.py, line 137, in _get_openlineage_facets
    facets: OperatorLineage = get_facets_method(*args)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/operators/dbt.py, line 249, in get_openlineage_facets_on_complete
    return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/utils/openlineage.py, line 50, in generate_openlineage_events_from_dbt_cloud_run
    from airflow.providers.openlineage.conf import namespace
ModuleNotFoundError: No module named 'airflow.providers.openlineage.conf'
```

Given that the dependency between both is optional, this PR introduces additional-extras to the dbt provider, solving the dependency issue for users who install using .

* Fix `DataprocCreateBatchOperator` with `result_retry` raises `AttributeError`

Closes: apache#39394

When trying to run the `example_dataproc_batch.py` DAG locally,
some of the tasks failed, including:

```
    create_batch_2 = DataprocCreateBatchOperator(
        task_id=create_batch_2,
        project_id=PROJECT_ID,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID_2,
        result_retry=AsyncRetry(maximum=10.0, initial=10.0, multiplier=1.0),
    )
```

With the error:
```
Traceback (most recent call last):
  File /usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py, line 434, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/dataproc.py, line 2537, in execute
    result = hook.wait_for_operation(
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataproc.py, line 266, in wait_for_operation
    error = operation.exception(timeout=timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py, line 282, in exception
    self._blocking_poll(timeout=timeout)
  File /usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py, line 137, in _blocking_poll
    polling(self._done_or_raise)(retry=retry)
  File /usr/local/lib/python3.11/site-packages/google/api_core/retry.py, line 372, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/google/api_core/retry.py, line 207, in retry_target
    result = target()
             ^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py, line 119, in _done_or_raise
    if not self.done(retry=retry):
           ^^^^^^^^^^^^^^^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/google/api_core/operation.py, line 174, in done
    self._refresh_and_update(retry)
  File /usr/local/lib/python3.11/site-packages/google/api_core/operation.py, line 161, in _refresh_and_update
    if not self._operation.done:
           ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'coroutine' object has no attribute 'done'
```

This was due to an issue in the dependecy `google-api-core==2.18.0`. By either running with 2.17.0 or 2.19.0, the DAG works.
kosteev pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this pull request Nov 9, 2024
… (#39366)

* Add (optional) minimum dependency between dbt-cloud and OpenLineage provider

Since the change apache/airflow#38033 was merged, `airflow-providers-dbt-cloud>=1.7.0` depend on `airflow-providers-openlineage>=1.7.0`. However, since this dependency was not declared anywhere.

This is the error users face if they use `airflow-providers-dbt-cloud>=1.7.0` and `airflow-providers-openlineage<1.7.0`:
```
2024-05-01, 10:17:39 UTC] {base.py:147} ERROR - OpenLineage provider method failed to import OpenLineage integration. This should not happen.
Traceback (most recent call last):
  File /usr/local/lib/python3.9/site-packages/airflow/providers/openlineage/extractors/base.py, line 137, in _get_openlineage_facets
    facets: OperatorLineage = get_facets_method(*args)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/operators/dbt.py, line 249, in get_openlineage_facets_on_complete
    return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/utils/openlineage.py, line 50, in generate_openlineage_events_from_dbt_cloud_run
    from airflow.providers.openlineage.conf import namespace
ModuleNotFoundError: No module named 'airflow.providers.openlineage.conf'
```

Given that the dependency between both is optional, this PR introduces additional-extras to the dbt provider, solving the dependency issue for users who install using .

* Refactor dbt-cloud provider to raise a user-friendly exception if using incompatible openlineage provider

GitOrigin-RevId: 7550a11f1d094b768942f28698b87c4f69fad7f5
kosteev pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this pull request Nov 9, 2024
…teError` (#39462)

* Add (optional) minimum dependency between dbt-cloud and OpenLineage provider

Since the change apache/airflow#38033 was merged, `airflow-providers-dbt-cloud>=1.7.0` depend on `airflow-providers-openlineage>=1.7.0`. However, since this dependency was not declared anywhere.

This is the error users face if they use `airflow-providers-dbt-cloud>=1.7.0` and `airflow-providers-openlineage<1.7.0`:
```
2024-05-01, 10:17:39 UTC] {base.py:147} ERROR - OpenLineage provider method failed to import OpenLineage integration. This should not happen.
Traceback (most recent call last):
  File /usr/local/lib/python3.9/site-packages/airflow/providers/openlineage/extractors/base.py, line 137, in _get_openlineage_facets
    facets: OperatorLineage = get_facets_method(*args)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/operators/dbt.py, line 249, in get_openlineage_facets_on_complete
    return generate_openlineage_events_from_dbt_cloud_run(operator=self, task_instance=task_instance)
  File /usr/local/lib/python3.9/site-packages/airflow/providers/dbt/cloud/utils/openlineage.py, line 50, in generate_openlineage_events_from_dbt_cloud_run
    from airflow.providers.openlineage.conf import namespace
ModuleNotFoundError: No module named 'airflow.providers.openlineage.conf'
```

Given that the dependency between both is optional, this PR introduces additional-extras to the dbt provider, solving the dependency issue for users who install using .

* Fix `DataprocCreateBatchOperator` with `result_retry` raises `AttributeError`

Closes: #39394

When trying to run the `example_dataproc_batch.py` DAG locally,
some of the tasks failed, including:

```
    create_batch_2 = DataprocCreateBatchOperator(
        task_id=create_batch_2,
        project_id=PROJECT_ID,
        region=REGION,
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID_2,
        result_retry=AsyncRetry(maximum=10.0, initial=10.0, multiplier=1.0),
    )
```

With the error:
```
Traceback (most recent call last):
  File /usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py, line 434, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/dataproc.py, line 2537, in execute
    result = hook.wait_for_operation(
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataproc.py, line 266, in wait_for_operation
    error = operation.exception(timeout=timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py, line 282, in exception
    self._blocking_poll(timeout=timeout)
  File /usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py, line 137, in _blocking_poll
    polling(self._done_or_raise)(retry=retry)
  File /usr/local/lib/python3.11/site-packages/google/api_core/retry.py, line 372, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/google/api_core/retry.py, line 207, in retry_target
    result = target()
             ^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/google/api_core/future/polling.py, line 119, in _done_or_raise
    if not self.done(retry=retry):
           ^^^^^^^^^^^^^^^^^^^^^^
  File /usr/local/lib/python3.11/site-packages/google/api_core/operation.py, line 174, in done
    self._refresh_and_update(retry)
  File /usr/local/lib/python3.11/site-packages/google/api_core/operation.py, line 161, in _refresh_and_update
    if not self._operation.done:
           ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'coroutine' object has no attribute 'done'
```

This was due to an issue in the dependecy `google-api-core==2.18.0`. By either running with 2.17.0 or 2.19.0, the DAG works.

GitOrigin-RevId: f00006d1c6d0ad531a1f783a09ab6e700efac74b
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants