Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datastore: add retry param to page iterator. #8547

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions datastore/google/cloud/datastore/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""

import datetime
import functools
import itertools

from google.protobuf import struct_pb2
Expand Down Expand Up @@ -471,6 +472,12 @@ def _set_protobuf_value(value_pb, val):
setattr(value_pb, attr, val)


def _call_api(fnc_call, retry, *args, **kwargs):
if retry:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If retry is None, then avoid the partial and just return fnc_call(*args, **kwargs).

return retry(functools.partial(fnc_call, *args, **kwargs))()
return fnc_call(*args, **kwargs)


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add explicit unit tests for _call_api, including covering the case when retry is None.

class GeoPoint(object):
"""Simple container for a geo point value.

Expand Down
14 changes: 12 additions & 2 deletions datastore/google/cloud/datastore/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from google.cloud.datastore_v1.proto import query_pb2
from google.cloud.datastore import helpers
from google.cloud.datastore.key import Key
from google.cloud.datastore.retry import DEFAULT_RETRY


_NOT_FINISHED = query_pb2.QueryResultBatch.NOT_FINISHED
Expand Down Expand Up @@ -512,9 +513,12 @@ def _process_query_results(self, response_pb):

return [result.entity for result in response_pb.batch.entity_results]

def _next_page(self):
def _next_page(self, retry=DEFAULT_RETRY):
"""Get the next page in the iterator.

:type retry: :class:`google.api_core.retry.Retry`
:param retry: (Optional) How to retry the RPC.

:rtype: :class:`~google.cloud.iterator.Page`
:returns: The next page in the iterator (or :data:`None` if
there are no pages left).
Expand All @@ -537,7 +541,13 @@ def _next_page(self):
self._query.project, partition_id, read_options, query=query_pb
)
entity_pbs = self._process_query_results(response_pb)
return page_iterator.Page(self, entity_pbs, self.item_to_value)
return helpers._call_api(
page_iterator.Page,
retry,
parent=self,
items=entity_pbs,
item_to_value=self.item_to_value,
)


def _pb_from_query(query):
Expand Down
51 changes: 51 additions & 0 deletions datastore/google/cloud/datastore/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.api_core import exceptions
from google.api_core import retry


# Values of ``exc.error["status"]`` for which ``_should_retry`` returns True.
_RETRYABLE_REASONS = frozenset(["DEADLINE_EXCEEDED", "UNAVAILABLE"])

# Exception classes that ``_should_retry`` treats as retryable when the
# exception exposes an ``errors`` attribute.
_UNSTRUCTURED_RETRYABLE_TYPES = (
exceptions.TooManyRequests,
exceptions.InternalServerError,
)


def _should_retry(exc):
"""Predicate for determining when to retry.

We retry if and only if the 'error status' is 'DEADLINE_EXCEEDED'
or 'UNAVAILABLE'.
"""
if hasattr(exc, "errors"):
# Check for unstructured error returns
return isinstance(exc, _UNSTRUCTURED_RETRYABLE_TYPES)

if hasattr(exc, "error"):
reason = exc.error["status"]
return reason in _RETRYABLE_REASONS
return False


