@ucx.command
def move(
    w: WorkspaceClient,
    from_catalog: str,
    from_schema: str,
    from_table: str,
    to_catalog: str,
    to_schema: str,
):
    """Move a UC table/tables from one schema to another schema, in the same or a different catalog.

    Validates the UCX installation and all arguments, asks the user whether the
    source objects should be dropped after the move, then delegates to
    TableMove.move_tables.
    """
    logger.info("Running move command")
    prompts = Prompts()
    installation_manager = InstallationManager(w)
    installation = installation_manager.for_user(w.current_user.me())
    if not installation:
        logger.error(CANT_FIND_UCX_MSG)
        return
    # Validate every argument *before* building any backend objects, so bad
    # input fails fast without touching the workspace. Falsiness checks also
    # reject None, which the original `== ""` comparisons let through.
    if not from_catalog or not to_catalog:
        logger.error("Please enter from_catalog and to_catalog details")
        return
    if not from_schema or not to_schema or not from_table:
        logger.error("Please enter from_schema, to_schema and from_table(enter * for migrating all tables) details.")
        return
    if from_catalog == to_catalog and from_schema == to_schema:
        logger.error("please select a different schema or catalog to migrate to")
        return
    sql_backend = StatementExecutionBackend(w, installation.config.warehouse_id)
    tables = TableMove(w, sql_backend)
    del_table = prompts.confirm(f"should we delete tables/view after moving to new schema {to_catalog}.{to_schema}")
    logger.info(f"migrating tables {from_table} from {from_catalog}.{from_schema} to {to_catalog}.{to_schema}")
    tables.move_tables(from_catalog, from_schema, from_table, to_catalog, to_schema, del_table)
