Skip to content

Commit

Permalink
Added databricks labs ucx move command (#756)
Browse files Browse the repository at this point in the history
closes #675

---------

Co-authored-by: Liran Bareket <lbareket@gmail.com>
Co-authored-by: Marcin Wojtyczka <wojtyczka.marcin@gmail.com>
Co-authored-by: prajin-29 <127273819+prajin-29@users.noreply.github.com>
Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com>
  • Loading branch information
5 people committed Jan 18, 2024
1 parent cc64764 commit daccde3
Show file tree
Hide file tree
Showing 6 changed files with 439 additions and 3 deletions.
14 changes: 14 additions & 0 deletions labs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,17 @@ commands:
- name: delete_managed
description: Revert and delete managed tables


- name: move
description: move tables across schema/catalog withing a UC metastore
flags:
- name: from-catalog
description: from catalog name
- name: from-schema
description: schema name to migrate.
- name: from-table
description: table names to migrate. enter * to migrate all tables
- name: to-catalog
description: target catalog to migrate schema to
- name: to-schema
description: target schema to migrate tables to
35 changes: 34 additions & 1 deletion src/databricks/labs/ucx/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from databricks.labs.ucx.framework.crawlers import StatementExecutionBackend
from databricks.labs.ucx.hive_metastore import ExternalLocations, TablesCrawler
from databricks.labs.ucx.hive_metastore.mapping import TableMapping
from databricks.labs.ucx.hive_metastore.table_migrate import TablesMigrate
from databricks.labs.ucx.hive_metastore.table_migrate import TableMove, TablesMigrate
from databricks.labs.ucx.install import WorkspaceInstaller
from databricks.labs.ucx.installer import InstallationManager

Expand Down Expand Up @@ -165,5 +165,38 @@ def revert_migrated_tables(w: WorkspaceClient, schema: str, table: str, *, delet
tm.revert_migrated_tables(schema, table, delete_managed=delete_managed)


@ucx.command
def move(
w: WorkspaceClient,
from_catalog: str,
from_schema: str,
from_table: str,
to_catalog: str,
to_schema: str,
):
"""move a uc table/tables from one schema to another schema in same or different catalog"""
logger.info("Running move command")
prompts = Prompts()
installation_manager = InstallationManager(w)
installation = installation_manager.for_user(w.current_user.me())
if not installation:
logger.error(CANT_FIND_UCX_MSG)
return
sql_backend = StatementExecutionBackend(w, installation.config.warehouse_id)
tables = TableMove(w, sql_backend)
if from_catalog == "" or to_catalog == "":
logger.error("Please enter from_catalog and to_catalog details")
return
if from_schema == "" or to_schema == "" or from_table == "":
logger.error("Please enter from_schema, to_schema and from_table(enter * for migrating all tables) details.")
return
if from_catalog == to_catalog and from_schema == to_schema:
logger.error("please select a different schema or catalog to migrate to")
return
del_table = prompts.confirm(f"should we delete tables/view after moving to new schema {to_catalog}.{to_schema}")
logger.info(f"migrating tables {from_table} from {from_catalog}.{from_schema} to {to_catalog}.{to_schema}")
tables.move_tables(from_catalog, from_schema, from_table, to_catalog, to_schema, del_table)


if "__main__" == __name__:
ucx()
134 changes: 134 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from databricks.labs.blueprint.parallel import Threads
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound
from databricks.sdk.service.catalog import PermissionsChange, SecurableType, TableType

from databricks.labs.ucx.framework.crawlers import SqlBackend
from databricks.labs.ucx.hive_metastore import TablesCrawler
Expand Down Expand Up @@ -182,3 +184,135 @@ def print_revert_report(self, *, delete_managed: bool) -> bool | None:
print("Migrated Manged Tables (targets) will be left intact.")
print("To revert and delete Migrated Tables, add --delete_managed true flag to the command.")
return True


class TableMove:
def __init__(self, ws: WorkspaceClient, backend: SqlBackend):
self._backend = backend
self._ws = ws

def move_tables(
self,
from_catalog: str,
from_schema: str,
from_table: str,
to_catalog: str,
to_schema: str,
del_table: bool, # noqa: FBT001
):
try:
self._ws.schemas.get(f"{from_catalog}.{from_schema}")
except NotFound:
logger.error(f"schema {from_schema} not found in catalog {from_catalog}, enter correct schema details.")
return
try:
self._ws.schemas.get(f"{to_catalog}.{to_schema}")
except NotFound:
logger.warning(f"schema {to_schema} not found in {to_catalog}, creating...")
self._ws.schemas.create(to_schema, to_catalog)

tables = self._ws.tables.list(from_catalog, from_schema)
table_tasks = []
view_tasks = []
filtered_tables = [table for table in tables if from_table in [table.name, "*"]]
for table in filtered_tables:
try:
self._ws.tables.get(f"{to_catalog}.{to_schema}.{table.name}")
logger.warning(
f"table {from_table} already present in {from_catalog}.{from_schema}. skipping this table..."
)
continue
except NotFound:
if table.table_type and table.table_type in (TableType.EXTERNAL, TableType.MANAGED):
table_tasks.append(
partial(
self._move_table, from_catalog, from_schema, table.name, to_catalog, to_schema, del_table
)
)
else:
view_tasks.append(
partial(
self._move_view,
from_catalog,
from_schema,
table.name,
to_catalog,
to_schema,
del_table,
table.view_definition,
)
)
Threads.strict("creating tables", table_tasks)
logger.info(f"moved {len(list(table_tasks))} tables to the new schema {to_schema}.")
Threads.strict("creating views", view_tasks)
logger.info(f"moved {len(list(view_tasks))} views to the new schema {to_schema}.")

def _move_table(
self,
from_catalog: str,
from_schema: str,
from_table: str,
to_catalog: str,
to_schema: str,
del_table: bool, # noqa: FBT001
) -> bool:
from_table_name = f"{from_catalog}.{from_schema}.{from_table}"
to_table_name = f"{to_catalog}.{to_schema}.{from_table}"
try:
create_sql = str(next(self._backend.fetch(f"SHOW CREATE TABLE {from_table_name}"))[0])
create_table_sql = create_sql.replace(f"CREATE TABLE {from_table_name}", f"CREATE TABLE {to_table_name}")
logger.debug(f"Creating table {to_table_name}.")
self._backend.execute(create_table_sql)
grants = self._ws.grants.get(SecurableType.TABLE, from_table_name)
if grants.privilege_assignments is None:
return True
grants_changes = [
PermissionsChange(pair.privileges, pair.principal) for pair in grants.privilege_assignments
]
self._ws.grants.update(SecurableType.TABLE, to_table_name, changes=grants_changes)
if del_table:
logger.info(f"dropping source table {from_table_name}")
drop_sql = f"DROP TABLE {from_table_name}"
self._backend.execute(drop_sql)
return True
except NotFound as err:
if "[TABLE_OR_VIEW_NOT_FOUND]" in str(err):
logger.error(f"Could not find table {from_table_name}. Table not found.")
else:
logger.error(err)
return False

def _move_view(
self,
from_catalog: str,
from_schema: str,
from_table: str,
to_catalog: str,
to_schema: str,
del_view: bool, # noqa: FBT001
view_text: str | None = None,
) -> bool:
from_table_name = f"{from_catalog}.{from_schema}.{from_table}"
to_table_name = f"{to_catalog}.{to_schema}.{from_table}"
try:
create_sql = f"CREATE VIEW {to_table_name} AS {view_text}"
logger.debug(f"Creating view {to_table_name}.")
self._backend.execute(create_sql)
grants = self._ws.grants.get(SecurableType.TABLE, from_table_name)
if grants.privilege_assignments is None:
return True
grants_changes = [
PermissionsChange(pair.privileges, pair.principal) for pair in grants.privilege_assignments
]
self._ws.grants.update(SecurableType.TABLE, to_table_name, changes=grants_changes)
if del_view:
logger.info(f"dropping source view {from_table_name}")
drop_sql = f"DROP VIEW {from_table_name}"
self._backend.execute(drop_sql)
return True
except NotFound as err:
if "[TABLE_OR_VIEW_NOT_FOUND]" in str(err):
logger.error(f"Could not find view {from_table_name}. View not found.")
else:
logger.error(err)
return False
96 changes: 96 additions & 0 deletions tests/integration/hive_metastore/test_table_move.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import logging
from datetime import timedelta

from databricks.sdk.errors import NotFound
from databricks.sdk.retries import retried
from databricks.sdk.service.catalog import Privilege, PrivilegeAssignment, SecurableType

from databricks.labs.ucx.hive_metastore.table_migrate import TableMove

logger = logging.getLogger(__name__)


@retried(on=[NotFound], timeout=timedelta(minutes=2))
def test_move_tables_no_from_schema(ws, sql_backend, make_random, make_catalog, caplog):
from_catalog = make_catalog()
from_schema = make_random(4)
to_catalog = make_catalog()
tm = TableMove(ws, sql_backend)
tm.move_tables(from_catalog.name, from_schema, "*", to_catalog.name, from_schema, False)
rec_results = [
rec.message
for rec in caplog.records
if f"schema {from_schema} not found in catalog {from_catalog.name}" in rec.message
]
assert len(rec_results) == 1


@retried(on=[NotFound], timeout=timedelta(minutes=2))
def test_move_tables(ws, sql_backend, make_catalog, make_schema, make_table, make_acc_group):
tm = TableMove(ws, sql_backend)
group_a = make_acc_group()
group_b = make_acc_group()
from_catalog = make_catalog()
from_schema = make_schema(catalog_name=from_catalog.name)
from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
from_table_2 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
from_table_3 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
from_view_1 = make_table(
catalog_name=from_catalog.name,
schema_name=from_schema.name,
view=True,
ctas=f"select * from {from_table_2.full_name}",
)
to_catalog = make_catalog()
to_schema = make_schema(catalog_name=to_catalog.name)
# creating a table in target schema to test skipping
to_table_3 = make_table(catalog_name=to_catalog.name, schema_name=to_schema.name, name=from_table_3.name)
sql_backend.execute(f"GRANT SELECT ON TABLE {from_table_1.full_name} TO `{group_a.display_name}`")
sql_backend.execute(f"GRANT SELECT,MODIFY ON TABLE {from_table_2.full_name} TO `{group_b.display_name}`")
sql_backend.execute(f"GRANT SELECT ON VIEW {from_view_1.full_name} TO `{group_b.display_name}`")
sql_backend.execute(f"GRANT SELECT ON TABLE {to_table_3.full_name} TO `{group_a.display_name}`")
tm.move_tables(from_catalog.name, from_schema.name, "*", to_catalog.name, to_schema.name, False)
tables = ws.tables.list(catalog_name=to_catalog.name, schema_name=to_schema.name)
table_1_grant = ws.grants.get(
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_table_1.name}"
)
table_2_grant = ws.grants.get(
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_table_2.name}"
)
table_3_grant = ws.grants.get(
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_table_3.name}"
)
view_1_grant = ws.grants.get(
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_view_1.name}"
)
for t in tables:
assert t.name in [from_table_1.name, from_table_2.name, from_table_3.name, from_view_1.name]
expected_table_1_grant = [PrivilegeAssignment(group_a.display_name, [Privilege.SELECT])]
expected_table_2_grant = [
PrivilegeAssignment(group_b.display_name, [Privilege.MODIFY, Privilege.SELECT]),
]
expected_table_3_grant = [PrivilegeAssignment(group_a.display_name, [Privilege.SELECT])]
expected_view_1_grant = [PrivilegeAssignment(group_b.display_name, [Privilege.SELECT])]
assert table_1_grant.privilege_assignments == expected_table_1_grant
assert table_2_grant.privilege_assignments == expected_table_2_grant
assert table_3_grant.privilege_assignments == expected_table_3_grant
assert view_1_grant.privilege_assignments == expected_view_1_grant


@retried(on=[NotFound], timeout=timedelta(minutes=2))
def test_move_tables_no_to_schema(ws, sql_backend, make_catalog, make_schema, make_table, make_random):
tm = TableMove(ws, sql_backend)
from_catalog = make_catalog()
from_schema = make_schema(catalog_name=from_catalog.name)
from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
from_table_2 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
from_table_3 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
to_catalog = make_catalog()
to_schema = make_random(4)
tm.move_tables(from_catalog.name, from_schema.name, from_table_1.name, to_catalog.name, to_schema, True)
tables = ws.tables.list(catalog_name=to_catalog.name, schema_name=to_schema)
dropped_tables = ws.tables.list(catalog_name=from_catalog.name, schema_name=from_schema.name)
for t in tables:
assert t.name in [from_table_1.name, from_table_2.name, from_table_3.name]
for t in dropped_tables:
assert t.name in [from_table_1.name, from_table_2.name, from_table_3.name]
Loading

0 comments on commit daccde3

Please sign in to comment.