From 08617550809da1d4f7c145110653f42edf25746c Mon Sep 17 00:00:00 2001
From: Danny Hermes
Date: Thu, 25 Feb 2016 17:03:32 -0800
Subject: [PATCH] Adding HappyBase Table.scan().

---
 gcloud/bigtable/happybase/table.py      |  64 +++++++-
 gcloud/bigtable/happybase/test_table.py | 201 +++++++++++++++++++++++-
 2 files changed, 255 insertions(+), 10 deletions(-)

diff --git a/gcloud/bigtable/happybase/table.py b/gcloud/bigtable/happybase/table.py
index a2cd8bc75be91..243bd1223f6fa 100644
--- a/gcloud/bigtable/happybase/table.py
+++ b/gcloud/bigtable/happybase/table.py
@@ -16,6 +16,7 @@
 
 
 import struct
+import warnings
 
 import six
 
@@ -40,6 +41,7 @@
 from gcloud.bigtable.table import Table as _LowLevelTable
 
 
+_WARN = warnings.warn
 _UNPACK_I64 = struct.Struct('>q').unpack
 _SIMPLE_GC_RULES = (MaxAgeGCRule, MaxVersionsGCRule)
 
@@ -367,15 +369,63 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
         :param kwargs: Remaining keyword arguments. Provided for HappyBase
             compatibility.
 
-        :raises: :class:`ValueError <exceptions.ValueError>` if ``batch_size``
-                 or ``scan_batching`` are used, or if ``limit`` is set but
-                 non-positive, or if row prefix is used with row start/stop,
+        :raises: :class:`ValueError <exceptions.ValueError>` if ``limit`` is
+                 set but non-positive, or row prefix is used with start/stop;
                  :class:`TypeError <exceptions.TypeError>` if a string
-                 ``filter`` is used,
-                 :class:`NotImplementedError <exceptions.NotImplementedError>`
-                 always (until the method is implemented).
+                 ``filter`` is used.
         """
-        raise NotImplementedError
+        legacy_args = []
+        for kw_name in ('batch_size', 'scan_batching', 'sorted_columns'):
+            if kw_name in kwargs:
+                legacy_args.append(kw_name)
+                kwargs.pop(kw_name)
+        if legacy_args:
+            legacy_args = ', '.join(legacy_args)
+            message = ('The HappyBase legacy arguments %s were used. These '
+                       'arguments are unused by gcloud.' % (legacy_args,))
+            _WARN(message)
+        if kwargs:
+            raise TypeError('Received unexpected arguments', kwargs.keys())
+
+        if limit is not None and limit < 1:
+            raise ValueError('limit must be positive')
+        if row_prefix is not None:
+            if row_start is not None or row_stop is not None:
+                raise ValueError('row_prefix cannot be combined with '
+                                 'row_start or row_stop')
+            row_start = row_prefix
+            row_stop = _string_successor(row_prefix)
+
+        filters = []
+        if isinstance(filter, six.string_types):
+            raise TypeError('HBase filter strings not supported by Cloud '
+                            'Bigtable. RowFilters from the row module may '
+                            'be used instead.')
+        elif filter is not None:
+            filters.append(filter)
+
+        if columns is not None:
+            filters.append(_columns_filter_helper(columns))
+        # versions == 1 since we only want the latest.
+        filter_ = _filter_chain_helper(versions=1, timestamp=timestamp,
+                                       filters=filters)
+
+        partial_rows_data = self._low_level_table.read_rows(
+            start_key=row_start, end_key=row_stop,
+            limit=limit, filter_=filter_)
+
+        # Mutable copy of data.
+        rows_dict = partial_rows_data.rows
+        while True:
+            try:
+                partial_rows_data.consume_next()
+                row_key, curr_row_data = rows_dict.popitem()
+                # NOTE: We expect len(rows_dict) == 0, but don't check it.
+                curr_row_dict = _partial_row_to_dict(
+                    curr_row_data, include_timestamp=include_timestamp)
+                yield (row_key, curr_row_dict)
+            except StopIteration:
+                break
 
     def put(self, row, data, timestamp=None, wal=_WAL_SENTINEL):
         """Insert data into a row in this table.
diff --git a/gcloud/bigtable/happybase/test_table.py b/gcloud/bigtable/happybase/test_table.py
index be9809747a7a2..c6abb83b33d88 100644
--- a/gcloud/bigtable/happybase/test_table.py
+++ b/gcloud/bigtable/happybase/test_table.py
@@ -507,13 +507,203 @@ def mock_cells_to_pairs(*args, **kwargs):
 
         self.assertEqual(mock_cells, [((fake_cells,), to_pairs_kwargs)])
 
-    def test_scan(self):
+    def test_scan_with_batch_size(self):
+        from gcloud._testing import _Monkey
+        from gcloud.bigtable.happybase import table as MUT
+
+        warned = []
+
+        def mock_warn(msg):
+            warned.append(msg)
+
         name = 'table-name'
         connection = None
         table = self._makeOne(name, connection)
+        # Use unknown to force a TypeError, so we don't need to
+        # stub out the rest of the method.
+        with self.assertRaises(TypeError):
+            with _Monkey(MUT, _WARN=mock_warn):
+                list(table.scan(batch_size=object(), unknown=None))
 
-        with self.assertRaises(NotImplementedError):
-            table.scan()
+        self.assertEqual(len(warned), 1)
+        self.assertIn('batch_size', warned[0])
+
+    def test_scan_with_scan_batching(self):
+        from gcloud._testing import _Monkey
+        from gcloud.bigtable.happybase import table as MUT
+
+        warned = []
+
+        def mock_warn(msg):
+            warned.append(msg)
+
+        name = 'table-name'
+        connection = None
+        table = self._makeOne(name, connection)
+        # Use unknown to force a TypeError, so we don't need to
+        # stub out the rest of the method.
+        with self.assertRaises(TypeError):
+            with _Monkey(MUT, _WARN=mock_warn):
+                list(table.scan(scan_batching=object(), unknown=None))
+
+        self.assertEqual(len(warned), 1)
+        self.assertIn('scan_batching', warned[0])
+
+    def test_scan_with_sorted_columns(self):
+        from gcloud._testing import _Monkey
+        from gcloud.bigtable.happybase import table as MUT
+
+        warned = []
+
+        def mock_warn(msg):
+            warned.append(msg)
+
+        name = 'table-name'
+        connection = None
+        table = self._makeOne(name, connection)
+        # Use unknown to force a TypeError, so we don't need to
+        # stub out the rest of the method.
+        with self.assertRaises(TypeError):
+            with _Monkey(MUT, _WARN=mock_warn):
+                list(table.scan(sorted_columns=object(), unknown=None))
+
+        self.assertEqual(len(warned), 1)
+        self.assertIn('sorted_columns', warned[0])
+
+    def test_scan_with_invalid_limit(self):
+        name = 'table-name'
+        connection = None
+        table = self._makeOne(name, connection)
+        with self.assertRaises(ValueError):
+            list(table.scan(limit=-10))
+
+    def test_scan_with_row_prefix_and_row_stop(self):
+        name = 'table-name'
+        connection = None
+        table = self._makeOne(name, connection)
+        with self.assertRaises(ValueError):
+            list(table.scan(row_prefix='a', row_stop='abc'))
+
+    def test_scan_with_string_filter(self):
+        name = 'table-name'
+        connection = None
+        table = self._makeOne(name, connection)
+        with self.assertRaises(TypeError):
+            list(table.scan(filter='some-string'))
+
+    def _scan_test_helper(self, row_limits=(None, None), row_prefix=None,
+                          columns=None, filter_=None, timestamp=None,
+                          include_timestamp=False, limit=None, rr_result=None,
+                          expected_result=None):
+        import types
+        from gcloud._testing import _Monkey
+        from gcloud.bigtable.happybase import table as MUT
+
+        name = 'table-name'
+        row_start, row_stop = row_limits
+        connection = None
+        table = self._makeOne(name, connection)
+        table._low_level_table = _MockLowLevelTable()
+        rr_result = rr_result or _MockPartialRowsData()
+        table._low_level_table.read_rows_result = rr_result
+        self.assertEqual(rr_result.consume_next_calls, 0)
+
+        # Set-up mocks.
+        fake_col_filter = object()
+        mock_columns = []
+
+        def mock_columns_filter_helper(*args):
+            mock_columns.append(args)
+            return fake_col_filter
+
+        fake_filter = object()
+        mock_filters = []
+
+        def mock_filter_chain_helper(**kwargs):
+            mock_filters.append(kwargs)
+            return fake_filter
+
+        with _Monkey(MUT, _filter_chain_helper=mock_filter_chain_helper,
+                     _columns_filter_helper=mock_columns_filter_helper):
+            result = table.scan(row_start=row_start, row_stop=row_stop,
+                                row_prefix=row_prefix, columns=columns,
+                                filter=filter_, timestamp=timestamp,
+                                include_timestamp=include_timestamp,
+                                limit=limit)
+            self.assertTrue(isinstance(result, types.GeneratorType))
+            # Need to consume the result while the monkey patch is applied.
+            # read_rows_result == Empty PartialRowsData --> No results.
+            expected_result = expected_result or []
+            self.assertEqual(list(result), expected_result)
+
+        read_rows_args = ()
+        if row_prefix:
+            row_start = row_prefix
+            row_stop = MUT._string_successor(row_prefix)
+        read_rows_kwargs = {
+            'end_key': row_stop,
+            'filter_': fake_filter,
+            'limit': limit,
+            'start_key': row_start,
+        }
+        self.assertEqual(table._low_level_table.read_rows_calls, [
+            (read_rows_args, read_rows_kwargs),
+        ])
+        self.assertEqual(rr_result.consume_next_calls,
+                         rr_result.iterations + 1)
+
+        if columns is not None:
+            self.assertEqual(mock_columns, [(columns,)])
+        else:
+            self.assertEqual(mock_columns, [])
+
+        filters = []
+        if filter_ is not None:
+            filters.append(filter_)
+        if columns:
+            filters.append(fake_col_filter)
+        expected_kwargs = {
+            'filters': filters,
+            'versions': 1,
+            'timestamp': timestamp,
+        }
+        self.assertEqual(mock_filters, [expected_kwargs])
+
+    def test_scan_with_columns(self):
+        columns = object()
+        self._scan_test_helper(columns=columns)
+
+    def test_scan_with_row_start_and_stop(self):
+        row_start = 'bar'
+        row_stop = 'foo'
+        row_limits = (row_start, row_stop)
+        self._scan_test_helper(row_limits=row_limits)
+
+    def test_scan_with_row_prefix(self):
+        row_prefix = 'row-prefi'
+        self._scan_test_helper(row_prefix=row_prefix)
+
+    def test_scan_with_filter(self):
+        mock_filter = object()
+        self._scan_test_helper(filter_=mock_filter)
+
+    def test_scan_with_no_results(self):
+        limit = 1337
+        timestamp = object()
+        self._scan_test_helper(timestamp=timestamp, limit=limit)
+
+    def test_scan_with_results(self):
+        from gcloud.bigtable.row_data import PartialRowData
+
+        row_key1 = 'row-key1'
+        row1 = PartialRowData(row_key1)
+        rr_result = _MockPartialRowsData(rows={row_key1: row1}, iterations=1)
+
+        include_timestamp = object()
+        expected_result = [(row_key1, {})]
+        self._scan_test_helper(include_timestamp=include_timestamp,
+                               rr_result=rr_result,
+                               expected_result=expected_result)
 
     def test_put(self):
         from gcloud._testing import _Monkey
@@ -1292,3 +1482,8 @@ def __init__(self, rows=None, iterations=0):
 
     def consume_all(self):
         self.consume_all_calls += 1
+
+    def consume_next(self):
+        self.consume_next_calls += 1
+        if self.consume_next_calls > self.iterations:
+            raise StopIteration
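
Usage sketch (not part of the patch): the snippet below shows how the new
generator-style Table.scan() is meant to be consumed through the HappyBase
compatibility layer. The table name and row keys are hypothetical, and it
assumes a Connection that is already configured for a Cloud Bigtable cluster:

    from gcloud.bigtable.happybase import Connection

    connection = Connection()  # assumes cluster/credentials are configured
    table = connection.table('my-table')  # hypothetical table name

    # Explicit key range: at most 10 rows, newest cell per column only
    # (scan() builds its filter chain with versions=1).
    for row_key, row_dict in table.scan(row_start='row-a', row_stop='row-z',
                                        limit=10):
        print(row_key, row_dict)

    # A prefix scan is rewritten internally to the half-open key range
    # [row_prefix, _string_successor(row_prefix)), e.g. 'row' -> ['row', 'rox').
    for row_key, row_dict in table.scan(row_prefix='row',
                                        include_timestamp=True):
        print(row_key, row_dict)

    # HappyBase-only arguments (batch_size, scan_batching, sorted_columns)
    # are accepted for compatibility but unused, and emit a warning.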