Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable keepalives for websockets #2701

Merged
merged 3 commits into from
Feb 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ Changelog

.. towncrier release notes start


2.3.10 (XXXX-XX-XX)
===================

- Fix 100% CPU usage on HTTP GET and websocket connection just after it (#1955)

2.3.9 (2018-01-16)
==================

Expand Down
12 changes: 9 additions & 3 deletions aiohttp/web_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class RequestHandler(asyncio.streams.FlowControlMixin, asyncio.Protocol):
"""
_request_count = 0
_keepalive = False # keep transport open
KEEPALIVE_RESCHEDULE_DELAY = 1

def __init__(self, manager, *, loop=None,
keepalive_timeout=75, # NGINX default value is 75 secs
Expand Down Expand Up @@ -321,6 +322,9 @@ def keep_alive(self, val):
:param bool val: new state.
"""
self._keepalive = val
if self._keepalive_handle:
self._keepalive_handle.cancel()
self._keepalive_handle = None

def close(self):
"""Stop accepting new pipelinig messages and close
Expand Down Expand Up @@ -352,7 +356,7 @@ def log_exception(self, *args, **kw):
self.logger.exception(*args, **kw)

def _process_keepalive(self):
if self._force_close:
if self._force_close or not self._keepalive:
return

next = self._keepalive_time + self._keepalive_timeout
Expand All @@ -363,8 +367,10 @@ def _process_keepalive(self):
self.force_close(send_last_heartbeat=True)
return

self._keepalive_handle = self._loop.call_at(
next, self._process_keepalive)
# not all request handlers are done,
# reschedule itself to next second
self._keepalive_handle = self._loop.call_later(
self.KEEPALIVE_RESCHEDULE_DELAY, self._process_keepalive)

def pause_reading(self):
if not self._reading_paused:
Expand Down
2 changes: 2 additions & 0 deletions aiohttp/web_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ def _post_start(self, request, protocol, writer):
request._protocol, limit=2 ** 16, loop=self._loop)
request.protocol.set_parser(WebSocketReader(
self._reader, compress=self._compress))
# disable HTTP keepalive for WebSocket
request.protocol.keep_alive(False)

def can_prepare(self, request):
if self._writer is not None:
Expand Down
22 changes: 16 additions & 6 deletions tests/test_web_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from aiohttp import helpers, http, streams, web


@pytest.yield_fixture
@pytest.fixture
def make_srv(loop, manager):
srv = None

Expand Down Expand Up @@ -72,12 +72,12 @@ def handle(request):
return wrapper


@pytest.yield_fixture
@pytest.fixture
def writer(srv):
return http.PayloadWriter(srv.writer, srv._loop)


@pytest.yield_fixture
@pytest.fixture
def transport(buf):
transport = mock.Mock()

Expand Down Expand Up @@ -204,7 +204,7 @@ def test_connection_made(make_srv):
assert not srv._force_close


def test_connection_made_with_keepaplive(make_srv, transport):
def test_connection_made_with_tcp_keepaplive(make_srv, transport):
srv = make_srv()

sock = mock.Mock()
Expand All @@ -214,7 +214,7 @@ def test_connection_made_with_keepaplive(make_srv, transport):
socket.SO_KEEPALIVE, 1)


def test_connection_made_without_keepaplive(make_srv):
def test_connection_made_without_tcp_keepaplive(make_srv):
srv = make_srv(tcp_keepalive=False)

sock = mock.Mock()
Expand Down Expand Up @@ -260,6 +260,15 @@ def test_srv_keep_alive(srv):
assert not srv._keepalive


def test_srv_keep_alive_disable(srv):
handle = srv._keepalive_handle = mock.Mock()

srv.keep_alive(False)
assert not srv._keepalive
assert srv._keepalive_handle is None
handle.cancel.assert_called_with()


def test_slow_request(make_srv):
with pytest.warns(DeprecationWarning):
make_srv(slow_request_timeout=0.01)
Expand Down Expand Up @@ -583,6 +592,7 @@ def test_handle_500(srv, loop, buf, transport, request_handler):
@asyncio.coroutine
def test_keep_alive(make_srv, loop, transport, ceil):
srv = make_srv(keepalive_timeout=0.05)
srv.KEEPALIVE_RESCHEDULE_DELAY = 0.1
srv.connection_made(transport)

srv.keep_alive(True)
Expand All @@ -600,7 +610,7 @@ def test_keep_alive(make_srv, loop, transport, ceil):
assert srv._keepalive_handle is not None
assert not transport.close.called

yield from asyncio.sleep(0.1, loop=loop)
yield from asyncio.sleep(0.2, loop=loop)
assert transport.close.called
assert srv._waiters[0].cancelled

Expand Down
29 changes: 29 additions & 0 deletions tests/test_web_websocket_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,3 +758,32 @@ def handler(request):
yield from ws.receive()

assert cancelled


@asyncio.coroutine
def test_websocket_disable_keepalive(loop, test_client):
@asyncio.coroutine
def handler(request):
ws = web.WebSocketResponse()
if not ws.can_prepare(request):
return web.Response(text='OK')
assert request.protocol._keepalive
yield from ws.prepare(request)
assert not request.protocol._keepalive
assert not request.protocol._keepalive_handle

yield from ws.send_str('OK')
yield from ws.close()
return ws

app = web.Application()
app.router.add_route('GET', '/', handler)
client = yield from test_client(app)

resp = yield from client.get('/')
txt = yield from resp.text()
assert txt == 'OK'

ws = yield from client.ws_connect('/')
data = yield from ws.receive_str()
assert data == 'OK'