From 4941cbf2d613aa1d60aacafd50319dadac282270 Mon Sep 17 00:00:00 2001 From: chemelnucfin Date: Wed, 8 Nov 2017 21:51:38 -0800 Subject: [PATCH] Allow specifying read consistency (#4343) * Closes #4340 - Specify Read Consistency * review changes * merge conflicts; correct imports --- datastore/google/cloud/datastore/client.py | 79 +++++++++------------ datastore/google/cloud/datastore/helpers.py | 42 +++++++++-- datastore/google/cloud/datastore/query.py | 66 ++++++++++------- datastore/tests/unit/test_client.py | 35 --------- datastore/tests/unit/test_helpers.py | 35 +++++++++ 5 files changed, 146 insertions(+), 111 deletions(-) diff --git a/datastore/google/cloud/datastore/client.py b/datastore/google/cloud/datastore/client.py index ec522cc5c1cc..d1091e119a67 100644 --- a/datastore/google/cloud/datastore/client.py +++ b/datastore/google/cloud/datastore/client.py @@ -15,23 +15,21 @@ import os -from google.cloud.datastore_v1.proto import datastore_pb2 as _datastore_pb2 - from google.cloud._helpers import _LocalStack -from google.cloud._helpers import ( - _determine_default_project as _base_default_project) +from google.cloud._helpers import (_determine_default_project as + _base_default_project) from google.cloud.client import ClientWithProject -from google.cloud.environment_vars import DISABLE_GRPC -from google.cloud.environment_vars import GCD_DATASET -from google.cloud.environment_vars import GCD_HOST - -from google.cloud.datastore._http import HTTPDatastoreAPI from google.cloud.datastore import helpers +from google.cloud.datastore._http import HTTPDatastoreAPI from google.cloud.datastore.batch import Batch from google.cloud.datastore.entity import Entity from google.cloud.datastore.key import Key from google.cloud.datastore.query import Query from google.cloud.datastore.transaction import Transaction +from google.cloud.environment_vars import DISABLE_GRPC +from google.cloud.environment_vars import GCD_DATASET +from google.cloud.environment_vars import GCD_HOST + try: from google.cloud.datastore._gax import make_datastore_api _HAVE_GRPC = True @@ -131,7 +129,7 @@ def _extended_lookup(datastore_api, project, key_pbs, results = [] loop_num = 0 - read_options = _get_read_options(eventual, transaction_id) + read_options = helpers.get_read_options(eventual, transaction_id) while loop_num < _MAX_LOOPS: # loop against possible deferred. loop_num += 1 lookup_response = datastore_api.lookup( @@ -279,7 +277,8 @@ def current_transaction(self): if isinstance(transaction, Transaction): return transaction - def get(self, key, missing=None, deferred=None, transaction=None): + def get(self, key, missing=None, deferred=None, + transaction=None, eventual=False): """Retrieve an entity from a single key (if it exists). .. note:: @@ -305,15 +304,26 @@ def get(self, key, missing=None, deferred=None, transaction=None): :param transaction: (Optional) Transaction to use for read consistency. If not passed, uses current transaction, if set. + :type eventual: bool + :param eventual: (Optional) Defaults to strongly consistent (False). + Setting True will use eventual consistency, but cannot + be used inside a transaction or will raise ValueError. + :rtype: :class:`google.cloud.datastore.entity.Entity` or ``NoneType`` :returns: The requested entity if it exists. + + :raises: :class:`ValueError` if eventual is True and in a transaction. """ - entities = self.get_multi(keys=[key], missing=missing, - deferred=deferred, transaction=transaction) + entities = self.get_multi(keys=[key], + missing=missing, + deferred=deferred, + transaction=transaction, + eventual=eventual) if entities: return entities[0] - def get_multi(self, keys, missing=None, deferred=None, transaction=None): + def get_multi(self, keys, missing=None, deferred=None, + transaction=None, eventual=False): """Retrieve entities, along with their attributes. :type keys: list of :class:`google.cloud.datastore.key.Key` @@ -334,10 +344,17 @@ def get_multi(self, keys, missing=None, deferred=None, transaction=None): :param transaction: (Optional) Transaction to use for read consistency. If not passed, uses current transaction, if set. + :type eventual: bool + :param eventual: (Optional) Defaults to strongly consistent (False). + Setting True will use eventual consistency, + but cannot be used inside a transaction or + will raise ValueError. + :rtype: list of :class:`google.cloud.datastore.entity.Entity` :returns: The requested entities. :raises: :class:`ValueError` if one or more of ``keys`` has a project which does not match our project. + :raises: :class:`ValueError` if eventual is True and in a transaction. """ if not keys: return [] @@ -353,7 +370,8 @@ def get_multi(self, keys, missing=None, deferred=None, transaction=None): entity_pbs = _extended_lookup( datastore_api=self._datastore_api, project=self.project, - key_pbs=[k.to_protobuf() for k in keys], + key_pbs=[key.to_protobuf() for key in keys], + eventual=eventual, missing=missing, deferred=deferred, transaction_id=transaction and transaction.id, @@ -581,34 +599,3 @@ def do_something(entity): if 'namespace' not in kwargs: kwargs['namespace'] = self.namespace return Query(self, **kwargs) - - -def _get_read_options(eventual, transaction_id): - """Validate rules for read options, and assign to the request. - - Helper method for ``lookup()`` and ``run_query``. - - :type eventual: bool - :param eventual: Flag indicating if ``EVENTUAL`` or ``STRONG`` - consistency should be used. - - :type transaction_id: bytes - :param transaction_id: A transaction identifier (may be null). - - :rtype: :class:`.datastore_pb2.ReadOptions` - :returns: The read options corresponding to the inputs. - :raises: :class:`ValueError` if ``eventual`` is ``True`` and the - ``transaction_id`` is not ``None``. - """ - if transaction_id is None: - if eventual: - return _datastore_pb2.ReadOptions( - read_consistency=_datastore_pb2.ReadOptions.EVENTUAL) - else: - return _datastore_pb2.ReadOptions() - else: - if eventual: - raise ValueError('eventual must be False when in a transaction') - else: - return _datastore_pb2.ReadOptions( - transaction=transaction_id) diff --git a/datastore/google/cloud/datastore/helpers.py b/datastore/google/cloud/datastore/helpers.py index 11e21aa46da0..942819403a7f 100644 --- a/datastore/google/cloud/datastore/helpers.py +++ b/datastore/google/cloud/datastore/helpers.py @@ -19,17 +19,18 @@ import datetime import itertools - -from google.protobuf import struct_pb2 -from google.type import latlng_pb2 import six from google.cloud._helpers import _datetime_to_pb_timestamp from google.cloud._helpers import _pb_timestamp_to_datetime -from google.cloud.datastore_v1.proto import entity_pb2 as _entity_pb2 +from google.cloud.datastore_v1.proto import datastore_pb2 +from google.cloud.datastore_v1.proto import entity_pb2 from google.cloud.datastore.entity import Entity from google.cloud.datastore.key import Key +from google.protobuf import struct_pb2 +from google.type import latlng_pb2 + def _get_meaning(value_pb, is_list=False): """Get the meaning from a protobuf value. @@ -204,7 +205,7 @@ def entity_to_protobuf(entity): :rtype: :class:`.entity_pb2.Entity` :returns: The protobuf representing the entity. """ - entity_pb = _entity_pb2.Entity() + entity_pb = entity_pb2.Entity() if entity.key is not None: key_pb = entity.key.to_protobuf() entity_pb.key.CopyFrom(key_pb) @@ -233,6 +234,37 @@ def entity_to_protobuf(entity): return entity_pb +def get_read_options(eventual, transaction_id): + """Validate rules for read options, and assign to the request. + + Helper method for ``lookup()`` and ``run_query``. + + :type eventual: bool + :param eventual: Flag indicating if ``EVENTUAL`` or ``STRONG`` + consistency should be used. + + :type transaction_id: bytes + :param transaction_id: A transaction identifier (may be null). + + :rtype: :class:`.datastore_pb2.ReadOptions` + :returns: The read options corresponding to the inputs. + :raises: :class:`ValueError` if ``eventual`` is ``True`` and the + ``transaction_id`` is not ``None``. + """ + if transaction_id is None: + if eventual: + return datastore_pb2.ReadOptions( + read_consistency=datastore_pb2.ReadOptions.EVENTUAL) + else: + return datastore_pb2.ReadOptions() + else: + if eventual: + raise ValueError('eventual must be False when in a transaction') + else: + return datastore_pb2.ReadOptions( + transaction=transaction_id) + + def key_from_protobuf(pb): """Factory method for creating a key based on a protobuf. diff --git a/datastore/google/cloud/datastore/query.py b/datastore/google/cloud/datastore/query.py index 28febdd1d422..ea16d4ee0690 100644 --- a/datastore/google/cloud/datastore/query.py +++ b/datastore/google/cloud/datastore/query.py @@ -19,19 +19,18 @@ from google.api_core import page_iterator from google.cloud._helpers import _ensure_tuple_or_list -from google.cloud.datastore_v1.proto import datastore_pb2 as _datastore_pb2 -from google.cloud.datastore_v1.proto import entity_pb2 as _entity_pb2 -from google.cloud.datastore_v1.proto import query_pb2 as _query_pb2 +from google.cloud.datastore_v1.proto import entity_pb2 +from google.cloud.datastore_v1.proto import query_pb2 from google.cloud.datastore import helpers from google.cloud.datastore.key import Key -_NOT_FINISHED = _query_pb2.QueryResultBatch.NOT_FINISHED +_NOT_FINISHED = query_pb2.QueryResultBatch.NOT_FINISHED _FINISHED = ( - _query_pb2.QueryResultBatch.NO_MORE_RESULTS, - _query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT, - _query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_CURSOR, + query_pb2.QueryResultBatch.NO_MORE_RESULTS, + query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT, + query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_CURSOR, ) @@ -81,11 +80,11 @@ class Query(object): """ OPERATORS = { - '<=': _query_pb2.PropertyFilter.LESS_THAN_OR_EQUAL, - '>=': _query_pb2.PropertyFilter.GREATER_THAN_OR_EQUAL, - '<': _query_pb2.PropertyFilter.LESS_THAN, - '>': _query_pb2.PropertyFilter.GREATER_THAN, - '=': _query_pb2.PropertyFilter.EQUAL, + '<=': query_pb2.PropertyFilter.LESS_THAN_OR_EQUAL, + '>=': query_pb2.PropertyFilter.GREATER_THAN_OR_EQUAL, + '<': query_pb2.PropertyFilter.LESS_THAN, + '>': query_pb2.PropertyFilter.GREATER_THAN, + '=': query_pb2.PropertyFilter.EQUAL, } """Mapping of operator strings and their protobuf equivalents.""" @@ -331,7 +330,7 @@ def distinct_on(self, value): self._distinct_on[:] = value def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None, - client=None): + client=None, eventual=False): """Execute the Query; return an iterator for the matching entities. For example:: @@ -358,18 +357,28 @@ def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None, :param end_cursor: (Optional) cursor passed through to the iterator. :type client: :class:`google.cloud.datastore.client.Client` - :param client: client used to connect to datastore. + :param client: (Optional) client used to connect to datastore. If not supplied, uses the query's value. + :type eventual: bool + :param eventual: (Optional) Defaults to strongly consistent (False). + Setting True will use eventual consistency, + but cannot be used inside a transaction or + will raise ValueError. + :rtype: :class:`Iterator` :returns: The iterator for the query. """ if client is None: client = self._client - return Iterator( - self, client, limit=limit, offset=offset, - start_cursor=start_cursor, end_cursor=end_cursor) + return Iterator(self, + client, + limit=limit, + offset=offset, + start_cursor=start_cursor, + end_cursor=end_cursor, + eventual=eventual) class Iterator(page_iterator.Iterator): @@ -396,18 +405,25 @@ class Iterator(page_iterator.Iterator): :type end_cursor: bytes :param end_cursor: (Optional) Cursor to end paging through query results. + + :type eventual: bool + :param eventual: (Optional) Defaults to strongly consistent (False). + Setting True will use eventual consistency, + but cannot be used inside a transaction or + will raise ValueError. """ next_page_token = None def __init__(self, query, client, limit=None, offset=None, - start_cursor=None, end_cursor=None): + start_cursor=None, end_cursor=None, eventual=False): super(Iterator, self).__init__( client=client, item_to_value=_item_to_entity, page_token=start_cursor, max_results=limit) self._query = query self._offset = offset self._end_cursor = end_cursor + self._eventual = eventual # The attributes below will change over the life of the iterator. self._more_results = True self._skipped_results = 0 @@ -483,12 +499,12 @@ def _next_page(self): query_pb = self._build_protobuf() transaction = self.client.current_transaction if transaction is None: - read_options = _datastore_pb2.ReadOptions() + transaction_id = None else: - read_options = _datastore_pb2.ReadOptions( - transaction=transaction.id) + transaction_id = transaction.id + read_options = helpers.get_read_options(self._eventual, transaction_id) - partition_id = _entity_pb2.PartitionId( + partition_id = entity_pb2.PartitionId( project_id=self._query.project, namespace_id=self._query.namespace) response_pb = self.client._datastore_api.run_query( @@ -512,7 +528,7 @@ def _pb_from_query(query): it does not contain "in-flight" fields for ongoing query executions (cursors, offset, limit). """ - pb = _query_pb2.Query() + pb = query_pb2.Query() for projection_name in query.projection: pb.projection.add().property.name = projection_name @@ -521,7 +537,7 @@ def _pb_from_query(query): pb.kind.add().name = query.kind composite_filter = pb.filter.composite_filter - composite_filter.op = _query_pb2.CompositeFilter.AND + composite_filter.op = query_pb2.CompositeFilter.AND if query.ancestor: ancestor_pb = query.ancestor.to_protobuf() @@ -529,7 +545,7 @@ def _pb_from_query(query): # Filter on __key__ HAS_ANCESTOR == ancestor. ancestor_filter = composite_filter.filters.add().property_filter ancestor_filter.property.name = '__key__' - ancestor_filter.op = _query_pb2.PropertyFilter.HAS_ANCESTOR + ancestor_filter.op = query_pb2.PropertyFilter.HAS_ANCESTOR ancestor_filter.value.key_value.CopyFrom(ancestor_pb) for property_name, operator, value in query.filters: diff --git a/datastore/tests/unit/test_client.py b/datastore/tests/unit/test_client.py index 51b3e2651531..53a32f59252b 100644 --- a/datastore/tests/unit/test_client.py +++ b/datastore/tests/unit/test_client.py @@ -1033,41 +1033,6 @@ def test_query_w_namespace_collision(self): client, project=self.PROJECT, namespace=namespace2, kind=kind) -class Test__get_read_options(unittest.TestCase): - - def _call_fut(self, eventual, transaction_id): - from google.cloud.datastore.client import _get_read_options - - return _get_read_options(eventual, transaction_id) - - def test_eventual_w_transaction(self): - with self.assertRaises(ValueError): - self._call_fut(True, b'123') - - def test_eventual_wo_transaction(self): - from google.cloud.datastore_v1.proto import datastore_pb2 - - read_options = self._call_fut(True, None) - expected = datastore_pb2.ReadOptions( - read_consistency=datastore_pb2.ReadOptions.EVENTUAL) - self.assertEqual(read_options, expected) - - def test_default_w_transaction(self): - from google.cloud.datastore_v1.proto import datastore_pb2 - - txn_id = b'123abc-easy-as' - read_options = self._call_fut(False, txn_id) - expected = datastore_pb2.ReadOptions(transaction=txn_id) - self.assertEqual(read_options, expected) - - def test_default_wo_transaction(self): - from google.cloud.datastore_v1.proto import datastore_pb2 - - read_options = self._call_fut(False, None) - expected = datastore_pb2.ReadOptions() - self.assertEqual(read_options, expected) - - class _NoCommitBatch(object): def __init__(self, client): diff --git a/datastore/tests/unit/test_helpers.py b/datastore/tests/unit/test_helpers.py index 5e91de0196f4..3624665a2a05 100644 --- a/datastore/tests/unit/test_helpers.py +++ b/datastore/tests/unit/test_helpers.py @@ -498,6 +498,41 @@ def test_w_nothing_in_pb(self): self.assertRaises(ValueError, self._call_fut, pb) +class Test__get_read_options(unittest.TestCase): + + def _call_fut(self, eventual, transaction_id): + from google.cloud.datastore.helpers import get_read_options + + return get_read_options(eventual, transaction_id) + + def test_eventual_w_transaction(self): + with self.assertRaises(ValueError): + self._call_fut(True, b'123') + + def test_eventual_wo_transaction(self): + from google.cloud.datastore_v1.proto import datastore_pb2 + + read_options = self._call_fut(True, None) + expected = datastore_pb2.ReadOptions( + read_consistency=datastore_pb2.ReadOptions.EVENTUAL) + self.assertEqual(read_options, expected) + + def test_default_w_transaction(self): + from google.cloud.datastore_v1.proto import datastore_pb2 + + txn_id = b'123abc-easy-as' + read_options = self._call_fut(False, txn_id) + expected = datastore_pb2.ReadOptions(transaction=txn_id) + self.assertEqual(read_options, expected) + + def test_default_wo_transaction(self): + from google.cloud.datastore_v1.proto import datastore_pb2 + + read_options = self._call_fut(False, None) + expected = datastore_pb2.ReadOptions() + self.assertEqual(read_options, expected) + + class Test__pb_attr_value(unittest.TestCase): def _call_fut(self, val):