Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix wait_for_stream_position for multiple waiters. #8196

Merged
merged 4 commits into from
Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8196.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix `wait_for_stream_position` to allow multiple waiters on the same stream ID.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't a bugfix, as this can't currently happen; it only starts being a problem with the sharded event persister.

I also plan to squash all these changelogs together anyway.

4 changes: 2 additions & 2 deletions synapse/python_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@
"msgpack>=0.5.2",
"phonenumbers>=8.2.0",
"prometheus_client>=0.0.18,<0.9.0",
# we use attr.validators.deep_iterable, which arrived in 19.1.0
"attrs>=19.1.0",
# we use attrs `order` param, which arrived in 19.2.0
"attrs>=19.2.0",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be fine, but I can also implement the comparison functions manually (it's just so much more verbose).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we checked with packagers and no one had an issue with it.

"netaddr>=0.7.18",
"Jinja2>=2.9",
"bleach>=1.4.3",
Expand Down
26 changes: 19 additions & 7 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import logging
from typing import TYPE_CHECKING, Dict, List, Tuple

import attr

from twisted.internet.defer import Deferred
from twisted.internet.protocol import ReconnectingClientFactory

Expand Down Expand Up @@ -109,9 +111,7 @@ def __init__(self, hs: "HomeServer"):

# Map from stream to list of deferreds waiting for the stream to
# arrive at a particular position. The lists are sorted by stream position.
self._streams_to_waiters = (
{}
) # type: Dict[str, List[Tuple[int, Deferred[None]]]]
self._streams_to_waiters = {} # type: Dict[str, List[_WaitForPositionEntry]]

async def on_rdata(
self, stream_name: str, instance_name: str, token: int, rows: list
Expand Down Expand Up @@ -169,11 +169,11 @@ async def on_rdata(
# `len(list)` works for both cases.
index_of_first_deferred_not_called = len(waiting_list)

for idx, (position, deferred) in enumerate(waiting_list):
if position <= token:
for idx, entry in enumerate(waiting_list):
if entry.position <= token:
try:
with PreserveLoggingContext():
deferred.callback(None)
entry.deferred.callback(None)
except Exception:
# The deferred has been cancelled or timed out.
pass
Expand Down Expand Up @@ -221,7 +221,7 @@ async def wait_for_stream_position(

# We insert into the list using heapq as it is more efficient than
# pushing then resorting each time.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this comment should be updated now? 🤨

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woops

heapq.heappush(waiting_list, (position, deferred))
heapq.heappush(waiting_list, _WaitForPositionEntry(position, deferred))

# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
Expand All @@ -230,3 +230,15 @@ async def wait_for_stream_position(
logger.info(
"Finished waiting for repl stream %r to reach %s", stream_name, position
)


@attr.s(frozen=True, slots=True, order=True)
class _WaitForPositionEntry:
    """A comparable entry for the lists held in `_streams_to_waiters`.

    Entries are ordered by `position` alone, which lets them be pushed onto
    a heap. A plain `(position, deferred)` tuple can't be used for this
    because `Deferred` is not comparable.
    """

    # The stream position being waited for; the only field used for ordering.
    position = attr.ib(type=int)
    # The deferred associated with this waiter; excluded from ordering.
    deferred = attr.ib(type=Deferred, order=False)