diff --git a/changelog.d/7565.bugfix b/changelog.d/7565.bugfix new file mode 100644 index 000000000000..35fabb9a0b4d --- /dev/null +++ b/changelog.d/7565.bugfix @@ -0,0 +1 @@ +Fix exception `'GenericWorkerReplicationHandler' object has no attribute 'send_federation_ack'`, introduced in v1.13.0. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 2906b93f6a74..440341cb3cef 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -17,7 +17,7 @@ import contextlib import logging import sys -from typing import Dict, Iterable +from typing import Dict, Iterable, Optional, Set from typing_extensions import ContextManager @@ -677,10 +677,9 @@ def __init__(self, hs): self.notify_pushers = hs.config.start_pushers self.pusher_pool = hs.get_pusherpool() + self.send_handler = None # type: Optional[FederationSenderHandler] if hs.config.send_federation: - self.send_handler = FederationSenderHandler(hs, self) - else: - self.send_handler = None + self.send_handler = FederationSenderHandler(hs) async def on_rdata(self, stream_name, instance_name, token, rows): await super().on_rdata(stream_name, instance_name, token, rows) @@ -718,7 +717,7 @@ async def _process_and_notify(self, stream_name, instance_name, token, rows): if entities: self.notifier.on_new_event("to_device_key", token, users=entities) elif stream_name == DeviceListsStream.NAME: - all_room_ids = set() + all_room_ids = set() # type: Set[str] for row in rows: if row.entity.startswith("@"): room_ids = await self.store.get_rooms_for_user(row.entity) @@ -769,24 +768,33 @@ def on_remote_server_up(self, server: str): class FederationSenderHandler(object): - """Processes the replication stream and forwards the appropriate entries - to the federation sender. + """Processes the fedration replication stream + + This class is only instantiate on the worker responsible for sending outbound + federation transactions. It receives rows from the replication stream and forwards + the appropriate entries to the FederationSender class. """ - def __init__(self, hs: GenericWorkerServer, replication_client): + def __init__(self, hs: GenericWorkerServer): self.store = hs.get_datastore() self._is_mine_id = hs.is_mine_id self.federation_sender = hs.get_federation_sender() - self.replication_client = replication_client - + self._hs = hs + + # if the worker is restarted, we want to pick up where we left off in + # the replication stream, so load the position from the database. + # + # XXX is this actually worthwhile? Whenever the master is restarted, we'll + # drop some rows anyway (which is mostly fine because we're only dropping + # typing and presence notifications). If the replication stream is + # unreliable, why do we do all this hoop-jumping to store the position in the + # database? See also https://github.com/matrix-org/synapse/issues/7535. + # self.federation_position = self.store.federation_out_pos_startup - self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") + self._fed_position_linearizer = Linearizer(name="_fed_position_linearizer") self._last_ack = self.federation_position - self._room_serials = {} - self._room_typing = {} - def on_start(self): # There may be some events that are persisted but haven't been sent, # so send them now. @@ -849,22 +857,34 @@ async def _on_new_receipts(self, rows): await self.federation_sender.send_read_receipt(receipt_info) async def update_token(self, token): + """Update the record of where we have processed to in the federation stream. + + Called after we have processed a an update received over replication. Sends + a FEDERATION_ACK back to the master, and stores the token that we have processed + in `federation_stream_position` so that we can restart where we left off. + """ try: self.federation_position = token # We linearize here to ensure we don't have races updating the token + # + # XXX this appears to be redundant, since the ReplicationCommandHandler + # has a linearizer which ensures that we only process one line of + # replication data at a time. Should we remove it, or is it doing useful + # service for robustness? Or could we replace it with an assertion that + # we're not being re-entered? + with (await self._fed_position_linearizer.queue(None)): - if self._last_ack < self.federation_position: - await self.store.update_federation_out_pos( - "federation", self.federation_position - ) + await self.store.update_federation_out_pos( + "federation", self.federation_position + ) - # We ACK this token over replication so that the master can drop - # its in memory queues - self.replication_client.send_federation_ack( - self.federation_position - ) - self._last_ack = self.federation_position + # We ACK this token over replication so that the master can drop + # its in memory queues + self._hs.get_tcp_replication().send_federation_ack( + self.federation_position + ) + self._last_ack = self.federation_position except Exception: logger.exception("Error updating federation stream position") diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py new file mode 100644 index 000000000000..5448d9f0dc3b --- /dev/null +++ b/tests/replication/test_federation_ack.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from synapse.app.generic_worker import GenericWorkerServer +from synapse.replication.tcp.commands import FederationAckCommand +from synapse.replication.tcp.protocol import AbstractConnection +from synapse.replication.tcp.streams.federation import FederationStream + +from tests.unittest import HomeserverTestCase + + +class FederationAckTestCase(HomeserverTestCase): + def default_config(self) -> dict: + config = super().default_config() + config["worker_app"] = "synapse.app.federation_sender" + config["send_federation"] = True + return config + + def make_homeserver(self, reactor, clock): + hs = self.setup_test_homeserver(homeserverToUse=GenericWorkerServer) + return hs + + def test_federation_ack_sent(self): + """A FEDERATION_ACK should be sent back after each RDATA federation + + This test checks that the federation sender is correctly sending back + FEDERATION_ACK messages. The test works by spinning up a federation_sender + worker server, and then fishing out its ReplicationCommandHandler. We wire + the RCH up to a mock connection (so that we can observe the command being sent) + and then poke in an RDATA row. + + XXX: it might be nice to do this by pretending to be a synapse master worker + (or a redis server), and having the worker connect to us via a mocked-up TCP + transport, rather than assuming that the implementation has a + ReplicationCommandHandler. + """ + rch = self.hs.get_tcp_replication() + + # wire up the ReplicationCommandHandler to a mock connection + mock_connection = mock.Mock(spec=AbstractConnection) + rch.new_connection(mock_connection) + + # tell it it received an RDATA row + self.get_success( + rch.on_rdata( + "federation", + "master", + token=10, + rows=[FederationStream.FederationStreamRow(type="x", data=[1, 2, 3])], + ) + ) + + # now check that the FEDERATION_ACK was sent + mock_connection.send_command.assert_called_once() + cmd = mock_connection.send_command.call_args[0][0] + assert isinstance(cmd, FederationAckCommand) + self.assertEqual(cmd.token, 10)