Skip to content

Commit

Permalink
fix timeout issue (#593)
Browse files Browse the repository at this point in the history
  • Loading branch information
thehesiod authored Jun 4, 2018
1 parent 19bd26d commit a04ddb1
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 70 deletions.
5 changes: 3 additions & 2 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
Changes
-------
0.9.1a0 (2018-XX-XX)
^^^^^^^^^^^^^^^^^^^^
0.9.1 (2018-05-04)
^^^^^^^^^^^^^^^^^^
* fix timeout bug introduced in last release

0.9.0 (2018-06-01)
^^^^^^^^^^^^^^^^^^
Expand Down
2 changes: 1 addition & 1 deletion aiobotocore/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .session import get_session, AioSession

__all__ = ['get_session', 'AioSession']
__version__ = '0.9.1a0'
__version__ = '0.9.1'
2 changes: 1 addition & 1 deletion aiobotocore/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ async def _request(self, method, url, headers, data, verify, stream):
url = URL(url, encoded=True)
resp = await self._aio_session.request(
method, url=url, headers=headers_, data=data, proxy=proxy,
verify_ssl=verify, timeout=None)
verify_ssl=verify)

# If we're not streaming, read the content so we can retry any timeout
# errors, see:
Expand Down
5 changes: 3 additions & 2 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
-e .
coverage==4.5.1
flake8==3.5.0

# we specify flask directly and don't use moto[server] as we want to fix the flask version
Flask==0.12.2

# until https://github.com/spulec/moto/pull/1611 is released
git+git://github.com/thehesiod/moto.git@fix-copy-source-query#egg=moto
# until release with https://github.com/spulec/moto/pull/1611 is available
git+git://github.com/spulec/moto.git@80929292584ee78affc07643d16fae6bb31b4014#egg=moto

pytest-cov==2.5.1
pytest==3.4.1
Expand Down
87 changes: 46 additions & 41 deletions tests/mock_server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import aiohttp
import aiohttp.web
from aiohttp.web import StreamResponse
import pytest
Expand All @@ -8,17 +9,15 @@
import subprocess as sp
import sys
import time
import threading
import socket
from unittest import mock
import multiprocessing


_proxy_bypass = {
"http": None,
"https": None,
}


host = "localhost"


Expand All @@ -30,33 +29,46 @@ def get_free_tcp_port():
return port


class AIOServer(threading.Thread):
# This runs in a subprocess for a variety of reasons
# 1) early versions of python 3.5 did not correctly set one thread per run loop
# 2) aiohttp uses get_event_loop instead of using the passed in run loop
# 3) aiohttp shutdown can be hairy
class AIOServer(multiprocessing.Process):
"""
This is a mock AWS service which will 5 seconds before returning
a response to test socket timeouts.
"""
def __init__(self):
super().__init__(target=self._run)
self._loop = None
self._port = get_free_tcp_port()
self.start()
self.endpoint_url = 'http://{}:{}'.format(host, self._port)
self._shutdown_evt = threading.Event()
self.daemon = True # die when parent dies

def _run(self):
self._loop = asyncio.new_event_loop()
app = aiohttp.web.Application(loop=self._loop)
asyncio.set_event_loop(asyncio.new_event_loop())
app = aiohttp.web.Application()
app.router.add_route('*', '/ok', self.ok)
app.router.add_route('*', '/{anything:.*}', self.stream_handler)

try:
# We need to mock `.get_event_loop` function and return
# `self._loop` explicitly because from `aiohttp>=3.0.0` we can't
# pass `loop` as a kwargs into `run_app`.
with mock.patch('asyncio.get_event_loop', return_value=self._loop):
aiohttp.web.run_app(app, host=host, port=self._port,
handle_signals=False)
aiohttp.web.run_app(app, host=host, port=self._port,
handle_signals=False)
except BaseException:
pytest.fail('unable to start and connect to aiohttp server')
raise
finally:
self._shutdown_evt.set()

async def __aenter__(self):
self.start()
await self._wait_until_up()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
try:
self.terminate()
except BaseException:
pytest.fail("Unable to shut down server")
raise

