Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a graceful shutdown period to allow tasks to complete. #7188

Merged
merged 29 commits into from
Feb 11, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1a82747
Add a graceful shutdown period to allow tasks to complete.
Dreamsorcerer Jan 28, 2023
2ebd759
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 28, 2023
485fd1d
Create 7188.feature
Dreamsorcerer Jan 28, 2023
0f67bd8
Update web_reference.rst
Dreamsorcerer Jan 28, 2023
f3a3375
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 28, 2023
b8daaea
Update web_advanced.rst
Dreamsorcerer Jan 28, 2023
1438a9f
Update http_writer.py
Dreamsorcerer Jan 28, 2023
fd09429
Update test_run_app.py
Dreamsorcerer Jan 28, 2023
524fd25
Update web_runner.py
Dreamsorcerer Jan 28, 2023
6356e1e
Add parent task to excludes.
Dreamsorcerer Jan 28, 2023
2bbec65
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 28, 2023
4329fb3
Enable handler_cancellation by default in tests.
Dreamsorcerer Jan 28, 2023
4fcfd12
Mypy
Dreamsorcerer Jan 28, 2023
d35c0ba
Syntax
Dreamsorcerer Jan 28, 2023
024c27a
Shield close call.
Dreamsorcerer Jan 28, 2023
19b5e64
Tweak timing
Dreamsorcerer Jan 28, 2023
7f17c12
Tweak timing
Dreamsorcerer Jan 28, 2023
e6ae701
Tweak timing
Dreamsorcerer Jan 28, 2023
1594fcc
Update docs/web_advanced.rst
Dreamsorcerer Jan 28, 2023
2e92f2f
Update CHANGES/7188.feature
Dreamsorcerer Jan 28, 2023
9e1311f
Debug
Dreamsorcerer Jan 28, 2023
8a3a72b
Merge branch 'Dreamsorcerer-graceful-shutdown' of github.com:aio-libs…
Dreamsorcerer Jan 28, 2023
9ee2b46
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 28, 2023
a921707
Debug
Dreamsorcerer Jan 28, 2023
cd3db03
Debug
Dreamsorcerer Jan 28, 2023
d614444
Cleanup
Dreamsorcerer Jan 28, 2023
808511e
Update test_run_app.py
Dreamsorcerer Feb 11, 2023
3261168
Update docs/web_reference.rst
Dreamsorcerer Feb 11, 2023
69c08d8
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/7188.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a graceful shutdown period which allows pending tasks to complete before the application's cleanup is called. The period can be adjusted with the ``shutdown_timeout`` parameter. -- by :user:`Dreamsorcerer`
Dreamsorcerer marked this conversation as resolved.
Show resolved Hide resolved
1 change: 1 addition & 0 deletions aiohttp/http_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def _write(self, chunk: bytes) -> None:
self.buffer_size += size
self.output_size += size

print("FOO", self._transport.__class__, self._transport.is_closing())
if self._transport is None or self._transport.is_closing():
raise ConnectionResetError("Cannot write to closing transport")
self._transport.write(chunk)
Expand Down
18 changes: 17 additions & 1 deletion aiohttp/web_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import signal
import socket
from abc import ABC, abstractmethod
from contextlib import suppress
from typing import Any, List, Optional, Set, Type

from yarl import URL
Expand Down Expand Up @@ -80,11 +81,21 @@ async def stop(self) -> None:
# named pipes do not have wait_closed property
if hasattr(self._server, "wait_closed"):
await self._server.wait_closed()

# Wait for pending tasks for a given time limit.
with suppress(asyncio.TimeoutError):
await asyncio.wait_for(self._wait(), timeout=self._shutdown_timeout)

await self._runner.shutdown()
assert self._runner.server
await self._runner.server.shutdown(self._shutdown_timeout)
self._runner._unreg_site(self)

async def _wait(self) -> None:
exclude = self._runner.starting_tasks | {asyncio.current_task()}
while tasks := asyncio.all_tasks() - exclude:
await asyncio.wait(tasks)


class TCPSite(BaseSite):
__slots__ = ("_host", "_port", "_reuse_address", "_reuse_port")
Expand Down Expand Up @@ -247,7 +258,7 @@ async def start(self) -> None:


class BaseRunner(ABC):
__slots__ = ("_handle_signals", "_kwargs", "_server", "_sites")
__slots__ = ("starting_tasks", "_handle_signals", "_kwargs", "_server", "_sites")

def __init__(self, *, handle_signals: bool = False, **kwargs: Any) -> None:
self._handle_signals = handle_signals
Expand Down Expand Up @@ -287,6 +298,11 @@ async def setup(self) -> None:
pass

self._server = await self._make_server()
# On shutdown we want to avoid waiting on tasks which run forever.
# It's very likely that all tasks which run forever will have been created by
# the time we have completed the application startup (in self._make_server()),
# so we just record all running tasks here and exclude them later.
self.starting_tasks = asyncio.all_tasks()

@abstractmethod
async def shutdown(self) -> None:
Expand Down
203 changes: 201 additions & 2 deletions tests/test_run_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
import ssl
import subprocess
import sys
from typing import Any
import time
from typing import Any, Callable, NoReturn
from unittest import mock
from uuid import uuid4

import pytest
from conftest import needs_unix

