This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix bug where 5s delays would occasionally happen. #15150

Merged
merged 4 commits on Feb 24, 2023
1 change: 1 addition & 0 deletions changelog.d/15150.bugfix
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.76 where 5s delays would occasionally occur in deployments using workers.
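
For context, here is a minimal sketch of how such a stall can arise when an instance waits for a writer's stream position. The names and the hard-coded 5-second interval (taken from the changelog wording above) are illustrative assumptions, not Synapse's actual replication API.

import asyncio

advertised_position = 0              # last stream position the writer told us about
position_advanced = asyncio.Event()  # set whenever a POSITION/RDATA arrives


async def wait_for_stream_position(target_token: int, check_interval: float = 5.0) -> None:
    """Return once the writer's advertised position has reached `target_token`."""
    while advertised_position < target_token:
        position_advanced.clear()
        try:
            # If the writer never advertises the newer token, we sit here for
            # the full interval -- one plausible source of an occasional ~5s stall.
            await asyncio.wait_for(position_advanced.wait(), check_interval)
        except asyncio.TimeoutError:
            pass  # re-check the position and keep waiting


def advance_advertised_position(new_position: int) -> None:
    """Called when the writer sends a POSITION (or RDATA) up to `new_position`."""
    global advertised_position
    advertised_position = max(advertised_position, new_position)
    position_advanced.set()

In Synapse's case, per the review discussion below, the writer's last RDATA token could lag its actual current token, so waiting instances did not see the newer position advertised promptly.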
18 changes: 18 additions & 0 deletions synapse/replication/tcp/resource.py
@@ -238,6 +238,24 @@ async def _run_notifier_loop(self) -> None:
except Exception:
logger.exception("Failed to replicate")

# The last token we send may not match the current
# token, in which case we want to send out a `POSITION`
# to tell other workers the actual current position.
Comment on lines +241 to +243
Member

whyyy do we want to do this?

Member Author

Because the other workers then may wait for a long time for that position to be sent over replication, due to #14820.

Member

yeahh, but could that be in the comment pls pls?

if updates[-1][0] < current_token:
logger.info(
"Sending position: %s -> %s",
stream.NAME,
current_token,
)
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
updates[-1][0],
current_token,
)
)

logger.debug("No more pending updates, breaking poke loop")
finally:
self.pending_updates = False
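
To make the review thread above concrete, here is a simplified sketch of the receiving side, using hypothetical names rather than Synapse's actual command handler. RDATA only advances an instance's view of a stream to the token of the last update it received, so when that token lags the writer's true current token, anything waiting on the newer token stays blocked until a POSITION (or further RDATA) arrives.

import asyncio
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class StreamState:
    """One instance's local view of a single replication stream (illustrative only)."""

    position: int = 0
    waiters: List[Tuple[int, "asyncio.Future[None]"]] = field(default_factory=list)

    def advance_to(self, new_position: int) -> None:
        """Move our view of the stream forward and wake any satisfied waiters."""
        self.position = max(self.position, new_position)
        still_waiting = []
        for token, fut in self.waiters:
            if token <= self.position:
                if not fut.done():
                    fut.set_result(None)
            else:
                still_waiting.append((token, fut))
        self.waiters = still_waiting

    async def wait_for(self, token: int) -> None:
        """Return once the stream has reached `token`."""
        if token <= self.position:
            return
        fut: "asyncio.Future[None]" = asyncio.get_running_loop().create_future()
        self.waiters.append((token, fut))
        await fut


def on_rdata(state: StreamState, update_token: int) -> None:
    # RDATA can only take us as far as the last update actually sent.
    state.advance_to(update_token)


def on_position(state: StreamState, current_token: int) -> None:
    # POSITION advertises the writer's true current token even when there are
    # no further updates to send, so waiters on newer tokens resolve promptly.
    state.advance_to(current_token)

The change above is the sending side of this picture: whenever the token of the last update sent lags the stream's current token, the writer follows the RDATA with a POSITION so that other instances see the true position immediately instead of waiting.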
61 changes: 61 additions & 0 deletions tests/replication/tcp/test_handler.py
@@ -141,3 +141,64 @@ def test_wait_for_stream_position(self) -> None:
self.get_success(ctx_worker1.__aexit__(None, None, None))

self.assertTrue(d.called)

def test_wait_for_stream_position_rdata(self) -> None:
"""Check that wait for stream position correctly waits for an update
from the correct instance, when RDATA is sent.
"""
store = self.hs.get_datastores().main
cmd_handler = self.hs.get_replication_command_handler()
data_handler = self.hs.get_replication_data_handler()

worker1 = self.make_worker_hs(
"synapse.app.generic_worker",
extra_config={
"worker_name": "worker1",
"run_background_tasks_on": "worker1",
"redis": {"enabled": True},
},
)

cache_id_gen = worker1.get_datastores().main._cache_id_gen
assert cache_id_gen is not None

self.replicate()

# First, make sure the master knows that `worker1` exists.
initial_token = cache_id_gen.get_current_token()
cmd_handler.send_command(
PositionCommand("caches", "worker1", initial_token, initial_token)
)
self.replicate()

# `wait_for_stream_position` should only return once master receives a
# notification that `next_token2` has persisted.
ctx_worker1 = cache_id_gen.get_next_mult(2)
next_token1, next_token2 = self.get_success(ctx_worker1.__aenter__())

d = defer.ensureDeferred(
data_handler.wait_for_stream_position("worker1", "caches", next_token2)
)
self.assertFalse(d.called)

# Insert an entry into the cache stream with token `next_token1`, but
# not `next_token2`.
self.get_success(
store.db_pool.simple_insert(
table="cache_invalidation_stream_by_instance",
values={
"stream_id": next_token1,
"instance_name": "worker1",
"cache_func": "foo",
"keys": [],
"invalidation_ts": 0,
},
)
)

# Finish the context manager, triggering the data to be sent to master.
self.get_success(ctx_worker1.__aexit__(None, None, None))

# Master should get told about `next_token2`, so the deferred should
# resolve.
self.assertTrue(d.called)