async def ok(self, request):
return aiohttp.web.Response()
Expand All @@ -73,31 +85,24 @@ async def stream_handler(self, request):
await resp.drain()
return resp

def wait_until_up(self):
connected = False
for i in range(0, 30):
try:
# we need to bypass the proxies due to monkey patches
requests.get(self.endpoint_url + '/ok', timeout=0.5,
proxies=_proxy_bypass)
connected = True
break
except (requests.exceptions.ConnectionError,
requests.exceptions.ReadTimeout):
time.sleep(0.5)
except BaseException:
pytest.fail('unable to start and connect to aiohttp server')
raise

if not connected:
pytest.fail('unable to start and connect to aiohttp server')

async def stop(self):
if self._loop:
self._loop.stop()

if not self._shutdown_evt.wait(20):
pytest.fail("Unable to shut down server")
async def _wait_until_up(self):
async with aiohttp.ClientSession() as session:
for i in range(0, 30):
if self.exitcode is not None:
pytest.fail('unable to start/connect to aiohttp server')
return

try:
# we need to bypass the proxies due to monkey patches
await session.get(self.endpoint_url + '/ok', timeout=0.5)
return
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
await asyncio.sleep(0.5)
except BaseException:
pytest.fail('unable to start/connect to aiohttp server')
raise

pytest.fail('unable to start and connect to aiohttp server')


def start_service(service_name, host, port):
Expand Down
64 changes: 41 additions & 23 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,34 +47,52 @@ def test_connector_args():
@pytest.mark.moto
@pytest.mark.asyncio
async def test_connector_timeout(event_loop):
server = AIOServer()
session = AioSession(loop=event_loop)
config = AioConfig(max_pool_connections=1, connect_timeout=1,
retries={'max_attempts': 0})
async with session.create_client('s3', config=config,
endpoint_url=server.endpoint_url,
aws_secret_access_key='xxx',
aws_access_key_id='xxx') as s3_client:
async with AIOServer() as server, \
session.create_client('s3', config=config,
endpoint_url=server.endpoint_url,
aws_secret_access_key='xxx',
aws_access_key_id='xxx') as s3_client:

try:
server.wait_until_up()

async def get_and_wait():
await s3_client.get_object(Bucket='foo', Key='bar')
await asyncio.sleep(100)
async def get_and_wait():
await s3_client.get_object(Bucket='foo', Key='bar')
await asyncio.sleep(100)

task1 = asyncio.Task(get_and_wait(), loop=event_loop)
task2 = asyncio.Task(get_and_wait(), loop=event_loop)
task1 = asyncio.Task(get_and_wait(), loop=event_loop)
task2 = asyncio.Task(get_and_wait(), loop=event_loop)

try:
done, pending = await asyncio.wait([task1, task2],
timeout=3, loop=event_loop)
try:
done, pending = await asyncio.wait([task1, task2],
timeout=3, loop=event_loop)

# second request should not timeout just because there isn't a
# connector available
assert len(pending) == 2
finally:
task1.cancel()
task2.cancel()
# second request should not timeout just because there isn't a
# connector available
assert len(pending) == 2
finally:
await server.stop()
task1.cancel()
task2.cancel()


# Enable this once https://github.com/aio-libs/aiohttp/issues/3053 is fixed
# @pytest.mark.moto
# @pytest.mark.asyncio
# async def test_connector_timeout2(event_loop):
# session = AioSession(loop=event_loop)
# config = AioConfig(max_pool_connections=1, connect_timeout=1,
# read_timeout=1, retries={'max_attempts': 0})
# async with AIOServer() as server, \
# session.create_client('s3', config=config,
# endpoint_url=server.endpoint_url,
# aws_secret_access_key='xxx',
# aws_access_key_id='xxx') as s3_client:
#
# with pytest.raises(TimeoutError):
# try:
# resp = await s3_client.get_object(Bucket='foo', Key='bar')
# print()
# await resp["Body"].read()
# print()
# except BaseException as e:
# raise

0 comments on commit a04ddb1

Please sign in to comment.