Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added databricks labs ucx move command #756

Merged
merged 47 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
7651860
setting it to current user folder to speed up workspace listing
HariGS-DB Dec 23, 2023
a087062
setting it to current user folder to speed up workspace listing
HariGS-DB Dec 23, 2023
df5e45a
added cli and labs changes. and parameter syntax checks
HariGS-DB Dec 26, 2023
f11830d
fmting
HariGS-DB Dec 26, 2023
3b79f81
added logic for iterating to tables and validating objects
HariGS-DB Dec 27, 2023
b226ac5
meringMerge branch 'main' of https://github.com/databricks/ucx
HariGS-DB Jan 2, 2024
765387e
merging
HariGS-DB Jan 2, 2024
42ce597
merging
HariGS-DB Jan 2, 2024
6cf75dc
Fixed flaky 'test_group_name_change_substitute' (#739)
FastLee Dec 26, 2023
b3a3d0d
merging
HariGS-DB Jan 2, 2024
4805fa7
Fix installation issue when upgrading from an older version of the to…
mwojtyczka Dec 28, 2023
97e94bd
Added `databricks labs ucx repair-run --step ...` CLI command for rep…
prajin-29 Dec 28, 2023
8a4a556
Added `.codegen.json` for automated release infra (#743)
nfx Dec 28, 2023
31de563
Increased installation test coverage (#742)
mwojtyczka Dec 28, 2023
ec16872
Release v0.8.0 (#744)
nfx Dec 28, 2023
0f7865f
Added Assessment step to estimate the size of DBFS Root Tables. (#741)
FastLee Dec 29, 2023
b11b7d0
Merge branch 'feature/uc_to_uc' of https://github.com/databricks/ucx …
HariGS-DB Jan 2, 2024
016d06f
merging
HariGS-DB Jan 2, 2024
661c8eb
merging
HariGS-DB Jan 2, 2024
f4a7119
merging
HariGS-DB Jan 2, 2024
8811f75
testcases
HariGS-DB Jan 3, 2024
e546459
cli test cases
HariGS-DB Jan 3, 2024
bc0158d
unit test cases for tables
HariGS-DB Jan 3, 2024
bbd8e30
added permission migrate and test cases
HariGS-DB Jan 4, 2024
6df411a
added int test for failure
HariGS-DB Jan 4, 2024
bb83917
added int test for table migration
HariGS-DB Jan 5, 2024
0d44a0e
merging
HariGS-DB Jan 5, 2024
9e640d2
merging and moving code to right places
HariGS-DB Jan 5, 2024
fb22d1a
merging and moving code to right places
HariGS-DB Jan 5, 2024
9e509dd
int test fixes
HariGS-DB Jan 5, 2024
b0adcf2
Merge branch 'main' into feature/uc_to_uc
HariGS-DB Jan 5, 2024
2a0c19c
fixed review comments
HariGS-DB Jan 7, 2024
a9537ec
merging changes from main
HariGS-DB Jan 7, 2024
7a2e6d3
int test fixes
HariGS-DB Jan 7, 2024
0237ddb
improving coverage
HariGS-DB Jan 7, 2024
c4c758e
incorporating review comments, and seperating move to seperate class
HariGS-DB Jan 8, 2024
f32f479
updating labs.yml
HariGS-DB Jan 8, 2024
4578abf
adding specific exception catching
HariGS-DB Jan 8, 2024
b56a319
added prompts
HariGS-DB Jan 9, 2024
874280e
added prompts
HariGS-DB Jan 9, 2024
1ff6724
added drop table int test
HariGS-DB Jan 9, 2024
e41b597
added drop table int test
HariGS-DB Jan 9, 2024
1967541
mergin from mainMerge branch 'main' into feature/uc_to_uc
HariGS-DB Jan 9, 2024
11f24c2
Merge branch 'main' into feature/uc_to_uc
HariGS-DB Jan 9, 2024
2702d8e
int test assert checks
HariGS-DB Jan 9, 2024
cdd6da0
int test assert checks
HariGS-DB Jan 9, 2024
cc249ab
int test assert checks on name and not length
HariGS-DB Jan 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions labs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,17 @@ commands:
- name: delete_managed
description: Revert and delete managed tables


- name: move
description: move tables across schema/catalog withing a UC metastore
flags:
- name: from-catalog
description: from catalog name
- name: from-schema
description: schema name to migrate.
- name: from-table
description: table names to migrate. enter * to migrate all tables
- name: to-catalog
description: target catalog to migrate schema to
- name: to-schema
description: target schema to migrate tables to
35 changes: 34 additions & 1 deletion src/databricks/labs/ucx/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from databricks.labs.ucx.framework.crawlers import StatementExecutionBackend
from databricks.labs.ucx.hive_metastore import ExternalLocations, TablesCrawler
from databricks.labs.ucx.hive_metastore.mapping import TableMapping
from databricks.labs.ucx.hive_metastore.table_migrate import TablesMigrate
from databricks.labs.ucx.hive_metastore.table_migrate import TableMove, TablesMigrate
from databricks.labs.ucx.install import WorkspaceInstaller
from databricks.labs.ucx.installer import InstallationManager

Expand Down Expand Up @@ -165,5 +165,38 @@ def revert_migrated_tables(w: WorkspaceClient, schema: str, table: str, *, delet
tm.revert_migrated_tables(schema, table, delete_managed=delete_managed)


@ucx.command
def move(
w: WorkspaceClient,
from_catalog: str,
from_schema: str,
from_table: str,
to_catalog: str,
to_schema: str,
):
"""move a uc table/tables from one schema to another schema in same or different catalog"""
logger.info("Running move command")
prompts = Prompts()
installation_manager = InstallationManager(w)
installation = installation_manager.for_user(w.current_user.me())
if not installation:
HariGS-DB marked this conversation as resolved.
Show resolved Hide resolved
logger.error(CANT_FIND_UCX_MSG)
return
sql_backend = StatementExecutionBackend(w, installation.config.warehouse_id)
tables = TableMove(w, sql_backend)
if from_catalog == "" or to_catalog == "":
logger.error("Please enter from_catalog and to_catalog details")
return
if from_schema == "" or to_schema == "" or from_table == "":
logger.error("Please enter from_schema, to_schema and from_table(enter * for migrating all tables) details.")
return
if from_catalog == to_catalog and from_schema == to_schema:
logger.error("please select a different schema or catalog to migrate to")
return
del_table = prompts.confirm(f"should we delete tables/view after moving to new schema {to_catalog}.{to_schema}")
logger.info(f"migrating tables {from_table} from {from_catalog}.{from_schema} to {to_catalog}.{to_schema}")
tables.move_tables(from_catalog, from_schema, from_table, to_catalog, to_schema, del_table)


if "__main__" == __name__:
ucx()
134 changes: 134 additions & 0 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

from databricks.labs.blueprint.parallel import Threads
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound
from databricks.sdk.service.catalog import PermissionsChange, SecurableType, TableType

from databricks.labs.ucx.framework.crawlers import SqlBackend
from databricks.labs.ucx.hive_metastore import TablesCrawler
Expand Down Expand Up @@ -182,3 +184,135 @@
print("Migrated Manged Tables (targets) will be left intact.")
print("To revert and delete Migrated Tables, add --delete_managed true flag to the command.")
return True


class TableMove:
def __init__(self, ws: WorkspaceClient, backend: SqlBackend):
self._backend = backend
self._ws = ws

def move_tables(
self,
from_catalog: str,
from_schema: str,
from_table: str,
to_catalog: str,
to_schema: str,
del_table: bool, # noqa: FBT001
):
try:
self._ws.schemas.get(f"{from_catalog}.{from_schema}")
except NotFound:
logger.error(f"schema {from_schema} not found in catalog {from_catalog}, enter correct schema details.")
return
try:
self._ws.schemas.get(f"{to_catalog}.{to_schema}")
except NotFound:
logger.warning(f"schema {to_schema} not found in {to_catalog}, creating...")
self._ws.schemas.create(to_schema, to_catalog)

tables = self._ws.tables.list(from_catalog, from_schema)
table_tasks = []
view_tasks = []
filtered_tables = [table for table in tables if from_table in [table.name, "*"]]
for table in filtered_tables:
try:
self._ws.tables.get(f"{to_catalog}.{to_schema}.{table.name}")
logger.warning(
f"table {from_table} already present in {from_catalog}.{from_schema}. skipping this table..."
)
continue
except NotFound:
if table.table_type and table.table_type in (TableType.EXTERNAL, TableType.MANAGED):
table_tasks.append(
partial(
self._move_table, from_catalog, from_schema, table.name, to_catalog, to_schema, del_table
)
)
else:
view_tasks.append(
partial(
self._move_view,
from_catalog,
from_schema,
table.name,
to_catalog,
to_schema,
del_table,
table.view_definition,
)
)
Threads.strict("creating tables", table_tasks)
logger.info(f"moved {len(list(table_tasks))} tables to the new schema {to_schema}.")
Threads.strict("creating views", view_tasks)
logger.info(f"moved {len(list(view_tasks))} views to the new schema {to_schema}.")

def _move_table(
self,
from_catalog: str,
from_schema: str,
from_table: str,
to_catalog: str,
to_schema: str,
del_table: bool, # noqa: FBT001
) -> bool:
from_table_name = f"{from_catalog}.{from_schema}.{from_table}"
to_table_name = f"{to_catalog}.{to_schema}.{from_table}"
try:
create_sql = str(next(self._backend.fetch(f"SHOW CREATE TABLE {from_table_name}"))[0])
create_table_sql = create_sql.replace(f"CREATE TABLE {from_table_name}", f"CREATE TABLE {to_table_name}")
logger.debug(f"Creating table {to_table_name}.")
self._backend.execute(create_table_sql)
grants = self._ws.grants.get(SecurableType.TABLE, from_table_name)
if grants.privilege_assignments is None:
return True
grants_changes = [
PermissionsChange(pair.privileges, pair.principal) for pair in grants.privilege_assignments
]
self._ws.grants.update(SecurableType.TABLE, to_table_name, changes=grants_changes)
if del_table:
logger.info(f"dropping source table {from_table_name}")
drop_sql = f"DROP TABLE {from_table_name}"
self._backend.execute(drop_sql)
return True
except NotFound as err:

Check warning on line 278 in src/databricks/labs/ucx/hive_metastore/table_migrate.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/hive_metastore/table_migrate.py#L278

Added line #L278 was not covered by tests
if "[TABLE_OR_VIEW_NOT_FOUND]" in str(err):
logger.error(f"Could not find table {from_table_name}. Table not found.")

Check warning on line 280 in src/databricks/labs/ucx/hive_metastore/table_migrate.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/hive_metastore/table_migrate.py#L280

Added line #L280 was not covered by tests
else:
logger.error(err)
return False

Check warning on line 283 in src/databricks/labs/ucx/hive_metastore/table_migrate.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/hive_metastore/table_migrate.py#L282-L283

Added lines #L282 - L283 were not covered by tests

def _move_view(
self,
from_catalog: str,
from_schema: str,
from_table: str,
to_catalog: str,
to_schema: str,
del_view: bool, # noqa: FBT001
view_text: str | None = None,
) -> bool:
from_table_name = f"{from_catalog}.{from_schema}.{from_table}"
to_table_name = f"{to_catalog}.{to_schema}.{from_table}"
try:
create_sql = f"CREATE VIEW {to_table_name} AS {view_text}"
logger.debug(f"Creating view {to_table_name}.")
self._backend.execute(create_sql)
grants = self._ws.grants.get(SecurableType.TABLE, from_table_name)
if grants.privilege_assignments is None:
return True
grants_changes = [
PermissionsChange(pair.privileges, pair.principal) for pair in grants.privilege_assignments
]
self._ws.grants.update(SecurableType.TABLE, to_table_name, changes=grants_changes)
if del_view:
logger.info(f"dropping source view {from_table_name}")
drop_sql = f"DROP VIEW {from_table_name}"
self._backend.execute(drop_sql)
return True
except NotFound as err:

Check warning on line 313 in src/databricks/labs/ucx/hive_metastore/table_migrate.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/hive_metastore/table_migrate.py#L313

Added line #L313 was not covered by tests
if "[TABLE_OR_VIEW_NOT_FOUND]" in str(err):
logger.error(f"Could not find view {from_table_name}. View not found.")

Check warning on line 315 in src/databricks/labs/ucx/hive_metastore/table_migrate.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/hive_metastore/table_migrate.py#L315

Added line #L315 was not covered by tests
else:
logger.error(err)
return False

Check warning on line 318 in src/databricks/labs/ucx/hive_metastore/table_migrate.py

View check run for this annotation

Codecov / codecov/patch

src/databricks/labs/ucx/hive_metastore/table_migrate.py#L317-L318

Added lines #L317 - L318 were not covered by tests
96 changes: 96 additions & 0 deletions tests/integration/hive_metastore/test_table_move.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import logging
from datetime import timedelta

from databricks.sdk.errors import NotFound
from databricks.sdk.retries import retried
from databricks.sdk.service.catalog import Privilege, PrivilegeAssignment, SecurableType

from databricks.labs.ucx.hive_metastore.table_migrate import TableMove

logger = logging.getLogger(__name__)


@retried(on=[NotFound], timeout=timedelta(minutes=2))
def test_move_tables_no_from_schema(ws, sql_backend, make_random, make_catalog, caplog):
from_catalog = make_catalog()
from_schema = make_random(4)
to_catalog = make_catalog()
tm = TableMove(ws, sql_backend)
tm.move_tables(from_catalog.name, from_schema, "*", to_catalog.name, from_schema, False)
rec_results = [
rec.message
for rec in caplog.records
if f"schema {from_schema} not found in catalog {from_catalog.name}" in rec.message
]
assert len(rec_results) == 1


@retried(on=[NotFound], timeout=timedelta(minutes=2))
def test_move_tables(ws, sql_backend, make_catalog, make_schema, make_table, make_acc_group):
tm = TableMove(ws, sql_backend)
group_a = make_acc_group()
group_b = make_acc_group()
from_catalog = make_catalog()
from_schema = make_schema(catalog_name=from_catalog.name)
from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
from_table_2 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
from_table_3 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
from_view_1 = make_table(
catalog_name=from_catalog.name,
schema_name=from_schema.name,
view=True,
ctas=f"select * from {from_table_2.full_name}",
)
to_catalog = make_catalog()
to_schema = make_schema(catalog_name=to_catalog.name)
# creating a table in target schema to test skipping
to_table_3 = make_table(catalog_name=to_catalog.name, schema_name=to_schema.name, name=from_table_3.name)
sql_backend.execute(f"GRANT SELECT ON TABLE {from_table_1.full_name} TO `{group_a.display_name}`")
sql_backend.execute(f"GRANT SELECT,MODIFY ON TABLE {from_table_2.full_name} TO `{group_b.display_name}`")
sql_backend.execute(f"GRANT SELECT ON VIEW {from_view_1.full_name} TO `{group_b.display_name}`")
sql_backend.execute(f"GRANT SELECT ON TABLE {to_table_3.full_name} TO `{group_a.display_name}`")
tm.move_tables(from_catalog.name, from_schema.name, "*", to_catalog.name, to_schema.name, False)
tables = ws.tables.list(catalog_name=to_catalog.name, schema_name=to_schema.name)
table_1_grant = ws.grants.get(
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_table_1.name}"
)
table_2_grant = ws.grants.get(
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_table_2.name}"
)
table_3_grant = ws.grants.get(
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_table_3.name}"
)
view_1_grant = ws.grants.get(
securable_type=SecurableType.TABLE, full_name=f"{to_catalog.name}.{to_schema.name}.{from_view_1.name}"
)
for t in tables:
assert t.name in [from_table_1.name, from_table_2.name, from_table_3.name, from_view_1.name]
expected_table_1_grant = [PrivilegeAssignment(group_a.display_name, [Privilege.SELECT])]
expected_table_2_grant = [
PrivilegeAssignment(group_b.display_name, [Privilege.MODIFY, Privilege.SELECT]),
]
expected_table_3_grant = [PrivilegeAssignment(group_a.display_name, [Privilege.SELECT])]
expected_view_1_grant = [PrivilegeAssignment(group_b.display_name, [Privilege.SELECT])]
assert table_1_grant.privilege_assignments == expected_table_1_grant
assert table_2_grant.privilege_assignments == expected_table_2_grant
assert table_3_grant.privilege_assignments == expected_table_3_grant
assert view_1_grant.privilege_assignments == expected_view_1_grant


@retried(on=[NotFound], timeout=timedelta(minutes=2))
def test_move_tables_no_to_schema(ws, sql_backend, make_catalog, make_schema, make_table, make_random):
tm = TableMove(ws, sql_backend)
from_catalog = make_catalog()
from_schema = make_schema(catalog_name=from_catalog.name)
from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
from_table_2 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
from_table_3 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
to_catalog = make_catalog()
to_schema = make_random(4)
tm.move_tables(from_catalog.name, from_schema.name, from_table_1.name, to_catalog.name, to_schema, True)
tables = ws.tables.list(catalog_name=to_catalog.name, schema_name=to_schema)
dropped_tables = ws.tables.list(catalog_name=from_catalog.name, schema_name=from_schema.name)
for t in tables:
assert t.name in [from_table_1.name, from_table_2.name, from_table_3.name]
for t in dropped_tables:
assert t.name in [from_table_1.name, from_table_2.name, from_table_3.name]
Loading
Loading