Skip to content

Commit

Permalink
[TWTR][CX-17516] Requeue tasks in the queued state (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
aoen authored Dec 18, 2019
1 parent 54bd095 commit 87fcc1c
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 4 deletions.
4 changes: 2 additions & 2 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ def queue_command(self, task_instance, command, priority=1, queue=None):
key = task_instance.key
if key not in self.queued_tasks and key not in self.running:
self.log.info("Adding to queue: %s", command)
self.queued_tasks[key] = (command, priority, queue, task_instance)
else:
self.log.info("could not queue task {}".format(key))
self.log.info("Adding to queue even though already queued or running {}".format(command, key))
self.queued_tasks[key] = (command, priority, queue, task_instance)

def queue_task_instance(
self,
Expand Down
7 changes: 6 additions & 1 deletion airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1728,8 +1728,13 @@ def _execute_helper(self, processor_manager):
scheduled_dag_ids = ", ".join(simple_dag_bag.dag_ids)
self.log.info('DAGs to be executed: {}'.format(scheduled_dag_ids))

# TODO(CX-17516): State.QUEUED has been added here which is a hack as the Celery
# Executor does not reliably enqueue tasks with the my MySQL broker, and we have
# seen tasks hang after they get queued. The effect of this hack is queued tasks
# will constantly be requeued and resent to the executor (Celery).
# This should be removed when we switch away from the MySQL Celery backend.
self._execute_task_instances(simple_dag_bag,
(State.SCHEDULED,))
(State.SCHEDULED, State.QUEUED))

# Call heartbeats
self.log.debug("Heartbeating the executor")
Expand Down
2 changes: 1 addition & 1 deletion airflow/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
# under the License.
#

version = '1.10.0+twtr24'
version = '1.10.0+twtr25'

0 comments on commit 87fcc1c

Please sign in to comment.