From 225fa0dc59b98efbb120047479cb61ae26e9df3c Mon Sep 17 00:00:00 2001 From: paras Date: Thu, 6 Jun 2019 14:47:23 +0530 Subject: [PATCH 1/4] retry param added in page iterator --- datastore/google/cloud/datastore/helpers.py | 9 +++ datastore/google/cloud/datastore/query.py | 16 ++++- datastore/google/cloud/datastore/retry.py | 55 ++++++++++++++++ datastore/tests/unit/test_query.py | 14 +++++ datastore/tests/unit/test_retry.py | 69 +++++++++++++++++++++ 5 files changed, 161 insertions(+), 2 deletions(-) create mode 100644 datastore/google/cloud/datastore/retry.py create mode 100644 datastore/tests/unit/test_retry.py diff --git a/datastore/google/cloud/datastore/helpers.py b/datastore/google/cloud/datastore/helpers.py index db6f150eff8b..3b8676600862 100644 --- a/datastore/google/cloud/datastore/helpers.py +++ b/datastore/google/cloud/datastore/helpers.py @@ -18,6 +18,7 @@ """ import datetime +import functools import itertools from google.protobuf import struct_pb2 @@ -471,6 +472,14 @@ def _set_protobuf_value(value_pb, val): setattr(value_pb, attr, val) +def _call_api(fnc_call, retry, *args, **kwargs): + + call = functools.partial(fnc_call, *args, **kwargs) + if retry: + call = retry(call) + return call() + + class GeoPoint(object): """Simple container for a geo point value. diff --git a/datastore/google/cloud/datastore/query.py b/datastore/google/cloud/datastore/query.py index f7979d12be70..ac7a55477772 100644 --- a/datastore/google/cloud/datastore/query.py +++ b/datastore/google/cloud/datastore/query.py @@ -15,6 +15,7 @@ """Create / interact with Google Cloud Datastore queries.""" import base64 +import functools from google.api_core import page_iterator from google.cloud._helpers import _ensure_tuple_or_list @@ -23,6 +24,7 @@ from google.cloud.datastore_v1.proto import query_pb2 from google.cloud.datastore import helpers from google.cloud.datastore.key import Key +from google.cloud.datastore.retry import DEFAULT_RETRY _NOT_FINISHED = query_pb2.QueryResultBatch.NOT_FINISHED @@ -512,9 +514,12 @@ def _process_query_results(self, response_pb): return [result.entity for result in response_pb.batch.entity_results] - def _next_page(self): + def _next_page(self, retry=DEFAULT_RETRY): """Get the next page in the iterator. + :type retry: :class:`google.api_core.retry.Retry` + :param retry: (Optional) How to retry the RPC. + :rtype: :class:`~google.cloud.iterator.Page` :returns: The next page in the iterator (or :data:`None` if there are no pages left). @@ -537,7 +542,14 @@ def _next_page(self): self._query.project, partition_id, read_options, query=query_pb ) entity_pbs = self._process_query_results(response_pb) - return page_iterator.Page(self, entity_pbs, self.item_to_value) + # return page_iterator.Page(self, entity_pbs, self.item_to_value) + return helpers._call_api( + page_iterator.Page, + retry, + parent=self, + items=entity_pbs, + item_to_value=self.item_to_value, + ) def _pb_from_query(query): diff --git a/datastore/google/cloud/datastore/retry.py b/datastore/google/cloud/datastore/retry.py new file mode 100644 index 000000000000..4bc4b757f45d --- /dev/null +++ b/datastore/google/cloud/datastore/retry.py @@ -0,0 +1,55 @@ +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.api_core import exceptions +from google.api_core import retry + + +_RETRYABLE_REASONS = frozenset( + ["rateLimitExceeded", "backendError", "internalError", "badGateway"] +) + +_UNSTRUCTURED_RETRYABLE_TYPES = ( + exceptions.TooManyRequests, + exceptions.InternalServerError, + exceptions.BadGateway, +) + + +def _should_retry(exc): + """Predicate for determining when to retry. + + We retry if and only if the 'reason' is 'backendError' + or 'rateLimitExceeded'. + """ + if not hasattr(exc, "errors"): + return False + + if len(exc.errors) == 0: + # Check for unstructured error returns, e.g. from GFE + return isinstance(exc, _UNSTRUCTURED_RETRYABLE_TYPES) + + reason = exc.errors[0]["reason"] + return reason in _RETRYABLE_REASONS + + +DEFAULT_RETRY = retry.Retry(predicate=_should_retry) +"""The default retry object. + +Any method with a ``retry`` parameter will be retried automatically, +with reasonable defaults. To disable retry, pass ``retry=None``. +To modify the default retry behavior, call a ``with_XXX`` method +on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds, +pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``. +""" diff --git a/datastore/tests/unit/test_query.py b/datastore/tests/unit/test_query.py index 01a005f4eb78..45b75f194d3b 100644 --- a/datastore/tests/unit/test_query.py +++ b/datastore/tests/unit/test_query.py @@ -559,6 +559,20 @@ def test__next_page_no_more(self): self.assertIsNone(page) ds_api.run_query.assert_not_called() + def test__next_page_w_error(self): + from google.cloud.datastore.query import Query + from google.cloud.datastore_v1.proto import query_pb2 + + project = "prujekt" + more_enum = query_pb2.QueryResultBatch.NOT_FINISHED + result = _make_query_response([], b"", more_enum, 0) + ds_api = _make_datastore_api(result) + client = _Client(project, datastore_api=ds_api) + query = Query(client) + iterator = self._make_one(query, client) + iterator._next_page(retry=None) + self.assertEqual(client._datastore_api.call_count, 0) + class Test__item_to_entity(unittest.TestCase): def _call_fut(self, iterator, entity_pb): diff --git a/datastore/tests/unit/test_retry.py b/datastore/tests/unit/test_retry.py new file mode 100644 index 000000000000..737b4730adf6 --- /dev/null +++ b/datastore/tests/unit/test_retry.py @@ -0,0 +1,69 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import mock + + +class Test_should_retry(unittest.TestCase): + def _call_fut(self, exc): + from google.cloud.datastore.retry import _should_retry + + return _should_retry(exc) + + def test_wo_errors_attribute(self): + self.assertFalse(self._call_fut(object())) + + def test_w_empty_errors(self): + exc = mock.Mock(errors=[], spec=["errors"]) + self.assertFalse(self._call_fut(exc)) + + def test_w_non_matching_reason(self): + exc = mock.Mock(errors=[{"reason": "bogus"}], spec=["errors"]) + self.assertFalse(self._call_fut(exc)) + + def test_w_backendError(self): + exc = mock.Mock(errors=[{"reason": "backendError"}], spec=["errors"]) + self.assertTrue(self._call_fut(exc)) + + def test_w_rateLimitExceeded(self): + exc = mock.Mock(errors=[{"reason": "rateLimitExceeded"}], spec=["errors"]) + self.assertTrue(self._call_fut(exc)) + + def test_w_unstructured_too_many_requests(self): + from google.api_core.exceptions import TooManyRequests + + exc = TooManyRequests("testing") + self.assertTrue(self._call_fut(exc)) + + def test_w_internalError(self): + exc = mock.Mock(errors=[{"reason": "internalError"}], spec=["errors"]) + self.assertTrue(self._call_fut(exc)) + + def test_w_unstructured_internal_server_error(self): + from google.api_core.exceptions import InternalServerError + + exc = InternalServerError("testing") + self.assertTrue(self._call_fut(exc)) + + def test_w_badGateway(self): + exc = mock.Mock(errors=[{"reason": "badGateway"}], spec=["errors"]) + self.assertTrue(self._call_fut(exc)) + + def test_w_unstructured_bad_gateway(self): + from google.api_core.exceptions import BadGateway + + exc = BadGateway("testing") + self.assertTrue(self._call_fut(exc)) From 095d12c7fed10bd9b1133bbcd3f26f15f6331eb3 Mon Sep 17 00:00:00 2001 From: paras Date: Mon, 1 Jul 2019 21:15:04 +0530 Subject: [PATCH 2/4] extra insertion correction --- datastore/google/cloud/datastore/query.py | 1 - 1 file changed, 1 deletion(-) diff --git a/datastore/google/cloud/datastore/query.py b/datastore/google/cloud/datastore/query.py index ac7a55477772..5641f9ab8c56 100644 --- a/datastore/google/cloud/datastore/query.py +++ b/datastore/google/cloud/datastore/query.py @@ -15,7 +15,6 @@ """Create / interact with Google Cloud Datastore queries.""" import base64 -import functools from google.api_core import page_iterator from google.cloud._helpers import _ensure_tuple_or_list From e7cbd0a1886749e76b11122920e5b46db56322cf Mon Sep 17 00:00:00 2001 From: paras Date: Wed, 10 Jul 2019 17:00:04 +0530 Subject: [PATCH 3/4] test case added for retry calling, correction in retry --- datastore/google/cloud/datastore/helpers.py | 6 ++---- datastore/google/cloud/datastore/query.py | 1 - datastore/google/cloud/datastore/retry.py | 4 ++-- datastore/tests/unit/test_helpers.py | 19 ++++++++++++++++++- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/datastore/google/cloud/datastore/helpers.py b/datastore/google/cloud/datastore/helpers.py index 3b8676600862..467ca6d4e51a 100644 --- a/datastore/google/cloud/datastore/helpers.py +++ b/datastore/google/cloud/datastore/helpers.py @@ -473,11 +473,9 @@ def _set_protobuf_value(value_pb, val): def _call_api(fnc_call, retry, *args, **kwargs): - - call = functools.partial(fnc_call, *args, **kwargs) if retry: - call = retry(call) - return call() + return retry(functools.partial(fnc_call, *args, **kwargs))() + return fnc_call(*args, **kwargs) class GeoPoint(object): diff --git a/datastore/google/cloud/datastore/query.py b/datastore/google/cloud/datastore/query.py index 5641f9ab8c56..a216b5758dd7 100644 --- a/datastore/google/cloud/datastore/query.py +++ b/datastore/google/cloud/datastore/query.py @@ -541,7 +541,6 @@ def _next_page(self, retry=DEFAULT_RETRY): self._query.project, partition_id, read_options, query=query_pb ) entity_pbs = self._process_query_results(response_pb) - # return page_iterator.Page(self, entity_pbs, self.item_to_value) return helpers._call_api( page_iterator.Page, retry, diff --git a/datastore/google/cloud/datastore/retry.py b/datastore/google/cloud/datastore/retry.py index 4bc4b757f45d..7fcafa6fcf31 100644 --- a/datastore/google/cloud/datastore/retry.py +++ b/datastore/google/cloud/datastore/retry.py @@ -1,4 +1,4 @@ -# Copyright 2018 Google LLC +# Copyright 2019 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -51,5 +51,5 @@ def _should_retry(exc): with reasonable defaults. To disable retry, pass ``retry=None``. To modify the default retry behavior, call a ``with_XXX`` method on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds, -pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``. +pass ``retry=datastore.DEFAULT_RETRY.with_deadline(30)``. """ diff --git a/datastore/tests/unit/test_helpers.py b/datastore/tests/unit/test_helpers.py index 995d9cfa2330..ebbc549a6fcd 100644 --- a/datastore/tests/unit/test_helpers.py +++ b/datastore/tests/unit/test_helpers.py @@ -13,7 +13,7 @@ # limitations under the License. import unittest - +from mock import Mock class Test__new_value_pb(unittest.TestCase): def _call_fut(self, entity_pb, name): @@ -991,6 +991,23 @@ def test_array_value_meaning_partially_unset(self): self.assertEqual(result, [meaning1, None]) +class Test___call_api(unittest.TestCase): + + def _call_fut(self, fnc_call, retry, *args, **kwargs): + from google.cloud.datastore.helpers import _call_api + return _call_api(fnc_call, retry, *args, **kwargs) + + def test_call_api(self): + from google.cloud.datastore.retry import DEFAULT_RETRY + retry = DEFAULT_RETRY + fnc_call = Mock() + self._call_fut(fnc_call, retry) + fnc_call1 = Mock() + self._call_fut(fnc_call1, None) + self.assertEqual(fnc_call.call_count, 1) + self.assertEqual(fnc_call1.call_count, 1) + + class TestGeoPoint(unittest.TestCase): @staticmethod def _get_target_class(): From 425e731eb96c8f36858a0985eba9ebdbdb6bcc04 Mon Sep 17 00:00:00 2001 From: paras Date: Tue, 16 Jul 2019 18:30:58 +0530 Subject: [PATCH 4/4] removed extra error code, correction real error response --- datastore/google/cloud/datastore/retry.py | 22 +++++++++------------ datastore/tests/unit/test_helpers.py | 4 +++- datastore/tests/unit/test_retry.py | 24 +++++------------------ 3 files changed, 17 insertions(+), 33 deletions(-) diff --git a/datastore/google/cloud/datastore/retry.py b/datastore/google/cloud/datastore/retry.py index 7fcafa6fcf31..e59dac76e5dc 100644 --- a/datastore/google/cloud/datastore/retry.py +++ b/datastore/google/cloud/datastore/retry.py @@ -16,32 +16,28 @@ from google.api_core import retry -_RETRYABLE_REASONS = frozenset( - ["rateLimitExceeded", "backendError", "internalError", "badGateway"] -) +_RETRYABLE_REASONS = frozenset(["DEADLINE_EXCEEDED", "UNAVAILABLE"]) _UNSTRUCTURED_RETRYABLE_TYPES = ( exceptions.TooManyRequests, exceptions.InternalServerError, - exceptions.BadGateway, ) def _should_retry(exc): """Predicate for determining when to retry. - We retry if and only if the 'reason' is 'backendError' - or 'rateLimitExceeded'. + We retry if and only if the 'error status' is 'DEADLINE_EXCEEDED' + or 'UNAVAILABLE'. """ - if not hasattr(exc, "errors"): - return False - - if len(exc.errors) == 0: - # Check for unstructured error returns, e.g. from GFE + if hasattr(exc, "errors"): + # Check for unstructured error returns return isinstance(exc, _UNSTRUCTURED_RETRYABLE_TYPES) - reason = exc.errors[0]["reason"] - return reason in _RETRYABLE_REASONS + if hasattr(exc, "error"): + reason = exc.error["status"] + return reason in _RETRYABLE_REASONS + return False DEFAULT_RETRY = retry.Retry(predicate=_should_retry) diff --git a/datastore/tests/unit/test_helpers.py b/datastore/tests/unit/test_helpers.py index ebbc549a6fcd..af2514cd14dd 100644 --- a/datastore/tests/unit/test_helpers.py +++ b/datastore/tests/unit/test_helpers.py @@ -15,6 +15,7 @@ import unittest from mock import Mock + class Test__new_value_pb(unittest.TestCase): def _call_fut(self, entity_pb, name): from google.cloud.datastore.helpers import _new_value_pb @@ -992,13 +993,14 @@ def test_array_value_meaning_partially_unset(self): class Test___call_api(unittest.TestCase): - def _call_fut(self, fnc_call, retry, *args, **kwargs): from google.cloud.datastore.helpers import _call_api + return _call_api(fnc_call, retry, *args, **kwargs) def test_call_api(self): from google.cloud.datastore.retry import DEFAULT_RETRY + retry = DEFAULT_RETRY fnc_call = Mock() self._call_fut(fnc_call, retry) diff --git a/datastore/tests/unit/test_retry.py b/datastore/tests/unit/test_retry.py index 737b4730adf6..4c9f420c6aab 100644 --- a/datastore/tests/unit/test_retry.py +++ b/datastore/tests/unit/test_retry.py @@ -31,15 +31,15 @@ def test_w_empty_errors(self): self.assertFalse(self._call_fut(exc)) def test_w_non_matching_reason(self): - exc = mock.Mock(errors=[{"reason": "bogus"}], spec=["errors"]) + exc = mock.Mock(error={"status": "bogus"}, spec=["error"]) self.assertFalse(self._call_fut(exc)) - def test_w_backendError(self): - exc = mock.Mock(errors=[{"reason": "backendError"}], spec=["errors"]) + def test_w_DEADLINE_EXCEEDED(self): + exc = mock.Mock(error={"status": "DEADLINE_EXCEEDED"}, spec=["error"]) self.assertTrue(self._call_fut(exc)) - def test_w_rateLimitExceeded(self): - exc = mock.Mock(errors=[{"reason": "rateLimitExceeded"}], spec=["errors"]) + def test_w_UNAVAILABLE(self): + exc = mock.Mock(error={"status": "UNAVAILABLE"}, spec=["error"]) self.assertTrue(self._call_fut(exc)) def test_w_unstructured_too_many_requests(self): @@ -48,22 +48,8 @@ def test_w_unstructured_too_many_requests(self): exc = TooManyRequests("testing") self.assertTrue(self._call_fut(exc)) - def test_w_internalError(self): - exc = mock.Mock(errors=[{"reason": "internalError"}], spec=["errors"]) - self.assertTrue(self._call_fut(exc)) - def test_w_unstructured_internal_server_error(self): from google.api_core.exceptions import InternalServerError exc = InternalServerError("testing") self.assertTrue(self._call_fut(exc)) - - def test_w_badGateway(self): - exc = mock.Mock(errors=[{"reason": "badGateway"}], spec=["errors"]) - self.assertTrue(self._call_fut(exc)) - - def test_w_unstructured_bad_gateway(self): - from google.api_core.exceptions import BadGateway - - exc = BadGateway("testing") - self.assertTrue(self._call_fut(exc))