Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tables] Fix for async retry policy #17810

Merged
merged 6 commits into from
Apr 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/tables/azure-data-tables/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 12.0.0b6 (2021-04-06)
* Updated deserialization of datetime fields in entities to support preservation of the service format with additional decimal place.
* Passing a string parameter into a query filter will now be escaped to protect against injection.
* Fixed bug in incrementing retries in async retry policy

## 12.0.0b5 (2021-03-09)
* This version and all future versions will require Python 2.7 or Python 3.6+, Python 3.5 is no longer supported.
Expand Down
151 changes: 74 additions & 77 deletions sdk/tables/azure-data-tables/azure/data/tables/_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,78 @@ def is_retry(response, mode):
return True
return False

def set_next_host_location(settings, request):
"""
A function which sets the next host location on the request, if applicable.
:param ~azure.storage.models.RetryContext context:
The retry context containing the previous host location and the request
to evaluate and possibly modify.
"""
if settings["hosts"] and all(settings["hosts"].values()):
url = urlparse(request.url)
# If there's more than one possible location, retry to the alternative
if settings["mode"] == LocationMode.PRIMARY:
settings["mode"] = LocationMode.SECONDARY
else:
settings["mode"] = LocationMode.PRIMARY
updated = url._replace(netloc=settings["hosts"].get(settings["mode"]))
request.url = updated.geturl()

def increment(settings, request, response=None, error=None):
"""Increment the retry counters.
:param Any request:
:param dict settings:
:param Any response: A pipeline response object.
:param Any error: An error encountered during the request, or
None if the response was received successfully.
:keyword callable cls: A custom type or function that will be passed the direct response
:return: Whether the retry attempts are exhausted.
:rtype: None
"""
settings["total"] -= 1

if error and isinstance(error, ServiceRequestError):
# Errors when we're fairly sure that the server did not receive the
# request, so it should be safe to retry.
settings["connect"] -= 1
settings["history"].append(RequestHistory(request, error=error))

elif error and isinstance(error, ServiceResponseError):
# Errors that occur after the request has been started, so we should
# assume that the server began processing it.
settings["read"] -= 1
settings["history"].append(RequestHistory(request, error=error))

else:
# Incrementing because of a server error like a 500 in
# status_forcelist and a the given method is in the whitelist
if response:
settings["status"] -= 1
settings["history"].append(
RequestHistory(request, http_response=response)
)

if not is_exhausted(settings):
if request.method not in ["PUT"] and settings["retry_secondary"]:
set_next_host_location(settings, request)

# rewind the request body if it is a stream
if request.body and hasattr(request.body, "read"):
# no position was saved, then retry would not work
if settings["body_position"] is None:
return False
try:
# attempt to rewind the body to the initial position
request.body.seek(settings["body_position"], SEEK_SET)
except (UnsupportedOperation, ValueError):
# if body is not seekable, then retry would not work
return False
settings["count"] += 1
return True
return False


def urljoin(base_url, stub_url):
parsed = urlparse(base_url)
Expand Down Expand Up @@ -464,24 +536,6 @@ def get_backoff_time(self, settings):
random_range_end = backoff + self.random_jitter_range
return random_generator.uniform(random_range_start, random_range_end)

def _set_next_host_location(self, settings, request): # pylint: disable=no-self-use
"""
A function which sets the next host location on the request, if applicable.
:param ~azure.storage.models.RetryContext context:
The retry context containing the previous host location and the request
to evaluate and possibly modify.
"""
if settings["hosts"] and all(settings["hosts"].values()):
url = urlparse(request.url)
# If there's more than one possible location, retry to the alternative
if settings["mode"] == LocationMode.PRIMARY:
settings["mode"] = LocationMode.SECONDARY
else:
settings["mode"] = LocationMode.PRIMARY
updated = url._replace(netloc=settings["hosts"].get(settings["mode"]))
request.url = updated.geturl()

def configure_retries(
self, request
): # pylint: disable=no-self-use, arguments-differ
Expand Down Expand Up @@ -530,63 +584,6 @@ def sleep(self, settings, transport): # pylint: disable=arguments-differ
return
transport.sleep(backoff)

