From 129cb16e6a26aef65c849811c5c556aefb960942 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Wed, 3 Mar 2021 17:30:41 -0800 Subject: [PATCH 1/3] disable retry --- .../azure/core/pipeline/transport/_aiohttp.py | 49 ++++-------- .../pipeline/transport/_requests_asyncio.py | 62 +++++----------- .../pipeline/transport/_requests_basic.py | 49 ++++-------- .../core/pipeline/transport/_requests_trio.py | 74 ++++++------------- .../test_stream_generator_async.py | 44 ----------- .../azure-core/tests/test_stream_generator.py | 44 +---------- 6 files changed, 68 insertions(+), 254 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index 9468ac9d3756..77ee732179b0 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -215,41 +215,20 @@ def __len__(self): return self.content_length async def __anext__(self): - retry_active = True - retry_total = 3 - retry_interval = 1 # 1 second - while retry_active: - try: - chunk = await self.response.internal_response.content.read(self.block_size) - if not chunk: - raise _ResponseStopIteration() - self.downloaded += self.block_size - return chunk - except _ResponseStopIteration: - self.response.internal_response.close() - raise StopAsyncIteration() - except (ChunkedEncodingError, ConnectionError): - retry_total -= 1 - if retry_total <= 0: - retry_active = False - else: - await asyncio.sleep(retry_interval) - headers = {'range': 'bytes=' + str(self.downloaded) + '-'} - resp = await self.pipeline.run(self.request, stream=True, headers=headers) - if resp.http_response.status_code == 416: - raise - chunk = await self.response.internal_response.content.read(self.block_size) - if not chunk: - raise StopAsyncIteration() - self.downloaded += len(chunk) - return chunk - continue - except StreamConsumedError: - raise - except Exception as err: - _LOGGER.warning("Unable to stream download: %s", err) - self.response.internal_response.close() - raise + try: + chunk = await self.response.internal_response.content.read(self.block_size) + if not chunk: + raise _ResponseStopIteration() + return chunk + except _ResponseStopIteration: + self.response.internal_response.close() + raise StopAsyncIteration() + except StreamConsumedError: + raise + except Exception as err: + _LOGGER.warning("Unable to stream download: %s", err) + self.response.internal_response.close() + raise class AioHttpTransportResponse(AsyncHttpResponse): """Methods for accessing response body data. diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py index 087a373b0732..90c53675f866 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_asyncio.py @@ -155,50 +155,24 @@ def __len__(self): async def __anext__(self): loop = _get_running_loop() - retry_active = True - retry_total = 3 - retry_interval = 1 # 1 second - while retry_active: - try: - chunk = await loop.run_in_executor( - None, - _iterate_response_content, - self.iter_content_func, - ) - if not chunk: - raise _ResponseStopIteration() - self.downloaded += self.block_size - return chunk - except _ResponseStopIteration: - self.response.internal_response.close() - raise StopAsyncIteration() - except (requests.exceptions.ChunkedEncodingError, - requests.exceptions.ConnectionError): - retry_total -= 1 - if retry_total <= 0: - retry_active = False - else: - await asyncio.sleep(retry_interval) - headers = {'range': 'bytes=' + str(self.downloaded) + '-'} - resp = self.pipeline.run(self.request, stream=True, headers=headers) - if resp.status_code == 416: - raise - chunk = await loop.run_in_executor( - None, - _iterate_response_content, - self.iter_content_func, - ) - if not chunk: - raise StopIteration() - self.downloaded += len(chunk) - return chunk - continue - except requests.exceptions.StreamConsumedError: - raise - except Exception as err: - _LOGGER.warning("Unable to stream download: %s", err) - self.response.internal_response.close() - raise + try: + chunk = await loop.run_in_executor( + None, + _iterate_response_content, + self.iter_content_func, + ) + if not chunk: + raise _ResponseStopIteration() + return chunk + except _ResponseStopIteration: + self.response.internal_response.close() + raise StopAsyncIteration() + except requests.exceptions.StreamConsumedError: + raise + except Exception as err: + _LOGGER.warning("Unable to stream download: %s", err) + self.response.internal_response.close() + raise class AsyncioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index a6d8582422bd..9972bb259be6 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -107,7 +107,6 @@ def __init__(self, pipeline, response): self.block_size = response.block_size self.iter_content_func = self.response.internal_response.iter_content(self.block_size) self.content_length = int(response.headers.get('Content-Length', 0)) - self.downloaded = 0 def __len__(self): return self.content_length @@ -116,42 +115,20 @@ def __iter__(self): return self def __next__(self): - retry_active = True - retry_total = 3 - retry_interval = 1 # 1 second - while retry_active: - try: - chunk = next(self.iter_content_func) - if not chunk: - raise StopIteration() - self.downloaded += self.block_size - return chunk - except StopIteration: - self.response.internal_response.close() + try: + chunk = next(self.iter_content_func) + if not chunk: raise StopIteration() - except (requests.exceptions.ChunkedEncodingError, - requests.exceptions.ConnectionError): - retry_total -= 1 - if retry_total <= 0: - retry_active = False - else: - time.sleep(retry_interval) - headers = {'range': 'bytes=' + str(self.downloaded) + '-'} - resp = self.pipeline.run(self.request, stream=True, headers=headers) - if resp.http_response.status_code == 416: - raise - chunk = next(self.iter_content_func) - if not chunk: - raise StopIteration() - self.downloaded += len(chunk) - return chunk - continue - except requests.exceptions.StreamConsumedError: - raise - except Exception as err: - _LOGGER.warning("Unable to stream download: %s", err) - self.response.internal_response.close() - raise + return chunk + except StopIteration: + self.response.internal_response.close() + raise StopIteration() + except requests.exceptions.StreamConsumedError: + raise + except Exception as err: + _LOGGER.warning("Unable to stream download: %s", err) + self.response.internal_response.close() + raise next = __next__ # Python 2 compatibility. diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py index bfc28735df5c..04ddd453bbf5 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py @@ -68,59 +68,29 @@ def __len__(self): return self.content_length async def __anext__(self): - retry_active = True - retry_total = 3 - while retry_active: + try: try: - try: - chunk = await trio.to_thread.run_sync( - _iterate_response_content, - self.iter_content_func, - ) - except AttributeError: # trio < 0.12.1 - chunk = await trio.run_sync_in_worker_thread( # pylint: disable=no-member - _iterate_response_content, - self.iter_content_func, - ) - if not chunk: - raise _ResponseStopIteration() - self.downloaded += self.block_size - return chunk - except _ResponseStopIteration: - self.response.internal_response.close() - raise StopAsyncIteration() - except (requests.exceptions.ChunkedEncodingError, - requests.exceptions.ConnectionError): - retry_total -= 1 - if retry_total <= 0: - retry_active = False - else: - await trio.sleep(1) - headers = {'range': 'bytes=' + str(self.downloaded) + '-'} - resp = self.pipeline.run(self.request, stream=True, headers=headers) - if resp.status_code == 416: - raise - try: - chunk = await trio.to_thread.run_sync( - _iterate_response_content, - self.iter_content_func, - ) - except AttributeError: # trio < 0.12.1 - chunk = await trio.run_sync_in_worker_thread( # pylint: disable=no-member - _iterate_response_content, - self.iter_content_func, - ) - if not chunk: - raise StopIteration() - self.downloaded += len(chunk) - return chunk - continue - except requests.exceptions.StreamConsumedError: - raise - except Exception as err: - _LOGGER.warning("Unable to stream download: %s", err) - self.response.internal_response.close() - raise + chunk = await trio.to_thread.run_sync( + _iterate_response_content, + self.iter_content_func, + ) + except AttributeError: # trio < 0.12.1 + chunk = await trio.run_sync_in_worker_thread( # pylint: disable=no-member + _iterate_response_content, + self.iter_content_func, + ) + if not chunk: + raise _ResponseStopIteration() + return chunk + except _ResponseStopIteration: + self.response.internal_response.close() + raise StopAsyncIteration() + except requests.exceptions.StreamConsumedError: + raise + except Exception as err: + _LOGGER.warning("Unable to stream download: %s", err) + self.response.internal_response.close() + raise class TrioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore """Asynchronous streaming of data from the response. diff --git a/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py b/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py index f7368159615f..358c557ad287 100644 --- a/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py +++ b/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py @@ -53,50 +53,6 @@ class AsyncMock(mock.MagicMock): async def __call__(self, *args, **kwargs): return super(AsyncMock, self).__call__(*args, **kwargs) - http_request = HttpRequest('GET', 'http://127.0.0.1/') - pipeline = AsyncPipeline(MockTransport()) - http_response = AsyncHttpResponse(http_request, None) - http_response.internal_response = MockInternalResponse() - stream = AioHttpStreamDownloadGenerator(pipeline, http_response) - with mock.patch('asyncio.sleep', new_callable=AsyncMock): - with pytest.raises(StopAsyncIteration): - await stream.__anext__() - -@pytest.mark.asyncio -async def test_connection_error_416(): - class MockTransport(AsyncHttpTransport): - def __init__(self): - self._count = 0 - - async def __aexit__(self, exc_type, exc_val, exc_tb): - pass - async def close(self): - pass - async def open(self): - pass - - async def send(self, request, **kwargs): - request = HttpRequest('GET', 'http://127.0.0.1/') - response = AsyncHttpResponse(request, None) - response.status_code = 416 - return response - - class MockContent(): - async def read(self, block_size): - raise ConnectionError - - class MockInternalResponse(): - def __init__(self): - self.headers = {} - self.content = MockContent() - - async def close(self): - pass - - class AsyncMock(mock.MagicMock): - async def __call__(self, *args, **kwargs): - return super(AsyncMock, self).__call__(*args, **kwargs) - http_request = HttpRequest('GET', 'http://127.0.0.1/') pipeline = AsyncPipeline(MockTransport()) http_response = AsyncHttpResponse(http_request, None) diff --git a/sdk/core/azure-core/tests/test_stream_generator.py b/sdk/core/azure-core/tests/test_stream_generator.py index 4f40bba885eb..f111d9a73b85 100644 --- a/sdk/core/azure-core/tests/test_stream_generator.py +++ b/sdk/core/azure-core/tests/test_stream_generator.py @@ -49,48 +49,6 @@ def iter_content(self, block_size): def close(self): pass - http_request = HttpRequest('GET', 'http://127.0.0.1/') - pipeline = Pipeline(MockTransport()) - http_response = HttpResponse(http_request, None) - http_response.internal_response = MockInternalResponse() - stream = StreamDownloadGenerator(pipeline, http_response) - with mock.patch('time.sleep', return_value=None): - with pytest.raises(StopIteration): - stream.__next__() - -def test_connection_error_416(): - class MockTransport(HttpTransport): - def __init__(self): - self._count = 0 - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - def close(self): - pass - def open(self): - pass - - def send(self, request, **kwargs): - request = HttpRequest('GET', 'http://127.0.0.1/') - response = HttpResponse(request, None) - response.status_code = 416 - return response - - def next(self): - self.__next__() - - def __next__(self): - if self._count == 0: - self._count += 1 - raise requests.exceptions.ConnectionError - - class MockInternalResponse(): - def iter_content(self, block_size): - return MockTransport() - - def close(self): - pass - http_request = HttpRequest('GET', 'http://127.0.0.1/') pipeline = Pipeline(MockTransport()) http_response = HttpResponse(http_request, None) @@ -98,4 +56,4 @@ def close(self): stream = StreamDownloadGenerator(pipeline, http_response) with mock.patch('time.sleep', return_value=None): with pytest.raises(requests.exceptions.ConnectionError): - stream.__next__() \ No newline at end of file + stream.__next__() From a4962601443264cd7cee27ab9e78f9d8df346baf Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Thu, 4 Mar 2021 09:03:08 -0800 Subject: [PATCH 2/3] update --- .../azure-core/azure/core/pipeline/transport/_aiohttp.py | 5 +---- .../azure/core/pipeline/transport/_requests_basic.py | 1 - 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py index 77ee732179b0..af553bc71b30 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py @@ -30,10 +30,7 @@ import asyncio import aiohttp from multidict import CIMultiDict - -from requests.exceptions import ( - ChunkedEncodingError, - StreamConsumedError) +from requests.exceptions import StreamConsumedError from azure.core.configuration import ConnectionConfiguration from azure.core.exceptions import ServiceRequestError, ServiceResponseError diff --git a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py index 9972bb259be6..ed8a7382c55d 100644 --- a/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py +++ b/sdk/core/azure-core/azure/core/pipeline/transport/_requests_basic.py @@ -26,7 +26,6 @@ from __future__ import absolute_import import logging from typing import Iterator, Optional, Any, Union, TypeVar -import time import urllib3 # type: ignore from urllib3.util.retry import Retry # type: ignore import requests From 360cf17aff3c835ca9d6dd0616b9a45d7bbc7205 Mon Sep 17 00:00:00 2001 From: Xiang Yan Date: Fri, 5 Mar 2021 11:53:26 -0800 Subject: [PATCH 3/3] Updates --- sdk/core/azure-core/CHANGELOG.md | 6 ++- .../test_stream_generator_async.py | 52 ++++++++++++++++++- .../azure-core/tests/test_stream_generator.py | 47 +++++++++++++++++ 3 files changed, 103 insertions(+), 2 deletions(-) diff --git a/sdk/core/azure-core/CHANGELOG.md b/sdk/core/azure-core/CHANGELOG.md index 7a2d0c81144d..499453b1de04 100644 --- a/sdk/core/azure-core/CHANGELOG.md +++ b/sdk/core/azure-core/CHANGELOG.md @@ -5,7 +5,11 @@ ### Features - Added `azure.core.messaging.CloudEvent` model that follows the cloud event spec. -- Added `azure.core.serialization.NULL` sentinel value +- Added `azure.core.serialization.NULL` sentinel value + +### Bug Fixes + +- Disable retry in stream downloading. (thanks to @jochen-ott-by @hoffmann for the contribution) #16723 ## 1.11.0 (2021-02-08) diff --git a/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py b/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py index 358c557ad287..0a4d6017e6c4 100644 --- a/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py +++ b/sdk/core/azure-core/tests/async_tests/test_stream_generator_async.py @@ -2,12 +2,15 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. # ------------------------------------ +import requests from azure.core.pipeline.transport import ( HttpRequest, AsyncHttpResponse, AsyncHttpTransport, + AsyncioRequestsTransportResponse, + AioHttpTransport, ) -from azure.core.pipeline import AsyncPipeline +from azure.core.pipeline import AsyncPipeline, PipelineResponse from azure.core.pipeline.transport._aiohttp import AioHttpStreamDownloadGenerator from unittest import mock import pytest @@ -61,3 +64,50 @@ async def __call__(self, *args, **kwargs): with mock.patch('asyncio.sleep', new_callable=AsyncMock): with pytest.raises(ConnectionError): await stream.__anext__() + +@pytest.mark.asyncio +async def test_response_streaming_error_behavior(): + # Test to reproduce https://github.com/Azure/azure-sdk-for-python/issues/16723 + block_size = 103 + total_response_size = 500 + req_response = requests.Response() + req_request = requests.Request() + + class FakeStreamWithConnectionError: + # fake object for urllib3.response.HTTPResponse + + def stream(self, chunk_size, decode_content=False): + assert chunk_size == block_size + left = total_response_size + while left > 0: + if left <= block_size: + raise requests.exceptions.ConnectionError() + data = b"X" * min(chunk_size, left) + left -= len(data) + yield data + + def close(self): + pass + + req_response.raw = FakeStreamWithConnectionError() + + response = AsyncioRequestsTransportResponse( + req_request, + req_response, + block_size, + ) + + async def mock_run(self, *args, **kwargs): + return PipelineResponse( + None, + requests.Response(), + None, + ) + + transport = AioHttpTransport() + pipeline = AsyncPipeline(transport) + pipeline.run = mock_run + downloader = response.stream_download(pipeline) + with pytest.raises(requests.exceptions.ConnectionError): + while True: + await downloader.__anext__() diff --git a/sdk/core/azure-core/tests/test_stream_generator.py b/sdk/core/azure-core/tests/test_stream_generator.py index f111d9a73b85..8b30b00a4f50 100644 --- a/sdk/core/azure-core/tests/test_stream_generator.py +++ b/sdk/core/azure-core/tests/test_stream_generator.py @@ -7,6 +7,8 @@ HttpRequest, HttpResponse, HttpTransport, + RequestsTransport, + RequestsTransportResponse, ) from azure.core.pipeline import Pipeline, PipelineResponse from azure.core.pipeline.transport._requests_basic import StreamDownloadGenerator @@ -57,3 +59,48 @@ def close(self): with mock.patch('time.sleep', return_value=None): with pytest.raises(requests.exceptions.ConnectionError): stream.__next__() + +def test_response_streaming_error_behavior(): + # Test to reproduce https://github.com/Azure/azure-sdk-for-python/issues/16723 + block_size = 103 + total_response_size = 500 + req_response = requests.Response() + req_request = requests.Request() + + class FakeStreamWithConnectionError: + # fake object for urllib3.response.HTTPResponse + + def stream(self, chunk_size, decode_content=False): + assert chunk_size == block_size + left = total_response_size + while left > 0: + if left <= block_size: + raise requests.exceptions.ConnectionError() + data = b"X" * min(chunk_size, left) + left -= len(data) + yield data + + def close(self): + pass + + req_response.raw = FakeStreamWithConnectionError() + + response = RequestsTransportResponse( + req_request, + req_response, + block_size, + ) + + def mock_run(self, *args, **kwargs): + return PipelineResponse( + None, + requests.Response(), + None, + ) + + transport = RequestsTransport() + pipeline = Pipeline(transport) + pipeline.run = mock_run + downloader = response.stream_download(pipeline) + with pytest.raises(requests.exceptions.ConnectionError): + full_response = b"".join(downloader)