From f757a54be6ca903fc30ae82edbd1fd330accd678 Mon Sep 17 00:00:00 2001 From: yuqian90 Date: Sun, 2 Feb 2020 17:44:36 +0800 Subject: [PATCH] [AIRFLOW-6527] Make send_task_to_executor timeout configurable (#7143) --- airflow/config_templates/config.yml | 8 ++++++++ airflow/config_templates/default_airflow.cfg | 4 ++++ airflow/executors/celery_executor.py | 6 ++++-- tests/executors/test_celery_executor.py | 4 ++++ 4 files changed, 20 insertions(+), 2 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 33e0db8fcbd9..72b8f452acfa 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1176,6 +1176,14 @@ type: string example: ~ default: "prefork" + - name: operation_timeout + description: | + The number of seconds to wait before timing out ``send_task_to_executor`` or + ``fetch_celery_task_state`` operations. + version_added: ~ + type: int + example: ~ + default: "2" - name: celery_broker_transport_options description: | This section is for specifying options which can be passed to the diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index abc538a0f0ce..f66854e2bfee 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -563,6 +563,10 @@ ssl_cacert = # https://docs.celeryproject.org/en/latest/userguide/concurrency/eventlet.html pool = prefork +# The number of seconds to wait before timing out ``send_task_to_executor`` or +# ``fetch_celery_task_state`` operations. +operation_timeout = 2 + [celery_broker_transport_options] # This section is for specifying options which can be passed to the diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 78c417b7b688..963279961b78 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -41,6 +41,8 @@ CELERY_SEND_ERR_MSG_HEADER = 'Error sending Celery task' +OPERATION_TIMEOUT = conf.getint('celery', 'operation_timeout', fallback=2) + ''' To start the celery worker, run the command: airflow celery worker @@ -102,7 +104,7 @@ def fetch_celery_task_state(celery_task: Tuple[TaskInstanceKeyType, AsyncResult] """ try: - with timeout(seconds=2): + with timeout(seconds=OPERATION_TIMEOUT): # Accessing state property of celery task will make actual network request # to get the current state of the task. return celery_task[0], celery_task[1].state @@ -122,7 +124,7 @@ def send_task_to_executor(task_tuple: TaskInstanceInCelery) \ """Sends task to executor.""" key, _, command, queue, task_to_run = task_tuple try: - with timeout(seconds=2): + with timeout(seconds=OPERATION_TIMEOUT): result = task_to_run.apply_async(args=[command], queue=queue) except Exception as e: # pylint: disable=broad-except exception_traceback = "Celery Task ID: {}\n{}".format(key, traceback.format_exc()) diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py index 9ec3fa0336cd..adcc40361058 100644 --- a/tests/executors/test_celery_executor.py +++ b/tests/executors/test_celery_executor.py @@ -187,5 +187,9 @@ def test_gauge_executor_metrics(self, mock_stats_gauge, mock_trigger_tasks, mock mock_stats_gauge.assert_has_calls(calls) +def test_operation_timeout_config(): + assert celery_executor.OPERATION_TIMEOUT == 2 + + if __name__ == '__main__': unittest.main()