From 616240276133f450e6966ff5ede863d7a81bc3ff Mon Sep 17 00:00:00 2001 From: Sumit Maheshwari Date: Thu, 24 Sep 2020 22:07:51 +0530 Subject: [PATCH] [TWTTR] Don't enqueue tasks again if already queued for K8sExecutor(#60) Basically reverting commit 87fcc1c and making changes specifically into the Celery Executor class only. --- airflow/executors/base_executor.py | 4 ++-- airflow/executors/celery_executor.py | 8 ++++++++ airflow/version.py | 2 +- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 4787b9f021684..a1442dab24654 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -57,9 +57,9 @@ def queue_command(self, simple_task_instance, command, priority=1, queue=None): key = simple_task_instance.key if key not in self.queued_tasks and key not in self.running: self.log.info("Adding to queue: %s", command) + self.queued_tasks[key] = (command, priority, queue, simple_task_instance) else: - self.log.info("Adding to queue even though already queued or running {}".format(command, key)) - self.queued_tasks[key] = (command, priority, queue, simple_task_instance) + self.log.error("could not queue task %s", key) def queue_task_instance( self, diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 2c108bc7c76c3..4302253e6bd39 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -155,6 +155,14 @@ def start(self): self._sync_parallelism ) + def queue_command(self, simple_task_instance, command, priority=1, queue=None): + key = simple_task_instance.key + if key not in self.queued_tasks and key not in self.running: + self.log.info("Adding to queue: %s", command) + else: + self.log.info("Adding to queue even though already queued or running {}".format(command, key)) + self.queued_tasks[key] = (command, priority, queue, simple_task_instance) + def _num_tasks_per_send_process(self, to_send_count): """ How many Celery tasks should each worker process send. diff --git a/airflow/version.py b/airflow/version.py index b8a4b08d49281..39582d1ce56dc 100644 --- a/airflow/version.py +++ b/airflow/version.py @@ -18,5 +18,5 @@ # under the License. # -version = '1.10.4+twtr19' +version = '1.10.4+twtr21'