Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce amount of state we pull out when attempting to send catchup PDUs. #12963

Merged
merged 3 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12963.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce the amount of state we pull from the DB.
31 changes: 20 additions & 11 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import ReadReceipt
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.visibility import filter_events_for_server

if TYPE_CHECKING:
import synapse.server
Expand Down Expand Up @@ -77,6 +78,7 @@ def __init__(
):
self._server_name = hs.hostname
self._clock = hs.get_clock()
self._storage_controllers = hs.get_storage_controllers()
self._store = hs.get_datastores().main
self._transaction_manager = transaction_manager
self._instance_name = hs.get_instance_name()
Expand Down Expand Up @@ -442,6 +444,12 @@ async def _catch_up_transmission_loop(self) -> None:
"This should not happen." % event_ids
)

logger.info(
"Catching up destination %s with %d PDUs",
self._destination,
len(catchup_pdus),
)

# We send transactions with events from one room only, as its likely
# that the remote will have to do additional processing, which may
# take some time. It's better to give it small amounts of work
Expand Down Expand Up @@ -487,19 +495,20 @@ async def _catch_up_transmission_loop(self) -> None:
):
continue

# Filter out events where the server is not in the room,
# e.g. it may have left/been kicked. *Ideally* we'd pull
# out the kick and send that, but it's a rare edge case
# so we don't bother for now (the server that sent the
# kick should send it out if its online).
hosts = await self._state.get_hosts_in_room_at_events(
p.room_id, [p.event_id]
)
if self._destination not in hosts:
continue

new_pdus.append(p)

# Filter out events where the server is not in the room,
# e.g. it may have left/been kicked. *Ideally* we'd pull
# out the kick and send that, but it's a rare edge case
# so we don't bother for now (the server that sent the
# kick should send it out if its online).
new_pdus = await filter_events_for_server(
self._storage_controllers,
self._destination,
new_pdus,
redact=False,
)

# If we've filtered out all the extremities, fall back to
# sending the original event. This should ensure that the
# server gets at least some of missed events (especially if
Expand Down