Skip to content

Commit

Permalink
Add minimal asyncio.AbstractEventLoop implementation (bug 649588)
Browse files Browse the repository at this point in the history
This provides minimal interoperability with existing asyncio code,
by adding a portage.util.futures.unix_events.DefaultEventLoopPolicy
class that makes asyncio use portage's internal event loop when an
instance is passed into asyncio.set_event_loop_policy(). The
get_event_loop() method of this policy returns an instance of a
_PortageEventLoop class that wraps portage's internal event loop and
implements asyncio's AbstractEventLoop interface.

The portage.util.futures.asyncio module refers to the real
asyncio module when available, and otherwise falls back to a
minimal implementation that works with python2.7. The included
EventLoopInForkTestCase demonstrates usage, and works with all
supported versions of python, include python2.7.

In python3.4 and later, API consumers can use asyncio coroutines,
since _PortageEventLoop is compatible with asyncio.Task!

Bug: https://bugs.gentoo.org/649588
  • Loading branch information
zmedico committed Apr 11, 2018
1 parent 1689911 commit 142d08c
Show file tree
Hide file tree
Showing 9 changed files with 477 additions and 9 deletions.
Empty file.
Empty file.
59 changes: 59 additions & 0 deletions pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright 2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

import multiprocessing
import os

from portage.tests import TestCase
from portage.util.futures import asyncio
from portage.util.futures.unix_events import DefaultEventLoopPolicy


def fork_main(parent_conn, child_conn):
parent_conn.close()
loop = asyncio.get_event_loop()
# This fails with python's default event loop policy,
# see https://bugs.python.org/issue22087.
loop.run_until_complete(asyncio.sleep(0.1))


def async_main(fork_exitcode, loop=None):
loop = loop or asyncio.get_event_loop()

# Since python2.7 does not support Process.sentinel, use Pipe to
# monitor for process exit.
parent_conn, child_conn = multiprocessing.Pipe()

def eof_callback(proc):
loop.remove_reader(parent_conn.fileno())
parent_conn.close()
proc.join()
fork_exitcode.set_result(proc.exitcode)

proc = multiprocessing.Process(target=fork_main, args=(parent_conn, child_conn))
loop.add_reader(parent_conn.fileno(), eof_callback, proc)
proc.start()
child_conn.close()


class EventLoopInForkTestCase(TestCase):
"""
The default asyncio event loop policy does not support loops
running in forks, see https://bugs.python.org/issue22087.
Portage's DefaultEventLoopPolicy supports forks.
"""

def testEventLoopInForkTestCase(self):
initial_policy = asyncio.get_event_loop_policy()
if not isinstance(initial_policy, DefaultEventLoopPolicy):
asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
try:
loop = asyncio.get_event_loop()
fork_exitcode = loop.create_future()
# Make async_main fork while the loop is running, which would
# trigger https://bugs.python.org/issue22087 with asyncio's
# default event loop policy.
loop.call_soon(async_main, fork_exitcode)
assert loop.run_until_complete(fork_exitcode) == os.EX_OK
finally:
asyncio.set_event_loop_policy(initial_policy)
11 changes: 5 additions & 6 deletions pym/portage/util/_eventloop/EventLoop.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@

import portage
portage.proxy.lazyimport.lazyimport(globals(),
'portage.util.futures.futures:_EventLoopFuture',
'portage.util.futures.futures:Future',
'portage.util.futures.executor.fork:ForkExecutor',
'portage.util.futures.unix_events:_PortageEventLoop',
)

from portage import OrderedDict
Expand Down Expand Up @@ -188,15 +189,13 @@ def __init__(self, main=True):
self._sigchld_write = None
self._sigchld_src_id = None
self._pid = os.getpid()
self._asyncio_wrapper = _PortageEventLoop(loop=self)

def create_future(self):
"""
Create a Future object attached to the loop. This returns
an instance of _EventLoopFuture, because EventLoop is currently
missing some of the asyncio.AbstractEventLoop methods that
asyncio.Future requires.
Create a Future object attached to the loop.
"""
return _EventLoopFuture(loop=self)
return Future(loop=self._asyncio_wrapper)

def _new_source_id(self):
"""
Expand Down
11 changes: 11 additions & 0 deletions pym/portage/util/futures/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Copyright 2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

__all__ = (
'asyncio',
)

try:
import asyncio
except ImportError:
from portage.util.futures import _asyncio as asyncio
114 changes: 114 additions & 0 deletions pym/portage/util/futures/_asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Copyright 2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

__all__ = (
'ensure_future',
'get_event_loop',
'get_event_loop_policy',
'set_event_loop_policy',
'sleep',
'Task',
)

try:
import threading
except ImportError:
import dummy_threading as threading

import portage
portage.proxy.lazyimport.lazyimport(globals(),
'portage.util.futures.unix_events:DefaultEventLoopPolicy',
)
from portage.util.futures.futures import Future

_lock = threading.Lock()
_policy = None


def get_event_loop_policy():
"""
Get the current event loop policy.
@rtype: asyncio.AbstractEventLoopPolicy (or compatible)
@return: the current event loop policy
"""
global _lock, _policy
with _lock:
if _policy is None:
_policy = DefaultEventLoopPolicy()
return _policy


def set_event_loop_policy(policy):
"""
Set the current event loop policy. If policy is None, the default
policy is restored.
@type policy: asyncio.AbstractEventLoopPolicy or None
@param policy: new event loop policy
"""
global _lock, _policy
with _lock:
_policy = policy or DefaultEventLoopPolicy()


def get_event_loop():
"""
Equivalent to calling get_event_loop_policy().get_event_loop().
@rtype: asyncio.AbstractEventLoop (or compatible)
@return: the event loop for the current context
"""
return get_event_loop_policy().get_event_loop()


class Task(Future):
"""
Schedule the execution of a coroutine: wrap it in a future. A task
is a subclass of Future.
"""
def __init__(self, coro, loop=None):
raise NotImplementedError


def ensure_future(coro_or_future, loop=None):
"""
Wrap a coroutine or an awaitable in a future.
If the argument is a Future, it is returned directly.
@type coro_or_future: coroutine or Future
@param coro_or_future: coroutine or future to wrap
@type loop: asyncio.AbstractEventLoop (or compatible)
@param loop: event loop
@rtype: asyncio.Future (or compatible)
@return: an instance of Future
"""
if isinstance(coro_or_future, Future):
return coro_or_future
raise NotImplementedError


def sleep(delay, result=None, loop=None):
"""
Create a future that completes after a given time (in seconds). If
result is provided, it is produced to the caller when the future
completes.
@type delay: int or float
@param delay: delay seconds
@type result: object
@param result: result of the future
@type loop: asyncio.AbstractEventLoop (or compatible)
@param loop: event loop
@rtype: asyncio.Future (or compatible)
@return: an instance of Future
"""
loop = loop or get_event_loop()
future = loop.create_future()
handle = loop.call_later(delay, future.set_result, result)
def cancel_callback(future):
if future.cancelled():
handle.cancel()
future.add_done_callback(cancel_callback)
return future
Loading

0 comments on commit 142d08c

Please sign in to comment.