From a5706183a871ffd37da869d610cc289ce888f334 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 26 Aug 2020 12:06:15 +0100 Subject: [PATCH 1/4] Fix `wait_for_stream_position` for multiple waiters. This fixes a bug where having multiple callers waiting on the same stream and position will cause it to try and compare two deferreds, which fails (due to the sorted list having an entry of `Tuple[int, Deferred]`). --- synapse/python_dependencies.py | 4 ++-- synapse/replication/tcp/client.py | 26 +++++++++++++++++++------- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index dd77a44b8db0..5294d2975d6a 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -66,8 +66,8 @@ "msgpack>=0.5.2", "phonenumbers>=8.2.0", "prometheus_client>=0.0.18,<0.9.0", - # we use attr.validators.deep_iterable, which arrived in 19.1.0 - "attrs>=19.1.0", + # we use attrs `order` param, which arrived in 19.2.0 + "attrs>=19.2.0", "netaddr>=0.7.18", "Jinja2>=2.9", "bleach>=1.4.3", diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index fcf8ebf1e74f..356feb738b5b 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -18,6 +18,8 @@ import logging from typing import TYPE_CHECKING, Dict, List, Tuple +import attr + from twisted.internet.defer import Deferred from twisted.internet.protocol import ReconnectingClientFactory @@ -109,9 +111,7 @@ def __init__(self, hs: "HomeServer"): # Map from stream to list of deferreds waiting for the stream to # arrive at a particular position. The lists are sorted by stream position. - self._streams_to_waiters = ( - {} - ) # type: Dict[str, List[Tuple[int, Deferred[None]]]] + self._streams_to_waiters = {} # type: Dict[str, List[_WaitForPositionEntry]] async def on_rdata( self, stream_name: str, instance_name: str, token: int, rows: list @@ -169,11 +169,11 @@ async def on_rdata( # `len(list)` works for both cases. index_of_first_deferred_not_called = len(waiting_list) - for idx, (position, deferred) in enumerate(waiting_list): - if position <= token: + for idx, entry in enumerate(waiting_list): + if entry.position <= token: try: with PreserveLoggingContext(): - deferred.callback(None) + entry.deferred.callback(None) except Exception: # The deferred has been cancelled or timed out. pass @@ -221,7 +221,7 @@ async def wait_for_stream_position( # We insert into the list using heapq as it is more efficient than # pushing then resorting each time. - heapq.heappush(waiting_list, (position, deferred)) + heapq.heappush(waiting_list, _WaitForPositionEntry(position, deferred)) # We measure here to get in flight counts and average waiting time. with Measure(self._clock, "repl.wait_for_stream_position"): @@ -230,3 +230,15 @@ async def wait_for_stream_position( logger.info( "Finished waiting for repl stream %r to reach %s", stream_name, position ) + + +@attr.s( + frozen=True, slots=True, order=True, +) +class _WaitForPositionEntry: + """Entry for type `_streams_to_waiters` that is comparable. A tuple can't + be used as `Deferred` is not comparable. + """ + + position = attr.ib(type=int) + deferred = attr.ib(type=Deferred, order=False) From 27a7b4b0a4f68b1435e42ac0f970f3a1ca046fb8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Aug 2020 10:23:26 +0100 Subject: [PATCH 2/4] Newsfile --- changelog.d/8196.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8196.misc diff --git a/changelog.d/8196.misc b/changelog.d/8196.misc new file mode 100644 index 000000000000..c42baf0e56c6 --- /dev/null +++ b/changelog.d/8196.misc @@ -0,0 +1 @@ +Fix `wait_for_stream_position` to allow multiple waiters on same stream ID. From 3407cd8bc44eebbe653be900c9f6726ae5d7d12c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Aug 2020 16:10:06 +0100 Subject: [PATCH 3/4] Rewrite to not require attrs 19.2 --- synapse/python_dependencies.py | 6 ++++-- synapse/replication/tcp/client.py | 28 ++++++++-------------------- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 5294d2975d6a..2d995ec456a5 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -66,8 +66,10 @@ "msgpack>=0.5.2", "phonenumbers>=8.2.0", "prometheus_client>=0.0.18,<0.9.0", - # we use attrs `order` param, which arrived in 19.2.0 - "attrs>=19.2.0", + # we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note: + # Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33 + # is out in November.) + "attrs>=19.1.0", "netaddr>=0.7.18", "Jinja2>=2.9", "bleach>=1.4.3", diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 356feb738b5b..8b9c584afafa 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -14,12 +14,9 @@ # limitations under the License. """A replication client for use by synapse workers. """ -import heapq import logging from typing import TYPE_CHECKING, Dict, List, Tuple -import attr - from twisted.internet.defer import Deferred from twisted.internet.protocol import ReconnectingClientFactory @@ -111,7 +108,9 @@ def __init__(self, hs: "HomeServer"): # Map from stream to list of deferreds waiting for the stream to # arrive at a particular position. The lists are sorted by stream position. - self._streams_to_waiters = {} # type: Dict[str, List[_WaitForPositionEntry]] + self._streams_to_waiters = ( + {} + ) # type: Dict[str, List[Tuple[int, Deferred[None]]]] async def on_rdata( self, stream_name: str, instance_name: str, token: int, rows: list @@ -169,11 +168,11 @@ async def on_rdata( # `len(list)` works for both cases. index_of_first_deferred_not_called = len(waiting_list) - for idx, entry in enumerate(waiting_list): - if entry.position <= token: + for idx, (position, deferred) in enumerate(waiting_list): + if position <= token: try: with PreserveLoggingContext(): - entry.deferred.callback(None) + deferred.callback(None) except Exception: # The deferred has been cancelled or timed out. pass @@ -221,7 +220,8 @@ async def wait_for_stream_position( # We insert into the list using heapq as it is more efficient than # pushing then resorting each time. - heapq.heappush(waiting_list, _WaitForPositionEntry(position, deferred)) + waiting_list.append((position, deferred)) + waiting_list.sort(key=lambda t: t[0]) # We measure here to get in flight counts and average waiting time. with Measure(self._clock, "repl.wait_for_stream_position"): @@ -230,15 +230,3 @@ async def wait_for_stream_position( logger.info( "Finished waiting for repl stream %r to reach %s", stream_name, position ) - - -@attr.s( - frozen=True, slots=True, order=True, -) -class _WaitForPositionEntry: - """Entry for type `_streams_to_waiters` that is comparable. A tuple can't - be used as `Deferred` is not comparable. - """ - - position = attr.ib(type=int) - deferred = attr.ib(type=Deferred, order=False) From 7df3d12eec6646707525e122b82688d7ec30099c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Aug 2020 16:16:16 +0100 Subject: [PATCH 4/4] Remove spurious comment --- synapse/replication/tcp/client.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 8b9c584afafa..d6ecf5b32703 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -218,8 +218,6 @@ async def wait_for_stream_position( waiting_list = self._streams_to_waiters.setdefault(stream_name, []) - # We insert into the list using heapq as it is more efficient than - # pushing then resorting each time. waiting_list.append((position, deferred)) waiting_list.sort(key=lambda t: t[0])