Skip to content

Commit

Permalink
fix: remove custom retry loop (#948)
Browse files Browse the repository at this point in the history
* fix: remove custom retry loop

* removed async implementation

* removed unneeded imports

---------

Co-authored-by: Kevin Zheng <147537668+gkevinzheng@users.noreply.github.com>
  • Loading branch information
daniel-sanche and gkevinzheng authored Aug 14, 2024
1 parent fabc701 commit 04bb206
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 577 deletions.
92 changes: 8 additions & 84 deletions google/cloud/firestore_v1/async_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
"""Helpers for applying Google Cloud Firestore changes in a transaction."""


import asyncio
import random
from typing import Any, AsyncGenerator, Callable, Coroutine

from google.api_core import exceptions, gapic_v1
from google.api_core import retry_async as retries

from google.cloud.firestore_v1 import _helpers, async_batch, types
from google.cloud.firestore_v1 import _helpers, async_batch
from google.cloud.firestore_v1.async_document import (
AsyncDocumentReference,
DocumentSnapshot,
Expand All @@ -33,18 +31,12 @@
_CANT_COMMIT,
_CANT_ROLLBACK,
_EXCEED_ATTEMPTS_TEMPLATE,
_INITIAL_SLEEP,
_MAX_SLEEP,
_MULTIPLIER,
_WRITE_READ_ONLY,
MAX_ATTEMPTS,
BaseTransaction,
_BaseTransactional,
)

# Types needed only for Type Hints
from google.cloud.firestore_v1.client import Client


class AsyncTransaction(async_batch.AsyncWriteBatch, BaseTransaction):
"""Accumulate read-and-write operations to be sent in a transaction.
Expand Down Expand Up @@ -140,8 +132,13 @@ async def _commit(self) -> list:
if not self.in_progress:
raise ValueError(_CANT_COMMIT)

commit_response = await _commit_with_retry(
self._client, self._write_pbs, self._id
commit_response = await self._client._firestore_api.commit(
request={
"database": self._client._database_string,
"writes": self._write_pbs,
"transaction": self._id,
},
metadata=self._client._rpc_metadata,
)

self._clean_up()
Expand Down Expand Up @@ -313,76 +310,3 @@ def async_transactional(
the wrapped callable.
"""
return _AsyncTransactional(to_wrap)


# TODO(crwilcox): this was 'coroutine' from pytype merge-pyi...
async def _commit_with_retry(
client: Client, write_pbs: list, transaction_id: bytes
) -> types.CommitResponse:
"""Call ``Commit`` on the GAPIC client with retry / sleep.
Retries the ``Commit`` RPC on Unavailable. Usually this RPC-level
retry is handled by the underlying GAPICd client, but in this case it
doesn't because ``Commit`` is not always idempotent. But here we know it
is "idempotent"-like because it has a transaction ID. We also need to do
our own retry to special-case the ``INVALID_ARGUMENT`` error.
Args:
client (:class:`~google.cloud.firestore_v1.client.Client`):
A client with GAPIC client and configuration details.
write_pbs (List[:class:`google.cloud.proto.firestore.v1.write.Write`, ...]):
A ``Write`` protobuf instance to be committed.
transaction_id (bytes):
ID of an existing transaction that this commit will run in.
Returns:
:class:`google.cloud.firestore_v1.types.CommitResponse`:
The protobuf response from ``Commit``.
Raises:
~google.api_core.exceptions.GoogleAPICallError: If a non-retryable
exception is encountered.
"""
current_sleep = _INITIAL_SLEEP
while True:
try:
return await client._firestore_api.commit(
request={
"database": client._database_string,
"writes": write_pbs,
"transaction": transaction_id,
},
metadata=client._rpc_metadata,
)
except exceptions.ServiceUnavailable:
# Retry
pass

current_sleep = await _sleep(current_sleep)


async def _sleep(
current_sleep: float, max_sleep: float = _MAX_SLEEP, multiplier: float = _MULTIPLIER
) -> float:
"""Sleep and produce a new sleep time.
.. _Exponential Backoff And Jitter: https://www.awsarchitectureblog.com/\
2015/03/backoff.html
Select a duration between zero and ``current_sleep``. It might seem
counterintuitive to have so much jitter, but
`Exponential Backoff And Jitter`_ argues that "full jitter" is
the best strategy.
Args:
current_sleep (float): The current "max" for sleep interval.
max_sleep (Optional[float]): Eventual "max" sleep time
multiplier (Optional[float]): Multiplier for exponential backoff.
Returns:
float: Newly doubled ``current_sleep`` or ``max_sleep`` (whichever
is smaller)
"""
actual_sleep = random.uniform(0.0, current_sleep)
await asyncio.sleep(actual_sleep)
return min(multiplier * current_sleep, max_sleep)
6 changes: 0 additions & 6 deletions google/cloud/firestore_v1/base_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@
_CANT_ROLLBACK: str = _MISSING_ID_TEMPLATE.format("rolled back")
_CANT_COMMIT: str = _MISSING_ID_TEMPLATE.format("committed")
_WRITE_READ_ONLY: str = "Cannot perform write operation in read-only transaction."
_INITIAL_SLEEP: float = 1.0
"""float: Initial "max" for sleep interval. To be used in :func:`_sleep`."""
_MAX_SLEEP: float = 30.0
"""float: Eventual "max" sleep time. To be used in :func:`_sleep`."""
_MULTIPLIER: float = 2.0
"""float: Multiplier for exponential backoff. To be used in :func:`_sleep`."""
_EXCEED_ATTEMPTS_TEMPLATE: str = "Failed to commit transaction in {:d} attempts."
_CANT_RETRY_READ_ONLY: str = "Only read-write transactions can be retried."

Expand Down
87 changes: 8 additions & 79 deletions google/cloud/firestore_v1/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
"""Helpers for applying Google Cloud Firestore changes in a transaction."""


import random
import time
from typing import Any, Callable, Generator

from google.api_core import exceptions, gapic_v1
Expand All @@ -31,17 +29,13 @@
_CANT_COMMIT,
_CANT_ROLLBACK,
_EXCEED_ATTEMPTS_TEMPLATE,
_INITIAL_SLEEP,
_MAX_SLEEP,
_MULTIPLIER,
_WRITE_READ_ONLY,
MAX_ATTEMPTS,
BaseTransaction,
_BaseTransactional,
)
from google.cloud.firestore_v1.document import DocumentReference
from google.cloud.firestore_v1.query import Query
from google.cloud.firestore_v1.types import CommitResponse