class TableMove:
    """Moves Unity Catalog tables and views across schemas/catalogs in a metastore.

    Explicit grants on every moved object are re-applied on the target, and the
    source object is optionally dropped afterwards.
    """

    def __init__(self, ws: WorkspaceClient, backend: SqlBackend):
        self._backend = backend
        self._ws = ws

    def move_tables(
        self,
        from_catalog: str,
        from_schema: str,
        from_table: str,
        to_catalog: str,
        to_schema: str,
        del_table: bool,  # noqa: FBT001
    ):
        """Move `from_table` (or every table when it is "*") to the target schema.

        The source schema must exist; the target schema is created when missing.
        Objects that already exist under the target schema are skipped. Tables
        and views are moved in parallel batches.
        """
        try:
            self._ws.schemas.get(f"{from_catalog}.{from_schema}")
        except NotFound:
            logger.error(f"schema {from_schema} not found in catalog {from_catalog}, enter correct schema details.")
            return
        try:
            self._ws.schemas.get(f"{to_catalog}.{to_schema}")
        except NotFound:
            logger.warning(f"schema {to_schema} not found in {to_catalog}, creating...")
            self._ws.schemas.create(to_schema, to_catalog)

        tables = self._ws.tables.list(from_catalog, from_schema)
        table_tasks = []
        view_tasks = []
        filtered_tables = [table for table in tables if from_table in (table.name, "*")]
        for table in filtered_tables:
            try:
                self._ws.tables.get(f"{to_catalog}.{to_schema}.{table.name}")
                # BUG FIX: the original message reported the *source* location and
                # the raw filter value ("*") instead of the conflicting object in
                # the target schema.
                logger.warning(
                    f"table {table.name} already present in {to_catalog}.{to_schema}. skipping this table..."
                )
                continue
            except NotFound:
                if table.table_type and table.table_type in (TableType.EXTERNAL, TableType.MANAGED):
                    table_tasks.append(
                        partial(
                            self._move_table, from_catalog, from_schema, table.name, to_catalog, to_schema, del_table
                        )
                    )
                else:
                    # Anything that is not an EXTERNAL/MANAGED table is treated
                    # as a view and recreated from its definition.
                    view_tasks.append(
                        partial(
                            self._move_view,
                            from_catalog,
                            from_schema,
                            table.name,
                            to_catalog,
                            to_schema,
                            del_table,
                            table.view_definition,
                        )
                    )
        Threads.strict("creating tables", table_tasks)
        logger.info(f"moved {len(table_tasks)} tables to the new schema {to_schema}.")
        Threads.strict("creating views", view_tasks)
        logger.info(f"moved {len(view_tasks)} views to the new schema {to_schema}.")

    def _reapply_grants(self, from_name: str, to_name: str) -> None:
        """Copy the explicit privilege assignments of the source securable onto the target."""
        grants = self._ws.grants.get(SecurableType.TABLE, from_name)
        if grants.privilege_assignments is None:
            return
        changes = [PermissionsChange(pair.privileges, pair.principal) for pair in grants.privilege_assignments]
        self._ws.grants.update(SecurableType.TABLE, to_name, changes=changes)

    def _move_table(
        self,
        from_catalog: str,
        from_schema: str,
        from_table: str,
        to_catalog: str,
        to_schema: str,
        del_table: bool,  # noqa: FBT001
    ) -> bool:
        """Recreate one table in the target schema, copy its grants and optionally drop the source.

        Returns True on success, False when the source table disappeared or the
        lookup failed.
        """
        from_table_name = f"{from_catalog}.{from_schema}.{from_table}"
        to_table_name = f"{to_catalog}.{to_schema}.{from_table}"
        try:
            create_sql = str(next(self._backend.fetch(f"SHOW CREATE TABLE {from_table_name}"))[0])
            create_table_sql = create_sql.replace(f"CREATE TABLE {from_table_name}", f"CREATE TABLE {to_table_name}")
            logger.debug(f"Creating table {to_table_name}.")
            self._backend.execute(create_table_sql)
            self._reapply_grants(from_table_name, to_table_name)
            # BUG FIX: the original returned early when the source had no
            # grants, so grant-less tables were never dropped even with
            # del_table=True. Dropping must not depend on grants.
            if del_table:
                logger.info(f"dropping source table {from_table_name}")
                self._backend.execute(f"DROP TABLE {from_table_name}")
            return True
        except NotFound as err:
            if "[TABLE_OR_VIEW_NOT_FOUND]" in str(err):
                logger.error(f"Could not find table {from_table_name}. Table not found.")
            else:
                logger.error(err)
            return False

    def _move_view(
        self,
        from_catalog: str,
        from_schema: str,
        from_table: str,
        to_catalog: str,
        to_schema: str,
        del_view: bool,  # noqa: FBT001
        view_text: str | None = None,
    ) -> bool:
        """Recreate one view in the target schema from its definition, copy grants, optionally drop the source.

        Returns True on success, False when the source view disappeared or the
        lookup failed.
        """
        from_table_name = f"{from_catalog}.{from_schema}.{from_table}"
        to_table_name = f"{to_catalog}.{to_schema}.{from_table}"
        try:
            logger.debug(f"Creating view {to_table_name}.")
            self._backend.execute(f"CREATE VIEW {to_table_name} AS {view_text}")
            self._reapply_grants(from_table_name, to_table_name)
            # Same ordering fix as _move_table: drop regardless of grants.
            if del_view:
                logger.info(f"dropping source view {from_table_name}")
                self._backend.execute(f"DROP VIEW {from_table_name}")
            return True
        except NotFound as err:
            if "[TABLE_OR_VIEW_NOT_FOUND]" in str(err):
                logger.error(f"Could not find view {from_table_name}. View not found.")
            else:
                logger.error(err)
            return False
import logging
from datetime import timedelta

from databricks.sdk.errors import NotFound
from databricks.sdk.retries import retried
from databricks.sdk.service.catalog import Privilege, PrivilegeAssignment, SecurableType

from databricks.labs.ucx.hive_metastore.table_migrate import TableMove

logger = logging.getLogger(__name__)


