Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix threadsafety in ThreadedMemoryReactorClock #8497

Merged
merged 5 commits into from
Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8497.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a threadsafety bug in unit tests.
36 changes: 32 additions & 4 deletions tests/server.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import json
import logging
from collections import deque
from io import SEEK_END, BytesIO
from typing import Callable

import attr
from typing_extensions import Deque
from zope.interface import implementer

from twisted.internet import address, threads, udp
Expand Down Expand Up @@ -251,6 +254,7 @@ def __init__(self):
self._tcp_callbacks = {}
self._udp = []
lookups = self.lookups = {}
self._thread_callbacks = deque() # type: Deque[Callable[[], None]]()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you using a deque simply for the performance benefits of popleft?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm using it because I don't think a list is threadsafe.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing some research it looks to be threadsafe in CPython, however using a deque is likely a safer bet.


@implementer(IResolverSimple)
class FakeResolver:
Expand All @@ -272,10 +276,10 @@ def callFromThread(self, callback, *args, **kwargs):
"""
Make the callback fire in the next reactor iteration.
"""
d = Deferred()
d.addCallback(lambda x: callback(*args, **kwargs))
self.callLater(0, d.callback, True)
return d
cb = lambda: callback(*args, **kwargs)
# it's not safe to call callLater() here, so we append the callback to a
# separate queue.
self._thread_callbacks.append(cb)

def getThreadPool(self):
return self.threadpool
Expand Down Expand Up @@ -303,6 +307,30 @@ def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):

return conn

def advance(self, amount):
# first advance our reactor's time, and run any "callLater" callbacks that
# makes ready
Comment on lines +311 to +312
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is indeed easier to understand - though I'm not sure if "makes ready" is a typo.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not a typo, though there may be better ways of expressing it.

Suppose our fake reactor thinks it is 12:00:00, and we have registered a callLater callback for 12:00:02. advance(5) means that the callback will now be "ready" to run: it was "made ready" by advancing the reactor.

super().advance(amount)

# now run any "callFromThread" callbacks
while True:
try:
callback = self._thread_callbacks.popleft()
except IndexError:
break
callback()

# check for more "callLater" callbacks added by the thread callback
# This isn't required in a regular reactor, but it ends up meaning that
# our database queries can complete in a single call to `advance` [1] which
# simplifies tests.
#
# [1]: we replace the threadbool backing the db connection pool with a
richvdh marked this conversation as resolved.
Show resolved Hide resolved
# mock ThreadPool which doesn't really use threads; but still use
richvdh marked this conversation as resolved.
Show resolved Hide resolved
# reactor.callFromThread to feed results back from the db functions to the
# main thread.
super().advance(0)


class ThreadPool:
"""
Expand Down