Skip to content

Commit

Permalink
Deprecate DAG.run method (apache#42417)
Browse files Browse the repository at this point in the history
This method relies on local backfill mode, which is slated for removal in 3.0. We have suitable alternatives such as DAG.test() and triggering dags via API.
  • Loading branch information
dstandish committed Sep 24, 2024
1 parent 38eb5ce commit bd8d5ee
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 55 deletions.
9 changes: 9 additions & 0 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import sys
import time
import traceback
import warnings
import weakref
from collections import abc, defaultdict, deque
from contextlib import ExitStack
Expand Down Expand Up @@ -88,6 +89,7 @@
DuplicateTaskIdFound,
FailStopDagInvalidTriggerRule,
ParamValidationError,
RemovedInAirflow3Warning,
TaskDeferred,
TaskNotFound,
UnknownExecutorException,
Expand Down Expand Up @@ -2331,6 +2333,13 @@ def run(
:param run_at_least_once: If true, always run the DAG at least once even
if no logical run exists within the time range.
"""
warnings.warn(
"`DAG.run()` is deprecated and will be removed in Airflow 3.0. Consider "
"using `DAG.test()` instead, or trigger your dag via API.",
RemovedInAirflow3Warning,
stacklevel=2,
)

from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.backfill_job_runner import BackfillJobRunner

Expand Down
116 changes: 69 additions & 47 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.datasets import Dataset
from airflow.datasets.manager import DatasetManager
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
from airflow.executors.base_executor import BaseExecutor
from airflow.executors.executor_constants import MOCK_EXECUTOR
from airflow.executors.executor_loader import ExecutorLoader
Expand Down Expand Up @@ -2838,6 +2838,10 @@ def evaluate_dagrun(
This is hackish: a dag run is created but its tasks are
run by a backfill.
"""

# todo: AIP-78 remove along with DAG.run()
# this only tests the backfill job runner, not the scheduler

if run_kwargs is None:
run_kwargs = {}

Expand Down Expand Up @@ -2898,40 +2902,49 @@ def test_dagrun_fail(self):
"""
DagRuns with one failed and one incomplete root task -> FAILED
"""
self.evaluate_dagrun(
dag_id="test_dagrun_states_fail",
expected_task_states={
"test_dagrun_fail": State.FAILED,
"test_dagrun_succeed": State.UPSTREAM_FAILED,
},
dagrun_state=State.FAILED,
)
# todo: AIP-78 remove along with DAG.run()
# this only tests the backfill job runner, not the scheduler
with pytest.warns(RemovedInAirflow3Warning):
self.evaluate_dagrun(
dag_id="test_dagrun_states_fail",
expected_task_states={
"test_dagrun_fail": State.FAILED,
"test_dagrun_succeed": State.UPSTREAM_FAILED,
},
dagrun_state=State.FAILED,
)

def test_dagrun_success(self):
"""
DagRuns with one failed and one successful root task -> SUCCESS
"""
self.evaluate_dagrun(
dag_id="test_dagrun_states_success",
expected_task_states={
"test_dagrun_fail": State.FAILED,
"test_dagrun_succeed": State.SUCCESS,
},
dagrun_state=State.SUCCESS,
)
# todo: AIP-78 remove along with DAG.run()
# this only tests the backfill job runner, not the scheduler
with pytest.warns(RemovedInAirflow3Warning):
self.evaluate_dagrun(
dag_id="test_dagrun_states_success",
expected_task_states={
"test_dagrun_fail": State.FAILED,
"test_dagrun_succeed": State.SUCCESS,
},
dagrun_state=State.SUCCESS,
)

def test_dagrun_root_fail(self):
"""
DagRuns with one successful and one failed root task -> FAILED
"""
self.evaluate_dagrun(
dag_id="test_dagrun_states_root_fail",
expected_task_states={
"test_dagrun_succeed": State.SUCCESS,
"test_dagrun_fail": State.FAILED,
},
dagrun_state=State.FAILED,
)
# todo: AIP-78 remove along with DAG.run()
# this only tests the backfill job runner, not the scheduler
with pytest.warns(RemovedInAirflow3Warning):
self.evaluate_dagrun(
dag_id="test_dagrun_states_root_fail",
expected_task_states={
"test_dagrun_succeed": State.SUCCESS,
"test_dagrun_fail": State.FAILED,
},
dagrun_state=State.FAILED,
)

def test_dagrun_root_fail_unfinished(self):
"""
Expand All @@ -2952,9 +2965,12 @@ def test_dagrun_root_fail_unfinished(self):
)
self.null_exec.mock_task_fail(dag_id, "test_dagrun_fail", dr.run_id)

for _ in _mock_executor(self.null_exec):
with pytest.raises(AirflowException):
dag.run(start_date=dr.execution_date, end_date=dr.execution_date)
# todo: AIP-78 remove this test along with DAG.run()
# this only tests the backfill job runner, not the scheduler
with pytest.warns(RemovedInAirflow3Warning):
for _ in _mock_executor(self.null_exec):
with pytest.raises(AirflowException):
dag.run(start_date=dr.execution_date, end_date=dr.execution_date)

# Mark the successful task as never having run since we want to see if the
# dagrun will be in a running state despite having an unfinished task.
Expand Down Expand Up @@ -2994,16 +3010,19 @@ def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self):
if ignore_first_depends_on_past=True and the dagrun execution_date
is after the start_date.
"""
self.evaluate_dagrun(
dag_id="test_dagrun_states_deadlock",
expected_task_states={
"test_depends_on_past": State.SUCCESS,
"test_depends_on_past_2": State.SUCCESS,
},
dagrun_state=State.SUCCESS,
advance_execution_date=True,
run_kwargs=dict(ignore_first_depends_on_past=True),
)
# todo: AIP-78 remove along with DAG.run()
# this only tests the backfill job runner, not the scheduler
with pytest.warns(RemovedInAirflow3Warning):
self.evaluate_dagrun(
dag_id="test_dagrun_states_deadlock",
expected_task_states={
"test_depends_on_past": State.SUCCESS,
"test_depends_on_past_2": State.SUCCESS,
},
dagrun_state=State.SUCCESS,
advance_execution_date=True,
run_kwargs=dict(ignore_first_depends_on_past=True),
)

def test_dagrun_deadlock_ignore_depends_on_past(self):
"""
Expand All @@ -3012,15 +3031,18 @@ def test_dagrun_deadlock_ignore_depends_on_past(self):
test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date except
that start_date == execution_date so depends_on_past is irrelevant).
"""
self.evaluate_dagrun(
dag_id="test_dagrun_states_deadlock",
expected_task_states={
"test_depends_on_past": State.SUCCESS,
"test_depends_on_past_2": State.SUCCESS,
},
dagrun_state=State.SUCCESS,
run_kwargs=dict(ignore_first_depends_on_past=True),
)
# todo: AIP-78 remove along with DAG.run()
# this only tests the backfill job runner, not the scheduler
with pytest.warns(RemovedInAirflow3Warning):
self.evaluate_dagrun(
dag_id="test_dagrun_states_deadlock",
expected_task_states={
"test_depends_on_past": State.SUCCESS,
"test_depends_on_past_2": State.SUCCESS,
},
dagrun_state=State.SUCCESS,
run_kwargs=dict(ignore_first_depends_on_past=True),
)

@pytest.mark.parametrize(
"configs",
Expand Down
20 changes: 12 additions & 8 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
AirflowException,
DuplicateTaskIdFound,
ParamValidationError,
RemovedInAirflow3Warning,
UnknownExecutorException,
)
from airflow.executors import executor_loader
Expand Down Expand Up @@ -2733,14 +2734,17 @@ def test_dataset_expression(self, session: Session) -> None:

@mock.patch("airflow.models.dag.run_job")
def test_dag_executors(self, run_job_mock):
dag = DAG(dag_id="test", schedule=None)
reload(executor_loader)
with conf_vars({("core", "executor"): "SequentialExecutor"}):
dag.run()
assert isinstance(run_job_mock.call_args_list[0].kwargs["job"].executor, SequentialExecutor)

dag.run(local=True)
assert isinstance(run_job_mock.call_args_list[1].kwargs["job"].executor, LocalExecutor)
# todo: AIP-78 remove along with DAG.run()
# this only tests the backfill job runner, not the scheduler
with pytest.warns(RemovedInAirflow3Warning):
dag = DAG(dag_id="test", schedule=None)
reload(executor_loader)
with conf_vars({("core", "executor"): "SequentialExecutor"}):
dag.run()
assert isinstance(run_job_mock.call_args_list[0].kwargs["job"].executor, SequentialExecutor)

dag.run(local=True)
assert isinstance(run_job_mock.call_args_list[1].kwargs["job"].executor, LocalExecutor)


class TestQueries:
Expand Down

0 comments on commit bd8d5ee

Please sign in to comment.