Skip to content

Commit

Permalink
Code review fixes. Add safe guard check to pool
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkusSintonen committed Jun 16, 2024
1 parent 27e73bb commit b19e879
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 111 deletions.
37 changes: 17 additions & 20 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@

from .._backends.auto import AutoBackend
from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
from .._exceptions import (
ConnectionNotAvailable,
ServerDisconnectedError,
UnsupportedProtocol,
)
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import AsyncEvent, AsyncShieldCancellation, AsyncThreadLock
from .connection import AsyncHTTPConnection
Expand Down Expand Up @@ -200,7 +196,7 @@ async def handle_async_request(self, request: Request) -> Response:
response = await connection.handle_async_request(
pool_request.request
)
except (ConnectionNotAvailable, ServerDisconnectedError):
except ConnectionNotAvailable:
# In some cases a connection may initially be available to
# handle a request, but then become unavailable.
#
Expand Down Expand Up @@ -244,7 +240,7 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
closing_connections = []

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
# have expired, or surplus idle connections.
for connection in list(self._connections):
if connection.is_closed():
# log: "removing closed connection"
Expand All @@ -271,15 +267,12 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [
connection for connection in self._connections if connection.is_idle()
]

# There are three cases for how we may be able to handle the request:
#
# 1. There is an existing connection that can handle the request.
# 2. We can create a new connection to handle the request.
# 3. We can close an idle connection and then create a new connection
# 3. We can close an idle/expired connection and then create a new connection
# to handle the request.
if available_connections:
# log: "reusing existing connection"
Expand All @@ -290,15 +283,19 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
else:
purged_connection = next(
(c for c in self._connections if c.is_idle() or c.has_expired()),
None,
)
if purged_connection is not None:
# log: "closing idle connection"
self._connections.remove(purged_connection)
closing_connections.append(purged_connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)

return closing_connections

Expand Down
45 changes: 24 additions & 21 deletions httpcore/_async/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,7 @@ async def handle_async_request(self, request: Request) -> Response:
f"to {self._origin}"
)

async with self._state_lock:
if self._state == HTTPConnectionState.SERVER_DISCONNECTED:
raise ServerDisconnectedError()

# If the HTTP connection is idle but the socket is readable, then the
# only valid state is that the socket is about to return b"", indicating
# a server-initiated disconnect.
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
if server_disconnected:
self._state = HTTPConnectionState.SERVER_DISCONNECTED
raise ServerDisconnectedError()

if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
self._request_count += 1
self._state = HTTPConnectionState.ACTIVE
self._expire_at = None
else:
raise ConnectionNotAvailable()
await self._update_state()

try:
kwargs = {"request": request}
Expand Down Expand Up @@ -158,6 +138,29 @@ async def handle_async_request(self, request: Request) -> Response:
await self._response_closed()
raise exc

async def _update_state(self) -> None:
async with self._state_lock:
# If the HTTP connection is idle but the socket is readable, then the
# only valid state is that the socket is about to return b"", indicating
# a server-initiated disconnect.
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
if (
server_disconnected
or self._state == HTTPConnectionState.SERVER_DISCONNECTED
):
self._state = HTTPConnectionState.SERVER_DISCONNECTED
raise ServerDisconnectedError()

if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
self._request_count += 1
self._state = HTTPConnectionState.ACTIVE
self._expire_at = None
else:
raise ConnectionNotAvailable()

# Sending the request...

async def _send_request_headers(self, request: Request) -> None:
Expand Down
17 changes: 10 additions & 7 deletions httpcore/_async/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,7 @@ async def handle_async_request(self, request: Request) -> Response:
f"to {self._origin}"
)

async with self._state_lock:
if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE):
self._request_count += 1
self._expire_at = None
self._state = HTTPConnectionState.ACTIVE
else:
raise ConnectionNotAvailable()
await self._update_state()

async with self._init_lock:
if not self._sent_connection_init:
Expand Down Expand Up @@ -184,6 +178,15 @@ async def handle_async_request(self, request: Request) -> Response:

raise exc

async def _update_state(self) -> None:
async with self._state_lock:
if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE):
self._request_count += 1
self._expire_at = None
self._state = HTTPConnectionState.ACTIVE
else:
raise ConnectionNotAvailable()

