From 0f8abe994d02f1e69606457632d62009f2de7678 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 8 Oct 2020 10:56:13 +0100 Subject: [PATCH 1/5] Fix threadsafety in ThreadedMemoryReactorClock This could, very occasionally, cause: ``` tests.test_visibility.FilterEventsForServerTestCase.test_large_room =============================================================================== [ERROR] Traceback (most recent call last): File "/src/tests/rest/media/v1/test_media_storage.py", line 86, in test_ensure_media_is_in_local_cache self.wait_on_thread(x) File "/src/tests/unittest.py", line 296, in wait_on_thread self.reactor.advance(0.01) File "/src/.tox/py35/lib/python3.5/site-packages/twisted/internet/task.py", line 826, in advance self._sortCalls() File "/src/.tox/py35/lib/python3.5/site-packages/twisted/internet/task.py", line 787, in _sortCalls self.calls.sort(key=lambda a: a.getTime()) builtins.ValueError: list modified during sort tests.rest.media.v1.test_media_storage.MediaStorageTests.test_ensure_media_is_in_local_cache ``` --- changelog.d/8497.misc | 1 + tests/server.py | 23 +++++++++++++++++++---- 2 files changed, 20 insertions(+), 4 deletions(-) create mode 100644 changelog.d/8497.misc diff --git a/changelog.d/8497.misc b/changelog.d/8497.misc new file mode 100644 index 000000000000..8bc05e8df63b --- /dev/null +++ b/changelog.d/8497.misc @@ -0,0 +1 @@ +Fix a threadsafety bug in unit tests. diff --git a/tests/server.py b/tests/server.py index f7f5276b2152..a8366026b67b 100644 --- a/tests/server.py +++ b/tests/server.py @@ -1,6 +1,8 @@ import json import logging +from collections import deque from io import SEEK_END, BytesIO +from typing import Any, Callable, Deque, List import attr from zope.interface import implementer @@ -251,6 +253,7 @@ def __init__(self): self._tcp_callbacks = {} self._udp = [] lookups = self.lookups = {} + self._thread_callbacks = deque() # type: Deque[Callable[[], None]]() @implementer(IResolverSimple) class FakeResolver: @@ -272,10 +275,10 @@ def callFromThread(self, callback, *args, **kwargs): """ Make the callback fire in the next reactor iteration. """ - d = Deferred() - d.addCallback(lambda x: callback(*args, **kwargs)) - self.callLater(0, d.callback, True) - return d + cb = lambda: callback(*args, **kwargs) + # it's not safe to call callLater() here, so we append the callback to a + # separate queue. + self._thread_callbacks.append(cb) def getThreadPool(self): return self.threadpool @@ -303,6 +306,18 @@ def connectTCP(self, host, port, factory, timeout=30, bindAddress=None): return conn + def advance(self, amount): + # run any "callFromThread" callbacks before anything registered with + # "callLater" + while True: + try: + callback = self._thread_callbacks.popleft() + except IndexError: + break + callback() + + super().advance(amount) + class ThreadPool: """ From a6690aee63870cdb4fca8ac2325cce5e00d7e0a8 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 8 Oct 2020 11:43:25 +0100 Subject: [PATCH 2/5] Fix imports and comments --- tests/server.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/server.py b/tests/server.py index a8366026b67b..5e7a7ccdf52e 100644 --- a/tests/server.py +++ b/tests/server.py @@ -2,7 +2,7 @@ import logging from collections import deque from io import SEEK_END, BytesIO -from typing import Any, Callable, Deque, List +from typing import Callable, Deque import attr from zope.interface import implementer @@ -307,8 +307,7 @@ def connectTCP(self, host, port, factory, timeout=30, bindAddress=None): return conn def advance(self, amount): - # run any "callFromThread" callbacks before anything registered with - # "callLater" + # first run any "callFromThread" callbacks while True: try: callback = self._thread_callbacks.popleft() @@ -316,6 +315,7 @@ def advance(self, amount): break callback() + # now let MemoryReactorClock run any pending "callLater" callbacks. super().advance(amount) From de92759eaf58e62d187ba3824f153c2dcd4635bb Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 8 Oct 2020 13:35:19 +0100 Subject: [PATCH 3/5] fix tests --- tests/server.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/tests/server.py b/tests/server.py index 5e7a7ccdf52e..72abfb1556e5 100644 --- a/tests/server.py +++ b/tests/server.py @@ -307,7 +307,11 @@ def connectTCP(self, host, port, factory, timeout=30, bindAddress=None): return conn def advance(self, amount): - # first run any "callFromThread" callbacks + # first advance our reactor's time, and run any "callLater" callbacks that + # makes ready + super().advance(amount) + + # now run any "callFromThread" callbacks while True: try: callback = self._thread_callbacks.popleft() @@ -315,8 +319,16 @@ def advance(self, amount): break callback() - # now let MemoryReactorClock run any pending "callLater" callbacks. - super().advance(amount) + # check for more "callLater" callbacks added by the thread callback + # This isn't required in a regular reactor, but it ends up meaning that + # our database queries can complete in a single call to `advance` [1] which + # simplifies tests. + # + # [1]: we replace the threadbool backing the db connection pool with a + # mock ThreadPool which doesn't really use threads; but still use + # reactor.callFromThread to feed results back from the db functions to the + # main thread. + super().advance(0) class ThreadPool: From fbbc8142bb941c863f317c6d17d296fe2f502a78 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 9 Oct 2020 11:29:42 +0100 Subject: [PATCH 4/5] fix on python 3.5 --- tests/server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/server.py b/tests/server.py index 72abfb1556e5..799f1cf95f44 100644 --- a/tests/server.py +++ b/tests/server.py @@ -2,9 +2,10 @@ import logging from collections import deque from io import SEEK_END, BytesIO -from typing import Callable, Deque +from typing import Callable import attr +from typing_extensions import Deque from zope.interface import implementer from twisted.internet import address, threads, udp From 0d7c3b686715d4b6d47ab7ed696857ba4900f4e7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 9 Oct 2020 16:31:53 +0100 Subject: [PATCH 5/5] Apply suggestions from code review Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- tests/server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/server.py b/tests/server.py index 799f1cf95f44..422c8b42ca51 100644 --- a/tests/server.py +++ b/tests/server.py @@ -325,8 +325,8 @@ def advance(self, amount): # our database queries can complete in a single call to `advance` [1] which # simplifies tests. # - # [1]: we replace the threadbool backing the db connection pool with a - # mock ThreadPool which doesn't really use threads; but still use + # [1]: we replace the threadpool backing the db connection pool with a + # mock ThreadPool which doesn't really use threads; but we still use # reactor.callFromThread to feed results back from the db functions to the # main thread. super().advance(0)