Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dag.clear() to set multiple dags to running when necessary #15382

Merged
merged 7 commits into from
May 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ not have any effect in an existing deployment where the ``default_pool`` already

Previously this was controlled by `non_pooled_task_slot_count` in `[core]` section, which was not documented.

### `activate_dag_runs` argument of the function `clear_task_instances` is replaced with `dag_run_state`

To achieve the previous default behaviour of `clear_task_instances` with `activate_dag_runs=True`, no change is needed. To achieve the previous behaviour of `activate_dag_runs=False`, pass `dag_run_state=False` instead.

### `dag.set_dag_runs_state` is deprecated

The method `set_dag_runs_state` is no longer needed after a bug fix in PR: [#15382](https://github.com/apache/airflow/pull/15382). This method is now deprecated and will be removed in a future version.

## Airflow 2.1.0

### New "deprecated_api" extra
Expand Down
12 changes: 1 addition & 11 deletions airflow/api_connexion/endpoints/task_instance_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,18 +251,8 @@ def post_clear_task_instances(dag_id: str, session=None):
task_instances = dag.clear(get_tis=True, **data)
if not data["dry_run"]:
clear_task_instances(
task_instances,
session,
dag=dag,
activate_dag_runs=False, # We will set DagRun state later.
task_instances, session, dag=dag, dag_run_state=State.RUNNING if reset_dag_runs else False
)
if reset_dag_runs:
dag.set_dag_runs_state(
yuqian90 marked this conversation as resolved.
Show resolved Hide resolved
session=session,
start_date=data["start_date"],
end_date=data["end_date"],
state=State.RUNNING,
)
task_instances = task_instances.join(
DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
).add_column(DR.run_id)
Expand Down
21 changes: 8 additions & 13 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,11 @@ def set_dag_runs_state(
end_date: Optional[datetime] = None,
dag_ids: List[str] = None,
) -> None:
warnings.warn(
"This method is deprecated and will be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)
dag_ids = dag_ids or [self.dag_id]
query = session.query(DagRun).filter(DagRun.dag_id.in_(dag_ids))
if start_date:
Expand Down Expand Up @@ -1175,7 +1180,8 @@ def clear(
:type include_subdags: bool
:param include_parentdag: Clear tasks in the parent dag of the subdag.
:type include_parentdag: bool
:param dag_run_state: state to set DagRun to
:param dag_run_state: state to set DagRun to. If set to False, dagrun state will not
be changed.
:param dry_run: Find the tasks to clear but don't clear them.
:type dry_run: bool
:param session: The sqlalchemy session to use
Expand All @@ -1196,20 +1202,17 @@ def clear(
"""
TI = TaskInstance
tis = session.query(TI)
dag_ids = []
if include_subdags:
# Crafting the right filter for dag_id and task_ids combo
conditions = []
for dag in self.subdags + [self]:
conditions.append((TI.dag_id == dag.dag_id) & TI.task_id.in_(dag.task_ids))
dag_ids.append(dag.dag_id)
tis = tis.filter(or_(*conditions))
else:
tis = session.query(TI).filter(TI.dag_id == self.dag_id)
tis = tis.filter(TI.task_id.in_(self.task_ids))

if include_parentdag and self.is_subdag and self.parent_dag is not None:
dag_ids.append(self.parent_dag.dag_id)
p_dag = self.parent_dag.sub_dag(
task_ids_or_regex=r"^{}$".format(self.dag_id.split('.')[1]),
include_upstream=False,
Expand Down Expand Up @@ -1343,15 +1346,7 @@ def clear(
tis,
session,
dag=self,
activate_dag_runs=False, # We will set DagRun state later.
)

self.set_dag_runs_state(
session=session,
start_date=start_date,
end_date=end_date,
state=dag_run_state,
dag_ids=dag_ids,
dag_run_state=dag_run_state,
)
else:
count = 0
Expand Down
19 changes: 13 additions & 6 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def set_error_file(error_file: str, error: Union[str, Exception]) -> None:
def clear_task_instances(
tis,
session,
activate_dag_runs=True,
dag_run_state: str = State.RUNNING,
dag=None,
):
"""
Expand All @@ -142,7 +142,8 @@ def clear_task_instances(

:param tis: a list of task instances
:param session: current session
:param activate_dag_runs: flag to check for active dag run
:param dag_run_state: state to set DagRun to. If set to False, dagrun state will not
be changed.
:param dag: DAG object
"""
job_ids = []
Expand Down Expand Up @@ -204,19 +205,25 @@ def clear_task_instances(
for job in session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all(): # noqa
job.state = State.SHUTDOWN

if activate_dag_runs and tis:
if dag_run_state is not False and tis:
from airflow.models.dagrun import DagRun # Avoid circular import

dates_by_dag_id = defaultdict(set)
for instance in tis:
dates_by_dag_id[instance.dag_id].add(instance.execution_date)

drs = (
session.query(DagRun)
.filter(
DagRun.dag_id.in_({ti.dag_id for ti in tis}),
DagRun.execution_date.in_({ti.execution_date for ti in tis}),
or_(
and_(DagRun.dag_id == dag_id, DagRun.execution_date.in_(dates))
yuqian90 marked this conversation as resolved.
Show resolved Hide resolved
for dag_id, dates in dates_by_dag_id.items()
)
)
.all()
)
for dr in drs:
dr.state = State.RUNNING
dr.state = dag_run_state
dr.start_date = timezone.utcnow()


Expand Down
96 changes: 96 additions & 0 deletions tests/sensors/test_external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,53 @@ def dag_bag_ext():
return dag_bag


@pytest.fixture
def dag_bag_parent_child():
"""
Create a DagBag with two DAGs looking like this. task_1 of child_dag_1 on day 1 depends on
task_0 of parent_dag_0 on day 1. Therefore, when task_0 of parent_dag_0 on day 1 and day 2
are cleared, parent_dag_0 DagRuns need to be set to running on both days, but child_dag_1
only needs to be set to running on day 1.

day 1 day 2

parent_dag_0 task_0 task_0
|
|
v
child_dag_1 task_1 task_1

"""
dag_bag = DagBag(dag_folder=DEV_NULL, include_examples=False)

day_1 = DEFAULT_DATE

dag_0 = DAG("parent_dag_0", start_date=day_1, schedule_interval=None)
task_0 = ExternalTaskMarker(
task_id="task_0",
external_dag_id="child_dag_1",
external_task_id="task_1",
execution_date=day_1.isoformat(),
recursion_depth=3,
dag=dag_0,
)

dag_1 = DAG("child_dag_1", start_date=day_1, schedule_interval=None)
_ = ExternalTaskSensor(
task_id="task_1",
external_dag_id=dag_0.dag_id,
external_task_id=task_0.task_id,
execution_date_fn=lambda execution_date: day_1 if execution_date == day_1 else [],
mode='reschedule',
dag=dag_1,
)

for dag in [dag_0, dag_1]:
dag_bag.bag_dag(dag=dag, root_dag=dag)

return dag_bag


def run_tasks(dag_bag, execution_date=DEFAULT_DATE):
"""
Run all tasks in the DAGs in the given dag_bag. Return the TaskInstance objects as a dict
Expand Down Expand Up @@ -464,6 +511,55 @@ def test_external_task_marker_transitive(dag_bag_ext):
assert_ti_state_equal(ti_b_3, State.NONE)


# pylint: disable=redefined-outer-name
def test_external_task_marker_clear_activate(dag_bag_parent_child):
"""
Test clearing tasks across DAGs and make sure the right DagRuns are activated.
"""
from airflow.utils.session import create_session
from airflow.utils.types import DagRunType

dag_bag = dag_bag_parent_child
day_1 = DEFAULT_DATE
day_2 = DEFAULT_DATE + timedelta(days=1)

run_tasks(dag_bag, execution_date=day_1)
run_tasks(dag_bag, execution_date=day_2)

with create_session() as session:
for dag in dag_bag.dags.values():
for execution_date in [day_1, day_2]:
dagrun = dag.create_dagrun(
State.RUNNING, execution_date, run_type=DagRunType.MANUAL, session=session
)
dagrun.set_state(State.SUCCESS)
session.add(dagrun)

session.commit()

# Assert that dagruns of all the affected dags are set to SUCCESS before tasks are cleared.
for dag in dag_bag.dags.values():
for execution_date in [day_1, day_2]:
dagrun = dag.get_dagrun(execution_date=execution_date)
assert dagrun.state == State.SUCCESS

dag_0 = dag_bag.get_dag("parent_dag_0")
task_0 = dag_0.get_task("task_0")
clear_tasks(dag_bag, dag_0, task_0, start_date=day_1, end_date=day_2)

# Assert that dagruns of all the affected dags are set to RUNNING after tasks are cleared.
# Unaffected dagruns should be left as SUCCESS.
dagrun_0_1 = dag_bag.get_dag('parent_dag_0').get_dagrun(execution_date=day_1)
dagrun_0_2 = dag_bag.get_dag('parent_dag_0').get_dagrun(execution_date=day_2)
dagrun_1_1 = dag_bag.get_dag('child_dag_1').get_dagrun(execution_date=day_1)
dagrun_1_2 = dag_bag.get_dag('child_dag_1').get_dagrun(execution_date=day_2)

assert dagrun_0_1.state == State.RUNNING
assert dagrun_0_2.state == State.RUNNING
assert dagrun_1_1.state == State.RUNNING
assert dagrun_1_2.state == State.SUCCESS


def test_external_task_marker_future(dag_bag_ext):
"""
Test clearing tasks with no end_date. This is the case when users clear tasks with
Expand Down