From f1e4b46cd3f6a867c187f57a637ba66c5c20571f Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Wed, 24 Feb 2016 21:35:58 -0800 Subject: [PATCH 1/2] Adding HappyBase Batch.put() and helpers. --- gcloud/bigtable/happybase/batch.py | 109 +++++++++++ gcloud/bigtable/happybase/test_batch.py | 228 ++++++++++++++++++++++++ 2 files changed, 337 insertions(+) diff --git a/gcloud/bigtable/happybase/batch.py b/gcloud/bigtable/happybase/batch.py index 16eb1d4ebe76..944424011db6 100644 --- a/gcloud/bigtable/happybase/batch.py +++ b/gcloud/bigtable/happybase/batch.py @@ -18,6 +18,8 @@ import datetime import warnings +import six + from gcloud._helpers import _datetime_from_microseconds from gcloud.bigtable.row import TimestampRange @@ -105,6 +107,62 @@ def send(self): self._row_map.clear() self._mutation_count = 0 + def _try_send(self): + """Send / commit the batch if mutations have reached batch size.""" + if self._batch_size and self._mutation_count >= self._batch_size: + self.send() + + def _get_row(self, row_key): + """Gets a row that will hold mutations. + + If the row is not already cached on the current batch, a new row will + be created. + + :type row_key: str + :param row_key: The row key for a row stored in the map. + + :rtype: :class:`Row <gcloud.bigtable.row.Row>` + :returns: The newly created or stored row that will hold mutations. + """ + if row_key not in self._row_map: + table = self._table._low_level_table + self._row_map[row_key] = table.row(row_key) + + return self._row_map[row_key] + + def put(self, row, data, wal=_WAL_SENTINEL): + """Insert data into a row in the table owned by this batch. + + :type row: str + :param row: The row key where the mutation will be "put". + + :type data: dict + :param data: Dictionary containing the data to be inserted. The keys + are column names (of the form ``fam:col``) and the values + are strings (bytes) to be stored in those columns. + + :type wal: object + :param wal: Unused parameter (to over-ride the default on the + instance). 
Provided for compatibility with HappyBase, but + irrelevant for Cloud Bigtable since it does not have a + Write Ahead Log. + """ + if wal is not _WAL_SENTINEL: + _WARN(_WAL_WARNING) + + row_object = self._get_row(row) + # Make sure all the keys are valid before beginning + # to add mutations. + column_pairs = _get_column_pairs(six.iterkeys(data), + require_qualifier=True) + for column_family_id, column_qualifier in column_pairs: + value = data[column_family_id + ':' + column_qualifier] + row_object.set_cell(column_family_id, column_qualifier, + value, timestamp=self._timestamp) + + self._mutation_count += len(data) + self._try_send() + def __enter__(self): """Enter context manager, no set-up required.""" return self @@ -133,3 +191,54 @@ def __exit__(self, exc_type, exc_value, traceback): # NOTE: For non-transactional batches, this will even commit mutations # if an error occurred during the context manager. self.send() + + +def _get_column_pairs(columns, require_qualifier=False): + """Turns a list of columns or column families into parsed pairs. + + Turns a column family (``fam`` or ``fam:``) into a pair such + as ``['fam', None]`` and turns a column (``fam:col``) into + ``['fam', 'col']``. + + :type columns: list + :param columns: Iterable containing column names (as + strings). Each column name can be either + + * an entire column family: ``fam`` or ``fam:`` + * a single column: ``fam:col`` + + :type require_qualifier: bool + :param require_qualifier: Boolean indicating if the columns should + all have a qualifier or not. + + :rtype: list + :returns: List of pairs, where the first element in each pair is the + column family and the second is the column qualifier + (or :data:`None`). + :raises: :class:`ValueError` if any of the columns + are not of the expected format. 
+ :class:`ValueError` if + ``require_qualifier`` is :data:`True` and one of the values is + for an entire column family. + """ + column_pairs = [] + for column in columns: + if isinstance(column, six.binary_type): + column = column.decode('utf-8') + # Remove a trailing colon (i.e. for standalone column family). + if column.endswith(u':'): + column = column[:-1] + num_colons = column.count(u':') + if num_colons == 0: + # column is a column family. + if require_qualifier: + raise ValueError('column does not contain a qualifier', + column) + else: + column_pairs.append([column, None]) + elif num_colons == 1: + column_pairs.append(column.split(u':')) + else: + raise ValueError('Column contains the : separator more than once') + + return column_pairs diff --git a/gcloud/bigtable/happybase/test_batch.py b/gcloud/bigtable/happybase/test_batch.py index 0611de9a633d..fc484e71f708 100644 --- a/gcloud/bigtable/happybase/test_batch.py +++ b/gcloud/bigtable/happybase/test_batch.py @@ -122,6 +122,172 @@ def test_send(self): self.assertEqual(batch._mutation_count, 0) self.assertEqual(row_map, {}) + def test__try_send_no_batch_size(self): + klass = self._getTargetClass() + + class BatchWithSend(_SendMixin, klass): + pass + + table = object() + batch = BatchWithSend(table) + + self.assertEqual(batch._batch_size, None) + self.assertFalse(batch._send_called) + batch._try_send() + self.assertFalse(batch._send_called) + + def test__try_send_too_few_mutations(self): + klass = self._getTargetClass() + + class BatchWithSend(_SendMixin, klass): + pass + + table = object() + batch_size = 10 + batch = BatchWithSend(table, batch_size=batch_size) + + self.assertEqual(batch._batch_size, batch_size) + self.assertFalse(batch._send_called) + mutation_count = 2 + batch._mutation_count = mutation_count + self.assertTrue(mutation_count < batch_size) + batch._try_send() + self.assertFalse(batch._send_called) + + def test__try_send_actual_send(self): + klass = self._getTargetClass() + + class 
BatchWithSend(_SendMixin, klass): + pass + + table = object() + batch_size = 10 + batch = BatchWithSend(table, batch_size=batch_size) + + self.assertEqual(batch._batch_size, batch_size) + self.assertFalse(batch._send_called) + mutation_count = 12 + batch._mutation_count = mutation_count + self.assertTrue(mutation_count > batch_size) + batch._try_send() + self.assertTrue(batch._send_called) + + def test__get_row_exists(self): + table = object() + batch = self._makeOne(table) + + row_key = 'row-key' + row_obj = object() + batch._row_map[row_key] = row_obj + result = batch._get_row(row_key) + self.assertEqual(result, row_obj) + + def test__get_row_create_new(self): + # Make mock batch and make sure we can create a low-level table. + low_level_table = _MockLowLevelTable() + table = _MockTable(low_level_table) + batch = self._makeOne(table) + + # Make sure row map is empty. + self.assertEqual(batch._row_map, {}) + + # Customize/capture mock table creation. + low_level_table.mock_row = mock_row = object() + + # Actually get the row (which creates a row via a low-level table). + row_key = 'row-key' + result = batch._get_row(row_key) + self.assertEqual(result, mock_row) + + # Check all the things that were constructed. + self.assertEqual(low_level_table.rows_made, [row_key]) + # Check how the batch was updated. 
+ self.assertEqual(batch._row_map, {row_key: mock_row}) + + def test_put_bad_wal(self): + from gcloud._testing import _Monkey + from gcloud.bigtable.happybase import batch as MUT + + warned = [] + + def mock_warn(message): + warned.append(message) + # Raise an exception so we don't run the rest of put(). + raise RuntimeError('No need to execute the rest.') + + table = object() + batch = self._makeOne(table) + + row = 'row-key' + data = {} + wal = None + + self.assertNotEqual(wal, MUT._WAL_SENTINEL) + with _Monkey(MUT, _WARN=mock_warn): + with self.assertRaises(RuntimeError): + batch.put(row, data, wal=wal) + + self.assertEqual(warned, [MUT._WAL_WARNING]) + + def test_put(self): + import operator + + table = object() + batch = self._makeOne(table) + batch._timestamp = timestamp = object() + row_key = 'row-key' + batch._row_map[row_key] = row = _MockRow() + + col1_fam = 'cf1' + col1_qual = 'qual1' + value1 = 'value1' + col2_fam = 'cf2' + col2_qual = 'qual2' + value2 = 'value2' + data = {col1_fam + ':' + col1_qual: value1, + col2_fam + ':' + col2_qual: value2} + + self.assertEqual(batch._mutation_count, 0) + self.assertEqual(row.set_cell_calls, []) + batch.put(row_key, data) + self.assertEqual(batch._mutation_count, 2) + # Since the calls depend on data.keys(), the order + # is non-deterministic. 
+ first_elt = operator.itemgetter(0) + ordered_calls = sorted(row.set_cell_calls, key=first_elt) + + cell1_args = (col1_fam, col1_qual, value1) + cell1_kwargs = {'timestamp': timestamp} + cell2_args = (col2_fam, col2_qual, value2) + cell2_kwargs = {'timestamp': timestamp} + self.assertEqual(ordered_calls, [ + (cell1_args, cell1_kwargs), + (cell2_args, cell2_kwargs), + ]) + + def test_put_call_try_send(self): + klass = self._getTargetClass() + + class CallTrySend(klass): + + try_send_calls = 0 + + def _try_send(self): + self.try_send_calls += 1 + + table = object() + batch = CallTrySend(table) + + row_key = 'row-key' + batch._row_map[row_key] = _MockRow() + + self.assertEqual(batch._mutation_count, 0) + self.assertEqual(batch.try_send_calls, 0) + # No data so that nothing happens + batch.put(row_key, data={}) + self.assertEqual(batch._mutation_count, 0) + self.assertEqual(batch.try_send_calls, 1) + def test_context_manager(self): klass = self._getTargetClass() @@ -174,6 +340,45 @@ class BatchWithSend(_SendMixin, klass): self.assertTrue(batch._send_called) +class Test__get_column_pairs(unittest2.TestCase): + + def _callFUT(self, *args, **kwargs): + from gcloud.bigtable.happybase.batch import _get_column_pairs + return _get_column_pairs(*args, **kwargs) + + def test_it(self): + columns = [b'cf1', u'cf2:', 'cf3::', 'cf3:name1', 'cf3:name2'] + result = self._callFUT(columns) + expected_result = [ + ['cf1', None], + ['cf2', None], + ['cf3', ''], + ['cf3', 'name1'], + ['cf3', 'name2'], + ] + self.assertEqual(result, expected_result) + + def test_bad_column(self): + columns = ['a:b:c'] + with self.assertRaises(ValueError): + self._callFUT(columns) + + def test_bad_column_type(self): + columns = [None] + with self.assertRaises(AttributeError): + self._callFUT(columns) + + def test_bad_columns_var(self): + columns = None + with self.assertRaises(TypeError): + self._callFUT(columns) + + def test_column_family_with_require_qualifier(self): + columns = ['a:'] + with 
self.assertRaises(ValueError): + self._callFUT(columns, require_qualifier=True) + + class _MockRowMap(dict): clear_count = 0 @@ -187,6 +392,29 @@ class _MockRow(object): def __init__(self): self.commits = 0 + self.set_cell_calls = [] def commit(self): self.commits += 1 + + def set_cell(self, *args, **kwargs): + self.set_cell_calls.append((args, kwargs)) + + +class _MockTable(object): + + def __init__(self, low_level_table): + self._low_level_table = low_level_table + + +class _MockLowLevelTable(object): + + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + self.rows_made = [] + self.mock_row = None + + def row(self, row_key): + self.rows_made.append(row_key) + return self.mock_row From c088e9922f75c6ecd1882b437492620c84ccd09f Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Thu, 25 Feb 2016 12:07:01 -0800 Subject: [PATCH 2/2] Adding Batch() note about __exit__() behavior. --- gcloud/bigtable/happybase/batch.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/gcloud/bigtable/happybase/batch.py b/gcloud/bigtable/happybase/batch.py index 944424011db6..6b8006e2d3c1 100644 --- a/gcloud/bigtable/happybase/batch.py +++ b/gcloud/bigtable/happybase/batch.py @@ -35,6 +35,14 @@ class Batch(object): """Batch class for accumulating mutations. + .. note:: + + When using a batch with ``transaction=False`` as a context manager + (i.e. in a ``with`` statement), mutations will still be sent as + row mutations even if the context manager exits with an error. + This behavior is in place to match the behavior in the HappyBase + HBase / Thrift implementation. + :type table: :class:`Table ` :param table: The table where mutations will be applied.