Skip to content

Commit

Permalink
Merge pull request #473 from elastic-coders/fix_connection_polling_take2
Browse files Browse the repository at this point in the history
Fix connection pooling
  • Loading branch information
fafhrd91 committed Aug 26, 2015
2 parents c577876 + d0c26e4 commit 3c3fdfa
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 25 deletions.
2 changes: 1 addition & 1 deletion aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def request(self, method, url, *,
try:
yield from resp.start(conn, read_until_eof)
except:
resp.close()
resp.close(force=True)
conn.close()
raise
except (aiohttp.HttpProcessingError,
Expand Down
26 changes: 9 additions & 17 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,6 @@ def connect(self, req):
else:
transport, proto = yield from self._create_connection(req)

if not self._force_close:
if self._conns.get(key, None) is None:
self._conns[key] = []

self._conns[key].append((transport, proto,
self._loop.time()))
except asyncio.TimeoutError as exc:
raise ClientTimeoutError(
'Connection timeout to host %s:%s ssl:%s' % key) from exc
Expand All @@ -309,7 +303,10 @@ def connect(self, req):
return conn

def _get(self, key):
conns = self._conns.get(key)
try:
conns = self._conns[key]
except KeyError:
return None, None
t1 = self._loop.time()
while conns:
transport, proto, t0 = conns.pop()
Expand All @@ -318,8 +315,12 @@ def _get(self, key):
transport.close()
transport = None
else:
if not conns:
# The very last connection was reclaimed: drop the key
del self._conns[key]
return transport, proto

# No more connections: drop the key
del self._conns[key]
return None, None

def _release(self, key, req, transport, protocol, *, should_close=False):
Expand Down Expand Up @@ -357,15 +358,6 @@ def _release(self, key, req, transport, protocol, *, should_close=False):

reader = protocol.reader
if should_close or (reader.output and not reader.output.at_eof()):
conns = self._conns.get(key)
if conns is not None and len(conns) >= 0:
# Issue #253: An empty array will eventually be
# removed by cleanup, but it's better to pop straight
# away, because cleanup might not get called (e.g. if
# keepalive is False).
if not acquired:
self._conns.pop(key, None)

transport.close()
else:
conns = self._conns.get(key)
Expand Down
20 changes: 13 additions & 7 deletions tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def test_get_expired(self):
tr, proto = unittest.mock.Mock(), unittest.mock.Mock()
conn._conns[1] = [(tr, proto, self.loop.time() - 1000)]
self.assertEqual(conn._get(1), (None, None))
self.assertEqual(conn._conns[1], [])
self.assertFalse(conn._conns)
conn.close()

def test_release(self):
Expand Down Expand Up @@ -278,8 +278,17 @@ def test_release_close(self):
self.assertFalse(conn._conns)
self.assertTrue(tr.close.called)

def test_release_pop_empty_conns(self):
# see issue #253
def test_get_pop_empty_conns(self):
# see issue #473
conn = aiohttp.BaseConnector(loop=self.loop)
key = ('127.0.0.1', 80, False)
conn._conns[key] = []
tr, proto = conn._get(key)
self.assertEqual((None, None), (tr, proto))
self.assertFalse(conn._conns)

def test_release_close_do_not_add_to_pool(self):
# see issue #473
conn = aiohttp.BaseConnector(loop=self.loop)
req = unittest.mock.Mock()
resp = unittest.mock.Mock()
Expand All @@ -288,13 +297,10 @@ def test_release_pop_empty_conns(self):

key = ('127.0.0.1', 80, False)

conn._conns[key] = []

tr, proto = unittest.mock.Mock(), unittest.mock.Mock()
conn._acquired[key].add(tr)
conn._release(key, req, tr, proto)
self.assertEqual({}, conn._conns)
self.assertTrue(tr.close.called)
self.assertFalse(conn._conns)

def test_release_close_do_not_delete_existing_connections(self):
key = ('127.0.0.1', 80, False)
Expand Down

0 comments on commit 3c3fdfa

Please sign in to comment.