From d272a696c8a5fa319e13422c5df9f4d0f5136339 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Fri, 11 Mar 2022 08:55:46 +0100 Subject: [PATCH 01/13] Use a sequence to generate AS transaction IDs, drop `last_txn` AS state Switching to a sequence means there's no need to track `last_txn` on the AS state table to generate new TXN IDs. This also means that there is no longer contention between the AS scheduler and AS handler on updates to the `application_services_state` table, which will prevent serialization errors during the complete AS txn transaction. --- synapse/storage/databases/main/appservice.py | 82 ++++++-------------- 1 file changed, 22 insertions(+), 60 deletions(-) diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 06944465582b..6ce03106fb47 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -29,6 +29,8 @@ from synapse.storage.database import DatabasePool, LoggingDatabaseConnection from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.roommember import RoomMemberWorkerStore +from synapse.storage.types import Cursor +from synapse.storage.util.sequence import build_sequence_generator from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import _CacheContext, cached @@ -72,6 +74,21 @@ def __init__( ) self.exclusive_user_regex = _make_exclusive_regex(self.services_cache) + def get_max_as_txn_id(txn: Cursor) -> int: + txn.execute( + "SELECT COALESCE(max(txn_id), 0) FROM application_services_txns" + ) + return txn.fetchone()[0] # type: ignore + + self._as_txn_seq_gen = build_sequence_generator( + db_conn, + self.database_engine, + get_max_as_txn_id, + "application_services_txn_id_seq", + table="application_services_txns", + id_column="txn_id", + ) + super().__init__(database, db_conn, hs) def get_app_services(self): @@ 
-237,21 +254,7 @@ async def create_appservice_txn( """ def _create_appservice_txn(txn): - # work out new txn id (highest txn id for this service += 1) - # The highest id may be the last one sent (in which case it is last_txn) - # or it may be the highest in the txns list (which are waiting to be/are - # being sent) - last_txn_id = self._get_last_txn(txn, service.id) - - txn.execute( - "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?", - (service.id,), - ) - highest_txn_id = txn.fetchone()[0] - if highest_txn_id is None: - highest_txn_id = 0 - - new_txn_id = max(highest_txn_id, last_txn_id) + 1 + new_txn_id = self._as_txn_seq_gen.get_next_id_txn(txn) # Insert new txn into txn table event_ids = json_encoder.encode([e.event_id for e in events]) @@ -285,40 +288,10 @@ async def complete_appservice_txn( """ txn_id = int(txn_id) - def _complete_appservice_txn(txn): - # Debugging query: Make sure the txn being completed is EXACTLY +1 from - # what was there before. If it isn't, we've got problems (e.g. the AS - # has probably missed some events), so whine loudly but still continue, - # since it shouldn't fail completion of the transaction. - last_txn_id = self._get_last_txn(txn, service.id) - if (last_txn_id + 1) != txn_id: - logger.error( - "appservice: Completing a transaction which has an ID > 1 from " - "the last ID sent to this AS. We've either dropped events or " - "sent it to the AS out of order. FIX ME. 
last_txn=%s " - "completing_txn=%s service_id=%s", - last_txn_id, - txn_id, - service.id, - ) - - # Set current txn_id for AS to 'txn_id' - self.db_pool.simple_upsert_txn( - txn, - "application_services_state", - {"as_id": service.id}, - {"last_txn": txn_id}, - ) - - # Delete txn - self.db_pool.simple_delete_txn( - txn, - "application_services_txns", - {"txn_id": txn_id, "as_id": service.id}, - ) - - await self.db_pool.runInteraction( - "complete_appservice_txn", _complete_appservice_txn + await self.db_pool.simple_delete( + "application_services_txns", + {"txn_id": txn_id, "as_id": service.id}, + "delete_completed_as_txn", ) async def get_oldest_unsent_txn( @@ -372,17 +345,6 @@ def _get_oldest_unsent_txn(txn): unused_fallback_keys={}, ) - def _get_last_txn(self, txn, service_id: Optional[str]) -> int: - txn.execute( - "SELECT last_txn FROM application_services_state WHERE as_id=?", - (service_id,), - ) - last_txn_id = txn.fetchone() - if last_txn_id is None or last_txn_id[0] is None: # no row exists - return 0 - else: - return int(last_txn_id[0]) # select 'last_txn' col - async def set_appservice_last_pos(self, pos: int) -> None: def set_appservice_last_pos_txn(txn): txn.execute( From cceb2b9caa4bfd0137e46b3bf4328d151084a6cb Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Mon, 28 Mar 2022 11:22:43 +0100 Subject: [PATCH 02/13] Show warning when running slow AS sequence query --- synapse/storage/databases/main/appservice.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 6ce03106fb47..e4c3d97c7b87 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -75,6 +75,7 @@ def __init__( self.exclusive_user_regex = _make_exclusive_regex(self.services_cache) def get_max_as_txn_id(txn: Cursor) -> int: + logger.warning("Falling back to slow query, you should port to postgres") txn.execute( "SELECT 
COALESCE(max(txn_id), 0) FROM application_services_txns" ) From 722f6ce13e86402c47ec58b3b24964f93851828b Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Mon, 28 Mar 2022 11:23:32 +0100 Subject: [PATCH 03/13] Restore writing AS state `last_txn` for this release Writing this column can be removed in a later release once the migration to the sequence has been completed. --- synapse/storage/databases/main/appservice.py | 23 +++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index e4c3d97c7b87..122a6cc42222 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -287,12 +287,25 @@ async def complete_appservice_txn( txn_id: The transaction ID being completed. service: The application service which was sent this transaction. """ - txn_id = int(txn_id) - await self.db_pool.simple_delete( - "application_services_txns", - {"txn_id": txn_id, "as_id": service.id}, - "delete_completed_as_txn", + def _complete_appservice_txn(txn): + # Set current txn_id for AS to 'txn_id' + self.db_pool.simple_upsert_txn( + txn, + "application_services_state", + {"as_id": service.id}, + {"last_txn": txn_id}, + ) + + # Delete txn + self.db_pool.simple_delete_txn( + txn, + "application_services_txns", + {"txn_id": txn_id, "as_id": service.id}, + ) + + await self.db_pool.runInteraction( + "complete_appservice_txn", _complete_appservice_txn ) async def get_oldest_unsent_txn( From 81f5220751fbe9ce0c2887d81a238ebc7663b0f3 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Mon, 28 Mar 2022 11:26:58 +0100 Subject: [PATCH 04/13] Add changelog file --- changelog.d/12209.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12209.misc diff --git a/changelog.d/12209.misc b/changelog.d/12209.misc new file mode 100644 index 000000000000..d9a1c93354ba --- /dev/null +++ b/changelog.d/12209.misc @@ -0,0 
+1 @@ +Switch to using a sequence to generate AS transaction IDs. Contributed by Nick Beeper. From 6d35ceb32a5cad88855b5fd138f85296e576e672 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Mon, 28 Mar 2022 12:37:52 +0100 Subject: [PATCH 05/13] Fix database variable --- synapse/storage/databases/main/appservice.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 122a6cc42222..573131112b82 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -83,7 +83,7 @@ def get_max_as_txn_id(txn: Cursor) -> int: self._as_txn_seq_gen = build_sequence_generator( db_conn, - self.database_engine, + database.engine, get_max_as_txn_id, "application_services_txn_id_seq", table="application_services_txns", From ccc2e63779b210432b7af4655f0b7ba27ffffd3a Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Mon, 28 Mar 2022 12:39:31 +0100 Subject: [PATCH 06/13] Update AS storage tests for new sequence generated TXN IDs --- tests/storage/test_appservice.py | 53 ++------------------------------ 1 file changed, 3 insertions(+), 50 deletions(-) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index ee599f433667..f756f0ade8aa 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -168,15 +168,6 @@ def _insert_txn(self, as_id, txn_id, events): (as_id, txn_id, json.dumps([e.event_id for e in events])), ) - def _set_last_txn(self, as_id, txn_id): - return self.db_pool.runOperation( - self.engine.convert_param_style( - "INSERT INTO application_services_state(as_id, last_txn, state) " - "VALUES(?,?,?)" - ), - (as_id, txn_id, ApplicationServiceState.UP.value), - ) - def test_get_appservice_state_none( self, ) -> None: @@ -274,12 +265,11 @@ def test_create_appservice_txn_first( self.assertEqual(txn.events, events) self.assertEqual(txn.service, service) - def 
test_create_appservice_txn_older_last_txn( + def test_create_appservice_txn_sequence_increment( self, ) -> None: service = Mock(id=self.as_list[0]["id"]) events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) - self.get_success(self._set_last_txn(service.id, 9643)) # AS is falling behind self.get_success(self._insert_txn(service.id, 9644, events)) self.get_success(self._insert_txn(service.id, 9645, events)) txn = self.get_success( @@ -289,43 +279,6 @@ def test_create_appservice_txn_older_last_txn( self.assertEqual(txn.events, events) self.assertEqual(txn.service, service) - def test_create_appservice_txn_up_to_date_last_txn( - self, - ) -> None: - service = Mock(id=self.as_list[0]["id"]) - events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) - self.get_success(self._set_last_txn(service.id, 9643)) - txn = self.get_success( - self.store.create_appservice_txn(service, events, [], [], {}, {}) - ) - self.assertEqual(txn.id, 9644) - self.assertEqual(txn.events, events) - self.assertEqual(txn.service, service) - - def test_create_appservice_txn_up_fuzzing( - self, - ) -> None: - service = Mock(id=self.as_list[0]["id"]) - events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) - self.get_success(self._set_last_txn(service.id, 9643)) - - # dump in rows with higher IDs to make sure the queries aren't wrong. 
- self.get_success(self._set_last_txn(self.as_list[1]["id"], 119643)) - self.get_success(self._set_last_txn(self.as_list[2]["id"], 9)) - self.get_success(self._set_last_txn(self.as_list[3]["id"], 9643)) - self.get_success(self._insert_txn(self.as_list[1]["id"], 119644, events)) - self.get_success(self._insert_txn(self.as_list[1]["id"], 119645, events)) - self.get_success(self._insert_txn(self.as_list[1]["id"], 119646, events)) - self.get_success(self._insert_txn(self.as_list[2]["id"], 10, events)) - self.get_success(self._insert_txn(self.as_list[3]["id"], 9643, events)) - - txn = self.get_success( - self.store.create_appservice_txn(service, events, [], [], {}, {}) - ) - self.assertEqual(txn.id, 9644) - self.assertEqual(txn.events, events) - self.assertEqual(txn.service, service) - def test_complete_appservice_txn_first_txn( self, ) -> None: @@ -359,13 +312,13 @@ def test_complete_appservice_txn_first_txn( ) self.assertEqual(0, len(res)) - def test_complete_appservice_txn_existing_in_state_table( + def test_complete_appservice_txn_updates_last_txn_state( self, ) -> None: service = Mock(id=self.as_list[0]["id"]) events = [Mock(event_id="e1"), Mock(event_id="e2")] txn_id = 5 - self.get_success(self._set_last_txn(service.id, 4)) + self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP) self.get_success(self._insert_txn(service.id, txn_id, events)) self.get_success( self.store.complete_appservice_txn(txn_id=txn_id, service=service) From ca0daba20bc3b011364380eadf22bdfd8fc4bf07 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Mon, 28 Mar 2022 12:47:52 +0100 Subject: [PATCH 07/13] Add database migration to create postgres sequence --- .../schema/main/delta/68/06as_txn_seq.py | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 synapse/storage/schema/main/delta/68/06as_txn_seq.py diff --git a/synapse/storage/schema/main/delta/68/06as_txn_seq.py b/synapse/storage/schema/main/delta/68/06as_txn_seq.py new file mode 100644 index 
000000000000..9192d6a26a5c --- /dev/null +++ b/synapse/storage/schema/main/delta/68/06as_txn_seq.py @@ -0,0 +1,45 @@ +# Copyright 2022 Beeper +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +Adds a postgres SEQUENCE for generating application service transaction IDs. +""" + +from synapse.storage.engines import PostgresEngine + + +def run_create(cur, database_engine, *args, **kwargs): + if isinstance(database_engine, PostgresEngine): + # If we already have some AS TXNs we want to start from the current + # maximum value. There are two potential places this is stored - the + # actual TXNs themselves *and* the AS state table. At time of migration + # it is possible the TXNs table is empty so we must include the AS state + # last_txn as a potential option, and pick the maximum. 
+ + cur.execute("SELECT COALESCE(max(txn_id), 0) FROM application_services_txns") + row = cur.fetchone() + txn_max = row[0] + + cur.execute("SELECT COALESCE(max(last_txn), 0) FROM application_services_state") + row = cur.fetchone() + last_txn_max = row[0] + + start_val = max(last_txn_max, txn_max) + 1 + + cur.execute("CREATE SEQUENCE application_services_txn_id_seq START WITH %s", (start_val,)) + + +def run_upgrade(*args, **kwargs): + pass From ca4a78a14e716ceaf38928c4dcd54796eefbf860 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Mon, 28 Mar 2022 12:51:32 +0100 Subject: [PATCH 08/13] Lint migration --- synapse/storage/schema/main/delta/68/06as_txn_seq.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/storage/schema/main/delta/68/06as_txn_seq.py b/synapse/storage/schema/main/delta/68/06as_txn_seq.py index 9192d6a26a5c..62fa13f253d9 100644 --- a/synapse/storage/schema/main/delta/68/06as_txn_seq.py +++ b/synapse/storage/schema/main/delta/68/06as_txn_seq.py @@ -38,7 +38,10 @@ def run_create(cur, database_engine, *args, **kwargs): start_val = max(last_txn_max, txn_max) + 1 - cur.execute("CREATE SEQUENCE application_services_txn_id_seq START WITH %s", (start_val,)) + cur.execute( + "CREATE SEQUENCE application_services_txn_id_seq START WITH %s", + (start_val,), + ) def run_upgrade(*args, **kwargs): From c9c0b06b3517b6e50fcdc316d42eb955fd64e4d9 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Mon, 28 Mar 2022 13:39:14 +0100 Subject: [PATCH 09/13] Remove AS txn sequence test as tested elsewhere The sequence generators are already tested elsewhere with implementation specifics for each DB type, so no point duplicating that here. 
--- tests/storage/test_appservice.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index f756f0ade8aa..f45ad709cda5 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -265,20 +265,6 @@ def test_create_appservice_txn_first( self.assertEqual(txn.events, events) self.assertEqual(txn.service, service) - def test_create_appservice_txn_sequence_increment( - self, - ) -> None: - service = Mock(id=self.as_list[0]["id"]) - events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) - self.get_success(self._insert_txn(service.id, 9644, events)) - self.get_success(self._insert_txn(service.id, 9645, events)) - txn = self.get_success( - self.store.create_appservice_txn(service, events, [], [], {}, {}) - ) - self.assertEqual(txn.id, 9646) - self.assertEqual(txn.events, events) - self.assertEqual(txn.service, service) - def test_complete_appservice_txn_first_txn( self, ) -> None: From d62adeee3e503b0094b62f21cf20e141687138fb Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Wed, 30 Mar 2022 21:00:56 +0100 Subject: [PATCH 10/13] Update DB schema migration to version 69 --- synapse/storage/schema/__init__.py | 5 ++++- .../main/delta/{68/06as_txn_seq.py => 69/01as_txn_seq.py} | 4 ---- 2 files changed, 4 insertions(+), 5 deletions(-) rename synapse/storage/schema/main/delta/{68/06as_txn_seq.py => 69/01as_txn_seq.py} (97%) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 7b21c1b96db9..ea900e0f3d35 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-SCHEMA_VERSION = 68 # remember to update the list below when updating +SCHEMA_VERSION = 69 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -58,6 +58,9 @@ - event_reference_hashes is no longer read. - `events` has `state_key` and `rejection_reason` columns, which are populated for new events. + +Changes in SCHEMA_VERSION = 69: + - Use sequence to generate future `application_services_txns.txn_id`s """ diff --git a/synapse/storage/schema/main/delta/68/06as_txn_seq.py b/synapse/storage/schema/main/delta/69/01as_txn_seq.py similarity index 97% rename from synapse/storage/schema/main/delta/68/06as_txn_seq.py rename to synapse/storage/schema/main/delta/69/01as_txn_seq.py index 62fa13f253d9..24bd4b391eee 100644 --- a/synapse/storage/schema/main/delta/68/06as_txn_seq.py +++ b/synapse/storage/schema/main/delta/69/01as_txn_seq.py @@ -42,7 +42,3 @@ def run_create(cur, database_engine, *args, **kwargs): "CREATE SEQUENCE application_services_txn_id_seq START WITH %s", (start_val,), ) - - -def run_upgrade(*args, **kwargs): - pass From 175f4d34bb8b505c837f4d955d2b5826ae6b9709 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Wed, 30 Mar 2022 21:03:19 +0100 Subject: [PATCH 11/13] Update changelog w/note about shutting AS worker down before upgrade --- changelog.d/12209.misc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/12209.misc b/changelog.d/12209.misc index d9a1c93354ba..f1a14832f376 100644 --- a/changelog.d/12209.misc +++ b/changelog.d/12209.misc @@ -1 +1 @@ -Switch to using a sequence to generate AS transaction IDs. Contributed by Nick Beeper. +Switch to using a sequence to generate AS transaction IDs. Contributed by Nick Beeper. If running synapse with a dedicated appservice worker, this MUST be shutdown before upgrading the main process and database. 
From 5803f05a18f11991e6971f8e213783f29c77007d Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 1 Apr 2022 13:53:18 +0100 Subject: [PATCH 12/13] Update changelog.d/12209.misc --- changelog.d/12209.misc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/12209.misc b/changelog.d/12209.misc index f1a14832f376..d145b5eb04cb 100644 --- a/changelog.d/12209.misc +++ b/changelog.d/12209.misc @@ -1 +1 @@ -Switch to using a sequence to generate AS transaction IDs. Contributed by Nick Beeper. If running synapse with a dedicated appservice worker, this MUST be shutdown before upgrading the main process and database. +Switch to using a sequence to generate AS transaction IDs. Contributed by Nick Beeper. If running synapse with a dedicated appservice worker, this MUST be stopped before upgrading the main process and database. From 226820b2962640014d13361d68275ba2f7be79af Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 1 Apr 2022 14:04:25 +0100 Subject: [PATCH 13/13] Add notes to upgrade.md --- docs/upgrade.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/docs/upgrade.md b/docs/upgrade.md index 062e823333c7..f6d226526a78 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -85,6 +85,19 @@ process, for example: dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb ``` +# Upgrading to v1.57.0 + +## Changes to database schema for application services + +Synapse v1.57.0 includes a [change](https://github.com/matrix-org/synapse/pull/12209) to the +way transaction IDs are managed for application services. If your deployment uses a dedicated +worker for application service traffic, **it must be stopped** when the database is upgraded +(which normally happens when the main process is upgraded), to ensure the change is made safely +without any risk of reusing transaction IDs. 
+ +Deployments which do not use separate worker processes can be upgraded as normal. Similarly, +deployments where no application services are in use can be upgraded as normal. + # Upgrading to v1.56.0 ## Groups/communities feature has been deprecated