From f4165f9357184a3398ac2a2d4fa8dc755e82f341 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 24 Feb 2023 11:52:36 +0000 Subject: [PATCH 1/4] Fix bug where 5s delays would occaisonally happen. This only affects deployments using workers. --- synapse/replication/tcp/resource.py | 18 ++++++++ tests/replication/tcp/test_handler.py | 60 +++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 9d17eff71451..347467d863ec 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -238,6 +238,24 @@ async def _run_notifier_loop(self) -> None: except Exception: logger.exception("Failed to replicate") + # The last token we send may not match the current + # token, in which case we want to send out a `POSITION` + # to tell other workers the actual current position. + if updates[-1][0] < current_token: + logger.info( + "Sending position: %s -> %s", + stream.NAME, + current_token, + ) + self.command_handler.send_command( + PositionCommand( + stream.NAME, + self._instance_name, + updates[-1][0], + current_token, + ) + ) + logger.debug("No more pending updates, breaking poke loop") finally: self.pending_updates = False diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py index bf927beb6a87..f0868a80b1c5 100644 --- a/tests/replication/tcp/test_handler.py +++ b/tests/replication/tcp/test_handler.py @@ -141,3 +141,63 @@ def test_wait_for_stream_position(self) -> None: self.get_success(ctx_worker1.__aexit__(None, None, None)) self.assertTrue(d.called) + + def test_wait_for_stream_position_rdata(self) -> None: + """Check that wait for stream position correctly waits for an update + from the correct instance, when RDATA is sent. + """ + store = self.hs.get_datastores().main + cmd_handler = self.hs.get_replication_command_handler() + data_handler = self.hs.get_replication_data_handler() + + worker1 = self.make_worker_hs( + "synapse.app.generic_worker", + extra_config={ + "worker_name": "worker1", + "run_background_tasks_on": "worker1", + "redis": {"enabled": True}, + }, + ) + + cache_id_gen = worker1.get_datastores().main._cache_id_gen + assert cache_id_gen is not None + + self.replicate() + + # First, make sure the master knows that `worker1` exists. + initial_token = cache_id_gen.get_current_token() + cmd_handler.send_command( + PositionCommand("caches", "worker1", initial_token, initial_token) + ) + self.replicate() + + # `wait_for_stream_position` should only return once master receives a + # notification that `next_token2` has persisted. + ctx_worker1 = cache_id_gen.get_next_mult(2) + next_token1, next_token2 = self.get_success(ctx_worker1.__aenter__()) + + d = defer.ensureDeferred( + data_handler.wait_for_stream_position("worker1", "caches", next_token2) + ) + self.assertFalse(d.called) + + # Insert an entry into the cache stream with token `next_token1`, but + # not `next_token2`. The master should still get told about + # `next_token2` via a `POSITION` + self.get_success( + store.db_pool.simple_insert( + table="cache_invalidation_stream_by_instance", + values={ + "stream_id": next_token1, + "instance_name": "worker1", + "cache_func": "foo", + "keys": [], + "invalidation_ts": 0, + }, + ) + ) + + # ... but worker1 finishing (and so sending an update) should. + self.get_success(ctx_worker1.__aexit__(None, None, None)) + + self.assertTrue(d.called) From cebb0a5d180f25aa30e84e7a3608cd67c50889e0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 24 Feb 2023 11:55:59 +0000 Subject: [PATCH 2/4] Newsfile --- changelog.d/15150.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15150.bugfix diff --git a/changelog.d/15150.bugfix b/changelog.d/15150.bugfix new file mode 100644 index 000000000000..7bb238c496bd --- /dev/null +++ b/changelog.d/15150.bugfix @@ -0,0 +1 @@ +Fix bug when using workers where 5s delays would occaisonally occur. Introduced in v1.76.0. From 2690a3b50dde7ef5349647ef07a26312dd8634cf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 24 Feb 2023 13:57:12 +0000 Subject: [PATCH 3/4] Update changelog.d/15150.bugfix Co-authored-by: David Robertson --- changelog.d/15150.bugfix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/15150.bugfix b/changelog.d/15150.bugfix index 7bb238c496bd..8668bc587f8b 100644 --- a/changelog.d/15150.bugfix +++ b/changelog.d/15150.bugfix @@ -1 +1 @@ -Fix bug when using workers where 5s delays would occaisonally occur. Introduced in v1.76.0. +Fix a bug introduced in Synapse 1.76 where 5s delays would occasionally occur in deployments using workers. From f1928a2befad7eb91fcc8e5f086e9396880c93fb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 24 Feb 2023 13:59:44 +0000 Subject: [PATCH 4/4] Words --- tests/replication/tcp/test_handler.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py index f0868a80b1c5..bab77b2df787 100644 --- a/tests/replication/tcp/test_handler.py +++ b/tests/replication/tcp/test_handler.py @@ -182,8 +182,7 @@ def test_wait_for_stream_position_rdata(self) -> None: self.assertFalse(d.called) # Insert an entry into the cache stream with token `next_token1`, but - # not `next_token2`. The master should still get told about - # `next_token2` via a `POSITION` + # not `next_token2`. self.get_success( store.db_pool.simple_insert( table="cache_invalidation_stream_by_instance", @@ -197,7 +196,9 @@ def test_wait_for_stream_position_rdata(self) -> None: ) ) - # ... but worker1 finishing (and so sending an update) should. + # Finish the context manager, triggering the data to be sent to master. self.get_success(ctx_worker1.__aexit__(None, None, None)) + # Master should get told about `next_token2`, so the deferred should + # resolve. self.assertTrue(d.called)