From 5659f767a545a8b1b60653a3ad8e13305490157a Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Thu, 25 Feb 2016 12:38:19 -0800 Subject: [PATCH] Adding remaining HappyBase table helpers. --- gcloud/bigtable/happybase/table.py | 121 ++++++++++++++- gcloud/bigtable/happybase/test_table.py | 195 ++++++++++++++++++++++++ 2 files changed, 315 insertions(+), 1 deletion(-) diff --git a/gcloud/bigtable/happybase/table.py b/gcloud/bigtable/happybase/table.py index 7da6c43bd230..4e5c0b9c250a 100644 --- a/gcloud/bigtable/happybase/table.py +++ b/gcloud/bigtable/happybase/table.py @@ -26,10 +26,18 @@ from gcloud.bigtable.column_family import GCRuleIntersection from gcloud.bigtable.column_family import MaxAgeGCRule from gcloud.bigtable.column_family import MaxVersionsGCRule +from gcloud.bigtable.happybase.batch import _get_column_pairs from gcloud.bigtable.happybase.batch import _WAL_SENTINEL from gcloud.bigtable.happybase.batch import Batch -from gcloud.bigtable.table import Table as _LowLevelTable +from gcloud.bigtable.row import CellsColumnLimitFilter +from gcloud.bigtable.row import ColumnQualifierRegexFilter +from gcloud.bigtable.row import FamilyNameRegexFilter +from gcloud.bigtable.row import RowFilterChain +from gcloud.bigtable.row import RowFilterUnion +from gcloud.bigtable.row import RowKeyRegexFilter from gcloud.bigtable.row import TimestampRange +from gcloud.bigtable.row import TimestampRangeFilter +from gcloud.bigtable.table import Table as _LowLevelTable _UNPACK_I64 = struct.Struct('>q').unpack @@ -717,5 +725,116 @@ def _partial_row_to_dict(partial_row_data, include_timestamp=False): for column, cells in six.iteritems(partial_row_data.to_dict()): cell_vals = _cells_to_pairs(cells, include_timestamp=include_timestamp) + # NOTE: We assume there is exactly 1 version since we used that in + # our filter, but we don't check this. 
result[column] = cell_vals[0] return result + + +def _filter_chain_helper(column=None, versions=None, timestamp=None, + filters=None): + """Create filter chain to limit a results set. + + :type column: str + :param column: (Optional) The column (``fam:col``) to be selected + with the filter. + + :type versions: int + :param versions: (Optional) The maximum number of cells to return. + + :type timestamp: int + :param timestamp: (Optional) Timestamp (in milliseconds since the + epoch). If specified, only cells returned before (or + at) the timestamp will be matched. + + :type filters: list + :param filters: (Optional) List of existing filters to be extended. + + :rtype: :class:`.RowFilter` + :returns: The chained filter created, or just a single filter if only + one was needed. + :raises: :class:`ValueError` if there are no + filters to chain. + """ + if filters is None: + filters = [] + + if column is not None: + if isinstance(column, six.binary_type): + column = column.decode('utf-8') + column_family_id, column_qualifier = column.split(':') + fam_filter = FamilyNameRegexFilter(column_family_id) + qual_filter = ColumnQualifierRegexFilter(column_qualifier) + filters.extend([fam_filter, qual_filter]) + if versions is not None: + filters.append(CellsColumnLimitFilter(versions)) + time_range = _convert_to_time_range(timestamp=timestamp) + if time_range is not None: + filters.append(TimestampRangeFilter(time_range)) + + num_filters = len(filters) + if num_filters == 0: + raise ValueError('Must have at least one filter.') + elif num_filters == 1: + return filters[0] + else: + return RowFilterChain(filters=filters) + + +def _columns_filter_helper(columns): + """Creates a union filter for a list of columns. + + :type columns: list + :param columns: Iterable containing column names (as strings).
Each column + name can be either + + * an entire column family: ``fam`` or ``fam:`` + * a single column: ``fam:col`` + + :rtype: :class:`.RowFilter` + :returns: The union filter created containing all of the matched columns. + :raises: :class:`ValueError` if there are no + filters to union. + """ + filters = [] + for column_family_id, column_qualifier in _get_column_pairs(columns): + fam_filter = FamilyNameRegexFilter(column_family_id) + if column_qualifier is not None: + qual_filter = ColumnQualifierRegexFilter(column_qualifier) + combined_filter = RowFilterChain( + filters=[fam_filter, qual_filter]) + filters.append(combined_filter) + else: + filters.append(fam_filter) + + num_filters = len(filters) + if num_filters == 0: + raise ValueError('Must have at least one filter.') + elif num_filters == 1: + return filters[0] + else: + return RowFilterUnion(filters=filters) + + +def _row_keys_filter_helper(row_keys): + """Creates a union filter for a list of rows. + + :type row_keys: list + :param row_keys: Iterable containing row keys (as strings). + + :rtype: :class:`.RowFilter` + :returns: The union filter created containing all of the row keys. + :raises: :class:`ValueError` if there are no + filters to union.
+ """ + filters = [] + for row_key in row_keys: + filters.append(RowKeyRegexFilter(row_key)) + + num_filters = len(filters) + if num_filters == 0: + raise ValueError('Must have at least one filter.') + elif num_filters == 1: + return filters[0] + else: + return RowFilterUnion(filters=filters) diff --git a/gcloud/bigtable/happybase/test_table.py b/gcloud/bigtable/happybase/test_table.py index bbd7ac32c4f3..2e66d19e9340 100644 --- a/gcloud/bigtable/happybase/test_table.py +++ b/gcloud/bigtable/happybase/test_table.py @@ -579,6 +579,201 @@ def test_with_timestamp(self): self.assertEqual(result, expected_result) +class Test__filter_chain_helper(unittest2.TestCase): + + def _callFUT(self, *args, **kwargs): + from gcloud.bigtable.happybase.table import _filter_chain_helper + return _filter_chain_helper(*args, **kwargs) + + def test_no_filters(self): + with self.assertRaises(ValueError): + self._callFUT() + + def test_single_filter(self): + from gcloud.bigtable.row import CellsColumnLimitFilter + + versions = 1337 + result = self._callFUT(versions=versions) + self.assertTrue(isinstance(result, CellsColumnLimitFilter)) + # Relies on the fact that RowFilter instances can + # only have one value set. + self.assertEqual(result.num_cells, versions) + + def test_existing_filters(self): + from gcloud.bigtable.row import CellsColumnLimitFilter + + filters = [] + versions = 1337 + result = self._callFUT(versions=versions, filters=filters) + # Make sure filters has grown. + self.assertEqual(filters, [result]) + + self.assertTrue(isinstance(result, CellsColumnLimitFilter)) + # Relies on the fact that RowFilter instances can + # only have one value set. 
+ self.assertEqual(result.num_cells, versions) + + def _column_helper(self, num_filters, versions=None, timestamp=None, + column=None, col_fam=None, qual=None): + from gcloud.bigtable.row import ColumnQualifierRegexFilter + from gcloud.bigtable.row import FamilyNameRegexFilter + from gcloud.bigtable.row import RowFilterChain + + if col_fam is None: + col_fam = 'cf1' + if qual is None: + qual = 'qual' + if column is None: + column = col_fam + ':' + qual + result = self._callFUT(column, versions=versions, timestamp=timestamp) + self.assertTrue(isinstance(result, RowFilterChain)) + + self.assertEqual(len(result.filters), num_filters) + fam_filter = result.filters[0] + qual_filter = result.filters[1] + self.assertTrue(isinstance(fam_filter, FamilyNameRegexFilter)) + self.assertTrue(isinstance(qual_filter, ColumnQualifierRegexFilter)) + + # Relies on the fact that RowFilter instances can + # only have one value set. + self.assertEqual(fam_filter.regex, col_fam) + self.assertEqual(qual_filter.regex, qual) + + return result + + def test_column_only(self): + self._column_helper(num_filters=2) + + def test_column_bytes(self): + self._column_helper(num_filters=2, column=b'cfB:qualY', + col_fam=u'cfB', qual=u'qualY') + + def test_column_unicode(self): + self._column_helper(num_filters=2, column=u'cfU:qualN', + col_fam=u'cfU', qual=u'qualN') + + def test_with_versions(self): + from gcloud.bigtable.row import CellsColumnLimitFilter + + versions = 11 + result = self._column_helper(num_filters=3, versions=versions) + + version_filter = result.filters[2] + self.assertTrue(isinstance(version_filter, CellsColumnLimitFilter)) + # Relies on the fact that RowFilter instances can + # only have one value set. 
+ self.assertEqual(version_filter.num_cells, versions) + + def test_with_timestamp(self): + from gcloud._helpers import _datetime_from_microseconds + from gcloud.bigtable.row import TimestampRange + from gcloud.bigtable.row import TimestampRangeFilter + + timestamp = 1441928298571 + result = self._column_helper(num_filters=3, timestamp=timestamp) + + range_filter = result.filters[2] + self.assertTrue(isinstance(range_filter, TimestampRangeFilter)) + # Relies on the fact that RowFilter instances can + # only have one value set. + time_range = range_filter.range_ + self.assertTrue(isinstance(time_range, TimestampRange)) + self.assertEqual(time_range.start, None) + ts_dt = _datetime_from_microseconds(1000 * timestamp) + self.assertEqual(time_range.end, ts_dt) + + def test_with_all_options(self): + versions = 11 + timestamp = 1441928298571 + self._column_helper(num_filters=4, versions=versions, + timestamp=timestamp) + + +class Test__columns_filter_helper(unittest2.TestCase): + + def _callFUT(self, *args, **kwargs): + from gcloud.bigtable.happybase.table import _columns_filter_helper + return _columns_filter_helper(*args, **kwargs) + + def test_no_columns(self): + columns = [] + with self.assertRaises(ValueError): + self._callFUT(columns) + + def test_single_column(self): + from gcloud.bigtable.row import FamilyNameRegexFilter + + col_fam = 'cf1' + columns = [col_fam] + result = self._callFUT(columns) + expected_result = FamilyNameRegexFilter(col_fam) + self.assertEqual(result, expected_result) + + def test_column_and_column_families(self): + from gcloud.bigtable.row import ColumnQualifierRegexFilter + from gcloud.bigtable.row import FamilyNameRegexFilter + from gcloud.bigtable.row import RowFilterChain + from gcloud.bigtable.row import RowFilterUnion + + col_fam1 = 'cf1' + col_fam2 = 'cf2' + col_qual2 = 'qual2' + columns = [col_fam1, col_fam2 + ':' + col_qual2] + result = self._callFUT(columns) + + self.assertTrue(isinstance(result, RowFilterUnion)) + 
self.assertEqual(len(result.filters), 2) + filter1 = result.filters[0] + filter2 = result.filters[1] + + self.assertTrue(isinstance(filter1, FamilyNameRegexFilter)) + self.assertEqual(filter1.regex, col_fam1) + + self.assertTrue(isinstance(filter2, RowFilterChain)) + filter2a, filter2b = filter2.filters + self.assertTrue(isinstance(filter2a, FamilyNameRegexFilter)) + self.assertEqual(filter2a.regex, col_fam2) + self.assertTrue(isinstance(filter2b, ColumnQualifierRegexFilter)) + self.assertEqual(filter2b.regex, col_qual2) + + +class Test__row_keys_filter_helper(unittest2.TestCase): + + def _callFUT(self, *args, **kwargs): + from gcloud.bigtable.happybase.table import _row_keys_filter_helper + return _row_keys_filter_helper(*args, **kwargs) + + def test_no_rows(self): + row_keys = [] + with self.assertRaises(ValueError): + self._callFUT(row_keys) + + def test_single_row(self): + from gcloud.bigtable.row import RowKeyRegexFilter + + row_key = b'row-key' + row_keys = [row_key] + result = self._callFUT(row_keys) + expected_result = RowKeyRegexFilter(row_key) + self.assertEqual(result, expected_result) + + def test_many_rows(self): + from gcloud.bigtable.row import RowFilterUnion + from gcloud.bigtable.row import RowKeyRegexFilter + + row_key1 = b'row-key1' + row_key2 = b'row-key2' + row_key3 = b'row-key3' + row_keys = [row_key1, row_key2, row_key3] + result = self._callFUT(row_keys) + + filter1 = RowKeyRegexFilter(row_key1) + filter2 = RowKeyRegexFilter(row_key2) + filter3 = RowKeyRegexFilter(row_key3) + expected_result = RowFilterUnion(filters=[filter1, filter2, filter3]) + self.assertEqual(result, expected_result) + + class _Connection(object): def __init__(self, cluster):