From 87fcc1c54e409d1958476182c5289cc1ae7b8003 Mon Sep 17 00:00:00 2001 From: aoen Date: Wed, 18 Dec 2019 16:15:54 +0200 Subject: [PATCH] [TWTR][CX-17516] Requeue tasks in the queued state (#27) --- airflow/executors/base_executor.py | 4 ++-- airflow/jobs.py | 7 ++++++- airflow/version.py | 2 +- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 701ac66f8b72..b71d9685bcc9 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -54,9 +54,9 @@ def queue_command(self, task_instance, command, priority=1, queue=None): key = task_instance.key if key not in self.queued_tasks and key not in self.running: self.log.info("Adding to queue: %s", command) - self.queued_tasks[key] = (command, priority, queue, task_instance) else: - self.log.info("could not queue task {}".format(key)) + self.log.info("Adding to queue even though already queued or running {}".format(command, key)) + self.queued_tasks[key] = (command, priority, queue, task_instance) def queue_task_instance( self, diff --git a/airflow/jobs.py b/airflow/jobs.py index bfdfc5e3d186..b5a05ff70586 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1728,8 +1728,13 @@ def _execute_helper(self, processor_manager): scheduled_dag_ids = ", ".join(simple_dag_bag.dag_ids) self.log.info('DAGs to be executed: {}'.format(scheduled_dag_ids)) + # TODO(CX-17516): State.QUEUED has been added here which is a hack as the Celery + # Executor does not reliably enqueue tasks with the my MySQL broker, and we have + # seen tasks hang after they get queued. The effect of this hack is queued tasks + # will constantly be requeued and resent to the executor (Celery). + # This should be removed when we switch away from the MySQL Celery backend. self._execute_task_instances(simple_dag_bag, - (State.SCHEDULED,)) + (State.SCHEDULED, State.QUEUED)) # Call heartbeats self.log.debug("Heartbeating the executor") diff --git a/airflow/version.py b/airflow/version.py index 4cf0c63e5a6a..8959caccb68a 100644 --- a/airflow/version.py +++ b/airflow/version.py @@ -18,4 +18,4 @@ # under the License. # -version = '1.10.0+twtr24' +version = '1.10.0+twtr25'