Skip to content

Commit

Permalink
Rename opt_in to selective_enable.
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
  • Loading branch information
JDarDagran committed Mar 26, 2024
1 parent a2c9d18 commit e0a08d9
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 75 deletions.
5 changes: 5 additions & 0 deletions airflow/providers/openlineage/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ def disabled_operators() -> set[str]:
return set(operator.strip() for operator in option.split(";") if operator.strip())


@cache
def selective_enable() -> bool:
return conf.getboolean(_CONFIG_SECTION, "selective_enable", fallback=False)


@cache
def custom_extractors() -> set[str]:
"""[openlineage] extractors."""
Expand Down
25 changes: 7 additions & 18 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,15 @@

from openlineage.client.serde import Serde

from airflow.configuration import conf
from airflow.listeners import hookimpl
from airflow.models import DAG
from airflow.providers.openlineage.extractors import ExtractorManager
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
from airflow.providers.openlineage.utils.opt_in import is_dag_lineage_enabled, is_task_lineage_enabled
from airflow.providers.openlineage.utils.utils import (
get_airflow_run_facet,
get_custom_facets,
get_job_name,
is_operator_disabled,
is_selective_lineage_enabled,
print_warning,
)
from airflow.stats import Stats
Expand All @@ -55,15 +53,6 @@ def __init__(self):
self.log = logging.getLogger(__name__)
self.extractor_manager = ExtractorManager()
self.adapter = OpenLineageAdapter()
self._is_opt_in = conf.getboolean("openlineage", "opt_in", fallback=False)

def _is_lineage_enabled(self, obj) -> bool:
if not self._is_opt_in:
return True
if isinstance(obj, DAG):
return is_dag_lineage_enabled(obj)
else:
return is_task_lineage_enabled(obj)

@hookimpl
def on_task_instance_running(
Expand Down Expand Up @@ -95,7 +84,7 @@ def on_task_instance_running(
)
return None

if not self._is_lineage_enabled(task):
if not is_selective_lineage_enabled(task):
return

@print_warning(self.log)
Expand Down Expand Up @@ -165,7 +154,7 @@ def on_task_instance_success(self, previous_state, task_instance: TaskInstance,
)
return None

if not self._is_lineage_enabled(task):
if not is_selective_lineage_enabled(task):
return

@print_warning(self.log)
Expand Down Expand Up @@ -220,7 +209,7 @@ def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, s
)
return None

if not self._is_lineage_enabled(task):
if not is_selective_lineage_enabled(task):
return

@print_warning(self.log)
Expand Down Expand Up @@ -276,7 +265,7 @@ def before_stopping(self, component):

@hookimpl
def on_dag_run_running(self, dag_run: DagRun, msg: str):
if not self._is_lineage_enabled(dag_run.dag):
if not is_selective_lineage_enabled(dag_run.dag):
return
data_interval_start = dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None
Expand All @@ -290,7 +279,7 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str):

@hookimpl
def on_dag_run_success(self, dag_run: DagRun, msg: str):
if not self._is_lineage_enabled(dag_run.dag):
if not is_selective_lineage_enabled(dag_run.dag):
return
if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_success`")
Expand All @@ -299,7 +288,7 @@ def on_dag_run_success(self, dag_run: DagRun, msg: str):

@hookimpl
def on_dag_run_failed(self, dag_run: DagRun, msg: str):
if not self._is_lineage_enabled(dag_run.dag):
if not is_selective_lineage_enabled(dag_run.dag):
return
if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_failed`")
Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/openlineage/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ config:
example: "airflow.operators.bash.BashOperator;airflow.operators.python.PythonOperator"
default: ""
version_added: 1.1.0
opt_in:
selective_enable:
description: |
Set this to true if you want to enable OpenLineage metadata extraction and emission
on DAG and/or Task level.
If this setting is enabled, OpenLineage integration won't collect and emit metadata,
unless you explicitly enable it per `DAG` or `Task` using `enable_lineage` method.
type: boolean
default: "False"
example: ~
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@

from __future__ import annotations

import logging
from typing import TypeVar

from airflow.models import DAG, Operator, Param
from airflow.models.xcom_arg import XComArg

ENABLE_OL_PARAM_NAME = "_enable_ol"
ENABLE_OL_PARAM_NAME = "_selective_enable_ol"
ENABLE_OL_PARAM = Param(True, const=True)
DISABLE_OL_PARAM = Param(False, const=False)
T = TypeVar("T", bound="DAG | Operator")

log = logging.getLogger(__name__)


def enable_lineage(obj: T) -> T:
"""Set selective enable OpenLineage parameter to True.
The method also propagates param to tasks if the object is DAG.
"""
if isinstance(obj, XComArg):
enable_lineage(obj.operator)
return obj
Expand All @@ -41,6 +48,10 @@ def enable_lineage(obj: T) -> T:


def disable_lineage(obj: T) -> T:
"""Set selective enable OpenLineage parameter to False.
The method also propagates param to tasks if the object is DAG.
"""
if isinstance(obj, XComArg):
disable_lineage(obj.operator)
return obj
Expand All @@ -53,10 +64,24 @@ def disable_lineage(obj: T) -> T:


def is_task_lineage_enabled(task: Operator) -> bool:
"""Check if selective enable OpenLineage parameter is set to True on task level."""
if task.params.get(ENABLE_OL_PARAM_NAME) is False:
log.debug(
"OpenLineage event emission suppressed. Task for this functionality is selectively disabled."
)
return task.params.get(ENABLE_OL_PARAM_NAME) is True


