Skip to content

Commit

Permalink
feat(api_core): add retry param into PollingFuture() and it's inheritors
Browse files Browse the repository at this point in the history
Towards #6197
  • Loading branch information
IlyaFaer committed Dec 3, 2019
1 parent 38d8f2b commit 8896c5b
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 7 deletions.
5 changes: 4 additions & 1 deletion api_core/google/api_core/future/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@ def __init__(self, retry=DEFAULT_RETRY):
self._done_callbacks = []

@abc.abstractmethod
def done(self):
def done(self, retry=DEFAULT_RETRY):
"""Checks to see if the operation is complete.
Args:
retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
Returns:
bool: True if the operation is complete, False otherwise.
"""
Expand Down
17 changes: 12 additions & 5 deletions api_core/google/api_core/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,21 +145,28 @@ def _set_result_from_operation(self):
)
self.set_exception(exception)

def _refresh_and_update(self):
"""Refresh the operation and update the result if needed."""
def _refresh_and_update(self, retry=polling.DEFAULT_RETRY):
"""Refresh the operation and update the result if needed.
Args:
retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
"""
# If the currently cached operation is done, no need to make another
# RPC as it will not change once done.
if not self._operation.done:
self._operation = self._refresh()
self._operation = self._refresh(retry=retry)
self._set_result_from_operation()

def done(self):
def done(self, retry=polling.DEFAULT_RETRY):
"""Checks to see if the operation is complete.
Args:
retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
Returns:
bool: True if the operation is complete, False otherwise.
"""
self._refresh_and_update()
self._refresh_and_update(retry)
return self._operation.done

def cancel(self):
Expand Down
20 changes: 20 additions & 0 deletions api_core/tests/unit/test_operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

from google.api_core import operation
from google.api_core import operations_v1
from google.api_core import retry
from google.api_core import exceptions
from google.api_core.future import polling
from google.longrunning import operations_pb2
from google.protobuf import struct_pb2
from google.rpc import code_pb2
Expand Down Expand Up @@ -113,6 +116,23 @@ def test_result():
assert future.done()


def test_done_w_retry():
RETRY_PREDICATE = retry.if_exception_type(exceptions.TooManyRequests)
test_retry = retry.Retry(predicate=RETRY_PREDICATE)

expected_result = struct_pb2.Struct()
responses = [
make_operation_proto(),
# Second operation response includes the result.
make_operation_proto(done=True, response=expected_result),
]
future, _, _ = make_operation_future(responses)
future._refresh = mock.Mock()

future.done(retry=test_retry)
future._refresh.assert_called_once_with(retry=test_retry)


def test_exception():
expected_exception = status_pb2.Status(message="meep")
responses = [
Expand Down
1 change: 0 additions & 1 deletion bigquery/google/cloud/bigquery/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,6 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
"""
if self.state is None:
self._begin(retry=retry)
# TODO: modify PollingFuture so it can pass a retry argument to done().
return super(_AsyncJob, self).result(timeout=timeout)

def cancelled(self):
Expand Down

0 comments on commit 8896c5b

Please sign in to comment.