Skip to content

Commit

Permalink
Merge pull request #357 from KeepSafe/refactor_connector_cleanup
Browse files Browse the repository at this point in the history
Refactor connector cleanup
  • Loading branch information
asvetlov committed May 11, 2015
2 parents 015d80a + e15c602 commit f99348b
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 12 deletions.
24 changes: 16 additions & 8 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import traceback
import warnings

from math import ceil

from . import hdrs
from .client import ClientRequest
Expand Down Expand Up @@ -160,31 +161,38 @@ def _cleanup(self):
now = self._loop.time()

connections = {}
timeout = self._keepalive_timeout

for key, conns in self._conns.items():
alive = []
for transport, proto, t0 in conns:
if transport is not None:
if proto and not proto.is_connected():
transport = None
elif (now - t0) > self._keepalive_timeout:
transport.close()
transport = None
else:
delta = t0 + self._keepalive_timeout - now
if delta < 0:
transport.close()
transport = None
elif delta < timeout:
timeout = delta

if transport:
if transport is not None:
alive.append((transport, proto, t0))
if alive:
connections[key] = alive

if connections:
self._cleanup_handle = self._loop.call_later(
self._keepalive_timeout, self._cleanup)
self._cleanup_handle = self._loop.call_at(
ceil(now + timeout), self._cleanup)

self._conns = connections

def _start_cleanup_task(self):
if self._cleanup_handle is None:
self._cleanup_handle = self._loop.call_later(
self._keepalive_timeout, self._cleanup)
now = self._loop.time()
self._cleanup_handle = self._loop.call_at(
ceil(now + self._keepalive_timeout), self._cleanup)

def close(self):
"""Close all opened transports."""
Expand Down
34 changes: 30 additions & 4 deletions tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,13 +374,14 @@ def test_connect_oserr(self):

def test_start_cleanup_task(self):
loop = unittest.mock.Mock()
conn = aiohttp.BaseConnector(loop=loop)
loop.time.return_value = 1.5
conn = aiohttp.BaseConnector(loop=loop, keepalive_timeout=10)
self.assertIsNone(conn._cleanup_handle)

conn._start_cleanup_task()
self.assertIsNotNone(conn._cleanup_handle)
loop.call_later.assert_called_with(
conn._keepalive_timeout, conn._cleanup)
loop.call_at.assert_called_with(
12, conn._cleanup)

def test_cleanup(self):
testset = {
Expand All @@ -402,15 +403,40 @@ def test_cleanup(self):
self.assertEqual(conn._conns, {})
self.assertIsNone(conn._cleanup_handle)

def test_cleanup2(self):
testset = {1: [(unittest.mock.Mock(), unittest.mock.Mock(), 300)]}
testset[1][0][1].is_connected.return_value = True

conn = aiohttp.BaseConnector(loop=loop)
loop = unittest.mock.Mock()
loop.time.return_value = 300.1

conn = aiohttp.BaseConnector(loop=loop, keepalive_timeout=10)
conn._conns = testset
conn._cleanup()
self.assertEqual(conn._conns, testset)

self.assertIsNotNone(conn._cleanup_handle)
loop.call_at.assert_called_with(
310, conn._cleanup)
conn.close()

def test_cleanup3(self):
testset = {1: [(unittest.mock.Mock(), unittest.mock.Mock(), 290.1),
(unittest.mock.Mock(), unittest.mock.Mock(), 305.1)]}
testset[1][0][1].is_connected.return_value = True

loop = unittest.mock.Mock()
loop.time.return_value = 308.5

conn = aiohttp.BaseConnector(loop=loop, keepalive_timeout=10)
conn._conns = testset

conn._cleanup()
self.assertEqual(conn._conns, {1: [testset[1][1]]})

self.assertIsNotNone(conn._cleanup_handle)
loop.call_at.assert_called_with(
316, conn._cleanup)
conn.close()

def test_tcp_connector_ctor(self):
Expand Down

0 comments on commit f99348b

Please sign in to comment.