# Module-level ``Retry`` instance that uses ``_should_retry`` as its predicate.
DEFAULT_RETRY = retry.Retry(predicate=_should_retry)
"""The default retry object.

Any method with a ``retry`` parameter will be retried automatically,
with reasonable defaults. To disable retry, pass ``retry=None``.
To modify the default retry behavior, call a ``with_XXX`` method
on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds,
pass ``retry=datastore.DEFAULT_RETRY.with_deadline(30)``.
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just copying this file in wholesale from bigquery isn't appropriate: we should be refactoring to share this implementation.

19 changes: 19 additions & 0 deletions datastore/tests/unit/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import unittest
from mock import Mock


class Test__new_value_pb(unittest.TestCase):
Expand Down Expand Up @@ -991,6 +992,24 @@ def test_array_value_meaning_partially_unset(self):
self.assertEqual(result, [meaning1, None])


class Test___call_api(unittest.TestCase):
    """Unit tests for ``google.cloud.datastore.helpers._call_api``.

    The retry and no-retry paths are covered by separate tests, and each one
    checks that positional/keyword arguments are forwarded and that the
    wrapped callable's return value is passed back to the caller.
    """

    def _call_fut(self, fnc_call, retry, *args, **kwargs):
        from google.cloud.datastore.helpers import _call_api

        return _call_api(fnc_call, retry, *args, **kwargs)

    def test_call_api_w_default_retry(self):
        from google.cloud.datastore.retry import DEFAULT_RETRY

        fnc_call = Mock()

        result = self._call_fut(fnc_call, DEFAULT_RETRY, "arg", key="value")

        # The callable succeeds on the first attempt, so it runs exactly once.
        fnc_call.assert_called_once_with("arg", key="value")
        self.assertIs(result, fnc_call.return_value)

    def test_call_api_w_retry_none(self):
        fnc_call = Mock()

        result = self._call_fut(fnc_call, None, "arg", key="value")

        # With ``retry=None`` the callable is invoked directly.
        fnc_call.assert_called_once_with("arg", key="value")
        self.assertIs(result, fnc_call.return_value)


class TestGeoPoint(unittest.TestCase):
@staticmethod
def _get_target_class():
Expand Down
14 changes: 14 additions & 0 deletions datastore/tests/unit/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,20 @@ def test__next_page_no_more(self):
self.assertIsNone(page)
ds_api.run_query.assert_not_called()

def test__next_page_w_error(self):
    """Call ``_next_page`` with retry disabled (``retry=None``)."""
    from google.cloud.datastore.query import Query
    from google.cloud.datastore_v1.proto import query_pb2

    not_finished = query_pb2.QueryResultBatch.NOT_FINISHED
    response_pb = _make_query_response([], b"", not_finished, 0)
    api = _make_datastore_api(response_pb)
    client = _Client("prujekt", datastore_api=api)
    iterator = self._make_one(Query(client), client)

    iterator._next_page(retry=None)

    # NOTE(review): this checks the call count of the API object itself, not
    # of its ``run_query`` method -- confirm that is the intended assertion.
    self.assertEqual(client._datastore_api.call_count, 0)


class Test__item_to_entity(unittest.TestCase):
def _call_fut(self, iterator, entity_pb):
Expand Down
55 changes: 55 additions & 0 deletions datastore/tests/unit/test_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import mock


class Test_should_retry(unittest.TestCase):
    """Unit tests for the ``_should_retry`` predicate in the retry module."""

    @staticmethod
    def _make_status_exc(status):
        """Build a mock exposing only an ``error`` dict with ``status``."""
        return mock.Mock(error={"status": status}, spec=["error"])

    def _call_fut(self, exc):
        from google.cloud.datastore.retry import _should_retry

        return _should_retry(exc)

    def test_wo_errors_attribute(self):
        outcome = self._call_fut(object())
        self.assertFalse(outcome)

    def test_w_empty_errors(self):
        fake_exc = mock.Mock(errors=[], spec=["errors"])
        outcome = self._call_fut(fake_exc)
        self.assertFalse(outcome)

    def test_w_non_matching_reason(self):
        outcome = self._call_fut(self._make_status_exc("bogus"))
        self.assertFalse(outcome)

    def test_w_DEADLINE_EXCEEDED(self):
        outcome = self._call_fut(self._make_status_exc("DEADLINE_EXCEEDED"))
        self.assertTrue(outcome)

    def test_w_UNAVAILABLE(self):
        outcome = self._call_fut(self._make_status_exc("UNAVAILABLE"))
        self.assertTrue(outcome)

    def test_w_unstructured_too_many_requests(self):
        from google.api_core.exceptions import TooManyRequests

        outcome = self._call_fut(TooManyRequests("testing"))
        self.assertTrue(outcome)

    def test_w_unstructured_internal_server_error(self):
        from google.api_core.exceptions import InternalServerError

        outcome = self._call_fut(InternalServerError("testing"))
        self.assertTrue(outcome)