Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sliding Sync: Add receipts extension (MSC3960) #17489

Merged
merged 18 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17489.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add receipts extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
4 changes: 3 additions & 1 deletion synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,10 @@ async def get_new_events(
room_ids: Iterable[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
to_key: Optional[MultiWriterStreamToken] = None,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it okay to add the to_key arg to the get_new_events(...) signature even though there is a EventSource.get_new_events(...) abstract method?

I noticed the presence variation seems to do it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine to add extra optional arguments to overrides (as its still compatible wit the base class)

) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
to_key = self.get_current_key()
if to_key is None:
to_key = self.get_current_key()

if from_key == to_key:
return [], to_key
Expand Down
251 changes: 198 additions & 53 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
DeviceListUpdates,
JsonDict,
JsonMapping,
MultiWriterStreamToken,
PersistedEventPosition,
Requester,
RoomStreamToken,
Expand Down Expand Up @@ -640,7 +641,12 @@ async def handle_room(room_id: str) -> None:

extensions = await self.get_extensions_response(
sync_config=sync_config,
lists=lists,
actual_lists=lists,
# TODO: Once https://github.com/element-hq/synapse/pull/17479 merges, this
# will need to be updated to make sure it includes everything before the
# pre-filter on `relevant_room_map`.
actual_room_ids=set(rooms.keys()),
actual_room_response_map=rooms,
from_token=from_token,
to_token=to_token,
)
Expand Down Expand Up @@ -1846,15 +1852,21 @@ async def get_room_sync_data(
async def get_extensions_response(
self,
sync_config: SlidingSyncConfig,
lists: Dict[str, SlidingSyncResult.SlidingWindowList],
actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
actual_room_ids: Set[str],
actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
) -> SlidingSyncResult.Extensions:
"""Handle extension requests.

Args:
sync_config: Sync configuration
lists: Sliding window API. A map of list key to list results.
actual_lists: Sliding window API. A map of list key to list results in the
Sliding Sync response.
actual_room_ids: The actual room IDs in the the Sliding Sync response.
actual_room_response_map: A map of room ID to room results in the the
Sliding Sync response.
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
"""
Expand Down Expand Up @@ -1883,18 +1895,103 @@ async def get_extensions_response(
if sync_config.extensions.account_data is not None:
account_data_response = await self.get_account_data_extension_response(
sync_config=sync_config,
lists=lists,
actual_lists=actual_lists,
actual_room_ids=actual_room_ids,
account_data_request=sync_config.extensions.account_data,
to_token=to_token,
from_token=from_token,
)

receipts_response = None
if sync_config.extensions.receipts is not None:
receipts_response = await self.get_receipts_extension_response(
sync_config=sync_config,
actual_lists=actual_lists,
actual_room_ids=actual_room_ids,
actual_room_response_map=actual_room_response_map,
receipts_request=sync_config.extensions.receipts,
to_token=to_token,
from_token=from_token,
)

return SlidingSyncResult.Extensions(
to_device=to_device_response,
e2ee=e2ee_response,
account_data=account_data_response,
receipts=receipts_response,
)

def find_relevant_room_ids_for_extension(
self,
requested_lists: Optional[List[str]],
requested_room_ids: Optional[List[str]],
actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
actual_room_ids: Set[str],
) -> Set[str]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This functionality is tested by test_extensions_lists_rooms_relevant_rooms

"""
Handle the reserved `lists`/`rooms` keys for extensions. Extensions should only
return results for rooms in the Sliding Sync response. This matches up the
requested rooms/lists with the actual lists/rooms in the Sliding Sync response.

{"lists": []} // Do not process any lists.
{"lists": ["rooms", "dms"]} // Process only a subset of lists.
{"lists": ["*"]} // Process all lists defined in the Sliding Window API. (This is the default.)

{"rooms": []} // Do not process any specific rooms.
{"rooms": ["!a:b", "!c:d"]} // Process only a subset of room subscriptions.
{"rooms": ["*"]} // Process all room subscriptions defined in the Room Subscription API. (This is the default.)

Args:
requested_lists: The `lists` from the extension request.
requested_room_ids: The `rooms` from the extension request.
actual_lists: The actual lists from the Sliding Sync response.
actual_room_ids: The actual room subscriptions from the Sliding Sync request.
"""

# We only want to include account data for rooms that are already in the sliding
# sync response AND that were requested in the account data request.
relevant_room_ids: Set[str] = set()

# See what rooms from the room subscriptions we should get account data for
if requested_room_ids is not None:
for room_id in requested_room_ids:
# A wildcard means we process all rooms from the room subscriptions
if room_id == "*":
relevant_room_ids.update(actual_room_ids)
break

if room_id in actual_room_ids:
relevant_room_ids.add(room_id)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

# See what rooms from the sliding window lists we should get account data for
if requested_lists is not None:
for list_key in requested_lists:
# Just some typing because we share the variable name in multiple places
actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None

# A wildcard means we process rooms from all lists
if list_key == "*":
for actual_list in actual_lists.values():
# We only expect a single SYNC operation for any list
assert len(actual_list.ops) == 1
sync_op = actual_list.ops[0]
assert sync_op.op == OperationType.SYNC
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should find a better way of tracking the rooms than via the sync ops at some point, given we want to remove them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we remove the ops, we will just get a straight list here instead of needing to dive a few levels into the ops. Doesn't seem like a problem


relevant_room_ids.update(sync_op.room_ids)

break

actual_list = actual_lists.get(list_key)
if actual_list is not None:
# We only expect a single SYNC operation for any list
assert len(actual_list.ops) == 1
sync_op = actual_list.ops[0]
assert sync_op.op == OperationType.SYNC

relevant_room_ids.update(sync_op.room_ids)

return relevant_room_ids

async def get_to_device_extension_response(
self,
sync_config: SlidingSyncConfig,
Expand Down Expand Up @@ -2022,7 +2119,8 @@ async def get_e2ee_extension_response(
async def get_account_data_extension_response(
self,
sync_config: SlidingSyncConfig,
lists: Dict[str, SlidingSyncResult.SlidingWindowList],
actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
actual_room_ids: Set[str],
account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
Expand All @@ -2031,7 +2129,9 @@ async def get_account_data_extension_response(

Args:
sync_config: Sync configuration
lists: Sliding window API. A map of list key to list results.
actual_lists: Sliding window API. A map of list key to list results in the
Sliding Sync response.
actual_room_ids: The actual room IDs in the the Sliding Sync response.
account_data_request: The account_data extension from the request
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
Expand All @@ -2044,6 +2144,7 @@ async def get_account_data_extension_response(

global_account_data_map: Mapping[str, JsonMapping] = {}
if from_token is not None:
# TODO: This should take into account the `from_token` and `to_token`
global_account_data_map = (
await self.store.get_updated_global_account_data_for_user(
user_id, from_token.stream_token.account_data_key
Expand All @@ -2055,76 +2156,40 @@ async def get_account_data_extension_response(
)
if have_push_rules_changed:
global_account_data_map = dict(global_account_data_map)
# TODO: This should take into account the `from_token` and `to_token`
global_account_data_map[AccountDataTypes.PUSH_RULES] = (
await self.push_rules_handler.push_rules_for_user(sync_config.user)
)
else:
# TODO: This should take into account the `to_token`
all_global_account_data = await self.store.get_global_account_data_for_user(
user_id
)

global_account_data_map = dict(all_global_account_data)
# TODO: This should take into account the `to_token`
global_account_data_map[AccountDataTypes.PUSH_RULES] = (
await self.push_rules_handler.push_rules_for_user(sync_config.user)
)

# We only want to include account data for rooms that are already in the sliding
# sync response AND that were requested in the account data request.
relevant_room_ids: Set[str] = set()

# See what rooms from the room subscriptions we should get account data for
if (
account_data_request.rooms is not None
and sync_config.room_subscriptions is not None
):
actual_room_ids = sync_config.room_subscriptions.keys()

for room_id in account_data_request.rooms:
# A wildcard means we process all rooms from the room subscriptions
if room_id == "*":
relevant_room_ids.update(sync_config.room_subscriptions.keys())
break

if room_id in actual_room_ids:
relevant_room_ids.add(room_id)

# See what rooms from the sliding window lists we should get account data for
if account_data_request.lists is not None:
for list_key in account_data_request.lists:
# Just some typing because we share the variable name in multiple places
actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None

# A wildcard means we process rooms from all lists
if list_key == "*":
for actual_list in lists.values():
# We only expect a single SYNC operation for any list
assert len(actual_list.ops) == 1
sync_op = actual_list.ops[0]
assert sync_op.op == OperationType.SYNC

relevant_room_ids.update(sync_op.room_ids)

break

actual_list = lists.get(list_key)
if actual_list is not None:
# We only expect a single SYNC operation for any list
assert len(actual_list.ops) == 1
sync_op = actual_list.ops[0]
assert sync_op.op == OperationType.SYNC

relevant_room_ids.update(sync_op.room_ids)

# Fetch room account data
account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}
relevant_room_ids = self.find_relevant_room_ids_for_extension(
requested_lists=account_data_request.lists,
requested_room_ids=account_data_request.rooms,
actual_lists=actual_lists,
actual_room_ids=actual_room_ids,
)
if len(relevant_room_ids) > 0:
if from_token is not None:
# TODO: This should take into account the `from_token` and `to_token`
account_data_by_room_map = (
await self.store.get_updated_room_account_data_for_user(
user_id, from_token.stream_token.account_data_key
)
)
else:
# TODO: This should take into account the `to_token`
account_data_by_room_map = (
await self.store.get_room_account_data_for_user(user_id)
)
Expand All @@ -2141,6 +2206,86 @@ async def get_account_data_extension_response(
account_data_by_room_map=account_data_by_room_map,
)

async def get_receipts_extension_response(
self,
sync_config: SlidingSyncConfig,
actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
actual_room_ids: Set[str],
actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension,
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]:
"""Handle Receipts extension (MSC3960)

Args:
sync_config: Sync configuration
actual_lists: Sliding window API. A map of list key to list results in the
Sliding Sync response.
actual_room_ids: The actual room IDs in the the Sliding Sync response.
actual_room_response_map: A map of room ID to room results in the the
Sliding Sync response.
account_data_request: The account_data extension from the request
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
"""
# Skip if the extension is not enabled
if not receipts_request.enabled:
return None

relevant_room_ids = self.find_relevant_room_ids_for_extension(
requested_lists=receipts_request.lists,
requested_room_ids=receipts_request.rooms,
actual_lists=actual_lists,
actual_room_ids=actual_room_ids,
)

room_id_to_receipt_map: Dict[str, JsonMapping] = {}
if len(relevant_room_ids) > 0:
receipt_source = self.event_sources.sources.receipt
receipts, _ = await receipt_source.get_new_events(
user=sync_config.user,
from_key=(
from_token.stream_token.receipt_key
if from_token
else MultiWriterStreamToken(stream=0)
),
to_key=to_token.receipt_key,
# This is a dummy value and isn't used in the function
limit=0,
room_ids=relevant_room_ids,
is_guest=False,
)

for receipt in receipts:
# These fields should exist for every receipt
room_id = receipt["room_id"]
type = receipt["type"]
content = receipt["content"]

room_result = actual_room_response_map.get(room_id)
if room_result is not None:
if room_result.initial:
# TODO: In the future, it would be good to fetch less receipts
# out of the database in the first place but we would need to
# add a new `event_id` index to `receipts_linearized`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW it won't necessarily need an index, doing filtering in the DB still reduces the amount of work that needs to be done.

relevant_event_ids = [
event.event_id for event in room_result.timeline_events
]

assert isinstance(content, dict)
content = {
event_id: content_value
for event_id, content_value in content.items()
if event_id in relevant_event_ids
}

room_id_to_receipt_map[room_id] = {"type": type, "content": content}
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

return SlidingSyncResult.Extensions.ReceiptsExtension(
room_id_to_receipt_map=room_id_to_receipt_map,
)


class HaveSentRoomFlag(Enum):
"""Flag for whether we have sent the room down a sliding sync connection.
Expand Down
6 changes: 6 additions & 0 deletions synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,12 @@ async def encode_extensions(
},
}

if extensions.receipts is not None:
serialized_extensions["receipts"] = {
# Same as the the top-level `account_data.events` field in Sync v2.
"rooms": extensions.receipts.room_id_to_receipt_map,
}

return serialized_extensions


Expand Down
Loading
Loading