diff --git a/uvicorn/protocols/http/h11_impl.py b/uvicorn/protocols/http/h11_impl.py index 6a362ff20..e9583d70b 100644 --- a/uvicorn/protocols/http/h11_impl.py +++ b/uvicorn/protocols/http/h11_impl.py @@ -113,7 +113,6 @@ def __init__(self, config, server_state, _loop=None): self.scope = None self.headers = None self.cycle = None - self.message_event = asyncio.Event() # Protocol interface def connection_made(self, transport): @@ -146,7 +145,8 @@ def connection_lost(self, exc): # Premature client disconnect pass - self.message_event.set() + if self.cycle is not None: + self.cycle.message_event.set() if self.flow is not None: self.flow.resume_writing() @@ -234,7 +234,7 @@ def handle_events(self): access_logger=self.access_logger, access_log=self.access_log, default_headers=self.default_headers, - message_event=self.message_event, + message_event=asyncio.Event(), on_response=self.on_response_complete, ) task = self.loop.create_task(self.cycle.run_asgi(app)) @@ -247,7 +247,7 @@ def handle_events(self): self.cycle.body += event.data if len(self.cycle.body) > HIGH_WATER_LIMIT: self.flow.pause_reading() - self.message_event.set() + self.cycle.message_event.set() elif event_type is h11.EndOfMessage: if self.conn.our_state is h11.DONE: @@ -255,7 +255,7 @@ def handle_events(self): self.conn.start_next_cycle() continue self.cycle.more_body = False - self.message_event.set() + self.cycle.message_event.set() def handle_upgrade(self, event): upgrade_value = None @@ -491,6 +491,7 @@ async def send(self, message): # Handle response completion if not more_body: self.response_complete = True + self.message_event.set() event = h11.EndOfMessage() output = self.conn.send(event) self.transport.write(output) diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index 15262326b..924e131ce 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -121,7 +121,6 @@ def __init__(self, config, server_state, _loop=None): self.headers = None self.expect_100_continue = False self.cycle = None - self.message_event = asyncio.Event() # Protocol interface def connection_made(self, transport): @@ -146,7 +145,8 @@ def connection_lost(self, exc): if self.cycle and not self.cycle.response_complete: self.cycle.disconnected = True - self.message_event.set() + if self.cycle is not None: + self.cycle.message_event.set() if self.flow is not None: self.flow.resume_writing() @@ -267,7 +267,7 @@ def on_headers_complete(self): access_logger=self.access_logger, access_log=self.access_log, default_headers=self.default_headers, - message_event=self.message_event, + message_event=asyncio.Event(), expect_100_continue=self.expect_100_continue, keep_alive=http_version != "1.0", on_response=self.on_response_complete, @@ -288,13 +288,13 @@ def on_body(self, body: bytes): self.cycle.body += body if len(self.cycle.body) > HIGH_WATER_LIMIT: self.flow.pause_reading() - self.message_event.set() + self.cycle.message_event.set() def on_message_complete(self): if self.parser.should_upgrade() or self.cycle.response_complete: return self.cycle.more_body = False - self.message_event.set() + self.cycle.message_event.set() def on_response_complete(self): # Callback for pipelined HTTP requests to be started. @@ -530,6 +530,7 @@ async def send(self, message): if self.expected_content_length != 0: raise RuntimeError("Response content shorter than Content-Length") self.response_complete = True + self.message_event.set() if not self.keep_alive: self.transport.close() self.on_response()