Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition that leads Quart to hang with uvicorn #848

Merged
merged 25 commits into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions 846_quart_race.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import asyncio
from typing import Generator, Any, Set

from quart import Quart, request as qrequest

from starlette.applications import Starlette
from starlette.responses import Response
import uvicorn

qapp = Quart(__name__)
sapp = Starlette()


@sapp.route('/', methods=['POST'])
async def starlette(request):
data = await request.body()
return Response(data)


@qapp.route('/', methods=['POST'])
async def quart():
data = await qrequest.get_data()
return data
# return data, 200, {'Connection': 'close'}


async def aapp(scope, receive, send):
if scope["type"] == "http":
asgi_handler = ASGIHTTPConnection()
await asgi_handler(receive, send)


class ASGIHTTPConnection:

def __init__(self):
self.body = Body()

async def __call__(self, receive, send):
receiver_task = asyncio.ensure_future(self.handle_messages(self.body, receive))
handler_task = asyncio.ensure_future(self.handle_request(self.body, send))
done, pending = await asyncio.wait(
[handler_task, receiver_task], return_when=asyncio.FIRST_COMPLETED
)
await self._cancel_tasks(pending)

async def _cancel_tasks(self, tasks: Set[asyncio.Future]) -> None:
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)

async def handle_messages(self, body, receive) -> None:
while True:
message = await receive()
if message["type"] == "http.request":
body.append(message.get("body", b""))
if not message.get("more_body", False):
body.set_complete()
elif message["type"] == "http.disconnect":
return

async def handle_request(self, body, send) -> None:
data = await body
await send({
'type': 'http.response.start',
'status': 200,
'headers': [(b'content-length', b"%d" % len(data))],
})
await send({
'type': 'http.response.body',
'body': data,
'more_body': False,
})


class Body:

def __init__(self) -> None:
self._data = bytearray()
self._complete: asyncio.Event = asyncio.Event()

def __await__(self) -> Generator[Any, None, Any]:
yield from self._complete.wait().__await__()
return bytes(self._data)

def append(self, data: bytes) -> None:
self._data.extend(data)

def set_complete(self) -> None:
self._complete.set()


async def wait_for_disconnect(receive):
while True:
p = await receive()
if p['type'] == 'http.disconnect':
print('Disconnected!')
break


async def app748(scope, receive, send):
await asyncio.sleep(0.2)
m = await receive()

if m['type'] == 'lifespan.startup':
await send({'type': 'lifespan.startup.complete'})
elif m['type'] == 'http.request':
if scope['path'] == '/':
asyncio.create_task(wait_for_disconnect(receive))
await asyncio.sleep(0.2)

await send({'type': 'http.response.start', 'status': 404})
await send({'type': 'http.response.body', 'body': b'Not found!\n'})

if __name__ == '__main__':
# uvicorn.run("846_quart_race:app748", log_level="trace")
uvicorn.run("846_quart_race:aapp", log_level="trace")
# uvicorn.run("846_quart_race:qapp", log_level="trace")
# uvicorn.run("846_quart_race:sapp", log_level="trace")


7 changes: 7 additions & 0 deletions payload.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
wrk.method = "POST"
wrk.headers["Content-Type"] = "application/json"
wrk.body = [[
{"Inputs":[
{"Text":"They have been given more opportunities to influence the formation and activities of the legislative and executive bodiies.","ModeOverride":"Proactive"}
],"RequestVersion":2}
]]
10 changes: 5 additions & 5 deletions uvicorn/protocols/http/httptools_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ def __init__(self, config, server_state, _loop=None):
self.headers = None
self.expect_100_continue = False
self.cycle = None
self.message_event = asyncio.Event()

# Protocol interface
def connection_made(self, transport):
Expand All @@ -146,7 +145,8 @@ def connection_lost(self, exc):

if self.cycle and not self.cycle.response_complete:
self.cycle.disconnected = True
self.message_event.set()
if self.cycle:
self.cycle.message_event.set()
if self.flow is not None:
self.flow.resume_writing()

Expand Down Expand Up @@ -267,7 +267,7 @@ def on_headers_complete(self):
access_logger=self.access_logger,
access_log=self.access_log,
default_headers=self.default_headers,
message_event=self.message_event,
message_event=asyncio.Event(),
expect_100_continue=self.expect_100_continue,
keep_alive=http_version != "1.0",
on_response=self.on_response_complete,
Expand All @@ -288,13 +288,13 @@ def on_body(self, body: bytes):
self.cycle.body += body
if len(self.cycle.body) > HIGH_WATER_LIMIT:
self.flow.pause_reading()
self.message_event.set()
self.cycle.message_event.set()

def on_message_complete(self):
if self.parser.should_upgrade() or self.cycle.response_complete:
return
self.cycle.more_body = False
self.message_event.set()
self.cycle.message_event.set()

def on_response_complete(self):
# Callback for pipelined HTTP requests to be started.
Expand Down