Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refuse to upgrade database on worker processes #8266

Merged
merged 4 commits into from
Sep 7, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8266.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Do not attempt to upgrade upgrade database schema on worker processes.
63 changes: 46 additions & 17 deletions synapse/storage/prepare_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,25 +71,36 @@ def prepare_database(db_conn, database_engine, config, databases=["main", "state
if version_info:
user_version, delta_files, upgraded = version_info

# config should only be None when we are preparing an in-memory SQLite db,
# which should be empty.
if config is None:
if user_version != SCHEMA_VERSION:
# If we don't pass in a config file then we are expecting to
# have already upgraded the DB.
raise UpgradeDatabaseException(
"Expected database schema version %i but got %i"
% (SCHEMA_VERSION, user_version)
)
else:
_upgrade_existing_database(
cur,
user_version,
delta_files,
upgraded,
database_engine,
config,
databases=databases,
raise ValueError(
"config==None in prepare_database, but databse is not empty"
)

# if it's a worker app, refuse to upgrade the database, to avoid multiple
# workers doing it at once.
if config.worker_app is not None and user_version != SCHEMA_VERSION:
raise UpgradeDatabaseException(
"Expected database schema version %i but got %i"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to update these error messages to say that master should be started first? I believe these are fairly user visible errors and so it'd be good to be clear how to fix the problem.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair. done, I hope.

% (SCHEMA_VERSION, user_version)
)

_upgrade_existing_database(
cur,
user_version,
delta_files,
upgraded,
database_engine,
config,
databases=databases,
)
else:
# if it's a worker app, refuse to upgrade the database, to avoid multiple
# workers doing it at once.
if config and config.worker_app is not None:
raise UpgradeDatabaseException("Database schema uninitialised.")

_setup_new_database(cur, database_engine, databases=databases)

# check if any of our configured dynamic modules want a database
Expand Down Expand Up @@ -295,6 +306,8 @@ def _upgrade_existing_database(
else:
assert config

is_worker = config and config.worker_app is not None

if current_version > SCHEMA_VERSION:
raise ValueError(
"Cannot use this database as it is too "
Expand Down Expand Up @@ -322,7 +335,7 @@ def _upgrade_existing_database(
specific_engine_extensions = (".sqlite", ".postgres")

for v in range(start_ver, SCHEMA_VERSION + 1):
logger.info("Upgrading schema to v%d", v)
logger.info("Applying schema deltas for v%d", v)

# We need to search both the global and per data store schema
# directories for schema updates.
Expand Down Expand Up @@ -382,9 +395,15 @@ def _upgrade_existing_database(
continue

root_name, ext = os.path.splitext(file_name)

if ext == ".py":
# This is a python upgrade module. We need to import into some
# package and then execute its `run_upgrade` function.
if is_worker:
raise PrepareDatabaseException(
"Not applying delta %s on worker process", relative_path
)

module_name = "synapse.storage.v%d_%s" % (v, root_name)
with open(absolute_path) as python_file:
module = imp.load_source(module_name, absolute_path, python_file)
Expand All @@ -399,10 +418,18 @@ def _upgrade_existing_database(
continue
elif ext == ".sql":
# A plain old .sql file, just read and execute it
if is_worker:
raise PrepareDatabaseException(
"Not applying delta %s on worker process", relative_path
)
logger.info("Applying schema %s", relative_path)
executescript(cur, absolute_path)
elif ext == specific_engine_extension and root_name.endswith(".sql"):
# A .sql file specific to our engine; just read and execute it
if is_worker:
raise PrepareDatabaseException(
"Not applying delta %s on worker process", relative_path
)
logger.info("Applying engine-specific schema %s", relative_path)
executescript(cur, absolute_path)
elif ext in specific_engine_extensions and root_name.endswith(".sql"):
Expand Down Expand Up @@ -432,6 +459,8 @@ def _upgrade_existing_database(
(v, True),
)

logger.info("Schema now up to date")


def _apply_module_schemas(txn, database_engine, config):
"""Apply the module schemas for the dynamic modules, if any
Expand Down