def is_dag_lineage_enabled(dag: DAG) -> bool:
"""Check if DAG is selectively enabled to emit OpenLineage events.
The method also checks if selective enable parameter is set to True
or if any of the tasks in DAG is selectively enabled.
"""
if dag.params.get(ENABLE_OL_PARAM_NAME) is False:
log.debug(
"OpenLineage event emission suppressed. DAG for this functionality is selectively disabled."
)
return dag.params.get(ENABLE_OL_PARAM_NAME) is True or any(
is_task_lineage_enabled(task) for task in dag.tasks
)
20 changes: 18 additions & 2 deletions airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,21 @@
# TODO: move this maybe to Airflow's logic?
from openlineage.client.utils import RedactMixin

from airflow.models import DAG, BaseOperator, MappedOperator
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.facets import (
AirflowMappedTaskRunFacet,
AirflowRunFacet,
)
from airflow.providers.openlineage.utils.selective_enable import (
is_dag_lineage_enabled,
is_task_lineage_enabled,
)
from airflow.utils.context import AirflowContextDeprecationWarning
from airflow.utils.log.secrets_masker import Redactable, Redacted, SecretsMasker, should_hide_value_for_key

if TYPE_CHECKING:
from airflow.models import DAG, BaseOperator, DagRun, MappedOperator, TaskInstance
from airflow.models import DagRun, TaskInstance


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -73,6 +78,18 @@ def is_operator_disabled(operator: BaseOperator | MappedOperator) -> bool:
return get_fully_qualified_class_name(operator) in conf.disabled_operators()


def is_selective_lineage_enabled(obj: DAG | BaseOperator | MappedOperator) -> bool:
"""If selective enable is active check if DAG or Task is enabled to emit events."""
if not conf.selective_enable():
return True
if isinstance(obj, DAG):
return is_dag_lineage_enabled(obj)
elif isinstance(obj, (BaseOperator, MappedOperator)):
return is_task_lineage_enabled(obj)
else:
raise TypeError("is_selective_lineage_enabled can only be used on DAG or Operator objects")


class InfoJsonEncodable(dict):
"""
Airflow objects might not be json-encodable overall.
Expand Down Expand Up @@ -329,7 +346,6 @@ def wrapper(*args, **kwargs):
return f(*args, **kwargs)
except Exception as e:
log.warning(e)
raise e

return wrapper

Expand Down
16 changes: 8 additions & 8 deletions docs/apache-airflow-providers-openlineage/guides/user.rst
Original file line number Diff line number Diff line change
Expand Up @@ -267,26 +267,26 @@ a string of semicolon separated Airflow Operators full import paths to ``extract
Enabling OpenLineage on DAG/task level
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

One can selectively enable OpenLineage for specific DAGs and tasks by using the ``opt_in`` policy.
To enable this policy, set the ``opt_in`` option to True in the [openlineage] section of your Airflow configuration file:
One can selectively enable OpenLineage for specific DAGs and tasks by using the ``selective_enable`` policy.
To enable this policy, set the ``selective_enable`` option to True in the [openlineage] section of your Airflow configuration file:

.. code-block:: ini
[openlineage]
opt_in = True
selective_enable = True
While ``opt_in`` enables selective control, the ``disabled`` :ref:`option <options:disable>` still has precedence.
If you set ``disabled`` to True in the configuration, OpenLineage will be disabled for all DAGs and tasks regardless of the ``opt_in`` setting.
While ``selective_enable`` enables selective control, the ``disabled`` :ref:`option <options:disable>` still has precedence.
If you set ``disabled`` to True in the configuration, OpenLineage will be disabled for all DAGs and tasks regardless of the ``selective_enable`` setting.

Once the ``opt_in`` policy is enabled, you can choose to enable OpenLineage
Once the ``selective_enable`` policy is enabled, you can choose to enable OpenLineage
for individual DAGs and tasks using the ``enable_lineage`` and ``disable_lineage`` functions.

1. Enabling Lineage on a DAG:

.. code-block:: python
from airflow.providers.openlineage.utils.opt_in import disable_lineage, enable_lineage
from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage
with enable_lineage(DAG(...)):
# Tasks within this DAG will have lineage tracking enabled
Expand All @@ -300,7 +300,7 @@ While enabling lineage on a DAG implicitly enables it for all tasks within that

.. code-block:: python
from airflow.providers.openlineage.utils.opt_in import disable_lineage, enable_lineage
from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage
with DAG(...) as dag:
t1 = MyOperator(...)
Expand Down
4 changes: 2 additions & 2 deletions scripts/ci/docker-compose/integration-openlineage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
version: "3.8"
services:
marquez:
image: marquezproject/marquez:0.4.0
image: marquezproject/marquez:0.40.0
labels:
breeze.description: "Integration required for Openlineage hooks."
environment:
Expand All @@ -34,7 +34,7 @@ services:
entrypoint: ["./entrypoint.sh"]

marquez_web:
image: marquezproject/marquez-web:0.4.0
image: marquezproject/marquez-web:0.40.0
environment:
- MARQUEZ_HOST=marquez
- MARQUEZ_PORT=5000
Expand Down
Loading

0 comments on commit e0a08d9

Please sign in to comment.