Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aiomoto #766

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added aiobotocore/aiomoto/__init__.py
Empty file.
184 changes: 184 additions & 0 deletions aiobotocore/aiomoto/aiomoto_fixtures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
"""
AWS asyncio test fixtures
"""

import aiobotocore.client
import aiobotocore.config
import pytest

from aiobotocore.aiomoto.aiomoto_services import MotoService
from aiobotocore.aiomoto.utils import AWS_ACCESS_KEY_ID
from aiobotocore.aiomoto.utils import AWS_SECRET_ACCESS_KEY


#
# Asyncio AWS Services
#


@pytest.fixture
async def aio_aws_batch_server():
async with MotoService("batch") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_cloudformation_server():
async with MotoService("cloudformation") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_ec2_server():
async with MotoService("ec2") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_ecs_server():
async with MotoService("ecs") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_iam_server():
async with MotoService("iam") as svc:
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_dynamodb2_server():
async with MotoService("dynamodb2") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_logs_server():
# cloud watch logs
async with MotoService("logs") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_s3_server():
async with MotoService("s3") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_sns_server():
async with MotoService("sns") as svc:
svc.reset()
yield svc.endpoint_url


@pytest.fixture
async def aio_aws_sqs_server():
async with MotoService("sqs") as svc:
svc.reset()
yield svc.endpoint_url


#
# Asyncio AWS Clients
#


@pytest.fixture
def aio_aws_session(aws_credentials, aws_region, event_loop):
# pytest-asyncio provides and manages the `event_loop`

session = aiobotocore.get_session(loop=event_loop)
session.user_agent_name = "aiomoto"

assert session.get_default_client_config() is None
aioconfig = aiobotocore.config.AioConfig(
max_pool_connections=1, region_name=aws_region
)

# Note: tried to use proxies for the aiobotocore.endpoint, to replace
# 'https://batch.us-west-2.amazonaws.com/v1/describejobqueues', but
# the moto.server does not behave as a proxy server. Leaving this
# here for the record to avoid trying to do it again sometime later.
# proxies = {
# 'http': os.getenv("HTTP_PROXY", "http://127.0.0.1:5000/moto-api/"),
# 'https': os.getenv("HTTPS_PROXY", "http://127.0.0.1:5000/moto-api/"),
# }
# assert aioconfig.proxies is None
# aioconfig.proxies = proxies

session.set_default_client_config(aioconfig)
assert session.get_default_client_config() == aioconfig

session.set_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
session.set_debug_logger(logger_name="aiomoto")

yield session


@pytest.fixture
async def aio_aws_client(aio_aws_session):
async def _get_client(service_name):
async with MotoService(service_name) as srv:
async with aio_aws_session.create_client(
service_name, endpoint_url=srv.endpoint_url
) as client:
yield client

return _get_client


@pytest.fixture
async def aio_aws_batch_client(aio_aws_session, aio_aws_batch_server):
async with aio_aws_session.create_client(
"batch", endpoint_url=aio_aws_batch_server
) as client:
yield client


@pytest.fixture
async def aio_aws_ec2_client(aio_aws_session, aio_aws_ec2_server):
async with aio_aws_session.create_client(
"ec2", endpoint_url=aio_aws_ec2_server
) as client:
yield client


@pytest.fixture
async def aio_aws_ecs_client(aio_aws_session, aio_aws_ecs_server):
async with aio_aws_session.create_client(
"ecs", endpoint_url=aio_aws_ecs_server
) as client:
yield client


@pytest.fixture
async def aio_aws_iam_client(aio_aws_session, aio_aws_iam_server):
async with aio_aws_session.create_client(
"iam", endpoint_url=aio_aws_iam_server
) as client:
client.meta.config.region_name = "aws-global" # not AWS_REGION
yield client


@pytest.fixture
async def aio_aws_logs_client(aio_aws_session, aio_aws_logs_server):
async with aio_aws_session.create_client(
"logs", endpoint_url=aio_aws_logs_server
) as client:
yield client


@pytest.fixture
async def aio_aws_s3_client(aio_aws_session, aio_aws_s3_server):
async with aio_aws_session.create_client(
"s3", endpoint_url=aio_aws_s3_server
) as client:
yield client
145 changes: 145 additions & 0 deletions aiobotocore/aiomoto/aiomoto_services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import asyncio
import functools
import logging
import socket
import threading
import time
import os

# Third Party
import aiohttp
import moto.backends
import moto.server
import werkzeug.serving


HOST = "127.0.0.1"

_PYCHARM_HOSTED = os.environ.get("PYCHARM_HOSTED") == "1"
CONNECT_TIMEOUT = 90 if _PYCHARM_HOSTED else 10


def get_free_tcp_port(release_socket: bool = False):
sckt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sckt.bind(("", 0))
addr, port = sckt.getsockname()
if release_socket:
sckt.close()
return port

return sckt, port


class MotoService:
""" Will Create MotoService.
Service is ref-counted so there will only be one per process. Real Service will
be returned by `__aenter__`."""

_services = dict() # {name: instance}

def __init__(self, service_name: str, port: int = None):
self._service_name = service_name

if port:
self._socket = None
self._port = port
else:
self._socket, self._port = get_free_tcp_port()

self._thread = None
self._logger = logging.getLogger("MotoService")
self._refcount = None
self._ip_address = HOST
self._server = None

@property
def endpoint_url(self):
return "http://{}:{}".format(self._ip_address, self._port)

def reset(self):
# each service can have multiple regional backends
service_backends = moto.backends.BACKENDS[self._service_name]
for region_name, backend in service_backends.items():
backend.reset()

def __call__(self, func):
async def wrapper(*args, **kwargs):
await self._start()
try:
result = await func(*args, **kwargs)
finally:
await self._stop()
return result

functools.update_wrapper(wrapper, func)
wrapper.__wrapped__ = func
return wrapper

async def __aenter__(self):
svc = self._services.get(self._service_name)
if svc is None:
self._services[self._service_name] = self
self._refcount = 1
await self._start()
return self
else:
svc._refcount += 1
return svc

async def __aexit__(self, exc_type, exc_val, exc_tb):
self._refcount -= 1

if self._socket:
self._socket.close()
self._socket = None

if self._refcount == 0:
del self._services[self._service_name]
await self._stop()

def _server_entry(self):
self._main_app = moto.server.DomainDispatcherApplication(
moto.server.create_backend_app, service=self._service_name
)
self._main_app.debug = True

if self._socket:
self._socket.close() # release right before we use it
self._socket = None

self._server = werkzeug.serving.make_server(
self._ip_address, self._port, self._main_app, True
)
self._server.serve_forever()

async def _start(self):
self._thread = threading.Thread(target=self._server_entry, daemon=True)
self._thread.start()

async with aiohttp.ClientSession() as session:
start = time.time()

while time.time() - start < 10:
if not self._thread.is_alive():
break

try:
# we need to bypass the proxies due to monkeypatches
async with session.get(
self.endpoint_url + "/static", timeout=CONNECT_TIMEOUT
):
pass
break
except (asyncio.TimeoutError, aiohttp.ClientConnectionError):
await asyncio.sleep(0.5)
else:
await self._stop() # pytest.fail doesn't call stop_process
raise Exception(
"Cannot start MotoService: {}".format(self._service_name)
)

async def _stop(self):
if self._server:
self._server.shutdown()

self._thread.join()
Loading