Skip to content

Commit

Permalink
Define web.Application.on_startup() handler
Browse files Browse the repository at this point in the history
Changes:

- Define `web.Application.on_startup()` signal handler.
- Run `app.on_startup()` along with the request handler within an event
loop in `web.run_app()`.
- Tests for `Application.on_startup` signal
- Tests for multiple tasks to run on startup, including two long running
- Extend tests on `web.run_app()`: ensure that `Application.startup()`
is called within `web.run_app()`.
- Add documentation for `Application.on_startup` signal handler:
    - Describe possible use cases
    - Minor: fix typo

Notes:

In the `tests/test_run_app.py` increased delay for `loop.call_later` from
`0.01s` to `0.02s` since with the old value loop used to stop
before coroutines gathered for `on_startup` finished, that caused
an exception.
  • Loading branch information
f0t0n committed Aug 22, 2016
1 parent d08954e commit 9302e7e
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 5 deletions.
21 changes: 18 additions & 3 deletions aiohttp/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def __init__(self, *, logger=web_logger, loop=None,
self._on_pre_signal = PreSignal()
self._on_post_signal = PostSignal()
self._on_response_prepare = Signal(self)
self._on_startup = Signal(self)
self._on_shutdown = Signal(self)
self._on_cleanup = Signal(self)

Expand All @@ -190,6 +191,10 @@ def on_pre_signal(self):
def on_post_signal(self):
return self._on_post_signal

@property
def on_startup(self):
return self._on_startup

@property
def on_shutdown(self):
return self._on_shutdown
Expand All @@ -214,6 +219,14 @@ def make_handler(self, **kwargs):
return self._handler_factory(
self, self.router, loop=self.loop, **kwargs)

@asyncio.coroutine
def startup(self):
"""Causes on_startup signal
Should be called in the event loop along with the request handler.
"""
yield from self.on_startup.send(self)

@asyncio.coroutine
def shutdown(self):
"""Causes on_shutdown signal
Expand Down Expand Up @@ -267,9 +280,11 @@ def run_app(app, *, host='0.0.0.0', port=None,
loop = app.loop

handler = app.make_handler()
srv = loop.run_until_complete(loop.create_server(handler, host, port,
ssl=ssl_context,
backlog=backlog))
server = loop.create_server(handler, host, port, ssl=ssl_context,
backlog=backlog)
srv, startup_res = loop.run_until_complete(asyncio.gather(server,
app.startup(),
loop=loop))

scheme = 'https' if ssl_context else 'http'
print("======== Running on {scheme}://{host}:{port}/ ========\n"
Expand Down
59 changes: 58 additions & 1 deletion docs/web.rst
Original file line number Diff line number Diff line change
Expand Up @@ -944,7 +944,7 @@ handler::

return ws

Signal handler may looks like::
Signal handler may look like::

async def on_shutdown(app):
for ws in app['websockets']:
Expand Down Expand Up @@ -986,6 +986,63 @@ finalizing. It's pretty close to :func:`run_app` utility function::
loop.run_until_complete(app.cleanup())
loop.close()

.. _aiohttp-web-background-tasks:

Background tasks
-----------------

Sometimes there's a need to perform some asynchronous operations just
after application start-up.

Even more, in some sophisticated systems there could be a need to run some
background tasks in the event loop along with the application's request
handler. Such as listening to message queue or other network message/event
sources (e.g. ZeroMQ, Redis Pub/Sub, AMQP, etc.) to react to received messages
within the application.

For example the background task could listen to ZeroMQ on :data:`zmq.SUB` socket,
process and forward retrieved messages to clients connected via WebSocket
that are stored somewhere in the application
(e.g. in the :obj:`application['websockets']` list).

To run such short and long running background tasks aiohttp provides an
ability to register :attr:`Application.on_startup` signal handler(s) that
will run along with the application's request handler.

For example there's a need to run one quick task and two long running
tasks that will live till the application is alive. The appropriate
background tasks could be registered as an :attr:`Application.on_startup`
signal handlers as shown in the example below::

app = web.Application()

async def quickly_notify_monitoring(app):
"""Send notification to monitoring service about the app process start-up"""
pass

async def listen_to_zeromq(app):
"""Listen to messages on zmq.SUB socket"""
pass

async def listen_to_redis(app):
"""Listen to messages from Redis Pub/Sub"""
pass

async def run_all_long_running_tasks(app):
return await asyncio.gather(listen_to_zeromq(app),
listen_to_redis(app),
loop=app.loop)
app.on_startup.append(quickly_notify_monitoring)
app.on_startup.append(run_all_long_running_tasks)
web.run_app(app)


The :func:`quickly_notify_monitoring` from the example above will complete
and exit but :func:`listen_to_zeromq` and :func:`listen_to_redis` will take
forever.
An :attr:`Application.on_cleanup` signal handler may be used to send a
cancellation to all registered long-running tasks.


CORS support
------------
Expand Down
24 changes: 23 additions & 1 deletion docs/web_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,21 @@ duplicated like one using :meth:`Application.copy`.
async def on_prepare(request, response):
pass

.. attribute:: on_startup

A :class:`~aiohttp.signals.Signal` that is fired on application start-up.

Subscribers may use the signal to run background tasks in the event
loop along with the application's request handler just after the
application start-up.

Signal handlers should have the following signature::

async def on_startup(app):
pass

.. seealso:: :ref:`aiohttp-web-background-tasks`.

.. attribute:: on_shutdown

A :class:`~aiohttp.signals.Signal` that is fired on application shutdown.
Expand Down Expand Up @@ -1076,7 +1091,6 @@ duplicated like one using :meth:`Application.copy`.

.. seealso:: :ref:`aiohttp-web-graceful-shutdown` and :attr:`on_shutdown`.


.. method:: make_handler(**kwargs)

Creates HTTP protocol factory for handling requests.
Expand Down Expand Up @@ -1104,6 +1118,14 @@ duplicated like one using :meth:`Application.copy`.
secure_proxy_ssl_header='X-Forwarded-Proto'),
'0.0.0.0', 8080)

.. coroutinemethod:: startup()

A :ref:`coroutine<coroutine>` that will be called along with the
application's request handler.

The purpose of the method is calling :attr:`on_startup` signal
handlers.

.. coroutinemethod:: shutdown()

A :ref:`coroutine<coroutine>` that should be called on
Expand Down
8 changes: 8 additions & 0 deletions tests/test_run_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,22 @@ def test_run_app_http(loop, mocker):
loop.call_later(0.02, loop.stop)

app = web.Application(loop=loop)
mocker.spy(app, 'startup')

web.run_app(app, print=lambda *args: None)

assert loop.is_closed()
loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8080,
ssl=None, backlog=128)
app.startup.assert_called_once_with()


def test_run_app_https(loop, mocker):
mocker.spy(loop, 'create_server')
loop.call_later(0.02, loop.stop)

app = web.Application(loop=loop)
mocker.spy(app, 'startup')

ssl_context = ssl.create_default_context()

Expand All @@ -30,6 +33,7 @@ def test_run_app_https(loop, mocker):
assert loop.is_closed()
loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8443,
ssl=ssl_context, backlog=128)
app.startup.assert_called_once_with()


def test_run_app_nondefault_host_port(loop, unused_port, mocker):
Expand All @@ -40,22 +44,26 @@ def test_run_app_nondefault_host_port(loop, unused_port, mocker):
loop.call_later(0.02, loop.stop)

app = web.Application(loop=loop)
mocker.spy(app, 'startup')

web.run_app(app, host=host, port=port, print=lambda *args: None)

assert loop.is_closed()
loop.create_server.assert_called_with(mock.ANY, host, port,
ssl=None, backlog=128)
app.startup.assert_called_once_with()


def test_run_app_custom_backlog(loop, mocker):
mocker.spy(loop, 'create_server')
loop.call_later(0.02, loop.stop)

app = web.Application(loop=loop)
mocker.spy(app, 'startup')

web.run_app(app, backlog=10, print=lambda *args: None)

assert loop.is_closed()
loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8080,
ssl=None, backlog=10)
app.startup.assert_called_once_with()
45 changes: 45 additions & 0 deletions tests/test_web_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,48 @@ def on_shutdown(app_param):

yield from app.shutdown()
assert called


@pytest.mark.run_loop
def test_on_startup(loop):
app = web.Application(loop=loop)

blocking_called = False
long_running1_called = False
long_running2_called = False
all_long_running_called = False

def on_startup_blocking(app_param):
nonlocal blocking_called
assert app is app_param
blocking_called = True

@asyncio.coroutine
def long_running1(app_param):
nonlocal long_running1_called
assert app is app_param
long_running1_called = True

@asyncio.coroutine
def long_running2(app_param):
nonlocal long_running2_called
assert app is app_param
long_running2_called = True

@asyncio.coroutine
def on_startup_all_long_running(app_param):
nonlocal all_long_running_called
assert app is app_param
all_long_running_called = True
return (yield from asyncio.gather(long_running1(app_param),
long_running2(app_param),
loop=app_param.loop))

app.on_startup.append(on_startup_blocking)
app.on_startup.append(on_startup_all_long_running)

yield from app.startup()
assert blocking_called
assert long_running1_called
assert long_running2_called
assert all_long_running_called

0 comments on commit 9302e7e

Please sign in to comment.