Skip to content

Commit

Permalink
Add thread identity check to add_callback (#2469)
Browse files Browse the repository at this point in the history
* Add thread identity check to add_callback

Fixes #2463

This reduces the overhead of add_callback when called on the same thread
as the event loop.  This uses asyncio's call_soon rather than
call_soon_threadsafe.

* Separately define add_callback_from_signal
  • Loading branch information
mrocklin authored and bdarnell committed Sep 16, 2018
1 parent 3fe23bc commit 627eafb
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 4 deletions.
23 changes: 21 additions & 2 deletions tornado/platform/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import concurrent.futures
import functools

from threading import get_ident
from tornado.gen import convert_yielded
from tornado.ioloop import IOLoop, _Selectable

Expand Down Expand Up @@ -60,8 +61,16 @@ def initialize(self, asyncio_loop: asyncio.AbstractEventLoop, # type: ignore
if loop.is_closed():
del IOLoop._ioloop_for_asyncio[loop]
IOLoop._ioloop_for_asyncio[asyncio_loop] = self

self._thread_identity = 0

super(BaseAsyncIOLoop, self).initialize(**kwargs)

def assign_thread_identity() -> None:
self._thread_identity = get_ident()

self.add_callback(assign_thread_identity)

def close(self, all_fds: bool=False) -> None:
self.closing = True
for fd in list(self.handlers):
Expand Down Expand Up @@ -157,8 +166,12 @@ def remove_timeout(self, timeout: object) -> None:
timeout.cancel() # type: ignore

def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
if get_ident() == self._thread_identity:
call_soon = self.asyncio_loop.call_soon
else:
call_soon = self.asyncio_loop.call_soon_threadsafe
try:
self.asyncio_loop.call_soon_threadsafe(
call_soon(
self._run_callback,
functools.partial(callback, *args, **kwargs))
except RuntimeError:
Expand All @@ -169,7 +182,13 @@ def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
# eventually execute).
pass

add_callback_from_signal = add_callback
def add_callback_from_signal(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
try:
self.asyncio_loop.call_soon_threadsafe(
self._run_callback,
functools.partial(callback, *args, **kwargs))
except RuntimeError:
pass

def run_in_executor(self, executor: Optional[concurrent.futures.Executor],
func: Callable[..., _T], *args: Any) -> Awaitable[_T]:
Expand Down
10 changes: 8 additions & 2 deletions tornado/test/httpclient_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,17 +547,23 @@ def test_str(self):
class SyncHTTPClientTest(unittest.TestCase):
def setUp(self):
self.server_ioloop = IOLoop()
event = threading.Event()

@gen.coroutine
def init_server():
sock, self.port = bind_unused_port()
app = Application([('/', HelloWorldHandler)])
self.server = HTTPServer(app)
self.server.add_socket(sock)
self.server_ioloop.run_sync(init_server)
event.set()

self.server_thread = threading.Thread(target=self.server_ioloop.start)
def start():
self.server_ioloop.run_sync(init_server)
self.server_ioloop.start()

self.server_thread = threading.Thread(target=start)
self.server_thread.start()
event.wait()

self.http_client = HTTPClient()

Expand Down

0 comments on commit 627eafb

Please sign in to comment.