From e6e0b1c8e15bc91b0aa5307e01e9664e8e5b5947 Mon Sep 17 00:00:00 2001 From: Jon Wayne Parrott Date: Thu, 20 Jul 2017 09:47:41 -0700 Subject: [PATCH] Revert "RPC retries (second PR) (#3324)" This reverts commit 67f4ba47069146a9b93005e38046eb2cd59b150a. --- bigtable/google/cloud/bigtable/retry.py | 169 ------------------- bigtable/google/cloud/bigtable/row_data.py | 3 - bigtable/google/cloud/bigtable/table.py | 101 +++++++---- bigtable/tests/retry_test_script.txt | 38 ----- bigtable/tests/system.py | 78 --------- bigtable/tests/unit/_testing.py | 27 +-- bigtable/tests/unit/test_table.py | 185 ++------------------- 7 files changed, 81 insertions(+), 520 deletions(-) delete mode 100644 bigtable/google/cloud/bigtable/retry.py delete mode 100644 bigtable/tests/retry_test_script.txt diff --git a/bigtable/google/cloud/bigtable/retry.py b/bigtable/google/cloud/bigtable/retry.py deleted file mode 100644 index f20419ce4f8e..000000000000 --- a/bigtable/google/cloud/bigtable/retry.py +++ /dev/null @@ -1,169 +0,0 @@ -"""Provides function wrappers that implement retrying.""" -import random -import time -import six -import sys - -from google.cloud._helpers import _to_bytes -from google.cloud.bigtable._generated import ( - bigtable_pb2 as data_messages_v2_pb2) -from google.gax import config, errors -from grpc import RpcError - - -_MILLIS_PER_SECOND = 1000 - - -class ReadRowsIterator(object): - """Creates an iterator equivalent to a_iter, but that retries on certain - exceptions. - """ - - def __init__(self, client, name, start_key, end_key, filter_, limit, - retry_options, **kwargs): - self.client = client - self.retry_options = retry_options - self.name = name - self.start_key = start_key - self.start_key_closed = True - self.end_key = end_key - self.filter_ = filter_ - self.limit = limit - self.delay_mult = retry_options.backoff_settings.retry_delay_multiplier - self.max_delay_millis = \ - retry_options.backoff_settings.max_retry_delay_millis - self.timeout_mult = \ - retry_options.backoff_settings.rpc_timeout_multiplier - self.max_timeout = \ - (retry_options.backoff_settings.max_rpc_timeout_millis / - _MILLIS_PER_SECOND) - self.total_timeout = \ - (retry_options.backoff_settings.total_timeout_millis / - _MILLIS_PER_SECOND) - self.set_stream() - - def set_start_key(self, start_key): - """ - Sets the row key at which this iterator will begin reading. - """ - self.start_key = start_key - self.start_key_closed = False - - def set_stream(self): - """ - Resets the read stream by making an RPC on the 'ReadRows' endpoint. - """ - req_pb = _create_row_request(self.name, start_key=self.start_key, - start_key_closed=self.start_key_closed, - end_key=self.end_key, - filter_=self.filter_, limit=self.limit) - self.stream = self.client._data_stub.ReadRows(req_pb) - - def next(self, *args, **kwargs): - """ - Read and return the next row from the stream. - Retry on idempotent failure. 
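NOTE: for reference, the resume-on-error behavior removed by this revert reduces to the standalone sketch below. ResumableStream, open_stream, and is_retryable are illustrative names, not this patch's API, and the fixed doubling backoff stands in for the real BackoffSettings machinery.

    import time

    class ResumableStream(object):
        """Wraps a server-streaming call; on a retryable error, re-opens
        the stream just past the last row key already delivered."""

        def __init__(self, open_stream, is_retryable, max_attempts=5):
            self._open_stream = open_stream    # callable(last_key) -> iterator
            self._is_retryable = is_retryable  # callable(exception) -> bool
            self._max_attempts = max_attempts
            self._last_key = None
            self._stream = open_stream(None)

        def __iter__(self):
            return self

        def next(self):
            delay = 0.01
            for _ in range(self._max_attempts):
                try:
                    row = next(self._stream)
                    self._last_key = row.row_key   # remember progress
                    return row
                except StopIteration:
                    raise
                except Exception as error:
                    if not self._is_retryable(error):
                        raise
                    time.sleep(delay)
                    delay = min(delay * 2, 1.0)    # capped backoff, simplified
                    self._stream = self._open_stream(self._last_key)
            raise RuntimeError('retry attempts exhausted')

        __next__ = next  # Python 3 alias, as in the deleted code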
- """ - delay = self.retry_options.backoff_settings.initial_retry_delay_millis - exc = errors.RetryError('Retry total timeout exceeded before any' - 'response was received') - timeout = (self.retry_options.backoff_settings - .initial_rpc_timeout_millis / - _MILLIS_PER_SECOND) - - now = time.time() - deadline = now + self.total_timeout - while deadline is None or now < deadline: - try: - return six.next(self.stream) - except StopIteration as stop: - raise stop - except RpcError as error: # pylint: disable=broad-except - code = config.exc_to_code(error) - if code not in self.retry_options.retry_codes: - six.reraise(type(error), error) - - # pylint: disable=redefined-variable-type - exc = errors.RetryError( - 'Retry total timeout exceeded with exception', error) - - # Sleep a random number which will, on average, equal the - # expected delay. - to_sleep = random.uniform(0, delay * 2) - time.sleep(to_sleep / _MILLIS_PER_SECOND) - delay = min(delay * self.delay_mult, self.max_delay_millis) - now = time.time() - timeout = min( - timeout * self.timeout_mult, self.max_timeout, - deadline - now) - self.set_stream() - - six.reraise(errors.RetryError, exc, sys.exc_info()[2]) - - def __next__(self, *args, **kwargs): - return self.next(*args, **kwargs) - - -def _create_row_request(table_name, row_key=None, start_key=None, - start_key_closed=True, end_key=None, filter_=None, - limit=None): - """Creates a request to read rows in a table. - - :type table_name: str - :param table_name: The name of the table to read from. - - :type row_key: bytes - :param row_key: (Optional) The key of a specific row to read from. - - :type start_key: bytes - :param start_key: (Optional) The beginning of a range of row keys to - read from. The range will include ``start_key``. If - left empty, will be interpreted as the empty string. - - :type end_key: bytes - :param end_key: (Optional) The end of a range of row keys to read from. - The range will not include ``end_key``. If left empty, - will be interpreted as an infinite string. - - :type filter_: :class:`.RowFilter` - :param filter_: (Optional) The filter to apply to the contents of the - specified row(s). If unset, reads the entire table. - - :type limit: int - :param limit: (Optional) The read will terminate after committing to N - rows' worth of results. The default (zero) is to return - all results. - - :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest` - :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs. 
- :raises: :class:`ValueError ` if both - ``row_key`` and one of ``start_key`` and ``end_key`` are set - """ - request_kwargs = {'table_name': table_name} - if (row_key is not None and - (start_key is not None or end_key is not None)): - raise ValueError('Row key and row range cannot be ' - 'set simultaneously') - range_kwargs = {} - if start_key is not None or end_key is not None: - if start_key is not None: - if start_key_closed: - range_kwargs['start_key_closed'] = _to_bytes(start_key) - else: - range_kwargs['start_key_open'] = _to_bytes(start_key) - if end_key is not None: - range_kwargs['end_key_open'] = _to_bytes(end_key) - if filter_ is not None: - request_kwargs['filter'] = filter_.to_pb() - if limit is not None: - request_kwargs['rows_limit'] = limit - - message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs) - - if row_key is not None: - message.rows.row_keys.append(_to_bytes(row_key)) - - if range_kwargs: - message.rows.row_ranges.add(**range_kwargs) - - return message diff --git a/bigtable/google/cloud/bigtable/row_data.py b/bigtable/google/cloud/bigtable/row_data.py index 0849e681b7e6..78179db25c4e 100644 --- a/bigtable/google/cloud/bigtable/row_data.py +++ b/bigtable/google/cloud/bigtable/row_data.py @@ -274,9 +274,6 @@ def consume_next(self): self._validate_chunk(chunk) - if hasattr(self._response_iterator, 'set_start_key'): - self._response_iterator.set_start_key(chunk.row_key) - if chunk.reset_row: row = self._row = None cell = self._cell = self._previous_cell = None diff --git a/bigtable/google/cloud/bigtable/table.py b/bigtable/google/cloud/bigtable/table.py index ad6fab88dcf9..40ef3a2ca2fb 100644 --- a/bigtable/google/cloud/bigtable/table.py +++ b/bigtable/google/cloud/bigtable/table.py @@ -17,6 +17,7 @@ import six +from google.cloud._helpers import _to_bytes from google.cloud.bigtable._generated import ( bigtable_pb2 as data_messages_v2_pb2) from google.cloud.bigtable._generated import ( @@ -29,26 +30,6 @@ from google.cloud.bigtable.row import ConditionalRow from google.cloud.bigtable.row import DirectRow from google.cloud.bigtable.row_data import PartialRowsData -from google.gax import RetryOptions, BackoffSettings -from google.cloud.bigtable.retry import ReadRowsIterator, _create_row_request -from grpc import StatusCode - -BACKOFF_SETTINGS = BackoffSettings( - initial_retry_delay_millis=10, - retry_delay_multiplier=1.3, - max_retry_delay_millis=30000, - initial_rpc_timeout_millis=25 * 60 * 1000, - rpc_timeout_multiplier=1.0, - max_rpc_timeout_millis=25 * 60 * 1000, - total_timeout_millis=30 * 60 * 1000 -) - -RETRY_CODES = [ - StatusCode.DEADLINE_EXCEEDED, - StatusCode.ABORTED, - StatusCode.INTERNAL, - StatusCode.UNAVAILABLE -] # Maximum number of mutations in bulk (MutateRowsRequest message): @@ -276,7 +257,7 @@ def read_row(self, row_key, filter_=None): return rows_data.rows[row_key] def read_rows(self, start_key=None, end_key=None, limit=None, - filter_=None, backoff_settings=None): + filter_=None): """Read rows from this table. :type start_key: bytes @@ -303,18 +284,13 @@ def read_rows(self, start_key=None, end_key=None, limit=None, :returns: A :class:`.PartialRowsData` convenience wrapper for consuming the streamed results. 
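NOTE: the deleted _create_row_request distinguishes closed and open start keys so a resumed scan can restart strictly after the last row already seen, while end keys are always open. A plain-dict sketch of that mapping (the real code fills a ReadRowsRequest protobuf):

    def row_range_kwargs(start_key=None, end_key=None, start_key_closed=True):
        """Mimics how the deleted code fills the row_ranges fields."""
        kwargs = {}
        if start_key is not None:
            side = 'start_key_closed' if start_key_closed else 'start_key_open'
            kwargs[side] = start_key
        if end_key is not None:
            kwargs['end_key_open'] = end_key   # end key is always exclusive
        return kwargs

    assert row_range_kwargs(b'a', b'z') == {
        'start_key_closed': b'a', 'end_key_open': b'z'}
    assert row_range_kwargs(b'a', b'z', start_key_closed=False) == {
        'start_key_open': b'a', 'end_key_open': b'z'}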
""" + request_pb = _create_row_request( + self.name, start_key=start_key, end_key=end_key, filter_=filter_, + limit=limit) client = self._instance._client - if backoff_settings is None: - backoff_settings = BACKOFF_SETTINGS - RETRY_OPTIONS = RetryOptions( - retry_codes=RETRY_CODES, - backoff_settings=backoff_settings - ) - - retrying_iterator = ReadRowsIterator(client, self.name, start_key, - end_key, filter_, limit, - RETRY_OPTIONS) - return PartialRowsData(retrying_iterator) + response_iterator = client._data_stub.ReadRows(request_pb) + # We expect an iterator of `data_messages_v2_pb2.ReadRowsResponse` + return PartialRowsData(response_iterator) def mutate_rows(self, rows): """Mutates multiple rows in bulk. @@ -383,6 +359,67 @@ def sample_row_keys(self): return response_iterator +def _create_row_request(table_name, row_key=None, start_key=None, end_key=None, + filter_=None, limit=None): + """Creates a request to read rows in a table. + + :type table_name: str + :param table_name: The name of the table to read from. + + :type row_key: bytes + :param row_key: (Optional) The key of a specific row to read from. + + :type start_key: bytes + :param start_key: (Optional) The beginning of a range of row keys to + read from. The range will include ``start_key``. If + left empty, will be interpreted as the empty string. + + :type end_key: bytes + :param end_key: (Optional) The end of a range of row keys to read from. + The range will not include ``end_key``. If left empty, + will be interpreted as an infinite string. + + :type filter_: :class:`.RowFilter` + :param filter_: (Optional) The filter to apply to the contents of the + specified row(s). If unset, reads the entire table. + + :type limit: int + :param limit: (Optional) The read will terminate after committing to N + rows' worth of results. The default (zero) is to return + all results. + + :rtype: :class:`data_messages_v2_pb2.ReadRowsRequest` + :returns: The ``ReadRowsRequest`` protobuf corresponding to the inputs. + :raises: :class:`ValueError ` if both + ``row_key`` and one of ``start_key`` and ``end_key`` are set + """ + request_kwargs = {'table_name': table_name} + if (row_key is not None and + (start_key is not None or end_key is not None)): + raise ValueError('Row key and row range cannot be ' + 'set simultaneously') + range_kwargs = {} + if start_key is not None or end_key is not None: + if start_key is not None: + range_kwargs['start_key_closed'] = _to_bytes(start_key) + if end_key is not None: + range_kwargs['end_key_open'] = _to_bytes(end_key) + if filter_ is not None: + request_kwargs['filter'] = filter_.to_pb() + if limit is not None: + request_kwargs['rows_limit'] = limit + + message = data_messages_v2_pb2.ReadRowsRequest(**request_kwargs) + + if row_key is not None: + message.rows.row_keys.append(_to_bytes(row_key)) + + if range_kwargs: + message.rows.row_ranges.add(**range_kwargs) + + return message + + def _mutate_rows_request(table_name, rows): """Creates a request to mutate rows in a table. diff --git a/bigtable/tests/retry_test_script.txt b/bigtable/tests/retry_test_script.txt deleted file mode 100644 index 863662e897ba..000000000000 --- a/bigtable/tests/retry_test_script.txt +++ /dev/null @@ -1,38 +0,0 @@ -# This retry script is processed by the retry server and the client under test. -# Client tests should parse any command beginning with "CLIENT:", send the corresponding RPC -# to the retry server and expect a valid response. -# "EXPECT" commands indicate the call the server is expecting the client to send. 
-# -# The retry server has one table named "table" that should be used for testing. -# There are three types of commands supported: -# READ -# Expect the corresponding rows to be returned with arbitrary values. -# SCAN ... -# Ranges are expressed as an interval with either open or closed start and end, -# such as [1,3) for "1,2" or (1, 3] for "2,3". -# WRITE -# All writes should succeed eventually. Value payload is ignored. -# The server writes PASS or FAIL on a line by itself to STDOUT depending on the result of the test. -# All other server output should be ignored. - -# Echo same scan back after immediate error -CLIENT: SCAN [r1,r3) r1,r2 -EXPECT: SCAN [r1,r3) -SERVER: ERROR Unavailable -EXPECT: SCAN [r1,r3) -SERVER: READ_RESPONSE r1,r2 - -# Retry scans with open interval starting at the least read row key. -# Instead of using open intervals for retry ranges, '\x00' can be -# appended to the last received row key and sent in a closed interval. -CLIENT: SCAN [r1,r9) r1,r2,r3,r4,r5,r6,r7,r8 -EXPECT: SCAN [r1,r9) -SERVER: READ_RESPONSE r1,r2,r3,r4 -SERVER: ERROR Unavailable -EXPECT: SCAN (r4,r9) -SERVER: ERROR Unavailable -EXPECT: SCAN (r4,r9) -SERVER: READ_RESPONSE r5,r6,r7 -SERVER: ERROR Unavailable -EXPECT: SCAN (r7,r9) -SERVER: READ_RESPONSE r8 diff --git a/bigtable/tests/system.py b/bigtable/tests/system.py index 5a5b4324cbbe..1fcda808db39 100644 --- a/bigtable/tests/system.py +++ b/bigtable/tests/system.py @@ -295,84 +295,6 @@ def test_delete_column_family(self): # Make sure we have successfully deleted it. self.assertEqual(temp_table.list_column_families(), {}) - def test_retry(self): - import subprocess, os, stat, platform - from google.cloud.bigtable.client import Client - from google.cloud.bigtable.instance import Instance - from google.cloud.bigtable.table import Table - - # import for urlopen based on version - try: - # python 3 - from urllib.request import urlopen - except ImportError: - # python 2 - from urllib2 import urlopen - - - TEST_SCRIPT = 'tests/retry_test_script.txt' - SERVER_NAME = 'retry_server' - SERVER_ZIP = SERVER_NAME + ".tar.gz" - - def process_scan(table, range, ids): - range_chunks = range.split(",") - range_open = range_chunks[0].lstrip("[") - range_close = range_chunks[1].rstrip(")") - rows = table.read_rows(range_open, range_close) - rows.consume_all() - - # Download server - MOCK_SERVER_URLS = { - 'Linux': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_linux.tar.gz', - 'Darwin': 'https://storage.googleapis.com/cloud-bigtable-test/retries/retry_server_mac.tar.gz', - } - - test_platform = platform.system() - if test_platform not in MOCK_SERVER_URLS: - self.skip('Retry server not available for platform {0}.'.format(test_platform)) - - mock_server_download = urlopen(MOCK_SERVER_URLS[test_platform]).read() - mock_server_file = open(SERVER_ZIP, 'wb') - mock_server_file.write(mock_server_download) - - # Unzip server - subprocess.call(['tar', 'zxvf', SERVER_ZIP, '-C', '.']) - - # Connect to server - server = subprocess.Popen( - ['./' + SERVER_NAME, '--script=' + TEST_SCRIPT], - stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - - (endpoint, port) = server.stdout.readline().rstrip("\n").split(":") - os.environ["BIGTABLE_EMULATOR_HOST"] = endpoint + ":" + port - client = Client(project="client", admin=True) - instance = Instance("instance", client) - table = instance.table("table") - - # Run test, line by line - with open(TEST_SCRIPT, 'r') as script: - for line in script.readlines(): - if 
line.startswith("CLIENT:"): - chunks = line.split(" ") - op = chunks[1] - process_scan(table, chunks[2], chunks[3]) - - # Check that the test passed - server.kill() - server_stdout_lines = [] - while True: - line = server.stdout.readline() - if line != '': - server_stdout_lines.append(line) - else: - break - self.assertEqual(server_stdout_lines[-1], "PASS\n") - - # Clean up - os.remove(SERVER_ZIP) - os.remove(SERVER_NAME) class TestDataAPI(unittest.TestCase): diff --git a/bigtable/tests/unit/_testing.py b/bigtable/tests/unit/_testing.py index 7587c66c133b..e67af6a1498c 100644 --- a/bigtable/tests/unit/_testing.py +++ b/bigtable/tests/unit/_testing.py @@ -14,6 +14,7 @@ """Mocks used to emulate gRPC generated objects.""" + class _FakeStub(object): """Acts as a gPRC stub.""" @@ -26,16 +27,6 @@ def __getattr__(self, name): # since __getattribute__ will handle them. return _MethodMock(name, self) -class _CustomFakeStub(object): - """Acts as a gRPC stub. Generates a result using an injected callable.""" - def __init__(self, result_callable): - self.result_callable = result_callable - self.method_calls = [] - - def __getattr__(self, name): - # We need not worry about attributes set in constructor - # since __getattribute__ will handle them. - return _CustomMethodMock(name, self) class _MethodMock(object): """Mock for API method attached to a gRPC stub. @@ -51,19 +42,5 @@ def __call__(self, *args, **kwargs): """Sync method meant to mock a gRPC stub request.""" self._stub.method_calls.append((self._name, args, kwargs)) curr_result, self._stub.results = (self._stub.results[0], - self._stub.results[1:]) + self._stub.results[1:]) return curr_result - -class _CustomMethodMock(object): - """ - Same as _MethodMock, but backed by an injected callable. - """ - - def __init__(self, name, stub): - self._name = name - self._stub = stub - - def __call__(self, *args, **kwargs): - """Sync method meant to mock a gRPC stub request.""" - self._stub.method_calls.append((self._name, args, kwargs)) - return self._stub.result_callable() diff --git a/bigtable/tests/unit/test_table.py b/bigtable/tests/unit/test_table.py index c59667d6a821..dc4d2b5bbad0 100644 --- a/bigtable/tests/unit/test_table.py +++ b/bigtable/tests/unit/test_table.py @@ -493,8 +493,7 @@ def test_read_rows(self): from google.cloud._testing import _Monkey from tests.unit._testing import _FakeStub from google.cloud.bigtable.row_data import PartialRowsData - from google.cloud.bigtable import retry as MUT - from google.cloud.bigtable.retry import ReadRowsIterator + from google.cloud.bigtable import table as MUT client = _Client() instance = _Instance(self.INSTANCE_NAME, client=client) @@ -514,18 +513,20 @@ def mock_create_row_request(table_name, **kwargs): # Patch the stub used by the API method. client._data_stub = stub = _FakeStub(response_iterator) + # Create expected_result. + expected_result = PartialRowsData(response_iterator) + + # Perform the method and check the result. start_key = b'start-key' end_key = b'end-key' filter_obj = object() limit = 22 with _Monkey(MUT, _create_row_request=mock_create_row_request): - # Perform the method and check the result. 
result = table.read_rows( start_key=start_key, end_key=end_key, filter_=filter_obj, limit=limit) - self.assertIsInstance(result._response_iterator, ReadRowsIterator) - self.assertEqual(result._response_iterator.client, client) + self.assertEqual(result, expected_result) self.assertEqual(stub.method_calls, [( 'ReadRows', (request_pb,), @@ -536,166 +537,9 @@ def mock_create_row_request(table_name, **kwargs): 'end_key': end_key, 'filter_': filter_obj, 'limit': limit, - 'start_key_closed': True, } self.assertEqual(mock_created, [(table.name, created_kwargs)]) - def test_read_rows_one_chunk(self): - from google.cloud._testing import _Monkey - from tests.unit._testing import _FakeStub - from google.cloud.bigtable import retry as MUT - from google.cloud.bigtable.retry import ReadRowsIterator - from google.cloud.bigtable.row_data import Cell - from google.cloud.bigtable.row_data import PartialRowsData - - client = _Client() - instance = _Instance(self.INSTANCE_NAME, client=client) - table = self._make_one(self.TABLE_ID, instance) - - # Create request_pb - request_pb = object() # Returned by our mock. - mock_created = [] - - def mock_create_row_request(table_name, **kwargs): - mock_created.append((table_name, kwargs)) - return request_pb - - # Create response_iterator - chunk = _ReadRowsResponseCellChunkPB( - row_key=self.ROW_KEY, - family_name=self.FAMILY_NAME, - qualifier=self.QUALIFIER, - timestamp_micros=self.TIMESTAMP_MICROS, - value=self.VALUE, - commit_row=True, - ) - response_pb = _ReadRowsResponsePB(chunks=[chunk]) - response_iterator = iter([response_pb]) - - # Patch the stub used by the API method. - client._data_stub = stub = _FakeStub(response_iterator) - - start_key = b'start-key' - end_key = b'end-key' - filter_obj = object() - limit = 22 - with _Monkey(MUT, _create_row_request=mock_create_row_request): - # Perform the method and check the result. - result = table.read_rows( - start_key=start_key, end_key=end_key, filter_=filter_obj, - limit=limit) - result.consume_all() - - def test_read_rows_retry_timeout(self): - from google.cloud._testing import _Monkey - from tests.unit._testing import _CustomFakeStub - from google.cloud.bigtable.row_data import PartialRowsData - from google.cloud.bigtable import retry as MUT - from google.cloud.bigtable.retry import ReadRowsIterator - from google.gax import BackoffSettings - from google.gax.errors import RetryError - from grpc import StatusCode, RpcError - import time - - client = _Client() - instance = _Instance(self.INSTANCE_NAME, client=client) - table = self._make_one(self.TABLE_ID, instance) - - # Create request_pb - request_pb = object() # Returned by our mock. - mock_created = [] - - def mock_create_row_request(table_name, **kwargs): - mock_created.append((table_name, kwargs)) - return request_pb - - # Create a slow response iterator to cause a timeout - class MockTimeoutError(RpcError): - def code(self): - return StatusCode.DEADLINE_EXCEEDED - - def _wait_then_raise(): - time.sleep(0.1) - raise MockTimeoutError() - - # Patch the stub used by the API method. The stub should create a new - # slow_iterator every time its queried. 
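NOTE: the timeout test below leans on the deadline arithmetic of the deleted retry loop: total_timeout_millis caps wall-clock time across all attempts, so a stub that sleeps 0.1 s and fails forever must trip a RetryError within about a second. A simplified standalone sketch of that guard (call_with_deadline is an illustrative name):

    import time

    def call_with_deadline(attempt, total_timeout_s):
        """Retry `attempt` until it succeeds or the deadline passes."""
        deadline = time.time() + total_timeout_s
        error = None
        while time.time() < deadline:
            try:
                return attempt()
            except IOError as err:     # stand-in for a retryable RpcError
                error = err
        raise RuntimeError('total timeout exceeded', error)

    def always_fails():
        time.sleep(0.1)
        raise IOError('transient')

    # call_with_deadline(always_fails, 1.0) raises after roughly one second.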
- def make_slow_iterator(): - return (_wait_then_raise() for i in range(10)) - client._data_stub = stub = _CustomFakeStub(make_slow_iterator) - - # Set to timeout before RPC completes - test_backoff_settings = BackoffSettings( - initial_retry_delay_millis=10, - retry_delay_multiplier=0.3, - max_retry_delay_millis=30000, - initial_rpc_timeout_millis=1000, - rpc_timeout_multiplier=1.0, - max_rpc_timeout_millis=25 * 60 * 1000, - total_timeout_millis=1000 - ) - - start_key = b'start-key' - end_key = b'end-key' - filter_obj = object() - limit = 22 - with _Monkey(MUT, _create_row_request=mock_create_row_request): - # Verify that a RetryError is thrown on read. - result = table.read_rows( - start_key=start_key, end_key=end_key, filter_=filter_obj, - limit=limit, backoff_settings=test_backoff_settings) - with self.assertRaises(RetryError): - result.consume_next() - - def test_read_rows_non_idempotent_error_throws(self): - from google.cloud._testing import _Monkey - from tests.unit._testing import _CustomFakeStub - from google.cloud.bigtable.row_data import PartialRowsData - from google.cloud.bigtable import retry as MUT - from google.cloud.bigtable.retry import ReadRowsIterator - from google.gax import BackoffSettings - from google.gax.errors import RetryError - from grpc import StatusCode, RpcError - import time - - client = _Client() - instance = _Instance(self.INSTANCE_NAME, client=client) - table = self._make_one(self.TABLE_ID, instance) - - # Create request_pb - request_pb = object() # Returned by our mock. - mock_created = [] - - def mock_create_row_request(table_name, **kwargs): - mock_created.append((table_name, kwargs)) - return request_pb - - # Create response iterator that raises a non-idempotent exception - class MockNonIdempotentError(RpcError): - def code(self): - return StatusCode.RESOURCE_EXHAUSTED - - def _raise(): - raise MockNonIdempotentError() - - # Patch the stub used by the API method. The stub should create a new - # slow_iterator every time its queried. - def make_raising_iterator(): - return (_raise() for i in range(10)) - client._data_stub = stub = _CustomFakeStub(make_raising_iterator) - - start_key = b'start-key' - end_key = b'end-key' - filter_obj = object() - limit = 22 - with _Monkey(MUT, _create_row_request=mock_create_row_request): - # Verify that a RetryError is thrown on read. 
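NOTE: the non-idempotent test below depends on the code-classification step of the deleted retry loop: only the codes in RETRY_CODES (DEADLINE_EXCEEDED, ABORTED, INTERNAL, UNAVAILABLE) are retried, and anything else, such as RESOURCE_EXHAUSTED here, is re-raised at once. A sketch of that check:

    from grpc import StatusCode

    RETRYABLE = frozenset([
        StatusCode.DEADLINE_EXCEEDED,
        StatusCode.ABORTED,
        StatusCode.INTERNAL,
        StatusCode.UNAVAILABLE,
    ])

    def should_retry(error):
        """True when a gRPC error carries a retryable status code."""
        code = getattr(error, 'code', lambda: None)()
        return code in RETRYABLE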
- result = table.read_rows( - start_key=start_key, end_key=end_key, filter_=filter_obj, - limit=limit) - with self.assertRaises(MockNonIdempotentError): - result.consume_next() - def test_sample_row_keys(self): from tests.unit._testing import _FakeStub @@ -728,12 +572,12 @@ def test_sample_row_keys(self): class Test__create_row_request(unittest.TestCase): def _call_fut(self, table_name, row_key=None, start_key=None, end_key=None, - start_key_closed=True, filter_=None, limit=None): - from google.cloud.bigtable.retry import _create_row_request + filter_=None, limit=None): + from google.cloud.bigtable.table import _create_row_request return _create_row_request( table_name, row_key=row_key, start_key=start_key, end_key=end_key, - start_key_closed=start_key_closed, filter_=filter_, limit=limit) + filter_=filter_, limit=limit) def test_table_name_only(self): table_name = 'table_name' @@ -756,7 +600,7 @@ def test_row_key(self): expected_result.rows.row_keys.append(row_key) self.assertEqual(result, expected_result) - def test_row_range_start_key_closed(self): + def test_row_range_start_key(self): table_name = 'table_name' start_key = b'start_key' result = self._call_fut(table_name, start_key=start_key) @@ -764,15 +608,6 @@ def test_row_range_start_key_closed(self): expected_result.rows.row_ranges.add(start_key_closed=start_key) self.assertEqual(result, expected_result) - def test_row_range_start_key_open(self): - table_name = 'table_name' - start_key = b'start_key' - result = self._call_fut(table_name, start_key=start_key, - start_key_closed=False) - expected_result = _ReadRowsRequestPB(table_name=table_name) - expected_result.rows.row_ranges.add(start_key_open=start_key) - self.assertEqual(result, expected_result) - def test_row_range_end_key(self): table_name = 'table_name' end_key = b'end_key'
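NOTE: the deleted retry_test_script.txt observes that an open start interval such as (r4,r9) can also be expressed as the closed interval [r4\x00,r9): appending a zero byte yields the smallest key strictly greater than the last row read. A small helper demonstrating that equivalence:

    def next_key_after(row_key):
        """Smallest row key strictly greater than `row_key`."""
        return row_key + b'\x00'

    assert next_key_after(b'r4') == b'r4\x00'
    assert b'r4' < next_key_after(b'r4') < b'r5'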