diff --git a/tests/replication/tcp/streams/_base.py b/tests/replication/tcp/streams/_base.py index 8ff02479261a..f412164415f1 100644 --- a/tests/replication/tcp/streams/_base.py +++ b/tests/replication/tcp/streams/_base.py @@ -15,6 +15,8 @@ from mock import Mock +from synapse.app.generic_worker import GenericWorkerServer +from synapse.replication.tcp.client import ReplicationDataHandler from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory @@ -26,18 +28,34 @@ class BaseStreamTestCase(unittest.HomeserverTestCase): """Base class for tests of the replication streams""" - def make_homeserver(self, reactor, clock): - self.test_handler = Mock(wraps=TestReplicationDataHandler()) - return self.setup_test_homeserver(replication_data_handler=self.test_handler) - def prepare(self, reactor, clock, hs): # build a replication server server_factory = ReplicationStreamProtocolFactory(hs) self.streamer = hs.get_replication_streamer() self.server = server_factory.buildProtocol(None) - repl_handler = ReplicationCommandHandler(hs) - repl_handler.handler = self.test_handler + # Make a new HomeServer object for the worker + config = self.default_config() + config["worker_app"] = "synapse.app.generic_worker" + + self.worker_hs = self.setup_test_homeserver( + http_client=None, + homeserverToUse=GenericWorkerServer, + config=config, + reactor=self.reactor, + ) + + self.test_handler = Mock( + wraps=TestReplicationDataHandler(self.worker_hs.get_datastore()) + ) + self.worker_hs.replication_data_handler = self.test_handler + + # Since we use sqlite in memory databases we need to make sure the + # databases objects are the same. + self.worker_hs.get_datastore().db = hs.get_datastore().db + + repl_handler = ReplicationCommandHandler(self.worker_hs) + self.client = ClientReplicationStreamProtocol( hs, "client", "test", clock, repl_handler, ) @@ -75,16 +93,15 @@ def replicate(self): self.pump(0.1) -class TestReplicationDataHandler: +class TestReplicationDataHandler(ReplicationDataHandler): """Drop-in for ReplicationDataHandler which just collects RDATA rows""" - def __init__(self): + def __init__(self, hs): + super().__init__(hs) self.streams = set() self._received_rdata_rows = [] async def on_rdata(self, stream_name, token, rows): + await super().on_rdata(stream_name, token, rows) for r in rows: self._received_rdata_rows.append((stream_name, token, r)) - - async def on_position(self, stream_name, token): - pass diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py index a0206f7363e6..e26fe0943f9d 100644 --- a/tests/replication/tcp/streams/test_receipts.py +++ b/tests/replication/tcp/streams/test_receipts.py @@ -24,7 +24,6 @@ def test_receipt(self): self.reconnect() # make the client subscribe to the receipts stream - self.test_handler.streams.add("receipts") # tell the master to send a new receipt self.get_success(