fix AioJSONParser (#873)
thehesiod committed Jul 13, 2021
1 parent 4020c1e commit 247b7ac
Showing 12 changed files with 254 additions and 16 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/python-package.yml
@@ -41,6 +41,17 @@ jobs:
pipenv check || true
pipenv graph
make flake mototest
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1.5.2
with:
token: ${{ secrets.CODECOV_TOKEN }} # not required for public repos
files: ./coverage.xml
flags: unittests # optional
name: codecov-umbrella # optional
fail_ci_if_error: true # optional (default = false)
path_to_write_report: ./codecov_report.txt
verbose: true # optional (default = false)


pre-deploy:
name: Pre-Deploy
6 changes: 5 additions & 1 deletion CHANGES.rst
@@ -1,13 +1,17 @@
Changes
-------
1.3.3 (2021-07-12)
^^^^^^^^^^^^^^^^^^
* fix AioJSONParser #872

1.3.2 (2021-07-07)
^^^^^^^^^^^^^^^^^^
* Bump to botocore 1.20.106

1.3.1 (2021-06-11)
^^^^^^^^^^^^^^^^^^
* TCPConnector: change deprecated ssl_context to ssl
* fix non awaited generate presigned url calls # 868
* fix non awaited generate presigned url calls #868

1.3.0 (2021-04-09)
^^^^^^^^^^^^^^^^^^
2 changes: 1 addition & 1 deletion Makefile
@@ -22,7 +22,7 @@ cov cover coverage: flake
mototest:
docker pull alpine
docker pull lambci/lambda:python3.8
BOTO_CONFIG=/dev/null pipenv run python3 -Wd -X tracemalloc=5 -X faulthandler -m pytest -vv -m moto -n auto --cov-report term --cov-report html --cov=aiobotocore --cov=tests --log-cli-level=DEBUG aiobotocore tests
BOTO_CONFIG=/dev/null pipenv run python3 -Wd -X tracemalloc=5 -X faulthandler -m pytest -vv -m moto -n auto --cov-report term --cov-report html --cov-report xml --cov=aiobotocore --cov=tests --log-cli-level=DEBUG aiobotocore tests
@echo "open file://`pwd`/htmlcov/index.html"


2 changes: 1 addition & 1 deletion aiobotocore/__init__.py
@@ -1,4 +1,4 @@
from .session import get_session, AioSession

__all__ = ['get_session', 'AioSession']
__version__ = '1.3.2'
__version__ = '1.3.3'
31 changes: 28 additions & 3 deletions aiobotocore/endpoint.py
@@ -179,16 +179,41 @@ async def _do_get_response(self, request, operation_model):

protocol = operation_model.metadata['protocol']
parser = self._response_parser_factory.create_parser(protocol)
parsed_response = parser.parse(
response_dict, operation_model.output_shape)

if asyncio.iscoroutinefunction(parser.parse):
parsed_response = await parser.parse(
response_dict, operation_model.output_shape)
else:
parsed_response = parser.parse(
response_dict, operation_model.output_shape)

if http_response.status_code >= 300:
self._add_modeled_error_fields(
await self._add_modeled_error_fields(
response_dict, parsed_response,
operation_model, parser,
)
history_recorder.record('PARSED_RESPONSE', parsed_response)
return (http_response, parsed_response), None

async def _add_modeled_error_fields(
self, response_dict, parsed_response,
operation_model, parser,
):
error_code = parsed_response.get("Error", {}).get("Code")
if error_code is None:
return
service_model = operation_model.service_model
error_shape = service_model.shape_for_error_code(error_code)
if error_shape is None:
return

if asyncio.iscoroutinefunction(parser.parse):
modeled_parse = await parser.parse(response_dict, error_shape)
else:
modeled_parse = parser.parse(response_dict, error_shape)
# TODO: avoid naming conflicts with ResponseMetadata and Error
parsed_response.update(modeled_parse)

# NOTE: The only line changed here changing time.sleep to asyncio.sleep
async def _needs_retry(self, attempts, operation_model, request_dict,
response=None, caught_exception=None):
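For context on the branch added above: create_parser() may now return the async AioJSONParser, whose parse() is a coroutine function, while the other protocol parsers keep botocore's synchronous parse(). Below is a minimal standalone sketch of that dispatch pattern; the _SyncParser/_AsyncParser classes are hypothetical stand-ins, not aiobotocore's real parsers.

import asyncio


class _SyncParser:
    def parse(self, response_dict, shape):
        return {'parsed_by': 'sync'}


class _AsyncParser:
    async def parse(self, response_dict, shape):
        return {'parsed_by': 'async'}


async def _parse_with_either(parser, response_dict, shape=None):
    # Same branch as in _do_get_response: a coroutine-function parse() must be
    # awaited, a plain one must not be.
    if asyncio.iscoroutinefunction(parser.parse):
        return await parser.parse(response_dict, shape)
    return parser.parse(response_dict, shape)


async def _demo():
    print(await _parse_with_either(_SyncParser(), {}))   # {'parsed_by': 'sync'}
    print(await _parse_with_either(_AsyncParser(), {}))  # {'parsed_by': 'async'}


asyncio.run(_demo())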
14 changes: 13 additions & 1 deletion aiobotocore/eventstream.py
@@ -1,4 +1,4 @@
from botocore.eventstream import EventStream, EventStreamBuffer
from botocore.eventstream import EventStream, EventStreamBuffer, NoInitialResponseError


class AioEventStream(EventStream):
@@ -20,3 +20,15 @@ async def __anext__(self):
parsed_event = self._parse_event(event)
if parsed_event:
yield parsed_event

async def get_initial_response(self):
try:
async for event in self._event_generator:
event_type = event.headers.get(':event-type')
if event_type == 'initial-response':
return event

break
except StopIteration:
pass
raise NoInitialResponseError()
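get_initial_response() above only inspects the first event on the stream: it returns that event when its ':event-type' header is 'initial-response' and raises otherwise. A standalone sketch of the same logic, using a hypothetical namedtuple event and a plain async generator in place of AioEventStream (RuntimeError stands in for NoInitialResponseError):

import asyncio
from collections import namedtuple

Event = namedtuple('Event', ['headers', 'payload'])


async def _events():
    # A well-formed stream begins with an initial-response event.
    yield Event({':event-type': 'initial-response'}, b'{}')
    yield Event({':event-type': 'SubscribeToShardEvent'}, b'{"Records": []}')


async def _get_initial_response(event_generator):
    async for event in event_generator:
        if event.headers.get(':event-type') == 'initial-response':
            return event
        break  # only the first event is considered
    raise RuntimeError('First event was not of type initial-response')


print(asyncio.run(_get_initial_response(_events())))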
64 changes: 63 additions & 1 deletion aiobotocore/parsers.py
@@ -1,5 +1,6 @@
from botocore.parsers import ResponseParserFactory, RestXMLParser, \
RestJSONParser, JSONParser, QueryParser, EC2QueryParser
RestJSONParser, JSONParser, QueryParser, EC2QueryParser, \
NoInitialResponseError, ResponseParserError, LOG, lowercase_dict
from .eventstream import AioEventStream


@@ -25,11 +26,68 @@ def _create_event_stream(self, response, shape):


class AioJSONParser(JSONParser):
async def _do_parse(self, response, shape):
parsed = {}
if shape is not None:
event_name = shape.event_stream_name
if event_name:
parsed = await self._handle_event_stream(response, shape, event_name)
else:
parsed = self._handle_json_body(response['body'], shape)
self._inject_response_metadata(parsed, response['headers'])
return parsed

def _create_event_stream(self, response, shape):
parser = self._event_stream_parser
name = response['context'].get('operation_name')
return AioEventStream(response['body'], shape, parser, name)

async def _handle_event_stream(self, response, shape, event_name):
event_stream_shape = shape.members[event_name]
event_stream = self._create_event_stream(response, event_stream_shape)
try:
event = await event_stream.get_initial_response()
except NoInitialResponseError:
error_msg = 'First event was not of type initial-response'
raise ResponseParserError(error_msg)
parsed = self._handle_json_body(event.payload, shape)
parsed[event_name] = event_stream
return parsed

# this is actually from ResponseParser however for now JSONParser is the
# only class that needs this async
async def parse(self, response, shape):
LOG.debug('Response headers: %s', response['headers'])
LOG.debug('Response body:\n%s', response['body'])
if response['status_code'] >= 301:
if self._is_generic_error_response(response):
parsed = self._do_generic_error_parse(response)
elif self._is_modeled_error_shape(shape):
parsed = self._do_modeled_error_parse(response, shape)
# We don't want to decorate the modeled fields with metadata
return parsed
else:
parsed = self._do_error_parse(response, shape)
else:
parsed = await self._do_parse(response, shape)

# We don't want to decorate event stream responses with metadata
if shape and shape.serialization.get('eventstream'):
return parsed

# Add ResponseMetadata if it doesn't exist and inject the HTTP
# status code and headers from the response.
if isinstance(parsed, dict):
response_metadata = parsed.get('ResponseMetadata', {})
response_metadata['HTTPStatusCode'] = response['status_code']
# Ensure that the http header keys are all lower cased. Older
# versions of urllib3 (< 1.11) would unintentionally do this for us
# (see urllib3#633). We need to do this conversion manually now.
headers = response['headers']
response_metadata['HTTPHeaders'] = lowercase_dict(headers)
parsed['ResponseMetadata'] = response_metadata
return parsed


class AioRestJSONParser(RestJSONParser):
def _create_event_stream(self, response, shape):
Expand All @@ -51,3 +109,7 @@ class AioResponseParserFactory(ResponseParserFactory):
def create_parser(self, protocol_name):
parser_cls = PROTOCOL_PARSERS[protocol_name]
return parser_cls(**self._defaults)


def create_parser(protocol):
return AioResponseParserFactory().create_parser(protocol)
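The module-level create_parser() added here mirrors botocore.parsers.create_parser but hands back the Aio* parser classes, of which only AioJSONParser currently has an async parse(). A hedged usage sketch; the protocol name and the calling code are illustrative, not part of this commit:

import asyncio
from aiobotocore import parsers


async def parse_json_response(response_dict, output_shape):
    # 'json' is expected to map to AioJSONParser via the module's
    # PROTOCOL_PARSERS table; the remaining protocols still return parsers
    # with a synchronous parse(), hence the coroutine check.
    parser = parsers.create_parser('json')
    if asyncio.iscoroutinefunction(parser.parse):
        return await parser.parse(response_dict, output_shape)
    return parser.parse(response_dict, output_shape)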
28 changes: 28 additions & 0 deletions aiobotocore/response.py
@@ -2,6 +2,7 @@

import wrapt
from botocore.exceptions import IncompleteReadError, ReadTimeoutError
from aiobotocore import parsers


class AioReadTimeoutError(ReadTimeoutError, asyncio.TimeoutError):
@@ -109,3 +110,30 @@ def _verify_content_length(self):
raise IncompleteReadError(
actual_bytes=self._self_amount_read,
expected_bytes=int(self._self_content_length))


async def get_response(operation_model, http_response):
protocol = operation_model.metadata['protocol']
response_dict = {
'headers': http_response.headers,
'status_code': http_response.status_code,
}
# TODO: Unfortunately, we have to have error logic here.
# If it looks like an error, in the streaming response case we
# need to actually grab the contents.
if response_dict['status_code'] >= 300:
response_dict['body'] = http_response.content
elif operation_model.has_streaming_output:
response_dict['body'] = StreamingBody(
http_response.raw, response_dict['headers'].get('content-length'))
else:
response_dict['body'] = http_response.content

parser = parsers.create_parser(protocol)
if asyncio.iscoroutinefunction(parser.parse):
parsed = await parser.parse(
response_dict, operation_model.output_shape)
else:
parsed = parser.parse(
response_dict, operation_model.output_shape)
return http_response, parsed
23 changes: 17 additions & 6 deletions tests/conftest.py
@@ -1,4 +1,5 @@
import asyncio
from contextlib import AsyncExitStack
import random
import string
import aiobotocore
@@ -232,6 +233,14 @@ async def ec2_client(session, region, config, ec2_server, mocking_test):
yield client


@pytest.fixture
async def kinesis_client(session, region, config, kinesis_server, mocking_test):
kw = moto_config(kinesis_server) if mocking_test else {}
async with session.create_client('kinesis', region_name=region,
config=config, **kw) as client:
yield client


async def recursive_delete(s3_client, bucket_name):
# Recursively deletes a bucket and all of its contents.
paginator = s3_client.get_paginator('list_object_versions')
@@ -456,14 +465,16 @@ async def sqs_queue_url(sqs_client):
try:
yield queue_url
finally:
await delete_sqs_queue(sqs_client, queue_url)
response = await sqs_client.delete_queue(
QueueUrl=queue_url
)
assert_status_code(response, 200)


async def delete_sqs_queue(sqs_client, queue_url):
response = await sqs_client.delete_queue(
QueueUrl=queue_url
)
assert_status_code(response, 200)
@pytest.fixture
async def exit_stack():
async with AsyncExitStack() as es:
yield es


pytest_plugins = ['mock_server']
6 changes: 6 additions & 0 deletions tests/mock_server.py
@@ -152,3 +152,9 @@ async def rds_server():
async def ec2_server():
async with MotoService('ec2') as svc:
yield svc.endpoint_url


@pytest.fixture
async def kinesis_server():
async with MotoService('kinesis') as svc:
yield svc.endpoint_url
71 changes: 71 additions & 0 deletions tests/test_eventstreams.py
@@ -1,8 +1,13 @@
import asyncio
from contextlib import AsyncExitStack

import pytest
# TODO once Moto supports either S3 Select or Kinesis SubscribeToShard then
# this can be tested against a real AWS API

import botocore.parsers

import aiobotocore.session
from aiobotocore.eventstream import AioEventStream

TEST_STREAM_DATA = (
@@ -97,3 +102,69 @@ async def test_eventstream_no_iter(s3_client):
with pytest.raises(NotImplementedError):
for _ in event_stream:
pass


@pytest.mark.asyncio
async def test_kinesis_stream_json_parser(exit_stack: AsyncExitStack):
# unfortunately moto doesn't support kinesis register_stream_consumer +
# subscribe_to_shard yet
stream_name = "my_stream"
stream_arn = consumer_arn = None
consumer_name = 'consumer'

session = aiobotocore.session.AioSession()

kinesis_client = await exit_stack.enter_async_context(
session.create_client('kinesis'))
await kinesis_client.create_stream(StreamName=stream_name, ShardCount=1)

while (describe_response := (await kinesis_client.describe_stream(
StreamName=stream_name))) and \
describe_response['StreamDescription']['StreamStatus'] == 'CREATING':
print("Waiting for stream creation")
await asyncio.sleep(1)

shard_id = describe_response["StreamDescription"]["Shards"][0]["ShardId"]
stream_arn = describe_response["StreamDescription"]["StreamARN"]

try:
# Create some data
keys = [str(i) for i in range(1, 5)]
for k in keys:
await kinesis_client.put_record(StreamName=stream_name, Data=k,
PartitionKey=k)

register_response = await kinesis_client.register_stream_consumer(
StreamARN=stream_arn,
ConsumerName=consumer_name
)
consumer_arn = register_response['Consumer']['ConsumerARN']

while (describe_response := (await kinesis_client.describe_stream_consumer(
StreamARN=stream_arn, ConsumerName=consumer_name,
ConsumerARN=consumer_arn))) and \
describe_response['ConsumerDescription'][
'ConsumerStatus'] == 'CREATING':
print("Waiting for stream consumer creation")
await asyncio.sleep(1)

starting_position = {
'Type': 'LATEST'
}
subscribe_response = await kinesis_client.subscribe_to_shard(
ConsumerARN=consumer_arn,
ShardId=shard_id,
StartingPosition=starting_position
)
async for event in subscribe_response['EventStream']:
assert event['SubscribeToShardEvent']['Records'] == []
break
finally:
if consumer_arn:
await kinesis_client.deregister_stream_consumer(
StreamARN=stream_arn,
ConsumerName=consumer_name,
ConsumerARN=consumer_arn
)

await kinesis_client.delete_stream(StreamName=stream_name)