From d8c473e9119286f5fdb769880134c76f40bf42f6 Mon Sep 17 00:00:00 2001 From: Sumit Maheshwari Date: Thu, 19 Nov 2020 14:41:25 +0530 Subject: [PATCH] [CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (#63) CP of f757a54 Commit already available in 1.10.12 --- airflow/config_templates/default_airflow.cfg | 4 ++++ airflow/executors/celery_executor.py | 6 ++++-- airflow/version.py | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 78a596a62312..ab454d61087d 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -453,6 +453,10 @@ ssl_cacert = # https://docs.celeryproject.org/en/latest/userguide/concurrency/eventlet.html pool = prefork +# The number of seconds to wait before timing out ``send_task_to_executor`` or +# ``fetch_celery_task_state`` operations. +operation_timeout = 10 + [celery_broker_transport_options] # This section is for specifying options which can be passed to the # underlying celery broker transport. See: diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 4302253e6bd3..f86c49f70720 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -40,6 +40,8 @@ CELERY_SEND_ERR_MSG_HEADER = 'Error sending Celery task' +OPERATION_TIMEOUT = configuration.getint('celery', 'operation_timeout', fallback=2) + ''' To start the celery worker, run the command: airflow worker @@ -100,7 +102,7 @@ def fetch_celery_task_state(celery_task): """ try: - with timeout(seconds=2): + with timeout(seconds=OPERATION_TIMEOUT): # Accessing state property of celery task will make actual network request # to get the current state of the task. res = (celery_task[0], celery_task[1].state) @@ -114,7 +116,7 @@ def fetch_celery_task_state(celery_task): def send_task_to_executor(task_tuple): key, simple_ti, command, queue, task = task_tuple try: - with timeout(seconds=2): + with timeout(seconds=OPERATION_TIMEOUT): result = task.apply_async(args=[command], queue=queue) except Exception as e: exception_traceback = "Celery Task ID: {}\n{}".format(key, diff --git a/airflow/version.py b/airflow/version.py index f4ce77bf918a..a3c1ec4aa49d 100644 --- a/airflow/version.py +++ b/airflow/version.py @@ -18,5 +18,5 @@ # under the License. # -version = '1.10.4+twtr23' +version = '1.10.4+twtr24'