Skip to content

Commit

Permalink
Merge pull request #1533 from dhermes/happybase-batch-put
Browse files Browse the repository at this point in the history
Adding HappyBase Batch.put() and helpers.
  • Loading branch information
dhermes committed Feb 25, 2016
2 parents bf87e92 + c088e99 commit 8b49f25
Show file tree
Hide file tree
Showing 2 changed files with 345 additions and 0 deletions.
117 changes: 117 additions & 0 deletions gcloud/bigtable/happybase/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import datetime
import warnings

import six

from gcloud._helpers import _datetime_from_microseconds
from gcloud.bigtable.row import TimestampRange

Expand All @@ -33,6 +35,14 @@
class Batch(object):
"""Batch class for accumulating mutations.
.. note::
When using a batch with ``transaction=False`` as a context manager
(i.e. in a ``with`` statement), mutations will still be sent as
row mutations even if the context manager exits with an error.
This behavior is in place to match the behavior in the HappyBase
HBase / Thrift implementation.
:type table: :class:`Table <gcloud.bigtable.happybase.table.Table>`
:param table: The table where mutations will be applied.
Expand Down Expand Up @@ -105,6 +115,62 @@ def send(self):
self._row_map.clear()
self._mutation_count = 0

def _try_send(self):
"""Send / commit the batch if mutations have exceeded batch size."""
if self._batch_size and self._mutation_count >= self._batch_size:
self.send()

def _get_row(self, row_key):
"""Gets a row that will hold mutations.
If the row is not already cached on the current batch, a new row will
be created.
:type row_key: str
:param row_key: The row key for a row stored in the map.
:rtype: :class:`Row <gcloud.bigtable.row.Row>`
:returns: The newly created or stored row that will hold mutations.
"""
if row_key not in self._row_map:
table = self._table._low_level_table
self._row_map[row_key] = table.row(row_key)

return self._row_map[row_key]

def put(self, row, data, wal=_WAL_SENTINEL):
"""Insert data into a row in the table owned by this batch.
:type row: str
:param row: The row key where the mutation will be "put".
:type data: dict
:param data: Dictionary containing the data to be inserted. The keys
are columns names (of the form ``fam:col``) and the values
are strings (bytes) to be stored in those columns.
:type wal: object
:param wal: Unused parameter (to over-ride the default on the
instance). Provided for compatibility with HappyBase, but
irrelevant for Cloud Bigtable since it does not have a
Write Ahead Log.
"""
if wal is not _WAL_SENTINEL:
_WARN(_WAL_WARNING)

row_object = self._get_row(row)
# Make sure all the keys are valid before beginning
# to add mutations.
column_pairs = _get_column_pairs(six.iterkeys(data),
require_qualifier=True)
for column_family_id, column_qualifier in column_pairs:
value = data[column_family_id + ':' + column_qualifier]
row_object.set_cell(column_family_id, column_qualifier,
value, timestamp=self._timestamp)

self._mutation_count += len(data)
self._try_send()

def __enter__(self):
"""Enter context manager, no set-up required."""
return self
Expand Down Expand Up @@ -133,3 +199,54 @@ def __exit__(self, exc_type, exc_value, traceback):
# NOTE: For non-transactional batches, this will even commit mutations
# if an error occurred during the context manager.
self.send()


def _get_column_pairs(columns, require_qualifier=False):
"""Turns a list of column or column families into parsed pairs.
Turns a column family (``fam`` or ``fam:``) into a pair such
as ``['fam', None]`` and turns a column (``fam:col``) into
``['fam', 'col']``.
:type columns: list
:param columns: Iterable containing column names (as
strings). Each column name can be either
* an entire column family: ``fam`` or ``fam:``
* an single column: ``fam:col``
:type require_qualifier: bool
:param require_qualifier: Boolean indicating if the columns should
all have a qualifier or not.
:rtype: list
:returns: List of pairs, where the first element in each pair is the
column family and the second is the column qualifier
(or :data:`None`).
:raises: :class:`ValueError <exceptions.ValueError>` if any of the columns
are not of the expected format.
:class:`ValueError <exceptions.ValueError>` if
``require_qualifier`` is :data:`True` and one of the values is
for an entire column family
"""
column_pairs = []
for column in columns:
if isinstance(column, six.binary_type):
column = column.decode('utf-8')
# Remove trailing colons (i.e. for standalone column family).
if column.endswith(u':'):
column = column[:-1]
num_colons = column.count(u':')
if num_colons == 0:
# column is a column family.
if require_qualifier:
raise ValueError('column does not contain a qualifier',
column)
else:
column_pairs.append([column, None])
elif num_colons == 1:
column_pairs.append(column.split(u':'))
else:
raise ValueError('Column contains the : separator more than once')

return column_pairs
228 changes: 228 additions & 0 deletions gcloud/bigtable/happybase/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,172 @@ def test_send(self):
self.assertEqual(batch._mutation_count, 0)
self.assertEqual(row_map, {})

def test__try_send_no_batch_size(self):
klass = self._getTargetClass()

class BatchWithSend(_SendMixin, klass):
pass

table = object()
batch = BatchWithSend(table)

self.assertEqual(batch._batch_size, None)
self.assertFalse(batch._send_called)
batch._try_send()
self.assertFalse(batch._send_called)

def test__try_send_too_few_mutations(self):
klass = self._getTargetClass()

class BatchWithSend(_SendMixin, klass):
pass

table = object()
batch_size = 10
batch = BatchWithSend(table, batch_size=batch_size)

self.assertEqual(batch._batch_size, batch_size)
self.assertFalse(batch._send_called)
mutation_count = 2
batch._mutation_count = mutation_count
self.assertTrue(mutation_count < batch_size)
batch._try_send()
self.assertFalse(batch._send_called)

def test__try_send_actual_send(self):
klass = self._getTargetClass()

class BatchWithSend(_SendMixin, klass):
pass

table = object()
batch_size = 10
batch = BatchWithSend(table, batch_size=batch_size)

self.assertEqual(batch._batch_size, batch_size)
self.assertFalse(batch._send_called)
mutation_count = 12
batch._mutation_count = mutation_count
self.assertTrue(mutation_count > batch_size)
batch._try_send()
self.assertTrue(batch._send_called)

def test__get_row_exists(self):
table = object()
batch = self._makeOne(table)

row_key = 'row-key'
row_obj = object()
batch._row_map[row_key] = row_obj
result = batch._get_row(row_key)
self.assertEqual(result, row_obj)

def test__get_row_create_new(self):
# Make mock batch and make sure we can create a low-level table.
low_level_table = _MockLowLevelTable()
table = _MockTable(low_level_table)
batch = self._makeOne(table)

# Make sure row map is empty.
self.assertEqual(batch._row_map, {})

# Customize/capture mock table creation.
low_level_table.mock_row = mock_row = object()

# Actually get the row (which creates a row via a low-level table).
row_key = 'row-key'
result = batch._get_row(row_key)
self.assertEqual(result, mock_row)

# Check all the things that were constructed.
self.assertEqual(low_level_table.rows_made, [row_key])
# Check how the batch was updated.
self.assertEqual(batch._row_map, {row_key: mock_row})

def test_put_bad_wal(self):
from gcloud._testing import _Monkey
from gcloud.bigtable.happybase import batch as MUT

warned = []

def mock_warn(message):
warned.append(message)
# Raise an exception so we don't
raise RuntimeError('No need to execute the rest.')

table = object()
batch = self._makeOne(table)

row = 'row-key'
data = {}
wal = None

self.assertNotEqual(wal, MUT._WAL_SENTINEL)
with _Monkey(MUT, _WARN=mock_warn):
with self.assertRaises(RuntimeError):
batch.put(row, data, wal=wal)

self.assertEqual(warned, [MUT._WAL_WARNING])

def test_put(self):
import operator

table = object()
batch = self._makeOne(table)
batch._timestamp = timestamp = object()
row_key = 'row-key'
batch._row_map[row_key] = row = _MockRow()

col1_fam = 'cf1'
col1_qual = 'qual1'
value1 = 'value1'
col2_fam = 'cf2'
col2_qual = 'qual2'
value2 = 'value2'
data = {col1_fam + ':' + col1_qual: value1,
col2_fam + ':' + col2_qual: value2}

self.assertEqual(batch._mutation_count, 0)
self.assertEqual(row.set_cell_calls, [])
batch.put(row_key, data)
self.assertEqual(batch._mutation_count, 2)
# Since the calls depend on data.keys(), the order
# is non-deterministic.
first_elt = operator.itemgetter(0)
ordered_calls = sorted(row.set_cell_calls, key=first_elt)

cell1_args = (col1_fam, col1_qual, value1)
cell1_kwargs = {'timestamp': timestamp}
cell2_args = (col2_fam, col2_qual, value2)
cell2_kwargs = {'timestamp': timestamp}
self.assertEqual(ordered_calls, [
(cell1_args, cell1_kwargs),
(cell2_args, cell2_kwargs),
])

def test_put_call_try_send(self):
klass = self._getTargetClass()

class CallTrySend(klass):

try_send_calls = 0

def _try_send(self):
self.try_send_calls += 1

table = object()
batch = CallTrySend(table)

row_key = 'row-key'
batch._row_map[row_key] = _MockRow()

self.assertEqual(batch._mutation_count, 0)
self.assertEqual(batch.try_send_calls, 0)
# No data so that nothing happens
batch.put(row_key, data={})
self.assertEqual(batch._mutation_count, 0)
self.assertEqual(batch.try_send_calls, 1)

def test_context_manager(self):
klass = self._getTargetClass()

Expand Down Expand Up @@ -174,6 +340,45 @@ class BatchWithSend(_SendMixin, klass):
self.assertTrue(batch._send_called)


class Test__get_column_pairs(unittest2.TestCase):

def _callFUT(self, *args, **kwargs):
from gcloud.bigtable.happybase.batch import _get_column_pairs
return _get_column_pairs(*args, **kwargs)

def test_it(self):
columns = [b'cf1', u'cf2:', 'cf3::', 'cf3:name1', 'cf3:name2']
result = self._callFUT(columns)
expected_result = [
['cf1', None],
['cf2', None],
['cf3', ''],
['cf3', 'name1'],
['cf3', 'name2'],
]
self.assertEqual(result, expected_result)

def test_bad_column(self):
columns = ['a:b:c']
with self.assertRaises(ValueError):
self._callFUT(columns)

def test_bad_column_type(self):
columns = [None]
with self.assertRaises(AttributeError):
self._callFUT(columns)

def test_bad_columns_var(self):
columns = None
with self.assertRaises(TypeError):
self._callFUT(columns)

def test_column_family_with_require_qualifier(self):
columns = ['a:']
with self.assertRaises(ValueError):
self._callFUT(columns, require_qualifier=True)


class _MockRowMap(dict):

clear_count = 0
Expand All @@ -187,6 +392,29 @@ class _MockRow(object):

def __init__(self):
self.commits = 0
self.set_cell_calls = []

def commit(self):
self.commits += 1

def set_cell(self, *args, **kwargs):
self.set_cell_calls.append((args, kwargs))


class _MockTable(object):

def __init__(self, low_level_table):
self._low_level_table = low_level_table


class _MockLowLevelTable(object):

def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
self.rows_made = []
self.mock_row = None

def row(self, row_key):
self.rows_made.append(row_key)
return self.mock_row

0 comments on commit 8b49f25

Please sign in to comment.