Skip to content

Commit

Permalink
[CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout config…
Browse files Browse the repository at this point in the history
…urable (apache#63)

CP of f757a54

Commit already available in 1.10.12
  • Loading branch information
msumit committed Nov 19, 2020
1 parent 48be0f9 commit d8c473e
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,10 @@ ssl_cacert =
# https://docs.celeryproject.org/en/latest/userguide/concurrency/eventlet.html
pool = prefork

# The number of seconds to wait before timing out ``send_task_to_executor`` or
# ``fetch_celery_task_state`` operations.
operation_timeout = 10

[celery_broker_transport_options]
# This section is for specifying options which can be passed to the
# underlying celery broker transport. See:
Expand Down
6 changes: 4 additions & 2 deletions airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

CELERY_SEND_ERR_MSG_HEADER = 'Error sending Celery task'

OPERATION_TIMEOUT = configuration.getint('celery', 'operation_timeout', fallback=2)

'''
To start the celery worker, run the command:
airflow worker
Expand Down Expand Up @@ -100,7 +102,7 @@ def fetch_celery_task_state(celery_task):
"""

try:
with timeout(seconds=2):
with timeout(seconds=OPERATION_TIMEOUT):
# Accessing state property of celery task will make actual network request
# to get the current state of the task.
res = (celery_task[0], celery_task[1].state)
Expand All @@ -114,7 +116,7 @@ def fetch_celery_task_state(celery_task):
def send_task_to_executor(task_tuple):
key, simple_ti, command, queue, task = task_tuple
try:
with timeout(seconds=2):
with timeout(seconds=OPERATION_TIMEOUT):
result = task.apply_async(args=[command], queue=queue)
except Exception as e:
exception_traceback = "Celery Task ID: {}\n{}".format(key,
Expand Down
2 changes: 1 addition & 1 deletion airflow/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
# under the License.
#

version = '1.10.4+twtr23'
version = '1.10.4+twtr24'

0 comments on commit d8c473e

Please sign in to comment.