Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre-create Celery db result tables before running Celery worker #9719

Merged
merged 2 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions airflow/cli/commands/celery_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import daemon
import psutil
import sqlalchemy.exc
from celery import maybe_patch_concurrency
from celery.bin import worker as worker_bin
from daemon.pidfile import TimeoutPIDLockFile
Expand Down Expand Up @@ -112,6 +113,22 @@ def worker(args):
log=args.log_file,
)

if hasattr(celery_app.backend, 'ResultSession'):
# Pre-create the database tables now, otherwise SQLA via Celery has a
# race condition where one of the subprocesses can die with "Table
# already exists" error, because SQLA checks for which tables exist,
# then issues a CREATE TABLE, rather than doing CREATE TABLE IF NOT
# EXISTS
try:
session = celery_app.backend.ResultSession()
session.close()
except sqlalchemy.exc.IntegrityError:
# At least on postgres, trying to create a table that already exists
# gives a unique constraint violation on the
# "pg_type_typname_nsp_index" index. If this happens we can ignore
# it: we raced to create the tables and lost.
pass

# Setup Celery worker
worker_instance = worker_bin.worker(app=celery_app)
options = {
Expand Down
3 changes: 3 additions & 0 deletions tests/cli/commands/test_celery_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def test_validate_session_dbapi_exception(self, mock_session):

@pytest.mark.integration("redis")
@pytest.mark.integration("rabbitmq")
@pytest.mark.backend("mysql", "postgres")
class TestWorkerServeLogs(unittest.TestCase):

@classmethod
Expand Down Expand Up @@ -91,6 +92,7 @@ def test_skip_serve_logs_on_worker_start(self, mock_worker):
mock_popen.assert_not_called()


@pytest.mark.backend("mysql", "postgres")
class TestCeleryStopCommand(unittest.TestCase):
@classmethod
def setUpClass(cls):
Expand Down Expand Up @@ -144,6 +146,7 @@ def test_same_pid_file_is_used_in_start_and_stop(
mock_read_pid_from_pidfile.assert_called_once_with(pid_file)


@pytest.mark.backend("mysql", "postgres")
class TestWorkerStart(unittest.TestCase):
@classmethod
def setUpClass(cls):
Expand Down