Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix bug where 5s delays would occasionally happen. #15150

Merged
merged 4 commits into from
Feb 24, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15150.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug when using workers where 5s delays would occaisonally occur. Introduced in v1.76.0.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
18 changes: 18 additions & 0 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,24 @@ async def _run_notifier_loop(self) -> None:
except Exception:
logger.exception("Failed to replicate")

# The last token we send may not match the current
# token, in which case we want to send out a `POSITION`
# to tell other workers the actual current position.
Comment on lines +241 to +243
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whyyy do we want to do this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the other workers then may wait for a long time for that position to be sent over replication, due to #14820.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeahh, but could that be in the comment pls pls?

if updates[-1][0] < current_token:
logger.info(
"Sending position: %s -> %s",
stream.NAME,
current_token,
)
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
updates[-1][0],
current_token,
)
)

logger.debug("No more pending updates, breaking poke loop")
finally:
self.pending_updates = False
Expand Down
60 changes: 60 additions & 0 deletions tests/replication/tcp/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,63 @@ def test_wait_for_stream_position(self) -> None:
self.get_success(ctx_worker1.__aexit__(None, None, None))

self.assertTrue(d.called)

def test_wait_for_stream_position_rdata(self) -> None:
"""Check that wait for stream position correctly waits for an update
from the correct instance, when RDATA is sent.
"""
store = self.hs.get_datastores().main
cmd_handler = self.hs.get_replication_command_handler()
data_handler = self.hs.get_replication_data_handler()

worker1 = self.make_worker_hs(
"synapse.app.generic_worker",
extra_config={
"worker_name": "worker1",
"run_background_tasks_on": "worker1",
"redis": {"enabled": True},
},
)

cache_id_gen = worker1.get_datastores().main._cache_id_gen
assert cache_id_gen is not None

self.replicate()

# First, make sure the master knows that `worker1` exists.
initial_token = cache_id_gen.get_current_token()
cmd_handler.send_command(
PositionCommand("caches", "worker1", initial_token, initial_token)
)
self.replicate()

# `wait_for_stream_position` should only return once master receives a
# notification that `next_token2` has persisted.
ctx_worker1 = cache_id_gen.get_next_mult(2)
next_token1, next_token2 = self.get_success(ctx_worker1.__aenter__())

d = defer.ensureDeferred(
data_handler.wait_for_stream_position("worker1", "caches", next_token2)
)
self.assertFalse(d.called)

# Insert an entry into the cache stream with token `next_token1`, but
# not `next_token2`. The master should still get told about
# `next_token2` via a `POSITION`
self.get_success(
store.db_pool.simple_insert(
table="cache_invalidation_stream_by_instance",
values={
"stream_id": next_token1,
"instance_name": "worker1",
"cache_func": "foo",
"keys": [],
"invalidation_ts": 0,
},
)
)

# ... but worker1 finishing (and so sending an update) should.
self.get_success(ctx_worker1.__aexit__(None, None, None))
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

self.assertTrue(d.called)