From 7e87f8382892d8cafcd66db76709d39a1af0952a Mon Sep 17 00:00:00 2001
From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Date: Tue, 26 May 2020 11:43:17 +0100
Subject: [PATCH] Replace device_27_unique_idx bg update with a fg one (#7562)

The bg update never managed to complete, because it kept being interrupted
by transactions which want to take a lock.

Just doing it in the foreground isn't that bad, and is a good deal simpler.
---
 UPGRADE.rst                                   | 12 ++-
 changelog.d/7562.misc                         |  1 +
 synapse/storage/data_stores/main/devices.py   | 34 ++------
 ...lists_outbound_last_success_unique_idx.sql | 28 -------
 .../schema/delta/58/06dlols_unique_idx.py     | 80 +++++++++++++++++++
 synapse/storage/database.py                   |  1 -
 synapse/storage/prepare_database.py           | 13 ++-
 7 files changed, 104 insertions(+), 65 deletions(-)
 create mode 100644 changelog.d/7562.misc
 delete mode 100644 synapse/storage/data_stores/main/schema/delta/58/04device_lists_outbound_last_success_unique_idx.sql
 create mode 100644 synapse/storage/data_stores/main/schema/delta/58/06dlols_unique_idx.py

diff --git a/UPGRADE.rst b/UPGRADE.rst
index 41c47e964d57..3b5627e85214 100644
--- a/UPGRADE.rst
+++ b/UPGRADE.rst
@@ -75,9 +75,15 @@ for example:
     wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
     dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
 
-Upgrading to v1.13.0
+Upgrading to v1.14.0
 ====================
 
+This version includes a database update which is run as part of the upgrade,
+and which may take a couple of minutes in the case of a large server. Synapse
+will not respond to HTTP requests while this update is taking place.
+
+Upgrading to v1.13.0
+====================
 
 Incorrect database migration in old synapse versions
 ----------------------------------------------------
@@ -136,12 +142,12 @@ back to v1.12.4 you need to:
 2. Decrease the schema version in the database:
 
    .. code:: sql
-   
+
       UPDATE schema_version SET version = 57;
 
 3. Downgrade Synapse by following the instructions for your installation method
    in the "Rolling back to older versions" section above.
-   
+
 Upgrading to v1.12.0
 ====================
 
diff --git a/changelog.d/7562.misc b/changelog.d/7562.misc
new file mode 100644
index 000000000000..3c25cd991730
--- /dev/null
+++ b/changelog.d/7562.misc
@@ -0,0 +1 @@
+Improve performance of `mark_as_sent_devices_by_remote`.
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index 417ac8dc7c16..fb9f798e29b7 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -55,10 +55,6 @@
 
 BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"
 
-BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX = (
-    "drop_device_lists_outbound_last_success_non_unique_idx"
-)
-
 
 class DeviceWorkerStore(SQLBaseStore):
     def get_device(self, user_id, device_id):
@@ -749,19 +745,13 @@ def __init__(self, database: Database, db_conn, hs):
             BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, self._remove_duplicate_outbound_pokes,
         )
 
-        # create a unique index on device_lists_outbound_last_success
-        self.db.updates.register_background_index_update(
+        # a pair of background updates that were added during the 1.14 release cycle,
+        # but replaced with 58/06dlols_unique_idx.py
+        self.db.updates.register_noop_background_update(
             "device_lists_outbound_last_success_unique_idx",
-            index_name="device_lists_outbound_last_success_unique_idx",
-            table="device_lists_outbound_last_success",
-            columns=["destination", "user_id"],
-            unique=True,
         )
-
-        # once that completes, we can remove the old non-unique index.
-        self.db.updates.register_background_update_handler(
-            BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX,
-            self._drop_device_lists_outbound_last_success_non_unique_idx,
+        self.db.updates.register_noop_background_update(
+            "drop_device_lists_outbound_last_success_non_unique_idx",
         )
 
     @defer.inlineCallbacks
@@ -838,20 +828,6 @@ def _txn(txn):
 
         return rows
 
-    async def _drop_device_lists_outbound_last_success_non_unique_idx(
-        self, progress, batch_size
-    ):
-        def f(txn):
-            txn.execute("DROP INDEX IF EXISTS device_lists_outbound_last_success_idx")
-
-        await self.db.runInteraction(
-            "drop_device_lists_outbound_last_success_non_unique_idx", f,
-        )
-        await self.db.updates._end_background_update(
-            BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX
-        )
-        return 1
-
 
 class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
     def __init__(self, database: Database, db_conn, hs):
diff --git a/synapse/storage/data_stores/main/schema/delta/58/04device_lists_outbound_last_success_unique_idx.sql b/synapse/storage/data_stores/main/schema/delta/58/04device_lists_outbound_last_success_unique_idx.sql
deleted file mode 100644
index d5e6deb87803..000000000000
--- a/synapse/storage/data_stores/main/schema/delta/58/04device_lists_outbound_last_success_unique_idx.sql
+++ /dev/null
@@ -1,28 +0,0 @@
-/* Copyright 2020 The Matrix.org Foundation C.I.C
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
--- register a background update which will create a unique index on
--- device_lists_outbound_last_success
-INSERT into background_updates (ordering, update_name, progress_json)
-    VALUES (5804, 'device_lists_outbound_last_success_unique_idx', '{}');
-
--- once that completes, we can drop the old index.
-INSERT into background_updates (ordering, update_name, progress_json, depends_on)
-    VALUES (
-        5804,
-        'drop_device_lists_outbound_last_success_non_unique_idx',
-        '{}',
-        'device_lists_outbound_last_success_unique_idx'
-    );
diff --git a/synapse/storage/data_stores/main/schema/delta/58/06dlols_unique_idx.py b/synapse/storage/data_stores/main/schema/delta/58/06dlols_unique_idx.py
new file mode 100644
index 000000000000..d353f2bcb361
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/58/06dlols_unique_idx.py
@@ -0,0 +1,80 @@
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This migration rebuilds the device_lists_outbound_last_success table without duplicate
+entries, and with a UNIQUE index.
+"""
+
+import logging
+from io import StringIO
+
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+from synapse.storage.prepare_database import execute_statements_from_stream
+from synapse.storage.types import Cursor
+
+logger = logging.getLogger(__name__)
+
+
+def run_upgrade(*args, **kwargs):
+    pass
+
+
+def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+    # some instances might already have this index, in which case we can skip this
+    if isinstance(database_engine, PostgresEngine):
+        cur.execute(
+            """
+            SELECT 1 FROM pg_class WHERE relkind = 'i'
+                AND relname = 'device_lists_outbound_last_success_unique_idx'
+            """
+        )
+
+        if cur.rowcount:
+            logger.info(
+                "Unique index exists on device_lists_outbound_last_success: "
+                "skipping rebuild"
+            )
+            return
+
+    logger.info("Rebuilding device_lists_outbound_last_success with unique index")
+    execute_statements_from_stream(cur, StringIO(_rebuild_commands))
+
+
+# there might be duplicates, so the easiest way to achieve this is to create a new
+# table with the right data, and renaming it into place
+
+_rebuild_commands = """
+DROP TABLE IF EXISTS device_lists_outbound_last_success_new;
+
+CREATE TABLE device_lists_outbound_last_success_new (
+    destination TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    stream_id BIGINT NOT NULL
+);
+
+-- this took about 30 seconds on matrix.org's 16 million rows.
+INSERT INTO device_lists_outbound_last_success_new
+    SELECT destination, user_id, MAX(stream_id) FROM device_lists_outbound_last_success
+    GROUP BY destination, user_id;
+
+-- and this another 30 seconds.
+CREATE UNIQUE INDEX device_lists_outbound_last_success_unique_idx
+    ON device_lists_outbound_last_success_new (destination, user_id);
+
+DROP TABLE device_lists_outbound_last_success;
+
+ALTER TABLE device_lists_outbound_last_success_new
+    RENAME TO device_lists_outbound_last_success;
+"""
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 9947dbce7787..b112ff3df2b7 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -78,7 +78,6 @@
     "device_lists_remote_extremeties": "device_lists_remote_extremeties_unique_idx",
     "device_lists_remote_cache": "device_lists_remote_cache_unique_idx",
     "event_search": "event_search_event_id_idx",
-    "device_lists_outbound_last_success": "device_lists_outbound_last_success_unique_idx",
 }
 
 
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 640f2425846d..9afc145340bb 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -19,10 +19,12 @@
 import os
 import re
 from collections import Counter
+from typing import TextIO
 
 import attr
 
 from synapse.storage.engines.postgres import PostgresEngine
+from synapse.storage.types import Cursor
 
 logger = logging.getLogger(__name__)
 
@@ -479,8 +481,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams):
             )
 
         logger.info("applying schema %s for %s", name, modname)
-        for statement in get_statements(stream):
-            cur.execute(statement)
+        execute_statements_from_stream(cur, stream)
 
         # Mark as done.
         cur.execute(
@@ -538,8 +539,12 @@ def get_statements(f):
 
 def executescript(txn, schema_path):
     with open(schema_path, "r") as f:
-        for statement in get_statements(f):
-            txn.execute(statement)
+        execute_statements_from_stream(txn, f)
+
+
+def execute_statements_from_stream(cur: Cursor, f: TextIO):
+    for statement in get_statements(f):
+        cur.execute(statement)
 
 
 def _get_or_create_schema_state(txn, database_engine):
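
A note on the `register_noop_background_update` calls in devices.py: servers
which already ran the now-deleted 58/04 delta still have rows for the two old
update names in the `background_updates` table, so the updater still needs a
handler for each of them; a no-op handler simply marks the update complete on
its first run. Below is a minimal sketch of that idea, not Synapse's actual
implementation: the `Updater` class is hypothetical, though
`_end_background_update` mirrors the private helper used by the removed code.

.. code:: python

    class Updater:
        """Hypothetical stand-in for Synapse's background updater."""

        def __init__(self):
            self._handlers = {}

        def register_noop_background_update(self, update_name: str) -> None:
            async def noop_update(progress: dict, batch_size: int) -> int:
                # nothing to migrate: mark the update as done so its row is
                # cleared from the background_updates table, and report one
                # item processed so this handler is not invoked again
                await self._end_background_update(update_name)
                return 1

            self._handlers[update_name] = noop_update

        async def _end_background_update(self, update_name: str) -> None:
            # the real helper deletes the update's row from background_updates
            print("background update %r complete" % (update_name,))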
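
The rebuild in 58/06dlols_unique_idx.py uses a standard pattern for imposing a
UNIQUE constraint on a table which may contain duplicates: copy the
deduplicated rows into a new table, build the unique index there, then rename
the new table into place. Here is a self-contained sketch of the same pattern
against SQLite; the table and column names match the patch, but the sample
data is made up.

.. code:: python

    import sqlite3

    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()

    # a toy copy of the table, with a duplicate (destination, user_id) pair
    cur.executescript(
        """
        CREATE TABLE device_lists_outbound_last_success (
            destination TEXT NOT NULL, user_id TEXT NOT NULL, stream_id BIGINT NOT NULL
        );
        INSERT INTO device_lists_outbound_last_success VALUES
            ('example.com', '@alice:test', 1),
            ('example.com', '@alice:test', 7),
            ('example.org', '@bob:test', 3);
        """
    )

    # the same rebuild the migration performs: dedupe via MAX(stream_id),
    # add the unique index, then swap the new table into place
    cur.executescript(
        """
        CREATE TABLE device_lists_outbound_last_success_new (
            destination TEXT NOT NULL, user_id TEXT NOT NULL, stream_id BIGINT NOT NULL
        );
        INSERT INTO device_lists_outbound_last_success_new
            SELECT destination, user_id, MAX(stream_id)
            FROM device_lists_outbound_last_success
            GROUP BY destination, user_id;
        CREATE UNIQUE INDEX device_lists_outbound_last_success_unique_idx
            ON device_lists_outbound_last_success_new (destination, user_id);
        DROP TABLE device_lists_outbound_last_success;
        ALTER TABLE device_lists_outbound_last_success_new
            RENAME TO device_lists_outbound_last_success;
        """
    )

    # only the highest stream_id per (destination, user_id) pair survives
    # (row order may vary)
    print(cur.execute("SELECT * FROM device_lists_outbound_last_success").fetchall())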
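
Finally, the new `execute_statements_from_stream` helper in prepare_database.py
is a thin wrapper around the pre-existing `get_statements`, which splits a SQL
script into individual statements so each can be passed to `cursor.execute`
separately (sqlite3, for one, rejects multi-statement strings passed to
`execute()`). The sketch below shows roughly what such a splitter does; it is
a simplified stand-in, not Synapse's actual `get_statements`, and it does not
handle ';' or '--' inside string literals.

.. code:: python

    from io import StringIO
    from typing import Iterator, TextIO

    def get_statements(f: TextIO) -> Iterator[str]:
        """Yield one executable SQL statement at a time from a stream."""
        buf = ""
        for line in f:
            buf += line.split("--", 1)[0]  # drop SQL line comments
            while ";" in buf:
                statement, buf = buf.split(";", 1)
                if statement.strip():
                    yield statement.strip()

    # usage: each statement would be fed to cursor.execute() in turn
    script = "CREATE TABLE t (x INT);  -- a comment\nDROP TABLE t;\n"
    for stmt in get_statements(StringIO(script)):
        print(stmt)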