Enable integration tests for EXTERNAL table migrations (#677)
This PR adds integration tests for EXTERNAL and MANAGED table migrations on Azure.
nfx committed Dec 5, 2023
1 parent 80145a4 commit 0a50d63
Showing 4 changed files with 75 additions and 24 deletions.
16 changes: 13 additions & 3 deletions src/databricks/labs/ucx/hive_metastore/tables.py
@@ -206,9 +206,11 @@ def migrate_tables(self):
_, errors = Threads.gather("migrate tables", tasks)
if len(errors) > 0:
# TODO: https://github.com/databrickslabs/ucx/issues/406
logger.error(f"Detected {len(errors)} errors while migrating tables")
# TODO: pick first X issues in the summary
msg = f"Detected {len(errors)} errors: {'. '.join(str(e) for e in errors)}"
raise ValueError(msg)

def _migrate_table(self, target_catalog, table):
def _migrate_table(self, target_catalog: str, table: Table):
sql = table.uc_create_sql(target_catalog)
logger.debug(f"Migrating table {table.key} to using SQL query: {sql}")
target = f"{target_catalog}.{table.database}.{table.name}".lower()
@@ -220,8 +222,16 @@ def _migrate_table(self, target_catalog, table):
self._backend.execute(table.sql_alter_to(target_catalog))
self._backend.execute(table.sql_alter_from(target_catalog))
self._seen_tables[target] = table.key
elif table.object_type == "EXTERNAL":
result = next(self._backend.fetch(sql))
if result.status_code != "SUCCESS":
raise ValueError(result.description)
self._backend.execute(table.sql_alter_to(target_catalog))
self._backend.execute(table.sql_alter_from(target_catalog))
self._seen_tables[target] = table.key
else:
logger.info(f"Table {table.key} is a {table.object_type} and is not supported for migration yet ")
msg = f"Table {table.key} is a {table.object_type} and is not supported for migration yet"
raise ValueError(msg)
return True

def _init_seen_tables(self):
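
For context, a minimal sketch of the EXTERNAL path added above, assuming the backend's fetch() yields a single row with status_code and description columns (for example, the result of a SYNC TABLE-style statement rendered by uc_create_sql; the exact SQL is not shown in this diff):

    # Sketch only: mirrors the EXTERNAL branch in the hunk above, not the full ucx implementation.
    def migrate_external_table(backend, table, target_catalog: str) -> bool:
        sql = table.uc_create_sql(target_catalog)    # assumed to render the migration statement
        result = next(backend.fetch(sql))            # one status row per migrated table (assumption)
        if result.status_code != "SUCCESS":
            raise ValueError(result.description)
        # presumably records upgraded_from/upgraded_to properties on both sides;
        # the managed-table test below asserts upgraded_from on the target
        backend.execute(table.sql_alter_to(target_catalog))
        backend.execute(table.sql_alter_from(target_catalog))
        return True
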
45 changes: 39 additions & 6 deletions src/databricks/labs/ucx/mixins/fixtures.py
@@ -15,7 +15,13 @@
from databricks.sdk import AccountClient, WorkspaceClient
from databricks.sdk.core import DatabricksError
from databricks.sdk.service import compute, iam, jobs, pipelines, workspace
from databricks.sdk.service.catalog import CatalogInfo, SchemaInfo, TableInfo
from databricks.sdk.service.catalog import (
CatalogInfo,
DataSourceFormat,
SchemaInfo,
TableInfo,
TableType,
)
from databricks.sdk.service.sql import (
CreateWarehouseRequestWarehouseType,
Query,
@@ -776,6 +782,7 @@ def create(
ctas: str | None = None,
non_delta: bool = False,
external: bool = False,
external_csv: str | None = None,
view: bool = False,
tbl_properties: dict[str, str] | None = None,
) -> TableInfo:
@@ -785,28 +792,54 @@ def create(
schema_name = schema.name
if name is None:
name = f"ucx_T{make_random(4)}".lower()
table_type = None
data_source_format = None
storage_location = None
full_name = f"{catalog_name}.{schema_name}.{name}".lower()
ddl = f'CREATE {"VIEW" if view else "TABLE"} {full_name}'
if view:
table_type = TableType.VIEW
if ctas is not None:
# temporary (if not view)
ddl = f"{ddl} AS {ctas}"
elif non_delta:
location = "dbfs:/databricks-datasets/iot-stream/data-device"
ddl = f"{ddl} USING json LOCATION '{location}'"
table_type = TableType.MANAGED
data_source_format = DataSourceFormat.JSON
storage_location = "dbfs:/databricks-datasets/iot-stream/data-device"
ddl = f"{ddl} USING json LOCATION '{storage_location}'"
elif external_csv is not None:
table_type = TableType.EXTERNAL
data_source_format = DataSourceFormat.CSV
storage_location = external_csv
ddl = f"{ddl} USING CSV OPTIONS (header=true) LOCATION '{storage_location}'"
elif external:
# external table
table_type = TableType.EXTERNAL
data_source_format = DataSourceFormat.DELTASHARING
url = "s3a://databricks-datasets-oregon/delta-sharing/share/open-datasets.share"
share = f"{url}#delta_sharing.default.lending_club"
ddl = f"{ddl} USING deltaSharing LOCATION '{share}'"
storage_location = f"{url}#delta_sharing.default.lending_club"
ddl = f"{ddl} USING deltaSharing LOCATION '{storage_location}'"
else:
# managed table
table_type = TableType.MANAGED
data_source_format = DataSourceFormat.DELTA
storage_location = f"dbfs:/user/hive/warehouse/{schema_name}/{name}"
ddl = f"{ddl} (id INT, value STRING)"
if tbl_properties:
tbl_properties = ",".join([f" '{k}' = '{v}' " for k, v in tbl_properties.items()])
ddl = f"{ddl} TBLPROPERTIES ({tbl_properties})"

sql_backend.execute(ddl)
table_info = TableInfo(catalog_name=catalog_name, schema_name=schema_name, name=name, full_name=full_name)
table_info = TableInfo(
catalog_name=catalog_name,
schema_name=schema_name,
name=name,
full_name=full_name,
properties=tbl_properties,
storage_location=storage_location,
table_type=table_type,
data_source_format=data_source_format,
)
logger.info(
f"Table {table_info.full_name}: "
f"{ws.config.host}/explore/data/{table_info.catalog_name}/{table_info.schema_name}/{table_info.name}"
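
A hedged usage sketch of the new external_csv parameter in the make_table fixture (the test name and mount path below are illustrative placeholders; the external-table integration test later in this diff passes a dbfs:/mnt/... location the same way):

    from databricks.sdk.service.catalog import DataSourceFormat, TableType

    # Illustrative only: for an external_csv location, the fixture should emit
    # "CREATE TABLE ... USING CSV OPTIONS (header=true) LOCATION '<location>'".
    def test_uses_external_csv_fixture(make_schema, make_table):
        schema = make_schema(catalog_name="hive_metastore")
        table = make_table(schema_name=schema.name, external_csv="dbfs:/mnt/example/a/b/c")
        assert table.table_type == TableType.EXTERNAL
        assert table.data_source_format == DataSourceFormat.CSV
        assert table.storage_location == "dbfs:/mnt/example/a/b/c"
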
4 changes: 2 additions & 2 deletions tests/integration/conftest.py
@@ -112,10 +112,10 @@ def __init__(self, sql_backend: SqlBackend, schema: str, tables: list[TableInfo]
catalog=_.catalog_name,
database=_.schema_name,
name=_.name,
object_type="TABLE" if not _.view_definition else "VIEW",
object_type=f"{_.table_type.value}",
view_text=_.view_definition,
location=_.storage_location,
table_format=f"{_.data_source_format}",
table_format=f"{_.data_source_format.value}",
)
for _ in tables
]
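
For reference, the behavioural difference the .value change above captures (a sketch assuming the standard enum semantics of databricks.sdk.service.catalog):

    from databricks.sdk.service.catalog import DataSourceFormat, TableType

    # Formatting the enum directly (the old code) embeds the enum name, while .value
    # (the new code) yields the bare string the crawler and migration logic expect.
    table_format_old = f"{DataSourceFormat.CSV}"    # -> "DataSourceFormat.CSV" (assumed repr-style rendering)
    table_format_new = DataSourceFormat.CSV.value   # -> "CSV"
    object_type_new = TableType.EXTERNAL.value      # -> "EXTERNAL", which drives the new EXTERNAL branch in tables.py
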
34 changes: 21 additions & 13 deletions tests/integration/hive_metastore/test_migrate.py
@@ -2,14 +2,16 @@

import pytest

from databricks.labs.ucx.hive_metastore import TablesCrawler
from databricks.labs.ucx.hive_metastore.tables import TablesMigrate

from ..conftest import StaticTablesCrawler

logger = logging.getLogger(__name__)


@pytest.mark.skip("integration testing environments on AWS do not yet have UC IAM properly setup")
def test_migrate_managed_tables(ws, sql_backend, inventory_schema, make_catalog, make_schema, make_table, env_or_skip):
def test_migrate_managed_tables(ws, sql_backend, inventory_schema, make_catalog, make_schema, make_table):
if not ws.config.is_azure:
pytest.skip("temporary: only works in azure test env")
src_schema = make_schema(catalog_name="hive_metastore")
src_managed_table = make_table(catalog_name=src_schema.catalog_name, schema_name=src_schema.name)

@@ -18,21 +20,23 @@ def test_migrate_managed_tables(ws, sql_backend, inventory_schema, make_catalog,

logger.info(f"dst_catalog={dst_catalog.name}, managed_table={src_managed_table.full_name}")

crawler = TablesCrawler(sql_backend, inventory_schema)
# crawler = TablesCrawler(sql_backend, inventory_schema)
crawler = StaticTablesCrawler(sql_backend, inventory_schema, [src_managed_table])
tm = TablesMigrate(crawler, ws, sql_backend, dst_catalog.name)
tm.migrate_tables()

target_tables = list(sql_backend.fetch(f"SHOW TABLES IN {dst_schema.full_name}"))
assert len(target_tables) == 1

target_table_properties = ws.tables.get(f"{dst_schema.full_name}.{src_managed_table.name}").properties
assert target_table_properties["upgraded_from"] == src_managed_table
assert target_table_properties["upgraded_from"] == src_managed_table.full_name


@pytest.mark.skip("integration testing environments on AWS do not yet have UC IAM properly setup")
def test_migrate_tables_with_cache_should_not_create_table(
ws, sql_backend, inventory_schema, make_random, make_catalog, make_schema, make_table, env_or_skip
ws, sql_backend, inventory_schema, make_random, make_catalog, make_schema, make_table
):
if not ws.config.is_azure:
pytest.skip("temporary: only works in azure test env")
src_schema = make_schema(catalog_name="hive_metastore")

dst_catalog = make_catalog()
@@ -56,10 +60,10 @@ def test_migrate_tables_with_cache_should_not_create_table(
f"target_catalog={dst_catalog.name}, "
f"source_managed_table={src_managed_table}"
f"target_managed_table={dst_managed_table}"
f""
)

crawler = TablesCrawler(sql_backend, inventory_schema)
# crawler = TablesCrawler(sql_backend, inventory_schema)
crawler = StaticTablesCrawler(sql_backend, inventory_schema, [src_managed_table])
tm = TablesMigrate(crawler, ws, sql_backend, dst_catalog.name)
tm.migrate_tables()

@@ -69,17 +73,21 @@ def test_migrate_tables_with_cache_should_not_create_table(
assert target_tables[0]["tableName"] == table_name


@pytest.mark.skip(reason="Needs Storage credential + External Location in place")
def test_migrate_external_table(ws, sql_backend, inventory_schema, make_catalog, make_schema, make_table, env_or_skip):
if not ws.config.is_azure:
pytest.skip("temporary: only works in azure test env")
src_schema = make_schema(catalog_name="hive_metastore")
src_external_table = make_table(schema_name=src_schema.name, external=True)

mounted_location = f'dbfs:/mnt/{env_or_skip("TEST_MOUNT_NAME")}/a/b/c'
src_external_table = make_table(schema_name=src_schema.name, external_csv=mounted_location)

dst_catalog = make_catalog()
dst_schema = make_schema(catalog_name=dst_catalog.name, schema_name=src_schema.name)
dst_schema = make_schema(catalog_name=dst_catalog.name, name=src_schema.name)

logger.info(f"dst_catalog={dst_catalog.name}, external_table={src_external_table.full_name}")

crawler = TablesCrawler(sql_backend, inventory_schema)
# crawler = TablesCrawler(sql_backend, inventory_schema)
crawler = StaticTablesCrawler(sql_backend, inventory_schema, [src_external_table])
tm = TablesMigrate(crawler, ws, sql_backend, dst_catalog.name)
tm.migrate_tables()

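
The tests above rely on StaticTablesCrawler from tests/integration/conftest.py; its constructor body appears in the conftest hunk earlier in this diff. A hedged sketch of its likely shape: a crawler that serves a fixed list of tables instead of scanning the metastore. The snapshot() override and the SqlBackend import path are assumptions:

    from databricks.labs.ucx.framework.crawlers import SqlBackend  # import path assumed for this sketch
    from databricks.labs.ucx.hive_metastore import TablesCrawler
    from databricks.labs.ucx.hive_metastore.tables import Table
    from databricks.sdk.service.catalog import TableInfo

    class StaticTablesCrawler(TablesCrawler):
        def __init__(self, sql_backend: SqlBackend, schema: str, tables: list[TableInfo]):
            super().__init__(sql_backend, schema)
            # Pre-compute the inventory from the fixtures the test created (matches the conftest diff).
            self._tables = [
                Table(
                    catalog=t.catalog_name,
                    database=t.schema_name,
                    name=t.name,
                    object_type=t.table_type.value,
                    view_text=t.view_definition,
                    location=t.storage_location,
                    table_format=t.data_source_format.value,
                )
                for t in tables
            ]

        def snapshot(self) -> list[Table]:
            # Assumed override: return the static list so TablesMigrate never runs SHOW TABLES.
            return self._tables
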
