Skip to content

Commit

Permalink
Addressed Comments. Streamlined testing.
Browse files Browse the repository at this point in the history
  • Loading branch information
FastLee committed Dec 24, 2023
1 parent 015415b commit da73f7f
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 33 deletions.
27 changes: 17 additions & 10 deletions src/databricks/labs/ucx/hive_metastore/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ def sql_alter_from(self, catalog):
)

def sql_unset_to(self, catalog):
return f"ALTER {self.kind} {catalog}.{self.database}.{self.name} UNSET TBLPROPERTIES IF EXISTS('upgraded_to');"
return (
f"ALTER {self.kind} `{catalog}`.`{self.database}`.`{self.name}` "
f"UNSET TBLPROPERTIES IF EXISTS('upgraded_to');"
)

# SQL to reset the assessment record to revert migration state
def sql_unset_to_assessment(self, schema):
Expand Down Expand Up @@ -268,17 +271,12 @@ def _init_seen_tables(self):
def _table_already_upgraded(self, target) -> bool:
return target in self._seen_tables

def revert_migrated_tables(self, *, schema: str | None = None, table: str | None = None):
def scope_filter(cur_table: Table):
def revert_migrated_tables(self, schema: str | None = None, table: str | None = None):
upgraded_tables = []
for cur_table in self._tc.snapshot():
schema_match = not schema or cur_table.database == schema
# if schema is specified matches the schema
table_match = not table or cur_table.name == table
# if table is specified matches the table
return schema_match and table_match

upgraded_tables = []
for cur_table in list(self._tc.snapshot()):
if scope_filter(cur_table) and cur_table.upgraded_to is not None:
if schema_match and table_match and cur_table.upgraded_to is not None:
upgraded_tables.append(cur_table)

for upgraded_table in upgraded_tables:
Expand All @@ -288,3 +286,12 @@ def scope_filter(cur_table: Table):
)
self._backend.execute(upgraded_table.sql_unset_to("hive_metastore"))
self._tc.unset_upgraded_to(database=schema, name=table)

def checks_upgraded_to(self, schema: str, table: str):
result = self._backend.fetch(f"SHOW TBLPROPERTIES `{schema}`.`{table}`")

Check warning on line 291 in src/databricks/labs/ucx/hive_metastore/tables.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/hive_metastore/tables.py#L291

Added line #L291 was not covered by tests
for value in result:
if value["key"] == "upgraded_to":
logger.info(f"{schema}.{table} is set as upgraded")
return True
logger.info(f"{schema}.{table} is set as not upgraded")
return False

Check warning on line 297 in src/databricks/labs/ucx/hive_metastore/tables.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/hive_metastore/tables.py#L294-L297

Added lines #L294 - L297 were not covered by tests
28 changes: 13 additions & 15 deletions tests/integration/hive_metastore/test_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,26 +115,25 @@ def test_revert_migrated_table(ws, sql_backend, inventory_schema, make_schema, m
make_table(schema_name=schema2.name),
]

for i in range(3):
logger.info(f"Applying upgraded_to table property for {tables[i].full_name}")
sql_backend.execute(f"ALTER TABLE {tables[i].full_name} set TBLPROPERTIES('upgraded_to'='FAKE DEST')")
for table in tables[0:3]:
# apply "upgraded to" table property to the first three tables.
logger.info(f"Applying upgraded_to table property for {table.full_name}")
sql_backend.execute(f"ALTER TABLE {table.full_name} set TBLPROPERTIES('upgraded_to'='FAKE DEST')")

crawler = TablesCrawler(sql_backend, inventory_schema)
tm = TablesMigrate(crawler, ws, sql_backend)
tm.revert_migrated_tables(schema=schema1.name)

def checks_upgraded_to(table_name: str):
result = sql_backend.fetch(f"SHOW TBLPROPERTIES {table_name}")
for value in result:
if value["key"] == "upgraded_to":
logger.info(f"{table_name} is set as upgraded")
return True
logger.info(f"{table_name} is set as not upgraded")
return False

