From 64049e55a0a8ba1052bbea1f85c30e25c167fa5e Mon Sep 17 00:00:00 2001 From: euri10 Date: Wed, 11 Nov 2020 17:46:27 +0100 Subject: [PATCH 01/23] Revert "Cancel old keepalive-trigger before setting new one. (#832)" This reverts commit d5dcf80c --- uvicorn/protocols/http/h11_impl.py | 7 +------ uvicorn/protocols/http/httptools_impl.py | 7 +------ 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/uvicorn/protocols/http/h11_impl.py b/uvicorn/protocols/http/h11_impl.py index 6a362ff20..fb70038ef 100644 --- a/uvicorn/protocols/http/h11_impl.py +++ b/uvicorn/protocols/http/h11_impl.py @@ -153,14 +153,11 @@ def connection_lost(self, exc): def eof_received(self): pass - def _unset_keepalive_if_required(self): + def data_received(self, data): if self.timeout_keep_alive_task is not None: self.timeout_keep_alive_task.cancel() self.timeout_keep_alive_task = None - def data_received(self, data): - self._unset_keepalive_if_required() - self.conn.receive_data(data) self.handle_events() @@ -302,8 +299,6 @@ def on_response_complete(self): return # Set a short Keep-Alive timeout. - self._unset_keepalive_if_required() - self.timeout_keep_alive_task = self.loop.call_later( self.timeout_keep_alive, self.timeout_keep_alive_handler ) diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index 15262326b..1642b7abb 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -153,14 +153,11 @@ def connection_lost(self, exc): def eof_received(self): pass - def _unset_keepalive_if_required(self): + def data_received(self, data): if self.timeout_keep_alive_task is not None: self.timeout_keep_alive_task.cancel() self.timeout_keep_alive_task = None - def data_received(self, data): - self._unset_keepalive_if_required() - try: self.parser.feed_data(data) except httptools.HttpParserError: @@ -304,8 +301,6 @@ def on_response_complete(self): return # Set a short Keep-Alive timeout. - self._unset_keepalive_if_required() - self.timeout_keep_alive_task = self.loop.call_later( self.timeout_keep_alive, self.timeout_keep_alive_handler ) From 503e7a1bd6c92c35c0789520fc58f5c3713c85f3 Mon Sep 17 00:00:00 2001 From: euri10 Date: Wed, 11 Nov 2020 17:48:30 +0100 Subject: [PATCH 02/23] Revert "Revert "Cancel old keepalive-trigger before setting new one. (#832)"" This reverts commit 64049e55 --- uvicorn/protocols/http/h11_impl.py | 7 ++++++- uvicorn/protocols/http/httptools_impl.py | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/uvicorn/protocols/http/h11_impl.py b/uvicorn/protocols/http/h11_impl.py index fb70038ef..6a362ff20 100644 --- a/uvicorn/protocols/http/h11_impl.py +++ b/uvicorn/protocols/http/h11_impl.py @@ -153,11 +153,14 @@ def connection_lost(self, exc): def eof_received(self): pass - def data_received(self, data): + def _unset_keepalive_if_required(self): if self.timeout_keep_alive_task is not None: self.timeout_keep_alive_task.cancel() self.timeout_keep_alive_task = None + def data_received(self, data): + self._unset_keepalive_if_required() + self.conn.receive_data(data) self.handle_events() @@ -299,6 +302,8 @@ def on_response_complete(self): return # Set a short Keep-Alive timeout. + self._unset_keepalive_if_required() + self.timeout_keep_alive_task = self.loop.call_later( self.timeout_keep_alive, self.timeout_keep_alive_handler ) diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index 1642b7abb..15262326b 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -153,11 +153,14 @@ def connection_lost(self, exc): def eof_received(self): pass - def data_received(self, data): + def _unset_keepalive_if_required(self): if self.timeout_keep_alive_task is not None: self.timeout_keep_alive_task.cancel() self.timeout_keep_alive_task = None + def data_received(self, data): + self._unset_keepalive_if_required() + try: self.parser.feed_data(data) except httptools.HttpParserError: @@ -301,6 +304,8 @@ def on_response_complete(self): return # Set a short Keep-Alive timeout. + self._unset_keepalive_if_required() + self.timeout_keep_alive_task = self.loop.call_later( self.timeout_keep_alive, self.timeout_keep_alive_handler ) From 4dc7639f46dcfe1817212e8cff527c9689f45ac7 Mon Sep 17 00:00:00 2001 From: euri10 Date: Thu, 12 Nov 2020 08:02:48 +0100 Subject: [PATCH 03/23] App that reproduce issue with quart --- 846_quart_race.py | 27 ++++++++++++++++++++++++ uvicorn/middleware/message_logger.py | 2 +- uvicorn/protocols/http/httptools_impl.py | 20 ++++++++++++++++++ 3 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 846_quart_race.py diff --git a/846_quart_race.py b/846_quart_race.py new file mode 100644 index 000000000..70b58721e --- /dev/null +++ b/846_quart_race.py @@ -0,0 +1,27 @@ +from quart import Quart, request as qrequest + +from starlette.applications import Starlette +from starlette.responses import Response +import uvicorn + +qapp = Quart(__name__) +sapp = Starlette() + + +@sapp.route('/', methods=['POST']) +async def starlette(request): + data = await request.body() + return Response(data) + + +@qapp.route('/', methods=['POST']) +async def quart(): + data = await qrequest.get_data() + return data + # return data, 200, {'Connection': 'close'} + +if __name__ == '__main__': + uvicorn.run("846_quart_race:qapp", log_level="trace") + # uvicorn.run("846_quart_race:sapp", log_level="trace") + + diff --git a/uvicorn/middleware/message_logger.py b/uvicorn/middleware/message_logger.py index a05b5c137..7ad001012 100644 --- a/uvicorn/middleware/message_logger.py +++ b/uvicorn/middleware/message_logger.py @@ -4,7 +4,7 @@ "body": "<{length} bytes>", "bytes": "<{length} bytes>", "text": "<{length} chars>", - "headers": "<...>", + # "headers": "<...>", } TRACE_LOG_LEVEL = 5 diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index 15262326b..601f2b3ca 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -154,6 +154,7 @@ def eof_received(self): pass def _unset_keepalive_if_required(self): + self.logger.log(TRACE_LOG_LEVEL, f"unset ka") if self.timeout_keep_alive_task is not None: self.timeout_keep_alive_task.cancel() self.timeout_keep_alive_task = None @@ -273,12 +274,14 @@ def on_headers_complete(self): on_response=self.on_response_complete, ) if existing_cycle is None or existing_cycle.response_complete: + self.logger.log(TRACE_LOG_LEVEL, f"ka: {existing_cycle.keep_alive if existing_cycle else None} existing: {existing_cycle} {existing_cycle.response_complete if existing_cycle else None}") # Standard case - start processing the request. task = self.loop.create_task(self.cycle.run_asgi(app)) task.add_done_callback(self.tasks.discard) self.tasks.add(task) else: # Pipelined HTTP requests need to be queued up. + self.logger.log(TRACE_LOG_LEVEL, "pipeline") self.flow.pause_reading() self.pipeline.insert(0, (self.cycle, app)) @@ -287,16 +290,20 @@ def on_body(self, body: bytes): return self.cycle.body += body if len(self.cycle.body) > HIGH_WATER_LIMIT: + self.logger.log(TRACE_LOG_LEVEL, "PAUSE") self.flow.pause_reading() self.message_event.set() def on_message_complete(self): + self.logger.log(TRACE_LOG_LEVEL, "message_complete") if self.parser.should_upgrade() or self.cycle.response_complete: return self.cycle.more_body = False self.message_event.set() + self.logger.log(TRACE_LOG_LEVEL, "message_complete end") def on_response_complete(self): + self.logger.log(TRACE_LOG_LEVEL, f"On response_complete") # Callback for pipelined HTTP requests to be started. self.server_state.total_requests += 1 @@ -313,13 +320,17 @@ def on_response_complete(self): # Unpause data reads if needed. self.flow.resume_reading() + self.logger.log(TRACE_LOG_LEVEL, f"read ok") # Unblock any pipelined events. if self.pipeline: + self.logger.log(TRACE_LOG_LEVEL, f"On response_complete pipeline") cycle, app = self.pipeline.pop() task = self.loop.create_task(cycle.run_asgi(app)) task.add_done_callback(self.tasks.discard) self.tasks.add(task) + self.logger.log(TRACE_LOG_LEVEL, f"On response END") + def shutdown(self): """ Called by the server to commence a graceful shutdown. @@ -415,6 +426,7 @@ async def run_asgi(self, app): self.logger.error(msg) self.transport.close() finally: + self.on_response = None async def send_500_response(self): @@ -437,9 +449,11 @@ async def send(self, message): message_type = message["type"] if self.flow.write_paused and not self.disconnected: + self.logger.log(TRACE_LOG_LEVEL, "Drain in send") await self.flow.drain() if self.disconnected: + self.logger.log(TRACE_LOG_LEVEL, "Disc in send") return if not self.response_started: @@ -482,6 +496,7 @@ async def send(self, message): self.expected_content_length = 0 self.chunked_encoding = True elif name == b"connection" and value.lower() == b"close": + self.logger.log(TRACE_LOG_LEVEL, "setting ka false") self.keep_alive = False content.extend([name, b": ", value, b"\r\n"]) @@ -527,10 +542,12 @@ async def send(self, message): # Handle response completion if not more_body: + self.logger.log(TRACE_LOG_LEVEL,f"3 D: {self.disconnected} C: {self.response_complete}") if self.expected_content_length != 0: raise RuntimeError("Response content shorter than Content-Length") self.response_complete = True if not self.keep_alive: + self.logger.log(TRACE_LOG_LEVEL, "not ka") self.transport.close() self.on_response() @@ -545,13 +562,16 @@ async def receive(self): self.waiting_for_100_continue = False if not self.disconnected and not self.response_complete: + self.logger.log(TRACE_LOG_LEVEL, f"0 D: {self.disconnected} C: {self.response_complete}") self.flow.resume_reading() await self.message_event.wait() self.message_event.clear() if self.disconnected or self.response_complete: + self.logger.log(TRACE_LOG_LEVEL, f"1 D: {self.disconnected} C: {self.response_complete}") message = {"type": "http.disconnect"} else: + self.logger.log(TRACE_LOG_LEVEL, f"2 D: {self.disconnected} C: {self.response_complete}") message = { "type": "http.request", "body": self.body, From ecd7502ba84c1271920099a3591047840c7d7991 Mon Sep 17 00:00:00 2001 From: euri10 Date: Thu, 12 Nov 2020 08:44:27 +0100 Subject: [PATCH 04/23] Trim logs --- 846_quart_race.py | 4 ++-- uvicorn/protocols/http/httptools_impl.py | 21 ++++++++------------- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/846_quart_race.py b/846_quart_race.py index 70b58721e..3b15c80a3 100644 --- a/846_quart_race.py +++ b/846_quart_race.py @@ -21,7 +21,7 @@ async def quart(): # return data, 200, {'Connection': 'close'} if __name__ == '__main__': - uvicorn.run("846_quart_race:qapp", log_level="trace") - # uvicorn.run("846_quart_race:sapp", log_level="trace") + # uvicorn.run("846_quart_race:qapp", log_level="trace") + uvicorn.run("846_quart_race:sapp", log_level="trace") diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index 601f2b3ca..8f33400e4 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -154,7 +154,6 @@ def eof_received(self): pass def _unset_keepalive_if_required(self): - self.logger.log(TRACE_LOG_LEVEL, f"unset ka") if self.timeout_keep_alive_task is not None: self.timeout_keep_alive_task.cancel() self.timeout_keep_alive_task = None @@ -274,14 +273,12 @@ def on_headers_complete(self): on_response=self.on_response_complete, ) if existing_cycle is None or existing_cycle.response_complete: - self.logger.log(TRACE_LOG_LEVEL, f"ka: {existing_cycle.keep_alive if existing_cycle else None} existing: {existing_cycle} {existing_cycle.response_complete if existing_cycle else None}") # Standard case - start processing the request. task = self.loop.create_task(self.cycle.run_asgi(app)) task.add_done_callback(self.tasks.discard) self.tasks.add(task) else: # Pipelined HTTP requests need to be queued up. - self.logger.log(TRACE_LOG_LEVEL, "pipeline") self.flow.pause_reading() self.pipeline.insert(0, (self.cycle, app)) @@ -295,15 +292,12 @@ def on_body(self, body: bytes): self.message_event.set() def on_message_complete(self): - self.logger.log(TRACE_LOG_LEVEL, "message_complete") if self.parser.should_upgrade() or self.cycle.response_complete: return self.cycle.more_body = False self.message_event.set() - self.logger.log(TRACE_LOG_LEVEL, "message_complete end") def on_response_complete(self): - self.logger.log(TRACE_LOG_LEVEL, f"On response_complete") # Callback for pipelined HTTP requests to be started. self.server_state.total_requests += 1 @@ -320,16 +314,13 @@ def on_response_complete(self): # Unpause data reads if needed. self.flow.resume_reading() - self.logger.log(TRACE_LOG_LEVEL, f"read ok") # Unblock any pipelined events. if self.pipeline: - self.logger.log(TRACE_LOG_LEVEL, f"On response_complete pipeline") cycle, app = self.pipeline.pop() task = self.loop.create_task(cycle.run_asgi(app)) task.add_done_callback(self.tasks.discard) self.tasks.add(task) - self.logger.log(TRACE_LOG_LEVEL, f"On response END") def shutdown(self): """ @@ -542,7 +533,7 @@ async def send(self, message): # Handle response completion if not more_body: - self.logger.log(TRACE_LOG_LEVEL,f"3 D: {self.disconnected} C: {self.response_complete}") + self.logger.log(TRACE_LOG_LEVEL,f"S0 D: {self.disconnected} C: {self.response_complete}") if self.expected_content_length != 0: raise RuntimeError("Response content shorter than Content-Length") self.response_complete = True @@ -557,21 +548,25 @@ async def send(self, message): raise RuntimeError(msg % message_type) async def receive(self): + self.logger.log(TRACE_LOG_LEVEL, f"R0 D:{self.disconnected} C: {self.response_complete}") if self.waiting_for_100_continue and not self.transport.is_closing(): self.transport.write(b"HTTP/1.1 100 Continue\r\n\r\n") self.waiting_for_100_continue = False if not self.disconnected and not self.response_complete: - self.logger.log(TRACE_LOG_LEVEL, f"0 D: {self.disconnected} C: {self.response_complete}") + self.logger.log(TRACE_LOG_LEVEL, + f"R1 D:{self.disconnected} C: {self.response_complete}") self.flow.resume_reading() await self.message_event.wait() self.message_event.clear() if self.disconnected or self.response_complete: - self.logger.log(TRACE_LOG_LEVEL, f"1 D: {self.disconnected} C: {self.response_complete}") + self.logger.log(TRACE_LOG_LEVEL, + f"R2 D:{self.disconnected} C: {self.response_complete}") message = {"type": "http.disconnect"} else: - self.logger.log(TRACE_LOG_LEVEL, f"2 D: {self.disconnected} C: {self.response_complete}") + self.logger.log(TRACE_LOG_LEVEL, + f"R3 D:{self.disconnected} C: {self.response_complete}") message = { "type": "http.request", "body": self.body, From c54b83a6923e52f37248d95b3ea6e335e7fb8b24 Mon Sep 17 00:00:00 2001 From: euri10 Date: Thu, 12 Nov 2020 09:06:34 +0100 Subject: [PATCH 05/23] Added diff logs --- 846_quart_race.py | 4 ++-- quart.log | 36 ++++++++++++++++++++++++++++++++++++ starlette.log | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 quart.log create mode 100644 starlette.log diff --git a/846_quart_race.py b/846_quart_race.py index 3b15c80a3..70b58721e 100644 --- a/846_quart_race.py +++ b/846_quart_race.py @@ -21,7 +21,7 @@ async def quart(): # return data, 200, {'Connection': 'close'} if __name__ == '__main__': - # uvicorn.run("846_quart_race:qapp", log_level="trace") - uvicorn.run("846_quart_race:sapp", log_level="trace") + uvicorn.run("846_quart_race:qapp", log_level="trace") + # uvicorn.run("846_quart_race:sapp", log_level="trace") diff --git a/quart.log b/quart.log new file mode 100644 index 000000000..20db81a68 --- /dev/null +++ b/quart.log @@ -0,0 +1,36 @@ +/home/lotso/PycharmProjects/uvicorn/venv/bin/python /home/lotso/.local/share/JetBrains/Toolbox/apps/PyCharm-P/ch-1/202.7660.27/plugins/python/helpers/pydev/pydevd.py --multiproc --qt-support=auto --client 127.0.0.1 --port 43221 --file /home/lotso/PycharmProjects/uvicorn/846_quart_race.py +pydev debugger: process 25982 is connecting +Connected to pydev debugger (build 202.7660.27) +INFO: Started server process [25982] +INFO: Waiting for application startup. +Executing wait_for=()] created at /home/lotso/.asdf/installs/python/3.8.6/lib/python3.8/asyncio/locks.py:306> cb=[run_until_complete..()] created at /home/lotso/PycharmProjects/uvicorn/uvicorn/_impl/asyncio.py:47> took 0.102 seconds +TRACE: ASGI [1] Started scope={'type': 'lifespan', 'asgi': {'version': '3.0', 'spec_version': '2.0'}} +TRACE: ASGI [1] Receive {'type': 'lifespan.startup'} +TRACE: ASGI [1] Send {'type': 'lifespan.startup.complete'} +INFO: Application startup complete. +INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit) +TRACE: 127.0.0.1:59136 - Connection made +TRACE: 127.0.0.1:59138 - Connection made +TRACE: 127.0.0.1:59136 - Connection lost +TRACE: 127.0.0.1:59138 - ASGI [2] Started scope={'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.1'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8000), 'client': ('127.0.0.1', 59138), 'scheme': 'http', 'method': 'POST', 'root_path': '', 'path': '/', 'raw_path': b'/', 'query_string': b'', 'headers': [(b'host', b'127.0.0.1:8000'), (b'content-type', b'application/json'), (b'content-length', b'199')]} +TRACE: R0 D:False C: False +TRACE: R1 D:False C: False +TRACE: R3 D:False C: False +TRACE: 127.0.0.1:59138 - ASGI [2] Receive {'type': 'http.request', 'body': '<199 bytes>', 'more_body': False} +TRACE: R0 D:False C: False +TRACE: R1 D:False C: False +TRACE: 127.0.0.1:59138 - ASGI [2] Send {'type': 'http.response.start', 'status': 200, 'headers': [(b'content-type', b'text/html; charset=utf-8'), (b'content-length', b'199')]} +TRACE: 127.0.0.1:59138 - ASGI [2] Send {'type': 'http.response.body', 'body': '<199 bytes>', 'more_body': True} +TRACE: 127.0.0.1:59138 - ASGI [2] Send {'type': 'http.response.body', 'body': '<0 bytes>', 'more_body': False} +TRACE: S0 D: False C: False +TRACE: 127.0.0.1:59138 - ASGI [3] Started scope={'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.1'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8000), 'client': ('127.0.0.1', 59138), 'scheme': 'http', 'method': 'POST', 'root_path': '', 'path': '/', 'raw_path': b'/', 'query_string': b'', 'headers': [(b'host', b'127.0.0.1:8000'), (b'content-type', b'application/json'), (b'content-length', b'199')]} +TRACE: R2 D:False C: True +TRACE: 127.0.0.1:59138 - ASGI [2] Receive {'type': 'http.disconnect'} +TRACE: R0 D:False C: False +TRACE: R1 D:False C: False +TRACE: 127.0.0.1:59138 - ASGI [2] Completed +INFO: 127.0.0.1:59138 - "POST / HTTP/1.1" 200 OK +TRACE: 127.0.0.1:59138 - Connection lost +TRACE: R2 D:True C: False +TRACE: 127.0.0.1:59138 - ASGI [3] Receive {'type': 'http.disconnect'} +TRACE: 127.0.0.1:59138 - ASGI [3] Completed \ No newline at end of file diff --git a/starlette.log b/starlette.log new file mode 100644 index 000000000..9f074d6bd --- /dev/null +++ b/starlette.log @@ -0,0 +1,34 @@ +/home/lotso/PycharmProjects/uvicorn/venv/bin/python /home/lotso/.local/share/JetBrains/Toolbox/apps/PyCharm-P/ch-1/202.7660.27/plugins/python/helpers/pydev/pydevd.py --multiproc --qt-support=auto --client 127.0.0.1 --port 41799 --file /home/lotso/PycharmProjects/uvicorn/846_quart_race.py +pydev debugger: process 26943 is connecting +Connected to pydev debugger (build 202.7660.27) +INFO: Started server process [26943] +INFO: Waiting for application startup. +Executing wait_for=()] created at /home/lotso/.asdf/installs/python/3.8.6/lib/python3.8/asyncio/locks.py:306> cb=[run_until_complete..()] created at /home/lotso/PycharmProjects/uvicorn/uvicorn/_impl/asyncio.py:47> took 0.101 seconds +TRACE: ASGI [1] Started scope={'type': 'lifespan', 'asgi': {'version': '3.0', 'spec_version': '2.0'}} +TRACE: ASGI [1] Receive {'type': 'lifespan.startup'} +TRACE: ASGI [1] Send {'type': 'lifespan.startup.complete'} +INFO: Application startup complete. +INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit) +TRACE: 127.0.0.1:59396 - Connection made +TRACE: 127.0.0.1:59398 - Connection made +TRACE: 127.0.0.1:59396 - Connection lost +TRACE: 127.0.0.1:59398 - ASGI [2] Started scope={'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.1'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8000), 'client': ('127.0.0.1', 59398), 'scheme': 'http', 'method': 'POST', 'root_path': '', 'path': '/', 'raw_path': b'/', 'query_string': b'', 'headers': [(b'host', b'127.0.0.1:8000'), (b'content-type', b'application/json'), (b'content-length', b'199')]} +TRACE: R0 D:False C: False +TRACE: R1 D:False C: False +TRACE: R3 D:False C: False +TRACE: 127.0.0.1:59398 - ASGI [2] Receive {'type': 'http.request', 'body': '<199 bytes>', 'more_body': False} +TRACE: 127.0.0.1:59398 - ASGI [2] Send {'type': 'http.response.start', 'status': 200, 'headers': [(b'content-length', b'199')]} +TRACE: 127.0.0.1:59398 - ASGI [2] Send {'type': 'http.response.body', 'body': '<199 bytes>'} +TRACE: S0 D: False C: False +TRACE: 127.0.0.1:59398 - ASGI [2] Completed +TRACE: 127.0.0.1:59398 - ASGI [3] Started scope={'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.1'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8000), 'client': ('127.0.0.1', 59398), 'scheme': 'http', 'method': 'POST', 'root_path': '', 'path': '/', 'raw_path': b'/', 'query_string': b'', 'headers': [(b'host', b'127.0.0.1:8000'), (b'content-type', b'application/json'), (b'content-length', b'199')]} +TRACE: R0 D:False C: False +TRACE: R1 D:False C: False +TRACE: R3 D:False C: False +TRACE: 127.0.0.1:59398 - ASGI [3] Receive {'type': 'http.request', 'body': '<199 bytes>', 'more_body': False} +TRACE: 127.0.0.1:59398 - ASGI [3] Send {'type': 'http.response.start', 'status': 200, 'headers': [(b'content-length', b'199')]} +TRACE: 127.0.0.1:59398 - ASGI [3] Send {'type': 'http.response.body', 'body': '<199 bytes>'} +TRACE: S0 D: False C: False +TRACE: 127.0.0.1:59398 - ASGI [3] Completed +TRACE: 127.0.0.1:59398 - ASGI [4] Started scope={'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.1'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8000), 'client': ('127.0.0.1', 59398), 'scheme': 'http', 'method': 'POST', 'root_path': '', 'path': '/', 'raw_path': b'/', 'query_string': b'', 'headers': [(b'host', b'127.0.0.1:8000'), (b'content-type', b'application/json'), (b'content-length', b'199')]} +TRACE: R0 D:False C: False \ No newline at end of file From ef691d1e25fa180f60f07c1d51bea462245b1f46 Mon Sep 17 00:00:00 2001 From: euri10 Date: Thu, 12 Nov 2020 09:10:53 +0100 Subject: [PATCH 06/23] Added lua payload --- payload.lua | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 payload.lua diff --git a/payload.lua b/payload.lua new file mode 100644 index 000000000..b841eeb13 --- /dev/null +++ b/payload.lua @@ -0,0 +1,7 @@ +wrk.method = "POST" +wrk.headers["Content-Type"] = "application/json" +wrk.body = [[ +{"Inputs":[ + {"Text":"They have been given more opportunities to influence the formation and activities of the legislative and executive bodiies.","ModeOverride":"Proactive"} + ],"RequestVersion":2} + ]] \ No newline at end of file From 2eb10f47958053516d6e969f9635b9fffeb1a839 Mon Sep 17 00:00:00 2001 From: euri10 Date: Thu, 12 Nov 2020 09:13:09 +0100 Subject: [PATCH 07/23] Less diff --- uvicorn/protocols/http/httptools_impl.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index 8f33400e4..c1f63ef14 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -287,7 +287,6 @@ def on_body(self, body: bytes): return self.cycle.body += body if len(self.cycle.body) > HIGH_WATER_LIMIT: - self.logger.log(TRACE_LOG_LEVEL, "PAUSE") self.flow.pause_reading() self.message_event.set() @@ -321,7 +320,6 @@ def on_response_complete(self): task.add_done_callback(self.tasks.discard) self.tasks.add(task) - def shutdown(self): """ Called by the server to commence a graceful shutdown. @@ -417,7 +415,6 @@ async def run_asgi(self, app): self.logger.error(msg) self.transport.close() finally: - self.on_response = None async def send_500_response(self): @@ -440,11 +437,9 @@ async def send(self, message): message_type = message["type"] if self.flow.write_paused and not self.disconnected: - self.logger.log(TRACE_LOG_LEVEL, "Drain in send") await self.flow.drain() if self.disconnected: - self.logger.log(TRACE_LOG_LEVEL, "Disc in send") return if not self.response_started: @@ -487,7 +482,6 @@ async def send(self, message): self.expected_content_length = 0 self.chunked_encoding = True elif name == b"connection" and value.lower() == b"close": - self.logger.log(TRACE_LOG_LEVEL, "setting ka false") self.keep_alive = False content.extend([name, b": ", value, b"\r\n"]) From b541de848cd5a958972fee930f516aa58ad7efc5 Mon Sep 17 00:00:00 2001 From: euri10 Date: Thu, 12 Nov 2020 10:47:52 +0100 Subject: [PATCH 08/23] Add pure asgi app, cant reproduce --- 846_quart_race.py | 71 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) diff --git a/846_quart_race.py b/846_quart_race.py index 70b58721e..d39679dfd 100644 --- a/846_quart_race.py +++ b/846_quart_race.py @@ -1,3 +1,6 @@ +import asyncio +from typing import Generator, Any, Set + from quart import Quart, request as qrequest from starlette.applications import Starlette @@ -20,8 +23,74 @@ async def quart(): return data # return data, 200, {'Connection': 'close'} + +async def aapp(scope, receive, send): + if scope["type"] == "http": + asgi_handler = ASGIHTTPConnection() + await asgi_handler(receive, send) + + +class ASGIHTTPConnection: + + def __init__(self): + self.body = Body() + + async def __call__(self, receive, send): + receiver_task = asyncio.ensure_future(self.handle_messages(self.body, receive)) + handler_task = asyncio.ensure_future(self.handle_request(self.body, send)) + done, pending = await asyncio.wait( + [handler_task, receiver_task], return_when=asyncio.FIRST_COMPLETED + ) + await self._cancel_tasks(pending) + + async def _cancel_tasks(self, tasks: Set[asyncio.Future]) -> None: + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + + async def handle_messages(self, body, receive) -> None: + while True: + message = await receive() + if message["type"] == "http.request": + body.append(message.get("body", b"")) + if not message.get("more_body", False): + body.set_complete() + elif message["type"] == "http.disconnect": + return + + async def handle_request(self, body, send) -> None: + data = await body + await send({ + 'type': 'http.response.start', + 'status': 200, + 'headers': [(b'content-length', b"%d" % len(data))], + }) + await send({ + 'type': 'http.response.body', + 'body': data, + 'more_body': False, + }) + + +class Body: + + def __init__(self) -> None: + self._data = bytearray() + self._complete: asyncio.Event = asyncio.Event() + + def __await__(self) -> Generator[Any, None, Any]: + yield from self._complete.wait().__await__() + return bytes(self._data) + + def append(self, data: bytes) -> None: + self._data.extend(data) + + def set_complete(self) -> None: + self._complete.set() + if __name__ == '__main__': - uvicorn.run("846_quart_race:qapp", log_level="trace") + uvicorn.run("846_quart_race:aapp", log_level="trace") + # uvicorn.run("846_quart_race:qapp", log_level="trace") # uvicorn.run("846_quart_race:sapp", log_level="trace") From cb92e8518b3fcfebf984ec488e53a8d43cb046fe Mon Sep 17 00:00:00 2001 From: euri10 Date: Thu, 12 Nov 2020 11:13:54 +0100 Subject: [PATCH 09/23] Set event per cycle --- 846_quart_race.py | 4 ++-- uvicorn/protocols/http/httptools_impl.py | 11 ++++++----- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/846_quart_race.py b/846_quart_race.py index d39679dfd..fb6ec79ca 100644 --- a/846_quart_race.py +++ b/846_quart_race.py @@ -89,8 +89,8 @@ def set_complete(self) -> None: self._complete.set() if __name__ == '__main__': - uvicorn.run("846_quart_race:aapp", log_level="trace") - # uvicorn.run("846_quart_race:qapp", log_level="trace") + # uvicorn.run("846_quart_race:aapp", log_level="trace") + uvicorn.run("846_quart_race:qapp", log_level="trace") # uvicorn.run("846_quart_race:sapp", log_level="trace") diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index c1f63ef14..99d8f47a6 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -121,7 +121,6 @@ def __init__(self, config, server_state, _loop=None): self.headers = None self.expect_100_continue = False self.cycle = None - self.message_event = asyncio.Event() # Protocol interface def connection_made(self, transport): @@ -146,7 +145,7 @@ def connection_lost(self, exc): if self.cycle and not self.cycle.response_complete: self.cycle.disconnected = True - self.message_event.set() + self.cycle.message_event.set() if self.flow is not None: self.flow.resume_writing() @@ -267,7 +266,7 @@ def on_headers_complete(self): access_logger=self.access_logger, access_log=self.access_log, default_headers=self.default_headers, - message_event=self.message_event, + message_event=asyncio.Event(), expect_100_continue=self.expect_100_continue, keep_alive=http_version != "1.0", on_response=self.on_response_complete, @@ -288,13 +287,13 @@ def on_body(self, body: bytes): self.cycle.body += body if len(self.cycle.body) > HIGH_WATER_LIMIT: self.flow.pause_reading() - self.message_event.set() + self.cycle.message_event.set() def on_message_complete(self): if self.parser.should_upgrade() or self.cycle.response_complete: return self.cycle.more_body = False - self.message_event.set() + self.cycle.message_event.set() def on_response_complete(self): # Callback for pipelined HTTP requests to be started. @@ -531,10 +530,12 @@ async def send(self, message): if self.expected_content_length != 0: raise RuntimeError("Response content shorter than Content-Length") self.response_complete = True + self.message_event.set() if not self.keep_alive: self.logger.log(TRACE_LOG_LEVEL, "not ka") self.transport.close() self.on_response() + self.logger.log(TRACE_LOG_LEVEL,f"S1 D: {self.disconnected} C: {self.response_complete}") else: # Response already sent From 86a6d776c997e32dc864c165b90777b61b6a00c2 Mon Sep 17 00:00:00 2001 From: euri10 Date: Thu, 12 Nov 2020 11:16:31 +0100 Subject: [PATCH 10/23] Removed log trace --- uvicorn/middleware/message_logger.py | 2 +- uvicorn/protocols/http/httptools_impl.py | 8 -------- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/uvicorn/middleware/message_logger.py b/uvicorn/middleware/message_logger.py index 7ad001012..a05b5c137 100644 --- a/uvicorn/middleware/message_logger.py +++ b/uvicorn/middleware/message_logger.py @@ -4,7 +4,7 @@ "body": "<{length} bytes>", "bytes": "<{length} bytes>", "text": "<{length} chars>", - # "headers": "<...>", + "headers": "<...>", } TRACE_LOG_LEVEL = 5 diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index 99d8f47a6..0740a7a8d 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -526,16 +526,13 @@ async def send(self, message): # Handle response completion if not more_body: - self.logger.log(TRACE_LOG_LEVEL,f"S0 D: {self.disconnected} C: {self.response_complete}") if self.expected_content_length != 0: raise RuntimeError("Response content shorter than Content-Length") self.response_complete = True self.message_event.set() if not self.keep_alive: - self.logger.log(TRACE_LOG_LEVEL, "not ka") self.transport.close() self.on_response() - self.logger.log(TRACE_LOG_LEVEL,f"S1 D: {self.disconnected} C: {self.response_complete}") else: # Response already sent @@ -543,7 +540,6 @@ async def send(self, message): raise RuntimeError(msg % message_type) async def receive(self): - self.logger.log(TRACE_LOG_LEVEL, f"R0 D:{self.disconnected} C: {self.response_complete}") if self.waiting_for_100_continue and not self.transport.is_closing(): self.transport.write(b"HTTP/1.1 100 Continue\r\n\r\n") self.waiting_for_100_continue = False @@ -556,12 +552,8 @@ async def receive(self): self.message_event.clear() if self.disconnected or self.response_complete: - self.logger.log(TRACE_LOG_LEVEL, - f"R2 D:{self.disconnected} C: {self.response_complete}") message = {"type": "http.disconnect"} else: - self.logger.log(TRACE_LOG_LEVEL, - f"R3 D:{self.disconnected} C: {self.response_complete}") message = { "type": "http.request", "body": self.body, From bc613d240d8a66ff241a0c1ccc6792481579f18a Mon Sep 17 00:00:00 2001 From: euri10 Date: Thu, 12 Nov 2020 11:24:50 +0100 Subject: [PATCH 11/23] Message event is now in cycle --- 846_quart_race.py | 24 ++++++++++++++++++++++++ uvicorn/protocols/http/httptools_impl.py | 1 - 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/846_quart_race.py b/846_quart_race.py index fb6ec79ca..38143c747 100644 --- a/846_quart_race.py +++ b/846_quart_race.py @@ -88,7 +88,31 @@ def append(self, data: bytes) -> None: def set_complete(self) -> None: self._complete.set() + +async def wait_for_disconnect(receive): + while True: + p = await receive() + if p['type'] == 'http.disconnect': + print('Disconnected!') + break + + +async def app748(scope, receive, send): + await asyncio.sleep(0.2) + m = await receive() + + if m['type'] == 'lifespan.startup': + await send({'type': 'lifespan.startup.complete'}) + elif m['type'] == 'http.request': + if scope['path'] == '/foo': + asyncio.create_task(wait_for_disconnect(receive)) + await asyncio.sleep(0.2) + + await send({'type': 'http.response.start', 'status': 404}) + await send({'type': 'http.response.body', 'body': b'Not found!\n'}) + if __name__ == '__main__': + # uvicorn.run("846_quart_race:app748", log_level="trace") # uvicorn.run("846_quart_race:aapp", log_level="trace") uvicorn.run("846_quart_race:qapp", log_level="trace") # uvicorn.run("846_quart_race:sapp", log_level="trace") diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index 0740a7a8d..ece3485b4 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -529,7 +529,6 @@ async def send(self, message): if self.expected_content_length != 0: raise RuntimeError("Response content shorter than Content-Length") self.response_complete = True - self.message_event.set() if not self.keep_alive: self.transport.close() self.on_response() From ca9e4be9f30d129af60b97ba3a52e5e7097e38cc Mon Sep 17 00:00:00 2001 From: euri10 Date: Thu, 12 Nov 2020 11:26:13 +0100 Subject: [PATCH 12/23] Removed logs --- uvicorn/protocols/http/httptools_impl.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index ece3485b4..d4a941898 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -544,8 +544,6 @@ async def receive(self): self.waiting_for_100_continue = False if not self.disconnected and not self.response_complete: - self.logger.log(TRACE_LOG_LEVEL, - f"R1 D:{self.disconnected} C: {self.response_complete}") self.flow.resume_reading() await self.message_event.wait() self.message_event.clear() From abe1df6c2358dbe8bac73001073660faa8ebbdf0 Mon Sep 17 00:00:00 2001 From: euri10 Date: Thu, 12 Nov 2020 12:03:05 +0100 Subject: [PATCH 13/23] Cannot set message if cycle is None --- 846_quart_race.py | 6 +++--- uvicorn/protocols/http/httptools_impl.py | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/846_quart_race.py b/846_quart_race.py index 38143c747..430da14c1 100644 --- a/846_quart_race.py +++ b/846_quart_race.py @@ -104,7 +104,7 @@ async def app748(scope, receive, send): if m['type'] == 'lifespan.startup': await send({'type': 'lifespan.startup.complete'}) elif m['type'] == 'http.request': - if scope['path'] == '/foo': + if scope['path'] == '/': asyncio.create_task(wait_for_disconnect(receive)) await asyncio.sleep(0.2) @@ -113,8 +113,8 @@ async def app748(scope, receive, send): if __name__ == '__main__': # uvicorn.run("846_quart_race:app748", log_level="trace") - # uvicorn.run("846_quart_race:aapp", log_level="trace") - uvicorn.run("846_quart_race:qapp", log_level="trace") + uvicorn.run("846_quart_race:aapp", log_level="trace") + # uvicorn.run("846_quart_race:qapp", log_level="trace") # uvicorn.run("846_quart_race:sapp", log_level="trace") diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index d4a941898..f8df0a8df 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -145,7 +145,8 @@ def connection_lost(self, exc): if self.cycle and not self.cycle.response_complete: self.cycle.disconnected = True - self.cycle.message_event.set() + if self.cycle: + self.cycle.message_event.set() if self.flow is not None: self.flow.resume_writing() From 7066ad3921773cc18fac1c6d166eb1f381e592d4 Mon Sep 17 00:00:00 2001 From: euri10 Date: Thu, 12 Nov 2020 13:14:54 +0100 Subject: [PATCH 14/23] Deleted logs --- quart.log | 36 ------------------------------------ starlette.log | 34 ---------------------------------- 2 files changed, 70 deletions(-) delete mode 100644 quart.log delete mode 100644 starlette.log diff --git a/quart.log b/quart.log deleted file mode 100644 index 20db81a68..000000000 --- a/quart.log +++ /dev/null @@ -1,36 +0,0 @@ -/home/lotso/PycharmProjects/uvicorn/venv/bin/python /home/lotso/.local/share/JetBrains/Toolbox/apps/PyCharm-P/ch-1/202.7660.27/plugins/python/helpers/pydev/pydevd.py --multiproc --qt-support=auto --client 127.0.0.1 --port 43221 --file /home/lotso/PycharmProjects/uvicorn/846_quart_race.py -pydev debugger: process 25982 is connecting -Connected to pydev debugger (build 202.7660.27) -INFO: Started server process [25982] -INFO: Waiting for application startup. -Executing wait_for=()] created at /home/lotso/.asdf/installs/python/3.8.6/lib/python3.8/asyncio/locks.py:306> cb=[run_until_complete..()] created at /home/lotso/PycharmProjects/uvicorn/uvicorn/_impl/asyncio.py:47> took 0.102 seconds -TRACE: ASGI [1] Started scope={'type': 'lifespan', 'asgi': {'version': '3.0', 'spec_version': '2.0'}} -TRACE: ASGI [1] Receive {'type': 'lifespan.startup'} -TRACE: ASGI [1] Send {'type': 'lifespan.startup.complete'} -INFO: Application startup complete. -INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit) -TRACE: 127.0.0.1:59136 - Connection made -TRACE: 127.0.0.1:59138 - Connection made -TRACE: 127.0.0.1:59136 - Connection lost -TRACE: 127.0.0.1:59138 - ASGI [2] Started scope={'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.1'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8000), 'client': ('127.0.0.1', 59138), 'scheme': 'http', 'method': 'POST', 'root_path': '', 'path': '/', 'raw_path': b'/', 'query_string': b'', 'headers': [(b'host', b'127.0.0.1:8000'), (b'content-type', b'application/json'), (b'content-length', b'199')]} -TRACE: R0 D:False C: False -TRACE: R1 D:False C: False -TRACE: R3 D:False C: False -TRACE: 127.0.0.1:59138 - ASGI [2] Receive {'type': 'http.request', 'body': '<199 bytes>', 'more_body': False} -TRACE: R0 D:False C: False -TRACE: R1 D:False C: False -TRACE: 127.0.0.1:59138 - ASGI [2] Send {'type': 'http.response.start', 'status': 200, 'headers': [(b'content-type', b'text/html; charset=utf-8'), (b'content-length', b'199')]} -TRACE: 127.0.0.1:59138 - ASGI [2] Send {'type': 'http.response.body', 'body': '<199 bytes>', 'more_body': True} -TRACE: 127.0.0.1:59138 - ASGI [2] Send {'type': 'http.response.body', 'body': '<0 bytes>', 'more_body': False} -TRACE: S0 D: False C: False -TRACE: 127.0.0.1:59138 - ASGI [3] Started scope={'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.1'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8000), 'client': ('127.0.0.1', 59138), 'scheme': 'http', 'method': 'POST', 'root_path': '', 'path': '/', 'raw_path': b'/', 'query_string': b'', 'headers': [(b'host', b'127.0.0.1:8000'), (b'content-type', b'application/json'), (b'content-length', b'199')]} -TRACE: R2 D:False C: True -TRACE: 127.0.0.1:59138 - ASGI [2] Receive {'type': 'http.disconnect'} -TRACE: R0 D:False C: False -TRACE: R1 D:False C: False -TRACE: 127.0.0.1:59138 - ASGI [2] Completed -INFO: 127.0.0.1:59138 - "POST / HTTP/1.1" 200 OK -TRACE: 127.0.0.1:59138 - Connection lost -TRACE: R2 D:True C: False -TRACE: 127.0.0.1:59138 - ASGI [3] Receive {'type': 'http.disconnect'} -TRACE: 127.0.0.1:59138 - ASGI [3] Completed \ No newline at end of file diff --git a/starlette.log b/starlette.log deleted file mode 100644 index 9f074d6bd..000000000 --- a/starlette.log +++ /dev/null @@ -1,34 +0,0 @@ -/home/lotso/PycharmProjects/uvicorn/venv/bin/python /home/lotso/.local/share/JetBrains/Toolbox/apps/PyCharm-P/ch-1/202.7660.27/plugins/python/helpers/pydev/pydevd.py --multiproc --qt-support=auto --client 127.0.0.1 --port 41799 --file /home/lotso/PycharmProjects/uvicorn/846_quart_race.py -pydev debugger: process 26943 is connecting -Connected to pydev debugger (build 202.7660.27) -INFO: Started server process [26943] -INFO: Waiting for application startup. -Executing wait_for=()] created at /home/lotso/.asdf/installs/python/3.8.6/lib/python3.8/asyncio/locks.py:306> cb=[run_until_complete..()] created at /home/lotso/PycharmProjects/uvicorn/uvicorn/_impl/asyncio.py:47> took 0.101 seconds -TRACE: ASGI [1] Started scope={'type': 'lifespan', 'asgi': {'version': '3.0', 'spec_version': '2.0'}} -TRACE: ASGI [1] Receive {'type': 'lifespan.startup'} -TRACE: ASGI [1] Send {'type': 'lifespan.startup.complete'} -INFO: Application startup complete. -INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit) -TRACE: 127.0.0.1:59396 - Connection made -TRACE: 127.0.0.1:59398 - Connection made -TRACE: 127.0.0.1:59396 - Connection lost -TRACE: 127.0.0.1:59398 - ASGI [2] Started scope={'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.1'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8000), 'client': ('127.0.0.1', 59398), 'scheme': 'http', 'method': 'POST', 'root_path': '', 'path': '/', 'raw_path': b'/', 'query_string': b'', 'headers': [(b'host', b'127.0.0.1:8000'), (b'content-type', b'application/json'), (b'content-length', b'199')]} -TRACE: R0 D:False C: False -TRACE: R1 D:False C: False -TRACE: R3 D:False C: False -TRACE: 127.0.0.1:59398 - ASGI [2] Receive {'type': 'http.request', 'body': '<199 bytes>', 'more_body': False} -TRACE: 127.0.0.1:59398 - ASGI [2] Send {'type': 'http.response.start', 'status': 200, 'headers': [(b'content-length', b'199')]} -TRACE: 127.0.0.1:59398 - ASGI [2] Send {'type': 'http.response.body', 'body': '<199 bytes>'} -TRACE: S0 D: False C: False -TRACE: 127.0.0.1:59398 - ASGI [2] Completed -TRACE: 127.0.0.1:59398 - ASGI [3] Started scope={'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.1'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8000), 'client': ('127.0.0.1', 59398), 'scheme': 'http', 'method': 'POST', 'root_path': '', 'path': '/', 'raw_path': b'/', 'query_string': b'', 'headers': [(b'host', b'127.0.0.1:8000'), (b'content-type', b'application/json'), (b'content-length', b'199')]} -TRACE: R0 D:False C: False -TRACE: R1 D:False C: False -TRACE: R3 D:False C: False -TRACE: 127.0.0.1:59398 - ASGI [3] Receive {'type': 'http.request', 'body': '<199 bytes>', 'more_body': False} -TRACE: 127.0.0.1:59398 - ASGI [3] Send {'type': 'http.response.start', 'status': 200, 'headers': [(b'content-length', b'199')]} -TRACE: 127.0.0.1:59398 - ASGI [3] Send {'type': 'http.response.body', 'body': '<199 bytes>'} -TRACE: S0 D: False C: False -TRACE: 127.0.0.1:59398 - ASGI [3] Completed -TRACE: 127.0.0.1:59398 - ASGI [4] Started scope={'type': 'http', 'asgi': {'version': '3.0', 'spec_version': '2.1'}, 'http_version': '1.1', 'server': ('127.0.0.1', 8000), 'client': ('127.0.0.1', 59398), 'scheme': 'http', 'method': 'POST', 'root_path': '', 'path': '/', 'raw_path': b'/', 'query_string': b'', 'headers': [(b'host', b'127.0.0.1:8000'), (b'content-type', b'application/json'), (b'content-length', b'199')]} -TRACE: R0 D:False C: False \ No newline at end of file From 5912510b82694cb8a7e4276e1984a238dc006187 Mon Sep 17 00:00:00 2001 From: euri10 Date: Thu, 12 Nov 2020 16:02:15 +0100 Subject: [PATCH 15/23] Test concurrent hang --- tests/test_client.py | 49 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/tests/test_client.py b/tests/test_client.py index 98edfdba0..7873d59d8 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,3 +1,8 @@ +import asyncio + +import httpx +import pytest + from tests.client import TestClient @@ -26,3 +31,47 @@ def test_explicit_host(): response = client.get("/", headers={"host": "example.org"}) assert response.status_code == 200 assert response.text == "hello, world" + + +async def wait_for_disconnect(receive): + while True: + p = await receive() + if p['type'] == 'http.disconnect': + print('Disconnected!') + break + + +async def hang(scope, receive, send): + await asyncio.sleep(0.2) + m = await receive() + + if m['type'] == 'lifespan.startup': + await send({'type': 'lifespan.startup.complete'}) + elif m['type'] == 'http.request': + if scope['path'] == '/foo': + print('foo') + asyncio.create_task(wait_for_disconnect(receive)) + await asyncio.sleep(0.2) + await send({'type': 'http.response.start', 'status': 200}) + await send({'type': 'http.response.body', 'body': b'FOO!\n'}) + elif scope['path'] == "/bar": + print('bar') + await send({'type': 'http.response.start', 'status': 200}) + await send({'type': 'http.response.body', 'body': b'BAR!\n'}) + else: + await send({'type': 'http.response.start', 'status': 404}) + await send({'type': 'http.response.body', 'body': b'Not found!\n'}) + + +@pytest.mark.asyncio +async def test_concurrent_requests() -> None: + async with httpx.AsyncClient(app=hang, base_url="http://testserver") as client: + tasks = [] + for i in range(1): + tasks.append(client.get("/foo")) + tasks.append(client.get("/bar")) + tasks.append(client.get("/")) + + results = await asyncio.gather(*tasks) + print([(r.status_code, r.content) for r in results]) + # do something with results \ No newline at end of file From f19151bcc1f7963f18a591f7166d9aeb0606a44e Mon Sep 17 00:00:00 2001 From: euri10 Date: Thu, 12 Nov 2020 17:42:43 +0100 Subject: [PATCH 16/23] Hanging test --- tests/test_client.py | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/tests/test_client.py b/tests/test_client.py index 7873d59d8..fdfd0f18a 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,7 +1,6 @@ import asyncio -import httpx -import pytest +import requests from tests.client import TestClient @@ -63,15 +62,8 @@ async def hang(scope, receive, send): await send({'type': 'http.response.body', 'body': b'Not found!\n'}) -@pytest.mark.asyncio -async def test_concurrent_requests() -> None: - async with httpx.AsyncClient(app=hang, base_url="http://testserver") as client: - tasks = [] - for i in range(1): - tasks.append(client.get("/foo")) - tasks.append(client.get("/bar")) - tasks.append(client.get("/")) - - results = await asyncio.gather(*tasks) - print([(r.status_code, r.content) for r in results]) - # do something with results \ No newline at end of file +def test_concurrent_requests() -> None: + with TestClient(hang) as s: + s.get("/foo") + s.get("/bar") + s.get("/") From 26a1c1cdcb7be778c3409c65c974c64066ecfdfe Mon Sep 17 00:00:00 2001 From: euri10 Date: Thu, 12 Nov 2020 18:10:29 +0100 Subject: [PATCH 17/23] Right test and modified client a little to be able to use context manager --- tests/client.py | 6 +++++- tests/test_client.py | 19 ++++++------------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/tests/client.py b/tests/client.py index 829d81b1e..56d7f557d 100644 --- a/tests/client.py +++ b/tests/client.py @@ -4,7 +4,7 @@ from urllib.parse import unquote, urljoin, urlparse import requests - +from requests.adapters import HTTPAdapter class _HeaderDict(requests.packages.urllib3._collections.HTTPHeaderDict): def get_all(self, key, default): @@ -30,6 +30,9 @@ def __init__(self, app: typing.Callable, raise_server_exceptions=True) -> None: self.app = app self.raise_server_exceptions = raise_server_exceptions + def close(self): + pass + def send(self, request, *args, **kwargs): scheme, netloc, path, params, query, fragement = urlparse(request.url) if ":" in netloc: @@ -70,6 +73,7 @@ async def receive(): body = request.body if body is None: body_bytes = b"" + return {"type": "http.disconnect"} else: assert isinstance(body, bytes) body_bytes = body diff --git a/tests/test_client.py b/tests/test_client.py index fdfd0f18a..99d8b51ed 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -34,6 +34,7 @@ def test_explicit_host(): async def wait_for_disconnect(receive): while True: + print('h') p = await receive() if p['type'] == 'http.disconnect': print('Disconnected!') @@ -51,19 +52,11 @@ async def hang(scope, receive, send): print('foo') asyncio.create_task(wait_for_disconnect(receive)) await asyncio.sleep(0.2) - await send({'type': 'http.response.start', 'status': 200}) - await send({'type': 'http.response.body', 'body': b'FOO!\n'}) - elif scope['path'] == "/bar": - print('bar') - await send({'type': 'http.response.start', 'status': 200}) - await send({'type': 'http.response.body', 'body': b'BAR!\n'}) - else: - await send({'type': 'http.response.start', 'status': 404}) - await send({'type': 'http.response.body', 'body': b'Not found!\n'}) + await send({'type': 'http.response.start', 'status': 404}) + await send({'type': 'http.response.body', 'body': b'Not found!\n'}) def test_concurrent_requests() -> None: - with TestClient(hang) as s: - s.get("/foo") - s.get("/bar") - s.get("/") + with TestClient(hang) as client: + client.get("/foo") + client.get("/bar") From 4b38b5f0ec1810e269b7cb87a0a46372da0e5c0c Mon Sep 17 00:00:00 2001 From: euri10 Date: Fri, 13 Nov 2020 08:35:22 +0100 Subject: [PATCH 18/23] emoved test client changes and test --- tests/client.py | 4 ---- tests/test_client.py | 30 ------------------------------ 2 files changed, 34 deletions(-) diff --git a/tests/client.py b/tests/client.py index 56d7f557d..a352235bb 100644 --- a/tests/client.py +++ b/tests/client.py @@ -30,9 +30,6 @@ def __init__(self, app: typing.Callable, raise_server_exceptions=True) -> None: self.app = app self.raise_server_exceptions = raise_server_exceptions - def close(self): - pass - def send(self, request, *args, **kwargs): scheme, netloc, path, params, query, fragement = urlparse(request.url) if ":" in netloc: @@ -73,7 +70,6 @@ async def receive(): body = request.body if body is None: body_bytes = b"" - return {"type": "http.disconnect"} else: assert isinstance(body, bytes) body_bytes = body diff --git a/tests/test_client.py b/tests/test_client.py index 99d8b51ed..950666cf3 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -30,33 +30,3 @@ def test_explicit_host(): response = client.get("/", headers={"host": "example.org"}) assert response.status_code == 200 assert response.text == "hello, world" - - -async def wait_for_disconnect(receive): - while True: - print('h') - p = await receive() - if p['type'] == 'http.disconnect': - print('Disconnected!') - break - - -async def hang(scope, receive, send): - await asyncio.sleep(0.2) - m = await receive() - - if m['type'] == 'lifespan.startup': - await send({'type': 'lifespan.startup.complete'}) - elif m['type'] == 'http.request': - if scope['path'] == '/foo': - print('foo') - asyncio.create_task(wait_for_disconnect(receive)) - await asyncio.sleep(0.2) - await send({'type': 'http.response.start', 'status': 404}) - await send({'type': 'http.response.body', 'body': b'Not found!\n'}) - - -def test_concurrent_requests() -> None: - with TestClient(hang) as client: - client.get("/foo") - client.get("/bar") From 4ecb9610dacde415f9efc1903d6ae4f1841fc539 Mon Sep 17 00:00:00 2001 From: euri10 Date: Fri, 13 Nov 2020 08:35:48 +0100 Subject: [PATCH 19/23] Add event set in send coroutine --- uvicorn/protocols/http/httptools_impl.py | 1 + 1 file changed, 1 insertion(+) diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index f8df0a8df..f453b2b09 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -530,6 +530,7 @@ async def send(self, message): if self.expected_content_length != 0: raise RuntimeError("Response content shorter than Content-Length") self.response_complete = True + self.message_event.set() if not self.keep_alive: self.transport.close() self.on_response() From 3d3cf06d7856349fb53db4630b60c4e5874c8603 Mon Sep 17 00:00:00 2001 From: euri10 Date: Fri, 13 Nov 2020 08:36:48 +0100 Subject: [PATCH 20/23] Tests leftovers --- tests/client.py | 2 +- tests/test_client.py | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/client.py b/tests/client.py index a352235bb..829d81b1e 100644 --- a/tests/client.py +++ b/tests/client.py @@ -4,7 +4,7 @@ from urllib.parse import unquote, urljoin, urlparse import requests -from requests.adapters import HTTPAdapter + class _HeaderDict(requests.packages.urllib3._collections.HTTPHeaderDict): def get_all(self, key, default): diff --git a/tests/test_client.py b/tests/test_client.py index 950666cf3..98edfdba0 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,7 +1,3 @@ -import asyncio - -import requests - from tests.client import TestClient From 39ea2592310224a6e9939698e0c195c91588234b Mon Sep 17 00:00:00 2001 From: euri10 Date: Fri, 13 Nov 2020 08:42:47 +0100 Subject: [PATCH 21/23] h11 implementation --- 846_quart_race.py | 4 ++-- uvicorn/protocols/http/h11_impl.py | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/846_quart_race.py b/846_quart_race.py index 430da14c1..664f57e95 100644 --- a/846_quart_race.py +++ b/846_quart_race.py @@ -113,8 +113,8 @@ async def app748(scope, receive, send): if __name__ == '__main__': # uvicorn.run("846_quart_race:app748", log_level="trace") - uvicorn.run("846_quart_race:aapp", log_level="trace") - # uvicorn.run("846_quart_race:qapp", log_level="trace") + # uvicorn.run("846_quart_race:aapp", log_level="trace") + uvicorn.run("846_quart_race:qapp", log_level="trace", http="h11") # uvicorn.run("846_quart_race:sapp", log_level="trace") diff --git a/uvicorn/protocols/http/h11_impl.py b/uvicorn/protocols/http/h11_impl.py index 6a362ff20..fcf3f444c 100644 --- a/uvicorn/protocols/http/h11_impl.py +++ b/uvicorn/protocols/http/h11_impl.py @@ -113,7 +113,6 @@ def __init__(self, config, server_state, _loop=None): self.scope = None self.headers = None self.cycle = None - self.message_event = asyncio.Event() # Protocol interface def connection_made(self, transport): @@ -146,7 +145,7 @@ def connection_lost(self, exc): # Premature client disconnect pass - self.message_event.set() + self.cycle.message_event.set() if self.flow is not None: self.flow.resume_writing() @@ -234,7 +233,7 @@ def handle_events(self): access_logger=self.access_logger, access_log=self.access_log, default_headers=self.default_headers, - message_event=self.message_event, + message_event=asyncio.Event(), on_response=self.on_response_complete, ) task = self.loop.create_task(self.cycle.run_asgi(app)) @@ -247,7 +246,7 @@ def handle_events(self): self.cycle.body += event.data if len(self.cycle.body) > HIGH_WATER_LIMIT: self.flow.pause_reading() - self.message_event.set() + self.cycle.message_event.set() elif event_type is h11.EndOfMessage: if self.conn.our_state is h11.DONE: @@ -255,7 +254,7 @@ def handle_events(self): self.conn.start_next_cycle() continue self.cycle.more_body = False - self.message_event.set() + self.cycle.message_event.set() def handle_upgrade(self, event): upgrade_value = None @@ -491,6 +490,7 @@ async def send(self, message): # Handle response completion if not more_body: self.response_complete = True + self.message_event.set() event = h11.EndOfMessage() output = self.conn.send(event) self.transport.write(output) From e51ac735ffa8c20b55a3c0b6532b96105b85e593 Mon Sep 17 00:00:00 2001 From: euri10 Date: Fri, 13 Nov 2020 08:47:09 +0100 Subject: [PATCH 22/23] Removed working files --- 846_quart_race.py | 120 ---------------------------------------------- payload.lua | 7 --- 2 files changed, 127 deletions(-) delete mode 100644 846_quart_race.py delete mode 100644 payload.lua diff --git a/846_quart_race.py b/846_quart_race.py deleted file mode 100644 index 664f57e95..000000000 --- a/846_quart_race.py +++ /dev/null @@ -1,120 +0,0 @@ -import asyncio -from typing import Generator, Any, Set - -from quart import Quart, request as qrequest - -from starlette.applications import Starlette -from starlette.responses import Response -import uvicorn - -qapp = Quart(__name__) -sapp = Starlette() - - -@sapp.route('/', methods=['POST']) -async def starlette(request): - data = await request.body() - return Response(data) - - -@qapp.route('/', methods=['POST']) -async def quart(): - data = await qrequest.get_data() - return data - # return data, 200, {'Connection': 'close'} - - -async def aapp(scope, receive, send): - if scope["type"] == "http": - asgi_handler = ASGIHTTPConnection() - await asgi_handler(receive, send) - - -class ASGIHTTPConnection: - - def __init__(self): - self.body = Body() - - async def __call__(self, receive, send): - receiver_task = asyncio.ensure_future(self.handle_messages(self.body, receive)) - handler_task = asyncio.ensure_future(self.handle_request(self.body, send)) - done, pending = await asyncio.wait( - [handler_task, receiver_task], return_when=asyncio.FIRST_COMPLETED - ) - await self._cancel_tasks(pending) - - async def _cancel_tasks(self, tasks: Set[asyncio.Future]) -> None: - for task in tasks: - task.cancel() - await asyncio.gather(*tasks, return_exceptions=True) - - async def handle_messages(self, body, receive) -> None: - while True: - message = await receive() - if message["type"] == "http.request": - body.append(message.get("body", b"")) - if not message.get("more_body", False): - body.set_complete() - elif message["type"] == "http.disconnect": - return - - async def handle_request(self, body, send) -> None: - data = await body - await send({ - 'type': 'http.response.start', - 'status': 200, - 'headers': [(b'content-length', b"%d" % len(data))], - }) - await send({ - 'type': 'http.response.body', - 'body': data, - 'more_body': False, - }) - - -class Body: - - def __init__(self) -> None: - self._data = bytearray() - self._complete: asyncio.Event = asyncio.Event() - - def __await__(self) -> Generator[Any, None, Any]: - yield from self._complete.wait().__await__() - return bytes(self._data) - - def append(self, data: bytes) -> None: - self._data.extend(data) - - def set_complete(self) -> None: - self._complete.set() - - -async def wait_for_disconnect(receive): - while True: - p = await receive() - if p['type'] == 'http.disconnect': - print('Disconnected!') - break - - -async def app748(scope, receive, send): - await asyncio.sleep(0.2) - m = await receive() - - if m['type'] == 'lifespan.startup': - await send({'type': 'lifespan.startup.complete'}) - elif m['type'] == 'http.request': - if scope['path'] == '/': - asyncio.create_task(wait_for_disconnect(receive)) - await asyncio.sleep(0.2) - - await send({'type': 'http.response.start', 'status': 404}) - await send({'type': 'http.response.body', 'body': b'Not found!\n'}) - -if __name__ == '__main__': - # uvicorn.run("846_quart_race:app748", log_level="trace") - # uvicorn.run("846_quart_race:aapp", log_level="trace") - uvicorn.run("846_quart_race:qapp", log_level="trace", http="h11") - # uvicorn.run("846_quart_race:sapp", log_level="trace") - - diff --git a/payload.lua b/payload.lua deleted file mode 100644 index b841eeb13..000000000 --- a/payload.lua +++ /dev/null @@ -1,7 +0,0 @@ -wrk.method = "POST" -wrk.headers["Content-Type"] = "application/json" -wrk.body = [[ -{"Inputs":[ - {"Text":"They have been given more opportunities to influence the formation and activities of the legislative and executive bodiies.","ModeOverride":"Proactive"} - ],"RequestVersion":2} - ]] \ No newline at end of file From 3df94d785c416be93513c1d898121bbe125a28e0 Mon Sep 17 00:00:00 2001 From: euri10 Date: Wed, 18 Nov 2020 20:48:53 +0100 Subject: [PATCH 23/23] Suggestions implemented --- uvicorn/protocols/http/h11_impl.py | 3 ++- uvicorn/protocols/http/httptools_impl.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/uvicorn/protocols/http/h11_impl.py b/uvicorn/protocols/http/h11_impl.py index fcf3f444c..e9583d70b 100644 --- a/uvicorn/protocols/http/h11_impl.py +++ b/uvicorn/protocols/http/h11_impl.py @@ -145,7 +145,8 @@ def connection_lost(self, exc): # Premature client disconnect pass - self.cycle.message_event.set() + if self.cycle is not None: + self.cycle.message_event.set() if self.flow is not None: self.flow.resume_writing() diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index f453b2b09..924e131ce 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -145,7 +145,7 @@ def connection_lost(self, exc): if self.cycle and not self.cycle.response_complete: self.cycle.disconnected = True - if self.cycle: + if self.cycle is not None: self.cycle.message_event.set() if self.flow is not None: self.flow.resume_writing()