@retried(on=[NotFound], timeout=timedelta(minutes=2))
def test_move_tables_no_from_schema(ws, sql_backend, make_random, make_catalog, caplog):
    """A non-existent source schema must abort the move with a single error log."""
    from_catalog = make_catalog()
    from_schema = make_random(4)
    to_catalog = make_catalog()
    tm = TableMove(ws, sql_backend)
    tm.move_tables(from_catalog.name, from_schema, "*", to_catalog.name, from_schema, False)
    rec_results = [
        rec.message
        for rec in caplog.records
        if f"schema {from_schema} not found in catalog {from_catalog.name}" in rec.message
    ]
    assert len(rec_results) == 1


@retried(on=[NotFound], timeout=timedelta(minutes=2))
def test_move_tables(ws, sql_backend, make_catalog, make_schema, make_table, make_acc_group):
    """Moves two tables and a view with their grants; a name clash in the target is skipped."""
    tm = TableMove(ws, sql_backend)
    group_a = make_acc_group()
    group_b = make_acc_group()
    from_catalog = make_catalog()
    from_schema = make_schema(catalog_name=from_catalog.name)
    from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
    from_table_2 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
    from_table_3 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
    from_view_1 = make_table(
        catalog_name=from_catalog.name,
        schema_name=from_schema.name,
        view=True,
        ctas=f"select * from {from_table_2.full_name}",
    )
    to_catalog = make_catalog()
    to_schema = make_schema(catalog_name=to_catalog.name)
    # creating a table in target schema to test skipping
    to_table_3 = make_table(catalog_name=to_catalog.name, schema_name=to_schema.name, name=from_table_3.name)
    sql_backend.execute(f"GRANT SELECT ON TABLE {from_table_1.full_name} TO `{group_a.display_name}`")
    sql_backend.execute(f"GRANT SELECT,MODIFY ON TABLE {from_table_2.full_name} TO `{group_b.display_name}`")
    sql_backend.execute(f"GRANT SELECT ON VIEW {from_view_1.full_name} TO `{group_b.display_name}`")
    sql_backend.execute(f"GRANT SELECT ON TABLE {to_table_3.full_name} TO `{group_a.display_name}`")
    tm.move_tables(from_catalog.name, from_schema.name, "*", to_catalog.name, to_schema.name, False)
    tables = ws.tables.list(catalog_name=to_catalog.name, schema_name=to_schema.name)
    table_1_grant = ws.grants.get(
        securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_table_1.name}"
    )
    table_2_grant = ws.grants.get(
        securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_table_2.name}"
    )
    table_3_grant = ws.grants.get(
        securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_table_3.name}"
    )
    view_1_grant = ws.grants.get(
        securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_view_1.name}"
    )
    # BUG FIX: the original `for t in tables: assert t.name in [...]` loop was
    # vacuously true for an empty listing; assert the exact expected contents.
    assert {t.name for t in tables} == {
        from_table_1.name,
        from_table_2.name,
        from_table_3.name,
        from_view_1.name,
    }
    expected_table_1_grant = [PrivilegeAssignment(group_a.display_name, [Privilege.SELECT])]
    expected_table_2_grant = [
        PrivilegeAssignment(group_b.display_name, [Privilege.MODIFY, Privilege.SELECT]),
    ]
    expected_table_3_grant = [PrivilegeAssignment(group_a.display_name, [Privilege.SELECT])]
    expected_view_1_grant = [PrivilegeAssignment(group_b.display_name, [Privilege.SELECT])]
    assert table_1_grant.privilege_assignments == expected_table_1_grant
    assert table_2_grant.privilege_assignments == expected_table_2_grant
    assert table_3_grant.privilege_assignments == expected_table_3_grant
    assert view_1_grant.privilege_assignments == expected_view_1_grant


