diff --git a/system_tests/bigtable.py b/system_tests/bigtable.py index 122bc60a8317..31d00f36675e 100644 --- a/system_tests/bigtable.py +++ b/system_tests/bigtable.py @@ -19,6 +19,7 @@ from gcloud import _helpers from gcloud.bigtable.client import Client +from gcloud.bigtable.column_family import MaxVersionsGCRule from gcloud.environment_vars import TESTS_PROJECT @@ -27,6 +28,8 @@ NOW_MILLIS = int(1000 * time.time()) CLUSTER_ID = 'gcloud-python-%d' % (NOW_MILLIS,) TABLE_ID = 'gcloud-python-test-table' +COLUMN_FAMILY_ID1 = u'col-fam-id1' +COLUMN_FAMILY_ID2 = u'col-fam-id2' EXISTING_CLUSTERS = [] EXPECTED_ZONES = ( 'asia-east1-b', @@ -227,3 +230,65 @@ def test_rename_table(self): self.assertEqual( exc_caught.details, 'BigtableTableService.RenameTable is not yet implemented') + + def test_create_column_family(self): + temp_table_id = 'foo-bar-baz-table' + temp_table = Config.CLUSTER.table(temp_table_id) + temp_table.create() + self.tables_to_delete.append(temp_table) + + self.assertEqual(temp_table.list_column_families(), {}) + gc_rule = MaxVersionsGCRule(1) + column_family = temp_table.column_family(COLUMN_FAMILY_ID1, + gc_rule=gc_rule) + column_family.create() + + col_fams = temp_table.list_column_families() + + self.assertEqual(len(col_fams), 1) + retrieved_col_fam = col_fams[COLUMN_FAMILY_ID1] + self.assertTrue(retrieved_col_fam._table is column_family._table) + self.assertEqual(retrieved_col_fam.column_family_id, + column_family.column_family_id) + self.assertEqual(retrieved_col_fam.gc_rule, gc_rule) + + def test_update_column_family(self): + temp_table_id = 'foo-bar-baz-table' + temp_table = Config.CLUSTER.table(temp_table_id) + temp_table.create() + self.tables_to_delete.append(temp_table) + + gc_rule = MaxVersionsGCRule(1) + column_family = temp_table.column_family(COLUMN_FAMILY_ID1, + gc_rule=gc_rule) + column_family.create() + + # Check that our created table is as expected. + col_fams = temp_table.list_column_families() + self.assertEqual(col_fams, {COLUMN_FAMILY_ID1: column_family}) + + # Update the column family's GC rule and then try to update. + column_family.gc_rule = None + column_family.update() + + # Check that the update has propagated. + col_fams = temp_table.list_column_families() + self.assertEqual(col_fams[COLUMN_FAMILY_ID1].gc_rule, None) + + def test_delete_column_family(self): + temp_table_id = 'foo-bar-baz-table' + temp_table = Config.CLUSTER.table(temp_table_id) + temp_table.create() + self.tables_to_delete.append(temp_table) + + self.assertEqual(temp_table.list_column_families(), {}) + column_family = temp_table.column_family(COLUMN_FAMILY_ID1) + column_family.create() + + # Make sure the family is there before deleting it. + col_fams = temp_table.list_column_families() + self.assertEqual(list(col_fams.keys()), [COLUMN_FAMILY_ID1]) + + column_family.delete() + # Make sure we have successfully deleted it. + self.assertEqual(temp_table.list_column_families(), {})