From 11d7db26cd5d20601788a8a2d4dd78b6b56357bb Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 25 May 2020 02:43:49 -0700 Subject: [PATCH 01/38] First pass attempt at "guest mode" To allow Trio to efficiently cohabitate with arbitrary other loops. --- docs/source/design.rst | 71 +++++++++++++++ notes-to-self/aio-guest-test.py | 48 ++++++++++ trio/_core/__init__.py | 1 + trio/_core/_io_epoll.py | 30 ++++++- trio/_core/_io_kqueue.py | 22 ++++- trio/_core/_io_windows.py | 19 +++- trio/_core/_run.py | 153 ++++++++++++++++++++++++++------ trio/lowlevel.py | 1 + 8 files changed, 306 insertions(+), 39 deletions(-) create mode 100644 notes-to-self/aio-guest-test.py diff --git a/docs/source/design.rst b/docs/source/design.rst index 6251f22cdb..55eaf6d652 100644 --- a/docs/source/design.rst +++ b/docs/source/design.rst @@ -534,3 +534,74 @@ there is no provision for "pluggable" backends. The intuition here is that we'd rather focus our energy on making one set of solid, official backends that provide a high-quality experience out-of-the-box on all supported systems. + + +Guest mode +---------- + +XX TODO: document this properly + +the basic idea of pushing ``get_events`` into a thread + +actual core logic is identical whether running in regular mode or +guest mode; alternate between waiting for I/O+timeout vs running trio tasks + +one extra wrinkle is: normally tasks can only become runnable, and +deadline can only change, if trio task is running. In guest mode, +that's no longer true. So reschedule() and deadline changes need to +potentially trigger the scheduler, or at least update the I/O +deadline. Do this in the simplest possible way: force the I/O thread +to return immediately via the normal path. + +subtlety around wait_all_tasks_blocked and 'events' semantics + +diagram:: + + + Normal mode + + Main thread executing trio.run: + + +---------------------------+ + | wait for I/O+timeout | + +---------------------------+ + | run trio tasks | + +---------------------------+ + | wait for I/O+timeout | + +---------------------------+ + | run trio tasks | + +---------------------------+ + | wait for I/O+timeout | + +---------------------------+ + | run trio tasks | + +---------------------------+ + . + . + . + + + Guest mode + + Main thread executing host loop: Trio I/O thread: + + +---------------------------+ + | host loop does its thing | +---------------------------+ + | | | wait for trio I/O+timeout | + +---------------------------+ +---------------------------+ + / + +---------------------------+ <---------------/ + | run trio tasks | + +---------------------------+ ----------------\ + \ + +---------------------------+ v + | host loop does its thing | +---------------------------+ + | | | wait for trio I/O+timeout | + +---------------------------+ +---------------------------+ + / + +---------------------------+ <---------------/ + | run trio tasks | + +---------------------------+ ----------------\ + \ + . + . + . diff --git a/notes-to-self/aio-guest-test.py b/notes-to-self/aio-guest-test.py new file mode 100644 index 0000000000..5e9b398132 --- /dev/null +++ b/notes-to-self/aio-guest-test.py @@ -0,0 +1,48 @@ +import asyncio +import trio + +async def aio_main(): + loop = asyncio.get_running_loop() + + trio_done_fut = asyncio.Future() + def trio_done_callback(main_outcome): + print(f"trio_main finished: {main_outcome!r}") + trio_done_fut.set_result(main_outcome) + + trio.lowlevel.start_guest_run( + trio_main, + run_sync_soon_threadsafe=loop.call_soon_threadsafe, + done_callback=trio_done_callback, + ) + + (await trio_done_fut).unwrap() + + +async def trio_main(): + print("trio_main!") + + to_trio, from_aio = trio.open_memory_channel(float("inf")) + from_trio = asyncio.Queue() + + asyncio.create_task(aio_pingpong(from_trio, to_trio)) + + from_trio.put_nowait(0) + + async for n in from_aio: + print(f"trio got: {n}") + await trio.sleep(1) + from_trio.put_nowait(n + 1) + if n >= 10: + return + +async def aio_pingpong(from_trio, to_trio): + print("aio_pingpong!") + + while True: + n = await from_trio.get() + print(f"aio got: {n}") + await asyncio.sleep(1) + to_trio.send_nowait(n + 1) + + +asyncio.run(aio_main()) diff --git a/trio/_core/__init__.py b/trio/_core/__init__.py index 136bfe6b98..e22485149d 100644 --- a/trio/_core/__init__.py +++ b/trio/_core/__init__.py @@ -48,6 +48,7 @@ wait_writable, notify_closing, Nursery, + start_guest_run, ) # Has to come after _run to resolve a circular import diff --git a/trio/_core/_io_epoll.py b/trio/_core/_io_epoll.py index 71f46c40a7..da331b35e8 100644 --- a/trio/_core/_io_epoll.py +++ b/trio/_core/_io_epoll.py @@ -5,6 +5,7 @@ from .. import _core from ._run import _public from ._io_common import wake_all +from ._wakeup_socketpair import WakeupSocketpair @attr.s(slots=True, eq=False, frozen=True) @@ -184,6 +185,10 @@ class EpollIOManager: _epoll = attr.ib(factory=select.epoll) # {fd: EpollWaiters} _registered = attr.ib(factory=lambda: defaultdict(EpollWaiters)) + _force_wakeup = attr.ib(factory=WakeupSocketpair) + + def __attrs_post_init__(self): + self._epoll.register(self._force_wakeup.wakeup_sock, select.EPOLLIN) def statistics(self): tasks_waiting_read = 0 @@ -200,14 +205,31 @@ def statistics(self): def close(self): self._epoll.close() + self._force_wakeup.close() + + def force_wakeup(self): + self._force_wakeup.wakeup_thread_and_signal_safe() - # Called internally by the task runner: - def handle_io(self, timeout): + # Return value must be False-y IFF the timeout expired, NOT if any I/O + # happened or force_wakeup was called. Otherwise it can be anything; gets + # passed straight through to process_events. + def get_events(self, timeout): # max_events must be > 0 or epoll gets cranky + # accessing self._registered from a thread looks dangerous, but it's + # OK because it doesn't matter if our value is a little bit off. max_events = max(1, len(self._registered)) - events = self._epoll.poll(timeout, max_events) + return self._epoll.poll(timeout, max_events) + + def process_events(self, events): for fd, flags in events: - waiters = self._registered[fd] + try: + waiters = self._registered[fd] + except KeyError: + if fd == self._force_wakeup.wakeup_sock.fileno(): + self._force_wakeup.drain() + continue + else: + raise # EPOLLONESHOT always clears the flags when an event is delivered waiters.current_flags = 0 # Clever hack stolen from selectors.EpollSelector: an event diff --git a/trio/_core/_io_kqueue.py b/trio/_core/_io_kqueue.py index b194e85f53..3eee7d4988 100644 --- a/trio/_core/_io_kqueue.py +++ b/trio/_core/_io_kqueue.py @@ -7,6 +7,7 @@ from .. import _core from ._run import _public +from ._wakeup_socketpair import WakeupSocketpair @attr.s(slots=True, eq=False, frozen=True) @@ -21,6 +22,13 @@ class KqueueIOManager: _kqueue = attr.ib(factory=select.kqueue) # {(ident, filter): Task or UnboundedQueue} _registered = attr.ib(factory=dict) + _force_wakeup = attr.ib(factory=WakeupSocketpair) + + def __attrs_post_init__(self): + force_wakeup_event = select.kevent( + self._force_wakeup.wakeup_sock, select.KQ_FILTER_READ, select.KQ_EV_ADD + ) + self._kqueue.control([force_wakeup_event], 0) def statistics(self): tasks_waiting = 0 @@ -35,7 +43,7 @@ def statistics(self): def close(self): self._kqueue.close() - def handle_io(self, timeout): + def get_events(self, timeout): # max_events must be > 0 or kqueue gets cranky # and we generally want this to be strictly larger than the actual # number of events we get, so that we can tell that we've gotten @@ -50,9 +58,19 @@ def handle_io(self, timeout): else: timeout = 0 # and loop back to the start + return events + + def process_events(self, events): for event in events: key = (event.ident, event.filter) - receiver = self._registered[key] + try: + receiver = self._registered[key] + except KeyError: + if event.ident == self._force_wakeup.wakeup_sock.fileno(): + self._force_wakeup.drain() + continue + else: + raise if event.flags & select.KQ_EV_ONESHOT: del self._registered[key] if type(receiver) is _core.Task: diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index 57056e465e..bd034c27fb 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -171,7 +171,8 @@ class CKeys(enum.IntEnum): AFD_POLL = 0 WAIT_OVERLAPPED = 1 LATE_CANCEL = 2 - USER_DEFINED = 3 # and above + FORCE_WAKEUP = 3 + USER_DEFINED = 4 # and above def _check(success): @@ -388,7 +389,12 @@ def statistics(self): completion_key_monitors=len(self._completion_key_queues), ) - def handle_io(self, timeout): + def force_wakeup(self): + _check( + kernel32.PostQueuedCompletionStatus(self._iocp, 0, CKeys.FORCE_WAKEUP, 0) + ) + + def get_events(self, timeout): received = ffi.new("PULONG") milliseconds = round(1000 * timeout) if timeout > 0 and milliseconds == 0: @@ -402,8 +408,11 @@ def handle_io(self, timeout): except OSError as exc: if exc.winerror != ErrorCodes.WAIT_TIMEOUT: # pragma: no cover raise - return - for i in range(received[0]): + return 0 + return received[0] + + def process_events(received): + for i in range(received): entry = self._events[i] if entry.lpCompletionKey == CKeys.AFD_POLL: lpo = entry.lpOverlapped @@ -465,6 +474,8 @@ def handle_io(self, timeout): # try changing this line to # _core.reschedule(waiter, outcome.Error(exc)) raise exc + elif entry.lpCompletionKey == CKeys.FORCE_WAKEUP: + pass else: # dispatch on lpCompletionKey queue = self._completion_key_queues[entry.lpCompletionKey] diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 4de4e9a2bf..763bcfa70b 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -507,10 +507,22 @@ def _might_change_registered_deadline(self): if old != new: self._registered_deadline = new runner = GLOBAL_RUN_CONTEXT.runner + if runner.is_guest: + try: + (old_next_deadline, _), _ = runner.deadlines.peekitem(0) + except IndexError: + old_next_deadline = None if old != inf: del runner.deadlines[old, id(self)] if new != inf: runner.deadlines[new, id(self)] = self + if runner.is_guest: + try: + (new_next_deadline, _), _ = runner.deadlines.peekitem(0) + except IndexError: + new_next_deadline = None + if old_next_deadline != new_next_deadline: + runner.force_guest_tick_asap() @property def deadline(self): @@ -1118,6 +1130,62 @@ class Runner: entry_queue = attr.ib(factory=EntryQueue) trio_token = attr.ib(default=None) + # Guest mode stuff + is_guest = attr.ib(default=False) + run_sync_soon_threadsafe = attr.ib(default=None) + done_callback = attr.ib(default=None) + unrolled_run_gen = attr.ib(default=None) + unrolled_run_next_send = attr.ib(default=None) + guest_tick_scheduled = attr.ib(default=False) + + def guest_tick(self): + # XX no signal handling support at all currently + assert self.is_guest + try: + timeout = self.unrolled_run_gen.send(self.unrolled_run_next_send) + except StopIteration: + GLOBAL_RUN_CONTEXT.__dict__.clear() + self.close() + # XX if we had KI support, we'd have to do something with it here + self.done_callback(self.main_task_outcome) + return + + self.unrolled_run_next_send = None + + # Optimization: do a zero-timeout check for already-pending I/O from + # the main thread + events = self.io_manager.get_events(0) + if events or timeout <= 0: + self.unrolled_run_next_send = events + self.guest_tick_scheduled = True + self.run_sync_soon_threadsafe(self.guest_tick) + else: + self.guest_tick_scheduled = False + + def get_events(): + return self.io_manager.get_events(timeout) + + def deliver(events_outcome): + def in_main_thread(): + self.unrolled_run_next_send = events_outcome.unwrap() + self.guest_tick_scheduled = True + self.guest_tick() + + self.run_sync_soon_threadsafe(in_main_thread) + + # XX temporary placeholder until #1545 is merged + def start_thread_soon(d, fn): + t = threading.Thread(daemon=True, target=lambda: deliver(capture(fn))) + t.start() + + start_thread_soon(deliver, get_events) + + def force_guest_tick_asap(self): + if self.guest_tick_scheduled: + return + self.guest_tick_scheduled = True + self.io_manager.force_wakeup() + def close(self): self.io_manager.close() self.entry_queue.close() @@ -1222,6 +1290,8 @@ def reschedule(self, task, next_send=_NO_SEND): task._next_send = next_send task._abort_func = None task.custom_sleep_data = None + if not self.runq and self.is_guest: + self.force_guest_tick_asap() self.runq.append(task) if self.instruments: self.instrument("task_scheduled", task) @@ -1579,6 +1649,30 @@ def remove_instrument(self, instrument): ################################################################ +def setup_runner(clock, instruments): + """Create a Runner object and install it as the GLOBAL_RUN_CONTEXT.""" + # It wouldn't be *hard* to support nested calls to run(), but I can't + # think of a single good reason for it, so let's be conservative for + # now: + if hasattr(GLOBAL_RUN_CONTEXT, "runner"): + raise RuntimeError("Attempted to call run() from inside a run()") + + if clock is None: + clock = SystemClock() + instruments = list(instruments) + io_manager = TheIOManager() + system_context = copy_context() + system_context.run(current_async_library_cvar.set, "trio") + runner = Runner( + clock=clock, + instruments=instruments, + io_manager=io_manager, + system_context=system_context, + ) + GLOBAL_RUN_CONTEXT.runner = runner + return runner + + def run( async_fn, *args, @@ -1656,28 +1750,7 @@ def run( __tracebackhide__ = True - # Do error-checking up front, before we enter the TrioInternalError - # try/catch - # - # It wouldn't be *hard* to support nested calls to run(), but I can't - # think of a single good reason for it, so let's be conservative for - # now: - if hasattr(GLOBAL_RUN_CONTEXT, "runner"): - raise RuntimeError("Attempted to call run() from inside a run()") - - if clock is None: - clock = SystemClock() - instruments = list(instruments) - io_manager = TheIOManager() - system_context = copy_context() - system_context.run(current_async_library_cvar.set, "trio") - runner = Runner( - clock=clock, - instruments=instruments, - io_manager=io_manager, - system_context=system_context, - ) - GLOBAL_RUN_CONTEXT.runner = runner + runner = setup_runner(clock, instruments) locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True # KI handling goes outside the core try/except/finally to avoid a window @@ -1688,10 +1761,14 @@ def run( try: with closing(runner): with runner.entry_queue.wakeup.wakeup_on_signals(): - # The main reason this is split off into its own - # function is just to get rid of this extra - # indentation. - run_impl(runner, async_fn, args) + gen = unrolled_run(runner, async_fn, args) + next_send = None + while True: + try: + timeout = gen.send(next_send) + except StopIteration: + break + next_send = runner.io_manager.get_events(timeout) except TrioInternalError: raise except BaseException as exc: @@ -1714,12 +1791,29 @@ def run( raise KeyboardInterrupt +def start_guest_run( + async_fn, + *args, + run_sync_soon_threadsafe, + done_callback, + clock=None, + instruments=(), +): + runner = setup_runner(clock, instruments) + runner.is_guest = True + runner.run_sync_soon_threadsafe = run_sync_soon_threadsafe + runner.done_callback = done_callback + runner.unrolled_run_gen = unrolled_run(runner, async_fn, args) + runner.guest_tick_scheduled = True + run_sync_soon_threadsafe(runner.guest_tick) + + # 24 hours is arbitrary, but it avoids issues like people setting timeouts of # 10**20 and then getting integer overflows in the underlying system calls. _MAX_TIMEOUT = 24 * 60 * 60 -def run_impl(runner, async_fn, args): +def unrolled_run(runner, async_fn, args): __tracebackhide__ = True if runner.instruments: @@ -1751,7 +1845,8 @@ def run_impl(runner, async_fn, args): if runner.instruments: runner.instrument("before_io_wait", timeout) - runner.io_manager.handle_io(timeout) + events = yield timeout + runner.io_manager.process_events(events) if runner.instruments: runner.instrument("after_io_wait", timeout) @@ -1767,7 +1862,7 @@ def run_impl(runner, async_fn, args): else: break - if not runner.runq and idle_primed: + if not runner.runq and not events and idle_primed: while runner.waiting_for_idle: key, task = runner.waiting_for_idle.peekitem(0) if key[:2] == (cushion, tiebreaker): diff --git a/trio/lowlevel.py b/trio/lowlevel.py index 3ce3e741ba..b54b3dba52 100644 --- a/trio/lowlevel.py +++ b/trio/lowlevel.py @@ -42,6 +42,7 @@ wait_writable, notify_closing, start_thread_soon, + start_guest_run, ) # Unix-specific symbols From 5b75987eeb3214541483282fc228fcad571bd49b Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 25 May 2020 03:07:00 -0700 Subject: [PATCH 02/38] fix stupid missing arg in _io_windows.py --- trio/_core/_io_windows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index bd034c27fb..18b0876ca5 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -411,7 +411,7 @@ def get_events(self, timeout): return 0 return received[0] - def process_events(received): + def process_events(self, received): for i in range(received): entry = self._events[i] if entry.lpCompletionKey == CKeys.AFD_POLL: From 90180663680b46438ffc34df1fc99228d440052c Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Wed, 27 May 2020 00:33:07 -0700 Subject: [PATCH 03/38] Use thread cache to vroom vroom faster --- trio/_core/_run.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 763bcfa70b..c113e124c1 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -35,6 +35,7 @@ PermanentlyDetachCoroutineObject, WaitTaskRescheduled, ) +from ._thread_cache import start_thread_soon from .. import _core from .._deprecate import deprecated from .._util import Final, NoPublicConstructor, coroutine_or_error @@ -1173,12 +1174,7 @@ def in_main_thread(): self.run_sync_soon_threadsafe(in_main_thread) - # XX temporary placeholder until #1545 is merged - def start_thread_soon(d, fn): - t = threading.Thread(daemon=True, target=lambda: deliver(capture(fn))) - t.start() - - start_thread_soon(deliver, get_events) + start_thread_soon(get_events, deliver) def force_guest_tick_asap(self): if self.guest_tick_scheduled: From 06e7523b432ff504ba30fcb0a0af13ad27e95625 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Wed, 27 May 2020 00:33:22 -0700 Subject: [PATCH 04/38] Add a big comment explaining the trickiest change --- trio/_core/_run.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index c113e124c1..d3d229bf3b 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1858,7 +1858,27 @@ def unrolled_run(runner, async_fn, args): else: break - if not runner.runq and not events and idle_primed: + # idle_primed=True means: if the IO wait hit the timeout, and still + # nothing is happening, then we should start waking up + # wait_all_tasks_blocked tasks. But there are some subtleties in + # defining "nothing is happening". + # + # 'not runner.runq' means that no tasks are currently runnable. 'not + # events' means that the last IO wait call hit its full timeout. These + # are very similar, and if idle_primed=True and we're running in + # regular mode then they always go together. But, in *guest* mode, + # they can happen independently, even when idle_primed=True: + # + # - runner.runq=empty and events=True: the host loop adjusted a + # deadline and that forced an IO wakeup before the timeout expired, + # even though no actual tasks were scheduled. + # + # - runner.runq=nonempty and events=False: the IO wait hit its + # timeout, but then some code in the host thread rescheduled a task + # before we got here. + # + # So we need to check both. + if idle_primed and not runner.runq and not events: while runner.waiting_for_idle: key, task = runner.waiting_for_idle.peekitem(0) if key[:2] == (cushion, tiebreaker): From 513d7440d99c4613be689f14ab87e36725800a4e Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 12:03:45 -0700 Subject: [PATCH 05/38] reduce indentation --- trio/_core/_run.py | 20 ++++++++++---------- trio/_core/_wakeup_socketpair.py | 29 +++++++++++++++++++---------- 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index d3d229bf3b..77c6c182c3 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1755,16 +1755,15 @@ def run( try: with ki_manager(runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints): try: - with closing(runner): - with runner.entry_queue.wakeup.wakeup_on_signals(): - gen = unrolled_run(runner, async_fn, args) - next_send = None - while True: - try: - timeout = gen.send(next_send) - except StopIteration: - break - next_send = runner.io_manager.get_events(timeout) + runner.entry_queue.wakeup.wakeup_on_signals() + gen = unrolled_run(runner, async_fn, args) + next_send = None + while True: + try: + timeout = gen.send(next_send) + except StopIteration: + break + next_send = runner.io_manager.get_events(timeout) except TrioInternalError: raise except BaseException as exc: @@ -1773,6 +1772,7 @@ def run( ) from exc finally: GLOBAL_RUN_CONTEXT.__dict__.clear() + runner.close() # Inlined copy of runner.main_task_outcome.unwrap() to avoid # cluttering every single Trio traceback with an extra frame. if type(runner.main_task_outcome) is Value: diff --git a/trio/_core/_wakeup_socketpair.py b/trio/_core/_wakeup_socketpair.py index 3513cc1ab3..aefdb14ca3 100644 --- a/trio/_core/_wakeup_socketpair.py +++ b/trio/_core/_wakeup_socketpair.py @@ -38,6 +38,7 @@ def __init__(self): self.write_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) except OSError: pass + self.old_wakeup_fd = None def wakeup_thread_and_signal_safe(self): try: @@ -56,21 +57,29 @@ def drain(self): except BlockingIOError: pass - @contextmanager - def wakeup_on_signals(self): - if not is_main_thread(): - yield + def wakeup_on_signals(self, trust_host_loop_to_wake_on_signals=False): + assert self.old_wakeup_fd is None + if not is_main_thread() or trust_host_loop_to_wake_on_signals: return fd = self.write_sock.fileno() if HAVE_WARN_ON_FULL_BUFFER: - old_wakeup_fd = signal.set_wakeup_fd(fd, warn_on_full_buffer=False) + self.old_wakeup_fd = signal.set_wakeup_fd(fd, warn_on_full_buffer=False) else: - old_wakeup_fd = signal.set_wakeup_fd(fd) - try: - yield - finally: - signal.set_wakeup_fd(old_wakeup_fd) + self.old_wakeup_fd = signal.set_wakeup_fd(fd) + if self.old_wakeup_fd != -1: + warnings.warn( + RuntimeWarning( + "It looks like Trio's signal handling code might have " + "collided with another library you're using. If you're " + "running Trio in guest mode, then this might mean you " + "should set trust_host_loop_to_wake_on_signals=True. " + "Otherwise, file a bug on Trio and we'll help you figure " + "out what's going on." + ) + ) def close(self): self.wakeup_sock.close() self.write_sock.close() + if self.old_wakeup_fd is not None: + signal.set_wakeup_fd(self.old_wakeup_fd) From 5631de48d60add125ed969495a2c82bef613a636 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 12:34:09 -0700 Subject: [PATCH 06/38] guest mode: Add basic signal handling, and send TrioInternalError to done callback --- trio/_core/_run.py | 418 ++++++++++++++++--------------- trio/_core/_wakeup_socketpair.py | 4 +- 2 files changed, 223 insertions(+), 199 deletions(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 77c6c182c3..fbca089ec1 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1136,31 +1136,30 @@ class Runner: run_sync_soon_threadsafe = attr.ib(default=None) done_callback = attr.ib(default=None) unrolled_run_gen = attr.ib(default=None) - unrolled_run_next_send = attr.ib(default=None) + unrolled_run_next_send = attr.ib(factory=lambda: Value(None)) guest_tick_scheduled = attr.ib(default=False) def guest_tick(self): - # XX no signal handling support at all currently + locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True assert self.is_guest try: - timeout = self.unrolled_run_gen.send(self.unrolled_run_next_send) + timeout = self.unrolled_run_next_send.send(self.unrolled_run_gen) except StopIteration: - GLOBAL_RUN_CONTEXT.__dict__.clear() - self.close() # XX if we had KI support, we'd have to do something with it here self.done_callback(self.main_task_outcome) return - - self.unrolled_run_next_send = None - - # Optimization: do a zero-timeout check for already-pending I/O from - # the main thread - events = self.io_manager.get_events(0) - if events or timeout <= 0: - self.unrolled_run_next_send = events + except TrioInternalError as exc: + self.done_callback(Error(exc)) + + # Optimization: try to skip going into the thread if we can avoid it + events_outcome = capture(self.io_manager.get_events, 0) + if timeout <= 0 or type(events_outcome) is Error or events_outcome.value: + # No need to go into the thread + self.unrolled_run_next_send = events_outcome self.guest_tick_scheduled = True self.run_sync_soon_threadsafe(self.guest_tick) else: + # Need to go into the thread and call get_events() there self.guest_tick_scheduled = False def get_events(): @@ -1168,7 +1167,7 @@ def get_events(): def deliver(events_outcome): def in_main_thread(): - self.unrolled_run_next_send = events_outcome.unwrap() + self.unrolled_run_next_send = events_outcome self.guest_tick_scheduled = True self.guest_tick() @@ -1744,35 +1743,24 @@ def run( """ + locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True __tracebackhide__ = True runner = setup_runner(clock, instruments) - locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True - # KI handling goes outside the core try/except/finally to avoid a window - # where KeyboardInterrupt would be allowed and converted into an + # KI handling goes outside unrolled_run to avoid an interval where + # KeyboardInterrupt would be allowed and converted into an # TrioInternalError: try: with ki_manager(runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints): - try: - runner.entry_queue.wakeup.wakeup_on_signals() - gen = unrolled_run(runner, async_fn, args) - next_send = None - while True: - try: - timeout = gen.send(next_send) - except StopIteration: - break - next_send = runner.io_manager.get_events(timeout) - except TrioInternalError: - raise - except BaseException as exc: - raise TrioInternalError( - "internal error in Trio - please file a bug!" - ) from exc - finally: - GLOBAL_RUN_CONTEXT.__dict__.clear() - runner.close() + gen = unrolled_run(runner, async_fn, args) + next_send = None + while True: + try: + timeout = gen.send(next_send) + except StopIteration: + break + next_send = runner.io_manager.get_events(timeout) # Inlined copy of runner.main_task_outcome.unwrap() to avoid # cluttering every single Trio traceback with an extra frame. if type(runner.main_task_outcome) is Value: @@ -1792,6 +1780,7 @@ def start_guest_run( *args, run_sync_soon_threadsafe, done_callback, + trust_host_loop_to_wake_on_signals=False, clock=None, instruments=(), ): @@ -1799,7 +1788,12 @@ def start_guest_run( runner.is_guest = True runner.run_sync_soon_threadsafe = run_sync_soon_threadsafe runner.done_callback = done_callback - runner.unrolled_run_gen = unrolled_run(runner, async_fn, args) + runner.unrolled_run_gen = unrolled_run( + runner, + async_fn, + args, + trust_host_loop_to_wake_on_signals=trust_host_loop_to_wake_on_signals, + ) runner.guest_tick_scheduled = True run_sync_soon_threadsafe(runner.guest_tick) @@ -1809,178 +1803,208 @@ def start_guest_run( _MAX_TIMEOUT = 24 * 60 * 60 -def unrolled_run(runner, async_fn, args): +# Weird quirk: this is written as a generator in order to support "guest +# mode", where our core event loop gets unrolled into a series of callbacks on +# the host loop. If you're doing a regular trio.run then this gets run +# straight through. +def unrolled_run(runner, async_fn, args, trust_host_loop_to_wake_on_signals=False): + locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True __tracebackhide__ = True - if runner.instruments: - runner.instrument("before_run") - runner.clock.start_clock() - runner.init_task = runner.spawn_impl( - runner.init, (async_fn, args), None, "", system_task=True, - ) - - # You know how people talk about "event loops"? This 'while' loop right - # here is our event loop: - while runner.tasks: - if runner.runq: - timeout = 0 - elif runner.deadlines: - deadline, _ = runner.deadlines.keys()[0] - timeout = runner.clock.deadline_to_sleep_time(deadline) - else: - timeout = _MAX_TIMEOUT - timeout = min(max(0, timeout), _MAX_TIMEOUT) - - idle_primed = False - if runner.waiting_for_idle: - cushion, tiebreaker, _ = runner.waiting_for_idle.keys()[0] - if cushion < timeout: - timeout = cushion - idle_primed = True + try: + if not trust_host_loop_to_wake_on_signals: + runner.entry_queue.wakeup.wakeup_on_signals() if runner.instruments: - runner.instrument("before_io_wait", timeout) - - events = yield timeout - runner.io_manager.process_events(events) + runner.instrument("before_run") + runner.clock.start_clock() + runner.init_task = runner.spawn_impl( + runner.init, (async_fn, args), None, "", system_task=True, + ) - if runner.instruments: - runner.instrument("after_io_wait", timeout) - - # Process cancellations due to deadline expiry - now = runner.clock.current_time() - while runner.deadlines: - (deadline, _), cancel_scope = runner.deadlines.peekitem(0) - if deadline <= now: - # This removes the given scope from runner.deadlines: - cancel_scope.cancel() - idle_primed = False + # You know how people talk about "event loops"? This 'while' loop right + # here is our event loop: + while runner.tasks: + if runner.runq: + timeout = 0 + elif runner.deadlines: + deadline, _ = runner.deadlines.keys()[0] + timeout = runner.clock.deadline_to_sleep_time(deadline) else: - break - - # idle_primed=True means: if the IO wait hit the timeout, and still - # nothing is happening, then we should start waking up - # wait_all_tasks_blocked tasks. But there are some subtleties in - # defining "nothing is happening". - # - # 'not runner.runq' means that no tasks are currently runnable. 'not - # events' means that the last IO wait call hit its full timeout. These - # are very similar, and if idle_primed=True and we're running in - # regular mode then they always go together. But, in *guest* mode, - # they can happen independently, even when idle_primed=True: - # - # - runner.runq=empty and events=True: the host loop adjusted a - # deadline and that forced an IO wakeup before the timeout expired, - # even though no actual tasks were scheduled. - # - # - runner.runq=nonempty and events=False: the IO wait hit its - # timeout, but then some code in the host thread rescheduled a task - # before we got here. - # - # So we need to check both. - if idle_primed and not runner.runq and not events: - while runner.waiting_for_idle: - key, task = runner.waiting_for_idle.peekitem(0) - if key[:2] == (cushion, tiebreaker): - del runner.waiting_for_idle[key] - runner.reschedule(task) - else: - break + timeout = _MAX_TIMEOUT + timeout = min(max(0, timeout), _MAX_TIMEOUT) - # Process all runnable tasks, but only the ones that are already - # runnable now. Anything that becomes runnable during this cycle needs - # to wait until the next pass. This avoids various starvation issues - # by ensuring that there's never an unbounded delay between successive - # checks for I/O. - # - # Also, we randomize the order of each batch to avoid assumptions - # about scheduling order sneaking in. In the long run, I suspect we'll - # either (a) use strict FIFO ordering and document that for - # predictability/determinism, or (b) implement a more sophisticated - # scheduler (e.g. some variant of fair queueing), for better behavior - # under load. For now, this is the worst of both worlds - but it keeps - # our options open. (If we do decide to go all in on deterministic - # scheduling, then there are other things that will probably need to - # change too, like the deadlines tie-breaker and the non-deterministic - # ordering of task._notify_queues.) - batch = list(runner.runq) - if _ALLOW_DETERMINISTIC_SCHEDULING: - # We're running under Hypothesis, and pytest-trio has patched this - # in to make the scheduler deterministic and avoid flaky tests. - # It's not worth the (small) performance cost in normal operation, - # since we'll shuffle the list and _r is only seeded for tests. - batch.sort(key=lambda t: t._counter) - runner.runq.clear() - _r.shuffle(batch) - while batch: - task = batch.pop() - GLOBAL_RUN_CONTEXT.task = task + idle_primed = False + if runner.waiting_for_idle: + cushion, tiebreaker, _ = runner.waiting_for_idle.keys()[0] + if cushion < timeout: + timeout = cushion + idle_primed = True if runner.instruments: - runner.instrument("before_task_step", task) + runner.instrument("before_io_wait", timeout) - next_send_fn = task._next_send_fn - next_send = task._next_send - task._next_send_fn = task._next_send = None - final_outcome = None - try: - # We used to unwrap the Outcome object here and send/throw its - # contents in directly, but it turns out that .throw() is - # buggy, at least on CPython 3.6: - # https://bugs.python.org/issue29587 - # https://bugs.python.org/issue29590 - # So now we send in the Outcome object and unwrap it on the - # other side. - msg = task.context.run(next_send_fn, next_send) - except StopIteration as stop_iteration: - final_outcome = Value(stop_iteration.value) - except BaseException as task_exc: - # Store for later, removing uninteresting top frames: 1 frame - # we always remove, because it's this function catching it, - # and then in addition we remove however many more Context.run - # adds. - tb = task_exc.__traceback__.tb_next - for _ in range(CONTEXT_RUN_TB_FRAMES): - tb = tb.tb_next - final_outcome = Error(task_exc.with_traceback(tb)) - - if final_outcome is not None: - # We can't call this directly inside the except: blocks above, - # because then the exceptions end up attaching themselves to - # other exceptions as __context__ in unwanted ways. - runner.task_exited(task, final_outcome) - else: - task._schedule_points += 1 - if msg is CancelShieldedCheckpoint: - runner.reschedule(task) - elif type(msg) is WaitTaskRescheduled: - task._cancel_points += 1 - task._abort_func = msg.abort_func - # KI is "outside" all cancel scopes, so check for it - # before checking for regular cancellation: - if runner.ki_pending and task is runner.main_task: - task._attempt_delivery_of_pending_ki() - task._attempt_delivery_of_any_pending_cancel() - elif type(msg) is PermanentlyDetachCoroutineObject: - # Pretend the task just exited with the given outcome - runner.task_exited(task, msg.final_outcome) - else: - exc = TypeError( - "trio.run received unrecognized yield message {!r}. " - "Are you trying to use a library written for some " - "other framework like asyncio? That won't work " - "without some kind of compatibility shim.".format(msg) - ) - # The foreign library probably doesn't adhere to our - # protocol of unwrapping whatever outcome gets sent in. - # Instead, we'll arrange to throw `exc` in directly, - # which works for at least asyncio and curio. - runner.reschedule(task, exc) - task._next_send_fn = task.coro.throw + # Driver will call io_manager.get_events(timeout) and pass it back + # in throuh the yield + events = yield timeout + runner.io_manager.process_events(events) if runner.instruments: - runner.instrument("after_task_step", task) - del GLOBAL_RUN_CONTEXT.task + runner.instrument("after_io_wait", timeout) + + # Process cancellations due to deadline expiry + now = runner.clock.current_time() + while runner.deadlines: + (deadline, _), cancel_scope = runner.deadlines.peekitem(0) + if deadline <= now: + # This removes the given scope from runner.deadlines: + cancel_scope.cancel() + idle_primed = False + else: + break + + # idle_primed=True means: if the IO wait hit the timeout, and still + # nothing is happening, then we should start waking up + # wait_all_tasks_blocked tasks. But there are some subtleties in + # defining "nothing is happening". + # + # 'not runner.runq' means that no tasks are currently runnable. + # 'not events' means that the last IO wait call hit its full + # timeout. These are very similar, and if idle_primed=True and + # we're running in regular mode then they always go together. But, + # in *guest* mode, they can happen independently, even when + # idle_primed=True: + # + # - runner.runq=empty and events=True: the host loop adjusted a + # deadline and that forced an IO wakeup before the timeout expired, + # even though no actual tasks were scheduled. + # + # - runner.runq=nonempty and events=False: the IO wait hit its + # timeout, but then some code in the host thread rescheduled a task + # before we got here. + # + # So we need to check both. + if idle_primed and not runner.runq and not events: + while runner.waiting_for_idle: + key, task = runner.waiting_for_idle.peekitem(0) + if key[:2] == (cushion, tiebreaker): + del runner.waiting_for_idle[key] + runner.reschedule(task) + else: + break + + # Process all runnable tasks, but only the ones that are already + # runnable now. Anything that becomes runnable during this cycle + # needs to wait until the next pass. This avoids various + # starvation issues by ensuring that there's never an unbounded + # delay between successive checks for I/O. + # + # Also, we randomize the order of each batch to avoid assumptions + # about scheduling order sneaking in. In the long run, I suspect + # we'll either (a) use strict FIFO ordering and document that for + # predictability/determinism, or (b) implement a more + # sophisticated scheduler (e.g. some variant of fair queueing), + # for better behavior under load. For now, this is the worst of + # both worlds - but it keeps our options open. (If we do decide to + # go all in on deterministic scheduling, then there are other + # things that will probably need to change too, like the deadlines + # tie-breaker and the non-deterministic ordering of + # task._notify_queues.) + batch = list(runner.runq) + if _ALLOW_DETERMINISTIC_SCHEDULING: + # We're running under Hypothesis, and pytest-trio has patched + # this in to make the scheduler deterministic and avoid flaky + # tests. It's not worth the (small) performance cost in normal + # operation, since we'll shuffle the list and _r is only + # seeded for tests. + batch.sort(key=lambda t: t._counter) + runner.runq.clear() + _r.shuffle(batch) + while batch: + task = batch.pop() + GLOBAL_RUN_CONTEXT.task = task + + if runner.instruments: + runner.instrument("before_task_step", task) + + next_send_fn = task._next_send_fn + next_send = task._next_send + task._next_send_fn = task._next_send = None + final_outcome = None + try: + # We used to unwrap the Outcome object here and send/throw + # its contents in directly, but it turns out that .throw() + # is buggy, at least on CPython 3.6: + # https://bugs.python.org/issue29587 + # https://bugs.python.org/issue29590 + # So now we send in the Outcome object and unwrap it on the + # other side. + msg = task.context.run(next_send_fn, next_send) + except StopIteration as stop_iteration: + final_outcome = Value(stop_iteration.value) + except BaseException as task_exc: + # Store for later, removing uninteresting top frames: 1 + # frame we always remove, because it's this function + # catching it, and then in addition we remove however many + # more Context.run adds. + tb = task_exc.__traceback__.tb_next + for _ in range(CONTEXT_RUN_TB_FRAMES): + tb = tb.tb_next + final_outcome = Error(task_exc.with_traceback(tb)) + + if final_outcome is not None: + # We can't call this directly inside the except: blocks + # above, because then the exceptions end up attaching + # themselves to other exceptions as __context__ in + # unwanted ways. + runner.task_exited(task, final_outcome) + else: + task._schedule_points += 1 + if msg is CancelShieldedCheckpoint: + runner.reschedule(task) + elif type(msg) is WaitTaskRescheduled: + task._cancel_points += 1 + task._abort_func = msg.abort_func + # KI is "outside" all cancel scopes, so check for it + # before checking for regular cancellation: + if runner.ki_pending and task is runner.main_task: + task._attempt_delivery_of_pending_ki() + task._attempt_delivery_of_any_pending_cancel() + elif type(msg) is PermanentlyDetachCoroutineObject: + # Pretend the task just exited with the given outcome + runner.task_exited(task, msg.final_outcome) + else: + exc = TypeError( + "trio.run received unrecognized yield message {!r}. " + "Are you trying to use a library written for some " + "other framework like asyncio? That won't work " + "without some kind of compatibility shim.".format(msg) + ) + # The foreign library probably doesn't adhere to our + # protocol of unwrapping whatever outcome gets sent in. + # Instead, we'll arrange to throw `exc` in directly, + # which works for at least asyncio and curio. + runner.reschedule(task, exc) + task._next_send_fn = task.coro.throw + + if runner.instruments: + runner.instrument("after_task_step", task) + del GLOBAL_RUN_CONTEXT.task + + except GeneratorExit: + warnings.warn( + RuntimeWarning( + "Trio guest run got abandoned without properly finishing... " + "weird stuff might happen" + ) + ) + except TrioInternalError: + raise + except BaseException as exc: + raise TrioInternalError("internal error in Trio - please file a bug!") from exc + finally: + GLOBAL_RUN_CONTEXT.__dict__.clear() + runner.close() ################################################################ diff --git a/trio/_core/_wakeup_socketpair.py b/trio/_core/_wakeup_socketpair.py index aefdb14ca3..7d0694f9c7 100644 --- a/trio/_core/_wakeup_socketpair.py +++ b/trio/_core/_wakeup_socketpair.py @@ -57,9 +57,9 @@ def drain(self): except BlockingIOError: pass - def wakeup_on_signals(self, trust_host_loop_to_wake_on_signals=False): + def wakeup_on_signals(self): assert self.old_wakeup_fd is None - if not is_main_thread() or trust_host_loop_to_wake_on_signals: + if not is_main_thread(): return fd = self.write_sock.fileno() if HAVE_WARN_ON_FULL_BUFFER: From b1319582b8e0d1ef9a89ec9b8559f5def61e08ce Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 13:10:52 -0700 Subject: [PATCH 07/38] Add a basic test of guest mode --- trio/_core/tests/test_guest_mode.py | 52 +++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 trio/_core/tests/test_guest_mode.py diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py new file mode 100644 index 0000000000..5485ca8be2 --- /dev/null +++ b/trio/_core/tests/test_guest_mode.py @@ -0,0 +1,52 @@ +import pytest +import asyncio + +import trio + + +def test_guest_mode_basic(): + async def aio_main(): + loop = asyncio.get_running_loop() + + trio_done_fut = asyncio.Future() + + def trio_done_callback(main_outcome): + print(f"trio_main finished: {main_outcome!r}") + trio_done_fut.set_result(main_outcome) + + trio.lowlevel.start_guest_run( + trio_main, + run_sync_soon_threadsafe=loop.call_soon_threadsafe, + done_callback=trio_done_callback, + ) + + return (await trio_done_fut).unwrap() + + async def trio_main(): + print("trio_main!") + + to_trio, from_aio = trio.open_memory_channel(float("inf")) + from_trio = asyncio.Queue() + + aio_task = asyncio.create_task(aio_pingpong(from_trio, to_trio)) + + from_trio.put_nowait(0) + + async for n in from_aio: + print(f"trio got: {n}") + from_trio.put_nowait(n + 1) + if n >= 10: + aio_task.cancel() + return "trio-main-done" + + async def aio_pingpong(from_trio, to_trio): + print("aio_pingpong!") + + while True: + n = await from_trio.get() + print(f"aio got: {n}") + to_trio.send_nowait(n + 1) + + loop = asyncio.new_event_loop() + assert loop.run_until_complete(aio_main()) == "trio-main-done" + loop.close() From 91e3554a782c9c831fa9857cb6c4bf75f6659c41 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 13:21:39 -0700 Subject: [PATCH 08/38] Try to adapt to py36 asyncio limitations --- trio/_core/tests/test_guest_mode.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index 5485ca8be2..8e3cbfbc8c 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -5,9 +5,9 @@ def test_guest_mode_basic(): - async def aio_main(): - loop = asyncio.get_running_loop() + loop = asyncio.new_event_loop() + async def aio_main(): trio_done_fut = asyncio.Future() def trio_done_callback(main_outcome): @@ -47,6 +47,5 @@ async def aio_pingpong(from_trio, to_trio): print(f"aio got: {n}") to_trio.send_nowait(n + 1) - loop = asyncio.new_event_loop() assert loop.run_until_complete(aio_main()) == "trio-main-done" loop.close() From 4340d3a0983658b80c009448a32354cf8a058a4a Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 13:30:04 -0700 Subject: [PATCH 09/38] Add missing return --- trio/_core/_run.py | 1 + 1 file changed, 1 insertion(+) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index fbca089ec1..27362c0caf 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1150,6 +1150,7 @@ def guest_tick(self): return except TrioInternalError as exc: self.done_callback(Error(exc)) + return # Optimization: try to skip going into the thread if we can avoid it events_outcome = capture(self.io_manager.get_events, 0) From fd81370369fa190c9966867da2c3ccf1554ccb33 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 13:33:43 -0700 Subject: [PATCH 10/38] 3.6 asyncio is super annoying --- trio/_core/tests/test_guest_mode.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index 8e3cbfbc8c..6925868b9f 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -28,7 +28,7 @@ async def trio_main(): to_trio, from_aio = trio.open_memory_channel(float("inf")) from_trio = asyncio.Queue() - aio_task = asyncio.create_task(aio_pingpong(from_trio, to_trio)) + aio_task = asyncio.ensure_future(aio_pingpong(from_trio, to_trio)) from_trio.put_nowait(0) From 1469d446c80106b7fc3492b483d102a4e81706ba Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 13:52:16 -0700 Subject: [PATCH 11/38] missing import --- trio/_core/_wakeup_socketpair.py | 1 + 1 file changed, 1 insertion(+) diff --git a/trio/_core/_wakeup_socketpair.py b/trio/_core/_wakeup_socketpair.py index 7d0694f9c7..392a2c0b2e 100644 --- a/trio/_core/_wakeup_socketpair.py +++ b/trio/_core/_wakeup_socketpair.py @@ -2,6 +2,7 @@ import sys from contextlib import contextmanager import signal +import warnings from .. import _core from .._util import is_main_thread From cc4d144092fd4c8167d98dcc79a3fbc4d725b47a Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 13:52:25 -0700 Subject: [PATCH 12/38] avoid some set_wakeup_fd warnings --- trio/_core/tests/test_guest_mode.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index 6925868b9f..16d76db819 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -18,6 +18,9 @@ def trio_done_callback(main_outcome): trio_main, run_sync_soon_threadsafe=loop.call_soon_threadsafe, done_callback=trio_done_callback, + # Not all versions of asyncio we test on can actually be trusted, + # but this test doesn't care about signal handling. + trust_host_loop_to_wake_on_signals=True, ) return (await trio_done_fut).unwrap() From b7a21f2a5c367f8c190586c403dfc88b43b48147 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 14:07:36 -0700 Subject: [PATCH 13/38] Add missing method that was causing guest mode test to hang on macOS --- trio/_core/_io_kqueue.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/trio/_core/_io_kqueue.py b/trio/_core/_io_kqueue.py index 3eee7d4988..5b3d6f6592 100644 --- a/trio/_core/_io_kqueue.py +++ b/trio/_core/_io_kqueue.py @@ -43,6 +43,9 @@ def statistics(self): def close(self): self._kqueue.close() + def force_wakeup(self): + self._force_wakeup.wakeup_thread_and_signal_safe() + def get_events(self, timeout): # max_events must be > 0 or kqueue gets cranky # and we generally want this to be strictly larger than the actual From 50322d90958cfec994e8e903c56abc2e0969e24e Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 15:48:25 -0700 Subject: [PATCH 14/38] Get better debug info from test_guest_mode_basic --- trio/_core/tests/test_guest_mode.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index 16d76db819..83bdda6dd0 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -1,5 +1,6 @@ import pytest import asyncio +import traceback import trio @@ -45,10 +46,16 @@ async def trio_main(): async def aio_pingpong(from_trio, to_trio): print("aio_pingpong!") - while True: - n = await from_trio.get() - print(f"aio got: {n}") - to_trio.send_nowait(n + 1) + try: + while True: + n = await from_trio.get() + print(f"aio got: {n}") + to_trio.send_nowait(n + 1) + except asyncio.CancelledError: + raise + except: + traceback.print_exc() + raise assert loop.run_until_complete(aio_main()) == "trio-main-done" loop.close() From 5b7f4ddeba6453e5994c49c2dcc55e2cea0cbf35 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 15:48:40 -0700 Subject: [PATCH 15/38] cffi does not implicitly coerce 0 to NULL --- trio/_core/_io_windows.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/trio/_core/_io_windows.py b/trio/_core/_io_windows.py index 18b0876ca5..35fc15e02e 100644 --- a/trio/_core/_io_windows.py +++ b/trio/_core/_io_windows.py @@ -391,7 +391,9 @@ def statistics(self): def force_wakeup(self): _check( - kernel32.PostQueuedCompletionStatus(self._iocp, 0, CKeys.FORCE_WAKEUP, 0) + kernel32.PostQueuedCompletionStatus( + self._iocp, 0, CKeys.FORCE_WAKEUP, ffi.NULL + ) ) def get_events(self, timeout): From 3ec60fc36c3493e512a4f48ae4ce19db9e17f8aa Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 17:41:50 -0700 Subject: [PATCH 16/38] add some pragma: no covers --- trio/_core/_io_epoll.py | 2 +- trio/_core/_io_kqueue.py | 2 +- trio/_core/tests/test_guest_mode.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/trio/_core/_io_epoll.py b/trio/_core/_io_epoll.py index da331b35e8..6ff6467fbb 100644 --- a/trio/_core/_io_epoll.py +++ b/trio/_core/_io_epoll.py @@ -228,7 +228,7 @@ def process_events(self, events): if fd == self._force_wakeup.wakeup_sock.fileno(): self._force_wakeup.drain() continue - else: + else: # pragma: no cover raise # EPOLLONESHOT always clears the flags when an event is delivered waiters.current_flags = 0 diff --git a/trio/_core/_io_kqueue.py b/trio/_core/_io_kqueue.py index 5b3d6f6592..ce85453749 100644 --- a/trio/_core/_io_kqueue.py +++ b/trio/_core/_io_kqueue.py @@ -72,7 +72,7 @@ def process_events(self, events): if event.ident == self._force_wakeup.wakeup_sock.fileno(): self._force_wakeup.drain() continue - else: + else: # pragma: no cover raise if event.flags & select.KQ_EV_ONESHOT: del self._registered[key] diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index 83bdda6dd0..d7d8a6aa8f 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -53,7 +53,7 @@ async def aio_pingpong(from_trio, to_trio): to_trio.send_nowait(n + 1) except asyncio.CancelledError: raise - except: + except: # pragma: no cover traceback.print_exc() raise From 09783ec3d4ec16cac5e21003d4554f35a87aa845 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 17:42:54 -0700 Subject: [PATCH 17/38] add missing import --- trio/_core/_run.py | 1 + 1 file changed, 1 insertion(+) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 27362c0caf..327e588430 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -9,6 +9,7 @@ from collections import deque import collections.abc from contextlib import contextmanager, closing +import warnings from contextvars import copy_context from math import inf From f023eeac10686bd785f80ad5a8ff3854761ead16 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 20:36:08 -0700 Subject: [PATCH 18/38] TESTS --- trio/_core/_run.py | 145 +++++++------ trio/_core/tests/test_guest_mode.py | 316 +++++++++++++++++++++++++++- 2 files changed, 393 insertions(+), 68 deletions(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 327e588430..3972ba434a 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -510,19 +510,13 @@ def _might_change_registered_deadline(self): self._registered_deadline = new runner = GLOBAL_RUN_CONTEXT.runner if runner.is_guest: - try: - (old_next_deadline, _), _ = runner.deadlines.peekitem(0) - except IndexError: - old_next_deadline = None + old_next_deadline = runner.next_deadline() if old != inf: del runner.deadlines[old, id(self)] if new != inf: runner.deadlines[new, id(self)] = self if runner.is_guest: - try: - (new_next_deadline, _), _ = runner.deadlines.peekitem(0) - except IndexError: - new_next_deadline = None + new_next_deadline = runner.next_deadline() if old_next_deadline != new_next_deadline: runner.force_guest_tick_asap() @@ -1106,77 +1100,100 @@ class _RunStatistics: run_sync_soon_queue_size = attr.ib() -@attr.s(eq=False, hash=False) -class Runner: - clock = attr.ib() - instruments = attr.ib() - io_manager = attr.ib() - - # Run-local values, see _local.py - _locals = attr.ib(factory=dict) - - runq = attr.ib(factory=deque) - tasks = attr.ib(factory=set) - - # {(deadline, id(CancelScope)): CancelScope} - # only contains scopes with non-infinite deadlines that are currently - # attached to at least one task - deadlines = attr.ib(factory=SortedDict) - - init_task = attr.ib(default=None) - system_nursery = attr.ib(default=None) - system_context = attr.ib(default=None) - main_task = attr.ib(default=None) - main_task_outcome = attr.ib(default=None) - - entry_queue = attr.ib(factory=EntryQueue) - trio_token = attr.ib(default=None) - - # Guest mode stuff - is_guest = attr.ib(default=False) - run_sync_soon_threadsafe = attr.ib(default=None) - done_callback = attr.ib(default=None) - unrolled_run_gen = attr.ib(default=None) +# This holds all the state that gets trampolined back and forth between +# callbacks when we're running in guest mode. +# +# It has to be a separate object from Runner, and Runner *cannot* have hold +# references to it (directly or indirectly)! +# +# The idea is that we want a chance to detect if our host loop quits and stops +# driving us forward. We detect that by unrolled_run_gen being garbage +# collected, and hitting its 'except GeneratorExit:' block. So this only +# happens if unrolled_run_gen is GCed. +# +# The Runner state is referenced from the global GLOBAL_RUN_CONTEXT. The only +# way it gets *un*referenced is by unrolled_run_gen completing, e.g. by being +# GCed. But if Runner has a direct or indirect reference to it, and the host +# loop has abandoned it, then this will never happen! +# +# So this object can reference Runner, but Runner can't reference it. The only +# references to it are the "in flight" callback chain on the host loop / +# worker thread. +@attr.s(eq=False, hash=False, slots=True) +class GuestState: + runner = attr.ib() + run_sync_soon_threadsafe = attr.ib() + done_callback = attr.ib() + unrolled_run_gen = attr.ib() unrolled_run_next_send = attr.ib(factory=lambda: Value(None)) - guest_tick_scheduled = attr.ib(default=False) def guest_tick(self): locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True - assert self.is_guest try: timeout = self.unrolled_run_next_send.send(self.unrolled_run_gen) except StopIteration: # XX if we had KI support, we'd have to do something with it here - self.done_callback(self.main_task_outcome) + self.done_callback(self.runner.main_task_outcome) return except TrioInternalError as exc: self.done_callback(Error(exc)) return # Optimization: try to skip going into the thread if we can avoid it - events_outcome = capture(self.io_manager.get_events, 0) + events_outcome = capture(self.runner.io_manager.get_events, 0) if timeout <= 0 or type(events_outcome) is Error or events_outcome.value: # No need to go into the thread self.unrolled_run_next_send = events_outcome - self.guest_tick_scheduled = True + self.runner.guest_tick_scheduled = True self.run_sync_soon_threadsafe(self.guest_tick) else: # Need to go into the thread and call get_events() there - self.guest_tick_scheduled = False + self.runner.guest_tick_scheduled = False def get_events(): - return self.io_manager.get_events(timeout) + return self.runner.io_manager.get_events(timeout) def deliver(events_outcome): def in_main_thread(): self.unrolled_run_next_send = events_outcome - self.guest_tick_scheduled = True + self.runner.guest_tick_scheduled = True self.guest_tick() self.run_sync_soon_threadsafe(in_main_thread) start_thread_soon(get_events, deliver) + +@attr.s(eq=False, hash=False, slots=True) +class Runner: + clock = attr.ib() + instruments = attr.ib() + io_manager = attr.ib() + + # Run-local values, see _local.py + _locals = attr.ib(factory=dict) + + runq = attr.ib(factory=deque) + tasks = attr.ib(factory=set) + + # {(deadline, id(CancelScope)): CancelScope} + # only contains scopes with non-infinite deadlines that are currently + # attached to at least one task + deadlines = attr.ib(factory=SortedDict) + + init_task = attr.ib(default=None) + system_nursery = attr.ib(default=None) + system_context = attr.ib(default=None) + main_task = attr.ib(default=None) + main_task_outcome = attr.ib(default=None) + + entry_queue = attr.ib(factory=EntryQueue) + trio_token = attr.ib(default=None) + + # Guest mode stuff + is_guest = attr.ib(default=False) + guest_tick_scheduled = attr.ib(default=False) + def force_guest_tick_asap(self): if self.guest_tick_scheduled: return @@ -1189,6 +1206,14 @@ def close(self): if self.instruments: self.instrument("after_run") + def next_deadline(self): + try: + (next_deadline, _), _ = self.deadlines.peekitem(0) + except IndexError: + return inf + else: + return next_deadline + @_public def current_statistics(self): """Returns an object containing run-loop-level debugging information. @@ -1213,11 +1238,7 @@ def current_statistics(self): other attributes vary between backends. """ - if self.deadlines: - next_deadline, _ = self.deadlines.keys()[0] - seconds_to_next_deadline = next_deadline - self.current_time() - else: - seconds_to_next_deadline = float("inf") + seconds_to_next_deadline = self.next_deadline() - self.current_time() return _RunStatistics( tasks_living=len(self.tasks), tasks_runnable=len(self.runq), @@ -1788,16 +1809,20 @@ def start_guest_run( ): runner = setup_runner(clock, instruments) runner.is_guest = True - runner.run_sync_soon_threadsafe = run_sync_soon_threadsafe - runner.done_callback = done_callback - runner.unrolled_run_gen = unrolled_run( + runner.guest_tick_scheduled = True + + guest_state = GuestState( runner, - async_fn, - args, - trust_host_loop_to_wake_on_signals=trust_host_loop_to_wake_on_signals, + run_sync_soon_threadsafe, + done_callback, + unrolled_run( + runner, + async_fn, + args, + trust_host_loop_to_wake_on_signals=trust_host_loop_to_wake_on_signals, + ), ) - runner.guest_tick_scheduled = True - run_sync_soon_threadsafe(runner.guest_tick) + run_sync_soon_threadsafe(guest_state.guest_tick) # 24 hours is arbitrary, but it avoids issues like people setting timeouts of diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index d7d8a6aa8f..ab1fd9530a 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -1,31 +1,323 @@ import pytest import asyncio import traceback +import queue +from functools import partial +from math import inf +import signal +import socket import trio +import trio.testing +from .tutil import gc_collect_harder +# The simplest possible "host" loop. +# Nice features: +# - we can run code "outside" of trio using the schedule function passed to +# our main +# - final result is returned +# - any unhandled exceptions cause an immediate crash +def trivial_guest_run(trio_fn, **start_guest_run_kwargs): + todo = queue.Queue() -def test_guest_mode_basic(): + def run_sync_soon_threadsafe(fn): + todo.put(("run", fn)) + + def done_callback(outcome): + todo.put(("unwrap", outcome)) + + trio.lowlevel.start_guest_run( + trio_fn, + run_sync_soon_threadsafe, + run_sync_soon_threadsafe=run_sync_soon_threadsafe, + done_callback=done_callback, + **start_guest_run_kwargs, + ) + + try: + while True: + op, obj = todo.get() + if op == "run": + obj() + elif op == "unwrap": + return obj.unwrap() + else: # pragma: no cover + assert False + finally: + # Make sure that exceptions raised here don't capture these, so that + # if an exception does cause us to abandon a run then the Trio state + # has a chance to be GC'ed and warn about it. + del todo, run_sync_soon_threadsafe, done_callback + + +def test_guest_trivial(): + async def trio_return(in_host): + await trio.sleep(0) + return "ok" + + assert trivial_guest_run(trio_return) == "ok" + + async def trio_fail(in_host): + raise KeyError("whoopsiedaisy") + + with pytest.raises(KeyError, match="whoopsiedaisy"): + trivial_guest_run(trio_fail) + + +def test_guest_can_do_io(): + async def trio_main(in_host): + record = [] + a, b = trio.socket.socketpair() + with a, b: + async with trio.open_nursery() as nursery: + + async def do_receive(): + record.append(await a.recv(1)) + + nursery.start_soon(do_receive) + await trio.testing.wait_all_tasks_blocked() + + await b.send(b"x") + + assert record == [b"x"] + + trivial_guest_run(trio_main) + + +def test_host_can_directly_wake_trio_task(): + async def trio_main(in_host): + ev = trio.Event() + in_host(ev.set) + await ev.wait() + return "ok" + + assert trivial_guest_run(trio_main) == "ok" + + +def test_host_altering_deadlines_wakes_trio_up(): + def set_deadline(cscope, new_deadline): + cscope.deadline = new_deadline + + async def trio_main(in_host): + with trio.CancelScope() as cscope: + in_host(lambda: set_deadline(cscope, -inf)) + await trio.sleep_forever() + assert cscope.cancelled_caught + + with trio.CancelScope() as cscope: + in_host(lambda: set_deadline(cscope, -inf)) + await trio.sleep(999) + assert cscope.cancelled_caught + + return "ok" + + assert trivial_guest_run(trio_main) == "ok" + + +def test_warn_set_wakeup_fd_overwrite(): + assert signal.set_wakeup_fd(-1) == -1 + + async def trio_main(in_host): + return "ok" + + a, b = socket.socketpair() + with a, b: + a.setblocking(False) + + # Warn if there's already a wakeup fd + signal.set_wakeup_fd(a.fileno()) + try: + with pytest.warns(RuntimeWarning, match="signal handling code.*collided"): + assert trivial_guest_run(trio_main) == "ok" + finally: + assert signal.set_wakeup_fd(-1) == a.fileno() + + signal.set_wakeup_fd(a.fileno()) + try: + with pytest.warns(RuntimeWarning, match="signal handling code.*collided"): + assert ( + trivial_guest_run( + trio_main, trust_host_loop_to_wake_on_signals=False + ) + == "ok" + ) + finally: + assert signal.set_wakeup_fd(-1) == a.fileno() + + # Don't warn if there isn't already a wakeup fd + with pytest.warns(None) as record: + assert trivial_guest_run(trio_main) == "ok" + assert len(record) == 0 + + with pytest.warns(None) as record: + assert ( + trivial_guest_run(trio_main, trust_host_loop_to_wake_on_signals=True) + == "ok" + ) + assert len(record) == 0 + + # If there's already a wakeup fd, but we've been told to trust it, + # then it's left alone and there's no warning + signal.set_wakeup_fd(a.fileno()) + try: + + async def trio_check_wakeup_fd_unaltered(in_host): + fd = signal.set_wakeup_fd(-1) + assert fd == a.fileno() + signal.set_wakeup_fd(fd) + return "ok" + + with pytest.warns(None) as record: + assert ( + trivial_guest_run( + trio_check_wakeup_fd_unaltered, + trust_host_loop_to_wake_on_signals=True, + ) + == "ok" + ) + assert len(record) == 0 + finally: + assert signal.set_wakeup_fd(-1) == a.fileno() + + +def test_host_wakeup_doesnt_trigger_wait_all_tasks_blocked(): + # This is designed to hit the branch in unrolled_run where: + # idle_primed=True + # runner.runq is empty + # events is Truth-y + # ...and confirm that in this case, wait_all_tasks_blocked does not get + # triggered. + def set_deadline(cscope, new_deadline): + print(f"setting deadline {new_deadline}") + cscope.deadline = new_deadline + + async def trio_main(in_host): + async def sit_in_wait_all_tasks_blocked(watb_cscope): + with watb_cscope: + # Overall point of this test is that this + # wait_all_tasks_blocked should *not* return normally, but + # only by cancellation. + await trio.testing.wait_all_tasks_blocked(cushion=9999) + assert False # pragma: no cover + assert watb_cscope.cancelled_caught + + async def get_woken_by_host_deadline(watb_cscope): + with trio.CancelScope() as cscope: + print("scheduling stuff to happen") + # Altering the deadline from the host, to something in the + # future, will cause the run loop to wake up, but then + # discover that there is nothing to do and go back to sleep. + # This should *not* trigger wait_all_tasks_blocked. + # + # So the 'before_io_wait' here will wait until we're blocking + # with the wait_all_tasks_blocked primed, and then schedule a + # deadline change. The critical test is that this should *not* + # wake up 'sit_in_wait_all_tasks_blocked'. + # + # The after we've had a chance to wake up + # 'sit_in_wait_all_tasks_blocked', we want the test to + # actually end. So in after_io_wait we schedule a second host + # call to tear things down. + class InstrumentHelper: + def __init__(self): + self.primed = False + + def before_io_wait(self, timeout): + print(f"before_io_wait({timeout})") + if timeout == 9999: + assert not self.primed + in_host(lambda: set_deadline(cscope, 1e9)) + self.primed = True + + def after_io_wait(self, timeout): + if self.primed: + print("instrument triggered") + in_host(lambda: cscope.cancel()) + trio.lowlevel.remove_instrument(self) + + trio.lowlevel.add_instrument(InstrumentHelper()) + await trio.sleep_forever() + assert cscope.cancelled_caught + watb_cscope.cancel() + + async with trio.open_nursery() as nursery: + watb_cscope = trio.CancelScope() + nursery.start_soon(sit_in_wait_all_tasks_blocked, watb_cscope) + await trio.testing.wait_all_tasks_blocked() + nursery.start_soon(get_woken_by_host_deadline, watb_cscope) + + return "ok" + + assert trivial_guest_run(trio_main) == "ok" + + +def test_guest_warns_if_abandoned(): + # This warning is emitted from the garbage collector. So we have to make + # sure that our abandoned run is garbage. The easiest way to do this is to + # put it into a function, so that we're sure all the local state, + # traceback frames, etc. are garbage once it returns. + def do_abandoned_guest_run(): + async def abandoned_main(in_host): + in_host(lambda: 1 / 0) + while True: + await trio.sleep(0) + + with pytest.raises(ZeroDivisionError): + trivial_guest_run(abandoned_main) + + with pytest.warns(RuntimeWarning, match="Trio guest run got abandoned"): + do_abandoned_guest_run() + gc_collect_harder() + + # If you have problems some day figuring out what's holding onto a + # reference to the unrolled_run generator and making this test fail, + # then this might be useful to help track it down. (It assumes you + # also hack start_guest_run so that it does 'global W; W = + # weakref(unrolled_run_gen)'.) + # + # import gc + # print(trio._core._run.W) + # targets = [trio._core._run.W()] + # for i in range(15): + # new_targets = [] + # for target in targets: + # new_targets += gc.get_referrers(target) + # new_targets.remove(targets) + # print("#####################") + # print(f"depth {i}: {len(new_targets)}") + # print(new_targets) + # targets = new_targets + + with pytest.raises(RuntimeError): + trio.current_time() + + +def aiotrio_run(trio_fn, **start_guest_run_kwargs): loop = asyncio.new_event_loop() async def aio_main(): trio_done_fut = asyncio.Future() def trio_done_callback(main_outcome): - print(f"trio_main finished: {main_outcome!r}") + print(f"trio_fn finished: {main_outcome!r}") trio_done_fut.set_result(main_outcome) trio.lowlevel.start_guest_run( - trio_main, + trio_fn, run_sync_soon_threadsafe=loop.call_soon_threadsafe, done_callback=trio_done_callback, - # Not all versions of asyncio we test on can actually be trusted, - # but this test doesn't care about signal handling. - trust_host_loop_to_wake_on_signals=True, + **start_guest_run_kwargs, ) return (await trio_done_fut).unwrap() + try: + return loop.run_until_complete(aio_main()) + finally: + loop.close() + + +def test_guest_mode_on_asyncio(): async def trio_main(): print("trio_main!") @@ -57,5 +349,13 @@ async def aio_pingpong(from_trio, to_trio): traceback.print_exc() raise - assert loop.run_until_complete(aio_main()) == "trio-main-done" - loop.close() + assert ( + aiotrio_run( + trio_main, + # Not all versions of asyncio we test on can actually be trusted, + # but this test doesn't care about signal handling, and it's + # easier to just avoid the warnings. + trust_host_loop_to_wake_on_signals=True, + ) + == "trio-main-done" + ) From 1bd024915949459930101295d2575b3906957c33 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 21:37:45 -0700 Subject: [PATCH 19/38] Make "assert no RuntimeWarnings" tests more reliable They were failing due to unrelated ResourceWarnings --- trio/_core/tests/test_guest_mode.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index ab1fd9530a..9db7ea2f6a 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -147,14 +147,17 @@ async def trio_main(in_host): # Don't warn if there isn't already a wakeup fd with pytest.warns(None) as record: assert trivial_guest_run(trio_main) == "ok" - assert len(record) == 0 + # Apparently this is how you assert 'there were no RuntimeWarnings' + with pytest.raises(AssertionError): + record.pop(RuntimeWarning) with pytest.warns(None) as record: assert ( trivial_guest_run(trio_main, trust_host_loop_to_wake_on_signals=True) == "ok" ) - assert len(record) == 0 + with pytest.raises(AssertionError): + record.pop(RuntimeWarning) # If there's already a wakeup fd, but we've been told to trust it, # then it's left alone and there's no warning @@ -175,7 +178,8 @@ async def trio_check_wakeup_fd_unaltered(in_host): ) == "ok" ) - assert len(record) == 0 + with pytest.raises(AssertionError): + record.pop(RuntimeWarning) finally: assert signal.set_wakeup_fd(-1) == a.fileno() From ebea1228d5a4b877e47b39ad7d7f111a610b1cdf Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 21:40:50 -0700 Subject: [PATCH 20/38] Fix indentation on some asserts --- trio/_core/tests/test_guest_mode.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index 9db7ea2f6a..41873f5aee 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -102,12 +102,12 @@ async def trio_main(in_host): with trio.CancelScope() as cscope: in_host(lambda: set_deadline(cscope, -inf)) await trio.sleep_forever() - assert cscope.cancelled_caught + assert cscope.cancelled_caught with trio.CancelScope() as cscope: in_host(lambda: set_deadline(cscope, -inf)) await trio.sleep(999) - assert cscope.cancelled_caught + assert cscope.cancelled_caught return "ok" From ef8a85af0745c713627a95f3f7109f4561b6e8ad Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 21:54:20 -0700 Subject: [PATCH 21/38] Add test that guest mode properly routes TrioInternalErrors --- trio/_core/tests/test_guest_mode.py | 42 +++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index 41873f5aee..e01200c433 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -6,6 +6,7 @@ from math import inf import signal import socket +import threading import trio import trio.testing @@ -363,3 +364,44 @@ async def aio_pingpong(from_trio, to_trio): ) == "trio-main-done" ) + + +def test_guest_mode_internal_errors(monkeypatch, recwarn): + with monkeypatch.context() as m: + + async def crash_in_run_loop(in_host): + m.setattr("trio._core._run.GLOBAL_RUN_CONTEXT.runner.runq", "HI") + await trio.sleep(1) + + with pytest.raises(trio.TrioInternalError): + trivial_guest_run(crash_in_run_loop) + + with monkeypatch.context() as m: + + async def crash_in_io(in_host): + m.setattr("trio._core._run.TheIOManager.get_events", None) + await trio.sleep(0) + + with pytest.raises(trio.TrioInternalError): + trivial_guest_run(crash_in_io) + + with monkeypatch.context() as m: + + async def crash_in_worker_thread_io(in_host): + t = threading.current_thread() + old_get_events = trio._core._run.TheIOManager.get_events + + def bad_get_events(*args): + if threading.current_thread() is not t: + raise ValueError("oh no!") + else: + return old_get_events(*args) + + m.setattr("trio._core._run.TheIOManager.get_events", bad_get_events) + + await trio.sleep(1) + + with pytest.raises(trio.TrioInternalError): + trivial_guest_run(crash_in_worker_thread_io) + + gc_collect_harder() From cd86726f40a3fa61d61082dd550994ee5b38d593 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 21:55:31 -0700 Subject: [PATCH 22/38] Replace some 'type(x) is y' with 'isinstance(x, y)' It turns out the latter is actually faster... --- trio/_core/_run.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 3972ba434a..d42db22b1f 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -821,7 +821,7 @@ def _check_nursery_closed(self): def _child_finished(self, task, outcome): self._children.remove(task) - if type(outcome) is Error: + if isinstance(outcome, Error): self._add_exc(outcome.error) self._check_nursery_closed() @@ -1141,7 +1141,7 @@ def guest_tick(self): # Optimization: try to skip going into the thread if we can avoid it events_outcome = capture(self.runner.io_manager.get_events, 0) - if timeout <= 0 or type(events_outcome) is Error or events_outcome.value: + if timeout <= 0 or isinstance(events_outcome, Error) or events_outcome.value: # No need to go into the thread self.unrolled_run_next_send = events_outcome self.runner.guest_tick_scheduled = True @@ -1786,7 +1786,7 @@ def run( next_send = runner.io_manager.get_events(timeout) # Inlined copy of runner.main_task_outcome.unwrap() to avoid # cluttering every single Trio traceback with an extra frame. - if type(runner.main_task_outcome) is Value: + if isinstance(runner.main_task_outcome, Value): return runner.main_task_outcome.value else: raise runner.main_task_outcome.error From b85176d2f16e27ae3b856c21cddfc03705c27b01 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Thu, 28 May 2020 22:01:48 -0700 Subject: [PATCH 23/38] Small coverage tweaks --- trio/_core/_io_epoll.py | 14 ++++++-------- trio/_core/tests/test_guest_mode.py | 7 +++++-- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/trio/_core/_io_epoll.py b/trio/_core/_io_epoll.py index 6ff6467fbb..665bbe66f7 100644 --- a/trio/_core/_io_epoll.py +++ b/trio/_core/_io_epoll.py @@ -186,9 +186,11 @@ class EpollIOManager: # {fd: EpollWaiters} _registered = attr.ib(factory=lambda: defaultdict(EpollWaiters)) _force_wakeup = attr.ib(factory=WakeupSocketpair) + _force_wakeup_fd = attr.ib(default=None) def __attrs_post_init__(self): self._epoll.register(self._force_wakeup.wakeup_sock, select.EPOLLIN) + self._force_wakeup_fd = self._force_wakeup.wakeup_sock.fileno() def statistics(self): tasks_waiting_read = 0 @@ -222,14 +224,10 @@ def get_events(self, timeout): def process_events(self, events): for fd, flags in events: - try: - waiters = self._registered[fd] - except KeyError: - if fd == self._force_wakeup.wakeup_sock.fileno(): - self._force_wakeup.drain() - continue - else: # pragma: no cover - raise + if fd == self._force_wakeup_fd: + self._force_wakeup.drain() + continue + waiters = self._registered[fd] # EPOLLONESHOT always clears the flags when an event is delivered waiters.current_flags = 0 # Clever hack stolen from selectors.EpollSelector: an event diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index e01200c433..e1caf4c062 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -106,6 +106,9 @@ async def trio_main(in_host): assert cscope.cancelled_caught with trio.CancelScope() as cscope: + # also do a change that doesn't affect the next deadline, just to + # exercise that path + in_host(lambda: set_deadline(cscope, inf)) in_host(lambda: set_deadline(cscope, -inf)) await trio.sleep(999) assert cscope.cancelled_caught @@ -229,13 +232,13 @@ def __init__(self): def before_io_wait(self, timeout): print(f"before_io_wait({timeout})") - if timeout == 9999: + if timeout == 9999: # pragma: no branch assert not self.primed in_host(lambda: set_deadline(cscope, 1e9)) self.primed = True def after_io_wait(self, timeout): - if self.primed: + if self.primed: # pragma: no branch print("instrument triggered") in_host(lambda: cscope.cancel()) trio.lowlevel.remove_instrument(self) From 35595006fce87e98f3c8adc3c978f8b628c0f371 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 29 May 2020 06:02:54 -0700 Subject: [PATCH 24/38] Fix test so it exercises what it's supposed to be exercising --- trio/_core/tests/test_guest_mode.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index e1caf4c062..f055ca1f0f 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -108,7 +108,7 @@ async def trio_main(in_host): with trio.CancelScope() as cscope: # also do a change that doesn't affect the next deadline, just to # exercise that path - in_host(lambda: set_deadline(cscope, inf)) + in_host(lambda: set_deadline(cscope, 1e6)) in_host(lambda: set_deadline(cscope, -inf)) await trio.sleep(999) assert cscope.cancelled_caught From d4343dc95fd72877cd2fd810fc685e171b1384ae Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Sat, 30 May 2020 02:44:02 -0700 Subject: [PATCH 25/38] Add run_sync_soon_not_threadsafe= kwarg on start_guest_run This is handy for host loops where the non-threadsafe version is significantly faster (e.g. tkinter). --- trio/_core/_run.py | 18 +++++++++----- trio/_core/tests/test_guest_mode.py | 38 +++++++++++++++++++++++++++-- 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index d42db22b1f..ee1c1a454d 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1123,6 +1123,7 @@ class _RunStatistics: class GuestState: runner = attr.ib() run_sync_soon_threadsafe = attr.ib() + run_sync_soon_not_threadsafe = attr.ib() done_callback = attr.ib() unrolled_run_gen = attr.ib() unrolled_run_next_send = attr.ib(factory=lambda: Value(None)) @@ -1145,7 +1146,7 @@ def guest_tick(self): # No need to go into the thread self.unrolled_run_next_send = events_outcome self.runner.guest_tick_scheduled = True - self.run_sync_soon_threadsafe(self.guest_tick) + self.run_sync_soon_not_threadsafe(self.guest_tick) else: # Need to go into the thread and call get_events() there self.runner.guest_tick_scheduled = False @@ -1803,6 +1804,7 @@ def start_guest_run( *args, run_sync_soon_threadsafe, done_callback, + run_sync_soon_not_threadsafe=None, trust_host_loop_to_wake_on_signals=False, clock=None, instruments=(), @@ -1811,18 +1813,22 @@ def start_guest_run( runner.is_guest = True runner.guest_tick_scheduled = True + if run_sync_soon_not_threadsafe is None: + run_sync_soon_not_threadsafe = run_sync_soon_threadsafe + guest_state = GuestState( - runner, - run_sync_soon_threadsafe, - done_callback, - unrolled_run( + runner=runner, + run_sync_soon_threadsafe=run_sync_soon_threadsafe, + run_sync_soon_not_threadsafe=run_sync_soon_not_threadsafe, + done_callback=done_callback, + unrolled_run_gen=unrolled_run( runner, async_fn, args, trust_host_loop_to_wake_on_signals=trust_host_loop_to_wake_on_signals, ), ) - run_sync_soon_threadsafe(guest_state.guest_tick) + run_sync_soon_not_threadsafe(guest_state.guest_tick) # 24 hours is arbitrary, but it avoids issues like people setting timeouts of diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index f055ca1f0f..c11c2dc70b 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -21,7 +21,22 @@ def trivial_guest_run(trio_fn, **start_guest_run_kwargs): todo = queue.Queue() + host_thread = threading.current_thread() + def run_sync_soon_threadsafe(fn): + if host_thread is threading.current_thread(): # pragma: no cover + crash = partial( + pytest.fail, "run_sync_soon_threadsafe called from host thread" + ) + todo.put(("run", crash)) + todo.put(("run", fn)) + + def run_sync_soon_not_threadsafe(fn): + if host_thread is not threading.current_thread(): # pragma: no cover + crash = partial( + pytest.fail, "run_sync_soon_not_threadsafe called from worker thread" + ) + todo.put(("run", crash)) todo.put(("run", fn)) def done_callback(outcome): @@ -29,8 +44,9 @@ def done_callback(outcome): trio.lowlevel.start_guest_run( trio_fn, - run_sync_soon_threadsafe, + run_sync_soon_not_threadsafe, run_sync_soon_threadsafe=run_sync_soon_threadsafe, + run_sync_soon_not_threadsafe=run_sync_soon_not_threadsafe, done_callback=done_callback, **start_guest_run_kwargs, ) @@ -300,7 +316,7 @@ async def abandoned_main(in_host): trio.current_time() -def aiotrio_run(trio_fn, **start_guest_run_kwargs): +def aiotrio_run(trio_fn, *, pass_not_threadsafe=True, **start_guest_run_kwargs): loop = asyncio.new_event_loop() async def aio_main(): @@ -310,6 +326,9 @@ def trio_done_callback(main_outcome): print(f"trio_fn finished: {main_outcome!r}") trio_done_fut.set_result(main_outcome) + if pass_not_threadsafe: + start_guest_run_kwargs["run_sync_soon_not_threadsafe"] = loop.call_soon + trio.lowlevel.start_guest_run( trio_fn, run_sync_soon_threadsafe=loop.call_soon_threadsafe, @@ -334,6 +353,10 @@ async def trio_main(): aio_task = asyncio.ensure_future(aio_pingpong(from_trio, to_trio)) + # Make sure we have at least one tick where we don't need to go into + # the thread + await trio.sleep(0) + from_trio.put_nowait(0) async for n in from_aio: @@ -368,6 +391,17 @@ async def aio_pingpong(from_trio, to_trio): == "trio-main-done" ) + assert ( + aiotrio_run( + trio_main, + # Also check that passing only call_soon_threadsafe works, via the + # fallback path where we use it for everything. + pass_not_threadsafe=False, + trust_host_loop_to_wake_on_signals=True, + ) + == "trio-main-done" + ) + def test_guest_mode_internal_errors(monkeypatch, recwarn): with monkeypatch.context() as m: From a827cfb308cfc13e8cf5bcbb3074569cd6a903a0 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Sun, 31 May 2020 19:40:14 -0700 Subject: [PATCH 26/38] First pass a comprehensive docs --- docs/source/design.rst | 3 + docs/source/reference-lowlevel.rst | 381 +++++++++++++++++++++++++++- trio/_core/_run.py | 57 ++++- trio/_core/_wakeup_socketpair.py | 2 +- trio/_core/tests/test_guest_mode.py | 10 +- 5 files changed, 440 insertions(+), 13 deletions(-) diff --git a/docs/source/design.rst b/docs/source/design.rst index 55eaf6d652..184e60c110 100644 --- a/docs/source/design.rst +++ b/docs/source/design.rst @@ -539,6 +539,9 @@ supported systems. Guest mode ---------- +One of Trio's more unusual features is that it supports being run in +"guest mode" on top of some other event loop (the "host"). + XX TODO: document this properly the basic idea of pushing ``get_events`` into a thread diff --git a/docs/source/reference-lowlevel.rst b/docs/source/reference-lowlevel.rst index b196cd0b43..371f47c365 100644 --- a/docs/source/reference-lowlevel.rst +++ b/docs/source/reference-lowlevel.rst @@ -467,7 +467,7 @@ this does serve to illustrate the basic structure of the Task API --------- +======== .. autofunction:: current_root_task() @@ -528,10 +528,387 @@ Task API :func:`wait_task_rescheduled` for details.) +.. _guest-mode: + +Using "guest mode" to run Trio on top of other event loops +========================================================== + +What is "guest mode"? +--------------------- + +An event loop acts as a central coordinator to manage all the IO +happening in your program. Normally, that means that your application +has to pick one event loop, and use it for everything. But what if you +like Trio, but also need to use a framework like `Qt +`__ or `PyGame +`__ that has its own event loop? Then you +need some way to run both event loops at once. + +It is possible to combine event loops, but the standard approaches all +have significant downsides: + +- **Polling:** this is where you use a `busy-loop + `__ to manually check + for IO on both event loops many times per second. This adds latency, + and wastes CPU time and electricity. + +- **Pluggable IO backends:** this is where you reimplement one of the + event loop APIs on top of the other, so you effectively end up with + just one event loop. This requires a significant amount of work for + each pair of event loops you want to integrate, and different + backends inevitably end up with inconsistent behavior, forcing users + to program against the least-common-denominator. And if the two + event loops expose different feature sets, it may not even be + possible to implement one in terms of the other. + +- **Running the two event loops in separate threads:** This works, but + most event loop APIs aren't thread-safe, so in this approach you + need to keep careful track of which code runs on which event loop, + and remember to use explicit inter-thread messaging whenever you + interact with the other loop – or else risk obscure race conditions + and data corruption. + +That's why Trio offers a fourth option: **guest mode**. Guest mode +lets you execute `trio.run` on top of some other "host" event loop, +like Qt. Its advantages are: + +- Efficiency: guest mode is event-driven instead of using a busy-loop, + so it has low latency and doesn't waste electricity. + +- No need to think about threads: your Trio code runs in the same + thread as the host event loop, so you can freely call Trio APIs from + the host, and call host APIs from Trio. For example, if you're + making a GUI app with Qt as the host loop, then making a `cancel + button `__ and connecting + it to a `trio.CancelScope` is as easy as writing:: + + # Trio code can create Qt objects without any special ceremony... + my_cancel_button = QPushButton("Cancel") + # ...and Qt can call back to Trio just as easily + my_cancel_button.clicked.connect(my_cancel_scope.cancel) + +- Consistent behavior: guest mode uses the code as regular Trio: the + same scheduler, same IO code, same everything. So you get the same + features and robustness that you're used to. + +- Simple integration and broad compatibility: pretty much every event + loop offers some threadsafe "schedule a callback" operation, and + that's all you need to use it as a host loop. + + +Really? How is that possible? +----------------------------- + +.. note:: + + You can use guest mode without reading this section. It's included + for those who enjoy understanding how things work. + +All event loops have the same basic structure. They loop through two +operations, over and over: + +1. Wait for the operating system to notify them that something + interesting has happened, like data arriving on a socket or a + timeout passing. They do this by invoking a platform-specific + ``sleep_until_something_happens()`` system call – ``select``, + ``epoll``, ``kqueue``, ``GetQueuedCompletionEvents``, etc. + +2. Run all the user tasks that care about whatever happened, then go + back to step 1. + +The problem here is step 1. Two different event loops on the same +thread can take turns running user tasks in step 2, but when they're +idle and nothing is happening, they can't both invoke their own +``sleep_until_something_happens()`` function at the same time. + +The "polling" and "pluggable backend" strategies solve this by hacking +the loops so both step 1s can run at the same time in the same thread. +Keeping everything in one thread is great for step 2, but the step 1 +hacks create problems. + +The "separate threads" strategy solves this by moving both steps into +separate threads. This makes step 1 work, but the downside is that now +the user tasks in step 2 are running separate threads as well, so +users are forced to deal with inter-thread coordination. + +The idea behind guest mode is to combine the best parts of each +approach: we move Trio's step 1 into a separate worker thread, while +keeping Trio's step 2 in the main host thread. This way, when the +application is idle, both event loops do their +``sleep_until_something_happens()`` at the same time in their own +threads. But when the app wakes up and your code is actually running, +it all happens in a single thread. The threading trickiness is all +handled transparently inside Trio. + +Concretely, we unroll Trio's internal event loop into a chain of +callbacks, and as each callback finishes, it schedules the next +callback onto the host loop or a worker thread as appropriate. So the +only thing the host loop has to provide is a way to schedule a +callback onto the main thread from a worker thread. + +Coordinating between Trio and the host loop does add some overhead. +The main cost is switching in and out of the background thread, since +this requires cross-thread messaging. This is cheap (on the order of a +few microseconds, assuming your host loop is implemented efficiently), +but it's not free. + +But, there's a nice optimization we can make: we only *need* the +thread when our ``sleep_until_something_happens()`` call actually +sleeps, that is, when the Trio part of your program is idle and has +nothing to do. So before we switch into the worker thread, we +double-check whether we're idle, and if not, then we skip the worker +thread and jump directly to step 2. This means that your app only pays +the extra thread-switching penalty at moments when it would otherwise +be sleeping, so it should have minimal effect on your app's overall +performance. + +The total overhead will depend on your host loop, your platform, your +application, etc. But we expect that in most cases, apps running in +guest mode should only be 5-10% slower than the same code using +`trio.run`. If you find that's not true for your app, then please let +us know and we'll see if we can fix it! + + +.. _guest-run-implementation: + +Implementing guest mode for your favorite event loop +---------------------------------------------------- + +Let's walk through what you need to do to integrate Trio's guest mode +with your favorite event loop. Treat this section like a checklist. + +**Getting started:** The first step is to get something basic working. +Here's a minimal example of running Trio on top of asyncio, that you +can use as a model:: + + import asyncio, trio + + # A tiny Trio program + async def trio_main(): + for i in range(5): + print(f"Hello from Trio!") + # This is inside Trio, so we have to use Trio APIs + await trio.sleep(1) + return "trio done!" + + # The code to run it as a guest inside asyncio + async def asyncio_main(): + asyncio_loop = asyncio.get_running_loop() + + def run_sync_soon_threadsafe(fn): + asyncio_loop.call_soon_threadsafe(fn) + + def done_callback(trio_main_outcome): + print(f"Trio program ended with: {trio_main_outcome}") + + # This is where the magic happens: + trio.lowlevel.start_guest_run( + trio_main, + run_sync_soon_threadsafe=run_sync_soon_threadsafe, + done_callback=done_callback, + ) + + # Let the host loop run for a while to give trio_main time to + # finish. (WARNING: This is a hack. See below for better + # approaches.) + # + # This function is in asyncio, so we have to use asyncio APIs. + await asyncio.sleep(10) + + asyncio.run(asyncio_main()) + +You can see we're using asyncio-specific APIs to start up a loop, and +then we call `trio.lowlevel.start_guest_run`. This function is very +similar to `trio.run`, and takes all the same arguments. But it has +two differences: + +First, instead of blocking until ``trio_main`` has finished, it +schedules ``trio_main`` to start running on top of the host loop, and +then returns immediately. So ``trio_main`` is running in the +background – that's why we have to sleep and give it time to finish. + +And second, it requires two extra keyword arguments: +``run_sync_soon_threadsafe``, and ``done_callback``. + +For ``run_sync_soon_threadsafe``, we need a function that takes a +synchronous callback, and schedules it to run on your host loop. And +this function needs to be "threadsafe" in the sense that you can +safely call it from any thread. So you need to figure out how to write +a function that does that using your host loop's API. For asyncio, +this is easy because `~asyncio.loop.call_soon_threadsafe` does exactly +what we need; for your loop, it might be more or less complicated. + +For ``done_callback``, you pass in a function that Trio will +automatically invoke when the Trio run finishes, so you know it's done +and what happened. For this basic starting version, we just print the +result; in the next section we'll discuss better alternatives. + +At this stage you should be able to run a simple Trio program inside +your host loop. Now we'll turn that prototype into something solid. + + +**Loop lifetimes:** One of the trickiest things in most event loops is +shutting down correctly. And having two event loops makes this even +harder! + +If you can, we recommend following this pattern: + +- Start up your host loop +- Immediately call `start_guest_run` to start Trio +- When Trio finishes and your ``done_callback`` is invoked, shut down + the host loop +- Make sure that nothing else shuts down your host loop + +This way, your two event loops have the same lifetime, and your +program automatically exits when your Trio function finishes. + +Here's how we'd extend our asyncio example to implement this pattern: + +.. code-block:: python3 + :emphasize-lines: 8-11,19-22 + + # Improved version, that shuts down properly after Trio finishes + async def asyncio_main(): + asyncio_loop = asyncio.get_running_loop() + + def run_sync_soon_threadsafe(fn): + asyncio_loop.call_soon_threadsafe(fn) + + # Revised 'done' callback: set a Future + done_fut = asyncio.Future() + def done_callback(trio_main_outcome): + done_fut.set_result(trio_main_outcome) + + trio.lowlevel.start_guest_run( + trio_main, + run_sync_soon_threadsafe=run_sync_soon_threadsafe, + done_callback=done_callback, + ) + + # Wait for the guest run to finish + trio_main_outcome = await done_fut + # Pass through the return value or exception from the guest run + return trio_main_outcome.unwrap() + +And then you can encapsulate all this machinery in a utility function +that exposes a `trio.run`-like API, but runs both loops together:: + + def trio_run_with_asyncio(trio_main, *args, **trio_run_kwargs): + async def asyncio_main(): + # same as above + ... + + return asyncio.run(asyncio_main()) + +Technically, it is possible to use other patterns. But there are some +important limitations you have to respect: + +- **You must let the Trio program run to completion.** Many event + loops let you stop the event loop at any point, and any pending + callbacks/tasks/etc. just... don't run. Trio follows a more + structured system, where you can cancel things, but the code always + runs to completion, so ``finally`` blocks run, resources are cleaned + up, etc. If you stop your host loop early, before the + ``done_callback`` is invoked, then that cuts off the Trio run in the + middle without a chance to clean up, and can leave both your program + and Trio itself in an inconsistent state. (For example, + + Some programs need to be able to quit at any time, for example in + response to a GUI window being closed or a user selecting a "Quit" + from a menu. To handle these cases, we recommend wrapping your whole + program in a `trio.CancelScope`, and cancelling it when you want to + quit. + +- Each host loop can only have one `start_guest_run` at a time. If you + try to start a second one, you'll get an error. If you need to run + multiple Trio functions at the same time, then start up a single + Trio run, open a nursery, and then start your functions as child + tasks in that nursery. + +Given these constraints, we think the simplest approach is to always +start and stop the two loops together. + +**Signal management:** `"Signals" +`__ are a low-level +inter-process communication primitive. When you hit control-C to kill +a program, that uses a signal. Signal handling in Python has `a lot of +moving parts +`__. +One of those parts is `signal.set_wakeup_fd`, which event loops use to +make sure that they wake up when a signal arrives so they can respond +to it. (If you've ever had an event loop ignore you when you hit +control-C, it was probably because they weren't using +`signal.set_wakeup_fd` correctly.) + +But, only one event loop can use `signal.set_wakeup_fd` at a time. And +in guest mode that can cause problems: Trio and the host loop might +start fighting over who's using `signal.set_wakeup_fd`. + +Some event loops, like asyncio, won't work correctly unless they win +this fight. Fortunately, Trio is a little less picky: as long as +*someone* makes sure that the program wakes up when a signal arrives, +it should work correctly. So if your host loop wants +`signal.set_wakeup_fd`, then you should disable Trio's +`signal.set_wakeup_fd` support, and then both loops will work +correctly. + +On the other hand, if your host loop doesn't use +`signal.set_wakeup_fd`, then the only way to make everything work +correctly is to *enable* Trio's `signal.set_wakeup_fd` support. + +By default, Trio assumes that your host loop doesn't use +`signal.set_wakeup_fd`. It does try to detect when this creates a +conflict with the host loop, and print a warning – but unfortunately, +by the time it detects it, the damage has already been done. So if +you're getting this warning, then you should disable Trio's +`signal.set_wakeup_fd` support by passing +``host_uses_signal_set_wakeup_fd=True`` to `start_guest_run`. + +If you aren't seeing any warnings with your initial prototype, you're +*probably* fine. But the only way to be certain is to check your host +loop's source. For example, asyncio may or may not use +`signal.set_wakeup_fd` depending on the Python version and operating +system. + + +**Control-C handling** XX FIXME + + +**A small optimization:** Finally, consider a small optimization. Some +event loops offer two versions of their "call this function soon" API: +one that can be used from any thread, and one that can only be used +from the event loop thread, with the latter being cheaper. For +example, asyncio has both `~asyncio.loop.call_soon_threadsafe` and +`~asyncio.loop.call_soon`. + +If you have a loop like this, then you can also pass a +``run_sync_soon_not_threadsafe=...`` kwarg to `start_guest_run`, and +Trio will automatically use it when appropriate. + +If your loop doesn't have a split like this, then don't worry about +it; ``run_sync_soon_not_threadsafe=`` is optional. (If it's not +passed, then Trio will just use your threadsafe version in all cases.) + + + +Limitations +----------- + +only one run + +clock, autojump clock, deadlock detection, ... + + +Reference +--------- + +.. autofunction:: start_guest_run + + .. _live-coroutine-handoff: Handing off live coroutine objects between coroutine runners ------------------------------------------------------------- +============================================================ Internally, Python's async/await syntax is built around the idea of "coroutine objects" and "coroutine runners". A coroutine object diff --git a/trio/_core/_run.py b/trio/_core/_run.py index ee1c1a454d..5be9f27e90 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1103,7 +1103,7 @@ class _RunStatistics: # This holds all the state that gets trampolined back and forth between # callbacks when we're running in guest mode. # -# It has to be a separate object from Runner, and Runner *cannot* have hold +# It has to be a separate object from Runner, and Runner *cannot* hold # references to it (directly or indirectly)! # # The idea is that we want a chance to detect if our host loop quits and stops @@ -1805,10 +1805,57 @@ def start_guest_run( run_sync_soon_threadsafe, done_callback, run_sync_soon_not_threadsafe=None, - trust_host_loop_to_wake_on_signals=False, + host_uses_signal_set_wakeup_fd=False, clock=None, instruments=(), ): + """Start a "guest" run of Trio on top of some other "host" event loop. + + Each host loop can only have one guest run at a time. + + You should always let the Trio run finish before stopping the host loop; + if not, it may leave Trio's internal data structures in an inconsistent + state. You might be able to get away with it if you immediately exit the + program, but it's safest not to go there in the first place. + + So generally, the best way to do this is wrap this in a function that + starts the host loop and then immediately starts the guest run, and then + shuts down the host when the guest run completes. + + Args: + + run_sync_soon_threadsafe: An arbitrary callable, which will be passed a + function as its sole argument:: + + def my_run_sync_soon_threadsafe(fn): + hi() + + This callable should schedule ``fn`` to be run by the host on its + next pass through its loop. **Must support being called from + arbitrary threads.** + + done_callback: An arbitrary callable:: + + def my_done_callback(run_outcome): + hi() + + When the Trio run has finished, Trio will invoke this callback to let + you know. The argument is an `outcome.Outcome`, reporting what would + have been returned or raised by `trio.run`. This function can do + anything you want, but commonly you'll want it to shut down the + host loop, unwrap the outcome, etc. + + run_sync_soon_not_threadsafe: Optional. Like + ``run_sync_soon_threadsafe``, but will only be called from inside the + host loop's main thread. + + host_uses_signal_set_wakeup_fd (bool): Pass `True` if your host loop + uses `signal.set_wakeup_fd`, and `False` otherwise. For more details, + see :ref:`guest-run-implementation`. + + For the meaning of other arguments, see `trio.run`. + + """ runner = setup_runner(clock, instruments) runner.is_guest = True runner.guest_tick_scheduled = True @@ -1825,7 +1872,7 @@ def start_guest_run( runner, async_fn, args, - trust_host_loop_to_wake_on_signals=trust_host_loop_to_wake_on_signals, + host_uses_signal_set_wakeup_fd=host_uses_signal_set_wakeup_fd, ), ) run_sync_soon_not_threadsafe(guest_state.guest_tick) @@ -1840,12 +1887,12 @@ def start_guest_run( # mode", where our core event loop gets unrolled into a series of callbacks on # the host loop. If you're doing a regular trio.run then this gets run # straight through. -def unrolled_run(runner, async_fn, args, trust_host_loop_to_wake_on_signals=False): +def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False): locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True __tracebackhide__ = True try: - if not trust_host_loop_to_wake_on_signals: + if not host_uses_signal_set_wakeup_fd: runner.entry_queue.wakeup.wakeup_on_signals() if runner.instruments: diff --git a/trio/_core/_wakeup_socketpair.py b/trio/_core/_wakeup_socketpair.py index 392a2c0b2e..80d3090ee9 100644 --- a/trio/_core/_wakeup_socketpair.py +++ b/trio/_core/_wakeup_socketpair.py @@ -73,7 +73,7 @@ def wakeup_on_signals(self): "It looks like Trio's signal handling code might have " "collided with another library you're using. If you're " "running Trio in guest mode, then this might mean you " - "should set trust_host_loop_to_wake_on_signals=True. " + "should set host_uses_signal_set_wakeup_fd=True. " "Otherwise, file a bug on Trio and we'll help you figure " "out what's going on." ) diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index c11c2dc70b..5660f6eddc 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -157,7 +157,7 @@ async def trio_main(in_host): with pytest.warns(RuntimeWarning, match="signal handling code.*collided"): assert ( trivial_guest_run( - trio_main, trust_host_loop_to_wake_on_signals=False + trio_main, host_uses_signal_set_wakeup_fd=False ) == "ok" ) @@ -173,7 +173,7 @@ async def trio_main(in_host): with pytest.warns(None) as record: assert ( - trivial_guest_run(trio_main, trust_host_loop_to_wake_on_signals=True) + trivial_guest_run(trio_main, host_uses_signal_set_wakeup_fd=True) == "ok" ) with pytest.raises(AssertionError): @@ -194,7 +194,7 @@ async def trio_check_wakeup_fd_unaltered(in_host): assert ( trivial_guest_run( trio_check_wakeup_fd_unaltered, - trust_host_loop_to_wake_on_signals=True, + host_uses_signal_set_wakeup_fd=True, ) == "ok" ) @@ -386,7 +386,7 @@ async def aio_pingpong(from_trio, to_trio): # Not all versions of asyncio we test on can actually be trusted, # but this test doesn't care about signal handling, and it's # easier to just avoid the warnings. - trust_host_loop_to_wake_on_signals=True, + host_uses_signal_set_wakeup_fd=True, ) == "trio-main-done" ) @@ -397,7 +397,7 @@ async def aio_pingpong(from_trio, to_trio): # Also check that passing only call_soon_threadsafe works, via the # fallback path where we use it for everything. pass_not_threadsafe=False, - trust_host_loop_to_wake_on_signals=True, + host_uses_signal_set_wakeup_fd=True, ) == "trio-main-done" ) From 8018ceffda3c742c9471c1b7b39301f51c465663 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Sun, 31 May 2020 19:43:27 -0700 Subject: [PATCH 27/38] newsfragment --- newsfragments/399.feature.rst | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 newsfragments/399.feature.rst diff --git a/newsfragments/399.feature.rst b/newsfragments/399.feature.rst new file mode 100644 index 0000000000..4de52bfb30 --- /dev/null +++ b/newsfragments/399.feature.rst @@ -0,0 +1,3 @@ +If you want to use Trio, but are stuck with some other event loop like +Qt or PyGame, then good news: now you can have both. For details, see: +:ref:`guest-mode`. From eea80105a5356f88d7c564ad276a7ee58ec29102 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Sun, 31 May 2020 19:55:45 -0700 Subject: [PATCH 28/38] black --- trio/_core/tests/test_guest_mode.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index 5660f6eddc..d4efba3449 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -156,9 +156,7 @@ async def trio_main(in_host): try: with pytest.warns(RuntimeWarning, match="signal handling code.*collided"): assert ( - trivial_guest_run( - trio_main, host_uses_signal_set_wakeup_fd=False - ) + trivial_guest_run(trio_main, host_uses_signal_set_wakeup_fd=False) == "ok" ) finally: From 7c0744b43b9693b58c522695362a5d73f98079b8 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 1 Jun 2020 01:26:48 -0700 Subject: [PATCH 29/38] Doc updates --- docs/source/reference-lowlevel.rst | 17 +++--- trio/_core/_run.py | 83 ++++++++++++++++++++++++++---- 2 files changed, 84 insertions(+), 16 deletions(-) diff --git a/docs/source/reference-lowlevel.rst b/docs/source/reference-lowlevel.rst index 371f47c365..9a4ffd6afb 100644 --- a/docs/source/reference-lowlevel.rst +++ b/docs/source/reference-lowlevel.rst @@ -587,9 +587,9 @@ like Qt. Its advantages are: # ...and Qt can call back to Trio just as easily my_cancel_button.clicked.connect(my_cancel_scope.cancel) -- Consistent behavior: guest mode uses the code as regular Trio: the - same scheduler, same IO code, same everything. So you get the same - features and robustness that you're used to. +- Consistent behavior: guest mode uses the same code as regular Trio: + the same scheduler, same IO code, same everything. So you get the + full feature set and everything acts the way you expect. - Simple integration and broad compatibility: pretty much every event loop offers some threadsafe "schedule a callback" operation, and @@ -810,12 +810,14 @@ important limitations you have to respect: runs to completion, so ``finally`` blocks run, resources are cleaned up, etc. If you stop your host loop early, before the ``done_callback`` is invoked, then that cuts off the Trio run in the - middle without a chance to clean up, and can leave both your program - and Trio itself in an inconsistent state. (For example, + middle without a chance to clean up. This can leave your code in an + inconsistent state, and will definitely leave Trio's internals in an + inconsistent state, which will cause errors if you try to use Trio + again in that thread. Some programs need to be able to quit at any time, for example in response to a GUI window being closed or a user selecting a "Quit" - from a menu. To handle these cases, we recommend wrapping your whole + from a menu. In these cases, we recommend wrapping your whole program in a `trio.CancelScope`, and cancelling it when you want to quit. @@ -889,6 +891,9 @@ If your loop doesn't have a split like this, then don't worry about it; ``run_sync_soon_not_threadsafe=`` is optional. (If it's not passed, then Trio will just use your threadsafe version in all cases.) +**That's it!** If you've followed all these steps, you should now have +a cleanly-integrated hybrid event loop. Go make some cool +GUIs/games/whatever! Limitations diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 5be9f27e90..ca42e47769 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1666,7 +1666,68 @@ def remove_instrument(self, instrument): ################################################################ # run ################################################################ - +# +# Trio's core task scheduler and coroutine runner is in 'unrolled_run'. It's +# called that because it has an unusual feature: it's actually a generator. +# Whenever it needs to fetch IO events from the OS, it yields, and waits for +# its caller to send the IO events back in. So the loop is "unrolled" into a +# sequence of generator send() calls. +# +# The reason for this unusual design is to support two different modes of +# operation, where the IO is handled differently. +# +# In normal mode using trio.run, the scheduler and IO run in the same thread: +# +# Main thread: +# +# +---------------------------+ +# | Run tasks | +# | (unrolled_run) | +# +---------------------------+ +# | Block waiting for I/O | +# | (io_manager.get_events) | +# +---------------------------+ +# | Run tasks | +# | (unrolled_run) | +# +---------------------------+ +# | Block waiting for I/O | +# | (io_manager.get_events) | +# +---------------------------+ +# : +# +# +# In guest mode using trio.start_guest_run, the scheduler runs on the main +# thread as a host loop callback, but blocking for IO gets pushed into a +# worker thread: +# +# Main thread executing host loop: Trio I/O thread: +# +# +---------------------------+ +# | Run Trio tasks | +# | (unrolled_run) | +# +---------------------------+ --------------+ +# v +# +---------------------------+ +----------------------------+ +# | Host loop does whatever | | Block waiting for Trio I/O | +# | it wants | | (io_manager.get_events) | +# +---------------------------+ +----------------------------+ +# | +# +---------------------------+ <-------------+ +# | Run Trio tasks | +# | (unrolled_run) | +# +---------------------------+ --------------+ +# v +# +---------------------------+ +----------------------------+ +# | Host loop does whatever | | Block waiting for Trio I/O | +# | it wants | | (io_manager.get_events) | +# +---------------------------+ +----------------------------+ +# : : +# +# Most of Trio's internals don't need to care about this difference. The main +# complication it creates is that in guest mode, we might need to wake up not +# just due to OS-reported IO events, but also because of code running on the +# host loop calling reschedule() or changing task deadlines. Search for +# 'is_guest' to see the special cases we need to handle this. def setup_runner(clock, instruments): """Create a Runner object and install it as the GLOBAL_RUN_CONTEXT.""" @@ -1818,9 +1879,9 @@ def start_guest_run( state. You might be able to get away with it if you immediately exit the program, but it's safest not to go there in the first place. - So generally, the best way to do this is wrap this in a function that - starts the host loop and then immediately starts the guest run, and then - shuts down the host when the guest run completes. + Generally, the best way to do this is wrap this in a function that starts + the host loop and then immediately starts the guest run, and then shuts + down the host when the guest run completes. Args: @@ -1828,16 +1889,16 @@ def start_guest_run( function as its sole argument:: def my_run_sync_soon_threadsafe(fn): - hi() + ... - This callable should schedule ``fn`` to be run by the host on its + This callable should schedule ``fn()`` to be run by the host on its next pass through its loop. **Must support being called from arbitrary threads.** done_callback: An arbitrary callable:: def my_done_callback(run_outcome): - hi() + ... When the Trio run has finished, Trio will invoke this callback to let you know. The argument is an `outcome.Outcome`, reporting what would @@ -1845,9 +1906,11 @@ def my_done_callback(run_outcome): anything you want, but commonly you'll want it to shut down the host loop, unwrap the outcome, etc. - run_sync_soon_not_threadsafe: Optional. Like - ``run_sync_soon_threadsafe``, but will only be called from inside the - host loop's main thread. + run_sync_soon_not_threadsafe: Like ``run_sync_soon_threadsafe``, but + will only be called from inside the host loop's main thread. + Optional, but if your host loop allows you to implement this more + efficiently than ``run_sync_soon_threadsafe`` then passing it will + make things a bit faster. host_uses_signal_set_wakeup_fd (bool): Pass `True` if your host loop uses `signal.set_wakeup_fd`, and `False` otherwise. For more details, From be88548e554b00d42596842a5c99380130ab4653 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 1 Jun 2020 01:59:41 -0700 Subject: [PATCH 30/38] Add note about custom clocks in guest mode --- docs/source/reference-lowlevel.rst | 14 +++++++++++--- docs/source/reference-testing.rst | 2 ++ trio/_core/_run.py | 1 + 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/docs/source/reference-lowlevel.rst b/docs/source/reference-lowlevel.rst index 9a4ffd6afb..9e6e8d8a21 100644 --- a/docs/source/reference-lowlevel.rst +++ b/docs/source/reference-lowlevel.rst @@ -899,9 +899,17 @@ GUIs/games/whatever! Limitations ----------- -only one run - -clock, autojump clock, deadlock detection, ... +In general, almost all Trio features should work in guest mode. The +exception is features which rely on Trio having a complete picture of +everything that your program is doing, since obviously, it can't +control the host loop or see what it's doing. + +Custom clocks can be used in guest mode, but they only affect Trio +timeouts, not host loop timeouts. And the :ref:`autojump clock +` and related `trio.testing.wait_all_tasks_blocked` can +technically be used in guest mode, but they'll only take Trio tasks +into account when decided whether to jump the clock or whether all +tasks are blocked. Reference diff --git a/docs/source/reference-testing.rst b/docs/source/reference-testing.rst index 40a275bbeb..76ecd4a2d4 100644 --- a/docs/source/reference-testing.rst +++ b/docs/source/reference-testing.rst @@ -16,6 +16,8 @@ Test harness integration .. decorator:: trio_test +.. _testing-time: + Time and timeouts ----------------- diff --git a/trio/_core/_run.py b/trio/_core/_run.py index ca42e47769..a0f5083f0a 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1729,6 +1729,7 @@ def remove_instrument(self, instrument): # host loop calling reschedule() or changing task deadlines. Search for # 'is_guest' to see the special cases we need to handle this. + def setup_runner(clock, instruments): """Create a Runner object and install it as the GLOBAL_RUN_CONTEXT.""" # It wouldn't be *hard* to support nested calls to run(), but I can't From b84a715e5140b2242377e59cdd28048cc6c81942 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 1 Jun 2020 13:50:44 -0700 Subject: [PATCH 31/38] Fix comment --- trio/_core/_run.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trio/_core/_run.py b/trio/_core/_run.py index a0f5083f0a..edb0369ca8 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1696,8 +1696,8 @@ def remove_instrument(self, instrument): # : # # -# In guest mode using trio.start_guest_run, the scheduler runs on the main -# thread as a host loop callback, but blocking for IO gets pushed into a +# In guest mode using trio.lowlevel.start_guest_run, the scheduler runs on the +# main thread as a host loop callback, but blocking for IO gets pushed into a # worker thread: # # Main thread executing host loop: Trio I/O thread: From c120632ee9b78591144b54305faba6532107dda0 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 1 Jun 2020 14:32:54 -0700 Subject: [PATCH 32/38] Enable KI handling in guest mode --- trio/_core/_ki.py | 52 +++++++++++--------- trio/_core/_run.py | 74 +++++++++++++++++------------ trio/_core/tests/test_guest_mode.py | 33 +++++++++++++ 3 files changed, 105 insertions(+), 54 deletions(-) diff --git a/trio/_core/_ki.py b/trio/_core/_ki.py index ec22ff5fb6..01ce03ce59 100644 --- a/trio/_core/_ki.py +++ b/trio/_core/_ki.py @@ -3,6 +3,7 @@ import sys from contextlib import contextmanager from functools import wraps +import attr import async_generator @@ -170,26 +171,31 @@ def wrapper(*args, **kwargs): disable_ki_protection.__name__ = "disable_ki_protection" -@contextmanager -def ki_manager(deliver_cb, restrict_keyboard_interrupt_to_checkpoints): - if ( - not is_main_thread() - or signal.getsignal(signal.SIGINT) != signal.default_int_handler - ): - yield - return - - def handler(signum, frame): - assert signum == signal.SIGINT - protection_enabled = ki_protection_enabled(frame) - if protection_enabled or restrict_keyboard_interrupt_to_checkpoints: - deliver_cb() - else: - raise KeyboardInterrupt - - signal.signal(signal.SIGINT, handler) - try: - yield - finally: - if signal.getsignal(signal.SIGINT) is handler: - signal.signal(signal.SIGINT, signal.default_int_handler) +@attr.s +class KIManager: + handler = attr.ib(default=None) + + def install(self, deliver_cb, restrict_keyboard_interrupt_to_checkpoints): + assert self.handler is None + if ( + not is_main_thread() + or signal.getsignal(signal.SIGINT) != signal.default_int_handler + ): + return + + def handler(signum, frame): + assert signum == signal.SIGINT + protection_enabled = ki_protection_enabled(frame) + if protection_enabled or restrict_keyboard_interrupt_to_checkpoints: + deliver_cb() + else: + raise KeyboardInterrupt + + self.handler = handler + signal.signal(signal.SIGINT, handler) + + def close(self): + if self.handler is not None: + if signal.getsignal(signal.SIGINT) is self.handler: + signal.signal(signal.SIGINT, signal.default_int_handler) + self.handler = None diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 6b16a4a9d2..a10d6cb57f 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -27,7 +27,7 @@ from ._exceptions import TrioInternalError, RunFinishedError, Cancelled from ._ki import ( LOCALS_KEY_KI_PROTECTION_ENABLED, - ki_manager, + KIManager, enable_ki_protection, ) from ._multierror import MultiError @@ -1148,11 +1148,9 @@ class GuestState: unrolled_run_next_send = attr.ib(factory=lambda: Value(None)) def guest_tick(self): - locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True try: timeout = self.unrolled_run_next_send.send(self.unrolled_run_gen) except StopIteration: - # XX if we had KI support, we'd have to do something with it here self.done_callback(self.runner.main_task_outcome) return except TrioInternalError as exc: @@ -1189,6 +1187,7 @@ class Runner: clock = attr.ib() instruments = attr.ib() io_manager = attr.ib() + ki_manager = attr.ib() # Run-local values, see _local.py _locals = attr.ib(factory=dict) @@ -1225,6 +1224,8 @@ def close(self): self.entry_queue.close() if self.instruments: self.instrument("after_run") + # This is where KI protection gets disabled, so we do it last + self.ki_manager.close() def next_deadline(self): try: @@ -1749,7 +1750,7 @@ def remove_instrument(self, instrument): # 'is_guest' to see the special cases we need to handle this. -def setup_runner(clock, instruments): +def setup_runner(clock, instruments, restrict_keyboard_interrupt_to_checkpoints): """Create a Runner object and install it as the GLOBAL_RUN_CONTEXT.""" # It wouldn't be *hard* to support nested calls to run(), but I can't # think of a single good reason for it, so let's be conservative for @@ -1763,12 +1764,20 @@ def setup_runner(clock, instruments): io_manager = TheIOManager() system_context = copy_context() system_context.run(current_async_library_cvar.set, "trio") + ki_manager = KIManager() + runner = Runner( clock=clock, instruments=instruments, io_manager=io_manager, system_context=system_context, + ki_manager=ki_manager, ) + + # This is where KI protection gets enabled, so we want to do it early - in + # particular before we start modifying global state like GLOBAL_RUN_CONTEXT + ki_manager.install(runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints) + GLOBAL_RUN_CONTEXT.runner = runner return runner @@ -1850,33 +1859,24 @@ def run( __tracebackhide__ = True - runner = setup_runner(clock, instruments) + runner = setup_runner( + clock, instruments, restrict_keyboard_interrupt_to_checkpoints + ) - # KI handling goes outside unrolled_run to avoid an interval where - # KeyboardInterrupt would be allowed and converted into an - # TrioInternalError: - try: - with ki_manager(runner.deliver_ki, restrict_keyboard_interrupt_to_checkpoints): - gen = unrolled_run(runner, async_fn, args) - next_send = None - while True: - try: - timeout = gen.send(next_send) - except StopIteration: - break - next_send = runner.io_manager.get_events(timeout) - # Inlined copy of runner.main_task_outcome.unwrap() to avoid - # cluttering every single Trio traceback with an extra frame. - if isinstance(runner.main_task_outcome, Value): - return runner.main_task_outcome.value - else: - raise runner.main_task_outcome.error - finally: - # To guarantee that we never swallow a KeyboardInterrupt, we have to - # check for pending ones once more after leaving the context manager: - if runner.ki_pending: - # Implicitly chains with any exception from outcome.unwrap(): - raise KeyboardInterrupt + gen = unrolled_run(runner, async_fn, args) + next_send = None + while True: + try: + timeout = gen.send(next_send) + except StopIteration: + break + next_send = runner.io_manager.get_events(timeout) + # Inlined copy of runner.main_task_outcome.unwrap() to avoid + # cluttering every single Trio traceback with an extra frame. + if isinstance(runner.main_task_outcome, Value): + return runner.main_task_outcome.value + else: + raise runner.main_task_outcome.error def start_guest_run( @@ -1888,6 +1888,7 @@ def start_guest_run( host_uses_signal_set_wakeup_fd=False, clock=None, instruments=(), + restrict_keyboard_interrupt_to_checkpoints=False, ): """Start a "guest" run of Trio on top of some other "host" event loop. @@ -1938,7 +1939,9 @@ def my_done_callback(run_outcome): For the meaning of other arguments, see `trio.run`. """ - runner = setup_runner(clock, instruments) + runner = setup_runner( + clock, instruments, restrict_keyboard_interrupt_to_checkpoints + ) runner.is_guest = True runner.guest_tick_scheduled = True @@ -2154,6 +2157,7 @@ def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False): del GLOBAL_RUN_CONTEXT.task except GeneratorExit: + # The run-loop generator has been garbage collected without finishing warnings.warn( RuntimeWarning( "Trio guest run got abandoned without properly finishing... " @@ -2167,6 +2171,14 @@ def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False): finally: GLOBAL_RUN_CONTEXT.__dict__.clear() runner.close() + # Have to do this after runner.close() has disabled KI protection, + # because otherwise there's a race where ki_pending could get set + # after we check it. + if runner.ki_pending: + ki = KeyboardInterrupt() + if isinstance(runner.main_task_outcome, Error): + ki.__context__ = runner.main_task_outcome.error + runner.main_task_outcome = Error(ki) ################################################################ diff --git a/trio/_core/tests/test_guest_mode.py b/trio/_core/tests/test_guest_mode.py index d4efba3449..46e741e392 100644 --- a/trio/_core/tests/test_guest_mode.py +++ b/trio/_core/tests/test_guest_mode.py @@ -11,6 +11,7 @@ import trio import trio.testing from .tutil import gc_collect_harder +from ..._util import signal_raise # The simplest possible "host" loop. # Nice features: @@ -440,3 +441,35 @@ def bad_get_events(*args): trivial_guest_run(crash_in_worker_thread_io) gc_collect_harder() + + +def test_guest_mode_ki(): + assert signal.getsignal(signal.SIGINT) is signal.default_int_handler + + # Check SIGINT in Trio func and in host func + async def trio_main(in_host): + with pytest.raises(KeyboardInterrupt): + signal_raise(signal.SIGINT) + + # Host SIGINT should get injected into Trio + in_host(partial(signal_raise, signal.SIGINT)) + await trio.sleep(10) + + with pytest.raises(KeyboardInterrupt) as excinfo: + trivial_guest_run(trio_main) + assert excinfo.value.__context__ is None + # Signal handler should be restored properly on exit + assert signal.getsignal(signal.SIGINT) is signal.default_int_handler + + # Also check chaining in the case where KI is injected after main exits + final_exc = KeyError("whoa") + + async def trio_main_raising(in_host): + in_host(partial(signal_raise, signal.SIGINT)) + raise final_exc + + with pytest.raises(KeyboardInterrupt) as excinfo: + trivial_guest_run(trio_main_raising) + assert excinfo.value.__context__ is final_exc + + assert signal.getsignal(signal.SIGINT) is signal.default_int_handler From 7c9ce288ee5d8e5f6452eed4be4d34c2ee44f3f8 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 1 Jun 2020 14:39:39 -0700 Subject: [PATCH 33/38] Add note about guest mode KI handling to docs --- docs/source/reference-lowlevel.rst | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/docs/source/reference-lowlevel.rst b/docs/source/reference-lowlevel.rst index 92f0e1dab3..77f3a83170 100644 --- a/docs/source/reference-lowlevel.rst +++ b/docs/source/reference-lowlevel.rst @@ -833,6 +833,15 @@ important limitations you have to respect: Trio run, open a nursery, and then start your functions as child tasks in that nursery. +- Unless you or your host loop register a handler for `signal.SIGINT` + before starting Trio (this is not common), then Trio will take over + delivery of `KeyboardInterrupt`\s. And since Trio can't tell which + host code is safe to interrupt, it will only deliver + `KeyboardInterrupt` into the Trio part of your code. This is fine if + your program is set up to exit when the Trio part exits, because the + `KeyboardInterrupt` will propagate out of Trio and then trigger the + shutdown of your host loop, which is just what you want. + Given these constraints, we think the simplest approach is to always start and stop the two loops together. From 19882c73577880bd359763bcc6d73dda2283ed1c Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 1 Jun 2020 16:42:06 -0700 Subject: [PATCH 34/38] Clarify that you can't magically share async code between host and guest --- docs/source/reference-lowlevel.rst | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/docs/source/reference-lowlevel.rst b/docs/source/reference-lowlevel.rst index 77f3a83170..b72a5fe457 100644 --- a/docs/source/reference-lowlevel.rst +++ b/docs/source/reference-lowlevel.rst @@ -582,17 +582,21 @@ like Qt. Its advantages are: so it has low latency and doesn't waste electricity. - No need to think about threads: your Trio code runs in the same - thread as the host event loop, so you can freely call Trio APIs from - the host, and call host APIs from Trio. For example, if you're - making a GUI app with Qt as the host loop, then making a `cancel - button `__ and connecting - it to a `trio.CancelScope` is as easy as writing:: + thread as the host event loop, so you can freely call sync Trio APIs + from the host, and call sync host APIs from Trio. For example, if + you're making a GUI app with Qt as the host loop, then making a + `cancel button `__ and + connecting it to a `trio.CancelScope` is as easy as writing:: # Trio code can create Qt objects without any special ceremony... my_cancel_button = QPushButton("Cancel") # ...and Qt can call back to Trio just as easily my_cancel_button.clicked.connect(my_cancel_scope.cancel) + (For async APIs, it's not that simple, but you can build on this + make explicit bridges between the two worlds, e.g. by passing async + functions through queues.) + - Consistent behavior: guest mode uses the same code as regular Trio: the same scheduler, same IO code, same everything. So you get the full feature set and everything acts the way you expect. From 7207cde2956874236e062175f8a38c3e3ecbb28b Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 1 Jun 2020 17:58:18 -0700 Subject: [PATCH 35/38] Remove defunct draft docs --- docs/source/design.rst | 74 ------------------------------------------ 1 file changed, 74 deletions(-) diff --git a/docs/source/design.rst b/docs/source/design.rst index 184e60c110..6251f22cdb 100644 --- a/docs/source/design.rst +++ b/docs/source/design.rst @@ -534,77 +534,3 @@ there is no provision for "pluggable" backends. The intuition here is that we'd rather focus our energy on making one set of solid, official backends that provide a high-quality experience out-of-the-box on all supported systems. - - -Guest mode ----------- - -One of Trio's more unusual features is that it supports being run in -"guest mode" on top of some other event loop (the "host"). - -XX TODO: document this properly - -the basic idea of pushing ``get_events`` into a thread - -actual core logic is identical whether running in regular mode or -guest mode; alternate between waiting for I/O+timeout vs running trio tasks - -one extra wrinkle is: normally tasks can only become runnable, and -deadline can only change, if trio task is running. In guest mode, -that's no longer true. So reschedule() and deadline changes need to -potentially trigger the scheduler, or at least update the I/O -deadline. Do this in the simplest possible way: force the I/O thread -to return immediately via the normal path. - -subtlety around wait_all_tasks_blocked and 'events' semantics - -diagram:: - - - Normal mode - - Main thread executing trio.run: - - +---------------------------+ - | wait for I/O+timeout | - +---------------------------+ - | run trio tasks | - +---------------------------+ - | wait for I/O+timeout | - +---------------------------+ - | run trio tasks | - +---------------------------+ - | wait for I/O+timeout | - +---------------------------+ - | run trio tasks | - +---------------------------+ - . - . - . - - - Guest mode - - Main thread executing host loop: Trio I/O thread: - - +---------------------------+ - | host loop does its thing | +---------------------------+ - | | | wait for trio I/O+timeout | - +---------------------------+ +---------------------------+ - / - +---------------------------+ <---------------/ - | run trio tasks | - +---------------------------+ ----------------\ - \ - +---------------------------+ v - | host loop does its thing | +---------------------------+ - | | | wait for trio I/O+timeout | - +---------------------------+ +---------------------------+ - / - +---------------------------+ <---------------/ - | run trio tasks | - +---------------------------+ ----------------\ - \ - . - . - . From c6697dbe3e5865f48a41b7e13952f3065f1d2d45 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 1 Jun 2020 17:58:40 -0700 Subject: [PATCH 36/38] Remove obsolete placeholder --- docs/source/reference-lowlevel.rst | 3 --- 1 file changed, 3 deletions(-) diff --git a/docs/source/reference-lowlevel.rst b/docs/source/reference-lowlevel.rst index b72a5fe457..bf5666d2b0 100644 --- a/docs/source/reference-lowlevel.rst +++ b/docs/source/reference-lowlevel.rst @@ -892,9 +892,6 @@ loop's source. For example, asyncio may or may not use system. -**Control-C handling** XX FIXME - - **A small optimization:** Finally, consider a small optimization. Some event loops offer two versions of their "call this function soon" API: one that can be used from any thread, and one that can only be used From 66967ce57e91ce5fefc2c5db3e98ac1b33ca0a52 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 1 Jun 2020 18:03:40 -0700 Subject: [PATCH 37/38] Make kqueue code more similar to epoll code --- trio/_core/_io_kqueue.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/trio/_core/_io_kqueue.py b/trio/_core/_io_kqueue.py index ce85453749..d2d80a7341 100644 --- a/trio/_core/_io_kqueue.py +++ b/trio/_core/_io_kqueue.py @@ -23,12 +23,14 @@ class KqueueIOManager: # {(ident, filter): Task or UnboundedQueue} _registered = attr.ib(factory=dict) _force_wakeup = attr.ib(factory=WakeupSocketpair) + _force_wakeup_fd = attr.ib(default=None) def __attrs_post_init__(self): force_wakeup_event = select.kevent( self._force_wakeup.wakeup_sock, select.KQ_FILTER_READ, select.KQ_EV_ADD ) self._kqueue.control([force_wakeup_event], 0) + self._force_wakeup_fd = self._force_wakeup.wakeup_sock.fileno() def statistics(self): tasks_waiting = 0 @@ -66,14 +68,10 @@ def get_events(self, timeout): def process_events(self, events): for event in events: key = (event.ident, event.filter) - try: - receiver = self._registered[key] - except KeyError: - if event.ident == self._force_wakeup.wakeup_sock.fileno(): - self._force_wakeup.drain() - continue - else: # pragma: no cover - raise + if event.ident == self._force_wakeup_fd: + self._force_wakeup.drain() + continue + receiver = self._registered[key] if event.flags & select.KQ_EV_ONESHOT: del self._registered[key] if type(receiver) is _core.Task: From 3083de56427725eb5c32a821a7d9ca8cebea671b Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 1 Jun 2020 18:06:50 -0700 Subject: [PATCH 38/38] Improve wording in docs --- docs/source/reference-lowlevel.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/source/reference-lowlevel.rst b/docs/source/reference-lowlevel.rst index bf5666d2b0..36b37e3e37 100644 --- a/docs/source/reference-lowlevel.rst +++ b/docs/source/reference-lowlevel.rst @@ -593,9 +593,9 @@ like Qt. Its advantages are: # ...and Qt can call back to Trio just as easily my_cancel_button.clicked.connect(my_cancel_scope.cancel) - (For async APIs, it's not that simple, but you can build on this - make explicit bridges between the two worlds, e.g. by passing async - functions through queues.) + (For async APIs, it's not that simple, but you can use sync APIs to + build explicit bridges between the two worlds, e.g. by passing async + functions and their results back and forth through queues.) - Consistent behavior: guest mode uses the same code as regular Trio: the same scheduler, same IO code, same everything. So you get the