class Transaction(batch.WriteBatch, BaseTransaction):
Expand Down Expand Up @@ -138,7 +132,14 @@ def _commit(self) -> list:
if not self.in_progress:
raise ValueError(_CANT_COMMIT)

commit_response = _commit_with_retry(self._client, self._write_pbs, self._id)
commit_response = self._client._firestore_api.commit(
request={
"database": self._client._database_string,
"writes": self._write_pbs,
"transaction": self._id,
},
metadata=self._client._rpc_metadata,
)

self._clean_up()
return list(commit_response.write_results)
Expand Down Expand Up @@ -301,75 +302,3 @@ def transactional(to_wrap: Callable) -> _Transactional:
the wrapped callable.
"""
return _Transactional(to_wrap)


def _commit_with_retry(
client, write_pbs: list, transaction_id: bytes
) -> CommitResponse:
"""Call ``Commit`` on the GAPIC client with retry / sleep.
Retries the ``Commit`` RPC on Unavailable. Usually this RPC-level
retry is handled by the underlying GAPICd client, but in this case it
doesn't because ``Commit`` is not always idempotent. But here we know it
is "idempotent"-like because it has a transaction ID. We also need to do
our own retry to special-case the ``INVALID_ARGUMENT`` error.
Args:
client (:class:`~google.cloud.firestore_v1.client.Client`):
A client with GAPIC client and configuration details.
write_pbs (List[:class:`google.cloud.proto.firestore.v1.write.Write`, ...]):
A ``Write`` protobuf instance to be committed.
transaction_id (bytes):
ID of an existing transaction that this commit will run in.
Returns:
:class:`google.cloud.firestore_v1.types.CommitResponse`:
The protobuf response from ``Commit``.
Raises:
~google.api_core.exceptions.GoogleAPICallError: If a non-retryable
exception is encountered.
"""
current_sleep = _INITIAL_SLEEP
while True:
try:
return client._firestore_api.commit(
request={
"database": client._database_string,
"writes": write_pbs,
"transaction": transaction_id,
},
metadata=client._rpc_metadata,
)
except exceptions.ServiceUnavailable:
# Retry
pass

current_sleep = _sleep(current_sleep)


def _sleep(
current_sleep: float, max_sleep: float = _MAX_SLEEP, multiplier: float = _MULTIPLIER
) -> float:
"""Sleep and produce a new sleep time.
.. _Exponential Backoff And Jitter: https://www.awsarchitectureblog.com/\
2015/03/backoff.html
Select a duration between zero and ``current_sleep``. It might seem
counterintuitive to have so much jitter, but
`Exponential Backoff And Jitter`_ argues that "full jitter" is
the best strategy.
Args:
current_sleep (float): The current "max" for sleep interval.
max_sleep (Optional[float]): Eventual "max" sleep time
multiplier (Optional[float]): Multiplier for exponential backoff.
Returns:
float: Newly doubled ``current_sleep`` or ``max_sleep`` (whichever
is smaller)
"""
actual_sleep = random.uniform(0.0, current_sleep)
time.sleep(actual_sleep)
return min(multiplier * current_sleep, max_sleep)
Loading

0 comments on commit 04bb206

Please sign in to comment.