Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Optimise _get_state_after_missing_prev_event: use /state #12040

Merged
merged 3 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12040.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimise fetching large quantities of missing room state over federation.
43 changes: 39 additions & 4 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -897,10 +897,24 @@ async def _get_state_after_missing_prev_event(
logger.debug("We are also missing %i auth events", len(missing_auth_events))

missing_events = missing_desired_events | missing_auth_events
logger.debug("Fetching %i events from remote", len(missing_events))
await self._get_events_and_persist(
destination=destination, room_id=room_id, event_ids=missing_events
)

# Making an individual request for each of 1000s of events has a lot of
# overhead. On the other hand, we don't really want to fetch all of the events
# if we already have most of them.
#
# As an arbitrary heuristic, if we are missing 10% or more of the events, then
# we fetch the whole state.
#
# TODO: might it be better to have an API which lets us do an aggregate event
# request
if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids):
logger.debug("Requesting complete state from remote")
await self._get_state_and_persist(destination, room_id, event_id)
else:
logger.debug("Fetching %i events from remote", len(missing_events))
await self._get_events_and_persist(
destination=destination, room_id=room_id, event_ids=missing_events
)

# we need to make sure we re-load from the database to get the rejected
# state correct.
Expand Down Expand Up @@ -959,6 +973,27 @@ async def _get_state_after_missing_prev_event(

return remote_state

async def _get_state_and_persist(
    self, destination: str, room_id: str, event_id: str
) -> None:
    """Pull the full room state at `event_id` from a remote server and store it.

    Everything returned by the remote (auth events and state events alike) is
    handed to `_auth_and_persist_outliers`. If the event itself has not been
    seen yet, it is then fetched individually.

    Args:
        destination: the server to request the state from.
        room_id: the room the event belongs to.
        event_id: the event at which the state is requested.
    """
    version = await self._store.get_room_version(room_id)
    fetched_auth, fetched_state = await self._federation_client.get_room_state(
        destination, room_id, event_id=event_id, room_version=version
    )
    logger.info("/state returned %i events", len(fetched_auth) + len(fetched_state))

    await self._auth_and_persist_outliers(
        room_id, itertools.chain(fetched_auth, fetched_state)
    )

    # The state response does not necessarily include the event itself, so
    # fetch it separately unless we already have it.
    already_seen = await self._store.have_seen_event(room_id, event_id)
    if already_seen:
        return

    await self._get_events_and_persist(
        destination=destination, room_id=room_id, event_ids=(event_id,)
    )

async def _process_received_pdu(
self,
origin: str,
Expand Down
8 changes: 3 additions & 5 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
Dict,
Iterable,
List,
NoReturn,
Optional,
Set,
Tuple,
Expand Down Expand Up @@ -1330,10 +1329,9 @@ def have_seen_events_txn(
return results

@cached(max_entries=100000, tree=True)
async def have_seen_event(self, room_id: str, event_id: str) -> bool:
    """Look up whether a single event has been seen.

    This is a thin wrapper around `_have_seen_events_dict`: it issues a
    one-element batched lookup and returns the entry for this event.

    Args:
        room_id: the room the event belongs to.
        event_id: the event to look up.

    Returns:
        The value stored under `(room_id, event_id)` in the batched result.
    """
    # NOTE(review): the @cachedList descriptor on _have_seen_events_dict is
    # understood to rely on this method's cache - confirm before changing the
    # decorator arguments.
    key = (room_id, event_id)
    res = await self._have_seen_events_dict((key,))
    return res[key]

def _get_current_state_event_counts_txn(
self, txn: LoggingTransaction, room_id: str
Expand Down
225 changes: 225 additions & 0 deletions tests/handlers/test_federation_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock

from synapse.events import make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.federation.transport.client import StateRequestResponse
from synapse.logging.context import LoggingContext
from synapse.rest import admin
from synapse.rest.client import login, room

from tests import unittest
from tests.test_utils import event_injection, make_awaitable


class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
    """Exercises the federation event handler with pulled events whose
    prev_event has a lot of state that we do not have locally."""

    servlets = [
        admin.register_servlets,
        login.register_servlets,
        room.register_servlets,
    ]

    def make_homeserver(self, reactor, clock):
        # Swap the federation transport client for a mock that only exposes
        # the three calls listed in `spec`.
        transport_client = mock.Mock(
            spec=["get_room_state_ids", "get_room_state", "get_event"]
        )
        self.mock_federation_transport_client = transport_client
        return super().setup_test_homeserver(
            federation_transport_client=transport_client
        )

    def _make_signed_event(self, pdu_fields, room_version):
        """Hash and sign `pdu_fields`, then wrap the result as an event."""
        signed = self.add_hashes_and_signatures(pdu_fields)
        return make_event_from_dict(signed, room_version)

    def test_process_pulled_event_with_missing_state(self) -> None:
        """A pulled event with lots of missing state is handled correctly.

        We pretend to process a "pulled" event (eg, one received via backfill or
        get_missing_events). Its prev_event is unknown to us, so the server asks
        for the state at that prev_event. Since most of that state is missing,
        a /state request is expected.

        Afterwards we check that the pulled event was persisted properly and
        that the state at it is what we expect.
        """
        return self._test_process_pulled_event_with_missing_state(False)

    def test_process_pulled_event_with_missing_state_where_prev_is_outlier(
        self,
    ) -> None:
        """A pulled event with lots of missing state, prev_event already known.

        Variation on test_process_pulled_event_with_missing_state: the "pulled"
        event again points at a prev_event with lots of state, but this time we
        already hold the prev_event (as an outlier, obviously - had it been a
        regular event there would be no need to request the state).
        """
        return self._test_process_pulled_event_with_missing_state(True)

    def _test_process_pulled_event_with_missing_state(
        self, prev_exists_as_outlier: bool
    ) -> None:
        remote_user = f"@user:{self.OTHER_SERVER_NAME}"
        store = self.hs.get_datastores().main
        state_store = self.hs.get_storage().state

        # set up a room owned by a local user
        local_user = self.register_user("kermit", "test")
        access_token = self.login("kermit", "test")
        room_id = self.helper.create_room_as(
            room_creator=local_user, tok=access_token
        )
        room_version = self.get_success(store.get_room_version(room_id))

        # lower the power levels so that the remote user may send state events
        self.helper.send_state(
            room_id,
            "m.room.power_levels",
            {"events_default": 0, "state_default": 0},
            tok=access_token,
        )

        # join the remote user to the room
        remote_join = self.get_success(
            event_injection.inject_member_event(self.hs, room_id, remote_user, "join")
        )

        initial_state_map = self.get_success(store.get_current_state_ids(room_id))

        auth_event_ids = [
            initial_state_map[("m.room.create", "")],
            initial_state_map[("m.room.power_levels", "")],
            initial_state_map[("m.room.join_rules", "")],
            remote_join.event_id,
        ]

        # build a batch of state events which we do not have
        state_events = []
        for i in range(1, 10):
            state_events.append(
                self._make_signed_event(
                    {
                        "type": "test_state_type",
                        "state_key": f"state_{i}",
                        "room_id": room_id,
                        "sender": remote_user,
                        "prev_events": [remote_join.event_id],
                        "auth_events": auth_event_ids,
                        "origin_server_ts": 1,
                        "depth": 10,
                        "content": {"body": f"state_{i}"},
                    },
                    room_version,
                )
            )

        # the state we will claim is in force at the prev_event: the new state
        # events followed by the room's existing state.
        existing_state_events = self.get_success(
            store.get_events_as_list(initial_state_map.values())
        )
        state_at_prev_event = state_events + existing_state_events

        # build the prev event. Depending on the test variant it is either
        # persisted up front (as an outlier) or served on request.
        prev_event = self._make_signed_event(
            {
                "type": "test_regular_type",
                "room_id": room_id,
                "sender": remote_user,
                "prev_events": [],
                "auth_events": auth_event_ids,
                "origin_server_ts": 1,
                "depth": 11,
                "content": {"body": "missing_prev"},
            },
            room_version,
        )
        if not prev_exists_as_outlier:

            async def serve_prev_event(destination: str, event_id: str, timeout=None):
                self.assertEqual(destination, self.OTHER_SERVER_NAME)
                self.assertEqual(event_id, prev_event.event_id)
                return {"pdus": [prev_event.get_pdu_json()]}

            self.mock_federation_transport_client.get_event.side_effect = (
                serve_prev_event
            )
        else:
            prev_event.internal_metadata.outlier = True
            persistence = self.hs.get_storage().persistence
            self.get_success(
                persistence.persist_event(prev_event, EventContext.for_outlier())
            )

        # build the regular event which is fed into _process_pulled_event
        pulled_event = self._make_signed_event(
            {
                "type": "test_regular_type",
                "room_id": room_id,
                "sender": remote_user,
                "prev_events": [prev_event.event_id],
                "auth_events": auth_event_ids,
                "origin_server_ts": 1,
                "depth": 12,
                "content": {"body": "pulled"},
            },
            room_version,
        )

        # an outbound /state_ids request is expected: stub its response
        state_ids_response = {
            "pdu_ids": [e.event_id for e in state_at_prev_event],
            "auth_chain_ids": [],
        }
        self.mock_federation_transport_client.get_room_state_ids.return_value = (
            make_awaitable(state_ids_response)
        )

        # ... and so is an outbound /state request
        state_response = StateRequestResponse(
            auth_events=[], state=state_at_prev_event
        )
        self.mock_federation_transport_client.get_room_state.return_value = (
            make_awaitable(state_response)
        )

        # advance the clock a little so that the retry logic in
        # FederationClient.get_pdu is satisfied
        self.reactor.advance(60000)

        # the call under test: push the pulled event through
        # _process_pulled_event
        with LoggingContext("test"):
            self.get_success(
                self.hs.get_federation_event_handler()._process_pulled_event(
                    self.OTHER_SERVER_NAME, pulled_event, backfilled=False
                )
            )

        # the pulled event must be persisted, and not as an outlier
        persisted = self.get_success(store.get_event(pulled_event.event_id))
        self.assertIsNotNone(persisted, "pulled event was not persisted at all")
        self.assertFalse(
            persisted.internal_metadata.is_outlier(), "pulled event was an outlier"
        )

        # the state at the pulled event must match what we claimed
        state = self.get_success(
            state_store.get_state_ids_for_event(pulled_event.event_id)
        )
        expected_state = {}
        for e in state_at_prev_event:
            expected_state[(e.type, e.state_key)] = e.event_id
        self.assertEqual(state, expected_state)

        # when we already held the prev event, it must not have been re-fetched
        if prev_exists_as_outlier:
            self.mock_federation_transport_client.get_event.assert_not_called()