Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safe async cancellations. #880

Merged
merged 14 commits into from
Feb 12, 2024
Merged

Safe async cancellations. #880

merged 14 commits into from
Feb 12, 2024

Conversation

tomchristie
Copy link
Member

@tomchristie tomchristie commented Feb 6, 2024

Closes #830
Closes #785
Closes #861
Closes encode/httpx#1171

Let's talk about what's going on here.

We've got an issue with handling async cancellations correctly, which needs some re-working in order to comprehensively resolve it. We're somewhat in contrast to either urllib3 (sync) or aiohttp (async) here, because we're having to get both the thread-safe and the task-safe-plus-also-support-cancellations cases correct.

Currently we're at thread-safe plus task-safe, but missing correct handling of also allowing external cancellations at any point.

So then...


This pull request reworks the handling of the connection pool state. The state is a list of connections, a list of requests, and an association between each request and the connection that has been selected to handle it.

The fundamental change in this pull request is that the management of the pool state has been re-worked so that it does not include I/O.

def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
"""
Manage the state of the connection pool, assigning incoming
requests to connections as available.
Called whenever a new request is added or removed from the pool.
Any closing connections are returned, allowing the I/O for closing
those connections to be handled seperately.
"""
closing_connections = []
# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
for connection in list(self._connections):
if connection.is_closed():
# log: "removing closed connection"
self._connections.remove(connection)
elif connection.has_expired():
# log: "closing expired connection"
self._connections.remove(connection)
closing_connections.append(connection)
elif (
connection.is_idle()
and len([connection.is_idle() for connection in self._connections])
> self._max_keepalive_connections
):
# log: "closing idle connection"
self._connections.remove(connection)
closing_connections.append(connection)
# Assign queued requests to connections.
queued_requests = [
request for request in self._requests if request.connection is None
]
for pool_request in queued_requests:
origin = pool_request.request.url.origin
avilable_connections = [
connection
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [
connection for connection in self._connections if connection.is_idle()
]
# There are three cases for how we may be able to handle the request:
#
# 1. There is an existing connection that can handle the request.
# 2. We can create a new connection to handle the request.
# 3. We can close an idle connection and then create a new connection
# to handle the request.
if avilable_connections:
# log: "reusing existing connection"
connection = avilable_connections[0]
pool_request.assign_to_connection(connection)
elif len(self._connections) < self._max_connections:
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
return closing_connections

This ensures that updating the state of the connection pool is always atomic.


As part of resolving this, also adds connection pool __repr__ to demonstate the state more clearly.

The look like so...

<httpcore.ConnectionPool [Requests: 10 active, 3 queued | Connections: 2 active, idle]>

Comment on lines 68 to 71
assert info == [
"<AsyncHTTPConnection ['http://example.com:80', HTTP/1.1, ACTIVE, Request Count: 1]>",
"<AsyncHTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 2]>",
"<AsyncHTTPConnection ['http://example.com:80', HTTP/1.1, ACTIVE, Request Count: 1]>",
]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ordering of connections changes slightly here.
We're now working with a policy of... connections remain in the order they were created.

Comment on lines 77 to 80
assert info == [
"<AsyncHTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>",
"<AsyncHTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 2]>",
"<AsyncHTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>",
]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again the ordering is slightly different from previous behaviour.
I would suggest this is more obvious.

Comment on lines 228 to 230
assert info == [
"<AsyncHTTPConnection ['https://example.com:443', HTTP/2, IDLE, Request Count: 1]>",
"<AsyncHTTPConnection ['https://example.com:443', HTTP/2, CLOSED, Request Count: 1]>",
]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a marginal behavior change here.
We've (correctly) removed the closed connection from the pool.

extensions=response.extensions,
)

async def response_closed(self, status: RequestStatus) -> None:
def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where we mange the state of the connection pool, entirely within a non-I/O block. The async case cannot have cancellations or context-switches midway through the state management. The sync case is explicitly guarded with a thread lock.

Review is probably best made against the module as a whole, rather than the diff.

https://github.com/encode/httpcore/blob/f91789652485d5ec7651458bc20a9164ff56dbd6/httpcore/_async/connection_pool.py

@tomchristie tomchristie marked this pull request as ready for review February 7, 2024 15:01
@tomchristie tomchristie requested a review from a team February 7, 2024 15:14
@tomchristie tomchristie changed the title Connection pool work Safe async cancellations. Feb 7, 2024
@rattrayalex
Copy link

rattrayalex commented Feb 9, 2024

FYI, a user reports that this branch does indeed fix issues they saw in the OpenAI Python client library: openai/openai-python#1059 (comment)

Thank you for the work on this so far!

@tomchristie
Copy link
Member Author

Fantastic, thanks for the feedback.

@karpetrosyan
Copy link
Member

Amazing work @tomchristie, this PR rsolves many problems..

Copy link
Member

@karpetrosyan karpetrosyan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we also need a changelog here

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants