introduce connector capacity feature #1601
Nikolay Kim committed Feb 9, 2017
1 parent 0ddc337 commit 965e8be
Showing 6 changed files with 228 additions and 199 deletions.
5 changes: 5 additions & 0 deletions CHANGES.rst
@@ -4,6 +4,11 @@ CHANGES
1.4.0 (XXXX-XX-XX)
------------------

- Added `capacity` parameter to the client connector object.
  Capacity is the total number of simultaneous connections. #1601

- Deprecated connector's `limit` parameter #1601

- Dropped: `aiohttp.protocol.HttpPrefixParser` #1590

- Dropped: Server response's `.started`, `.start()` and `.can_start()` methods #1591
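As a usage sketch (not part of this commit; it assumes only the constructor
signatures changed below in aiohttp/connector.py):

    import aiohttp

    # At most 10 simultaneous connections in total, across all endpoints.
    connector = aiohttp.TCPConnector(capacity=10)
    session = aiohttp.ClientSession(connector=connector)

    # The old spelling still works, but it now names the total rather than
    # a per-endpoint limit, and it emits a DeprecationWarning.
    legacy = aiohttp.TCPConnector(limit=10)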
200 changes: 105 additions & 95 deletions aiohttp/connector.py
@@ -4,9 +4,7 @@
import sys
import traceback
import warnings
from hashlib import md5, sha1, sha256
from types import MappingProxyType

import aiohttp
@@ -86,22 +84,29 @@ def release(self):

def detach(self):
if self._transport is not None:
            self._connector._release_acquired(self._transport)
self._transport = None

@property
def closed(self):
return self._transport is None


class _TransportPlaceholder:
""" placeholder for BaseConnector.connect function """

def close(self):
pass


class BaseConnector(object):
"""Base connector class.
conn_timeout - (optional) Connect timeout.
keepalive_timeout - (optional) Keep-alive timeout.
force_close - Set to True to force close and do reconnect
after each request (and between redirects).
    capacity - The total number of simultaneous connections.
disable_cleanup_closed - Disable clean-up closed ssl transports.
loop - Optional event loop.
"""
@@ -113,8 +118,16 @@ class BaseConnector(object):
_cleanup_closed_period = 2.0

def __init__(self, *, conn_timeout=None, keepalive_timeout=sentinel,
                 force_close=False, capacity=20, limit=sentinel,
                 time_service=None, disable_cleanup_closed=False, loop=None):

if limit is not sentinel:
capacity = limit
            warnings.warn('`limit` is deprecated, use `capacity` instead #1601',
                          DeprecationWarning, stacklevel=2)

if capacity is None or capacity is sentinel:
capacity = 0

if force_close:
if keepalive_timeout is not None and \
@@ -133,12 +146,12 @@ def __init__(self, *, conn_timeout=None, keepalive_timeout=sentinel,
self._source_traceback = traceback.extract_stack(sys._getframe(1))

self._conns = {}
        self._capacity = capacity
        self._acquired = set()
self._conn_timeout = conn_timeout
self._keepalive_timeout = keepalive_timeout
self._force_close = force_close
        self._waiters = []

if time_service is not None:
self._time_service_owner = False
@@ -201,16 +214,22 @@ def force_close(self):
return self._force_close

@property
    def capacity(self):
        """The total number of simultaneous connections.

        If capacity is 0 the connector has no limit.
        The default capacity is 20.
        """
        return self._capacity

    @property
    def limit(self):
        """Deprecated alias for `capacity`."""
        warnings.warn('`limit` is deprecated, use `capacity` instead #1601',
                      DeprecationWarning)
        return self._capacity

def _cleanup(self):
"""Cleanup unused transports."""
@@ -275,11 +294,11 @@ def close(self):
if self._time_service_owner:
self._time_service.close()

        for data in self._conns.values():
for transport, proto, t0 in data:
transport.close()

        for transport in self._acquired:
transport.close()

        # cancel cleanup task
@@ -296,6 +315,7 @@ def close(self):
finally:
self._conns.clear()
self._acquired.clear()
self._waiters.clear()
self._cleanup_handle = None
self._cleanup_closed_transports.clear()
self._cleanup_closed_handle = None
@@ -313,64 +333,55 @@ def closed(self):
@asyncio.coroutine
def connect(self, req):
"""Get from pool or create new connection."""
        if self._capacity:
            # The capacity defines the maximum number of concurrent
            # connections; waiters count against it as well.
            available = (self._capacity -
                         len(self._waiters) - len(self._acquired))

            # Wait if no connections are available.
            if available <= 0:
                fut = helpers.create_future(self._loop)

                # This connection will now count towards the capacity.
                self._waiters.append(fut)
                yield from fut
                self._waiters.remove(fut)

key = (req.host, req.port, req.ssl)
transport, proto = self._get(key)
if transport is None:
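            # Hold a capacity slot with a placeholder while the real
            # connection is established; it is removed in the finally clause.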
placeholder = _TransportPlaceholder()
self._acquired.add(placeholder)
try:
if self._conn_timeout:
transport, proto = yield from asyncio.wait_for(
self._create_connection(req),
self._conn_timeout, loop=self._loop)
else:
transport, proto = yield from self._create_connection(req)

except asyncio.TimeoutError as exc:
raise ClientTimeoutError(
'Connection timeout to host {0[0]}:{0[1]} ssl:{0[2]}'
.format(key)) from exc
except OSError as exc:
raise ClientOSError(
exc.errno,
'Cannot connect to host {0[0]}:{0[1]} ssl:{0[2]} [{1}]'
.format(key, exc.strerror)) from exc
finally:
self._acquired.remove(placeholder)

self._acquired.add(transport)
return Connection(self, key, req, transport, proto, self._loop)

def _get(self, key):
try:
conns = self._conns[key]
except KeyError:
return None, None

        t1 = self._time_service.loop_time()
while conns:
transport, proto, t0 = conns.pop()
if transport is not None and proto.is_connected():
Expand All @@ -388,39 +399,34 @@ def _get(self, key):
del self._conns[key]
return None, None

    def _release_waiters(self):
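        # Wake at most `available` queued waiters; each one resumes a
        # connect() call that was blocked on the capacity check.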
if self._capacity > 0:
available = self._capacity - len(self._acquired)
if available > 0:
for waiter in self._waiters[:available]:
if not waiter.done():
waiter.set_result(None)

    def _release_acquired(self, transport):
if self._closed:
# acquired connection is already released on connector closing
return

        try:
            self._acquired.remove(transport)
except KeyError: # pragma: no cover
            # this may be the result of a nondeterministic order of object
            # finalization due to garbage collection.
            pass
else:
self._release_waiters()

def _release(self, key, req, transport, protocol, *, should_close=False):
if self._closed:
# acquired connection is already released on connector closing
return

        self._release_acquired(transport)

resp = req.response

@@ -440,7 +446,7 @@ def _release(self, key, req, transport, protocol, *, should_close=False):
conns = self._conns.get(key)
if conns is None:
conns = self._conns[key] = []
            conns.append((transport, protocol, self._time_service.loop_time()))
reader.unset_parser()

@asyncio.coroutine
@@ -471,18 +477,20 @@ class TCPConnector(BaseConnector):
keepalive_timeout - (optional) Keep-alive timeout.
force_close - Set to True to force close and do reconnect
after each request (and between redirects).
    capacity - The total number of simultaneous connections.
loop - Optional event loop.
"""

def __init__(self, *, verify_ssl=True, fingerprint=None,
resolve=sentinel, use_dns_cache=sentinel,
                 family=0, ssl_context=None, local_addr=None,
                 resolver=None, time_service=None,
                 conn_timeout=None, keepalive_timeout=sentinel,
                 force_close=False, capacity=20, limit=sentinel, loop=None):
        super().__init__(time_service=time_service, conn_timeout=conn_timeout,
                         keepalive_timeout=keepalive_timeout,
                         force_close=force_close,
                         capacity=capacity, limit=limit, loop=loop)

if not verify_ssl and ssl_context is not None:
raise ValueError(
@@ -685,7 +693,6 @@ def _create_proxy_connection(self, req):
key = (req.host, req.port, req.ssl)
conn = Connection(self, key, proxy_req,
transport, proto, self._loop)
proxy_resp = proxy_req.send(conn.writer, conn.reader)
try:
resp = yield from proxy_resp.start(conn, True)
@@ -694,7 +701,7 @@ def _create_proxy_connection(self, req):
conn.close()
raise
else:
            conn._transport = None
try:
if resp.status != 200:
raise HttpProxyError(code=resp.status,
@@ -725,7 +732,7 @@ class UnixConnector(BaseConnector):
keepalive_timeout - (optional) Keep-alive timeout.
force_close - Set to True to force close and do reconnect
after each request (and between redirects).
    capacity - The total number of simultaneous connections.
loop - Optional event loop.
Usage:
@@ -736,12 +743,15 @@ class UnixConnector(BaseConnector):
"""

def __init__(self, path, force_close=False,
time_service=None,
conn_timeout=None, keepalive_timeout=sentinel,
capacity=20, limit=sentinel, loop=None):
super().__init__(force_close=force_close,
time_service=time_service,
conn_timeout=conn_timeout,
keepalive_timeout=keepalive_timeout,
                         capacity=capacity, limit=limit, loop=loop)
self._path = path

@property
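For reference, a minimal self-contained sketch of the gating pattern that
connect(), _release_acquired() and _release_waiters() implement above
(illustrative only: the names and the Future-based hand-off mirror this
commit, everything else is simplified):

    import asyncio


    class CapacityGate:
        """Sketch of BaseConnector's capacity gating (not aiohttp code)."""

        def __init__(self, capacity, *, loop=None):
            self._capacity = capacity      # 0 means unbounded
            self._loop = loop or asyncio.get_event_loop()
            self._acquired = set()         # tokens standing in for transports
            self._waiters = []             # futures of parked acquirers

        @asyncio.coroutine
        def acquire(self, token):
            if self._capacity:
                # Waiters count against the capacity, exactly as in connect().
                available = (self._capacity -
                             len(self._waiters) - len(self._acquired))
                if available <= 0:
                    fut = asyncio.Future(loop=self._loop)
                    self._waiters.append(fut)
                    yield from fut         # parked until release() wakes us
                    self._waiters.remove(fut)
            self._acquired.add(token)

        def release(self, token):
            self._acquired.discard(token)
            if self._capacity > 0:
                # Wake as many parked acquirers as there are free slots,
                # mirroring _release_waiters().
                available = self._capacity - len(self._acquired)
                for waiter in self._waiters[:available]:
                    if not waiter.done():
                        waiter.set_result(None)

The point, in the commit as in the sketch, is that waiters are counted
against the capacity together with acquired transports, so a burst of
callers cannot overshoot the configured total while connections are still
being established.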
