Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry for yield_rows #4882

Merged
merged 35 commits into from
Mar 20, 2018
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
d540349
Add retry for read rows
Feb 14, 2018
4aa5702
Fix line lengths
Feb 14, 2018
f7d4c69
Add parameter start_inclusive to _create_row_request
Feb 14, 2018
28892ce
Update unit test case for yield rows retry
Feb 14, 2018
8d141f0
Fix line lengths
Feb 14, 2018
902f11b
Edit table.py as requested changes
Feb 16, 2018
4afabb1
Added code review changes from @jonparrott.
Feb 19, 2018
79698b1
Fix parameter name on _RetryableReadRows class
Feb 22, 2018
9b778be
Add retry for Deadline Exceeded on read rows
Feb 23, 2018
cafc5ad
Fix line over-indented
Feb 23, 2018
f974752
Fix line over-indented
Feb 23, 2018
13adfd6
Refactor yield_rows retry
Feb 23, 2018
ca69a2f
1. Moved last_scanned_row_key into YieldRowsData.
Feb 27, 2018
02c154e
Fix line over-indented
Feb 27, 2018
b6da7fa
Fix row_data.py
Feb 27, 2018
f1a2d92
Change row.py as requested
Feb 27, 2018
65c1c4b
Create new test case for yield_rows iterator failure
Feb 28, 2018
170a173
Fix for start_inclusive on retry read rows.
Mar 1, 2018
5da3fd3
Merge branch 'master' of https://github.com/zakons/google-cloud-pytho…
Mar 1, 2018
a610431
Move retry read rows into YieldRowsData.read_rows to avoid throwing e…
Mar 8, 2018
f930b22
Refactor update of request_pb.
Mar 8, 2018
0fe27de
Add new line at the end of the file
Mar 8, 2018
2b47791
Integrate Retry for the read rows retry code.
Mar 9, 2018
14c1c96
Change default for max_count to None.
zakons Mar 12, 2018
4a6128e
Revert changes
zakons Mar 12, 2018
04533d0
fix __next__ = next
Mar 13, 2018
ba533e9
Merge branch 'feature/read_rows_retry' of https://github.com/zakons/g…
Mar 13, 2018
f81fded
Add documentation for new parameters on YieldRowsData
zakons Mar 13, 2018
5ad48d4
Working on unit test code
Mar 14, 2018
ca6f925
Refactor retry read rows using PR review comments.
Mar 16, 2018
bf9fd93
Fix lines over-indented
Mar 16, 2018
0a08c48
Update retry read rows for an edge case where iterator fails on the f…
Mar 16, 2018
67c44ad
Update docstrings for refactored methods.
zakons Mar 16, 2018
3d3477f
Remove functools
Mar 19, 2018
99fb43a
Merge branch 'feature/read_rows_retry' of https://github.com/zakons/g…
Mar 19, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bigtable/google/cloud/bigtable/row.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ def _delete_cells(self, column_family_id, columns, time_range=None,
def _retry_commit_exception(exc):
    """Return True if ``exc`` is a transient error worth retrying.

    Unwraps raw :class:`grpc.RpcError` instances into api_core exceptions
    first, then treats ``ServiceUnavailable`` and ``DeadlineExceeded`` as
    retryable.

    :type exc: Exception
    :param exc: the exception raised by a commit / read attempt.

    :rtype: bool
    :returns: True if the operation should be retried.
    """
    if isinstance(exc, grpc.RpcError):
        exc = exceptions.from_grpc_error(exc)
    # Note: the pre-change single-exception check was left in the diff;
    # the intended behavior is the two-exception tuple below.
    return isinstance(exc, (exceptions.ServiceUnavailable,
                            exceptions.DeadlineExceeded))


class DirectRow(_SetDeleteRow):
Expand Down
11 changes: 6 additions & 5 deletions bigtable/google/cloud/bigtable/row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,6 @@ def __init__(self, response_iterator):
self._response_iterator = response_iterator
# Counter for responses pulled from iterator
self._counter = 0
# Maybe cached from previous response
self._last_scanned_row_key = None
# In-progress row, unset until first response, after commit/reset
self._row = None
# Last complete row, unset until first commit
Expand All @@ -261,6 +259,8 @@ def __init__(self, response_iterator):
self._cell = None
# Last complete cell, unset until first completion, after new row
self._previous_cell = None
# May be cached from previous response
self.last_scanned_row_key = None

@property
def state(self):
Expand All @@ -270,7 +270,7 @@ def state(self):
:returns: name of state corresponding to current row / chunk
processing.
"""
if self._last_scanned_row_key is None:
if self.last_scanned_row_key is None:
return self.START
if self._row is None:
assert self._cell is None
Expand Down Expand Up @@ -301,11 +301,11 @@ def read_rows(self):

self._counter += 1

if self._last_scanned_row_key is None: # first response
if self.last_scanned_row_key is None: # first response
if response.last_scanned_row_key:
raise InvalidReadRowsResponse()

self._last_scanned_row_key = response.last_scanned_row_key
self.last_scanned_row_key = response.last_scanned_row_key

row = self._row
cell = self._cell
Expand Down Expand Up @@ -343,6 +343,7 @@ def read_rows(self):

yield self._row

self.last_scanned_row_key = self._row.row_key
self._row, self._previous_row = None, self._row
self._previous_cell = None
row = cell = None
Expand Down
85 changes: 73 additions & 12 deletions bigtable/google/cloud/bigtable/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
"""User-friendly container for Google Cloud Bigtable Table."""


import functools
import logging

from grpc import StatusCode

from google.api_core.exceptions import RetryError
from google.api_core.retry import if_exception_type
from google.api_core.retry import Retry
Expand All @@ -27,18 +32,19 @@
table_pb2 as table_v2_pb2)
from google.cloud.bigtable.column_family import _gc_rule_from_pb
from google.cloud.bigtable.column_family import ColumnFamily
from google.cloud.bigtable.row import _retry_commit_exception
from google.cloud.bigtable.row import AppendRow
from google.cloud.bigtable.row import ConditionalRow
from google.cloud.bigtable.row import DirectRow
from google.cloud.bigtable.row_data import PartialRowsData
from google.cloud.bigtable.row_data import YieldRowsData
from grpc import StatusCode


# Maximum number of mutations in bulk (MutateRowsRequest message):
# (https://cloud.google.com/bigtable/docs/reference/data/rpc/
# google.bigtable.v2#google.bigtable.v2.MutateRowRequest)
_MAX_BULK_MUTATIONS = 100000
logging.getLogger().setLevel(logging.INFO)

This comment was marked as spam.

This comment was marked as spam.



class _BigtableRetryableError(Exception):
Expand Down Expand Up @@ -317,7 +323,7 @@ def read_rows(self, start_key=None, end_key=None, limit=None,
return PartialRowsData(response_iterator)

def yield_rows(self, start_key=None, end_key=None, limit=None,
filter_=None):
filter_=None, retry=True):
"""Read rows from this table.

:type start_key: bytes
Expand All @@ -340,17 +346,18 @@ def yield_rows(self, start_key=None, end_key=None, limit=None,
specified row(s). If unset, reads every column in
each row.

:type retry: bool
:param retry: (Optional) Apply retry on read rows.

:rtype: :class:`.PartialRowData`
:returns: A :class:`.PartialRowData` for each row returned
"""
request_pb = _create_row_request(
self.name, start_key=start_key, end_key=end_key, filter_=filter_,
limit=limit)
client = self._instance._client
response_iterator = client._data_stub.ReadRows(request_pb)
# We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse`
generator = YieldRowsData(response_iterator)
for row in generator.read_rows():

retryable_read_rows = _RetryableReadRows(
self._instance._client, self.name, start_key,
end_key, filter_, limit, retry)

for row in retryable_read_rows():
yield row

def mutate_rows(self, rows, retry=DEFAULT_RETRY):
Expand Down Expand Up @@ -532,8 +539,59 @@ def _do_mutate_retryable_rows(self):
return self.responses_statuses


class _RetryableReadRows(object):
    """A callable worker that can retry reading rows on transient errors.

    Each attempt issues a fresh ``ReadRows`` request. When retrying after
    a partial stream, the request resumes just past the last row key the
    previous stream scanned, so already-yielded rows are not re-emitted.
    """

    def __init__(self, client, table_name, start_key,
                 end_key, filter_, limit, retry):
        # Holds the YieldRowsData of the most recent attempt, so a retry
        # can resume from its last_scanned_row_key.
        self.generator = None
        self.client = client
        self.table_name = table_name
        self.start_key = start_key
        self.end_key = end_key
        self.filter_ = filter_
        self.limit = limit
        self.retry = retry

    def __call__(self):
        if self.retry:
            # NOTE(review): Retry wraps only the call that *creates* the
            # generator; exceptions raised while the caller iterates the
            # returned generator escape this wrapper -- confirm this is
            # the intended retry scope.
            retry_ = Retry(
                predicate=_retry_commit_exception,
                deadline=30)
            return retry_(self._do_read_retryable_rows)()
        return self._do_read_retryable_rows()

    def _do_read_retryable_rows(self):
        """Issue (or re-issue) the ReadRows request and yield each row."""
        if self.generator and self.generator.last_scanned_row_key:
            # Resuming after a failure: restart from the last scanned key
            # and exclude it, since that row was already delivered.
            next_start_key = self.generator.last_scanned_row_key
            start_inclusive = False
            # Lazy %-style args: formatting only happens if INFO is enabled.
            logging.info('Start key is %s for retry read rows.',
                         next_start_key)
        else:
            # First attempt: honor the caller's start_key inclusively.
            # (Unconditionally passing start_inclusive=False here would
            # silently drop the first row of the requested range.)
            next_start_key = self.start_key
            start_inclusive = True

        request_pb = _create_row_request(
            self.table_name,
            start_key=next_start_key,
            end_key=self.end_key,
            filter_=self.filter_,
            limit=self.limit,
            start_inclusive=start_inclusive)
        response_iterator = self.client._data_stub.ReadRows(request_pb)
        self.generator = YieldRowsData(response_iterator)
        for row in self.generator.read_rows():
            yield row


def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
filter_=None, limit=None, end_inclusive=False):
filter_=None, limit=None,
end_inclusive=False, start_inclusive=True):
"""Creates a request to read rows in a table.

:type table_name: str
Expand Down Expand Up @@ -578,7 +636,10 @@ def _create_row_request(table_name, row_key=None, start_key=None, end_key=None,
range_kwargs = {}
if start_key is not None or end_key is not None:
if start_key is not None:
range_kwargs['start_key_closed'] = _to_bytes(start_key)
start_key_key = 'start_key_open'
if start_inclusive:
start_key_key = 'start_key_closed'
range_kwargs[start_key_key] = _to_bytes(start_key)
if end_key is not None:
end_key_key = 'end_key_open'
if end_inclusive:
Expand Down
6 changes: 3 additions & 3 deletions bigtable/tests/unit/test_row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def test_state_start(self):

def test_state_new_row_w_row(self):
    """State is NEW_ROW once a scan key has been seen and a row is open."""
    yrd = self._make_one([])
    # The attribute was renamed from _last_scanned_row_key; only the
    # public name is set here.
    yrd.last_scanned_row_key = ''
    yrd._row = object()
    self.assertEqual(yrd.state, yrd.NEW_ROW)

def test_valid_last_scanned_row_key_on_start(self):
    """A response's last_scanned_row_key replaces the cached one."""
    response = _ReadRowsResponseV2(
        chunks=(), last_scanned_row_key='AFTER')
    iterator = _MockCancellableIterator(response)
    yrd = self._make_one(iterator)
    yrd.last_scanned_row_key = 'BEFORE'
    self._consume_all(yrd)
    self.assertEqual(yrd.last_scanned_row_key, 'AFTER')

def test_invalid_empty_chunk(self):
from google.cloud.bigtable.row_data import InvalidChunk
Expand Down
137 changes: 130 additions & 7 deletions bigtable/tests/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ class TestTable(unittest.TestCase):
TABLE_ID = 'table-id'
TABLE_NAME = INSTANCE_NAME + '/tables/' + TABLE_ID
ROW_KEY = b'row-key'
ROW_KEY_1 = b'row-key-1'
ROW_KEY_2 = b'row-key-2'
FAMILY_NAME = u'family'
QUALIFIER = b'qualifier'
TIMESTAMP_MICROS = 100
Expand Down Expand Up @@ -541,18 +543,109 @@ def test_yield_rows(self):
chunks = [chunk]

response = _ReadRowsResponseV2(chunks)
response_iterator = _MockCancellableIterator(response)
response_iterator = _MockReadRowsIterator(response)

# Patch the stub used by the API method.
client._data_stub = _FakeStub(response_iterator)

generator = table.yield_rows(retry=False)
rows = []
for row in table.yield_rows():
for row in generator:
rows.append(row)
result = rows[0]

self.assertEqual(result.row_key, self.ROW_KEY)

def test_yield_retry_rows_with_response_exception(self):
    """yield_rows with retry enabled yields rows from a single response."""
    client = _Client()
    instance = _Instance(self.INSTANCE_NAME, client=client)
    table = self._make_one(self.TABLE_ID, instance)

    # One chunk that commits a complete row.
    chunk = _ReadRowsResponseCellChunkPB(
        row_key=self.ROW_KEY,
        family_name=self.FAMILY_NAME,
        qualifier=self.QUALIFIER,
        timestamp_micros=self.TIMESTAMP_MICROS,
        value=self.VALUE,
        commit_row=True
    )

    response = _ReadRowsResponseV2([chunk])
    response_iterator = _MockReadRowsIterator(response)

    # Patch the stub used by the API method.
    client._data_stub = mock.MagicMock()
    client._data_stub.ReadRows.side_effect = [response_iterator]

    rows = []
    for row in table.yield_rows():
        rows.append(row)

    result = rows[0]
    self.assertEqual(result.row_key, self.ROW_KEY)

def test_yield_retry_rows(self):
    """Rows from successive responses are all yielded, in order."""
    client = _Client()
    instance = _Instance(self.INSTANCE_NAME, client=client)
    table = self._make_one(self.TABLE_ID, instance)

    # Build one committed-row chunk per row key.
    def _chunk(row_key):
        return _ReadRowsResponseCellChunkPB(
            row_key=row_key,
            family_name=self.FAMILY_NAME,
            qualifier=self.QUALIFIER,
            timestamp_micros=self.TIMESTAMP_MICROS,
            value=self.VALUE,
            commit_row=True
        )

    responses = [_ReadRowsResponseV2([_chunk(self.ROW_KEY_1)]),
                 _ReadRowsResponseV2([_chunk(self.ROW_KEY_2)])]
    response_retryable_iterator = _MockRetryableIterator(responses)

    # Patch the stub used by the API method.
    client._data_stub = mock.MagicMock()
    client._data_stub.ReadRows.side_effect = [response_retryable_iterator]

    rows = list(table.yield_rows(start_key=self.ROW_KEY_1,
                                 end_key=self.ROW_KEY_2))

    self.assertEqual(rows[1].row_key, self.ROW_KEY_2)

def test_sample_row_keys(self):
from tests.unit._testing import _FakeStub

Expand Down Expand Up @@ -1218,10 +1311,7 @@ def __init__(self, name, client=None):
self._client = client


class _MockCancellableIterator(object):

cancel_calls = 0

class _MockReadRowsIterator(object):
def __init__(self, *values):
self.iter_values = iter(values)

Expand All @@ -1232,8 +1322,41 @@ def __next__(self): # pragma: NO COVER Py3k
return self.next()


class _MockRetryableIterator(object):

def __init__(self, *values):
self.iter_values = values[0]
self.calls = 0

def next(self):
# import grpc
#
# class ErrorUnavailable(grpc.RpcError, grpc.Call):
# """ErrorUnavailable exception"""
#
# def code(self):
# return grpc.StatusCode.UNAVAILABLE
#
# def details(self):
# return 'Endpoint read failed'

self.calls += 1
if self.calls == 1:
first = self.iter_values[0]
return first
# elif self.calls == 2:
# raise ErrorUnavailable()
elif self.calls == 2:
return self.iter_values[1]
else:
raise StopIteration()

def __next__(self): # pragma: NO COVER Py3k

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

return self.next()


class _ReadRowsResponseV2(object):

def __init__(self, chunks, last_scanned_row_key=''):
self.chunks = chunks
self.last_scanned_row_key = last_scanned_row_key
self.last_scanned_row_key = last_scanned_row_key