# Checking that two of the tables were reverted and one was left intact.
assert [False, False, True, False] == [checks_upgraded_to(table.full_name) for table in tables]

# The first two tables belong to schema 1 and should have not "upgraded_to" property
assert not tm.checks_upgraded_to(tables[0].schema_name, tables[0].name)
assert not tm.checks_upgraded_to(tables[1].schema_name, tables[1].name)
# The third table belongs to schema 2 and should have an "upgraded_to" property set
assert tm.checks_upgraded_to(tables[2].schema_name, tables[2].name)
# The fourth table did have the "upgraded_to" property set and should remain that way.
assert not tm.checks_upgraded_to(tables[3].schema_name, tables[3].name)

# Testing that the records in the tables table reflect then new state of the tables.
assessed_tables = sql_backend.fetch(
f"SELECT database, name, upgraded_to from {inventory_schema}.tables "
f"where database in ('{schema1.name}','{schema2.name}')"
Expand All @@ -144,4 +143,3 @@ def checks_upgraded_to(table_name: str):
assert Row((tables[1].schema_name, tables[1].name, None)) in assessed_tables_list
assert Row((tables[2].schema_name, tables[2].name, "fake dest")) in assessed_tables_list
assert Row((tables[3].schema_name, tables[3].name, None)) in assessed_tables_list
assert True
23 changes: 15 additions & 8 deletions tests/unit/hive_metastore/test_migrate.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from unittest.mock import MagicMock
from unittest.mock import MagicMock, create_autospec

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import CatalogInfo, SchemaInfo, TableInfo

from databricks.labs.ucx.hive_metastore.tables import (
Expand Down Expand Up @@ -133,7 +134,7 @@ def test_tables_sql_unset_to():
upgraded_to="dest1",
)
assert table.sql_unset_to("hive_metastore") == (
"ALTER TABLE hive_metastore.test_schema1.test_table1 UNSET TBLPROPERTIES IF EXISTS('upgraded_to');"
"ALTER TABLE `hive_metastore`.`test_schema1`.`test_table1` UNSET TBLPROPERTIES IF EXISTS('upgraded_to');"
)


Expand All @@ -145,8 +146,8 @@ def test_revert_migrated_tables():
]
}
backend = MockBackend(fails_on_first=errors, rows=rows)
tc = TablesCrawler(backend, "inventory_database")
client = MagicMock()
tc = create_autospec(TablesCrawler)
client = create_autospec(WorkspaceClient)
tm = TablesMigrate(tc, client, backend, default_catalog="test_catalog")
test_tables = [
Table(
Expand Down Expand Up @@ -175,11 +176,17 @@ def test_revert_migrated_tables():
),
]

tc.snapshot = MagicMock()
tc.snapshot.return_value = test_tables
tm.revert_migrated_tables(schema="test_schema1")
assert (list(backend.queries)) == [
"ALTER TABLE hive_metastore.test_schema1.test_table1 UNSET TBLPROPERTIES IF EXISTS('upgraded_to');",
"ALTER TABLE hive_metastore.test_schema1.test_table2 UNSET TBLPROPERTIES IF EXISTS('upgraded_to');",
"UPDATE hive_metastore.inventory_database.tables SET upgraded_to=NULL WHERE database='test_schema1'",
"ALTER TABLE `hive_metastore`.`test_schema1`.`test_table1` UNSET TBLPROPERTIES IF EXISTS('upgraded_to');",
"ALTER TABLE `hive_metastore`.`test_schema1`.`test_table2` UNSET TBLPROPERTIES IF EXISTS('upgraded_to');",
]
tc.unset_upgraded_to.assert_called_with(database="test_schema1", name=None)

tc_test = TablesCrawler(backend, "inventory_database")
tc_test.unset_upgraded_to(database="test_schema1", name=None)
assert (
backend.queries[-1]
== "UPDATE hive_metastore.inventory_database.tables SET upgraded_to=NULL WHERE database='test_schema1'"
)

0 comments on commit da73f7f

Please sign in to comment.