Skip to content

Commit

Permalink
feat: retry API calls with exponential backoff (#287)
Browse files Browse the repository at this point in the history
Retries errors for idempotent API calls by default. Some API calls are conditionally idempotent (only idempotent if etag, generation, if_generation_match, if_metageneration_match are specified); in those cases, retries are also conditional on the inclusion of that data in the call.
  • Loading branch information
andrewsg authored Oct 16, 2020
1 parent 6f865d9 commit fbe5d9c
Show file tree
Hide file tree
Showing 16 changed files with 521 additions and 13 deletions.
6 changes: 6 additions & 0 deletions google/cloud/storage/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

from six.moves.urllib.parse import urlsplit
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED


STORAGE_EMULATOR_ENV_VAR = "STORAGE_EMULATOR_HOST"
Expand Down Expand Up @@ -205,6 +207,7 @@ def reload(
headers=self._encryption_headers(),
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY,
)
self._set_properties(api_response)

Expand Down Expand Up @@ -306,6 +309,7 @@ def patch(
query_params=query_params,
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED,
)
self._set_properties(api_response)

Expand Down Expand Up @@ -368,13 +372,15 @@ def update(
if_metageneration_match=if_metageneration_match,
if_metageneration_not_match=if_metageneration_not_match,
)

api_response = client._connection.api_request(
method="PUT",
path=self.path,
data=self._properties,
query_params=query_params,
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY_IF_METAGENERATION_SPECIFIED,
)
self._set_properties(api_response)

Expand Down
15 changes: 15 additions & 0 deletions google/cloud/storage/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

"""Create / interact with Google Cloud Storage connections."""

import functools

from google.cloud import _http

from google.cloud.storage import __version__
Expand Down Expand Up @@ -46,3 +48,16 @@ def __init__(self, client, client_info=None, api_endpoint=DEFAULT_API_ENDPOINT):

API_URL_TEMPLATE = "{api_base_url}/storage/{api_version}{path}"
"""A template for the URL of a particular API call."""

def api_request(self, *args, **kwargs):
retry = kwargs.pop("retry", None)
call = functools.partial(super(Connection, self).api_request, *args, **kwargs)
if retry:
# If this is a ConditionalRetryPolicy, check conditions.
try:
retry = retry.get_retry_policy_if_conditions_met(**kwargs)
except AttributeError: # This is not a ConditionalRetryPolicy.
pass
if retry:
call = retry(call)
return call()
3 changes: 3 additions & 0 deletions google/cloud/storage/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from google.cloud.storage.constants import NEARLINE_STORAGE_CLASS
from google.cloud.storage.constants import REGIONAL_LEGACY_STORAGE_CLASS
from google.cloud.storage.constants import STANDARD_STORAGE_CLASS
from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED


_API_ACCESS_ENDPOINT = "https://storage.googleapis.com"
Expand Down Expand Up @@ -2856,6 +2857,7 @@ def compose(
data=request,
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
)
self._set_properties(api_response)

Expand Down Expand Up @@ -3000,6 +3002,7 @@ def rewrite(
headers=headers,
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
)
rewritten = int(api_response["totalBytesRewritten"])
size = int(api_response["objectSize"])
Expand Down
24 changes: 20 additions & 4 deletions google/cloud/storage/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@
from google.cloud.storage.constants import STANDARD_STORAGE_CLASS
from google.cloud.storage.notification import BucketNotification
from google.cloud.storage.notification import NONE_PAYLOAD_FORMAT

from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED
from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON

_UBLA_BPO_ENABLED_MESSAGE = (
"Pass only one of 'uniform_bucket_level_access_enabled' / "
Expand Down Expand Up @@ -1244,7 +1246,9 @@ def list_blobs(

client = self._require_client(client)
path = self.path + "/o"
api_request = functools.partial(client._connection.api_request, timeout=timeout)
api_request = functools.partial(
client._connection.api_request, timeout=timeout, retry=DEFAULT_RETRY
)
iterator = page_iterator.HTTPIterator(
client=client,
api_request=api_request,
Expand Down Expand Up @@ -1283,7 +1287,9 @@ def list_notifications(self, client=None, timeout=_DEFAULT_TIMEOUT):
"""
client = self._require_client(client)
path = self.path + "/notificationConfigs"
api_request = functools.partial(client._connection.api_request, timeout=timeout)
api_request = functools.partial(
client._connection.api_request, timeout=timeout, retry=DEFAULT_RETRY
)
iterator = page_iterator.HTTPIterator(
client=client,
api_request=api_request,
Expand Down Expand Up @@ -1424,6 +1430,7 @@ def delete(
query_params=query_params,
_target_object=None,
timeout=timeout,
retry=DEFAULT_RETRY,
)

def delete_blob(
Expand Down Expand Up @@ -1521,6 +1528,7 @@ def delete_blob(
query_params=query_params,
_target_object=None,
timeout=timeout,
retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
)

def delete_blobs(
Expand Down Expand Up @@ -1795,6 +1803,7 @@ def copy_blob(
query_params=query_params,
_target_object=new_blob,
timeout=timeout,
retry=DEFAULT_RETRY_IF_GENERATION_SPECIFIED,
)

if not preserve_acl:
Expand Down Expand Up @@ -2644,6 +2653,7 @@ def get_iam_policy(
query_params=query_params,
_target_object=None,
timeout=timeout,
retry=DEFAULT_RETRY,
)
return Policy.from_api_repr(info)

Expand Down Expand Up @@ -2689,6 +2699,7 @@ def set_iam_policy(self, policy, client=None, timeout=_DEFAULT_TIMEOUT):
data=resource,
_target_object=None,
timeout=timeout,
retry=DEFAULT_RETRY_IF_ETAG_IN_JSON,
)
return Policy.from_api_repr(info)

Expand Down Expand Up @@ -2727,7 +2738,11 @@ def test_iam_permissions(self, permissions, client=None, timeout=_DEFAULT_TIMEOU

path = "%s/iam/testPermissions" % (self.path,)
resp = client._connection.api_request(
method="GET", path=path, query_params=query_params, timeout=timeout
method="GET",
path=path,
query_params=query_params,
timeout=timeout,
retry=DEFAULT_RETRY,
)
return resp.get("permissions", [])

Expand Down Expand Up @@ -2967,6 +2982,7 @@ def lock_retention_policy(self, client=None, timeout=_DEFAULT_TIMEOUT):
query_params=query_params,
_target_object=self,
timeout=timeout,
retry=DEFAULT_RETRY,
)
self._set_properties(api_response)

Expand Down
18 changes: 14 additions & 4 deletions google/cloud/storage/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from google.cloud.storage.acl import BucketACL
from google.cloud.storage.acl import DefaultObjectACL
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY


_marker = object()
Expand Down Expand Up @@ -255,7 +256,7 @@ def get_service_account_email(self, project=None, timeout=_DEFAULT_TIMEOUT):
project = self.project
path = "/projects/%s/serviceAccount" % (project,)
api_response = self._base_connection.api_request(
method="GET", path=path, timeout=timeout
method="GET", path=path, timeout=timeout, retry=DEFAULT_RETRY,
)
return api_response["email_address"]

Expand Down Expand Up @@ -531,6 +532,7 @@ def create_bucket(
data=properties,
_target_object=bucket,
timeout=timeout,
retry=DEFAULT_RETRY,
)

bucket._set_properties(api_response)
Expand Down Expand Up @@ -777,7 +779,9 @@ def list_buckets(
if fields is not None:
extra_params["fields"] = fields

api_request = functools.partial(self._connection.api_request, timeout=timeout)
api_request = functools.partial(
self._connection.api_request, retry=DEFAULT_RETRY, timeout=timeout
)

return page_iterator.HTTPIterator(
client=self,
Expand Down Expand Up @@ -829,7 +833,11 @@ def create_hmac_key(
qs_params["userProject"] = user_project

api_response = self._connection.api_request(
method="POST", path=path, query_params=qs_params, timeout=timeout
method="POST",
path=path,
query_params=qs_params,
timeout=timeout,
retry=None,
)
metadata = HMACKeyMetadata(self)
metadata._properties = api_response["metadata"]
Expand Down Expand Up @@ -893,7 +901,9 @@ def list_hmac_keys(
if user_project is not None:
extra_params["userProject"] = user_project

api_request = functools.partial(self._connection.api_request, timeout=timeout)
api_request = functools.partial(
self._connection.api_request, timeout=timeout, retry=DEFAULT_RETRY
)

return page_iterator.HTTPIterator(
client=self,
Expand Down
9 changes: 8 additions & 1 deletion google/cloud/storage/hmac_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from google.cloud._helpers import _rfc3339_to_datetime

from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY
from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON


class HMACKeyMetadata(object):
Expand Down Expand Up @@ -260,6 +262,7 @@ def update(self, timeout=_DEFAULT_TIMEOUT):
data=payload,
query_params=qs_params,
timeout=timeout,
retry=DEFAULT_RETRY_IF_ETAG_IN_JSON,
)

def delete(self, timeout=_DEFAULT_TIMEOUT):
Expand All @@ -283,5 +286,9 @@ def delete(self, timeout=_DEFAULT_TIMEOUT):
qs_params["userProject"] = self.user_project

self._client._connection.api_request(
method="DELETE", path=self.path, query_params=qs_params, timeout=timeout
method="DELETE",
path=self.path,
query_params=qs_params,
timeout=timeout,
retry=DEFAULT_RETRY,
)
14 changes: 12 additions & 2 deletions google/cloud/storage/notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from google.api_core.exceptions import NotFound

from google.cloud.storage.constants import _DEFAULT_TIMEOUT
from google.cloud.storage.retry import DEFAULT_RETRY


OBJECT_FINALIZE_EVENT_TYPE = "OBJECT_FINALIZE"
Expand Down Expand Up @@ -271,6 +272,7 @@ def create(self, client=None, timeout=_DEFAULT_TIMEOUT):
query_params=query_params,
data=properties,
timeout=timeout,
retry=None,
)

def exists(self, client=None, timeout=_DEFAULT_TIMEOUT):
Expand Down Expand Up @@ -347,7 +349,11 @@ def reload(self, client=None, timeout=_DEFAULT_TIMEOUT):
query_params["userProject"] = self.bucket.user_project

response = client._connection.api_request(
method="GET", path=self.path, query_params=query_params, timeout=timeout
method="GET",
path=self.path,
query_params=query_params,
timeout=timeout,
retry=DEFAULT_RETRY,
)
self._set_properties(response)

Expand Down Expand Up @@ -385,7 +391,11 @@ def delete(self, client=None, timeout=_DEFAULT_TIMEOUT):
query_params["userProject"] = self.bucket.user_project

client._connection.api_request(
method="DELETE", path=self.path, query_params=query_params, timeout=timeout
method="DELETE",
path=self.path,
query_params=query_params,
timeout=timeout,
retry=DEFAULT_RETRY,
)


Expand Down
Loading

0 comments on commit fbe5d9c

Please sign in to comment.