async def _send_connection_init(self, request: Request) -> None:
"""
The HTTP/2 connection requires some initial setup before we can start
Expand Down
12 changes: 9 additions & 3 deletions httpcore/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@ def map_exceptions(map: ExceptionMapping) -> Iterator[None]:


class ConnectionNotAvailable(Exception):
pass
"""
This error is handled by the connection pool.
Users should not see this error directly when using connection pool.
"""


class ServerDisconnectedError(Exception):
pass
class ServerDisconnectedError(ConnectionNotAvailable):
"""
This error is handled by the connection pool.
Users should not see this error directly when using connection pool.
"""


class ProxyError(Exception):
Expand Down
37 changes: 17 additions & 20 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@

from .._backends.sync import SyncBackend
from .._backends.base import SOCKET_OPTION, NetworkBackend
from .._exceptions import (
ConnectionNotAvailable,
ServerDisconnectedError,
UnsupportedProtocol,
)
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import Event, ShieldCancellation, ThreadLock
from .connection import HTTPConnection
Expand Down Expand Up @@ -200,7 +196,7 @@ def handle_request(self, request: Request) -> Response:
response = connection.handle_request(
pool_request.request
)
except (ConnectionNotAvailable, ServerDisconnectedError):
except ConnectionNotAvailable:
# In some cases a connection may initially be available to
# handle a request, but then become unavailable.
#
Expand Down Expand Up @@ -244,7 +240,7 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
closing_connections = []

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
# have expired, or surplus idle connections.
for connection in list(self._connections):
if connection.is_closed():
# log: "removing closed connection"
Expand All @@ -271,15 +267,12 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [
connection for connection in self._connections if connection.is_idle()
]

# There are three cases for how we may be able to handle the request:
#
# 1. There is an existing connection that can handle the request.
# 2. We can create a new connection to handle the request.
# 3. We can close an idle connection and then create a new connection
# 3. We can close an idle/expired connection and then create a new connection
# to handle the request.
if available_connections:
# log: "reusing existing connection"
Expand All @@ -290,15 +283,19 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
else:
purged_connection = next(
(c for c in self._connections if c.is_idle() or c.has_expired()),
None,
)
if purged_connection is not None:
# log: "closing idle connection"
self._connections.remove(purged_connection)
closing_connections.append(purged_connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)

return closing_connections

Expand Down
45 changes: 24 additions & 21 deletions httpcore/_sync/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,7 @@ def handle_request(self, request: Request) -> Response:
f"to {self._origin}"
)

with self._state_lock:
if self._state == HTTPConnectionState.SERVER_DISCONNECTED:
raise ServerDisconnectedError()

# If the HTTP connection is idle but the socket is readable, then the
# only valid state is that the socket is about to return b"", indicating
# a server-initiated disconnect.
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
if server_disconnected:
self._state = HTTPConnectionState.SERVER_DISCONNECTED
raise ServerDisconnectedError()

if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
self._request_count += 1
self._state = HTTPConnectionState.ACTIVE
self._expire_at = None
else:
raise ConnectionNotAvailable()
self._update_state()

try:
kwargs = {"request": request}
Expand Down Expand Up @@ -158,6 +138,29 @@ def handle_request(self, request: Request) -> Response:
self._response_closed()
raise exc

def _update_state(self) -> None:
with self._state_lock:
# If the HTTP connection is idle but the socket is readable, then the
# only valid state is that the socket is about to return b"", indicating
# a server-initiated disconnect.
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
if (
server_disconnected
or self._state == HTTPConnectionState.SERVER_DISCONNECTED
):
self._state = HTTPConnectionState.SERVER_DISCONNECTED
raise ServerDisconnectedError()

if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
self._request_count += 1
self._state = HTTPConnectionState.ACTIVE
self._expire_at = None
else:
raise ConnectionNotAvailable()

# Sending the request...

def _send_request_headers(self, request: Request) -> None:
Expand Down
17 changes: 10 additions & 7 deletions httpcore/_sync/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,7 @@ def handle_request(self, request: Request) -> Response:
f"to {self._origin}"
)

with self._state_lock:
if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE):
self._request_count += 1
self._expire_at = None
self._state = HTTPConnectionState.ACTIVE
else:
raise ConnectionNotAvailable()
self._update_state()

with self._init_lock:
if not self._sent_connection_init:
Expand Down Expand Up @@ -184,6 +178,15 @@ def handle_request(self, request: Request) -> Response:

raise exc

def _update_state(self) -> None:
with self._state_lock:
if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE):
self._request_count += 1
self._expire_at = None
self._state = HTTPConnectionState.ACTIVE
else:
raise ConnectionNotAvailable()

def _send_connection_init(self, request: Request) -> None:
"""
The HTTP/2 connection requires some initial setup before we can start
Expand Down
7 changes: 1 addition & 6 deletions tests/_async/test_http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import pytest

import httpcore
from httpcore._exceptions import ServerDisconnectedError


@pytest.mark.anyio
Expand Down Expand Up @@ -202,11 +201,7 @@ def get_extra_info(self, info: str) -> typing.Any:
assert conn.is_idle() and not conn.has_expired()
stream.mock_is_readable = True # Simulate connection breakage

with pytest.raises(ServerDisconnectedError):
await conn.request("GET", "https://example.com/")
assert conn.has_expired() and not conn.is_idle()

with pytest.raises(ServerDisconnectedError):
with pytest.raises(httpcore.ServerDisconnectedError):
await conn.request("GET", "https://example.com/")
assert conn.has_expired() and not conn.is_idle()

Expand Down
7 changes: 1 addition & 6 deletions tests/_sync/test_http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import pytest

import httpcore
from httpcore._exceptions import ServerDisconnectedError



Expand Down Expand Up @@ -202,11 +201,7 @@ def get_extra_info(self, info: str) -> typing.Any:
assert conn.is_idle() and not conn.has_expired()
stream.mock_is_readable = True # Simulate connection breakage

with pytest.raises(ServerDisconnectedError):
conn.request("GET", "https://example.com/")
assert conn.has_expired() and not conn.is_idle()

with pytest.raises(ServerDisconnectedError):
with pytest.raises(httpcore.ServerDisconnectedError):
conn.request("GET", "https://example.com/")
assert conn.has_expired() and not conn.is_idle()

Expand Down

0 comments on commit b19e879

Please sign in to comment.