Adding HappyBase Table.scan().
dhermes committed Feb 26, 2016
1 parent ade1724 commit 0861755
Showing 2 changed files with 255 additions and 10 deletions.
64 changes: 57 additions & 7 deletions gcloud/bigtable/happybase/table.py
@@ -16,6 +16,7 @@


import struct
import warnings

import six

@@ -40,6 +41,7 @@
from gcloud.bigtable.table import Table as _LowLevelTable


_WARN = warnings.warn
_UNPACK_I64 = struct.Struct('>q').unpack
_SIMPLE_GC_RULES = (MaxAgeGCRule, MaxVersionsGCRule)

@@ -367,15 +369,63 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
:param kwargs: Remaining keyword arguments. Provided for HappyBase
compatibility.
        :raises: :class:`ValueError <exceptions.ValueError>` if ``limit``
                 is set but non-positive, or if a row prefix is used
                 with row start/stop,
                 :class:`TypeError <exceptions.TypeError>` if a string
                 ``filter`` is used.
        """
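        # NOTE: batch_size, scan_batching and sorted_columns are HappyBase
        #       tuning arguments with no Cloud Bigtable analogue, so they
        #       are warned about and discarded rather than rejected.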
legacy_args = []
for kw_name in ('batch_size', 'scan_batching', 'sorted_columns'):
if kw_name in kwargs:
legacy_args.append(kw_name)
kwargs.pop(kw_name)
if legacy_args:
legacy_args = ', '.join(legacy_args)
message = ('The HappyBase legacy arguments %s were used. These '
'arguments are unused by gcloud.' % (legacy_args,))
_WARN(message)
if kwargs:
raise TypeError('Received unexpected arguments', kwargs.keys())

if limit is not None and limit < 1:
raise ValueError('limit must be positive')
if row_prefix is not None:
if row_start is not None or row_stop is not None:
raise ValueError('row_prefix cannot be combined with '
'row_start or row_stop')
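            # A prefix scan is a range scan from the prefix to its
            # lexicographic successor (e.g. 'row-ab' -> 'row-ac').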
row_start = row_prefix
row_stop = _string_successor(row_prefix)

filters = []
if isinstance(filter, six.string_types):
            raise TypeError('HBase filter strings not supported by Cloud '
                            'Bigtable. RowFilters from the row module may '
                            'be used instead.')
elif filter is not None:
filters.append(filter)

if columns is not None:
filters.append(_columns_filter_helper(columns))
# versions == 1 since we only want the latest.
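        # The chain helper ANDs together the version limit, the optional
        # timestamp range, the column filter and the user-supplied filter.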
filter_ = _filter_chain_helper(versions=1, timestamp=timestamp,
filters=filters)

partial_rows_data = self._low_level_table.read_rows(
start_key=row_start, end_key=row_stop,
limit=limit, filter_=filter_)

        # Grab a reference to the mutable mapping of rows; the loop
        # below pops each row out of it as the row is consumed.
        rows_dict = partial_rows_data.rows
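        # Each consume_next() call reads the next chunk of the response
        # stream and raises StopIteration once the stream is exhausted,
        # so rows are yielded lazily, one at a time.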
while True:
try:
partial_rows_data.consume_next()
row_key, curr_row_data = rows_dict.popitem()
# NOTE: We expect len(rows_dict) == 0, but don't check it.
curr_row_dict = _partial_row_to_dict(
curr_row_data, include_timestamp=include_timestamp)
yield (row_key, curr_row_dict)
except StopIteration:
break

def put(self, row, data, timestamp=None, wal=_WAL_SENTINEL):
"""Insert data into a row in this table.
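For context, here is a minimal sketch of how the new Table.scan() is meant to be called. This is a hypothetical setup: it assumes a Connection already wired to a Cloud Bigtable cluster, and the table name, column family, and row-key prefix are invented for illustration:

    from gcloud.bigtable.happybase import Connection

    connection = Connection()  # assumes the cluster is inferred from the environment
    table = connection.table('my-table')  # hypothetical table name

    # Lazily stream the latest version of each cell for row keys that
    # start with 'greeting-', restricted to the 'cf1' column family.
    for row_key, row_data in table.scan(row_prefix='greeting-',
                                        columns=['cf1'],
                                        include_timestamp=True):
        print(row_key, row_data)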
201 changes: 198 additions & 3 deletions gcloud/bigtable/happybase/test_table.py
@@ -507,13 +507,203 @@ def mock_cells_to_pairs(*args, **kwargs):
self.assertEqual(mock_cells,
[((fake_cells,), to_pairs_kwargs)])

def test_scan_with_batch_size(self):
from gcloud._testing import _Monkey
from gcloud.bigtable.happybase import table as MUT

warned = []

def mock_warn(msg):
warned.append(msg)

name = 'table-name'
connection = None
table = self._makeOne(name, connection)
# Use unknown to force a TypeError, so we don't need to
# stub out the rest of the method.
with self.assertRaises(TypeError):
with _Monkey(MUT, _WARN=mock_warn):
list(table.scan(batch_size=object(), unknown=None))

self.assertEqual(len(warned), 1)
self.assertIn('batch_size', warned[0])

def test_scan_with_scan_batching(self):
from gcloud._testing import _Monkey
from gcloud.bigtable.happybase import table as MUT

warned = []

def mock_warn(msg):
warned.append(msg)

name = 'table-name'
connection = None
table = self._makeOne(name, connection)
# Use unknown to force a TypeError, so we don't need to
# stub out the rest of the method.
with self.assertRaises(TypeError):
with _Monkey(MUT, _WARN=mock_warn):
list(table.scan(scan_batching=object(), unknown=None))

self.assertEqual(len(warned), 1)
self.assertIn('scan_batching', warned[0])

def test_scan_with_sorted_columns(self):
from gcloud._testing import _Monkey
from gcloud.bigtable.happybase import table as MUT

warned = []

def mock_warn(msg):
warned.append(msg)

name = 'table-name'
connection = None
table = self._makeOne(name, connection)
# Use unknown to force a TypeError, so we don't need to
# stub out the rest of the method.
with self.assertRaises(TypeError):
with _Monkey(MUT, _WARN=mock_warn):
list(table.scan(sorted_columns=object(), unknown=None))

self.assertEqual(len(warned), 1)
self.assertIn('sorted_columns', warned[0])

def test_scan_with_invalid_limit(self):
name = 'table-name'
connection = None
table = self._makeOne(name, connection)
with self.assertRaises(ValueError):
list(table.scan(limit=-10))

def test_scan_with_row_prefix_and_row_start(self):
name = 'table-name'
connection = None
table = self._makeOne(name, connection)
with self.assertRaises(ValueError):
list(table.scan(row_prefix='a', row_start='abc'))

def test_scan_with_string_filter(self):
name = 'table-name'
connection = None
table = self._makeOne(name, connection)
with self.assertRaises(TypeError):
list(table.scan(filter='some-string'))

def _scan_test_helper(self, row_limits=(None, None), row_prefix=None,
columns=None, filter_=None, timestamp=None,
include_timestamp=False, limit=None, rr_result=None,
expected_result=None):
import types
from gcloud._testing import _Monkey
from gcloud.bigtable.happybase import table as MUT

name = 'table-name'
row_start, row_stop = row_limits
connection = None
table = self._makeOne(name, connection)
table._low_level_table = _MockLowLevelTable()
rr_result = rr_result or _MockPartialRowsData()
table._low_level_table.read_rows_result = rr_result
self.assertEqual(rr_result.consume_next_calls, 0)

# Set-up mocks.
fake_col_filter = object()
mock_columns = []

def mock_columns_filter_helper(*args):
mock_columns.append(args)
return fake_col_filter

fake_filter = object()
mock_filters = []

def mock_filter_chain_helper(**kwargs):
mock_filters.append(kwargs)
return fake_filter

with _Monkey(MUT, _filter_chain_helper=mock_filter_chain_helper,
_columns_filter_helper=mock_columns_filter_helper):
result = table.scan(row_start=row_start, row_stop=row_stop,
row_prefix=row_prefix, columns=columns,
filter=filter_, timestamp=timestamp,
include_timestamp=include_timestamp,
limit=limit)
self.assertTrue(isinstance(result, types.GeneratorType))
# Need to consume the result while the monkey patch is applied.
# read_rows_result == Empty PartialRowsData --> No results.
expected_result = expected_result or []
self.assertEqual(list(result), expected_result)

read_rows_args = ()
if row_prefix:
row_start = row_prefix
row_stop = MUT._string_successor(row_prefix)
read_rows_kwargs = {
'end_key': row_stop,
'filter_': fake_filter,
'limit': limit,
'start_key': row_start,
}
self.assertEqual(table._low_level_table.read_rows_calls, [
(read_rows_args, read_rows_kwargs),
])
self.assertEqual(rr_result.consume_next_calls,
rr_result.iterations + 1)
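        # One call beyond `iterations` is expected: the final consume_next()
        # is the one that raises StopIteration and ends the scan loop.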

if columns is not None:
self.assertEqual(mock_columns, [(columns,)])
else:
self.assertEqual(mock_columns, [])

filters = []
if filter_ is not None:
filters.append(filter_)
if columns:
filters.append(fake_col_filter)
expected_kwargs = {
'filters': filters,
'versions': 1,
'timestamp': timestamp,
}
self.assertEqual(mock_filters, [expected_kwargs])

def test_scan_with_columns(self):
columns = object()
self._scan_test_helper(columns=columns)

def test_scan_with_row_start_and_stop(self):
row_start = 'bar'
row_stop = 'foo'
row_limits = (row_start, row_stop)
self._scan_test_helper(row_limits=row_limits)

def test_scan_with_row_prefix(self):
row_prefix = 'row-prefi'
self._scan_test_helper(row_prefix=row_prefix)

def test_scan_with_filter(self):
mock_filter = object()
self._scan_test_helper(filter_=mock_filter)

def test_scan_with_no_results(self):
limit = 1337
timestamp = object()
self._scan_test_helper(timestamp=timestamp, limit=limit)

def test_scan_with_results(self):
from gcloud.bigtable.row_data import PartialRowData

row_key1 = 'row-key1'
row1 = PartialRowData(row_key1)
rr_result = _MockPartialRowsData(rows={row_key1: row1}, iterations=1)

include_timestamp = object()
expected_result = [(row_key1, {})]
self._scan_test_helper(include_timestamp=include_timestamp,
rr_result=rr_result,
expected_result=expected_result)

def test_put(self):
from gcloud._testing import _Monkey
@@ -1292,3 +1482,8 @@ def __init__(self, rows=None, iterations=0):

def consume_all(self):
self.consume_all_calls += 1

def consume_next(self):
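        # Succeed for `iterations` calls, then behave like an exhausted
        # response stream by raising StopIteration.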
self.consume_next_calls += 1
if self.consume_next_calls > self.iterations:
raise StopIteration
