Retry 'UNAVAILABLE' exceptions during streaming #3889

Closed
wants to merge 9 commits
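In short: rather than exposing a resume_token argument on read / execute_sql, this change has StreamedResultSet.consume_next retry UNAVAILABLE errors itself, re-issuing the original request with the last resume_token it received. As an illustrative sketch of the intended effect (not code from this PR), application code should no longer need the manual retry loop shown in the docs below:

    # Hypothetical caller after this change: an UNAVAILABLE error raised
    # mid-stream is retried internally, resuming from the last resume_token.
    with database.snapshot() as snapshot:
        result = snapshot.execute_sql(QUERY)
        for row in result.rows:
            print(row)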
40 changes: 0 additions & 40 deletions docs/spanner/snapshot-usage.rst
@@ -62,26 +62,6 @@ fails if the result set is too large,
manually, perform all iteration within the context of the
``with database.snapshot()`` block.

.. note::

If streaming a chunk raises an exception, the application can
retry the ``read``, passing the ``resume_token`` from the ``StreamedResultSet``
which raised the error. E.g.:

.. code:: python

result = snapshot.read(table, columns, keys)
while True:
try:
for row in result.rows:
print(row)
except Exception:
result = snapshot.read(
table, columns, keys, resume_token=result.resume_token)
continue
else:
break



Execute a SQL Select Statement
@@ -112,26 +92,6 @@ fails if the result set is too large,
manually, perform all iteration within the context of the
``with database.snapshot()`` block.

.. note::

If streaming a chunk raises an exception, the application can
retry the query, passing the ``resume_token`` from the ``StreamedResultSet``
which raised the error. E.g.:

.. code:: python

result = snapshot.execute_sql(QUERY)
while True:
try:
for row in result.rows:
print(row)
except Exception:
result = snapshot.execute_sql(
QUERY, resume_token=result.resume_token)
continue
else:
break


Next Step
---------
6 changes: 0 additions & 6 deletions docs/spanner/transaction-usage.rst
@@ -32,12 +32,6 @@ fails if the result set is too large,
for row in result.rows:
print(row)

.. note::

If streaming a chunk fails due to a "resumable" error,
:meth:`Session.read` retries the ``StreamingRead`` API request,
passing the ``resume_token`` from the last partial result streamed.


Execute a SQL Select Statement
------------------------------
17 changes: 4 additions & 13 deletions spanner/google/cloud/spanner/session.py
@@ -165,8 +165,7 @@ def snapshot(self, **kw):

return Snapshot(self, **kw)

def read(self, table, columns, keyset, index='', limit=0,
resume_token=b''):
def read(self, table, columns, keyset, index='', limit=0):
"""Perform a ``StreamingRead`` API request for rows in a table.

:type table: str
@@ -185,17 +184,12 @@ def read(self, table, columns, keyset, index='', limit=0,
:type limit: int
:param limit: (Optional) maximum number of rows to return

:type resume_token: bytes
:param resume_token: token for resuming previously-interrupted read

:rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
"""
return self.snapshot().read(
table, columns, keyset, index, limit, resume_token)
return self.snapshot().read(table, columns, keyset, index, limit)

def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
resume_token=b''):
def execute_sql(self, sql, params=None, param_types=None, query_mode=None):
"""Perform an ``ExecuteStreamingSql`` API request.

:type sql: str
@@ -216,14 +210,11 @@ def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
:param query_mode: Mode governing return of results / query plan. See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1

:type resume_token: bytes
:param resume_token: token for resuming previously-interrupted query

:rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
"""
return self.snapshot().execute_sql(
sql, params, param_types, query_mode, resume_token)
sql, params, param_types, query_mode)

def batch(self):
"""Factory to create a batch for this session.
34 changes: 18 additions & 16 deletions spanner/google/cloud/spanner/snapshot.py
@@ -14,6 +14,8 @@

"""Model a set of read-only queries to a database as a snapshot."""

import functools

from google.protobuf.struct_pb2 import Struct
from google.cloud.proto.spanner.v1.transaction_pb2 import TransactionOptions
from google.cloud.proto.spanner.v1.transaction_pb2 import TransactionSelector
@@ -49,8 +51,7 @@ def _make_txn_selector(self): # pylint: disable=redundant-returns-doc
"""
raise NotImplementedError

def read(self, table, columns, keyset, index='', limit=0,
resume_token=b''):
def read(self, table, columns, keyset, index='', limit=0):
"""Perform a ``StreamingRead`` API request for rows in a table.

:type table: str
@@ -69,9 +70,6 @@ def read(self, table, columns, keyset, index='', limit=0,
:type limit: int
:param limit: (Optional) maximum number of rows to return

:type resume_token: bytes
:param resume_token: token for resuming previously-interrupted read

:rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
:raises ValueError:
@@ -92,17 +90,20 @@ def read(self, table, columns, keyset, index='', limit=0,
iterator = api.streaming_read(
self._session.name, table, columns, keyset.to_pb(),
transaction=transaction, index=index, limit=limit,
resume_token=resume_token, options=options)
options=options)

self._read_request_count += 1

restart = functools.partial(
api.streaming_read, self._session.name, table, columns, keyset,
index=index, limit=limit)

if self._multi_use:
return StreamedResultSet(iterator, source=self)
return StreamedResultSet(iterator, restart, source=self)
else:
return StreamedResultSet(iterator)
return StreamedResultSet(iterator, restart)

def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
resume_token=b''):
def execute_sql(self, sql, params=None, param_types=None, query_mode=None):
"""Perform an ``ExecuteStreamingSql`` API request for rows in a table.

:type sql: str
@@ -122,9 +123,6 @@ def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
:param query_mode: Mode governing return of results / query plan. See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode1

:type resume_token: bytes
:param resume_token: token for resuming previously-interrupted query

:rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
:raises ValueError:
@@ -153,14 +151,18 @@ def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
iterator = api.execute_streaming_sql(
self._session.name, sql,
transaction=transaction, params=params_pb, param_types=param_types,
query_mode=query_mode, resume_token=resume_token, options=options)
query_mode=query_mode, options=options)

self._read_request_count += 1

restart = functools.partial(
api.execute_streaming_sql, self._session.name, sql,
params=params, param_types=param_types, query_mode=query_mode)

if self._multi_use:
return StreamedResultSet(iterator, source=self)
return StreamedResultSet(iterator, restart, source=self)
else:
return StreamedResultSet(iterator)
return StreamedResultSet(iterator, restart)
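Both read and execute_sql now hand the StreamedResultSet a restart callable built with functools.partial: every argument of the original request is frozen up front, and the resuming code only supplies the resume_token later. A tiny standalone sketch of that pattern, with hypothetical names (make_request is a stand-in, not an API in this diff):

    import functools

    def make_request(session_name, sql, limit=0, resume_token=b''):
        # Hypothetical stand-in for the streaming API call.
        print(session_name, sql, limit, resume_token)

    # Freeze everything except resume_token...
    restart = functools.partial(make_request, 'session-1', 'SELECT 1', limit=10)

    # ...so the retry path only has to add the last token it saw.
    restart(resume_token=b'DEADBEEF')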


class Snapshot(_SnapshotBase):
53 changes: 50 additions & 3 deletions spanner/google/cloud/spanner/streamed.py
@@ -14,16 +14,39 @@

"""Wrapper for streaming results."""

from google.api.core import exceptions
from google.api.core import retry
from google.protobuf.struct_pb2 import ListValue
from google.protobuf.struct_pb2 import Value
from google.cloud import exceptions
from google.cloud.proto.spanner.v1 import type_pb2
import six

# pylint: disable=ungrouped-imports
from google.cloud.spanner._helpers import _parse_value_pb
# pylint: enable=ungrouped-imports

_RESTART_DEADLINE = 30.0 # seconds


# pylint: disable=invalid-name
# Pylint sees this as a constant, but it is also an alias that should be
# considered a function.
if_unavailable_error = retry.if_exception_type((
exceptions.ServiceUnavailable,
))
"""A predicate that checks if an exception is a transient API error.

For streaming the result of ``read`` / ``execute_sql`` requests, only
the following server errors are considered transient:

- :class:`google.api.core.exceptions.ServiceUnavailable` - HTTP 503, gRPC
``UNAVAILABLE``.
"""

retry_unavailable = retry.Retry(predicate=if_unavailable_error)
"""Used by `StreamedResultSet.consume_next`."""
# pylint: enable=invalid-name


class StreamedResultSet(object):
"""Process a sequence of partial result sets into a single set of row data.
Expand All @@ -34,11 +57,18 @@ class StreamedResultSet(object):
:class:`google.cloud.proto.spanner.v1.result_set_pb2.PartialResultSet`
instances.

:type restart: callable
:param restart:
Function (typically curried via :func:`functools.partial`) used to
restart the initial request if a retriable error is raised during
streaming.

:type source: :class:`~google.cloud.spanner.snapshot.Snapshot`
:param source: Snapshot from which the result set was fetched.
"""
def __init__(self, response_iterator, source=None):
def __init__(self, response_iterator, restart, source=None):
self._response_iterator = response_iterator
self._restart = restart
self._rows = [] # Fully-processed rows
self._counter = 0 # Counter for processed responses
self._metadata = None # Until set from first PRS
Expand Down Expand Up @@ -125,12 +155,29 @@ def _merge_values(self, values):
self._rows.append(self._current_row)
self._current_row = []

def _restart_iterator(self, _exc_ignored):
"""Helper for :meth:`consume_next`."""
if self._resume_token in (None, b''):
raise

self._response_iterator = self._restart(
resume_token=self._resume_token)

def _bump_iterator(self):
"""Helper for :meth:`consume_next`."""
return six.next(self._response_iterator)

def consume_next(self):
"""Consume the next partial result set from the stream.

Parse the result set into new/existing rows in :attr:`_rows`

:raises ValueError:
if the sleep generator somehow does not yield values.
"""
response = six.next(self._response_iterator)
response = retry_unavailable(
self._bump_iterator, on_error=self._restart_iterator)()

self._counter += 1
self._resume_token = response.resume_token

2 changes: 1 addition & 1 deletion spanner/setup.py
@@ -51,7 +51,7 @@


REQUIREMENTS = [
'google-cloud-core >= 0.27.0, < 0.28dev',
'google-cloud-core >= 0.27.1, < 0.28dev',
'grpcio >= 1.2.0, < 2.0dev',
'gapic-google-cloud-spanner-v1 >= 0.15.0, < 0.16dev',
'gapic-google-cloud-spanner-admin-database-v1 >= 0.15.0, < 0.16dev',
22 changes: 8 additions & 14 deletions spanner/tests/unit/test_session.py
@@ -265,7 +265,6 @@ def test_read(self):
KEYSET = KeySet(keys=KEYS)
INDEX = 'email-address-index'
LIMIT = 20
TOKEN = b'DEADBEEF'
database = _Database(self.DATABASE_NAME)
session = self._make_one(database)
session._session_id = 'DEADBEEF'
@@ -279,28 +278,26 @@ def __init__(self, session, **kwargs):
self._session = session
self._kwargs = kwargs.copy()

def read(self, table, columns, keyset, index='', limit=0,
resume_token=b''):
def read(self, table, columns, keyset, index='', limit=0):
_read_with.append(
(table, columns, keyset, index, limit, resume_token))
(table, columns, keyset, index, limit))
return expected

with _Monkey(MUT, Snapshot=_Snapshot):
found = session.read(
TABLE_NAME, COLUMNS, KEYSET,
index=INDEX, limit=LIMIT, resume_token=TOKEN)
index=INDEX, limit=LIMIT)

self.assertIs(found, expected)

self.assertEqual(len(_read_with), 1)
(table, columns, key_set, index, limit, resume_token) = _read_with[0]
(table, columns, key_set, index, limit) = _read_with[0]

self.assertEqual(table, TABLE_NAME)
self.assertEqual(columns, COLUMNS)
self.assertEqual(key_set, KEYSET)
self.assertEqual(index, INDEX)
self.assertEqual(limit, LIMIT)
self.assertEqual(resume_token, TOKEN)

def test_execute_sql_not_created(self):
SQL = 'SELECT first_name, age FROM citizens'
@@ -315,7 +312,6 @@ def test_execute_sql_defaults(self):
from google.cloud._testing import _Monkey

SQL = 'SELECT first_name, age FROM citizens'
TOKEN = b'DEADBEEF'
database = _Database(self.DATABASE_NAME)
session = self._make_one(database)
session._session_id = 'DEADBEEF'
Expand All @@ -330,25 +326,23 @@ def __init__(self, session, **kwargs):
self._kwargs = kwargs.copy()

def execute_sql(
self, sql, params=None, param_types=None, query_mode=None,
resume_token=None):
self, sql, params=None, param_types=None, query_mode=None):
_executed_sql_with.append(
(sql, params, param_types, query_mode, resume_token))
(sql, params, param_types, query_mode))
return expected

with _Monkey(MUT, Snapshot=_Snapshot):
found = session.execute_sql(SQL, resume_token=TOKEN)
found = session.execute_sql(SQL)

self.assertIs(found, expected)

self.assertEqual(len(_executed_sql_with), 1)
sql, params, param_types, query_mode, token = _executed_sql_with[0]
sql, params, param_types, query_mode = _executed_sql_with[0]

self.assertEqual(sql, SQL)
self.assertEqual(params, None)
self.assertEqual(param_types, None)
self.assertEqual(query_mode, None)
self.assertEqual(token, TOKEN)

def test_batch_not_created(self):
database = _Database(self.DATABASE_NAME)