@retried(on=[NotFound], timeout=timedelta(minutes=2))
def test_move_tables_no_to_schema(ws, sql_backend, make_catalog, make_schema, make_table, make_random):
    """A missing target schema is created, and only the single named table is moved."""
    tm = TableMove(ws, sql_backend)
    from_catalog = make_catalog()
    from_schema = make_schema(catalog_name=from_catalog.name)
    from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
    from_table_2 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
    from_table_3 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
    to_catalog = make_catalog()
    to_schema = make_random(4)
    tm.move_tables(from_catalog.name, from_schema.name, from_table_1.name, to_catalog.name, to_schema, True)
    moved = {t.name for t in ws.tables.list(catalog_name=to_catalog.name, schema_name=to_schema)}
    remaining = {t.name for t in ws.tables.list(catalog_name=from_catalog.name, schema_name=from_schema.name)}
    # BUG FIX: replace vacuous membership loops with real assertions — exactly
    # the requested table arrived, and the untouched tables stayed behind.
    assert moved == {from_table_1.name}
    assert {from_table_2.name, from_table_3.name} <= remaining
import logging
from unittest.mock import create_autospec

from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound
from databricks.sdk.service.catalog import (
    PermissionsList,
    Privilege,
    PrivilegeAssignment,
    SchemaInfo,
    TableInfo,
    TableType,
)

from databricks.labs.ucx.hive_metastore.table_migrate import TableMove
from databricks.labs.ucx.mixins.sql import Row

from ..framework.mocks import MockBackend

logger = logging.getLogger(__name__)


def make_row(data, columns):
    """Build a Row with explicit column names, mimicking StatementExecutionBackend results."""
    row = Row(data)
    row.__columns__ = columns
    return row


def test_move_tables_invalid_from_schema(caplog):
    """A NotFound on the source schema lookup must log exactly one error."""
    client = create_autospec(WorkspaceClient)
    client.schemas.get.side_effect = NotFound()
    # BUG FIX: pass a MockBackend *instance*, not the class object; the original
    # only worked because this code path never touches the backend.
    tm = TableMove(client, MockBackend())
    tm.move_tables("SrcC", "SrcS", "*", "TgtC", "TgtS", False)
    assert len([rec.message for rec in caplog.records if "schema SrcS not found in catalog SrcC" in rec.message]) == 1


def test_move_tables_invalid_to_schema(caplog):
    """A missing target schema must be reported (and trigger creation), not crash."""
    client = create_autospec(WorkspaceClient)
    client.schemas.get.side_effect = [SchemaInfo(), NotFound()]
    # Same fix as above: instantiate the backend.
    tm = TableMove(client, MockBackend())
    tm.move_tables("SrcC", "SrcS", "*", "TgtC", "TgtS", False)
    assert len([rec.message for rec in caplog.records if "schema TgtS not found in TgtC" in rec.message]) == 1


def test_move_tables(caplog):
    """End-to-end unit run: 3 tables + 2 views, one target clash skipped, drops logged."""
    caplog.set_level(logging.INFO)
    client = create_autospec(WorkspaceClient)
    errors = {}
    rows = {
        "SHOW CREATE TABLE SrcC.SrcS.table1": [
            ("CREATE TABLE SrcC.SrcS.table1 (name string)"),
        ],
        "SHOW CREATE TABLE SrcC.SrcS.table3": [
            ("CREATE TABLE SrcC.SrcS.table3 (name string)"),
        ],
    }
    client.tables.list.return_value = [
        TableInfo(
            catalog_name="SrcC",
            schema_name="SrcS",
            name="table1",
            full_name="SrcC.SrcS.table1",
            table_type=TableType.EXTERNAL,
        ),
        TableInfo(
            catalog_name="SrcC",
            schema_name="SrcS",
            name="table2",
            full_name="SrcC.SrcS.table2",
            table_type=TableType.EXTERNAL,
        ),
        TableInfo(
            catalog_name="SrcC",
            schema_name="SrcS",
            name="table3",
            full_name="SrcC.SrcS.table3",
            table_type=TableType.EXTERNAL,
        ),
        TableInfo(
            catalog_name="SrcC",
            schema_name="SrcS",
            name="view1",
            full_name="SrcC.SrcS.view1",
            table_type=TableType.VIEW,
            view_definition="SELECT * FROM SrcC.SrcS.table1",
        ),
        TableInfo(
            catalog_name="SrcC",
            schema_name="SrcS",
            name="view2",
            full_name="SrcC.SrcS.view2",
            table_type=TableType.VIEW,
            view_definition="SELECT * FROM SrcC.SrcS.table1",
        ),
    ]
    perm_list = PermissionsList([PrivilegeAssignment("foo", [Privilege.SELECT])])
    perm_none = PermissionsList(None)
    client.grants.get.side_effect = [perm_list, perm_none, perm_none, perm_list, perm_none]
    client.schemas.get.side_effect = [SchemaInfo(), SchemaInfo()]
    # table2 resolves in the target schema (TableInfo) and is therefore skipped.
    client.tables.get.side_effect = [NotFound(), TableInfo(), NotFound(), NotFound(), NotFound()]
    backend = MockBackend(fails_on_first=errors, rows=rows)
    tm = TableMove(client, backend)
    tm.move_tables("SrcC", "SrcS", "*", "TgtC", "TgtS", True)
    log_cnt = 0
    for rec in caplog.records:
        if rec.message in [
            "moved 2 tables to the new schema TgtS.",
            "moved 2 views to the new schema TgtS.",
            "dropping source table SrcC.SrcS.table1",
            "dropping source view SrcC.SrcS.view2",
        ]:
            log_cnt += 1

    assert log_cnt == 4
