From 2ff704b91c8a3888eae7bfc3b053168dc59bd66e Mon Sep 17 00:00:00 2001 From: Marcin Sulikowski Date: Sat, 10 Feb 2024 08:11:47 +0100 Subject: [PATCH] Fix spurious LocalProtocolError errors when processing pipelined requests (#2243) --- tests/protocols/test_http.py | 52 +++++++++++++++++++++--- uvicorn/protocols/http/h11_impl.py | 8 ++++ uvicorn/protocols/http/httptools_impl.py | 12 +++--- 3 files changed, 61 insertions(+), 11 deletions(-) diff --git a/tests/protocols/test_http.py b/tests/protocols/test_http.py index 5027be17a..8a00f38dc 100644 --- a/tests/protocols/test_http.py +++ b/tests/protocols/test_http.py @@ -176,6 +176,20 @@ def set_protocol(self, protocol): pass +class MockTimerHandle: + def __init__(self, loop_later_list, delay, callback, args): + self.loop_later_list = loop_later_list + self.delay = delay + self.callback = callback + self.args = args + self.cancelled = False + + def cancel(self): + if not self.cancelled: + self.cancelled = True + self.loop_later_list.remove(self) + + class MockLoop: def __init__(self): self._tasks = [] @@ -186,18 +200,20 @@ def create_task(self, coroutine): return MockTask() def call_later(self, delay, callback, *args): - self._later.insert(0, (delay, callback, args)) + handle = MockTimerHandle(self._later, delay, callback, args) + self._later.insert(0, handle) + return handle async def run_one(self): return await self._tasks.pop() def run_later(self, with_delay): later = [] - for delay, callback, args in self._later: - if with_delay >= delay: - callback(*args) + for timer_handle in self._later: + if with_delay >= timer_handle.delay: + timer_handle.callback(*timer_handle.args) else: - later.append((delay, callback, args)) + later.append(timer_handle) self._later = later @@ -315,6 +331,32 @@ async def test_keepalive_timeout(http_protocol_cls: HTTPProtocol): assert protocol.transport.is_closing() +@pytest.mark.anyio +async def test_keepalive_timeout_with_pipelined_requests( + http_protocol_cls: HTTPProtocol, +): + app = Response("Hello, world", media_type="text/plain") + + protocol = get_connected_protocol(app, http_protocol_cls) + protocol.data_received(SIMPLE_GET_REQUEST) + protocol.data_received(SIMPLE_GET_REQUEST) + + # After processing the first request, the keep-alive task should be + # disabled because the second request is not responded yet. + await protocol.loop.run_one() + assert b"HTTP/1.1 200 OK" in protocol.transport.buffer + assert b"Hello, world" in protocol.transport.buffer + assert protocol.timeout_keep_alive_task is None + + # Process the second request and ensure that the keep-alive task + # has been enabled again as the connection is now idle. + protocol.transport.clear_buffer() + await protocol.loop.run_one() + assert b"HTTP/1.1 200 OK" in protocol.transport.buffer + assert b"Hello, world" in protocol.transport.buffer + assert protocol.timeout_keep_alive_task is not None + + @pytest.mark.anyio async def test_close(http_protocol_cls: HTTPProtocol): app = Response(b"", status_code=204, headers={"connection": "close"}) diff --git a/uvicorn/protocols/http/h11_impl.py b/uvicorn/protocols/http/h11_impl.py index bee83122d..4922d1781 100644 --- a/uvicorn/protocols/http/h11_impl.py +++ b/uvicorn/protocols/http/h11_impl.py @@ -236,6 +236,14 @@ def handle_events(self) -> None: else: app = self.app + # When starting to process a request, disable the keep-alive + # timeout. Normally we disable this when receiving data from + # client and set back when finishing processing its request. + # However, for pipelined requests processing finishes after + # already receiving the next request and thus the timer may + # be set here, which we don't want. + self._unset_keepalive_if_required() + self.cycle = RequestResponseCycle( scope=self.scope, conn=self.conn, diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index 90a3bd9ff..e203745b1 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -326,22 +326,22 @@ def on_response_complete(self) -> None: if self.transport.is_closing(): return - # Set a short Keep-Alive timeout. self._unset_keepalive_if_required() - self.timeout_keep_alive_task = self.loop.call_later( - self.timeout_keep_alive, self.timeout_keep_alive_handler - ) - # Unpause data reads if needed. self.flow.resume_reading() - # Unblock any pipelined events. + # Unblock any pipelined events. If there are none, arm the + # Keep-Alive timeout instead. if self.pipeline: cycle, app = self.pipeline.pop() task = self.loop.create_task(cycle.run_asgi(app)) task.add_done_callback(self.tasks.discard) self.tasks.add(task) + else: + self.timeout_keep_alive_task = self.loop.call_later( + self.timeout_keep_alive, self.timeout_keep_alive_handler + ) def shutdown(self) -> None: """