Skip to content

Commit

Permalink
disable retry (#17078)
Browse files Browse the repository at this point in the history
* disable retry

* update

* Updates
  • Loading branch information
xiangyan99 authored Mar 5, 2021
1 parent 1025c0b commit 692c819
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 253 deletions.
3 changes: 2 additions & 1 deletion sdk/core/azure-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

- Added `azure.core.messaging.CloudEvent` model that follows the cloud event spec.
- Added `azure.core.serialization.NULL` sentinel value
- Improve `repr`s for `HttpRequest` and `HttpResponse`s #16972

### Bug Fixes

- Improve `repr`s for `HttpRequest` and `HttpResponse`s #16972
- Disable retry in stream downloading. (thanks to @jochen-ott-by @hoffmann for the contribution) #16723

## 1.11.0 (2021-02-08)

Expand Down
54 changes: 15 additions & 39 deletions sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@
import asyncio
import aiohttp
from multidict import CIMultiDict

from requests.exceptions import (
ChunkedEncodingError,
StreamConsumedError)
from requests.exceptions import StreamConsumedError

from azure.core.configuration import ConnectionConfiguration
from azure.core.exceptions import ServiceRequestError, ServiceResponseError
Expand Down Expand Up @@ -215,41 +212,20 @@ def __len__(self):
return self.content_length

async def __anext__(self):
retry_active = True
retry_total = 3
retry_interval = 1 # 1 second
while retry_active:
try:
chunk = await self.response.internal_response.content.read(self.block_size)
if not chunk:
raise _ResponseStopIteration()
self.downloaded += self.block_size
return chunk
except _ResponseStopIteration:
self.response.internal_response.close()
raise StopAsyncIteration()
except (ChunkedEncodingError, ConnectionError):
retry_total -= 1
if retry_total <= 0:
retry_active = False
else:
await asyncio.sleep(retry_interval)
headers = {'range': 'bytes=' + str(self.downloaded) + '-'}
resp = await self.pipeline.run(self.request, stream=True, headers=headers)
if resp.http_response.status_code == 416:
raise
chunk = await self.response.internal_response.content.read(self.block_size)
if not chunk:
raise StopAsyncIteration()
self.downloaded += len(chunk)
return chunk
continue
except StreamConsumedError:
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.internal_response.close()
raise
try:
chunk = await self.response.internal_response.content.read(self.block_size)
if not chunk:
raise _ResponseStopIteration()
return chunk
except _ResponseStopIteration:
self.response.internal_response.close()
raise StopAsyncIteration()
except StreamConsumedError:
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.internal_response.close()
raise

class AioHttpTransportResponse(AsyncHttpResponse):
"""Methods for accessing response body data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,50 +155,24 @@ def __len__(self):

async def __anext__(self):
loop = _get_running_loop()
retry_active = True
retry_total = 3
retry_interval = 1 # 1 second
while retry_active:
try:
chunk = await loop.run_in_executor(
None,
_iterate_response_content,
self.iter_content_func,
)
if not chunk:
raise _ResponseStopIteration()
self.downloaded += self.block_size
return chunk
except _ResponseStopIteration:
self.response.internal_response.close()
raise StopAsyncIteration()
except (requests.exceptions.ChunkedEncodingError,
requests.exceptions.ConnectionError):
retry_total -= 1
if retry_total <= 0:
retry_active = False
else:
await asyncio.sleep(retry_interval)
headers = {'range': 'bytes=' + str(self.downloaded) + '-'}
resp = self.pipeline.run(self.request, stream=True, headers=headers)
if resp.status_code == 416:
raise
chunk = await loop.run_in_executor(
None,
_iterate_response_content,
self.iter_content_func,
)
if not chunk:
raise StopIteration()
self.downloaded += len(chunk)
return chunk
continue
except requests.exceptions.StreamConsumedError:
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.internal_response.close()
raise
try:
chunk = await loop.run_in_executor(
None,
_iterate_response_content,
self.iter_content_func,
)
if not chunk:
raise _ResponseStopIteration()
return chunk
except _ResponseStopIteration:
self.response.internal_response.close()
raise StopAsyncIteration()
except requests.exceptions.StreamConsumedError:
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.internal_response.close()
raise


class AsyncioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from __future__ import absolute_import
import logging
from typing import Iterator, Optional, Any, Union, TypeVar
import time
import urllib3 # type: ignore
from urllib3.util.retry import Retry # type: ignore
import requests
Expand Down Expand Up @@ -107,7 +106,6 @@ def __init__(self, pipeline, response):
self.block_size = response.block_size
self.iter_content_func = self.response.internal_response.iter_content(self.block_size)
self.content_length = int(response.headers.get('Content-Length', 0))
self.downloaded = 0

def __len__(self):
return self.content_length
Expand All @@ -116,42 +114,20 @@ def __iter__(self):
return self

def __next__(self):
retry_active = True
retry_total = 3
retry_interval = 1 # 1 second
while retry_active:
try:
chunk = next(self.iter_content_func)
if not chunk:
raise StopIteration()
self.downloaded += self.block_size
return chunk
except StopIteration:
self.response.internal_response.close()
try:
chunk = next(self.iter_content_func)
if not chunk:
raise StopIteration()
except (requests.exceptions.ChunkedEncodingError,
requests.exceptions.ConnectionError):
retry_total -= 1
if retry_total <= 0:
retry_active = False
else:
time.sleep(retry_interval)
headers = {'range': 'bytes=' + str(self.downloaded) + '-'}
resp = self.pipeline.run(self.request, stream=True, headers=headers)
if resp.http_response.status_code == 416:
raise
chunk = next(self.iter_content_func)
if not chunk:
raise StopIteration()
self.downloaded += len(chunk)
return chunk
continue
except requests.exceptions.StreamConsumedError:
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.internal_response.close()
raise
return chunk
except StopIteration:
self.response.internal_response.close()
raise StopIteration()
except requests.exceptions.StreamConsumedError:
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.internal_response.close()
raise
next = __next__ # Python 2 compatibility.


Expand Down
74 changes: 22 additions & 52 deletions sdk/core/azure-core/azure/core/pipeline/transport/_requests_trio.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,59 +68,29 @@ def __len__(self):
return self.content_length

async def __anext__(self):
retry_active = True
retry_total = 3
while retry_active:
try:
try:
try:
chunk = await trio.to_thread.run_sync(
_iterate_response_content,
self.iter_content_func,
)
except AttributeError: # trio < 0.12.1
chunk = await trio.run_sync_in_worker_thread( # pylint: disable=no-member
_iterate_response_content,
self.iter_content_func,
)
if not chunk:
raise _ResponseStopIteration()
self.downloaded += self.block_size
return chunk
except _ResponseStopIteration:
self.response.internal_response.close()
raise StopAsyncIteration()
except (requests.exceptions.ChunkedEncodingError,
requests.exceptions.ConnectionError):
retry_total -= 1
if retry_total <= 0:
retry_active = False
else:
await trio.sleep(1)
headers = {'range': 'bytes=' + str(self.downloaded) + '-'}
resp = self.pipeline.run(self.request, stream=True, headers=headers)
if resp.status_code == 416:
raise
try:
chunk = await trio.to_thread.run_sync(
_iterate_response_content,
self.iter_content_func,
)
except AttributeError: # trio < 0.12.1
chunk = await trio.run_sync_in_worker_thread( # pylint: disable=no-member
_iterate_response_content,
self.iter_content_func,
)
if not chunk:
raise StopIteration()
self.downloaded += len(chunk)
return chunk
continue
except requests.exceptions.StreamConsumedError:
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.internal_response.close()
raise
chunk = await trio.to_thread.run_sync(
_iterate_response_content,
self.iter_content_func,
)
except AttributeError: # trio < 0.12.1
chunk = await trio.run_sync_in_worker_thread( # pylint: disable=no-member
_iterate_response_content,
self.iter_content_func,
)
if not chunk:
raise _ResponseStopIteration()
return chunk
except _ResponseStopIteration:
self.response.internal_response.close()
raise StopAsyncIteration()
except requests.exceptions.StreamConsumedError:
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.internal_response.close()
raise

class TrioRequestsTransportResponse(AsyncHttpResponse, RequestsTransportResponse): # type: ignore
"""Asynchronous streaming of data from the response.
Expand Down
Loading

0 comments on commit 692c819

Please sign in to comment.