from aiohttp import web
from aiohttp import ClientConnectorError, ClientSession, web
from aiohttp.test_utils import make_mocked_coro
from aiohttp.web_runner import BaseRunner

Expand Down Expand Up @@ -926,3 +927,201 @@ async def init():

web.run_app(init(), print=stopper(patched_loop), loop=patched_loop)
assert count == 3


class TestShutdown:
def raiser(self) -> NoReturn:
raise KeyboardInterrupt

async def stop(self, request: web.Request) -> web.Response:
asyncio.get_running_loop().call_soon(self.raiser)
return web.Response()

def run_app(
self, port: int, timeout: int, task, extra_test=None, cleanup=None
) -> asyncio.Task:
async def test() -> None:
await asyncio.sleep(1)
async with ClientSession() as sess:
async with sess.get(f"http://localhost:{port}/") as resp:
pass
async with sess.get(f"http://localhost:{port}/stop") as resp:
pass

if extra_test:
await extra_test(sess)

async def run_test(app: web.Application) -> None:
nonlocal test_task
test_task = asyncio.create_task(test())
yield
await test_task

Check notice

Code scanning / CodeQL

Statement has no effect

This statement has no effect.

async def handler(request: web.Request) -> web.Response:
nonlocal t
t = asyncio.create_task(task())
return web.Response(text="FOO")

t = test_task = None
app = web.Application()
app.cleanup_ctx.append(run_test)
if cleanup:
app.on_shutdown.append(cleanup)
app.router.add_get("/", handler)
app.router.add_get("/stop", self.stop)

web.run_app(app, port=port, shutdown_timeout=timeout)
assert test_task.exception() is None
return t

def test_shutdown_wait_for_task(
self, aiohttp_unused_port: Callable[[], int]
) -> None:
port = aiohttp_unused_port()
finished = False

async def task():
nonlocal finished
await asyncio.sleep(2)
finished = True

t = self.run_app(port, 3, task)

assert finished is True
assert t.done()
assert not t.cancelled()

def test_shutdown_timeout_task(
self, aiohttp_unused_port: Callable[[], int]
) -> None:
port = aiohttp_unused_port()
finished = False

async def task():
nonlocal finished
await asyncio.sleep(2)
finished = True

t = self.run_app(port, 1, task)

assert finished is False
assert t.done()
assert t.cancelled()

def test_shutdown_wait_for_spawned_task(
self, aiohttp_unused_port: Callable[[], int]
) -> None:
port = aiohttp_unused_port()
finished = False
finished_sub = False
sub_t = None

async def sub_task():
nonlocal finished_sub
await asyncio.sleep(1.5)
finished_sub = True

async def task():
nonlocal finished, sub_t
await asyncio.sleep(0.5)
sub_t = asyncio.create_task(sub_task())
finished = True

t = self.run_app(port, 3, task)

assert finished is True
assert t.done()
assert not t.cancelled()
assert finished_sub is True
assert sub_t.done()
assert not sub_t.cancelled()

def test_shutdown_timeout_not_reached(
self, aiohttp_unused_port: Callable[[], int]
) -> None:
port = aiohttp_unused_port()
finished = False

async def task():
nonlocal finished
await asyncio.sleep(1)
finished = True

start_time = time.time()
t = self.run_app(port, 15, task)

assert finished is True
assert t.done()
# Verify run_app has not waited for timeout.
assert time.time() - start_time < 10

def test_shutdown_new_conn_rejected(
self, aiohttp_unused_port: Callable[[], int]
) -> None:
port = aiohttp_unused_port()
finished = False

async def task() -> None:
nonlocal finished
await asyncio.sleep(2)
finished = True

async def test(sess: ClientSession) -> None:
# Ensure we are in the middle of shutdown (waiting for task()).
await asyncio.sleep(1)
with pytest.raises(ClientConnectorError):
# Use a new session to try and open a new connection.
async with ClientSession() as sess:
async with sess.get(f"http://localhost:{port}/") as resp:
pass
assert finished is False

t = self.run_app(port, 3, task, test)

assert finished is True
assert t.done()

def test_shutdown_pending_handler_responds(
self, aiohttp_unused_port: Callable[[], int]
) -> None:
port = aiohttp_unused_port()
finished = False

async def test() -> None:
async def test_resp(sess):
async with sess.get(f"http://localhost:{port}/") as resp:
assert await resp.text() == "FOO"

await asyncio.sleep(1)
async with ClientSession() as sess:
t = asyncio.create_task(test_resp(sess))
await asyncio.sleep(1)
# Handler is in-progress while we trigger server shutdown.
async with sess.get(f"http://localhost:{port}/stop") as resp:
pass

assert finished == False
# Handler should still complete and produce a response.
await t

Check notice

Code scanning / CodeQL

Statement has no effect

This statement has no effect.

async def run_test(app: web.Application) -> None:
nonlocal t
t = asyncio.create_task(test())
yield
await t

Check notice

Code scanning / CodeQL

Statement has no effect

This statement has no effect.

async def handler(request: web.Request) -> web.Response:
nonlocal finished
await asyncio.sleep(3)
finished = True
return web.Response(text="FOO")

t = None
app = web.Application()
app.cleanup_ctx.append(run_test)
app.router.add_get("/", handler)
app.router.add_get("/stop", self.stop)

web.run_app(app, port=port, shutdown_timeout=5)
assert t.exception() is None
assert finished is True