From 9cb167cabec9cd96153830d812730b5e3706188c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 8 Sep 2022 11:43:43 -0400 Subject: [PATCH 01/27] Update filtering to include the thread notifications flag. --- synapse/api/filtering.py | 10 ++++++++++ synapse/config/experimental.py | 2 ++ synapse/rest/client/versions.py | 2 ++ 3 files changed, 14 insertions(+) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index f7f46f8d8008..c6e44dcf822e 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -84,6 +84,7 @@ "contains_url": {"type": "boolean"}, "lazy_load_members": {"type": "boolean"}, "include_redundant_members": {"type": "boolean"}, + "org.matrix.msc3773.unread_thread_notifications": {"type": "boolean"}, # Include or exclude events with the provided labels. # cf https://github.com/matrix-org/matrix-doc/pull/2326 "org.matrix.labels": {"type": "array", "items": {"type": "string"}}, @@ -240,6 +241,9 @@ def lazy_load_members(self) -> bool: def include_redundant_members(self) -> bool: return self._room_state_filter.include_redundant_members + def unread_thread_notifications(self) -> bool: + return self._room_timeline_filter.unread_thread_notifications + async def filter_presence( self, events: Iterable[UserPresenceState] ) -> List[UserPresenceState]: @@ -304,6 +308,12 @@ def __init__(self, hs: "HomeServer", filter_json: JsonDict): self.include_redundant_members = filter_json.get( "include_redundant_members", False ) + if hs.config.experimental.msc3773_enabled: + self.unread_thread_notifications: bool = filter_json.get( + "org.matrix.msc3773.unread_thread_notifications", False + ) + else: + self.unread_thread_notifications = False self.types = filter_json.get("types", None) self.not_types = filter_json.get("not_types", []) diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 702b81e636c9..51d6d9c03548 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -84,6 +84,8 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: # MSC3772: A push rule for mutual relations. self.msc3772_enabled: bool = experimental.get("msc3772_enabled", False) + # MSC3773: Thread notifications + self.msc3773_enabled: bool = experimental.get("msc3773_enabled", False) # MSC3715: dir param on /relations. self.msc3715_enabled: bool = experimental.get("msc3715_enabled", False) diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index c516cda95d5c..195b75388234 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -103,6 +103,8 @@ def on_GET(self, request: Request) -> Tuple[int, JsonDict]: "org.matrix.msc3030": self.config.experimental.msc3030_enabled, # Adds support for thread relations, per MSC3440. "org.matrix.msc3440.stable": True, # TODO: remove when "v1.3" is added above + # Support for thread notification counts. + "org.matrix.msc3773": self.config.experimental.msc3773_enabled, # Allows moderators to fetch redacted event content as described in MSC2815 "fi.mau.msc2815": self.config.experimental.msc2815_enabled, }, From 8ac2f322d9d9e569a161be0a17c92213143d0538 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 9 Sep 2022 08:35:56 -0400 Subject: [PATCH 02/27] Ensure that the thread_id column is non-null and then require it to be non-null. --- synapse/storage/schema/__init__.py | 6 +- .../73/01thread_notifications_backfill.sql | 29 +++++ ...thread_notifications_not_null.sql.postgres | 19 ++++ ...02thread_notifications_not_null.sql.sqlite | 100 ++++++++++++++++++ 4 files changed, 151 insertions(+), 3 deletions(-) create mode 100644 synapse/storage/schema/main/delta/73/01thread_notifications_backfill.sql create mode 100644 synapse/storage/schema/main/delta/73/02thread_notifications_not_null.sql.postgres create mode 100644 synapse/storage/schema/main/delta/73/02thread_notifications_not_null.sql.sqlite diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 68e055c66471..2d6ac659b448 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -87,9 +87,9 @@ SCHEMA_COMPAT_VERSION = ( - # The groups tables are no longer accessible, so synapses with SCHEMA_VERSION < 72 - # could break. - 72 + # The threads_id column must exist for event_push_actions, event_push_summary, + # receipts_linearized, and receipts_graph. + 73 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat diff --git a/synapse/storage/schema/main/delta/73/01thread_notifications_backfill.sql b/synapse/storage/schema/main/delta/73/01thread_notifications_backfill.sql new file mode 100644 index 000000000000..0ffde9bbeb18 --- /dev/null +++ b/synapse/storage/schema/main/delta/73/01thread_notifications_backfill.sql @@ -0,0 +1,29 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Forces the background updates from 06thread_notifications.sql to run in the +-- foreground as code will now require those to be "done". + +DELETE FROM background_updates WHERE update_name = 'event_push_backfill_thread_id'; + +-- Overwrite any null thread_id columns. +UPDATE event_push_actions_staging SET thread_id = 'main' WHERE thread_id IS NULL; +UPDATE event_push_actions SET thread_id = 'main' WHERE thread_id IS NULL; +UPDATE event_push_summary SET thread_id = 'main' WHERE thread_id IS NULL; + +-- Do not run the event_push_summary_unique_index job if it is pending; the +-- thread_id field will be made required. +DELETE FROM background_updates WHERE update_name = 'event_push_summary_unique_index'; +DROP INDEX IF EXISTS event_push_summary_unique_index; diff --git a/synapse/storage/schema/main/delta/73/02thread_notifications_not_null.sql.postgres b/synapse/storage/schema/main/delta/73/02thread_notifications_not_null.sql.postgres new file mode 100644 index 000000000000..33674f8c6268 --- /dev/null +++ b/synapse/storage/schema/main/delta/73/02thread_notifications_not_null.sql.postgres @@ -0,0 +1,19 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- The columns can now be made non-nullable. +ALTER TABLE event_push_actions_staging ALTER COLUMN thread_id SET NOT NULL; +ALTER TABLE event_push_actions ALTER COLUMN thread_id SET NOT NULL; +ALTER TABLE event_push_summary ALTER COLUMN thread_id SET NOT NULL; diff --git a/synapse/storage/schema/main/delta/73/02thread_notifications_not_null.sql.sqlite b/synapse/storage/schema/main/delta/73/02thread_notifications_not_null.sql.sqlite new file mode 100644 index 000000000000..53e70ee153b1 --- /dev/null +++ b/synapse/storage/schema/main/delta/73/02thread_notifications_not_null.sql.sqlite @@ -0,0 +1,100 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- SQLite doesn't support modifying columns to an existing table, so it must +-- be recreated. + +-- Create the new tables. +CREATE TABLE event_push_actions_staging_new ( + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + actions TEXT NOT NULL, + notif SMALLINT NOT NULL, + highlight SMALLINT NOT NULL, + unread SMALLINT, + thread_id TEXT NOT NULL +); + +CREATE TABLE event_push_actions_new ( + room_id TEXT NOT NULL, + event_id TEXT NOT NULL, + user_id TEXT NOT NULL, + profile_tag VARCHAR(32), + actions TEXT NOT NULL, + topological_ordering BIGINT, + stream_ordering BIGINT, + notif SMALLINT, + highlight SMALLINT, + unread SMALLINT, + thread_id TEXT NOT NULL, + CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag) +); + +CREATE TABLE event_push_summary_new ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + notif_count BIGINT NOT NULL, + stream_ordering BIGINT NOT NULL, + unread_count BIGINT, + last_receipt_stream_ordering BIGINT, + thread_id TEXT NOT NULL +); + +-- Swap the indexes. +DROP INDEX IF EXISTS event_push_actions_staging_id; +CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging_new(event_id); + +DROP INDEX IF EXISTS event_push_actions_room_id_user_id; +DROP INDEX IF EXISTS event_push_actions_rm_tokens; +DROP INDEX IF EXISTS event_push_actions_stream_ordering; +DROP INDEX IF EXISTS event_push_actions_u_highlight; +DROP INDEX IF EXISTS event_push_actions_highlights_index; +CREATE INDEX event_push_actions_room_id_user_id on event_push_actions_new(room_id, user_id); +CREATE INDEX event_push_actions_rm_tokens on event_push_actions_new( user_id, room_id, topological_ordering, stream_ordering ); +CREATE INDEX event_push_actions_stream_ordering on event_push_actions_new( stream_ordering, user_id ); +CREATE INDEX event_push_actions_u_highlight ON event_push_actions_new (user_id, stream_ordering); +CREATE INDEX event_push_actions_highlights_index ON event_push_actions_new (user_id, room_id, topological_ordering, stream_ordering); + +-- Copy the data. +INSERT INTO event_push_actions_staging_new (event_id, user_id, actions, notif, highlight, unread, thread_id) + SELECT event_id, user_id, actions, notif, highlight, unread, thread_id + FROM event_push_actions_staging; + +INSERT INTO event_push_actions_new (room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id) + SELECT room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id + FROM event_push_actions; + +INSERT INTO event_push_summary_new (user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id) + SELECT user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id + FROM event_push_summary; + +-- Drop the old tables. +DROP TABLE event_push_actions_staging; +DROP TABLE event_push_actions; +DROP TABLE event_push_summary; + +-- Rename the tables. +ALTER TABLE event_push_actions_staging_new RENAME TO event_push_actions_staging; +ALTER TABLE event_push_actions_new RENAME TO event_push_actions; +ALTER TABLE event_push_summary_new RENAME TO event_push_summary; + +-- Re-run background updates from 72/02event_push_actions_index.sql and +-- 72/06thread_notifications.sql. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7302, 'event_push_summary_unique_index2', '{}') + ON CONFLICT (update_name) DO NOTHING; +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7302, 'event_push_actions_stream_highlight_index', '{}') + ON CONFLICT (update_name) DO NOTHING; From 111fe5799f5644217410ca79080a0a71867fe255 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 8 Sep 2022 11:51:17 -0400 Subject: [PATCH 03/27] Add infrastructure to pass notifications per thread. --- synapse/handlers/sync.py | 40 ++++++++++++++++--- synapse/push/push_tools.py | 9 ++++- synapse/rest/client/sync.py | 4 ++ .../databases/main/event_push_actions.py | 39 +++++++++++++----- .../replication/slave/storage/test_events.py | 17 ++++++-- tests/storage/test_event_push_actions.py | 3 +- 6 files changed, 89 insertions(+), 23 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5293fa4d0e01..528f8dad11f3 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -40,7 +40,7 @@ from synapse.logging.context import current_context from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span from synapse.push.clientformat import format_push_rules_for_user -from synapse.storage.databases.main.event_push_actions import NotifCounts +from synapse.storage.databases.main.event_push_actions import RoomNotifCounts from synapse.storage.roommember import MemberSummary from synapse.storage.state import StateFilter from synapse.types import ( @@ -128,6 +128,7 @@ class JoinedSyncResult: ephemeral: List[JsonDict] account_data: List[JsonDict] unread_notifications: JsonDict + unread_thread_notifications: JsonDict summary: Optional[JsonDict] unread_count: int @@ -278,6 +279,8 @@ def __init__(self, hs: "HomeServer"): self.rooms_to_exclude = hs.config.server.rooms_to_exclude_from_sync + self._msc3773_enabled = hs.config.experimental.msc3773_enabled + async def wait_for_sync_for_user( self, requester: Requester, @@ -1272,7 +1275,7 @@ async def _find_missing_partial_state_memberships( async def unread_notifs_for_room_id( self, room_id: str, sync_config: SyncConfig - ) -> NotifCounts: + ) -> RoomNotifCounts: with Measure(self.clock, "unread_notifs_for_room_id"): return await self.store.get_unread_event_push_actions_by_room_for_user( @@ -2343,6 +2346,7 @@ async def _generate_room_entry( ephemeral=ephemeral, account_data=account_data_events, unread_notifications=unread_notifications, + unread_thread_notifications={}, summary=summary, unread_count=0, ) @@ -2350,10 +2354,36 @@ async def _generate_room_entry( if room_sync or always_include: notifs = await self.unread_notifs_for_room_id(room_id, sync_config) - unread_notifications["notification_count"] = notifs.notify_count - unread_notifications["highlight_count"] = notifs.highlight_count + # Notifications for the main timeline. + notify_count = notifs.main_timeline.notify_count + highlight_count = notifs.main_timeline.highlight_count + unread_count = notifs.main_timeline.unread_count - room_sync.unread_count = notifs.unread_count + # Check the sync configuration. + if ( + self._msc3773_enabled + and sync_config.filter_collection.unread_thread_notifications() + ): + # And add info for each thread. + room_sync.unread_thread_notifications = { + thread_id: { + "notification_count": thread_notifs.notify_count, + "highlight_count": thread_notifs.highlight_count, + } + for thread_id, thread_notifs in notifs.threads.items() + if thread_id is not None + } + + else: + # Combine the unread counts for all threads and main timeline. + for thread_notifs in notifs.threads.values(): + notify_count += thread_notifs.notify_count + highlight_count += thread_notifs.highlight_count + unread_count += thread_notifs.unread_count + + unread_notifications["notification_count"] = notify_count + unread_notifications["highlight_count"] = highlight_count + room_sync.unread_count = unread_count sync_result_builder.joined.append(room_sync) diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 658bf373b7c9..edeba27a4553 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -39,7 +39,12 @@ async def get_room_unread_count(room_id: str) -> None: await concurrently_execute(get_room_unread_count, joins, 10) for notifs in room_notifs: - if notifs.notify_count == 0: + # Combine the counts from all the threads. + notify_count = notifs.main_timeline.notify_count + sum( + n.notify_count for n in notifs.threads.values() + ) + + if notify_count == 0: continue if group_by_room: @@ -47,7 +52,7 @@ async def get_room_unread_count(room_id: str) -> None: badge += 1 else: # increment the badge count by the number of unread messages in the room - badge += notifs.notify_count + badge += notify_count return badge diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index c2989765cee0..f1c23d68e51a 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -509,6 +509,10 @@ async def encode_room( ephemeral_events = room.ephemeral result["ephemeral"] = {"events": ephemeral_events} result["unread_notifications"] = room.unread_notifications + if room.unread_thread_notifications: + result[ + "org.matrix.msc3773.unread_thread_notifications" + ] = room.unread_thread_notifications result["summary"] = room.summary if self._msc2654_enabled: result["org.matrix.msc2654.unread_count"] = room.unread_count diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 6b8668d2dcfe..096ca652e9b9 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -157,7 +157,7 @@ class UserPushAction(EmailPushAction): @attr.s(slots=True, auto_attribs=True) class NotifCounts: """ - The per-user, per-room count of notifications. Used by sync and push. + The per-user, per-room, per-thread count of notifications. Used by sync and push. """ notify_count: int = 0 @@ -165,6 +165,21 @@ class NotifCounts: highlight_count: int = 0 +@attr.s(slots=True, auto_attribs=True) +class RoomNotifCounts: + """ + The per-user, per-room count of notifications. Used by sync and push. + """ + + main_timeline: NotifCounts + # Map of thread ID to the notification counts. + threads: Dict[str, NotifCounts] + + def __len__(self) -> int: + # To properly account for the amount of space in any caches. + return len(self.threads) + 1 + + def _serialize_action( actions: Collection[Union[Mapping, str]], is_highlight: bool ) -> str: @@ -331,12 +346,12 @@ def add_thread_id_txn( return result - @cached(tree=True, max_entries=5000) + @cached(tree=True, max_entries=5000, iterable=True) async def get_unread_event_push_actions_by_room_for_user( self, room_id: str, user_id: str, - ) -> NotifCounts: + ) -> RoomNotifCounts: """Get the notification count, the highlight count and the unread message count for a given user in a given room after their latest read receipt. @@ -349,8 +364,9 @@ async def get_unread_event_push_actions_by_room_for_user( user_id: The user to retrieve the counts for. Returns - A NotifCounts object containing the notification count, the highlight count - and the unread message count. + A RoomNotifCounts object containing the notification count, the + highlight count and the unread message count for both the main timeline + and threads. """ return await self.db_pool.runInteraction( "get_unread_event_push_actions_by_room", @@ -364,7 +380,7 @@ def _get_unread_counts_by_receipt_txn( txn: LoggingTransaction, room_id: str, user_id: str, - ) -> NotifCounts: + ) -> RoomNotifCounts: # Get the stream ordering of the user's latest receipt in the room. result = self.get_last_receipt_for_user_txn( txn, @@ -402,7 +418,7 @@ def _get_unread_counts_by_pos_txn( room_id: str, user_id: str, receipt_stream_ordering: int, - ) -> NotifCounts: + ) -> RoomNotifCounts: """Get the number of unread messages for a user/room that have happened since the given stream ordering. @@ -414,9 +430,10 @@ def _get_unread_counts_by_pos_txn( receipt in the room. If there are no receipts, the stream ordering of the user's join event. - Returns - A NotifCounts object containing the notification count, the highlight count - and the unread message count. + Returns: + A RoomNotifCounts object containing the notification count, the + highlight count and the unread message count for both the main timeline + and threads. """ counts = NotifCounts() @@ -481,7 +498,7 @@ def _get_unread_counts_by_pos_txn( counts.notify_count += notify_count counts.unread_count += unread_count - return counts + return RoomNotifCounts(counts, {}) def _get_notif_unread_count_for_user_room( self, diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 49a21e2e8581..c8d22468c24f 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -22,7 +22,10 @@ from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict from synapse.handlers.room import RoomEventSource from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.storage.databases.main.event_push_actions import NotifCounts +from synapse.storage.databases.main.event_push_actions import ( + NotifCounts, + RoomNotifCounts, +) from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser from synapse.types import PersistedEventPosition @@ -178,7 +181,9 @@ def test_push_actions_for_user(self, send_receipt: bool): self.check( "get_unread_event_push_actions_by_room_for_user", [ROOM_ID, USER_ID_2], - NotifCounts(highlight_count=0, unread_count=0, notify_count=0), + RoomNotifCounts( + NotifCounts(highlight_count=0, unread_count=0, notify_count=0), {} + ), ) self.persist( @@ -191,7 +196,9 @@ def test_push_actions_for_user(self, send_receipt: bool): self.check( "get_unread_event_push_actions_by_room_for_user", [ROOM_ID, USER_ID_2], - NotifCounts(highlight_count=0, unread_count=0, notify_count=1), + RoomNotifCounts( + NotifCounts(highlight_count=0, unread_count=0, notify_count=1), {} + ), ) self.persist( @@ -206,7 +213,9 @@ def test_push_actions_for_user(self, send_receipt: bool): self.check( "get_unread_event_push_actions_by_room_for_user", [ROOM_ID, USER_ID_2], - NotifCounts(highlight_count=1, unread_count=0, notify_count=2), + RoomNotifCounts( + NotifCounts(highlight_count=1, unread_count=0, notify_count=2), {} + ), ) def test_get_rooms_for_user_with_stream_ordering(self): diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index fc43d7edd183..95f740ed1043 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -77,13 +77,14 @@ def _assert_counts(noitf_count: int, highlight_count: int) -> None: ) ) self.assertEqual( - counts, + counts.main_timeline, NotifCounts( notify_count=noitf_count, unread_count=0, highlight_count=highlight_count, ), ) + self.assertEqual(counts.threads, {}) def _create_event(highlight: bool = False) -> str: result = self.helper.send_event( From 62aa85bc417a13b7a7a6233c3c43f0ed5b8c7870 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 8 Sep 2022 14:25:43 -0400 Subject: [PATCH 04/27] Calculate thread specific notification counts. --- changelog.d/13776.feature | 1 + synapse/storage/database.py | 2 +- .../databases/main/event_push_actions.py | 152 +++++++++------ tests/storage/test_event_push_actions.py | 175 ++++++++++++++++++ 4 files changed, 271 insertions(+), 59 deletions(-) create mode 100644 changelog.d/13776.feature diff --git a/changelog.d/13776.feature b/changelog.d/13776.feature new file mode 100644 index 000000000000..22bce125ce5e --- /dev/null +++ b/changelog.d/13776.feature @@ -0,0 +1 @@ +Experimental support for thread-specific notifications ([MSC3773](https://github.com/matrix-org/matrix-spec-proposals/pull/3773)). diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 921cd4dc5ee0..aafd8431f601 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -94,7 +94,7 @@ "event_search": "event_search_event_id_idx", "local_media_repository_thumbnails": "local_media_repository_thumbnails_method_idx", "remote_media_cache_thumbnails": "remote_media_repository_thumbnails_method_idx", - "event_push_summary": "event_push_summary_unique_index", + "event_push_summary": "event_push_summary_unique_index2", } diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 096ca652e9b9..7cc500e8d3c7 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -437,6 +437,7 @@ def _get_unread_counts_by_pos_txn( """ counts = NotifCounts() + thread_counts = {} # First we pull the counts from the summary table. # @@ -453,7 +454,7 @@ def _get_unread_counts_by_pos_txn( # receipt). txn.execute( """ - SELECT stream_ordering, notif_count, COALESCE(unread_count, 0) + SELECT stream_ordering, notif_count, COALESCE(unread_count, 0), thread_id FROM event_push_summary WHERE room_id = ? AND user_id = ? AND ( @@ -463,42 +464,70 @@ def _get_unread_counts_by_pos_txn( """, (room_id, user_id, receipt_stream_ordering, receipt_stream_ordering), ) - row = txn.fetchone() + max_summary_stream_ordering = 0 + for summary_stream_ordering, notif_count, unread_count, thread_id in txn: + if thread_id == "main": + counts = NotifCounts( + notify_count=notif_count, unread_count=unread_count + ) + # TODO Delete zeroed out threads completely from the database. + elif notif_count or unread_count: + thread_counts[thread_id] = NotifCounts( + notify_count=notif_count, unread_count=unread_count + ) - summary_stream_ordering = 0 - if row: - summary_stream_ordering = row[0] - counts.notify_count += row[1] - counts.unread_count += row[2] + # XXX All threads should have the same stream ordering? + max_summary_stream_ordering = max( + summary_stream_ordering, max_summary_stream_ordering + ) # Next we need to count highlights, which aren't summarised sql = """ - SELECT COUNT(*) FROM event_push_actions + SELECT COUNT(*), thread_id FROM event_push_actions WHERE user_id = ? AND room_id = ? AND stream_ordering > ? AND highlight = 1 + GROUP BY thread_id """ txn.execute(sql, (user_id, room_id, receipt_stream_ordering)) - row = txn.fetchone() - if row: - counts.highlight_count += row[0] + for highlight_count, thread_id in txn: + if thread_id == "main": + counts.highlight_count += highlight_count + elif highlight_count: + if thread_id in thread_counts: + thread_counts[thread_id].highlight_count += highlight_count + else: + thread_counts[thread_id] = NotifCounts( + notify_count=0, unread_count=0, highlight_count=highlight_count + ) # Finally we need to count push actions that aren't included in the # summary returned above. This might be due to recent events that haven't # been summarised yet or the summary is out of date due to a recent read # receipt. start_unread_stream_ordering = max( - receipt_stream_ordering, summary_stream_ordering + receipt_stream_ordering, max_summary_stream_ordering ) - notify_count, unread_count = self._get_notif_unread_count_for_user_room( + unread_counts = self._get_notif_unread_count_for_user_room( txn, room_id, user_id, start_unread_stream_ordering ) - counts.notify_count += notify_count - counts.unread_count += unread_count + for notif_count, unread_count, thread_id in unread_counts: + if thread_id == "main": + counts.notify_count += notif_count + counts.unread_count += unread_count + elif thread_id in thread_counts: + thread_counts[thread_id].notify_count += notif_count + thread_counts[thread_id].unread_count += unread_count + else: + thread_counts[thread_id] = NotifCounts( + notify_count=notif_count, + unread_count=unread_count, + highlight_count=0, + ) - return RoomNotifCounts(counts, {}) + return RoomNotifCounts(counts, thread_counts) def _get_notif_unread_count_for_user_room( self, @@ -507,7 +536,7 @@ def _get_notif_unread_count_for_user_room( user_id: str, stream_ordering: int, max_stream_ordering: Optional[int] = None, - ) -> Tuple[int, int]: + ) -> List[Tuple[int, int, str]]: """Returns the notify and unread counts from `event_push_actions` for the given user/room in the given range. @@ -523,13 +552,14 @@ def _get_notif_unread_count_for_user_room( If this is not given, then no maximum is applied. Return: - A tuple of the notif count and unread count in the given range. + A tuple of the notif count and unread count in the given range for + each thread. """ # If there have been no events in the room since the stream ordering, # there can't be any push actions either. if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering): - return 0, 0 + return [] clause = "" args = [user_id, room_id, stream_ordering] @@ -540,26 +570,23 @@ def _get_notif_unread_count_for_user_room( # If the max stream ordering is less than the min stream ordering, # then obviously there are zero push actions in that range. if max_stream_ordering <= stream_ordering: - return 0, 0 + return [] sql = f""" SELECT COUNT(CASE WHEN notif = 1 THEN 1 END), - COUNT(CASE WHEN unread = 1 THEN 1 END) - FROM event_push_actions ea - WHERE user_id = ? + COUNT(CASE WHEN unread = 1 THEN 1 END), + thread_id + FROM event_push_actions ea + WHERE user_id = ? AND room_id = ? AND ea.stream_ordering > ? {clause} + GROUP BY thread_id """ txn.execute(sql, args) - row = txn.fetchone() - - if row: - return cast(Tuple[int, int], row) - - return 0, 0 + return cast(List[Tuple[int, int, str]], txn.fetchall()) async def get_push_action_users_in_range( self, min_stream_ordering: int, max_stream_ordering: int @@ -1103,26 +1130,34 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: # Fetch the notification counts between the stream ordering of the # latest receipt and what was previously summarised. - notif_count, unread_count = self._get_notif_unread_count_for_user_room( + unread_counts = self._get_notif_unread_count_for_user_room( txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering ) - # Replace the previous summary with the new counts. - # - # TODO(threads): Upsert per-thread instead of setting them all to main. - self.db_pool.simple_upsert_txn( + # First mark the summary for all threads in the room as cleared. + self.db_pool.simple_update_txn( txn, table="event_push_summary", - keyvalues={"room_id": room_id, "user_id": user_id}, - values={ - "notif_count": notif_count, - "unread_count": unread_count, + keyvalues={"user_id": user_id, "room_id": room_id}, + updatevalues={ + "notif_count": 0, + "unread_count": 0, "stream_ordering": old_rotate_stream_ordering, "last_receipt_stream_ordering": stream_ordering, - "thread_id": "main", }, ) + # Then any updated threads get their notification count and unread + # count updated. + self.db_pool.simple_upsert_many_txn( + txn, + table="event_push_summary", + key_names=("room_id", "user_id", "thread_id"), + key_values=[(room_id, user_id, row[2]) for row in unread_counts], + value_names=("notif_count", "unread_count"), + value_values=[(row[0], row[1]) for row in unread_counts], + ) + # We always update `event_push_summary_last_receipt_stream_id` to # ensure that we don't rescan the same receipts for remote users. @@ -1208,23 +1243,23 @@ def _rotate_notifs_before_txn( # Calculate the new counts that should be upserted into event_push_summary sql = """ - SELECT user_id, room_id, + SELECT user_id, room_id, thread_id, coalesce(old.%s, 0) + upd.cnt, upd.stream_ordering FROM ( - SELECT user_id, room_id, count(*) as cnt, + SELECT user_id, room_id, thread_id, count(*) as cnt, max(ea.stream_ordering) as stream_ordering FROM event_push_actions AS ea - LEFT JOIN event_push_summary AS old USING (user_id, room_id) + LEFT JOIN event_push_summary AS old USING (user_id, room_id, thread_id) WHERE ? < ea.stream_ordering AND ea.stream_ordering <= ? AND ( old.last_receipt_stream_ordering IS NULL OR old.last_receipt_stream_ordering < ea.stream_ordering ) AND %s = 1 - GROUP BY user_id, room_id + GROUP BY user_id, room_id, thread_id ) AS upd - LEFT JOIN event_push_summary AS old USING (user_id, room_id) + LEFT JOIN event_push_summary AS old USING (user_id, room_id, thread_id) """ # First get the count of unread messages. @@ -1238,11 +1273,11 @@ def _rotate_notifs_before_txn( # object because we might not have the same amount of rows in each of them. To do # this, we use a dict indexed on the user ID and room ID to make it easier to # populate. - summaries: Dict[Tuple[str, str], _EventPushSummary] = {} + summaries: Dict[Tuple[str, str, str], _EventPushSummary] = {} for row in txn: - summaries[(row[0], row[1])] = _EventPushSummary( - unread_count=row[2], - stream_ordering=row[3], + summaries[(row[0], row[1], row[2])] = _EventPushSummary( + unread_count=row[3], + stream_ordering=row[4], notif_count=0, ) @@ -1253,34 +1288,35 @@ def _rotate_notifs_before_txn( ) for row in txn: - if (row[0], row[1]) in summaries: - summaries[(row[0], row[1])].notif_count = row[2] + if (row[0], row[1], row[2]) in summaries: + summaries[(row[0], row[1], row[2])].notif_count = row[3] else: # Because the rules on notifying are different than the rules on marking # a message unread, we might end up with messages that notify but aren't # marked unread, so we might not have a summary for this (user, room) # tuple to complete. - summaries[(row[0], row[1])] = _EventPushSummary( + summaries[(row[0], row[1], row[2])] = _EventPushSummary( unread_count=0, - stream_ordering=row[3], - notif_count=row[2], + stream_ordering=row[4], + notif_count=row[3], ) logger.info("Rotating notifications, handling %d rows", len(summaries)) - # TODO(threads): Update on a per-thread basis. self.db_pool.simple_upsert_many_txn( txn, table="event_push_summary", - key_names=("user_id", "room_id"), - key_values=[(user_id, room_id) for user_id, room_id in summaries], - value_names=("notif_count", "unread_count", "stream_ordering", "thread_id"), + key_names=("user_id", "room_id", "thread_id"), + key_values=[ + (user_id, room_id, thread_id) + for user_id, room_id, thread_id in summaries + ], + value_names=("notif_count", "unread_count", "stream_ordering"), value_values=[ ( summary.notif_count, summary.unread_count, summary.stream_ordering, - "main", ) for summary in summaries.values() ], diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index 95f740ed1043..4cdd8b8b9ad0 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -12,12 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Optional + from twisted.test.proto_helpers import MemoryReactor from synapse.rest import admin from synapse.rest.client import login, room from synapse.server import HomeServer from synapse.storage.databases.main.event_push_actions import NotifCounts +from synapse.types import JsonDict from synapse.util import Clock from tests.unittest import HomeserverTestCase @@ -130,6 +133,7 @@ def _mark_read(event_id: str) -> None: _assert_counts(0, 0) _create_event() + _assert_counts(1, 0) _rotate() _assert_counts(1, 0) @@ -180,6 +184,177 @@ def _mark_read(event_id: str) -> None: _rotate() _assert_counts(0, 0) + def test_count_aggregation_threads(self) -> None: + """ + This is essentially the same test as test_count_aggregation, but adds + events to the main timeline and to a thread. + """ + + # Create a user to receive notifications and send receipts. + user_id = self.register_user("user1235", "pass") + token = self.login("user1235", "pass") + + # And another users to send events. + other_id = self.register_user("other", "pass") + other_token = self.login("other", "pass") + + # Create a room and put both users in it. + room_id = self.helper.create_room_as(user_id, tok=token) + self.helper.join(room_id, other_id, tok=other_token) + thread_id: str + + last_event_id: str + + def _assert_counts( + noitf_count: int, + highlight_count: int, + thread_notif_count: int, + thread_highlight_count: int, + ) -> None: + counts = self.get_success( + self.store.db_pool.runInteraction( + "get-unread-counts", + self.store._get_unread_counts_by_receipt_txn, + room_id, + user_id, + ) + ) + self.assertEqual( + counts.main_timeline, + NotifCounts( + notify_count=noitf_count, + unread_count=0, + highlight_count=highlight_count, + ), + ) + if thread_notif_count or thread_highlight_count: + self.assertEqual( + counts.threads, + { + thread_id: NotifCounts( + notify_count=thread_notif_count, + unread_count=0, + highlight_count=thread_highlight_count, + ), + }, + ) + else: + self.assertEqual(counts.threads, {}) + + def _create_event( + highlight: bool = False, thread_id: Optional[str] = None + ) -> str: + content: JsonDict = { + "msgtype": "m.text", + "body": user_id if highlight else "msg", + } + if thread_id: + content["m.relates_to"] = { + "rel_type": "m.thread", + "event_id": thread_id, + } + + result = self.helper.send_event( + room_id, + type="m.room.message", + content=content, + tok=other_token, + ) + nonlocal last_event_id + last_event_id = result["event_id"] + return last_event_id + + def _rotate() -> None: + self.get_success(self.store._rotate_notifs()) + + def _mark_read(event_id: str) -> None: + self.get_success( + self.store.insert_receipt( + room_id, + "m.read", + user_id=user_id, + event_ids=[event_id], + data={}, + ) + ) + + _assert_counts(0, 0, 0, 0) + thread_id = _create_event() + _assert_counts(1, 0, 0, 0) + _rotate() + _assert_counts(1, 0, 0, 0) + + _create_event(thread_id=thread_id) + _assert_counts(1, 0, 1, 0) + _rotate() + _assert_counts(1, 0, 1, 0) + + _create_event() + _assert_counts(2, 0, 1, 0) + _rotate() + _assert_counts(2, 0, 1, 0) + + event_id = _create_event(thread_id=thread_id) + _assert_counts(2, 0, 2, 0) + _rotate() + _assert_counts(2, 0, 2, 0) + + _create_event() + _create_event(thread_id=thread_id) + _mark_read(event_id) + _assert_counts(1, 0, 1, 0) + + _mark_read(last_event_id) + _assert_counts(0, 0, 0, 0) + + _create_event() + _create_event(thread_id=thread_id) + _assert_counts(1, 0, 1, 0) + _rotate() + _assert_counts(1, 0, 1, 0) + + # Delete old event push actions, this should not affect the (summarised) count. + self.get_success(self.store._remove_old_push_actions_that_have_rotated()) + _assert_counts(1, 0, 1, 0) + + _mark_read(last_event_id) + _assert_counts(0, 0, 0, 0) + + _create_event(True) + _assert_counts(1, 1, 0, 0) + _rotate() + _assert_counts(1, 1, 0, 0) + + event_id = _create_event(True, thread_id) + _assert_counts(1, 1, 1, 1) + _rotate() + _assert_counts(1, 1, 1, 1) + + # Check that adding another notification and rotating after highlight + # works. + _create_event() + _rotate() + _assert_counts(2, 1, 1, 1) + + _create_event(thread_id=thread_id) + _rotate() + _assert_counts(2, 1, 2, 1) + + # Check that sending read receipts at different points results in the + # right counts. + _mark_read(event_id) + _assert_counts(1, 0, 1, 0) + _mark_read(last_event_id) + _assert_counts(0, 0, 0, 0) + + _create_event(True) + _create_event(True, thread_id) + _assert_counts(1, 1, 1, 1) + _mark_read(last_event_id) + _assert_counts(0, 0, 0, 0) + _rotate() + _assert_counts(0, 0, 0, 0) + def test_find_first_stream_ordering_after_ts(self) -> None: def add_event(so: int, ts: int) -> None: self.get_success( From cb679e2379f981bc4fa3675b9ac37d642dc32985 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 16 Sep 2022 11:27:29 -0400 Subject: [PATCH 05/27] Clarify comment. --- synapse/storage/databases/main/event_push_actions.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 7cc500e8d3c7..59faf9c07bc7 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -476,7 +476,11 @@ def _get_unread_counts_by_pos_txn( notify_count=notif_count, unread_count=unread_count ) - # XXX All threads should have the same stream ordering? + # Summaries will only be used if they have not been invalidated by + # a recent receipt; track the latest stream ordering or a valid summary. + # + # Note that since there's only one read receipt in the room per user, + # valid summaries are contiguous. max_summary_stream_ordering = max( summary_stream_ordering, max_summary_stream_ordering ) From ba00c5f08f13b51359f3156d4ff958ea5330a4e8 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 16 Sep 2022 12:17:02 -0400 Subject: [PATCH 06/27] Simplify handling of summaries with neither notifications or unread counts. --- .../databases/main/event_push_actions.py | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 59faf9c07bc7..a7dd37b27ded 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -452,6 +452,9 @@ def _get_unread_counts_by_pos_txn( # date (as the row was written by an older version of Synapse that # updated `event_push_summary` synchronously when persisting a new read # receipt). + # + # Note that rows in event_push_summary are not immediately deleted when + # the summary is reset, so avoid pulling those entries. txn.execute( """ SELECT stream_ordering, notif_count, COALESCE(unread_count, 0), thread_id @@ -460,7 +463,7 @@ def _get_unread_counts_by_pos_txn( AND ( (last_receipt_stream_ordering IS NULL AND stream_ordering > ?) OR last_receipt_stream_ordering = ? - ) + ) AND (notif_count OR unread_count) """, (room_id, user_id, receipt_stream_ordering, receipt_stream_ordering), ) @@ -470,8 +473,7 @@ def _get_unread_counts_by_pos_txn( counts = NotifCounts( notify_count=notif_count, unread_count=unread_count ) - # TODO Delete zeroed out threads completely from the database. - elif notif_count or unread_count: + else: thread_counts[thread_id] = NotifCounts( notify_count=notif_count, unread_count=unread_count ) @@ -498,13 +500,12 @@ def _get_unread_counts_by_pos_txn( for highlight_count, thread_id in txn: if thread_id == "main": counts.highlight_count += highlight_count - elif highlight_count: - if thread_id in thread_counts: - thread_counts[thread_id].highlight_count += highlight_count - else: - thread_counts[thread_id] = NotifCounts( - notify_count=0, unread_count=0, highlight_count=highlight_count - ) + elif thread_id in thread_counts: + thread_counts[thread_id].highlight_count += highlight_count + else: + thread_counts[thread_id] = NotifCounts( + notify_count=0, unread_count=0, highlight_count=highlight_count + ) # Finally we need to count push actions that aren't included in the # summary returned above. This might be due to recent events that haven't From eb56567ebee9206f44b1d4c5b1b6bd4ce9a4ced9 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 16 Sep 2022 12:33:55 -0400 Subject: [PATCH 07/27] Delete old push summaries. --- .../databases/main/event_push_actions.py | 64 +++++++++++++++++-- 1 file changed, 60 insertions(+), 4 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index a7dd37b27ded..a79a34a871ec 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -1333,7 +1333,14 @@ def _rotate_notifs_before_txn( ) async def _remove_old_push_actions_that_have_rotated(self) -> None: - """Clear out old push actions that have been summarised.""" + """ + Performs two clean-ups: + + 1. Clear out old push actions that have been summarised (and are older + than 1 day ago). + 2. Clear out old push summaries that are empty (and are older than 30 + days ago). + """ # We want to clear out anything that is older than a day that *has* already # been rotated. @@ -1343,7 +1350,7 @@ async def _remove_old_push_actions_that_have_rotated(self) -> None: retcol="stream_ordering", ) - max_stream_ordering_to_delete = min( + max_action_stream_ordering_to_delete = min( rotated_upto_stream_ordering, self.stream_ordering_day_ago ) @@ -1361,7 +1368,7 @@ def remove_old_push_actions_that_have_rotated_txn( ORDER BY stream_ordering ASC LIMIT 1 OFFSET ? """, ( - max_stream_ordering_to_delete, + max_action_stream_ordering_to_delete, batch_size, ), ) @@ -1370,7 +1377,7 @@ def remove_old_push_actions_that_have_rotated_txn( if stream_row: (stream_ordering,) = stream_row else: - stream_ordering = max_stream_ordering_to_delete + stream_ordering = max_action_stream_ordering_to_delete # We need to use a inclusive bound here to handle the case where a # single stream ordering has more than `batch_size` rows. @@ -1386,6 +1393,47 @@ def remove_old_push_actions_that_have_rotated_txn( return txn.rowcount < batch_size + max_summary_stream_ordering_to_delete = self.stream_ordering_month_ago + + def remove_old_read_push_summaries_txn(txn: LoggingTransaction) -> bool: + # We don't want to clear out too much at a time, so we bound our + # deletes. + batch_size = self._rotate_count + + txn.execute( + """ + SELECT stream_ordering FROM event_push_summary + WHERE stream_ordering <= ? AND notif_count = 0 AND COALESCE(unread_count, 0) = 0 + ORDER BY stream_ordering ASC LIMIT 1 OFFSET ? + """, + ( + max_summary_stream_ordering_to_delete, + batch_size, + ), + ) + stream_row = txn.fetchone() + + if stream_row: + (stream_ordering,) = stream_row + else: + stream_ordering = max_summary_stream_ordering_to_delete + + # We need to use a inclusive bound here to handle the case where a + # single stream ordering has more than `batch_size` rows. + txn.execute( + """ + DELETE FROM event_push_summary + WHERE stream_ordering <= ? AND notif_count = 0 AND COALESCE(unread_count, 0) = 0 + """, + (stream_ordering,), + ) + + logger.info( + "Rotating notifications, deleted %s push summaries", txn.rowcount + ) + + return txn.rowcount < batch_size + while True: done = await self.db_pool.runInteraction( "_remove_old_push_actions_that_have_rotated", @@ -1394,6 +1442,14 @@ def remove_old_push_actions_that_have_rotated_txn( if done: break + while True: + done = await self.db_pool.runInteraction( + "_remove_old_read_push_summaries_txn", + remove_old_read_push_summaries_txn, + ) + if done: + break + class EventPushActionsStore(EventPushActionsWorkerStore): EPA_HIGHLIGHT_INDEX = "epa_highlight_index" From 8b63c5baac6e44b550dd8fe3f04d3891c22446bf Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 19 Sep 2022 09:27:18 -0400 Subject: [PATCH 08/27] Fix postgres compatibility. --- synapse/storage/databases/main/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index a79a34a871ec..2bf67a12fe46 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -463,7 +463,7 @@ def _get_unread_counts_by_pos_txn( AND ( (last_receipt_stream_ordering IS NULL AND stream_ordering > ?) OR last_receipt_stream_ordering = ? - ) AND (notif_count OR unread_count) + ) AND (notif_count != 0 OR COALESCE(unread_count, 0) != 0) """, (room_id, user_id, receipt_stream_ordering, receipt_stream_ordering), ) From c4f2d50571029130b0445027e6ec0ef658db0104 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 20 Sep 2022 13:14:38 -0400 Subject: [PATCH 09/27] Create a constant for "main". --- synapse/api/constants.py | 3 +++ synapse/push/bulk_push_rule_evaluator.py | 4 ++-- synapse/storage/databases/main/event_push_actions.py | 8 ++++---- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index c178ddf070b4..3882f5c9b251 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -31,6 +31,9 @@ # the maximum length for a user id is 255 characters MAX_USERID_LENGTH = 255 +# Constant value used for the pseudo-thread which is the main timeline. +MAIN_TIMELINE: Final = "main" + class Membership: diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 404379ef67d3..d2c306b1d4b9 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -30,7 +30,7 @@ from prometheus_client import Counter -from synapse.api.constants import EventTypes, Membership, RelationTypes +from synapse.api.constants import MAIN_TIMELINE, EventTypes, Membership, RelationTypes from synapse.event_auth import auth_types_for_event, get_user_power_level from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext @@ -277,7 +277,7 @@ async def action_for_event_by_user( # If the event does not have a relation, then cannot have any mutual # relations or thread ID. relations = {} - thread_id = "main" + thread_id = MAIN_TIMELINE if relation: relations = await self._get_mutual_relations( relation.parent_id, diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 2bf67a12fe46..ab390fe0df3a 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -88,7 +88,7 @@ import attr -from synapse.api.constants import ReceiptTypes +from synapse.api.constants import MAIN_TIMELINE, ReceiptTypes from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( @@ -469,7 +469,7 @@ def _get_unread_counts_by_pos_txn( ) max_summary_stream_ordering = 0 for summary_stream_ordering, notif_count, unread_count, thread_id in txn: - if thread_id == "main": + if thread_id == MAIN_TIMELINE: counts = NotifCounts( notify_count=notif_count, unread_count=unread_count ) @@ -498,7 +498,7 @@ def _get_unread_counts_by_pos_txn( """ txn.execute(sql, (user_id, room_id, receipt_stream_ordering)) for highlight_count, thread_id in txn: - if thread_id == "main": + if thread_id == MAIN_TIMELINE: counts.highlight_count += highlight_count elif thread_id in thread_counts: thread_counts[thread_id].highlight_count += highlight_count @@ -519,7 +519,7 @@ def _get_unread_counts_by_pos_txn( ) for notif_count, unread_count, thread_id in unread_counts: - if thread_id == "main": + if thread_id == MAIN_TIMELINE: counts.notify_count += notif_count counts.unread_count += unread_count elif thread_id in thread_counts: From 6927e5939374b88cac0cb07e3dccfe0dc0385388 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 20 Sep 2022 13:50:38 -0400 Subject: [PATCH 10/27] Reduce duplicated code. --- .../databases/main/event_push_actions.py | 43 ++++++------------- 1 file changed, 13 insertions(+), 30 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index ab390fe0df3a..3e4384d734b4 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -436,8 +436,12 @@ def _get_unread_counts_by_pos_txn( and threads. """ - counts = NotifCounts() + main_counts = NotifCounts() thread_counts = {} + def _get_thread(thread_id: str) -> NotifCounts: + if thread_id == MAIN_TIMELINE: + return main_counts + return thread_counts.setdefault(thread_id, NotifCounts()) # First we pull the counts from the summary table. # @@ -469,14 +473,9 @@ def _get_unread_counts_by_pos_txn( ) max_summary_stream_ordering = 0 for summary_stream_ordering, notif_count, unread_count, thread_id in txn: - if thread_id == MAIN_TIMELINE: - counts = NotifCounts( - notify_count=notif_count, unread_count=unread_count - ) - else: - thread_counts[thread_id] = NotifCounts( - notify_count=notif_count, unread_count=unread_count - ) + counts = _get_thread(thread_id) + counts.notify_count += notif_count + counts.unread_count += unread_count # Summaries will only be used if they have not been invalidated by # a recent receipt; track the latest stream ordering or a valid summary. @@ -498,14 +497,7 @@ def _get_unread_counts_by_pos_txn( """ txn.execute(sql, (user_id, room_id, receipt_stream_ordering)) for highlight_count, thread_id in txn: - if thread_id == MAIN_TIMELINE: - counts.highlight_count += highlight_count - elif thread_id in thread_counts: - thread_counts[thread_id].highlight_count += highlight_count - else: - thread_counts[thread_id] = NotifCounts( - notify_count=0, unread_count=0, highlight_count=highlight_count - ) + _get_thread(thread_id).highlight_count += highlight_count # Finally we need to count push actions that aren't included in the # summary returned above. This might be due to recent events that haven't @@ -519,20 +511,11 @@ def _get_unread_counts_by_pos_txn( ) for notif_count, unread_count, thread_id in unread_counts: - if thread_id == MAIN_TIMELINE: - counts.notify_count += notif_count - counts.unread_count += unread_count - elif thread_id in thread_counts: - thread_counts[thread_id].notify_count += notif_count - thread_counts[thread_id].unread_count += unread_count - else: - thread_counts[thread_id] = NotifCounts( - notify_count=notif_count, - unread_count=unread_count, - highlight_count=0, - ) + counts = _get_thread(thread_id) + counts.notify_count += notif_count + counts.unread_count += unread_count - return RoomNotifCounts(counts, thread_counts) + return RoomNotifCounts(main_counts, thread_counts) def _get_notif_unread_count_for_user_room( self, From 1d0597591bfb081d095962f5ea5a7d4f6714960b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 20 Sep 2022 14:53:56 -0400 Subject: [PATCH 11/27] Lint --- synapse/storage/databases/main/event_push_actions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 3e4384d734b4..e4448131c481 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -437,7 +437,8 @@ def _get_unread_counts_by_pos_txn( """ main_counts = NotifCounts() - thread_counts = {} + thread_counts: Dict[str, NotifCounts] = {} + def _get_thread(thread_id: str) -> NotifCounts: if thread_id == MAIN_TIMELINE: return main_counts From 55d15a311d06a6a8f742363a4ac1cce12b73a8ac Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 22 Sep 2022 09:55:11 -0400 Subject: [PATCH 12/27] Threads must already be summarized between the stream orderings that are being updated. --- synapse/storage/databases/main/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index e4448131c481..51be0db07bde 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -1138,7 +1138,7 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: # Then any updated threads get their notification count and unread # count updated. - self.db_pool.simple_upsert_many_txn( + self.db_pool.simple_update_many_txn( txn, table="event_push_summary", key_names=("room_id", "user_id", "thread_id"), From 56c21e42d142967b8e5daf28ec898b560f94101a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 22 Sep 2022 09:56:55 -0400 Subject: [PATCH 13/27] Don't delete empty push summaries. --- .../databases/main/event_push_actions.py | 66 ++----------------- 1 file changed, 5 insertions(+), 61 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 51be0db07bde..5a8a9d32f61b 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -457,9 +457,6 @@ def _get_thread(thread_id: str) -> NotifCounts: # date (as the row was written by an older version of Synapse that # updated `event_push_summary` synchronously when persisting a new read # receipt). - # - # Note that rows in event_push_summary are not immediately deleted when - # the summary is reset, so avoid pulling those entries. txn.execute( """ SELECT stream_ordering, notif_count, COALESCE(unread_count, 0), thread_id @@ -1318,12 +1315,8 @@ def _rotate_notifs_before_txn( async def _remove_old_push_actions_that_have_rotated(self) -> None: """ - Performs two clean-ups: - - 1. Clear out old push actions that have been summarised (and are older - than 1 day ago). - 2. Clear out old push summaries that are empty (and are older than 30 - days ago). + Clear out old push actions that have been summarised (and are older than + 1 day ago). """ # We want to clear out anything that is older than a day that *has* already @@ -1334,7 +1327,7 @@ async def _remove_old_push_actions_that_have_rotated(self) -> None: retcol="stream_ordering", ) - max_action_stream_ordering_to_delete = min( + max_stream_ordering_to_delete = min( rotated_upto_stream_ordering, self.stream_ordering_day_ago ) @@ -1352,7 +1345,7 @@ def remove_old_push_actions_that_have_rotated_txn( ORDER BY stream_ordering ASC LIMIT 1 OFFSET ? """, ( - max_action_stream_ordering_to_delete, + max_stream_ordering_to_delete, batch_size, ), ) @@ -1361,7 +1354,7 @@ def remove_old_push_actions_that_have_rotated_txn( if stream_row: (stream_ordering,) = stream_row else: - stream_ordering = max_action_stream_ordering_to_delete + stream_ordering = max_stream_ordering_to_delete # We need to use a inclusive bound here to handle the case where a # single stream ordering has more than `batch_size` rows. @@ -1377,47 +1370,6 @@ def remove_old_push_actions_that_have_rotated_txn( return txn.rowcount < batch_size - max_summary_stream_ordering_to_delete = self.stream_ordering_month_ago - - def remove_old_read_push_summaries_txn(txn: LoggingTransaction) -> bool: - # We don't want to clear out too much at a time, so we bound our - # deletes. - batch_size = self._rotate_count - - txn.execute( - """ - SELECT stream_ordering FROM event_push_summary - WHERE stream_ordering <= ? AND notif_count = 0 AND COALESCE(unread_count, 0) = 0 - ORDER BY stream_ordering ASC LIMIT 1 OFFSET ? - """, - ( - max_summary_stream_ordering_to_delete, - batch_size, - ), - ) - stream_row = txn.fetchone() - - if stream_row: - (stream_ordering,) = stream_row - else: - stream_ordering = max_summary_stream_ordering_to_delete - - # We need to use a inclusive bound here to handle the case where a - # single stream ordering has more than `batch_size` rows. - txn.execute( - """ - DELETE FROM event_push_summary - WHERE stream_ordering <= ? AND notif_count = 0 AND COALESCE(unread_count, 0) = 0 - """, - (stream_ordering,), - ) - - logger.info( - "Rotating notifications, deleted %s push summaries", txn.rowcount - ) - - return txn.rowcount < batch_size - while True: done = await self.db_pool.runInteraction( "_remove_old_push_actions_that_have_rotated", @@ -1426,14 +1378,6 @@ def remove_old_read_push_summaries_txn(txn: LoggingTransaction) -> bool: if done: break - while True: - done = await self.db_pool.runInteraction( - "_remove_old_read_push_summaries_txn", - remove_old_read_push_summaries_txn, - ) - if done: - break - class EventPushActionsStore(EventPushActionsWorkerStore): EPA_HIGHLIGHT_INDEX = "epa_highlight_index" From f20620fa94840ad4f2c615fd04463cb4329077ed Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 12 Sep 2022 13:20:53 -0400 Subject: [PATCH 14/27] Update constraints and indexes now that thread ID is used. --- .../03thread_receipts_non_null.sql.postgres | 23 ++++++ .../73/03thread_receipts_non_null.sql.sqlite | 76 +++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 synapse/storage/schema/main/delta/73/03thread_receipts_non_null.sql.postgres create mode 100644 synapse/storage/schema/main/delta/73/03thread_receipts_non_null.sql.sqlite diff --git a/synapse/storage/schema/main/delta/73/03thread_receipts_non_null.sql.postgres b/synapse/storage/schema/main/delta/73/03thread_receipts_non_null.sql.postgres new file mode 100644 index 000000000000..3e0bc9e5eb0b --- /dev/null +++ b/synapse/storage/schema/main/delta/73/03thread_receipts_non_null.sql.postgres @@ -0,0 +1,23 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Drop constraint on (room_id, receipt_type, user_id). + +-- Rebuild the unique constraint with the thread_id. +ALTER TABLE receipts_linearized + DROP CONSTRAINT receipts_linearized_uniqueness; + +ALTER TABLE receipts_graph + DROP CONSTRAINT receipts_graph_uniqueness; diff --git a/synapse/storage/schema/main/delta/73/03thread_receipts_non_null.sql.sqlite b/synapse/storage/schema/main/delta/73/03thread_receipts_non_null.sql.sqlite new file mode 100644 index 000000000000..328a694dc0a0 --- /dev/null +++ b/synapse/storage/schema/main/delta/73/03thread_receipts_non_null.sql.sqlite @@ -0,0 +1,76 @@ +/* Copyright 2022 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Drop constraint on (room_id, receipt_type, user_id). +-- +-- SQLite doesn't support modifying constraints to an existing table, so it must +-- be recreated. + +-- Create the new tables. +CREATE TABLE receipts_linearized_new ( + stream_id BIGINT NOT NULL, + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_id TEXT NOT NULL, + thread_id TEXT, + event_stream_ordering BIGINT, + data TEXT NOT NULL, + CONSTRAINT receipts_linearized_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id) +); + +CREATE TABLE receipts_graph_new ( + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_ids TEXT NOT NULL, + thread_id TEXT, + data TEXT NOT NULL, + CONSTRAINT receipts_graph_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id) +); + +-- Drop the old indexes. +DROP INDEX IF EXISTS receipts_linearized_id; +DROP INDEX IF EXISTS receipts_linearized_room_stream; +DROP INDEX IF EXISTS receipts_linearized_user; + +-- Copy the data. +INSERT INTO receipts_linearized_new (stream_id, room_id, receipt_type, user_id, event_id, data) + SELECT stream_id, room_id, receipt_type, user_id, event_id, data + FROM receipts_linearized; +INSERT INTO receipts_graph_new (room_id, receipt_type, user_id, event_ids, data) + SELECT room_id, receipt_type, user_id, event_ids, data + FROM receipts_graph; + +-- Drop the old tables. +DROP TABLE receipts_linearized; +DROP TABLE receipts_graph; + +-- Rename the tables. +ALTER TABLE receipts_linearized_new RENAME TO receipts_linearized; +ALTER TABLE receipts_graph_new RENAME TO receipts_graph; + +-- Create the indices. +CREATE INDEX receipts_linearized_id ON receipts_linearized( stream_id ); +CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id ); +CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id ); + +-- Re-run background updates from 72/08thread_receipts.sql. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7303, 'receipts_linearized_unique_index', '{}') + ON CONFLICT (update_name) DO NOTHING; +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (7303, 'receipts_graph_unique_index', '{}') + ON CONFLICT (update_name) DO NOTHING; From 52b0a3d3e2843f1f978b9719d6b8257304c12e37 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 22 Sep 2022 13:24:52 -0400 Subject: [PATCH 15/27] Mark threads as read separately. --- changelog.d/13877.feature | 1 + .../databases/main/event_push_actions.py | 249 +++++++++++++++--- synapse/storage/databases/main/receipts.py | 5 +- tests/storage/test_event_push_actions.py | 188 ++++++++++++- 4 files changed, 405 insertions(+), 38 deletions(-) create mode 100644 changelog.d/13877.feature diff --git a/changelog.d/13877.feature b/changelog.d/13877.feature new file mode 100644 index 000000000000..d0cb902dffd0 --- /dev/null +++ b/changelog.d/13877.feature @@ -0,0 +1 @@ +Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)). diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 3e76009f7c28..493fe8a15253 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -95,6 +95,7 @@ DatabasePool, LoggingDatabaseConnection, LoggingTransaction, + PostgresEngine, ) from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore @@ -427,8 +428,8 @@ def _get_unread_counts_by_pos_txn( room_id: The room ID to get unread counts for. user_id: The user ID to get unread counts for. receipt_stream_ordering: The stream ordering of the user's latest - receipt in the room. If there are no receipts, the stream ordering - of the user's join event. + unthreaded receipt in the room. If there are no unthreaded receipts, + the stream ordering of the user's join event. Returns: A RoomNotifCounts object containing the notification count, the @@ -444,6 +445,20 @@ def _get_thread(thread_id: str) -> NotifCounts: return main_counts return thread_counts.setdefault(thread_id, NotifCounts()) + receipt_types_clause, receipts_args = make_in_list_sql_clause( + self.database_engine, + "receipt_type", + (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), + ) + + # PostgreSQL and SQLite differ in comparing scalar numerics. + if isinstance(self.database_engine, PostgresEngine): + # GREATEST ignores NULLs. + receipt_stream_clause = "GREATEST(receipt_stream_ordering, ?)" + else: + # MAX returns NULL if any are NULL, so COALESCE to 0 first. + receipt_stream_clause = "MAX(COALESCE(receipt_stream_ordering, 0), ?)" + # First we pull the counts from the summary table. # # We check that `last_receipt_stream_ordering` matches the stream @@ -458,57 +473,151 @@ def _get_thread(thread_id: str) -> NotifCounts: # updated `event_push_summary` synchronously when persisting a new read # receipt). txn.execute( - """ - SELECT stream_ordering, notif_count, COALESCE(unread_count, 0), thread_id + f""" + SELECT notif_count, COALESCE(unread_count, 0), thread_id FROM event_push_summary + LEFT JOIN ( + SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering + FROM receipts_linearized + LEFT JOIN events USING (room_id, event_id) + WHERE + user_id = ? + AND room_id = ? + AND {receipt_types_clause} + GROUP BY thread_id + ) AS receipts USING (thread_id) WHERE room_id = ? AND user_id = ? AND ( - (last_receipt_stream_ordering IS NULL AND stream_ordering > ?) - OR last_receipt_stream_ordering = ? + (last_receipt_stream_ordering IS NULL AND stream_ordering > {receipt_stream_clause}) + OR last_receipt_stream_ordering = {receipt_stream_clause} ) AND (notif_count != 0 OR COALESCE(unread_count, 0) != 0) """, - (room_id, user_id, receipt_stream_ordering, receipt_stream_ordering), + ( + user_id, + room_id, + *receipts_args, + room_id, + user_id, + receipt_stream_ordering, + receipt_stream_ordering, + ), ) - max_summary_stream_ordering = 0 - for summary_stream_ordering, notif_count, unread_count, thread_id in txn: + summarised_threads = set() + for notif_count, unread_count, thread_id in txn: + summarised_threads.add(thread_id) counts = _get_thread(thread_id) counts.notify_count += notif_count counts.unread_count += unread_count - # Summaries will only be used if they have not been invalidated by - # a recent receipt; track the latest stream ordering or a valid summary. - # - # Note that since there's only one read receipt in the room per user, - # valid summaries are contiguous. - max_summary_stream_ordering = max( - summary_stream_ordering, max_summary_stream_ordering - ) - # Next we need to count highlights, which aren't summarised - sql = """ + sql = f""" SELECT COUNT(*), thread_id FROM event_push_actions + LEFT JOIN ( + SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering + FROM receipts_linearized + LEFT JOIN events USING (room_id, event_id) + WHERE + user_id = ? + AND room_id = ? + AND {receipt_types_clause} + GROUP BY thread_id + ) AS receipts USING (thread_id) WHERE user_id = ? AND room_id = ? - AND stream_ordering > ? + AND stream_ordering > {receipt_stream_clause} AND highlight = 1 GROUP BY thread_id """ - txn.execute(sql, (user_id, room_id, receipt_stream_ordering)) + txn.execute( + sql, + ( + user_id, + room_id, + *receipts_args, + user_id, + room_id, + receipt_stream_ordering, + ), + ) for highlight_count, thread_id in txn: _get_thread(thread_id).highlight_count += highlight_count + # For threads which were summarised we need to count actions since the last + # rotation. + thread_id_clause, thread_id_args = make_in_list_sql_clause( + self.database_engine, "thread_id", summarised_threads + ) + + # The (inclusive) event stream ordering that was previously summarised. + rotated_upto_stream_ordering = self.db_pool.simple_select_one_onecol_txn( + txn, + table="event_push_summary_stream_ordering", + keyvalues={}, + retcol="stream_ordering", + ) + + unread_counts = self._get_notif_unread_count_for_user_room( + txn, room_id, user_id, rotated_upto_stream_ordering + ) + for notif_count, unread_count, thread_id in unread_counts: + if thread_id not in summarised_threads: + continue + + if thread_id == "main": + counts.notify_count += notif_count + counts.unread_count += unread_count + elif thread_id in thread_counts: + thread_counts[thread_id].notify_count += notif_count + thread_counts[thread_id].unread_count += unread_count + else: + # Previous thread summaries of 0 are discarded above. + # + # TODO If empty summaries are deleted this can be removed. + thread_counts[thread_id] = NotifCounts( + notify_count=notif_count, + unread_count=unread_count, + highlight_count=0, + ) + # Finally we need to count push actions that aren't included in the # summary returned above. This might be due to recent events that haven't # been summarised yet or the summary is out of date due to a recent read # receipt. - start_unread_stream_ordering = max( - receipt_stream_ordering, max_summary_stream_ordering - ) - unread_counts = self._get_notif_unread_count_for_user_room( - txn, room_id, user_id, start_unread_stream_ordering + sql = f""" + SELECT + COUNT(CASE WHEN notif = 1 THEN 1 END), + COUNT(CASE WHEN unread = 1 THEN 1 END), + thread_id + FROM event_push_actions + LEFT JOIN ( + SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering + FROM receipts_linearized + LEFT JOIN events USING (room_id, event_id) + WHERE + user_id = ? + AND room_id = ? + AND {receipt_types_clause} + GROUP BY thread_id + ) AS receipts USING (thread_id) + WHERE user_id = ? + AND room_id = ? + AND stream_ordering > {receipt_stream_clause} + AND NOT {thread_id_clause} + GROUP BY thread_id + """ + txn.execute( + sql, + ( + user_id, + room_id, + *receipts_args, + user_id, + room_id, + receipt_stream_ordering, + *thread_id_args, + ), ) - - for notif_count, unread_count, thread_id in unread_counts: + for notif_count, unread_count, thread_id in txn: counts = _get_thread(thread_id) counts.notify_count += notif_count counts.unread_count += unread_count @@ -522,6 +631,7 @@ def _get_notif_unread_count_for_user_room( user_id: str, stream_ordering: int, max_stream_ordering: Optional[int] = None, + thread_id: Optional[str] = None, ) -> List[Tuple[int, int, str]]: """Returns the notify and unread counts from `event_push_actions` for the given user/room in the given range. @@ -547,10 +657,10 @@ def _get_notif_unread_count_for_user_room( if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering): return [] - clause = "" + stream_ordering_clause = "" args = [user_id, room_id, stream_ordering] if max_stream_ordering is not None: - clause = "AND ea.stream_ordering <= ?" + stream_ordering_clause = "AND ea.stream_ordering <= ?" args.append(max_stream_ordering) # If the max stream ordering is less than the min stream ordering, @@ -558,6 +668,11 @@ def _get_notif_unread_count_for_user_room( if max_stream_ordering <= stream_ordering: return [] + thread_id_clause = "" + if thread_id is not None: + thread_id_clause = "AND thread_id = ?" + args.append(thread_id) + sql = f""" SELECT COUNT(CASE WHEN notif = 1 THEN 1 END), @@ -567,7 +682,8 @@ def _get_notif_unread_count_for_user_room( WHERE user_id = ? AND room_id = ? AND ea.stream_ordering > ? - {clause} + {stream_ordering_clause} + {thread_id_clause} GROUP BY thread_id """ @@ -1083,7 +1199,7 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: ) sql = """ - SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering + SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, e.stream_ordering FROM receipts_linearized AS r INNER JOIN events AS e USING (event_id) WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ? @@ -1106,13 +1222,18 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: ) rows = txn.fetchall() - # For each new read receipt we delete push actions from before it and - # recalculate the summary. - for _, room_id, user_id, stream_ordering in rows: + # First handle all the rows without a thread ID (i.e. ones that apply to + # the entire room). + for _, room_id, user_id, thread_id, stream_ordering in rows: # Only handle our own read receipts. if not self.hs.is_mine_id(user_id): continue + if thread_id is not None: + continue + + # For each new read receipt we delete push actions from before it and + # recalculate the summary. txn.execute( """ DELETE FROM event_push_actions @@ -1154,6 +1275,64 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: value_values=[(row[0], row[1]) for row in unread_counts], ) + # For each new read receipt we delete push actions from before it and + # recalculate the summary. + for _, room_id, user_id, thread_id, stream_ordering in rows: + # Only handle our own read receipts. + if not self.hs.is_mine_id(user_id): + continue + + if thread_id is None: + continue + + # For each new read receipt we delete push actions from before it and + # recalculate the summary. + txn.execute( + """ + DELETE FROM event_push_actions + WHERE room_id = ? + AND user_id = ? + AND thread_id = ? + AND stream_ordering <= ? + AND highlight = 0 + """, + (room_id, user_id, thread_id, stream_ordering), + ) + + # Fetch the notification counts between the stream ordering of the + # latest receipt and what was previously summarised. + unread_counts = self._get_notif_unread_count_for_user_room( + txn, + room_id, + user_id, + stream_ordering, + old_rotate_stream_ordering, + thread_id, + ) + # unread_counts will be a list of 0 or 1 items. + if unread_counts: + notif_count, unread_count, _ = unread_counts[0] + else: + notif_count = 0 + unread_count = 0 + + # Update the summary of this specific thread. + self.db_pool.simple_upsert_txn( + txn, + table="event_push_summary", + keyvalues={ + "room_id": room_id, + "user_id": user_id, + "thread_id": thread_id, + }, + values={ + "notif_count": notif_count, + "unread_count": unread_count, + "stream_ordering": old_rotate_stream_ordering, + "last_receipt_stream_ordering": stream_ordering, + }, + ) + # We always update `event_push_summary_last_receipt_stream_id` to # ensure that we don't rescan the same receipts for remote users. diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 52fe0db92405..c7e812224869 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -170,8 +170,8 @@ def get_last_receipt_for_user_txn( receipt_types: Collection[str], ) -> Optional[Tuple[str, int]]: """ - Fetch the event ID and stream_ordering for the latest receipt in a room - with one of the given receipt types. + Fetch the event ID and stream_ordering for the latest unthreaded receipt + in a room with one of the given receipt types. Args: user_id: The user to fetch receipts for. @@ -193,6 +193,7 @@ def get_last_receipt_for_user_txn( WHERE {clause} AND user_id = ? AND room_id = ? + AND thread_id IS NULL ORDER BY stream_ordering DESC LIMIT 1 """ diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index 89f986ac3427..97dc1cc60a0f 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -312,7 +312,7 @@ def _create_event( def _rotate() -> None: self.get_success(self.store._rotate_notifs()) - def _mark_read(event_id: str, thread_id: Optional[str] = None) -> None: + def _mark_read(event_id: str, thread_id: str = "main") -> None: self.get_success( self.store.insert_receipt( room_id, @@ -348,9 +348,12 @@ def _mark_read(event_id: str, thread_id: Optional[str] = None) -> None: _create_event() _create_event(thread_id=thread_id) _mark_read(event_id) + _assert_counts(1, 0, 3, 0) + _mark_read(event_id, thread_id) _assert_counts(1, 0, 1, 0) _mark_read(last_event_id) + _mark_read(last_event_id, thread_id) _assert_counts(0, 0, 0, 0) _create_event() @@ -364,6 +367,7 @@ def _mark_read(event_id: str, thread_id: Optional[str] = None) -> None: _assert_counts(1, 0, 1, 0) _mark_read(last_event_id) + _mark_read(last_event_id, thread_id) _assert_counts(0, 0, 0, 0) _create_event(True) @@ -389,8 +393,190 @@ def _mark_read(event_id: str, thread_id: Optional[str] = None) -> None: # Check that sending read receipts at different points results in the # right counts. _mark_read(event_id) + _assert_counts(1, 0, 2, 1) + _mark_read(event_id, thread_id) _assert_counts(1, 0, 1, 0) _mark_read(last_event_id) + _assert_counts(0, 0, 1, 0) + _mark_read(last_event_id, thread_id) + _assert_counts(0, 0, 0, 0) + + _create_event(True) + _create_event(True, thread_id) + _assert_counts(1, 1, 1, 1) + _mark_read(last_event_id) + _mark_read(last_event_id, thread_id) + _assert_counts(0, 0, 0, 0) + _rotate() + _assert_counts(0, 0, 0, 0) + + def test_count_aggregation_mixed(self) -> None: + """ + This is essentially the same test as test_count_aggregation_threads, but + sends both unthreaded and threaded receipts. + """ + + # Create a user to receive notifications and send receipts. + user_id = self.register_user("user1235", "pass") + token = self.login("user1235", "pass") + + # And another users to send events. + other_id = self.register_user("other", "pass") + other_token = self.login("other", "pass") + + # Create a room and put both users in it. + room_id = self.helper.create_room_as(user_id, tok=token) + self.helper.join(room_id, other_id, tok=other_token) + thread_id: str + + last_event_id: str + + def _assert_counts( + noitf_count: int, + highlight_count: int, + thread_notif_count: int, + thread_highlight_count: int, + ) -> None: + counts = self.get_success( + self.store.db_pool.runInteraction( + "get-unread-counts", + self.store._get_unread_counts_by_receipt_txn, + room_id, + user_id, + ) + ) + self.assertEqual( + counts.main_timeline, + NotifCounts( + notify_count=noitf_count, + unread_count=0, + highlight_count=highlight_count, + ), + ) + if thread_notif_count or thread_highlight_count: + self.assertEqual( + counts.threads, + { + thread_id: NotifCounts( + notify_count=thread_notif_count, + unread_count=0, + highlight_count=thread_highlight_count, + ), + }, + ) + else: + self.assertEqual(counts.threads, {}) + + def _create_event( + highlight: bool = False, thread_id: Optional[str] = None + ) -> str: + content: JsonDict = { + "msgtype": "m.text", + "body": user_id if highlight else "msg", + } + if thread_id: + content["m.relates_to"] = { + "rel_type": "m.thread", + "event_id": thread_id, + } + + result = self.helper.send_event( + room_id, + type="m.room.message", + content=content, + tok=other_token, + ) + nonlocal last_event_id + last_event_id = result["event_id"] + return last_event_id + + def _rotate() -> None: + self.get_success(self.store._rotate_notifs()) + + def _mark_read(event_id: str, thread_id: Optional[str] = None) -> None: + self.get_success( + self.store.insert_receipt( + room_id, + "m.read", + user_id=user_id, + event_ids=[event_id], + thread_id=thread_id, + data={}, + ) + ) + + _assert_counts(0, 0, 0, 0) + thread_id = _create_event() + _assert_counts(1, 0, 0, 0) + _rotate() + _assert_counts(1, 0, 0, 0) + + _create_event(thread_id=thread_id) + _assert_counts(1, 0, 1, 0) + _rotate() + _assert_counts(1, 0, 1, 0) + + _create_event() + _assert_counts(2, 0, 1, 0) + _rotate() + _assert_counts(2, 0, 1, 0) + + event_id = _create_event(thread_id=thread_id) + _assert_counts(2, 0, 2, 0) + _rotate() + _assert_counts(2, 0, 2, 0) + + _create_event() + _create_event(thread_id=thread_id) + _mark_read(event_id) + _assert_counts(1, 0, 1, 0) + + _mark_read(last_event_id, "main") + _mark_read(last_event_id, thread_id) + _assert_counts(0, 0, 0, 0) + + _create_event() + _create_event(thread_id=thread_id) + _assert_counts(1, 0, 1, 0) + _rotate() + _assert_counts(1, 0, 1, 0) + + # Delete old event push actions, this should not affect the (summarised) count. + self.get_success(self.store._remove_old_push_actions_that_have_rotated()) + _assert_counts(1, 0, 1, 0) + + _mark_read(last_event_id) + _assert_counts(0, 0, 0, 0) + + _create_event(True) + _assert_counts(1, 1, 0, 0) + _rotate() + _assert_counts(1, 1, 0, 0) + + event_id = _create_event(True, thread_id) + _assert_counts(1, 1, 1, 1) + _rotate() + _assert_counts(1, 1, 1, 1) + + # Check that adding another notification and rotating after highlight + # works. + _create_event() + _rotate() + _assert_counts(2, 1, 1, 1) + + _create_event(thread_id=thread_id) + _rotate() + _assert_counts(2, 1, 2, 1) + + # Check that sending read receipts at different points results in the + # right counts. + _mark_read(event_id) + _assert_counts(1, 0, 1, 0) + _mark_read(event_id, "main") + _assert_counts(1, 0, 1, 0) + _mark_read(last_event_id, "main") + _assert_counts(0, 0, 1, 0) + _mark_read(last_event_id, thread_id) _assert_counts(0, 0, 0, 0) _create_event(True) From fb502444988bbe81c2b3774e614b273b131f7e26 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 28 Sep 2022 08:36:19 -0400 Subject: [PATCH 16/27] Use MAIN_TIMELINE constant in more places. --- synapse/storage/databases/main/event_push_actions.py | 2 +- tests/storage/test_event_push_actions.py | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 493fe8a15253..23f0eef10c27 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -563,7 +563,7 @@ def _get_thread(thread_id: str) -> NotifCounts: if thread_id not in summarised_threads: continue - if thread_id == "main": + if thread_id == MAIN_TIMELINE: counts.notify_count += notif_count counts.unread_count += unread_count elif thread_id in thread_counts: diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index 97dc1cc60a0f..6fa0cafb756f 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -16,6 +16,7 @@ from twisted.test.proto_helpers import MemoryReactor +from synapse.api.constants import MAIN_TIMELINE from synapse.rest import admin from synapse.rest.client import login, room from synapse.server import HomeServer @@ -312,7 +313,7 @@ def _create_event( def _rotate() -> None: self.get_success(self.store._rotate_notifs()) - def _mark_read(event_id: str, thread_id: str = "main") -> None: + def _mark_read(event_id: str, thread_id: str = MAIN_TIMELINE) -> None: self.get_success( self.store.insert_receipt( room_id, @@ -531,7 +532,7 @@ def _mark_read(event_id: str, thread_id: Optional[str] = None) -> None: _mark_read(event_id) _assert_counts(1, 0, 1, 0) - _mark_read(last_event_id, "main") + _mark_read(last_event_id, MAIN_TIMELINE) _mark_read(last_event_id, thread_id) _assert_counts(0, 0, 0, 0) @@ -572,9 +573,9 @@ def _mark_read(event_id: str, thread_id: Optional[str] = None) -> None: # right counts. _mark_read(event_id) _assert_counts(1, 0, 1, 0) - _mark_read(event_id, "main") + _mark_read(event_id, MAIN_TIMELINE) _assert_counts(1, 0, 1, 0) - _mark_read(last_event_id, "main") + _mark_read(last_event_id, MAIN_TIMELINE) _assert_counts(0, 0, 1, 0) _mark_read(last_event_id, thread_id) _assert_counts(0, 0, 0, 0) From d6d7788f7f1ac650bd70c26dbe4a13c65fa1413c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 28 Sep 2022 11:31:35 -0400 Subject: [PATCH 17/27] Combine logic for processing receipts. --- .../databases/main/event_push_actions.py | 123 +++++++----------- 1 file changed, 46 insertions(+), 77 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 23f0eef10c27..0477a1e348c6 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -1222,81 +1222,33 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: ) rows = txn.fetchall() - # First handle all the rows without a thread ID (i.e. ones that apply to - # the entire room). - for _, room_id, user_id, thread_id, stream_ordering in rows: - # Only handle our own read receipts. - if not self.hs.is_mine_id(user_id): - continue - - if thread_id is not None: - continue - - # For each new read receipt we delete push actions from before it and - # recalculate the summary. - txn.execute( - """ - DELETE FROM event_push_actions - WHERE room_id = ? - AND user_id = ? - AND stream_ordering <= ? - AND highlight = 0 - """, - (room_id, user_id, stream_ordering), - ) - - # Fetch the notification counts between the stream ordering of the - # latest receipt and what was previously summarised. - unread_counts = self._get_notif_unread_count_for_user_room( - txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering - ) - - # First mark the summary for all threads in the room as cleared. - self.db_pool.simple_update_txn( - txn, - table="event_push_summary", - keyvalues={"user_id": user_id, "room_id": room_id}, - updatevalues={ - "notif_count": 0, - "unread_count": 0, - "stream_ordering": old_rotate_stream_ordering, - "last_receipt_stream_ordering": stream_ordering, - }, - ) - - # Then any updated threads get their notification count and unread - # count updated. - self.db_pool.simple_update_many_txn( - txn, - table="event_push_summary", - key_names=("room_id", "user_id", "thread_id"), - key_values=[(room_id, user_id, row[2]) for row in unread_counts], - value_names=("notif_count", "unread_count"), - value_values=[(row[0], row[1]) for row in unread_counts], - ) - # For each new read receipt we delete push actions from before it and # recalculate the summary. + # + # Care must be taken of whether it is a threaded or unthreaded receipt. for _, room_id, user_id, thread_id, stream_ordering in rows: # Only handle our own read receipts. if not self.hs.is_mine_id(user_id): continue - if thread_id is None: - continue + thread_clause = "" + thread_args: Tuple = () + if thread_id is not None: + thread_clause = "AND thread_id = ?" + thread_args = (thread_id,) # For each new read receipt we delete push actions from before it and # recalculate the summary. txn.execute( - """ + f""" DELETE FROM event_push_actions WHERE room_id = ? AND user_id = ? - AND thread_id = ? AND stream_ordering <= ? AND highlight = 0 + {thread_clause} """, - (room_id, user_id, thread_id, stream_ordering), + (room_id, user_id, stream_ordering, *thread_args), ) # Fetch the notification counts between the stream ordering of the @@ -1309,28 +1261,45 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: old_rotate_stream_ordering, thread_id, ) - # unread_counts will be a list of 0 or 1 items. - if unread_counts: - notif_count, unread_count, _ = unread_counts[0] - else: - notif_count = 0 - unread_count = 0 - # Update the summary of this specific thread. - self.db_pool.simple_upsert_txn( + # For an unthreaded receipt, mark the summary for all threads in the room + # as cleared. + if thread_id is None: + self.db_pool.simple_update_txn( + txn, + table="event_push_summary", + keyvalues={"user_id": user_id, "room_id": room_id}, + updatevalues={ + "notif_count": 0, + "unread_count": 0, + "stream_ordering": old_rotate_stream_ordering, + "last_receipt_stream_ordering": stream_ordering, + }, + ) + + # For a threaded receipt, we *always* want to update that receipt, + # event if there are no new notifications in that thread. This ensures + # the stream_ordering & last_receipt_stream_ordering are updated. + elif not unread_counts: + unread_counts = [(0, 0, thread_id)] + + # Then any updated threads get their notification count and unread + # count updated. + self.db_pool.simple_update_many_txn( txn, table="event_push_summary", - keyvalues={ - "room_id": room_id, - "user_id": user_id, - "thread_id": thread_id, - }, - values={ - "notif_count": notif_count, - "unread_count": unread_count, - "stream_ordering": old_rotate_stream_ordering, - "last_receipt_stream_ordering": stream_ordering, - }, + key_names=("room_id", "user_id", "thread_id"), + key_values=[(room_id, user_id, row[2]) for row in unread_counts], + value_names=( + "notif_count", + "unread_count", + "stream_ordering", + "last_receipt_stream_ordering", + ), + value_values=[ + (row[0], row[1], old_rotate_stream_ordering, stream_ordering) + for row in unread_counts + ], ) # We always update `event_push_summary_last_receipt_stream_id` to From 162bd8dcdf23c7a2b409396acba330fb3b550698 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 28 Sep 2022 12:00:40 -0400 Subject: [PATCH 18/27] Expand comment and rename variables for clarity. --- .../databases/main/event_push_actions.py | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 0477a1e348c6..e6e99f786964 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -418,7 +418,7 @@ def _get_unread_counts_by_pos_txn( txn: LoggingTransaction, room_id: str, user_id: str, - receipt_stream_ordering: int, + unthreaded_receipt_stream_ordering: int, ) -> RoomNotifCounts: """Get the number of unread messages for a user/room that have happened since the given stream ordering. @@ -427,7 +427,7 @@ def _get_unread_counts_by_pos_txn( txn: The database transaction. room_id: The room ID to get unread counts for. user_id: The user ID to get unread counts for. - receipt_stream_ordering: The stream ordering of the user's latest + unthreaded_receipt_stream_ordering: The stream ordering of the user's latest unthreaded receipt in the room. If there are no unthreaded receipts, the stream ordering of the user's join event. @@ -451,13 +451,17 @@ def _get_thread(thread_id: str) -> NotifCounts: (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), ) + # A clause to get the latest receipt stream ordering taking into account + # both unthreaded and threaded receipts. This takes a single parameter: + # receipt_stream_ordering. + # # PostgreSQL and SQLite differ in comparing scalar numerics. if isinstance(self.database_engine, PostgresEngine): # GREATEST ignores NULLs. - receipt_stream_clause = "GREATEST(receipt_stream_ordering, ?)" + receipt_stream_clause = "GREATEST(threaded_receipt_stream_ordering, ?)" else: # MAX returns NULL if any are NULL, so COALESCE to 0 first. - receipt_stream_clause = "MAX(COALESCE(receipt_stream_ordering, 0), ?)" + receipt_stream_clause = "MAX(COALESCE(threaded_receipt_stream_ordering, 0), ?)" # First we pull the counts from the summary table. # @@ -477,7 +481,7 @@ def _get_thread(thread_id: str) -> NotifCounts: SELECT notif_count, COALESCE(unread_count, 0), thread_id FROM event_push_summary LEFT JOIN ( - SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering + SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering FROM receipts_linearized LEFT JOIN events USING (room_id, event_id) WHERE @@ -498,8 +502,8 @@ def _get_thread(thread_id: str) -> NotifCounts: *receipts_args, room_id, user_id, - receipt_stream_ordering, - receipt_stream_ordering, + unthreaded_receipt_stream_ordering, + unthreaded_receipt_stream_ordering, ), ) summarised_threads = set() @@ -513,7 +517,7 @@ def _get_thread(thread_id: str) -> NotifCounts: sql = f""" SELECT COUNT(*), thread_id FROM event_push_actions LEFT JOIN ( - SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering + SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering FROM receipts_linearized LEFT JOIN events USING (room_id, event_id) WHERE @@ -536,7 +540,7 @@ def _get_thread(thread_id: str) -> NotifCounts: *receipts_args, user_id, room_id, - receipt_stream_ordering, + unthreaded_receipt_stream_ordering, ), ) for highlight_count, thread_id in txn: @@ -590,7 +594,7 @@ def _get_thread(thread_id: str) -> NotifCounts: thread_id FROM event_push_actions LEFT JOIN ( - SELECT thread_id, MAX(stream_ordering) AS receipt_stream_ordering + SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering FROM receipts_linearized LEFT JOIN events USING (room_id, event_id) WHERE @@ -613,7 +617,7 @@ def _get_thread(thread_id: str) -> NotifCounts: *receipts_args, user_id, room_id, - receipt_stream_ordering, + unthreaded_receipt_stream_ordering, *thread_id_args, ), ) From e7b5421e47c174b139da2eef3e032d7a40f93cb2 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 28 Sep 2022 12:16:20 -0400 Subject: [PATCH 19/27] Clarify comment. --- .../storage/databases/main/event_push_actions.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index e6e99f786964..b5b1b5fc90dd 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -465,12 +465,16 @@ def _get_thread(thread_id: str) -> NotifCounts: # First we pull the counts from the summary table. # - # We check that `last_receipt_stream_ordering` matches the stream - # ordering given. If it doesn't match then a new read receipt has arrived and - # we haven't yet updated the counts in `event_push_summary` to reflect - # that; in that case we simply ignore `event_push_summary` counts - # and do a manual count of all of the rows in the `event_push_actions` table - # for this user/room. + # We check that `last_receipt_stream_ordering` matches the stream ordering of the + # latest receipt for the thread (which may be either the unthreaded read receipt + # or the threaded read receipt). + # + # If it doesn't match then a new read receipt has arrived and we haven't yet + # updated the counts in `event_push_summary` to reflect that; in that case we + # simply ignore `event_push_summary` counts. + # + # We then do a manual count of all of the rows in the `event_push_actions` table + # for any user/room/thread which did not have a valid summary found. # # If `last_receipt_stream_ordering` is null then that means it's up to # date (as the row was written by an older version of Synapse that From 5f5e9adcc0eb0bf0fe4ff3f815ccd814af59da2b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 28 Sep 2022 12:22:49 -0400 Subject: [PATCH 20/27] Improve docstrings. --- synapse/storage/databases/main/event_push_actions.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index b5b1b5fc90dd..71df1943c95b 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -654,6 +654,11 @@ def _get_notif_unread_count_for_user_room( stream_ordering: The (exclusive) minimum stream ordering to consider. max_stream_ordering: The (inclusive) maximum stream ordering to consider. If this is not given, then no maximum is applied. + thread_id: The thread ID to fetch unread counts for. If this is not provided + then the results for *all* threads is returned. + + Note that if this is provided the resulting list will only have 0 or + 1 tuples in it. Return: A tuple of the notif count and unread count in the given range for @@ -676,6 +681,7 @@ def _get_notif_unread_count_for_user_room( if max_stream_ordering <= stream_ordering: return [] + # Either limit the results to a specific thread or fetch all threads. thread_id_clause = "" if thread_id is not None: thread_id_clause = "AND thread_id = ?" From 16a60b96854dddf47a37ed06ed1ba643a755922e Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 28 Sep 2022 12:24:27 -0400 Subject: [PATCH 21/27] Rename function. --- synapse/storage/databases/main/event_push_actions.py | 2 +- synapse/storage/databases/main/receipts.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 71df1943c95b..ad0cbc1c5499 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -383,7 +383,7 @@ def _get_unread_counts_by_receipt_txn( user_id: str, ) -> RoomNotifCounts: # Get the stream ordering of the user's latest receipt in the room. - result = self.get_last_receipt_for_user_txn( + result = self.get_last_unthreaded_receipt_for_user_txn( txn, user_id, room_id, diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index c7e812224869..86192bd8f3d7 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -151,7 +151,7 @@ async def get_last_receipt_event_id_for_user( """ result = await self.db_pool.runInteraction( "get_last_receipt_event_id_for_user", - self.get_last_receipt_for_user_txn, + self.get_last_unthreaded_receipt_for_user_txn, user_id, room_id, receipt_types, @@ -162,7 +162,7 @@ async def get_last_receipt_event_id_for_user( event_id, _ = result return event_id - def get_last_receipt_for_user_txn( + def get_last_unthreaded_receipt_for_user_txn( self, txn: LoggingTransaction, user_id: str, From f6a99c87e005d44c7ac4e5b44ed77b3cb896e776 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 28 Sep 2022 15:30:28 -0400 Subject: [PATCH 22/27] Lint --- synapse/storage/databases/main/event_push_actions.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index ad0cbc1c5499..4edb32ef3be8 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -461,7 +461,9 @@ def _get_thread(thread_id: str) -> NotifCounts: receipt_stream_clause = "GREATEST(threaded_receipt_stream_ordering, ?)" else: # MAX returns NULL if any are NULL, so COALESCE to 0 first. - receipt_stream_clause = "MAX(COALESCE(threaded_receipt_stream_ordering, 0), ?)" + receipt_stream_clause = ( + "MAX(COALESCE(threaded_receipt_stream_ordering, 0), ?)" + ) # First we pull the counts from the summary table. # From 6b2384d215864db4d9ec24f1e3d8db5a9a7339d2 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 29 Sep 2022 10:53:17 -0400 Subject: [PATCH 23/27] Fix typo. --- synapse/storage/databases/main/event_push_actions.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 32e8b8c47fe0..0300e57427dd 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -472,11 +472,11 @@ def _get_thread(thread_id: str) -> NotifCounts: # updated the counts in `event_push_summary` to reflect that; in that case we # simply ignore `event_push_summary` counts. # - # We then do a manual count of all of the rows in the `event_push_actions` table + # We then do a manual count of all the rows in the `event_push_actions` table # for any user/room/thread which did not have a valid summary found. # - # If `last_receipt_stream_ordering` is null then that means it's up to - # date (as the row was written by an older version of Synapse that + # If `last_receipt_stream_ordering` is null then that means it's up-to-date + # (as the row was written by an older version of Synapse that # updated `event_push_summary` synchronously when persisting a new read # receipt). txn.execute( From 041fe7f39bca11bcb250e5205ec57822d96bef40 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 29 Sep 2022 11:28:21 -0400 Subject: [PATCH 24/27] Only attempt to find threaded receipts newer than the latest unthreaded receipt. --- .../databases/main/event_push_actions.py | 29 +++++++------------ 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 0300e57427dd..69a5cf73afb1 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -95,7 +95,6 @@ DatabasePool, LoggingDatabaseConnection, LoggingTransaction, - PostgresEngine, ) from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore @@ -448,20 +447,6 @@ def _get_thread(thread_id: str) -> NotifCounts: (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), ) - # A clause to get the latest receipt stream ordering taking into account - # both unthreaded and threaded receipts. This takes a single parameter: - # receipt_stream_ordering. - # - # PostgreSQL and SQLite differ in comparing scalar numerics. - if isinstance(self.database_engine, PostgresEngine): - # GREATEST ignores NULLs. - receipt_stream_clause = "GREATEST(threaded_receipt_stream_ordering, ?)" - else: - # MAX returns NULL if any are NULL, so COALESCE to 0 first. - receipt_stream_clause = ( - "MAX(COALESCE(threaded_receipt_stream_ordering, 0), ?)" - ) - # First we pull the counts from the summary table. # # We check that `last_receipt_stream_ordering` matches the stream ordering of the @@ -490,18 +475,20 @@ def _get_thread(thread_id: str) -> NotifCounts: WHERE user_id = ? AND room_id = ? + AND stream_ordering > ? AND {receipt_types_clause} GROUP BY thread_id ) AS receipts USING (thread_id) WHERE room_id = ? AND user_id = ? AND ( - (last_receipt_stream_ordering IS NULL AND stream_ordering > {receipt_stream_clause}) - OR last_receipt_stream_ordering = {receipt_stream_clause} + (last_receipt_stream_ordering IS NULL AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?)) + OR last_receipt_stream_ordering = COALESCE(threaded_receipt_stream_ordering, ?) ) AND (notif_count != 0 OR COALESCE(unread_count, 0) != 0) """, ( user_id, room_id, + unthreaded_receipt_stream_ordering, *receipts_args, room_id, user_id, @@ -526,12 +513,13 @@ def _get_thread(thread_id: str) -> NotifCounts: WHERE user_id = ? AND room_id = ? + AND stream_ordering > ? AND {receipt_types_clause} GROUP BY thread_id ) AS receipts USING (thread_id) WHERE user_id = ? AND room_id = ? - AND stream_ordering > {receipt_stream_clause} + AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?) AND highlight = 1 GROUP BY thread_id """ @@ -540,6 +528,7 @@ def _get_thread(thread_id: str) -> NotifCounts: ( user_id, room_id, + unthreaded_receipt_stream_ordering, *receipts_args, user_id, room_id, @@ -603,12 +592,13 @@ def _get_thread(thread_id: str) -> NotifCounts: WHERE user_id = ? AND room_id = ? + AND stream_ordering > ? AND {receipt_types_clause} GROUP BY thread_id ) AS receipts USING (thread_id) WHERE user_id = ? AND room_id = ? - AND stream_ordering > {receipt_stream_clause} + AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?) AND NOT {thread_id_clause} GROUP BY thread_id """ @@ -617,6 +607,7 @@ def _get_thread(thread_id: str) -> NotifCounts: ( user_id, room_id, + unthreaded_receipt_stream_ordering, *receipts_args, user_id, room_id, From 0b1b432c8840824f21097be356b93ebf3787b4db Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 4 Oct 2022 08:27:00 -0400 Subject: [PATCH 25/27] Update for changes in develop. --- ...ns_backfill.sql => 06thread_notifications_backfill.sql} | 0 ...stgres => 07thread_notifications_not_null.sql.postgres} | 0 ...l.sqlite => 07thread_notifications_not_null.sql.sqlite} | 7 ++++--- 3 files changed, 4 insertions(+), 3 deletions(-) rename synapse/storage/schema/main/delta/73/{01thread_notifications_backfill.sql => 06thread_notifications_backfill.sql} (100%) rename synapse/storage/schema/main/delta/73/{02thread_notifications_not_null.sql.postgres => 07thread_notifications_not_null.sql.postgres} (100%) rename synapse/storage/schema/main/delta/73/{02thread_notifications_not_null.sql.sqlite => 07thread_notifications_not_null.sql.sqlite} (96%) diff --git a/synapse/storage/schema/main/delta/73/01thread_notifications_backfill.sql b/synapse/storage/schema/main/delta/73/06thread_notifications_backfill.sql similarity index 100% rename from synapse/storage/schema/main/delta/73/01thread_notifications_backfill.sql rename to synapse/storage/schema/main/delta/73/06thread_notifications_backfill.sql diff --git a/synapse/storage/schema/main/delta/73/02thread_notifications_not_null.sql.postgres b/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.postgres similarity index 100% rename from synapse/storage/schema/main/delta/73/02thread_notifications_not_null.sql.postgres rename to synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.postgres diff --git a/synapse/storage/schema/main/delta/73/02thread_notifications_not_null.sql.sqlite b/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.sqlite similarity index 96% rename from synapse/storage/schema/main/delta/73/02thread_notifications_not_null.sql.sqlite rename to synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.sqlite index 53e70ee153b1..7cc97cb9087f 100644 --- a/synapse/storage/schema/main/delta/73/02thread_notifications_not_null.sql.sqlite +++ b/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.sqlite @@ -24,7 +24,8 @@ CREATE TABLE event_push_actions_staging_new ( notif SMALLINT NOT NULL, highlight SMALLINT NOT NULL, unread SMALLINT, - thread_id TEXT NOT NULL + thread_id TEXT NOT NULL, + inserted_ts BIGINT ); CREATE TABLE event_push_actions_new ( @@ -68,8 +69,8 @@ CREATE INDEX event_push_actions_u_highlight ON event_push_actions_new (user_id, CREATE INDEX event_push_actions_highlights_index ON event_push_actions_new (user_id, room_id, topological_ordering, stream_ordering); -- Copy the data. -INSERT INTO event_push_actions_staging_new (event_id, user_id, actions, notif, highlight, unread, thread_id) - SELECT event_id, user_id, actions, notif, highlight, unread, thread_id +INSERT INTO event_push_actions_staging_new (event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts) + SELECT event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts FROM event_push_actions_staging; INSERT INTO event_push_actions_new (room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id) From 20ace0f11abc9da6c04add4ed903ca775fecbd79 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 4 Oct 2022 08:54:58 -0400 Subject: [PATCH 26/27] Update delta numbers. --- ...l.sql.postgres => 08thread_receipts_non_null.sql.postgres} | 0 ..._null.sql.sqlite => 08thread_receipts_non_null.sql.sqlite} | 4 ++-- 2 files changed, 2 insertions(+), 2 deletions(-) rename synapse/storage/schema/main/delta/73/{03thread_receipts_non_null.sql.postgres => 08thread_receipts_non_null.sql.postgres} (100%) rename synapse/storage/schema/main/delta/73/{03thread_receipts_non_null.sql.sqlite => 08thread_receipts_non_null.sql.sqlite} (96%) diff --git a/synapse/storage/schema/main/delta/73/03thread_receipts_non_null.sql.postgres b/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.postgres similarity index 100% rename from synapse/storage/schema/main/delta/73/03thread_receipts_non_null.sql.postgres rename to synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.postgres diff --git a/synapse/storage/schema/main/delta/73/03thread_receipts_non_null.sql.sqlite b/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.sqlite similarity index 96% rename from synapse/storage/schema/main/delta/73/03thread_receipts_non_null.sql.sqlite rename to synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.sqlite index 328a694dc0a0..e664889fbce8 100644 --- a/synapse/storage/schema/main/delta/73/03thread_receipts_non_null.sql.sqlite +++ b/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.sqlite @@ -69,8 +69,8 @@ CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id ); -- Re-run background updates from 72/08thread_receipts.sql. INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (7303, 'receipts_linearized_unique_index', '{}') + (7308, 'receipts_linearized_unique_index', '{}') ON CONFLICT (update_name) DO NOTHING; INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (7303, 'receipts_graph_unique_index', '{}') + (7308, 'receipts_graph_unique_index', '{}') ON CONFLICT (update_name) DO NOTHING; From bd6c80c07da4a9e0ac269ff30c67c22e377762da Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 4 Oct 2022 08:55:25 -0400 Subject: [PATCH 27/27] Update background index numbers. --- .../main/delta/73/07thread_notifications_not_null.sql.sqlite | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.sqlite b/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.sqlite index 7cc97cb9087f..5322ad77a4d7 100644 --- a/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.sqlite +++ b/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.sqlite @@ -94,8 +94,8 @@ ALTER TABLE event_push_summary_new RENAME TO event_push_summary; -- Re-run background updates from 72/02event_push_actions_index.sql and -- 72/06thread_notifications.sql. INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (7302, 'event_push_summary_unique_index2', '{}') + (7307, 'event_push_summary_unique_index2', '{}') ON CONFLICT (update_name) DO NOTHING; INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (7302, 'event_push_actions_stream_highlight_index', '{}') + (7307, 'event_push_actions_stream_highlight_index', '{}') ON CONFLICT (update_name) DO NOTHING;