diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py index 8805e35756..cfdbe13511 100644 --- a/lib/portage/util/futures/_asyncio/__init__.py +++ b/lib/portage/util/futures/_asyncio/__init__.py @@ -26,6 +26,7 @@ import sys import types +import warnings import weakref import asyncio as _real_asyncio @@ -46,6 +47,7 @@ ) import threading +from typing import Optional import portage @@ -251,11 +253,35 @@ def _wrap_loop(loop=None): # The default loop returned by _wrap_loop should be consistent # with global_event_loop, in order to avoid accidental registration # of callbacks with a loop that is not intended to run. - loop = loop or _safe_loop() - return loop if hasattr(loop, "_asyncio_wrapper") else _AsyncioEventLoop(loop=loop) + if hasattr(loop, "_asyncio_wrapper"): + return loop + + # This returns a running loop if it exists, and otherwise returns + # a loop associated with the current thread. + safe_loop = _safe_loop(create=loop is None) + if safe_loop is not None and (loop is None or safe_loop._loop is loop): + return safe_loop + + if safe_loop is None: + msg = f"_wrap_loop argument '{loop}' not associated with thread '{threading.get_ident()}'" + else: + msg = f"_wrap_loop argument '{loop}' different frome loop '{safe_loop._loop}' already associated with thread '{threading.get_ident()}'" + + if portage._internal_caller: + raise AssertionError(msg) + # It's not known whether exernal API consumers will trigger this case, + # so if it happens then emit a UserWarning before returning a temporary + # AsyncioEventLoop instance. + warnings.warn(msg, UserWarning, stacklevel=2) -def _safe_loop(): + # We could possibly add a weak reference in _thread_weakrefs.loops when + # safe_loop is None, but if safe_loop is not None, then there is a + # collision in _thread_weakrefs.loops that would need to be resolved. + return _AsyncioEventLoop(loop=loop) + + +def _safe_loop(create: Optional[bool] = True) -> Optional[_AsyncioEventLoop]: """ Return an event loop that's safe to use within the current context. For portage internal callers or external API consumers calling from @@ -276,8 +302,13 @@ def _safe_loop(): are added to a WeakValueDictionary, and closed via an atexit hook if they still exist during exit for the current pid. - @rtype: asyncio.AbstractEventLoop (or compatible) - @return: event loop instance + @type create: bool + @param create: Create a loop by default if a loop is not already associated + with the current thread. If create is False, then return None if a loop + is not already associated with the current thread. + @rtype: AsyncioEventLoop or None + @return: event loop instance, or None if the create parameter is False and + a loop is not already associated with the current thread. """ loop = _get_running_loop() if loop is not None: @@ -292,6 +323,8 @@ def _safe_loop(): try: loop = _thread_weakrefs.loops[thread_key] except KeyError: + if not create: + return None try: try: _loop = _real_asyncio.get_running_loop()