Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

get-or-create-connection should be an atomic operation #81

Merged
merged 1 commit into from
May 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from ssl import SSLContext
from typing import AsyncIterator, Callable, Dict, Optional, Set, Tuple

from .._backends.auto import AsyncSemaphore, AutoBackend
from .._backends.auto import AsyncLock, AsyncSemaphore, AutoBackend
from .._exceptions import PoolTimeout
from .._threadlock import ThreadLock
from .._types import URL, Headers, Origin, TimeoutDict
Expand Down Expand Up @@ -107,6 +107,12 @@ def _connection_semaphore(self) -> AsyncSemaphore:

return self._internal_semaphore

@property
def _connection_acquiry_lock(self) -> AsyncLock:
if not hasattr(self, "_internal_connection_acquiry_lock"):
self._internal_connection_acquiry_lock = self._backend.create_lock()
return self._internal_connection_acquiry_lock

async def request(
self,
method: bytes,
Expand All @@ -123,13 +129,17 @@ async def request(

connection: Optional[AsyncHTTPConnection] = None
while connection is None:
connection = await self._get_connection_from_pool(origin)

if connection is None:
connection = AsyncHTTPConnection(
origin=origin, http2=self._http2, ssl_context=self._ssl_context,
)
await self._add_to_pool(connection, timeout=timeout)
async with self._connection_acquiry_lock:
# We get-or-create a connection as an atomic operation, to ensure
# that HTTP/2 requests issued in close concurrency will end up
# on the same connection.
connection = await self._get_connection_from_pool(origin)

if connection is None:
connection = AsyncHTTPConnection(
origin=origin, http2=self._http2, ssl_context=self._ssl_context,
)
await self._add_to_pool(connection, timeout=timeout)

try:
response = await connection.request(
Expand Down
26 changes: 18 additions & 8 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from ssl import SSLContext
from typing import Iterator, Callable, Dict, Optional, Set, Tuple

from .._backends.auto import SyncSemaphore, SyncBackend
from .._backends.auto import SyncLock, SyncSemaphore, SyncBackend
from .._exceptions import PoolTimeout
from .._threadlock import ThreadLock
from .._types import URL, Headers, Origin, TimeoutDict
Expand Down Expand Up @@ -107,6 +107,12 @@ def _connection_semaphore(self) -> SyncSemaphore:

return self._internal_semaphore

@property
def _connection_acquiry_lock(self) -> SyncLock:
if not hasattr(self, "_internal_connection_acquiry_lock"):
self._internal_connection_acquiry_lock = self._backend.create_lock()
return self._internal_connection_acquiry_lock

def request(
self,
method: bytes,
Expand All @@ -123,13 +129,17 @@ def request(

connection: Optional[SyncHTTPConnection] = None
while connection is None:
connection = self._get_connection_from_pool(origin)

if connection is None:
connection = SyncHTTPConnection(
origin=origin, http2=self._http2, ssl_context=self._ssl_context,
)
self._add_to_pool(connection, timeout=timeout)
with self._connection_acquiry_lock:
# We get-or-create a connection as an atomic operation, to ensure
# that HTTP/2 requests issued in close concurrency will end up
# on the same connection.
connection = self._get_connection_from_pool(origin)

if connection is None:
connection = SyncHTTPConnection(
origin=origin, http2=self._http2, ssl_context=self._ssl_context,
)
self._add_to_pool(connection, timeout=timeout)

try:
response = connection.request(
Expand Down