def increment(
self, settings, request, response=None, error=None, **kwargs
): # pylint: disable=unused-argument, arguments-differ
# type: (...)->None
"""Increment the retry counters.
:param Any request:
:param dict settings:
:param Any response: A pipeline response object.
:param Any error: An error encountered during the request, or
None if the response was received successfully.
:keyword callable cls: A custom type or function that will be passed the direct response
:return: Whether the retry attempts are exhausted.
:rtype: None
"""
settings["total"] -= 1

if error and isinstance(error, ServiceRequestError):
# Errors when we're fairly sure that the server did not receive the
# request, so it should be safe to retry.
settings["connect"] -= 1
settings["history"].append(RequestHistory(request, error=error))

elif error and isinstance(error, ServiceResponseError):
# Errors that occur after the request has been started, so we should
# assume that the server began processing it.
settings["read"] -= 1
settings["history"].append(RequestHistory(request, error=error))

else:
# Incrementing because of a server error like a 500 in
# status_forcelist and a the given method is in the whitelist
if response:
settings["status"] -= 1
settings["history"].append(
RequestHistory(request, http_response=response)
)

if not is_exhausted(settings):
if request.method not in ["PUT"] and settings["retry_secondary"]:
self._set_next_host_location(settings, request)

# rewind the request body if it is a stream
if request.body and hasattr(request.body, "read"):
# no position was saved, then retry would not work
if settings["body_position"] is None:
return False
try:
# attempt to rewind the body to the initial position
request.body.seek(settings["body_position"], SEEK_SET)
except (UnsupportedOperation, ValueError):
# if body is not seekable, then retry would not work
return False
settings["count"] += 1
return True
return False

def send(self, request):
"""
:param Any request:
Expand All @@ -599,7 +596,7 @@ def send(self, request):
try:
response = self.next.send(request)
if is_retry(response, retry_settings["mode"]):
retries_remaining = self.increment(
retries_remaining = increment(
retry_settings,
request=request.http_request,
response=response.http_response,
Expand All @@ -615,7 +612,7 @@ def send(self, request):
continue
break
except AzureError as err:
retries_remaining = self.increment(
retries_remaining = increment(
retry_settings, request=request.http_request, error=err
)
if retries_remaining:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from azure.core.pipeline.policies import AsyncHTTPPolicy, AsyncRetryPolicy
from azure.core.exceptions import AzureError

from .._policies import is_retry, TablesRetryPolicy
from .._policies import is_retry, increment, TablesRetryPolicy

if TYPE_CHECKING:
from azure.core.pipeline import PipelineRequest, PipelineResponse
Expand Down Expand Up @@ -168,8 +168,10 @@ async def send(self, request):
try:
response = await self.next.send(request)
if is_retry(response, retry_settings["mode"]):
retries_remaining = self.increment(
retry_settings, response=response.http_response
retries_remaining = increment(
retry_settings,
request=request.http_request,
response=response.http_response
)
if retries_remaining:
await retry_hook(
Expand All @@ -182,7 +184,8 @@ async def send(self, request):
continue
break
except AzureError as err:
retries_remaining = self.increment(retry_settings, error=err)
retries_remaining = increment(
retry_settings, request=request.http_request, error=err)
if retries_remaining:
await retry_hook(
retry_settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,12 @@ async def create_table(
:dedent: 8
:caption: Creating a table from the TableClient object.
"""
table_properties = TableProperties(table_name=self.table_name, **kwargs)
table_properties = TableProperties(table_name=self.table_name)
try:
metadata, _ = await self._client.table.create(
table_properties,
cls=kwargs.pop("cls", _return_headers_and_deserialized),
**kwargs
)
return _trim_service_metadata(metadata)
except HttpResponseError as error:
Expand Down
27 changes: 27 additions & 0 deletions sdk/tables/azure-data-tables/tests/_shared/testcase.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,30 @@ def generate_sas_token(self):

def generate_fake_token(self):
return FakeTokenCredential()


class ResponseCallback(object):
def __init__(self, status=None, new_status=None):
self.status = status
self.new_status = new_status
self.first = True
self.count = 0

def override_first_status(self, response):
if self.first and response.http_response.status_code == self.status:
response.http_response.status_code = self.new_status
self.first = False
self.count += 1

def override_status(self, response):
if response.http_response.status_code == self.status:
response.http_response.status_code = self.new_status
self.count += 1


class RetryCounter(object):
def __init__(self):
self.count = 0

def simple_count(self, retry_context):
self.count += 1
Loading