Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry for yield_rows #4882

Merged
merged 35 commits into from
Mar 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
d540349
Add retry for read rows
Feb 14, 2018
4aa5702
Fix line lengths
Feb 14, 2018
f7d4c69
Add parameter start_inclusive to _create_row_request
Feb 14, 2018
28892ce
Update unit test case for yield rows retry
Feb 14, 2018
8d141f0
Fix line lengths
Feb 14, 2018
902f11b
Edit table.py as requested changes
Feb 16, 2018
4afabb1
Added code review changes from @jonparrott.
Feb 19, 2018
79698b1
Fix parameter name on _RetryableReadRows class
Feb 22, 2018
9b778be
Add retry for Deadline Exceeded on read rows
Feb 23, 2018
cafc5ad
Fix line over-indented
Feb 23, 2018
f974752
Fix line over-indented
Feb 23, 2018
13adfd6
Refactor yield_rows retry
Feb 23, 2018
ca69a2f
1. Moved last_scanned_row_key into YieldRowsData.
Feb 27, 2018
02c154e
Fix line over-indented
Feb 27, 2018
b6da7fa
Fix row_data.py
Feb 27, 2018
f1a2d92
Change row.py as requested
Feb 27, 2018
65c1c4b
Create new test case for yield_rows iterator failure
Feb 28, 2018
170a173
Fix for start_inclusive on retry read rows.
Mar 1, 2018
5da3fd3
Merge branch 'master' of https://github.com/zakons/google-cloud-pytho…
Mar 1, 2018
a610431
Move retry read rows into YieldRowsData.read_rows to avoid throwing e…
Mar 8, 2018
f930b22
Refactor update of request_pb.
Mar 8, 2018
0fe27de
Add new line at the end of the file
Mar 8, 2018
2b47791
Integrate Retry for the read rows retry code.
Mar 9, 2018
14c1c96
Change default for max_count to None.
zakons Mar 12, 2018
4a6128e
Revert changes
zakons Mar 12, 2018
04533d0
fix __next__ = next
Mar 13, 2018
ba533e9
Merge branch 'feature/read_rows_retry' of https://github.com/zakons/g…
Mar 13, 2018
f81fded
Add documentation for new parameters on YieldRowsData
zakons Mar 13, 2018
5ad48d4
Working on unit test code
Mar 14, 2018
ca6f925
Refactor retry read rows using PR review comments.
Mar 16, 2018
bf9fd93
Fix lines over-indented
Mar 16, 2018
0a08c48
Update retry read rows for an edge case where iterator fails on the f…
Mar 16, 2018
67c44ad
Update docstrings for refactored methods.
zakons Mar 16, 2018
3d3477f
Remove functools
Mar 19, 2018
99fb43a
Merge branch 'feature/read_rows_retry' of https://github.com/zakons/g…
Mar 19, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bigtable/google/cloud/bigtable/row.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ def _delete_cells(self, column_family_id, columns, time_range=None,
def _retry_commit_exception(exc):
if isinstance(exc, grpc.RpcError):
exc = exceptions.from_grpc_error(exc)
return isinstance(exc, exceptions.ServiceUnavailable)
return isinstance(exc, (exceptions.ServiceUnavailable,
exceptions.DeadlineExceeded))


class DirectRow(_SetDeleteRow):
Expand Down
91 changes: 72 additions & 19 deletions bigtable/google/cloud/bigtable/row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import copy
import six

import grpc

from google.api_core import exceptions
from google.api_core import retry
from google.cloud._helpers import _datetime_from_microseconds
from google.cloud._helpers import _to_bytes

Expand Down Expand Up @@ -187,27 +191,29 @@ class InvalidChunk(RuntimeError):
class PartialRowsData(object):
"""Convenience wrapper for consuming a ``ReadRows`` streaming response.

:type response_iterator: :class:`~google.cloud.exceptions.GrpcRendezvous`
:param response_iterator: A streaming iterator returned from a
``ReadRows`` request.
:type read_method: :class:`client._data_stub.ReadRows`
:param read_method: ``ReadRows`` method.

:type request: :class:`data_messages_v2_pb2.ReadRowsRequest`
:param request: The ``ReadRowsRequest`` message used to create a
ReadRowsResponse iterator.
"""

START = 'Start' # No responses yet processed.
NEW_ROW = 'New row' # No cells yet complete for row
ROW_IN_PROGRESS = 'Row in progress' # Some cells complete for row
CELL_IN_PROGRESS = 'Cell in progress' # Incomplete cell for row

def __init__(self, response_iterator):
self._response_iterator = response_iterator
self._generator = YieldRowsData(response_iterator)
def __init__(self, read_method, request):
self._generator = YieldRowsData(read_method, request)

# Fully-processed rows, keyed by `row_key`
self.rows = {}

def __eq__(self, other):
if not isinstance(other, self.__class__):
return NotImplemented
return other._response_iterator == self._response_iterator
return other._generator == self._generator

def __ne__(self, other):
return not self == other
Expand All @@ -234,25 +240,37 @@ def consume_all(self, max_loops=None):
self.rows[row.row_key] = row


def _retry_read_rows_exception(exc):
    """Predicate: should a failed ``ReadRows`` call be retried?

    gRPC errors are first normalized into ``google.api_core`` exception
    types so a single ``isinstance`` check covers both forms.
    """
    if isinstance(exc, grpc.RpcError):
        exc = exceptions.from_grpc_error(exc)
    retryable = (exceptions.ServiceUnavailable, exceptions.DeadlineExceeded)
    return isinstance(exc, retryable)


class YieldRowsData(object):
"""Convenience wrapper for consuming a ``ReadRows`` streaming response.

:type response_iterator: :class:`~google.cloud.exceptions.GrpcRendezvous`
:param response_iterator: A streaming iterator returned from a
``ReadRows`` request.
:type read_method: :class:`client._data_stub.ReadRows`
:param read_method: ``ReadRows`` method.

:type request: :class:`data_messages_v2_pb2.ReadRowsRequest`
:param request: The ``ReadRowsRequest`` message used to create a
ReadRowsResponse iterator. If the iterator fails, a new
iterator is created, allowing the scan to continue from
the point just beyond the last successfully read row,
identified by self.last_scanned_row_key. The retry happens
inside of the Retry class, using a predicate for the
expected exceptions during iteration.
"""

START = 'Start' # No responses yet processed.
NEW_ROW = 'New row' # No cells yet complete for row
ROW_IN_PROGRESS = 'Row in progress' # Some cells complete for row
CELL_IN_PROGRESS = 'Cell in progress' # Incomplete cell for row

def __init__(self, response_iterator):
self._response_iterator = response_iterator
def __init__(self, read_method, request):
# Counter for responses pulled from iterator
self._counter = 0
# Maybe cached from previous response
self._last_scanned_row_key = None
# In-progress row, unset until first response, after commit/reset
self._row = None
# Last complete row, unset until first commit
Expand All @@ -262,6 +280,12 @@ def __init__(self, response_iterator):
# Last complete cell, unset until first completion, after new row
self._previous_cell = None

# May be cached from previous response
self.last_scanned_row_key = None
self.read_method = read_method
self.request = request
self.response_iterator = read_method(request)

@property
def state(self):
"""State machine state.
Expand All @@ -270,7 +294,7 @@ def state(self):
:returns: name of state corresponding to current row / chunk
processing.
"""
if self._last_scanned_row_key is None:
if self.last_scanned_row_key is None:
return self.START
if self._row is None:
assert self._cell is None
Expand All @@ -284,7 +308,35 @@ def state(self):

def cancel(self):
"""Cancels the iterator, closing the stream."""
self._response_iterator.cancel()
self.response_iterator.cancel()

def _create_retry_request(self):
    """Helper for :meth:`read_rows`: narrow the request for a retry.

    Replaces the last row range on the request so the new scan starts
    strictly AFTER ``self.last_scanned_row_key`` (open start key) while
    keeping the original open end key.
    """
    previous_range = self.request.rows.row_ranges.pop()
    self.request.rows.row_ranges.add(
        start_key_open=self.last_scanned_row_key,
        end_key_open=previous_range.end_key_open)

def _on_error(self, exc):
    """Helper for :meth:`read_rows`: rebuild the stream after a failure.

    If any row has already been read, the request is first rewritten so
    the new iterator resumes just beyond the last successful row.
    """
    if self.last_scanned_row_key:
        # Restart the scan AFTER the last successfully read row.
        self._create_retry_request()
    self.response_iterator = self.read_method(self.request)

def _read_next(self):
    """Helper for :meth:`read_rows`: pull one response off the stream."""
    return next(self.response_iterator)

def _read_next_response(self):
    """Helper for :meth:`read_rows`: fetch one response, with retries.

    Wraps :meth:`_read_next` in a :class:`~google.api_core.retry.Retry`
    (60s deadline) whose predicate accepts transient read errors;
    :meth:`_on_error` recreates the stream between attempts.
    """
    retrying = retry.Retry(
        predicate=_retry_read_rows_exception, deadline=60)
    wrapped = retrying(self._read_next, on_error=self._on_error)
    return wrapped()

def read_rows(self):
"""Consume the ``ReadRowsResponse's`` from the stream.
Expand All @@ -295,17 +347,17 @@ def read_rows(self):
"""
while True:
try:
response = six.next(self._response_iterator)
response = self._read_next_response()
except StopIteration:
break

self._counter += 1

if self._last_scanned_row_key is None: # first response
if self.last_scanned_row_key is None: # first response
if response.last_scanned_row_key:
raise InvalidReadRowsResponse()

self._last_scanned_row_key = response.last_scanned_row_key
self.last_scanned_row_key = response.last_scanned_row_key

row = self._row
cell = self._cell
Expand Down Expand Up @@ -343,6 +395,7 @@ def read_rows(self):

yield self._row

self.last_scanned_row_key = self._row.row_key
self._row, self._previous_row = None, self._row
self._previous_cell = None
row = cell = None
Expand Down
25 changes: 12 additions & 13 deletions bigtable/google/cloud/bigtable/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
"""User-friendly container for Google Cloud Bigtable Table."""


from grpc import StatusCode

from google.api_core.exceptions import RetryError
from google.api_core.retry import if_exception_type
from google.api_core.retry import Retry
Expand All @@ -32,7 +34,6 @@
from google.cloud.bigtable.row import DirectRow
from google.cloud.bigtable.row_data import PartialRowsData
from google.cloud.bigtable.row_data import YieldRowsData
from grpc import StatusCode


# Maximum number of mutations in bulk (MutateRowsRequest message):
Expand Down Expand Up @@ -262,11 +263,10 @@ def read_row(self, row_key, filter_=None):
:raises: :class:`ValueError <exceptions.ValueError>` if a commit row
chunk is never encountered.
"""
request_pb = _create_row_request(self.name, row_key=row_key,
filter_=filter_)
request = _create_row_request(self.name, row_key=row_key,
filter_=filter_)
client = self._instance._client
response_iterator = client._data_stub.ReadRows(request_pb)
rows_data = PartialRowsData(response_iterator)
rows_data = PartialRowsData(client._data_stub.ReadRows, request)
rows_data.consume_all()
if rows_data.state not in (rows_data.NEW_ROW, rows_data.START):
raise ValueError('The row remains partial / is not committed.')
Expand Down Expand Up @@ -308,13 +308,12 @@ def read_rows(self, start_key=None, end_key=None, limit=None,
:returns: A :class:`.PartialRowsData` convenience wrapper for consuming
the streamed results.
"""
request_pb = _create_row_request(
request = _create_row_request(
self.name, start_key=start_key, end_key=end_key, filter_=filter_,
limit=limit, end_inclusive=end_inclusive)
client = self._instance._client
response_iterator = client._data_stub.ReadRows(request_pb)
# We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
return PartialRowsData(response_iterator)

return PartialRowsData(client._data_stub.ReadRows, request)

def yield_rows(self, start_key=None, end_key=None, limit=None,
filter_=None):
Expand Down Expand Up @@ -343,13 +342,13 @@ def yield_rows(self, start_key=None, end_key=None, limit=None,
:rtype: :class:`.PartialRowData`
:returns: A :class:`.PartialRowData` for each row returned
"""
request_pb = _create_row_request(

request = _create_row_request(
self.name, start_key=start_key, end_key=end_key, filter_=filter_,
limit=limit)
client = self._instance._client
response_iterator = client._data_stub.ReadRows(request_pb)
# We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
generator = YieldRowsData(response_iterator)

generator = YieldRowsData(client._data_stub.ReadRows, request)
for row in generator.read_rows():
yield row

Expand Down
Loading