from unittest.mock import create_autospec, patch

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.cli import move


def _admin_workspace_client():
    """Autospec'd WorkspaceClient whose current user is an admin named 'foo'."""
    client = create_autospec(WorkspaceClient)
    client.current_user.me = lambda: iam.User(user_name="foo", groups=[iam.ComplexValue(display="admins")])
    return client


def test_move_no_ucx(mocker, caplog):
    """Without a UCX installation the command logs the configuration error and exits."""
    w = _admin_workspace_client()
    mocker.patch("databricks.labs.ucx.installer.InstallationManager.for_user", return_value=None)
    move(w, "", "", "", "", "")
    assert [rec.message for rec in caplog.records if "UCX configuration" in rec.message]


def test_move_no_catalog(mocker, caplog):
    """Empty catalogs are rejected with a single error message."""
    w = _admin_workspace_client()
    mocker.patch("databricks.labs.ucx.installer.InstallationManager.for_user", return_value=w.current_user)
    move(w, "", "", "", "", "")
    matching = [rec.message for rec in caplog.records if "Please enter from_catalog and to_catalog" in rec.message]
    assert len(matching) == 1


def test_move_same_schema(mocker, caplog):
    """Source and target must differ in catalog or schema."""
    w = _admin_workspace_client()
    mocker.patch("databricks.labs.ucx.installer.InstallationManager.for_user", return_value=w.current_user)
    move(w, "SrcCat", "SrcS", "*", "SrcCat", "SrcS")
    matching = [rec.message for rec in caplog.records if "please select a different schema" in rec.message]
    assert len(matching) == 1


def test_move_no_schema(mocker, caplog):
    """Empty schema/table arguments are rejected with a single error message."""
    w = _admin_workspace_client()
    mocker.patch("databricks.labs.ucx.installer.InstallationManager.for_user", return_value=w.current_user)
    move(w, "SrcCat", "", "*", "TgtCat", "")
    matching = [rec.message for rec in caplog.records if "Please enter from_schema, to_schema" in rec.message]
    assert len(matching) == 1


def test_move(mocker, caplog, monkeypatch):
    """Happy path: valid arguments and a confirmed prompt invoke TableMove.move_tables once."""
    w = _admin_workspace_client()
    mocker.patch("databricks.labs.ucx.installer.InstallationManager.for_user", return_value=w.current_user)
    monkeypatch.setattr("builtins.input", lambda _: "yes")

    with patch("databricks.labs.ucx.hive_metastore.table_migrate.TableMove.move_tables", return_value=None) as m:
        move(w, "SrcC", "SrcS", "*", "TgtC", "ToS")
        m.assert_called_once()