diff --git a/Doc/library/asyncio-api-index.rst b/Doc/library/asyncio-api-index.rst index d5b5659abc65e2..047e5bbc58ccad 100644 --- a/Doc/library/asyncio-api-index.rst +++ b/Doc/library/asyncio-api-index.rst @@ -48,6 +48,9 @@ await on multiple things with timeouts. * - :class:`Task` - Task object. + * - :func:`to_thread` + - Asychronously run a function in a separate OS thread. + * - :func:`run_coroutine_threadsafe` - Schedule a coroutine from another OS thread. diff --git a/Doc/library/asyncio-task.rst b/Doc/library/asyncio-task.rst index 2e963398d93001..7c2704090551b6 100644 --- a/Doc/library/asyncio-task.rst +++ b/Doc/library/asyncio-task.rst @@ -602,6 +602,62 @@ Waiting Primitives # ... +Running in Threads +================== + +.. coroutinefunction:: to_thread(func, /, \*args, \*\*kwargs) + + Asynchronously run function *func* in a separate thread. + + Any \*args and \*\*kwargs supplied for this function are directly passed + to *func*. + + Return an :class:`asyncio.Future` which represents the eventual result of + *func*. + + This coroutine function is primarily intended to be used for executing + IO-bound functions/methods that would otherwise block the event loop if + they were ran in the main thread. For example:: + + def blocking_io(): + print(f"start blocking_io at {time.strftime('%X')}") + # Note that time.sleep() can be replaced with any blocking + # IO-bound operation, such as file operations. + time.sleep(1) + print(f"blocking_io complete at {time.strftime('%X')}") + + async def main(): + print(f"started main at {time.strftime('%X')}") + + await asyncio.gather( + asyncio.to_thread(blocking_io), + asyncio.sleep(1)) + + print(f"finished main at {time.strftime('%X')}") + + + asyncio.run(main()) + + # Expected output: + # + # started main at 19:50:53 + # start blocking_io at 19:50:53 + # blocking_io complete at 19:50:54 + # finished main at 19:50:54 + + Directly calling `blocking_io()` in any coroutine would block the event loop + for its duration, resulting in an additional 1 second of run time. Instead, + by using `asyncio.to_thread()`, we can run it in a separate thread without + blocking the event loop. + + .. note:: + + Due to the :term:`GIL`, `asyncio.to_thread()` can typically only be used + to make IO-bound functions non-blocking. However, for extension modules + that release the GIL or alternative Python implementations that don't + have one, `asyncio.to_thread()` can also be used for CPU-bound functions. + + Scheduling From Other Threads ============================= diff --git a/Doc/whatsnew/3.9.rst b/Doc/whatsnew/3.9.rst index 593f523828703e..037e1055c79e52 100644 --- a/Doc/whatsnew/3.9.rst +++ b/Doc/whatsnew/3.9.rst @@ -282,6 +282,12 @@ that schedules a shutdown for the default executor that waits on the Added :class:`asyncio.PidfdChildWatcher`, a Linux-specific child watcher implementation that polls process file descriptors. (:issue:`38692`) +Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for +running IO-bound functions in a separate thread to avoid blocking the event +loop, and essentially works as a high-level version of +:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments. +(Contributed by Kyle Stanley and Yury Selivanov in :issue:`32309`.) + compileall ---------- diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py index 28c2e2c429f34a..eb84bfb189ccf3 100644 --- a/Lib/asyncio/__init__.py +++ b/Lib/asyncio/__init__.py @@ -17,6 +17,7 @@ from .streams import * from .subprocess import * from .tasks import * +from .threads import * from .transports import * # Exposed for _asynciomodule.c to implement now deprecated @@ -35,6 +36,7 @@ streams.__all__ + subprocess.__all__ + tasks.__all__ + + threads.__all__ + transports.__all__) if sys.platform == 'win32': # pragma: no cover diff --git a/Lib/asyncio/threads.py b/Lib/asyncio/threads.py new file mode 100644 index 00000000000000..2f40467fe5bc7b --- /dev/null +++ b/Lib/asyncio/threads.py @@ -0,0 +1,21 @@ +"""High-level support for working with threads in asyncio""" + +import functools + +from . import events + + +__all__ = "to_thread", + + +async def to_thread(func, /, *args, **kwargs): + """Asynchronously run function *func* in a separate thread. + + Any *args and **kwargs supplied for this function are directly passed + to *func*. + + Return an asyncio.Future which represents the eventual result of *func*. + """ + loop = events.get_running_loop() + func_call = functools.partial(func, *args, **kwargs) + return await loop.run_in_executor(None, func_call) diff --git a/Lib/test/test_asyncio/test_threads.py b/Lib/test/test_asyncio/test_threads.py new file mode 100644 index 00000000000000..99a00f21832f3e --- /dev/null +++ b/Lib/test/test_asyncio/test_threads.py @@ -0,0 +1,79 @@ +"""Tests for asyncio/threads.py""" + +import asyncio +import unittest + +from unittest import mock +from test.test_asyncio import utils as test_utils + + +def tearDownModule(): + asyncio.set_event_loop_policy(None) + + +class ToThreadTests(test_utils.TestCase): + def setUp(self): + super().setUp() + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def tearDown(self): + self.loop.run_until_complete( + self.loop.shutdown_default_executor()) + self.loop.close() + asyncio.set_event_loop(None) + self.loop = None + super().tearDown() + + def test_to_thread(self): + async def main(): + return await asyncio.to_thread(sum, [40, 2]) + + result = self.loop.run_until_complete(main()) + self.assertEqual(result, 42) + + def test_to_thread_exception(self): + def raise_runtime(): + raise RuntimeError("test") + + async def main(): + await asyncio.to_thread(raise_runtime) + + with self.assertRaisesRegex(RuntimeError, "test"): + self.loop.run_until_complete(main()) + + def test_to_thread_once(self): + func = mock.Mock() + + async def main(): + await asyncio.to_thread(func) + + self.loop.run_until_complete(main()) + func.assert_called_once() + + def test_to_thread_concurrent(self): + func = mock.Mock() + + async def main(): + futs = [] + for _ in range(10): + fut = asyncio.to_thread(func) + futs.append(fut) + await asyncio.gather(*futs) + + self.loop.run_until_complete(main()) + self.assertEqual(func.call_count, 10) + + def test_to_thread_args_kwargs(self): + # Unlike run_in_executor(), to_thread() should directly accept kwargs. + func = mock.Mock() + + async def main(): + await asyncio.to_thread(func, 'test', something=True) + + self.loop.run_until_complete(main()) + func.assert_called_once_with('test', something=True) + + +if __name__ == "__main__": + unittest.main() diff --git a/Misc/NEWS.d/next/Library/2020-05-17-02-03-09.bpo-32309.KM9psl.rst b/Misc/NEWS.d/next/Library/2020-05-17-02-03-09.bpo-32309.KM9psl.rst new file mode 100644 index 00000000000000..6272c35edf4d57 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-05-17-02-03-09.bpo-32309.KM9psl.rst @@ -0,0 +1,4 @@ +Added a new :term:`coroutine` :func:`asyncio.to_thread`. It is mainly used for +running IO-bound functions in a separate thread to avoid blocking the event +loop, and essentially works as a high-level version of +:meth:`~asyncio.loop.run_in_executor` that can directly take keyword arguments. \ No newline at end of file