diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 49a1ed36..30a72d42 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -1,7 +1,7 @@ from ssl import SSLContext from typing import AsyncIterator, Callable, Dict, Optional, Set, Tuple -from .._backends.auto import AsyncSemaphore, AutoBackend +from .._backends.auto import AsyncLock, AsyncSemaphore, AutoBackend from .._exceptions import PoolTimeout from .._threadlock import ThreadLock from .._types import URL, Headers, Origin, TimeoutDict @@ -107,6 +107,12 @@ def _connection_semaphore(self) -> AsyncSemaphore: return self._internal_semaphore + @property + def _connection_acquiry_lock(self) -> AsyncLock: + if not hasattr(self, "_internal_connection_acquiry_lock"): + self._internal_connection_acquiry_lock = self._backend.create_lock() + return self._internal_connection_acquiry_lock + async def request( self, method: bytes, @@ -123,13 +129,17 @@ async def request( connection: Optional[AsyncHTTPConnection] = None while connection is None: - connection = await self._get_connection_from_pool(origin) - - if connection is None: - connection = AsyncHTTPConnection( - origin=origin, http2=self._http2, ssl_context=self._ssl_context, - ) - await self._add_to_pool(connection, timeout=timeout) + async with self._connection_acquiry_lock: + # We get-or-create a connection as an atomic operation, to ensure + # that HTTP/2 requests issued in close concurrency will end up + # on the same connection. + connection = await self._get_connection_from_pool(origin) + + if connection is None: + connection = AsyncHTTPConnection( + origin=origin, http2=self._http2, ssl_context=self._ssl_context, + ) + await self._add_to_pool(connection, timeout=timeout) try: response = await connection.request( diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 7a877e61..d08da742 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -1,7 +1,7 @@ from ssl import SSLContext from typing import Iterator, Callable, Dict, Optional, Set, Tuple -from .._backends.auto import SyncSemaphore, SyncBackend +from .._backends.auto import SyncLock, SyncSemaphore, SyncBackend from .._exceptions import PoolTimeout from .._threadlock import ThreadLock from .._types import URL, Headers, Origin, TimeoutDict @@ -107,6 +107,12 @@ def _connection_semaphore(self) -> SyncSemaphore: return self._internal_semaphore + @property + def _connection_acquiry_lock(self) -> SyncLock: + if not hasattr(self, "_internal_connection_acquiry_lock"): + self._internal_connection_acquiry_lock = self._backend.create_lock() + return self._internal_connection_acquiry_lock + def request( self, method: bytes, @@ -123,13 +129,17 @@ def request( connection: Optional[SyncHTTPConnection] = None while connection is None: - connection = self._get_connection_from_pool(origin) - - if connection is None: - connection = SyncHTTPConnection( - origin=origin, http2=self._http2, ssl_context=self._ssl_context, - ) - self._add_to_pool(connection, timeout=timeout) + with self._connection_acquiry_lock: + # We get-or-create a connection as an atomic operation, to ensure + # that HTTP/2 requests issued in close concurrency will end up + # on the same connection. + connection = self._get_connection_from_pool(origin) + + if connection is None: + connection = SyncHTTPConnection( + origin=origin, http2=self._http2, ssl_context=self._ssl_context, + ) + self._add_to_pool(connection, timeout=timeout) try: response = connection.request(