From b72a8625aa74eeee5eae96fe3c8b534c0cb9f47e Mon Sep 17 00:00:00 2001
From: Nick Barrett
Date: Fri, 2 Sep 2022 07:59:11 +0100
Subject: [PATCH 1/8] Add `event_stream_ordering` column to `receipts_linearized` table

---
 .../72/05receipts_event_stream_ordering.sql | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)
 create mode 100644 synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql

diff --git a/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
new file mode 100644
index 000000000000..7911250c2c26
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
@@ -0,0 +1,16 @@
+/* Copyright 2022 Beeper
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE receipts_linearized ADD COLUMN event_stream_ordering integer;

From 335f41300cadd0380d149a7589e84b5bc992ff02 Mon Sep 17 00:00:00 2001
From: Nick Barrett
Date: Fri, 2 Sep 2022 09:41:15 +0100
Subject: [PATCH 2/8] Write receipt `event_stream_ordering` column

---
 synapse/storage/databases/main/receipts.py | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 124c70ad37b6..6c655618148a 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -664,6 +664,14 @@ def _insert_linearized_receipt_txn(
             self._receipts_stream_cache.entity_has_changed, room_id, stream_id
         )
 
+        upsert_values: Dict[str, Any] = {
+            "stream_id": stream_id,
+            "event_id": event_id,
+            "data": json_encoder.encode(data),
+        }
+        if stream_ordering is not None:
+            upsert_values["event_stream_ordering"] = stream_ordering
+
         self.db_pool.simple_upsert_txn(
             txn,
             table="receipts_linearized",
@@ -672,11 +680,7 @@ def _insert_linearized_receipt_txn(
                 "receipt_type": receipt_type,
                 "user_id": user_id,
             },
-            values={
-                "stream_id": stream_id,
-                "event_id": event_id,
-                "data": json_encoder.encode(data),
-            },
+            values=upsert_values,
             # receipts_linearized has a unique constraint on
             # (user_id, room_id, receipt_type), so no need to lock
             lock=False,

From 1b3a2cb132d24d3ca22e1f407cfcde200930788a Mon Sep 17 00:00:00 2001
From: Nick Barrett
Date: Fri, 2 Sep 2022 09:47:47 +0100
Subject: [PATCH 3/8] Add changelog file

---
 changelog.d/13703.misc | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changelog.d/13703.misc

diff --git a/changelog.d/13703.misc b/changelog.d/13703.misc
new file mode 100644
index 000000000000..685a29b17d4b
--- /dev/null
+++ b/changelog.d/13703.misc
@@ -0,0 +1 @@
+Add & populate `event_stream_ordering` column on receipts table for future optimisation of push action processing. Contributed by Nick @ Beeper (@fizzadar).
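Illustrative sketch, not part of the patch series: the changelog above says the column exists "for future optimisation of push action processing". The toy sqlite3 snippet below (table contents, Matrix IDs and values are invented; only the table and column names mirror the schema in the patches) shows how a populated `event_stream_ordering` lets a receipt's position in the event stream be read back without joining the much larger `events` table:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (event_id TEXT PRIMARY KEY, stream_ordering BIGINT);
    CREATE TABLE receipts_linearized (
        room_id TEXT, user_id TEXT, event_id TEXT, event_stream_ordering BIGINT
    );
    INSERT INTO events VALUES ('$read_event', 42);
    INSERT INTO receipts_linearized
        VALUES ('!room:example.org', '@alice:example.org', '$read_event', 42);
    """
)

# Without the denormalised column, finding the stream ordering of a user's
# read receipt needs a join against events:
with_join = conn.execute(
    """
    SELECT e.stream_ordering FROM receipts_linearized AS r
    JOIN events AS e ON e.event_id = r.event_id
    WHERE r.room_id = ? AND r.user_id = ?
    """,
    ("!room:example.org", "@alice:example.org"),
).fetchone()

# With the column populated, the join disappears:
without_join = conn.execute(
    "SELECT event_stream_ordering FROM receipts_linearized"
    " WHERE room_id = ? AND user_id = ?",
    ("!room:example.org", "@alice:example.org"),
).fetchone()

assert with_join == without_join == (42,)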
From c0da6fafee7ff5098944a8888686351a728cd7da Mon Sep 17 00:00:00 2001
From: Nick Barrett
Date: Fri, 9 Sep 2022 09:03:55 +0100
Subject: [PATCH 4/8] Add background job to populate receipts `event_stream_ordering`

---
 synapse/storage/databases/main/receipts.py          | 71 ++++++++++++++++++-
 .../72/05receipts_event_stream_ordering.sql         |  3 +
 2 files changed, 73 insertions(+), 1 deletion(-)

diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 6c655618148a..3cd066110a2a 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -838,5 +838,74 @@ def _insert_graph_receipt_txn(
         )
 
 
-class ReceiptsStore(ReceiptsWorkerStore):
+class ReceiptsBackgroundUpdateStore(SQLBaseStore):
+    POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
+
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
+        super().__init__(database, db_conn, hs)
+
+        self.db_pool.updates.register_background_update_handler(
+            self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+            self._populate_receipt_event_stream_ordering,
+        )
+
+    async def _populate_receipt_event_stream_ordering(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        def _populate_receipt_event_stream_ordering_txn(txn: LoggingTransaction) -> bool:
+
+            if "max_stream_id" in progress:
+                max_stream_id = progress["max_stream_id"]
+            else:
+                txn.execute("SELECT max(stream_id) FROM receipts_linearized")
+                res = txn.fetchone()
+                if res is None or res[0] is None:
+                    return True
+                else:
+                    max_stream_id = res[0]
+
+            start = progress.get("stream_id", 0)
+            stop = start + batch_size
+
+            sql = """
+                UPDATE receipts_linearized
+                SET event_stream_ordering = (
+                    SELECT stream_ordering
+                    FROM events
+                    WHERE event_id = receipts_linearized.event_id
+                )
+                WHERE stream_id >= ? AND stream_id < ?
+            """
+            txn.execute(sql, (start, stop))
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+                {
+                    "stream_id": stop,
+                    "max_stream_id": max_stream_id,
+                },
+            )
+
+            return stop > max_stream_id
+
+        finished = await self.db_pool.runInteraction(
+            "_populate_receipt_event_stream_ordering",
+            _populate_receipt_event_stream_ordering_txn,
+        )
+
+        if finished:
+            await self.db_pool.updates._end_background_update(
+                self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING
+            )
+
+        return batch_size
+
+
+class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
     pass

diff --git a/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
index 7911250c2c26..e40a9fa4a4c9 100644
--- a/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
+++ b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
@@ -14,3 +14,6 @@
  */
 
 ALTER TABLE receipts_linearized ADD COLUMN event_stream_ordering integer;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('populate_event_stream_ordering', '{}');
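The background update above backfills the new column in windows of `batch_size` stream IDs: it captures the current maximum `stream_id` once, persists its position in the update's progress JSON, and reports completion once the window passes that maximum. A minimal, self-contained sketch of that control flow (data and batch size invented; in Synapse the progress dict is persisted between runs by the background-update machinery rather than kept in a local loop):

from typing import Dict

stream_ids = list(range(1, 26))   # pretend receipts_linearized stream_ids
progress: Dict[str, int] = {}     # what Synapse stores as progress_json
batch_size = 10
backfilled = []

while True:
    # On the first run, record the current maximum so rows written after
    # this point (which already carry event_stream_ordering) are not rescanned.
    if "max_stream_id" not in progress:
        progress["max_stream_id"] = max(stream_ids)

    start = progress.get("stream_id", 0)
    stop = start + batch_size

    # The real update runs: UPDATE ... WHERE stream_id >= ? AND stream_id < ?
    backfilled.extend(s for s in stream_ids if start <= s < stop)
    progress["stream_id"] = stop

    if stop > progress["max_stream_id"]:   # same finish condition as above
        break

assert sorted(backfilled) == stream_ids

The real handler returns `batch_size` from every run, so the scheduler keeps re-invoking it until the transaction reports the finish condition and `_end_background_update` is called.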
From a0faa6c35186f2529efacdbb78765b2430d45ea1 Mon Sep 17 00:00:00 2001
From: Nick Barrett
Date: Fri, 9 Sep 2022 09:14:08 +0100
Subject: [PATCH 5/8] Formatting

---
 synapse/storage/databases/main/receipts.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 3cd066110a2a..87d43750f437 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -857,7 +857,9 @@ def __init__(
     async def _populate_receipt_event_stream_ordering(
         self, progress: JsonDict, batch_size: int
     ) -> int:
-        def _populate_receipt_event_stream_ordering_txn(txn: LoggingTransaction) -> bool:
+        def _populate_receipt_event_stream_ordering_txn(
+            txn: LoggingTransaction,
+        ) -> bool:
 
             if "max_stream_id" in progress:
                 max_stream_id = progress["max_stream_id"]

From 3f98ef770f65f956d9d5047d46321afbe50b8495 Mon Sep 17 00:00:00 2001
From: Nick Barrett
Date: Fri, 9 Sep 2022 09:26:38 +0100
Subject: [PATCH 6/8] Add receipts background store to port db script

---
 synapse/_scripts/synapse_port_db.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 543bba27c29e..30983c47fbb7 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -67,6 +67,7 @@
 )
 from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
 from synapse.storage.databases.main.pusher import PusherWorkerStore
+from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
 from synapse.storage.databases.main.registration import (
     RegistrationBackgroundUpdateStore,
     find_max_generated_user_id_localpart,
@@ -203,6 +204,7 @@ class Store(
     PushRuleStore,
     PusherWorkerStore,
     PresenceBackgroundUpdateStore,
+    ReceiptsBackgroundUpdateStore,
 ):
     def execute(self, f: Callable[..., R], *args: Any, **kwargs: Any) -> Awaitable[R]:
         return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)

From b6e832186f19fae89a90f95d47f281a59efee462 Mon Sep 17 00:00:00 2001
From: Nick Mills-Barrett
Date: Mon, 12 Sep 2022 15:25:37 +0100
Subject: [PATCH 7/8] Use BIGINT for stream ordering

Co-authored-by: Erik Johnston
---
 .../schema/main/delta/72/05receipts_event_stream_ordering.sql | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
index e40a9fa4a4c9..2a822f4509f6 100644
--- a/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
+++ b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-ALTER TABLE receipts_linearized ADD COLUMN event_stream_ordering integer;
+ALTER TABLE receipts_linearized ADD COLUMN event_stream_ordering BIGINT;
 
 INSERT INTO background_updates (update_name, progress_json) VALUES
   ('populate_event_stream_ordering', '{}');
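Brief note on the BIGINT change above: `events.stream_ordering` is a 64-bit counter, while a plain `integer` column is 32-bit on PostgreSQL (SQLite integers are already 64-bit), so the column is widened to match. A tiny sketch of the bound in question, using an invented ordering value:

# The example ordering below is hypothetical; the point is only that any
# value past 2**31 - 1 would not fit the column type from patch 1.
INT32_MAX = 2**31 - 1                     # largest value a PostgreSQL integer holds
example_stream_ordering = 3_000_000_000   # plausible ordering on a busy, long-lived server

assert example_stream_ordering > INT32_MAX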
From 3814eb89371f21c995cb48aa2904bfc47758c457 Mon Sep 17 00:00:00 2001
From: Nick Barrett
Date: Mon, 12 Sep 2022 15:26:52 +0100
Subject: [PATCH 8/8] Always set `event_stream_ordering`

---
 synapse/storage/databases/main/receipts.py | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 87d43750f437..76613b0c74d8 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -664,14 +664,6 @@ def _insert_linearized_receipt_txn(
             self._receipts_stream_cache.entity_has_changed, room_id, stream_id
         )
 
-        upsert_values: Dict[str, Any] = {
-            "stream_id": stream_id,
-            "event_id": event_id,
-            "data": json_encoder.encode(data),
-        }
-        if stream_ordering is not None:
-            upsert_values["event_stream_ordering"] = stream_ordering
-
         self.db_pool.simple_upsert_txn(
             txn,
             table="receipts_linearized",
@@ -680,7 +672,12 @@ def _insert_linearized_receipt_txn(
                 "receipt_type": receipt_type,
                 "user_id": user_id,
             },
-            values=upsert_values,
+            values={
+                "stream_id": stream_id,
+                "event_id": event_id,
+                "event_stream_ordering": stream_ordering,
+                "data": json_encoder.encode(data),
+            },
             # receipts_linearized has a unique constraint on
             # (user_id, room_id, receipt_type), so no need to lock
             lock=False,
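Patch 8 drops the conditional dict introduced in patch 2 and always includes `event_stream_ordering` in the upsert values: the column is nullable, so a receipt whose event's ordering is not known simply stores NULL, and a later receipt for the same (room, receipt type, user) overwrites the row. A self-contained sqlite3 sketch of that behaviour (room/user IDs and rows are invented, and plain ON CONFLICT SQL stands in for `simple_upsert_txn`):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE receipts_linearized (
        room_id TEXT, receipt_type TEXT, user_id TEXT,
        stream_id BIGINT, event_id TEXT, event_stream_ordering BIGINT, data TEXT,
        UNIQUE (room_id, receipt_type, user_id)
    )
    """
)

def upsert(stream_id, event_id, stream_ordering, data):
    # Mirrors the shape of the simple_upsert_txn call above: fixed key values,
    # everything else (including a possibly-None stream ordering) in the values.
    conn.execute(
        """
        INSERT INTO receipts_linearized
            (room_id, receipt_type, user_id, stream_id, event_id,
             event_stream_ordering, data)
        VALUES ('!room:example.org', 'm.read', '@alice:example.org', ?, ?, ?, ?)
        ON CONFLICT (room_id, receipt_type, user_id) DO UPDATE SET
            stream_id = excluded.stream_id,
            event_id = excluded.event_id,
            event_stream_ordering = excluded.event_stream_ordering,
            data = excluded.data
        """,
        (stream_id, event_id, stream_ordering, data),
    )

upsert(1, "$first", None, "{}")   # ordering not known yet -> NULL is stored
upsert(2, "$second", 42, "{}")    # later receipt overwrites the same row
rows = conn.execute(
    "SELECT event_id, event_stream_ordering FROM receipts_linearized"
).fetchall()
assert rows == [("$second", 42)]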