Skip to content

Commit

Permalink
Handle IntegrityError while creating TIs (apache#10136)
Browse files Browse the repository at this point in the history
While doing a trigger_dag from UI, DagRun gets created first and then WebServer starts creating TIs. Meanwhile, Scheduler also picks up the DagRun and starts creating the TIs, which results in IntegrityError as the Primary key constraint gets violated. This happens when a DAG has a good number of tasks.

Also, changing the TIs array with a set for faster lookups for Dags with too many tasks.
  • Loading branch information
msumit committed Aug 7, 2020
1 parent d2540e6 commit 2102122
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
14 changes: 11 additions & 3 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from sqlalchemy import (
Boolean, Column, DateTime, Index, Integer, PickleType, String, UniqueConstraint, and_, func, or_,
)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import synonym
from sqlalchemy.orm.session import Session
Expand Down Expand Up @@ -439,10 +440,10 @@ def verify_integrity(self, session=None):
tis = self.get_task_instances(session=session)

# check for removed or restored tasks
task_ids = []
task_ids = set()
for ti in tis:
task_instance_mutation_hook(ti)
task_ids.append(ti.task_id)
task_ids.add(ti.task_id)
task = None
try:
task = dag.get_task(ti.task_id)
Expand Down Expand Up @@ -477,7 +478,14 @@ def verify_integrity(self, session=None):
task_instance_mutation_hook(ti)
session.add(ti)

session.commit()
try:
session.commit()
except IntegrityError as err:
self.log.info(str(err))
self.log.info('Hit IntegrityError while creating the TIs for '
f'{dag.dag_id} - {self.execution_date}.')
self.log.info('Doing session rollback.')
session.rollback()

@staticmethod
def get_run(session, dag_id, execution_date):
Expand Down
19 changes: 19 additions & 0 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,25 @@ def with_all_tasks_removed(dag):
flaky_ti.refresh_from_db()
self.assertEqual(State.NONE, flaky_ti.state)

def test_already_added_task_instances_can_be_ignored(self):
dag = DAG('triggered_dag', start_date=DEFAULT_DATE)
dag.add_task(DummyOperator(task_id='first_task', owner='test'))

dagrun = self.create_dag_run(dag)
first_ti = dagrun.get_task_instances()[0]
self.assertEqual('first_task', first_ti.task_id)
self.assertEqual(State.NONE, first_ti.state)

# Lets assume that the above TI was added into DB by webserver, but if scheduler
# is running the same method at the same time it would find 0 TIs for this dag
# and proceeds further to create TIs. Hence mocking DagRun.get_task_instances
# method to return an empty list of TIs.
with mock.patch.object(DagRun, 'get_task_instances') as mock_gtis:
mock_gtis.return_value = []
dagrun.verify_integrity()
first_ti.refresh_from_db()
self.assertEqual(State.NONE, first_ti.state)

@parameterized.expand([(state,) for state in State.task_states])
@mock.patch('airflow.models.dagrun.task_instance_mutation_hook')
def test_task_instance_mutation_hook(self, state, mock_hook):
Expand Down

0 comments on commit 2102122

Please sign in to comment.