diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 9126b59f..ee015597 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -41,6 +41,17 @@ jobs:
         pipenv check || true
         pipenv graph
         make flake mototest
+    - name: Upload coverage to Codecov
+      uses: codecov/codecov-action@v1.5.2
+      with:
+        token: ${{ secrets.CODECOV_TOKEN }} # not required for public repos
+        files: ./coverage.xml
+        flags: unittests # optional
+        name: codecov-umbrella # optional
+        fail_ci_if_error: true # optional (default = false)
+        path_to_write_report: ./codecov_report.txt
+        verbose: true # optional (default = false)
+
 
   pre-deploy:
     name: Pre-Deploy
diff --git a/CHANGES.rst b/CHANGES.rst
index 8d387eba..054f06ed 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,5 +1,9 @@
 Changes
 -------
+1.3.3 (2021-07-12)
+^^^^^^^^^^^^^^^^^^
+* fix AioJSONParser #872
+
 1.3.2 (2021-07-07)
 ^^^^^^^^^^^^^^^^^^
 * Bump to botocore 1.20.106
@@ -7,7 +11,7 @@ Changes
 1.3.1 (2021-06-11)
 ^^^^^^^^^^^^^^^^^^
 * TCPConnector: change deprecated ssl_context to ssl
-* fix non awaited generate presigned url calls # 868
+* fix non awaited generate presigned url calls #868
 
 1.3.0 (2021-04-09)
 ^^^^^^^^^^^^^^^^^^
diff --git a/Makefile b/Makefile
index 3a7ca81f..54226fe8 100644
--- a/Makefile
+++ b/Makefile
@@ -22,7 +22,7 @@ cov cover coverage:
 
 flake mototest:
 	docker pull alpine
 	docker pull lambci/lambda:python3.8
-	BOTO_CONFIG=/dev/null pipenv run python3 -Wd -X tracemalloc=5 -X faulthandler -m pytest -vv -m moto -n auto --cov-report term --cov-report html --cov=aiobotocore --cov=tests --log-cli-level=DEBUG aiobotocore tests
+	BOTO_CONFIG=/dev/null pipenv run python3 -Wd -X tracemalloc=5 -X faulthandler -m pytest -vv -m moto -n auto --cov-report term --cov-report html --cov-report xml --cov=aiobotocore --cov=tests --log-cli-level=DEBUG aiobotocore tests
 	@echo "open file://`pwd`/htmlcov/index.html"
diff --git a/aiobotocore/__init__.py b/aiobotocore/__init__.py
index c5cd6d87..5919ff40 100644
--- a/aiobotocore/__init__.py
+++ b/aiobotocore/__init__.py
@@ -1,4 +1,4 @@
 from .session import get_session, AioSession
 
 __all__ = ['get_session', 'AioSession']
-__version__ = '1.3.2'
+__version__ = '1.3.3'
diff --git a/aiobotocore/endpoint.py b/aiobotocore/endpoint.py
index 0ae52dbd..02b0a4d9 100644
--- a/aiobotocore/endpoint.py
+++ b/aiobotocore/endpoint.py
@@ -179,16 +179,41 @@ async def _do_get_response(self, request, operation_model):
             protocol = operation_model.metadata['protocol']
             parser = self._response_parser_factory.create_parser(protocol)
-            parsed_response = parser.parse(
-                response_dict, operation_model.output_shape)
+
+            if asyncio.iscoroutinefunction(parser.parse):
+                parsed_response = await parser.parse(
+                    response_dict, operation_model.output_shape)
+            else:
+                parsed_response = parser.parse(
+                    response_dict, operation_model.output_shape)
+
             if http_response.status_code >= 300:
-                self._add_modeled_error_fields(
+                await self._add_modeled_error_fields(
                     response_dict, parsed_response,
                     operation_model, parser,
                 )
             history_recorder.record('PARSED_RESPONSE', parsed_response)
             return (http_response, parsed_response), None
 
+    async def _add_modeled_error_fields(
+            self, response_dict, parsed_response,
+            operation_model, parser,
+    ):
+        error_code = parsed_response.get("Error", {}).get("Code")
+        if error_code is None:
+            return
+        service_model = operation_model.service_model
+        error_shape = service_model.shape_for_error_code(error_code)
+        if error_shape is None:
+            return
+
+        if asyncio.iscoroutinefunction(parser.parse):
+            modeled_parse = await parser.parse(response_dict, error_shape)
+        else:
+            modeled_parse = parser.parse(response_dict, error_shape)
+        # TODO: avoid naming conflicts with ResponseMetadata and Error
+        parsed_response.update(modeled_parse)
+
     # NOTE: The only line changed here changing time.sleep to asyncio.sleep
     async def _needs_retry(self, attempts, operation_model, request_dict,
                            response=None, caught_exception=None):
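The endpoint.py hunk above dispatches on whether the parser's `parse` is a coroutine function, so botocore's synchronous parsers and the new awaitable `AioJSONParser.parse` share one code path. A minimal sketch of that dispatch pattern follows; the helper name `_maybe_await_parse` is hypothetical and not part of the patch:

```python
import asyncio

# Hypothetical helper illustrating the dispatch used in _do_get_response and
# _add_modeled_error_fields: call parse() directly for botocore's sync
# parsers, await it when the parser (e.g. AioJSONParser) makes it a coroutine.
async def _maybe_await_parse(parser, response_dict, shape):
    if asyncio.iscoroutinefunction(parser.parse):
        return await parser.parse(response_dict, shape)
    return parser.parse(response_dict, shape)
```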
diff --git a/aiobotocore/eventstream.py b/aiobotocore/eventstream.py
index 5a129d21..a58f7cc9 100644
--- a/aiobotocore/eventstream.py
+++ b/aiobotocore/eventstream.py
@@ -1,4 +1,4 @@
-from botocore.eventstream import EventStream, EventStreamBuffer
+from botocore.eventstream import EventStream, EventStreamBuffer, NoInitialResponseError
 
 
 class AioEventStream(EventStream):
@@ -20,3 +20,15 @@ async def __anext__(self):
             parsed_event = self._parse_event(event)
             if parsed_event:
                 yield parsed_event
+
+    async def get_initial_response(self):
+        try:
+            async for event in self._event_generator:
+                event_type = event.headers.get(':event-type')
+                if event_type == 'initial-response':
+                    return event
+
+                break
+        except StopIteration:
+            pass
+        raise NoInitialResponseError()
diff --git a/aiobotocore/parsers.py b/aiobotocore/parsers.py
index e6f3928f..9f4933f6 100644
--- a/aiobotocore/parsers.py
+++ b/aiobotocore/parsers.py
@@ -1,5 +1,6 @@
 from botocore.parsers import ResponseParserFactory, RestXMLParser, \
-    RestJSONParser, JSONParser, QueryParser, EC2QueryParser
+    RestJSONParser, JSONParser, QueryParser, EC2QueryParser, \
+    NoInitialResponseError, ResponseParserError, LOG, lowercase_dict
 from .eventstream import AioEventStream
 
 
@@ -25,11 +26,68 @@ def _create_event_stream(self, response, shape):
 
 
 class AioJSONParser(JSONParser):
+    async def _do_parse(self, response, shape):
+        parsed = {}
+        if shape is not None:
+            event_name = shape.event_stream_name
+            if event_name:
+                parsed = await self._handle_event_stream(response, shape, event_name)
+            else:
+                parsed = self._handle_json_body(response['body'], shape)
+            self._inject_response_metadata(parsed, response['headers'])
+        return parsed
+
     def _create_event_stream(self, response, shape):
         parser = self._event_stream_parser
         name = response['context'].get('operation_name')
         return AioEventStream(response['body'], shape, parser, name)
 
+    async def _handle_event_stream(self, response, shape, event_name):
+        event_stream_shape = shape.members[event_name]
+        event_stream = self._create_event_stream(response, event_stream_shape)
+        try:
+            event = await event_stream.get_initial_response()
+        except NoInitialResponseError:
+            error_msg = 'First event was not of type initial-response'
+            raise ResponseParserError(error_msg)
+        parsed = self._handle_json_body(event.payload, shape)
+        parsed[event_name] = event_stream
+        return parsed
+
+    # this is actually from ResponseParser however for now JSONParser is the
+    # only class that needs this async
+    async def parse(self, response, shape):
+        LOG.debug('Response headers: %s', response['headers'])
+        LOG.debug('Response body:\n%s', response['body'])
+        if response['status_code'] >= 301:
+            if self._is_generic_error_response(response):
+                parsed = self._do_generic_error_parse(response)
+            elif self._is_modeled_error_shape(shape):
+                parsed = self._do_modeled_error_parse(response, shape)
+                # We don't want to decorate the modeled fields with metadata
+                return parsed
+            else:
+                parsed = self._do_error_parse(response, shape)
+        else:
+            parsed = await self._do_parse(response, shape)
+
+        # We don't want to decorate event stream responses with metadata
+        if shape and shape.serialization.get('eventstream'):
+            return parsed
+
+        # Add ResponseMetadata if it doesn't exist and inject the HTTP
+        # status code and headers from the response.
+        if isinstance(parsed, dict):
+            response_metadata = parsed.get('ResponseMetadata', {})
+            response_metadata['HTTPStatusCode'] = response['status_code']
+            # Ensure that the http header keys are all lower cased. Older
+            # versions of urllib3 (< 1.11) would unintentionally do this for us
+            # (see urllib3#633). We need to do this conversion manually now.
+            headers = response['headers']
+            response_metadata['HTTPHeaders'] = lowercase_dict(headers)
+            parsed['ResponseMetadata'] = response_metadata
+        return parsed
+
 
 class AioRestJSONParser(RestJSONParser):
     def _create_event_stream(self, response, shape):
@@ -51,3 +109,7 @@ class AioResponseParserFactory(ResponseParserFactory):
     def create_parser(self, protocol_name):
         parser_cls = PROTOCOL_PARSERS[protocol_name]
         return parser_cls(**self._defaults)
+
+
+def create_parser(protocol):
+    return AioResponseParserFactory().create_parser(protocol)
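The module-level `create_parser` added above mirrors `botocore.parsers.create_parser` but hands back the Aio variants, which is what the new `aiobotocore.response.get_response` below relies on. A hedged usage sketch, assuming the module's `PROTOCOL_PARSERS` table maps `'json'` to `AioJSONParser` as the class definitions above suggest:

```python
import asyncio

from aiobotocore.parsers import create_parser

# Sketch: build a parser for a service protocol; for 'json' the returned
# AioJSONParser exposes a coroutine parse(), so callers check before calling.
parser = create_parser('json')
assert asyncio.iscoroutinefunction(parser.parse)
```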
diff --git a/aiobotocore/response.py b/aiobotocore/response.py
index c901d980..9d3f351b 100644
--- a/aiobotocore/response.py
+++ b/aiobotocore/response.py
@@ -2,6 +2,7 @@
 import wrapt
 
 from botocore.exceptions import IncompleteReadError, ReadTimeoutError
+from aiobotocore import parsers
 
 
 class AioReadTimeoutError(ReadTimeoutError, asyncio.TimeoutError):
@@ -109,3 +110,30 @@ def _verify_content_length(self):
             raise IncompleteReadError(
                 actual_bytes=self._self_amount_read,
                 expected_bytes=int(self._self_content_length))
+
+
+async def get_response(operation_model, http_response):
+    protocol = operation_model.metadata['protocol']
+    response_dict = {
+        'headers': http_response.headers,
+        'status_code': http_response.status_code,
+    }
+    # TODO: Unfortunately, we have to have error logic here.
+    # If it looks like an error, in the streaming response case we
+    # need to actually grab the contents.
+    if response_dict['status_code'] >= 300:
+        response_dict['body'] = http_response.content
+    elif operation_model.has_streaming_output:
+        response_dict['body'] = StreamingBody(
+            http_response.raw, response_dict['headers'].get('content-length'))
+    else:
+        response_dict['body'] = http_response.content
+
+    parser = parsers.create_parser(protocol)
+    if asyncio.iscoroutinefunction(parser.parse):
+        parsed = await parser.parse(
+            response_dict, operation_model.output_shape)
+    else:
+        parsed = parser.parse(
+            response_dict, operation_model.output_shape)
+    return http_response, parsed
diff --git a/tests/conftest.py b/tests/conftest.py
index 1e76b756..89131474 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,4 +1,5 @@
 import asyncio
+from contextlib import AsyncExitStack
 import random
 import string
 import aiobotocore
@@ -232,6 +233,14 @@ async def ec2_client(session, region, config, ec2_server, mocking_test):
         yield client
 
 
+@pytest.fixture
+async def kinesis_client(session, region, config, kinesis_server, mocking_test):
+    kw = moto_config(kinesis_server) if mocking_test else {}
+    async with session.create_client('kinesis', region_name=region,
+                                     config=config, **kw) as client:
+        yield client
+
+
 async def recursive_delete(s3_client, bucket_name):
     # Recursively deletes a bucket and all of its contents.
     paginator = s3_client.get_paginator('list_object_versions')
@@ -456,14 +465,16 @@ async def sqs_queue_url(sqs_client):
     try:
         yield queue_url
     finally:
-        await delete_sqs_queue(sqs_client, queue_url)
+        response = await sqs_client.delete_queue(
+            QueueUrl=queue_url
+        )
+        assert_status_code(response, 200)
 
 
-async def delete_sqs_queue(sqs_client, queue_url):
-    response = await sqs_client.delete_queue(
-        QueueUrl=queue_url
-    )
-    assert_status_code(response, 200)
+@pytest.fixture
+async def exit_stack():
+    async with AsyncExitStack() as es:
+        yield es
 
 
 pytest_plugins = ['mock_server']
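The new `exit_stack` fixture yields a `contextlib.AsyncExitStack`, so a test can open async context managers (such as clients) that are torn down automatically at test teardown; `kinesis_client` follows the pattern of the existing per-service client fixtures. A hedged sketch of a test leaning on `exit_stack`; the test name and the choice of `'sts'` are illustrative only:

```python
import pytest

@pytest.mark.asyncio
async def test_uses_exit_stack(exit_stack, session, region):
    # The client is closed when the AsyncExitStack unwinds after the test.
    client = await exit_stack.enter_async_context(
        session.create_client('sts', region_name=region))
    assert client is not None
```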
diff --git a/tests/mock_server.py b/tests/mock_server.py
index 89f2044c..8a8c4ed3 100644
--- a/tests/mock_server.py
+++ b/tests/mock_server.py
@@ -152,3 +152,9 @@ async def rds_server():
 async def ec2_server():
     async with MotoService('ec2') as svc:
         yield svc.endpoint_url
+
+
+@pytest.fixture
+async def kinesis_server():
+    async with MotoService('kinesis') as svc:
+        yield svc.endpoint_url
diff --git a/tests/test_eventstreams.py b/tests/test_eventstreams.py
index d61bc213..e952c9eb 100644
--- a/tests/test_eventstreams.py
+++ b/tests/test_eventstreams.py
@@ -1,8 +1,13 @@
+import asyncio
+from contextlib import AsyncExitStack
+
 import pytest
 
 # TODO once Moto supports either S3 Select or Kinesis SubscribeToShard then
 # this can be tested against a real AWS API
 import botocore.parsers
+
+import aiobotocore.session
 from aiobotocore.eventstream import AioEventStream
 
 TEST_STREAM_DATA = (
@@ -97,3 +102,69 @@ async def test_eventstream_no_iter(s3_client):
     with pytest.raises(NotImplementedError):
         for _ in event_stream:
             pass
+
+
+@pytest.mark.asyncio
+async def test_kinesis_stream_json_parser(exit_stack: AsyncExitStack):
+    # unfortunately moto doesn't support kinesis register_stream_consumer +
+    # subscribe_to_shard yet
+    stream_name = "my_stream"
+    stream_arn = consumer_arn = None
+    consumer_name = 'consumer'
+
+    session = aiobotocore.session.AioSession()
+
+    kinesis_client = await exit_stack.enter_async_context(
+        session.create_client('kinesis'))
+    await kinesis_client.create_stream(StreamName=stream_name, ShardCount=1)
+
+    while (describe_response := (await kinesis_client.describe_stream(
+            StreamName=stream_name))) and \
+            describe_response['StreamDescription']['StreamStatus'] == 'CREATING':
+        print("Waiting for stream creation")
+        await asyncio.sleep(1)
+
+    shard_id = describe_response["StreamDescription"]["Shards"][0]["ShardId"]
+    stream_arn = describe_response["StreamDescription"]["StreamARN"]
+
+    try:
+        # Create some data
+        keys = [str(i) for i in range(1, 5)]
+        for k in keys:
+            await kinesis_client.put_record(StreamName=stream_name, Data=k,
+                                            PartitionKey=k)
+
+        register_response = await kinesis_client.register_stream_consumer(
+            StreamARN=stream_arn,
+            ConsumerName=consumer_name
+        )
+        consumer_arn = register_response['Consumer']['ConsumerARN']
+
+        while (describe_response := (await kinesis_client.describe_stream_consumer(
+                StreamARN=stream_arn, ConsumerName=consumer_name,
+                ConsumerARN=consumer_arn))) and \
+                describe_response['ConsumerDescription'][
+                    'ConsumerStatus'] == 'CREATING':
+            print("Waiting for stream consumer creation")
+            await asyncio.sleep(1)
+
+        starting_position = {
+            'Type': 'LATEST'
+        }
+        subscribe_response = await kinesis_client.subscribe_to_shard(
+            ConsumerARN=consumer_arn,
+            ShardId=shard_id,
+            StartingPosition=starting_position
+        )
+        async for event in subscribe_response['EventStream']:
+            assert event['SubscribeToShardEvent']['Records'] == []
+            break
+    finally:
+        if consumer_arn:
+            await kinesis_client.deregister_stream_consumer(
+                StreamARN=stream_arn,
+                ConsumerName=consumer_name,
+                ConsumerARN=consumer_arn
+            )
+
+        await kinesis_client.delete_stream(StreamName=stream_name)
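The polling loops in the test above wait for the stream and consumer to leave the CREATING state, since the test runs against live Kinesis. If the kinesis service model ships a `stream_exists` waiter (an assumption not verified here), the first loop could be expressed as a waiter call instead:

```python
# Sketch only: a possible replacement for the describe_stream polling loop,
# assuming a 'stream_exists' waiter is available on the kinesis client.
async def wait_for_stream(kinesis_client, stream_name):
    waiter = kinesis_client.get_waiter('stream_exists')
    await waiter.wait(StreamName=stream_name)
```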
diff --git a/tests/test_patches.py b/tests/test_patches.py
index 9b6f5cd0..1f6ef7c4 100644
--- a/tests/test_patches.py
+++ b/tests/test_patches.py
@@ -21,8 +21,9 @@
     create_waiter_with_client
 from botocore.eventstream import EventStream
 from botocore.parsers import ResponseParserFactory, PROTOCOL_PARSERS, \
-    RestXMLParser, EC2QueryParser, QueryParser, JSONParser, RestJSONParser
-from botocore.response import StreamingBody
+    RestXMLParser, EC2QueryParser, QueryParser, JSONParser, RestJSONParser, \
+    create_parser
+from botocore.response import StreamingBody, get_response
 from botocore.signers import RequestSigner, add_generate_presigned_url, \
     generate_presigned_url, S3PostPresigner, add_generate_presigned_post, \
     generate_presigned_post, generate_db_auth_token, add_generate_db_auth_token
@@ -200,6 +201,7 @@
     Endpoint._do_get_response: {'0bc57fbacf3c49ec5cd243b014d531a38b9b4138'},
     Endpoint._needs_retry: {'0f40f52d8c90c6e10b4c9e1c4a5ca00ef2c72850'},
     Endpoint._send: {'644c7e5bb88fecaa0b2a204411f8c7e69cc90bf1'},
+    Endpoint._add_modeled_error_fields: {'1eefcfacbe9a2c3700c61982e565ce6c4cf1ea3a'},
 
     EndpointCreator.create_endpoint: {'502315533a86991ea5f57c04973ea5c837bf6197'},
 
@@ -207,6 +209,7 @@
     EventStream._create_raw_event_generator: {
         'cc101f3ca2bca4f14ccd6b385af900a15f96967b'},
     EventStream.__iter__: {'8a9b454943f8ef6e81f5794d641adddd1fdd5248'},
+    EventStream.get_initial_response: {'aed648305970c90bb5d1e31f6fe5ff12cf6a2a06'},
 
     # hooks.py
     HierarchicalEmitter._emit: {'5d9a6b1aea1323667a9310e707a9f0a006f8f6e8'},
@@ -226,10 +229,15 @@
     EC2QueryParser._create_event_stream: {'0564ba55383a71cc1ba3e5be7110549d7e9992f5'},
     QueryParser._create_event_stream: {'0564ba55383a71cc1ba3e5be7110549d7e9992f5'},
     JSONParser._create_event_stream: {'0564ba55383a71cc1ba3e5be7110549d7e9992f5'},
+    JSONParser._do_parse: {'9c3d5832e6c55a87630128cc8b9121579ef4a708'},
+    JSONParser._handle_event_stream: {'3cf7bb1ecff0d72bafd7e7fd6625595b4060abd6'},
+    JSONParser.parse: {'46e9e8ecf2ca3a9cdddbb40825cb58fb246b28f1'},
     RestJSONParser._create_event_stream: {'0564ba55383a71cc1ba3e5be7110549d7e9992f5'},
+    create_parser: {'37e9f1c3b60de17f477a9b79eae8e1acaa7c89d7'},
 
     # response.py
     StreamingBody: {'b77bd0903f9013bc47c01f91c6d9bfb8a504d106'},
+    get_response: {'f31b478792a5e0502f142daca881b69955e5c11d'},
 
     # session.py
     Session.__init__: {'ccf156a76beda3425fb54363f3b2718dc0445f6d'},
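The digests added to test_patches.py pin the upstream botocore source that this release re-implements (`Endpoint._add_modeled_error_fields`, `EventStream.get_initial_response`, `JSONParser.parse`, `create_parser`, `get_response`), so a future botocore bump that touches any of them fails the test and forces a review of the aiobotocore port. A sketch of how such a digest can be recomputed; the exact hashing convention should be confirmed against tests/test_patches.py itself:

```python
import hashlib
import inspect

from botocore.response import get_response

# Hash the upstream function's source; compare against the pinned digest and
# update it once the corresponding aiobotocore code has been reviewed.
digest = hashlib.sha1(
    inspect.getsource(get_response).encode('utf-8')).hexdigest()
print(digest)
```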