Skip to content

Commit

Permalink
Allow specifying read consistency (#4343)
Browse files Browse the repository at this point in the history
* Closes #4340 - Specify Read Consistency

* review changes

* merge conflicts; correct imports
  • Loading branch information
chemelnucfin authored Nov 9, 2017
1 parent 10ef028 commit 4941cbf
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 111 deletions.
79 changes: 33 additions & 46 deletions datastore/google/cloud/datastore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,21 @@

import os

from google.cloud.datastore_v1.proto import datastore_pb2 as _datastore_pb2

from google.cloud._helpers import _LocalStack
from google.cloud._helpers import (
_determine_default_project as _base_default_project)
from google.cloud._helpers import (_determine_default_project as
_base_default_project)
from google.cloud.client import ClientWithProject
from google.cloud.environment_vars import DISABLE_GRPC
from google.cloud.environment_vars import GCD_DATASET
from google.cloud.environment_vars import GCD_HOST

from google.cloud.datastore._http import HTTPDatastoreAPI
from google.cloud.datastore import helpers
from google.cloud.datastore._http import HTTPDatastoreAPI
from google.cloud.datastore.batch import Batch
from google.cloud.datastore.entity import Entity
from google.cloud.datastore.key import Key
from google.cloud.datastore.query import Query
from google.cloud.datastore.transaction import Transaction
from google.cloud.environment_vars import DISABLE_GRPC
from google.cloud.environment_vars import GCD_DATASET
from google.cloud.environment_vars import GCD_HOST

try:
from google.cloud.datastore._gax import make_datastore_api
_HAVE_GRPC = True
Expand Down Expand Up @@ -131,7 +129,7 @@ def _extended_lookup(datastore_api, project, key_pbs,
results = []

loop_num = 0
read_options = _get_read_options(eventual, transaction_id)
read_options = helpers.get_read_options(eventual, transaction_id)
while loop_num < _MAX_LOOPS: # loop against possible deferred.
loop_num += 1
lookup_response = datastore_api.lookup(
Expand Down Expand Up @@ -279,7 +277,8 @@ def current_transaction(self):
if isinstance(transaction, Transaction):
return transaction

def get(self, key, missing=None, deferred=None, transaction=None):
def get(self, key, missing=None, deferred=None,
transaction=None, eventual=False):
"""Retrieve an entity from a single key (if it exists).
.. note::
Expand All @@ -305,15 +304,26 @@ def get(self, key, missing=None, deferred=None, transaction=None):
:param transaction: (Optional) Transaction to use for read consistency.
If not passed, uses current transaction, if set.
:type eventual: bool
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency, but cannot
be used inside a transaction or will raise ValueError.
:rtype: :class:`google.cloud.datastore.entity.Entity` or ``NoneType``
:returns: The requested entity if it exists.
:raises: :class:`ValueError` if eventual is True and in a transaction.
"""
entities = self.get_multi(keys=[key], missing=missing,
deferred=deferred, transaction=transaction)
entities = self.get_multi(keys=[key],
missing=missing,
deferred=deferred,
transaction=transaction,
eventual=eventual)
if entities:
return entities[0]

def get_multi(self, keys, missing=None, deferred=None, transaction=None):
def get_multi(self, keys, missing=None, deferred=None,
transaction=None, eventual=False):
"""Retrieve entities, along with their attributes.
:type keys: list of :class:`google.cloud.datastore.key.Key`
Expand All @@ -334,10 +344,17 @@ def get_multi(self, keys, missing=None, deferred=None, transaction=None):
:param transaction: (Optional) Transaction to use for read consistency.
If not passed, uses current transaction, if set.
:type eventual: bool
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency,
but cannot be used inside a transaction or
will raise ValueError.
:rtype: list of :class:`google.cloud.datastore.entity.Entity`
:returns: The requested entities.
:raises: :class:`ValueError` if one or more of ``keys`` has a project
which does not match our project.
:raises: :class:`ValueError` if eventual is True and in a transaction.
"""
if not keys:
return []
Expand All @@ -353,7 +370,8 @@ def get_multi(self, keys, missing=None, deferred=None, transaction=None):
entity_pbs = _extended_lookup(
datastore_api=self._datastore_api,
project=self.project,
key_pbs=[k.to_protobuf() for k in keys],
key_pbs=[key.to_protobuf() for key in keys],
eventual=eventual,
missing=missing,
deferred=deferred,
transaction_id=transaction and transaction.id,
Expand Down Expand Up @@ -581,34 +599,3 @@ def do_something(entity):
if 'namespace' not in kwargs:
kwargs['namespace'] = self.namespace
return Query(self, **kwargs)


def _get_read_options(eventual, transaction_id):
"""Validate rules for read options, and assign to the request.
Helper method for ``lookup()`` and ``run_query``.
:type eventual: bool
:param eventual: Flag indicating if ``EVENTUAL`` or ``STRONG``
consistency should be used.
:type transaction_id: bytes
:param transaction_id: A transaction identifier (may be null).
:rtype: :class:`.datastore_pb2.ReadOptions`
:returns: The read options corresponding to the inputs.
:raises: :class:`ValueError` if ``eventual`` is ``True`` and the
``transaction_id`` is not ``None``.
"""
if transaction_id is None:
if eventual:
return _datastore_pb2.ReadOptions(
read_consistency=_datastore_pb2.ReadOptions.EVENTUAL)
else:
return _datastore_pb2.ReadOptions()
else:
if eventual:
raise ValueError('eventual must be False when in a transaction')
else:
return _datastore_pb2.ReadOptions(
transaction=transaction_id)
42 changes: 37 additions & 5 deletions datastore/google/cloud/datastore/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@

import datetime
import itertools

from google.protobuf import struct_pb2
from google.type import latlng_pb2
import six

from google.cloud._helpers import _datetime_to_pb_timestamp
from google.cloud._helpers import _pb_timestamp_to_datetime
from google.cloud.datastore_v1.proto import entity_pb2 as _entity_pb2
from google.cloud.datastore_v1.proto import datastore_pb2
from google.cloud.datastore_v1.proto import entity_pb2
from google.cloud.datastore.entity import Entity
from google.cloud.datastore.key import Key

from google.protobuf import struct_pb2
from google.type import latlng_pb2


def _get_meaning(value_pb, is_list=False):
"""Get the meaning from a protobuf value.
Expand Down Expand Up @@ -204,7 +205,7 @@ def entity_to_protobuf(entity):
:rtype: :class:`.entity_pb2.Entity`
:returns: The protobuf representing the entity.
"""
entity_pb = _entity_pb2.Entity()
entity_pb = entity_pb2.Entity()
if entity.key is not None:
key_pb = entity.key.to_protobuf()
entity_pb.key.CopyFrom(key_pb)
Expand Down Expand Up @@ -233,6 +234,37 @@ def entity_to_protobuf(entity):
return entity_pb


def get_read_options(eventual, transaction_id):
"""Validate rules for read options, and assign to the request.
Helper method for ``lookup()`` and ``run_query``.
:type eventual: bool
:param eventual: Flag indicating if ``EVENTUAL`` or ``STRONG``
consistency should be used.
:type transaction_id: bytes
:param transaction_id: A transaction identifier (may be null).
:rtype: :class:`.datastore_pb2.ReadOptions`
:returns: The read options corresponding to the inputs.
:raises: :class:`ValueError` if ``eventual`` is ``True`` and the
``transaction_id`` is not ``None``.
"""
if transaction_id is None:
if eventual:
return datastore_pb2.ReadOptions(
read_consistency=datastore_pb2.ReadOptions.EVENTUAL)
else:
return datastore_pb2.ReadOptions()
else:
if eventual:
raise ValueError('eventual must be False when in a transaction')
else:
return datastore_pb2.ReadOptions(
transaction=transaction_id)


def key_from_protobuf(pb):
"""Factory method for creating a key based on a protobuf.
Expand Down
66 changes: 41 additions & 25 deletions datastore/google/cloud/datastore/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@
from google.api_core import page_iterator
from google.cloud._helpers import _ensure_tuple_or_list

from google.cloud.datastore_v1.proto import datastore_pb2 as _datastore_pb2
from google.cloud.datastore_v1.proto import entity_pb2 as _entity_pb2
from google.cloud.datastore_v1.proto import query_pb2 as _query_pb2
from google.cloud.datastore_v1.proto import entity_pb2
from google.cloud.datastore_v1.proto import query_pb2
from google.cloud.datastore import helpers
from google.cloud.datastore.key import Key


_NOT_FINISHED = _query_pb2.QueryResultBatch.NOT_FINISHED
_NOT_FINISHED = query_pb2.QueryResultBatch.NOT_FINISHED

_FINISHED = (
_query_pb2.QueryResultBatch.NO_MORE_RESULTS,
_query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT,
_query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_CURSOR,
query_pb2.QueryResultBatch.NO_MORE_RESULTS,
query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_LIMIT,
query_pb2.QueryResultBatch.MORE_RESULTS_AFTER_CURSOR,
)


Expand Down Expand Up @@ -81,11 +80,11 @@ class Query(object):
"""

OPERATORS = {
'<=': _query_pb2.PropertyFilter.LESS_THAN_OR_EQUAL,
'>=': _query_pb2.PropertyFilter.GREATER_THAN_OR_EQUAL,
'<': _query_pb2.PropertyFilter.LESS_THAN,
'>': _query_pb2.PropertyFilter.GREATER_THAN,
'=': _query_pb2.PropertyFilter.EQUAL,
'<=': query_pb2.PropertyFilter.LESS_THAN_OR_EQUAL,
'>=': query_pb2.PropertyFilter.GREATER_THAN_OR_EQUAL,
'<': query_pb2.PropertyFilter.LESS_THAN,
'>': query_pb2.PropertyFilter.GREATER_THAN,
'=': query_pb2.PropertyFilter.EQUAL,
}
"""Mapping of operator strings and their protobuf equivalents."""

Expand Down Expand Up @@ -331,7 +330,7 @@ def distinct_on(self, value):
self._distinct_on[:] = value

def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None,
client=None):
client=None, eventual=False):
"""Execute the Query; return an iterator for the matching entities.
For example::
Expand All @@ -358,18 +357,28 @@ def fetch(self, limit=None, offset=0, start_cursor=None, end_cursor=None,
:param end_cursor: (Optional) cursor passed through to the iterator.
:type client: :class:`google.cloud.datastore.client.Client`
:param client: client used to connect to datastore.
:param client: (Optional) client used to connect to datastore.
If not supplied, uses the query's value.
:type eventual: bool
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency,
but cannot be used inside a transaction or
will raise ValueError.
:rtype: :class:`Iterator`
:returns: The iterator for the query.
"""
if client is None:
client = self._client

return Iterator(
self, client, limit=limit, offset=offset,
start_cursor=start_cursor, end_cursor=end_cursor)
return Iterator(self,
client,
limit=limit,
offset=offset,
start_cursor=start_cursor,
end_cursor=end_cursor,
eventual=eventual)


class Iterator(page_iterator.Iterator):
Expand All @@ -396,18 +405,25 @@ class Iterator(page_iterator.Iterator):
:type end_cursor: bytes
:param end_cursor: (Optional) Cursor to end paging through
query results.
:type eventual: bool
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency,
but cannot be used inside a transaction or
will raise ValueError.
"""

next_page_token = None

def __init__(self, query, client, limit=None, offset=None,
start_cursor=None, end_cursor=None):
start_cursor=None, end_cursor=None, eventual=False):
super(Iterator, self).__init__(
client=client, item_to_value=_item_to_entity,
page_token=start_cursor, max_results=limit)
self._query = query
self._offset = offset
self._end_cursor = end_cursor
self._eventual = eventual
# The attributes below will change over the life of the iterator.
self._more_results = True
self._skipped_results = 0
Expand Down Expand Up @@ -483,12 +499,12 @@ def _next_page(self):
query_pb = self._build_protobuf()
transaction = self.client.current_transaction
if transaction is None:
read_options = _datastore_pb2.ReadOptions()
transaction_id = None
else:
read_options = _datastore_pb2.ReadOptions(
transaction=transaction.id)
transaction_id = transaction.id
read_options = helpers.get_read_options(self._eventual, transaction_id)

partition_id = _entity_pb2.PartitionId(
partition_id = entity_pb2.PartitionId(
project_id=self._query.project,
namespace_id=self._query.namespace)
response_pb = self.client._datastore_api.run_query(
Expand All @@ -512,7 +528,7 @@ def _pb_from_query(query):
it does not contain "in-flight" fields for ongoing query
executions (cursors, offset, limit).
"""
pb = _query_pb2.Query()
pb = query_pb2.Query()

for projection_name in query.projection:
pb.projection.add().property.name = projection_name
Expand All @@ -521,15 +537,15 @@ def _pb_from_query(query):
pb.kind.add().name = query.kind

composite_filter = pb.filter.composite_filter
composite_filter.op = _query_pb2.CompositeFilter.AND
composite_filter.op = query_pb2.CompositeFilter.AND

if query.ancestor:
ancestor_pb = query.ancestor.to_protobuf()

# Filter on __key__ HAS_ANCESTOR == ancestor.
ancestor_filter = composite_filter.filters.add().property_filter
ancestor_filter.property.name = '__key__'
ancestor_filter.op = _query_pb2.PropertyFilter.HAS_ANCESTOR
ancestor_filter.op = query_pb2.PropertyFilter.HAS_ANCESTOR
ancestor_filter.value.key_value.CopyFrom(ancestor_pb)

for property_name, operator, value in query.filters:
Expand Down
Loading

0 comments on commit 4941cbf

Please sign in to comment.