diff --git a/docs/table_persistence.md b/docs/table_persistence.md
index 12cac2a2c6..b09c17d423 100644
--- a/docs/table_persistence.md
+++ b/docs/table_persistence.md
@@ -8,7 +8,7 @@ Table utilization per workflow:
 | Table                    | Generate Assessment | Update Migration Progress | Migrate Groups | Migrate External Tables | Upgrade Jobs | Migrate tables | Migrate Data Reconciliation |
 |--------------------------|---------------------|---------------------------|----------------|-------------------------|--------------|----------------|-----------------------------|
-| tables                   | RW                  | RW                        |                | RO                      |              | RO             |                             |
+| tables                   | RW                  | RW                        |                | RW                      |              | RW             |                             |
 | grants                   | RW                  | RW                        |                | RW                      |              | RW             |                             |
 | mounts                   | RW                  |                           |                | RO                      | RO           | RO             |                             |
 | permissions              | RW                  |                           | RW             | RO                      |              | RO             |                             |
@@ -30,7 +30,7 @@ Table utilization per workflow:
 | query_problems           | RW                  | RW                        |                |                         |              |                |                             |
 | workflow_problems        | RW                  | RW                        |                |                         |              |                |                             |
 | udfs                     | RW                  | RW                        | RO             |                         |              |                |                             |
-| logs                     | RW                  |                           | RW             | RW                      |              | RW             | RW                          |
+| logs                     | RW                  | RW                        | RW             | RW                      | RW           | RW             | RW                          |
 | recon_results            |                     |                           |                |                         |              |                | RW                          |
 
 **RW** - Read/Write, the job generates or updates the table.
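For orientation, and not part of the patch itself: the `tables` and `logs` cells flip to RW above because the migration workflows below now force-refresh the crawler snapshots before recording progress. A minimal sketch of that read/write distinction, assuming `ctx` is the UCX `RuntimeContext` passed to workflow tasks and with `read_only_consumer` as a purely hypothetical name:

# Hedged sketch only; the `ctx.tables_crawler` wiring comes from the UCX runtime context.
def update_table_inventory(ctx) -> None:
    # "RW": forcing a fresh crawl rewrites the persisted `tables` inventory.
    ctx.tables_crawler.snapshot(force_refresh=True)


def read_only_consumer(ctx) -> None:
    # "RO": reuse the already-persisted snapshot without rewriting it.
    for table in ctx.tables_crawler.snapshot():
        print(table.key)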
diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py
index 3dedc12e78..1e7cb4abcf 100644
--- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py
+++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -2,6 +2,7 @@
 import logging
 import re
 from collections import defaultdict
+from collections.abc import Iterable
 from functools import partial, cached_property
 
 from databricks.labs.blueprint.parallel import Threads
@@ -18,8 +19,11 @@
     TableMapping,
     TableToMigrate,
 )
-
-from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher, TableMigrationIndex
+from databricks.labs.ucx.hive_metastore.table_migration_status import (
+    TableMigrationStatusRefresher,
+    TableMigrationStatus,
+    TableMigrationIndex,
+)
 from databricks.labs.ucx.hive_metastore.tables import (
     MigrationCount,
     Table,
@@ -56,14 +60,11 @@ def __init__(
         self._migrate_grants = migrate_grants
         self._external_locations = external_locations
 
-    def get_remaining_tables(self) -> list[Table]:
-        migration_index = self.index(force_refresh=True)
-        table_rows = []
+    def warn_about_remaining_non_migrated_tables(self, migration_statuses: Iterable[TableMigrationStatus]) -> None:
+        migration_index = TableMigrationIndex(migration_statuses)
         for crawled_table in self._tables_crawler.snapshot():
             if not migration_index.is_migrated(crawled_table.database, crawled_table.name):
-                table_rows.append(crawled_table)
                 logger.warning(f"remained-hive-metastore-table: {crawled_table.key}")
-        return table_rows
 
     def index(self, *, force_refresh: bool = False):
         return self._migration_status_refresher.index(force_refresh=force_refresh)
diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py
index 0b6dc60de9..0166e3336f 100644
--- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py
+++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py
@@ -85,7 +85,7 @@ def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_
         self._tables_crawler = tables_crawler
 
     def index(self, *, force_refresh: bool = False) -> TableMigrationIndex:
-        return TableMigrationIndex(list(self.snapshot(force_refresh=force_refresh)))
+        return TableMigrationIndex(self.snapshot(force_refresh=force_refresh))
 
     def get_seen_tables(self) -> dict[str, str]:
         seen_tables: dict[str, str] = {}
diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py
index 5a61c1ad37..a17d37ddea 100644
--- a/src/databricks/labs/ucx/hive_metastore/workflows.py
+++ b/src/databricks/labs/ucx/hive_metastore/workflows.py
@@ -1,3 +1,5 @@
+import datetime as dt
+
 from databricks.labs.ucx.assessment.workflows import Assessment
 from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
 from databricks.labs.ucx.framework.tasks import Workflow, job_task
@@ -57,10 +59,53 @@ def migrate_views(self, ctx: RuntimeContext):
         """
         ctx.tables_migrator.migrate_tables(what=What.VIEW)
 
-    @job_task(job_cluster="user_isolation", depends_on=[migrate_views])
-    def update_migration_status(self, ctx: RuntimeContext):
-        """Refresh the migration status to present it in the dashboard."""
-        ctx.tables_migrator.get_remaining_tables()
+    @job_task(job_cluster="user_isolation")
+    def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
+        """Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
+        ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
+
+    @job_task(
+        depends_on=[
+            convert_managed_table,
+            migrate_external_tables_sync,
+            migrate_dbfs_root_delta_tables,
+            migrate_dbfs_root_non_delta_tables,
+            migrate_views,
+            verify_progress_tracking_prerequisites,
+        ],
+    )
+    def update_table_inventory(self, ctx: RuntimeContext) -> None:
+        """Refresh the tables inventory, prior to updating the migration status of all the tables."""
+        # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
+        # UC-enabled, so we cannot both snapshot and update the history log from the same location.
+        # Step 1 of 3: Just refresh the tables inventory.
+        ctx.tables_crawler.snapshot(force_refresh=True)
+
+    @job_task(depends_on=[verify_progress_tracking_prerequisites, update_table_inventory], job_cluster="user_isolation")
+    def update_migration_status(self, ctx: RuntimeContext) -> None:
+        """Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
+        # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
+        updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
+        ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)
+
+    @job_task(
+        depends_on=[verify_progress_tracking_prerequisites, update_migration_status], job_cluster="user_isolation"
+    )
+    def update_tables_history_log(self, ctx: RuntimeContext) -> None:
+        """Update the history log with the latest tables inventory and migration status."""
+        # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
+        # history log.
+        # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
+        tables_snapshot = ctx.tables_crawler.snapshot()
+        # Note: encoding the Table records will trigger loading of the migration-status data.
+        ctx.tables_progress.append_inventory_snapshot(tables_snapshot)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
+    )
+    def record_workflow_run(self, ctx: RuntimeContext) -> None:
+        """Record the workflow run of this workflow."""
+        ctx.workflow_run_recorder.record()
 
 
 class MigrateHiveSerdeTablesInPlace(Workflow):
@@ -86,10 +131,44 @@ def migrate_views(self, ctx: RuntimeContext):
         """
         ctx.tables_migrator.migrate_tables(what=What.VIEW)
 
-    @job_task(job_cluster="user_isolation", depends_on=[migrate_views])
-    def update_migration_status(self, ctx: RuntimeContext):
-        """Refresh the migration status to present it in the dashboard."""
-        ctx.tables_migrator.get_remaining_tables()
+    @job_task(job_cluster="user_isolation")
+    def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
+        """Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
+        ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
+
+    @job_task(depends_on=[verify_progress_tracking_prerequisites, migrate_views])
+    def update_table_inventory(self, ctx: RuntimeContext) -> None:
+        """Refresh the tables inventory, prior to updating the migration status of all the tables."""
+        # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
+        # UC-enabled, so we cannot both snapshot and update the history log from the same location.
+        # Step 1 of 3: Just refresh the tables inventory.
+        ctx.tables_crawler.snapshot(force_refresh=True)
+
+    @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory])
+    def update_migration_status(self, ctx: RuntimeContext) -> None:
+        """Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
+        # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
+        updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
+        ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
+    )
+    def update_tables_history_log(self, ctx: RuntimeContext) -> None:
+        """Update the history log with the latest tables inventory and migration status."""
+        # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
+        # history log.
+        # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
+        tables_snapshot = ctx.tables_crawler.snapshot()
+        # Note: encoding the Table records will trigger loading of the migration-status data.
+        ctx.tables_progress.append_inventory_snapshot(tables_snapshot)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
+    )
+    def record_workflow_run(self, ctx: RuntimeContext) -> None:
+        """Record the workflow run of this workflow."""
+        ctx.workflow_run_recorder.record()
 
 
 class MigrateExternalTablesCTAS(Workflow):
@@ -120,10 +199,51 @@ def migrate_views(self, ctx: RuntimeContext):
         """
         ctx.tables_migrator.migrate_tables(what=What.VIEW)
 
-    @job_task(job_cluster="user_isolation", depends_on=[migrate_views])
-    def update_migration_status(self, ctx: RuntimeContext):
-        """Refresh the migration status to present it in the dashboard."""
-        ctx.tables_migrator.get_remaining_tables()
+    @job_task(job_cluster="user_isolation")
+    def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
+        """Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
+        ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
+
+    @job_task(
+        depends_on=[
+            verify_progress_tracking_prerequisites,
+            migrate_views,
+            migrate_hive_serde_ctas,
+            migrate_other_external_ctas,
+        ]
+    )
+    def update_table_inventory(self, ctx: RuntimeContext) -> None:
+        """Refresh the tables inventory, prior to updating the migration status of all the tables."""
+        # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
+        # UC-enabled, so cannot both snapshot and update the history log from the same location.
+        # Step 1 of 3: Just refresh the tables inventory.
+        ctx.tables_crawler.snapshot(force_refresh=True)
+
+    @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory])
+    def update_migration_status(self, ctx: RuntimeContext) -> None:
+        """Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
+        # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
+        updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
+        ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
+    )
+    def update_tables_history_log(self, ctx: RuntimeContext) -> None:
+        """Update the history log with the latest tables inventory and migration status."""
+        # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
+        # history log.
+        # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
+        tables_snapshot = ctx.tables_crawler.snapshot()
+        # Note: encoding the Table records will trigger loading of the migration-status data.
+        ctx.tables_progress.append_inventory_snapshot(tables_snapshot)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
+    )
+    def record_workflow_run(self, ctx: RuntimeContext) -> None:
+        """Record the workflow run of this workflow."""
+        ctx.workflow_run_recorder.record()
 
 
 class ScanTablesInMounts(Workflow):
@@ -137,10 +257,36 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext):
        replacing any existing content that might be present."""
         ctx.tables_in_mounts.snapshot(force_refresh=True)
 
-    @job_task(job_cluster="user_isolation", depends_on=[scan_tables_in_mounts_experimental])
-    def update_migration_status(self, ctx: RuntimeContext):
-        """Refresh the migration status to present it in the dashboard."""
-        ctx.tables_migrator.get_remaining_tables()
+    @job_task(job_cluster="user_isolation")
+    def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
+        """Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
+        ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
+
+    @job_task(
+        job_cluster="user_isolation",
+        depends_on=[verify_progress_tracking_prerequisites, scan_tables_in_mounts_experimental],
+    )
+    def update_migration_status(self, ctx: RuntimeContext) -> None:
+        """Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
+        updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
+        ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
+    )
+    def update_tables_history_log(self, ctx: RuntimeContext) -> None:
+        """Update the history log with the latest tables inventory and migration status."""
+        # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
+        tables_snapshot = ctx.tables_crawler.snapshot()
+        # Note: encoding the Table records will trigger loading of the migration-status data.
+        ctx.tables_progress.append_inventory_snapshot(tables_snapshot)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
+    )
+    def record_workflow_run(self, ctx: RuntimeContext) -> None:
+        """Record the workflow run of this workflow."""
+        ctx.workflow_run_recorder.record()
 
 
 class MigrateTablesInMounts(Workflow):
@@ -152,7 +298,41 @@ def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext):
         """[EXPERIMENTAL] This workflow migrates `delta tables stored in mount points` to Unity Catalog using a Create
        Table statement."""
         ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT)
 
-    @job_task(job_cluster="user_isolation", depends_on=[migrate_tables_in_mounts_experimental])
-    def update_migration_status(self, ctx: RuntimeContext):
-        """Refresh the migration status to present it in the dashboard."""
-        ctx.tables_migrator.get_remaining_tables()
+    @job_task(job_cluster="user_isolation")
+    def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
+        """Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
+        ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
+
+    @job_task(depends_on=[verify_progress_tracking_prerequisites, migrate_tables_in_mounts_experimental])
+    def update_table_inventory(self, ctx: RuntimeContext) -> None:
+        """Refresh the tables inventory, prior to updating the migration status of all the tables."""
+        # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
+        # UC-enabled, so we cannot both snapshot and update the history log from the same location.
+        # Step 1 of 3: Just refresh the tables inventory.
+        ctx.tables_crawler.snapshot(force_refresh=True)
+
+    @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory])
+    def update_migration_status(self, ctx: RuntimeContext) -> None:
+        """Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
+        # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
+        updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
+        ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
+    )
+    def update_tables_history_log(self, ctx: RuntimeContext) -> None:
+        """Update the history log with the latest tables inventory and migration status."""
+        # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
+        # history log.
+        # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
+        tables_snapshot = ctx.tables_crawler.snapshot()
+        # Note: encoding the Table records will trigger loading of the migration-status data.
+        ctx.tables_progress.append_inventory_snapshot(tables_snapshot)
+
+    @job_task(
+        job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
+    )
+    def record_workflow_run(self, ctx: RuntimeContext) -> None:
+        """Record the workflow run of this workflow."""
+        ctx.workflow_run_recorder.record()
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 62246637c1..838dce3dac 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -1223,7 +1223,7 @@ def prepare_regular_tables(context, external_csv, schema) -> dict[str, TableInfo
 
 @pytest.fixture
 def prepare_tables_for_migration(
-    ws, installation_ctx, make_catalog, make_random, make_mounted_location, env_or_skip, make_storage_dir, request
+    installation_ctx, make_catalog, make_random, make_mounted_location, env_or_skip, make_storage_dir, request
 ) -> tuple[dict[str, TableInfo], SchemaInfo]:
     # Here we use pytest indirect parametrization, so the test function can pass arguments to this fixture and the
     # arguments will be available in the request.param. If the argument is "hiveserde", we will prepare hiveserde
diff --git a/tests/integration/hive_metastore/test_ext_hms.py b/tests/integration/hive_metastore/test_ext_hms.py
index cc6f19bbbc..b10af5a7fc 100644
--- a/tests/integration/hive_metastore/test_ext_hms.py
+++ b/tests/integration/hive_metastore/test_ext_hms.py
@@ -8,6 +8,8 @@
 from databricks.sdk.errors import NotFound, InvalidParameterValue
 from databricks.sdk.retries import retried
 
+from databricks.labs.ucx.progress.install import ProgressTrackingInstallation
+
 logger = logging.getLogger(__name__)
 
 
@@ -24,15 +26,16 @@ def sql_backend(ws, env_or_skip) -> SqlBackend:
 
 @retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5))
 @pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True)
-def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migration, env_or_skip):
-    ext_hms_cluster_id = env_or_skip("TEST_EXT_HMS_CLUSTER_ID")
+def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migration, env_or_skip) -> None:
+    main_cluster_id = env_or_skip("TEST_EXT_HMS_NOUC_CLUSTER_ID")
+    table_migration_cluster_id = env_or_skip("TEST_EXT_HMS_CLUSTER_ID")
     tables, dst_schema = prepare_tables_for_migration
     ext_hms_ctx = installation_ctx.replace(
         config_transform=lambda wc: dataclasses.replace(
             wc,
             override_clusters={
-                "main": ext_hms_cluster_id,
-                "user_isolation": ext_hms_cluster_id,
+                "main": main_cluster_id,
+                "user_isolation": table_migration_cluster_id,
             },
         ),
         extend_prompts={
@@ -45,11 +48,19 @@ def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migratio
             r"Choose a cluster policy": "0",
         },
     )
-    ext_hms_ctx.workspace_installation.run()
-    ext_hms_ctx.deployed_workflows.run_workflow("migrate-tables")
+    ProgressTrackingInstallation(ext_hms_ctx.sql_backend, ext_hms_ctx.ucx_catalog).run()
+
+    # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete
+    # before we can test the migration workflow.
+    ext_hms_ctx.deployed_workflows.run_workflow("assessment", skip_job_wait=True)
+    workflow_completed_correctly = ext_hms_ctx.deployed_workflows.validate_step("assessment")
+    assert workflow_completed_correctly, "Workflow failed: assessment"
+
     # assert the workflow is successful
-    assert ext_hms_ctx.deployed_workflows.validate_step("migrate-tables")
+    ext_hms_ctx.deployed_workflows.run_workflow("migrate-tables", skip_job_wait=True)
+    workflow_completed_correctly = ext_hms_ctx.deployed_workflows.validate_step("migrate-tables")
+    assert workflow_completed_correctly, "Workflow failed: migrate-tables"
 
     # assert the tables are migrated
     for table in tables.values():
diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py
index ef124ace22..9991960d94 100644
--- a/tests/integration/hive_metastore/test_workflows.py
+++ b/tests/integration/hive_metastore/test_workflows.py
@@ -3,6 +3,7 @@
 
 from databricks.labs.ucx.framework.utils import escape_sql_identifier
 from databricks.labs.ucx.hive_metastore.tables import Table
+from databricks.labs.ucx.progress.install import ProgressTrackingInstallation
 
 
 @pytest.mark.parametrize(
@@ -15,7 +16,6 @@
     indirect=("prepare_tables_for_migration",),
 )
 def test_table_migration_job_refreshes_migration_status(
-    ws,
     installation_ctx,
     prepare_tables_for_migration,
     workflow,
@@ -27,39 +27,68 @@ def test_table_migration_job_refreshes_migration_status(
             r".*Do you want to update the existing installation?.*": 'yes',
         },
     )
-    ctx.workspace_installation.run()
-    ctx.deployed_workflows.run_workflow(workflow)
+    ProgressTrackingInstallation(ctx.sql_backend, ctx.ucx_catalog).run()
+
+    # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete
+    # before we can test these workflows.
+    ctx.deployed_workflows.run_workflow("assessment", skip_job_wait=True)
+    assessment_completed_correctly = ctx.deployed_workflows.validate_step("assessment")
+    assert assessment_completed_correctly, "Workflow failed: assessment"
+
+    # The workflow under test.
+    run_id = ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=True)
+    workflow_completed_correctly = ctx.deployed_workflows.validate_step(workflow)
+    assert workflow_completed_correctly, f"Workflow failed: {workflow}"
 
-    # Avoiding MigrationStatusRefresh as it will refresh the status before fetching
-    migration_status_query = f"SELECT * FROM {ctx.config.inventory_database}.migration_status"
+    # Avoiding MigrationStatusRefresh as it will refresh the status before fetching.
+    migration_status_query = f"SELECT * FROM {ctx.migration_status_refresher.full_name}"
     migration_statuses = list(ctx.sql_backend.fetch(migration_status_query))
-    if len(migration_statuses) == 0:
+    if not migration_statuses:
         ctx.deployed_workflows.relay_logs(workflow)
-        assert False, "No migration statuses found"
+        pytest.fail("No migration statuses found")
 
-    asserts = []
+    problems = []
     for table in tables.values():
         migration_status = []
         for status in migration_statuses:
             if status.src_schema == table.schema_name and status.src_table == table.name:
                 migration_status.append(status)
 
-        assert_message_postfix = f" found for {table.table_type} {table.full_name}"
-        if len(migration_status) == 0:
-            asserts.append("No migration status" + assert_message_postfix)
-        elif len(migration_status) > 1:
-            asserts.append("Multiple migration statuses" + assert_message_postfix)
-        elif migration_status[0].dst_schema is None:
-            asserts.append("No destination schema" + assert_message_postfix)
-        elif migration_status[0].dst_table is None:
-            asserts.append("No destination table" + assert_message_postfix)
-
-    assert_message = (
-        "\n".join(asserts) + " given migration statuses " + "\n".join([str(status) for status in migration_statuses])
+        match migration_status:
+            case []:
+                problems.append(f"No migration status found for {table.table_type} {table.full_name}")
+            case [_, _, *_]:
+                problems.append(f"Multiple migration statuses found for {table.table_type} {table.full_name}")
+            case [status] if status.dst_schema is None:
+                problems.append(f"No destination schema found for {table.table_type} {table.full_name}")
+            case [status] if status.dst_table is None:
+                problems.append(f"No destination table found for {table.table_type} {table.full_name}")
+
+    failure_message = (
+        "\n".join(problems) + " given migration statuses:\n" + "\n".join([str(status) for status in migration_statuses])
     )
-    assert len(asserts) == 0, assert_message
+    assert not problems, failure_message
+
+    # Ensure that the workflow populated the `workflow_runs` table.
+    query = f"""
+        SELECT 1 FROM {ctx.ucx_catalog}.multiworkspace.workflow_runs
+        WHERE workspace_id = {ctx.workspace_id}
+        AND workflow_run_id = {run_id}
+        LIMIT 1
+    """
+    assert any(ctx.sql_backend.fetch(query)), f"No workflow run captured: {query}"
+
+    # Ensure that the history file has table records written to it that correspond to this run.
+    query = f"""
+        SELECT 1 from {ctx.ucx_catalog}.multiworkspace.historical
+        WHERE workspace_id = {ctx.workspace_id}
+        AND job_run_id = {run_id}
+        AND object_type = 'Table'
+        LIMIT 1
+    """
+    assert any(ctx.sql_backend.fetch(query)), f"No snapshots captured to the history log: {query}"
 
 
 @pytest.mark.parametrize(
@@ -78,8 +107,15 @@ def test_table_migration_for_managed_table(ws, installation_ctx, prepare_tables_
             r".*Do you want to update the existing installation?.*": 'yes',
         },
     )
-    ctx.workspace_installation.run()
+    ProgressTrackingInstallation(ctx.sql_backend, ctx.ucx_catalog).run()
+
+    # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete
+    # before we can test the migration workflow.
+    installation_ctx.deployed_workflows.run_workflow("assessment")
+    assert installation_ctx.deployed_workflows.validate_step("assessment"), "Workflow failed: assessment"
+
+    # The workflow under test.
     ctx.deployed_workflows.run_workflow(workflow)
 
     for table in tables.values():
@@ -104,6 +140,14 @@ def test_hiveserde_table_in_place_migration_job(ws, installation_ctx, prepare_ta
         },
     )
     ctx.workspace_installation.run()
+    ProgressTrackingInstallation(ctx.sql_backend, ctx.ucx_catalog).run()
+
+    # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete
+    # before we can test the migration workflow.
+    installation_ctx.deployed_workflows.run_workflow("assessment")
+    assert installation_ctx.deployed_workflows.validate_step("assessment"), "Workflow failed: assessment"
+
+    # The workflow under test.
     ctx.deployed_workflows.run_workflow("migrate-external-hiveserde-tables-in-place-experimental")
     # assert the workflow is successful
     assert ctx.deployed_workflows.validate_step("migrate-external-hiveserde-tables-in-place-experimental")
@@ -124,6 +168,14 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables
         },
     )
     ctx.workspace_installation.run()
+    ProgressTrackingInstallation(ctx.sql_backend, ctx.ucx_catalog).run()
+
+    # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete
+    # before we can test the migration workflow.
+    installation_ctx.deployed_workflows.run_workflow("assessment")
+    assert installation_ctx.deployed_workflows.validate_step("assessment"), "Workflow failed: assessment"
+
+    # The workflow under test.
     ctx.deployed_workflows.run_workflow("migrate-external-tables-ctas")
     # assert the workflow is successful
     assert ctx.deployed_workflows.validate_step("migrate-external-tables-ctas")
@@ -137,10 +189,11 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables
 
 @pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True)
 def test_table_migration_job_publishes_remaining_tables(
-    ws, installation_ctx, sql_backend, prepare_tables_for_migration, caplog
+    installation_ctx, sql_backend, prepare_tables_for_migration, caplog
 ):
     tables, dst_schema = prepare_tables_for_migration
     installation_ctx.workspace_installation.run()
+    ProgressTrackingInstallation(installation_ctx.sql_backend, installation_ctx.ucx_catalog).run()
     second_table = list(tables.values())[1]
     table = Table(
         "hive_metastore",
@@ -150,6 +203,13 @@ def test_table_migration_job_publishes_remaining_tables(
         table_format="UNKNOWN",
     )
     installation_ctx.table_mapping.skip_table_or_view(dst_schema.name, second_table.name, load_table=lambda *_: table)
+
+    # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete
+    # before we can test the migration workflow.
+    installation_ctx.deployed_workflows.run_workflow("assessment")
+    assert installation_ctx.deployed_workflows.validate_step("assessment"), "Workflow failed: assessment"
+
+    # The workflow under test.
     installation_ctx.deployed_workflows.run_workflow("migrate-tables")
 
     assert installation_ctx.deployed_workflows.validate_step("migrate-tables")
diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py
index d5eea819ec..1712852585 100644
--- a/tests/unit/hive_metastore/test_table_migrate.py
+++ b/tests/unit/hive_metastore/test_table_migrate.py
@@ -1378,11 +1378,9 @@ def test_revert_migrated_tables_failed(caplog, mock_pyspark):
     assert "Failed to revert table hive_metastore.test_schema1.test_table1: error" in caplog.text
 
 
-def test_refresh_migration_status_published_remained_tables(caplog, mock_pyspark):
-    backend = MockBackend()
+def test_refresh_migration_status_check_remaining_tables(ws, mock_backend, caplog) -> None:
     table_crawler = create_autospec(TablesCrawler)
-    client = mock_workspace_client()
-    table_crawler.snapshot.return_value = [
+    tables = (
         Table(
             object_type="EXTERNAL",
             table_format="DELTA",
@@ -1409,32 +1407,35 @@ def test_refresh_migration_status_published_remained_tables(caplog, mock_pyspark
             name="table3",
             location="s3://some_location/table3",
         ),
-    ]
-    table_mapping = mock_table_mapping()
-    migration_status_refresher = create_autospec(TableMigrationStatusRefresher)
-    migration_index = TableMigrationIndex(
-        [
-            TableMigrationStatus("schema1", "table1", "ucx_default", "db1_dst", "dst_table1"),
-            TableMigrationStatus("schema1", "table2", "ucx_default", "db1_dst", "dst_table2"),
-        ]
     )
-    migration_status_refresher.index.return_value = migration_index
+    table_crawler.snapshot.return_value = tables
+
+    def migration_status_from_table(table: Table) -> TableMigrationStatus:
+        catalog, schema, name = table.upgraded_to.split(".", maxsplit=2) if table.upgraded_to else (None, None, None)
+        return TableMigrationStatus(table.database, table.name, catalog, schema, name)
+
+    migration_status_snapshot = tuple(migration_status_from_table(table) for table in tables)
+    migration_status_refresher = create_autospec(TableMigrationStatusRefresher)
     migrate_grants = create_autospec(MigrateGrants)
     external_locations = create_autospec(ExternalLocations)
     table_migrate = TablesMigrator(
         table_crawler,
-        client,
-        backend,
-        table_mapping,
+        ws,
+        mock_backend,
+        mock_table_mapping(),
         migration_status_refresher,
         migrate_grants,
        external_locations,
     )
+
     with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.hive_metastore"):
-        tables = table_migrate.get_remaining_tables()
+        table_migrate.warn_about_remaining_non_migrated_tables(migration_status_snapshot)
     assert 'remained-hive-metastore-table: hive_metastore.schema1.table3' in caplog.messages
-    assert len(tables) == 1 and tables[0].key == "hive_metastore.schema1.table3"
-    migrate_grants.assert_not_called()
+
+    table_crawler.snapshot.assert_called_once()
+    migration_status_refresher.index.assert_not_called()
+    migration_status_refresher.snapshot.assert_not_called()
+    migrate_grants.apply.assert_not_called()
     external_locations.resolve_mount.assert_not_called()
diff --git a/tests/unit/hive_metastore/test_workflows.py b/tests/unit/hive_metastore/test_workflows.py
index 7d2a05ec8f..d4dd6fda66 100644
--- a/tests/unit/hive_metastore/test_workflows.py
+++ b/tests/unit/hive_metastore/test_workflows.py
@@ -1,5 +1,12 @@
+import datetime as dt
+from collections.abc import Sequence
+
 import pytest
 
+from databricks.sdk.service.catalog import CatalogInfo, MetastoreAssignment
+from databricks.sdk.service.jobs import BaseRun, RunResultState, RunState
+
+from databricks.labs.ucx.framework.tasks import Workflow
 from databricks.labs.ucx.hive_metastore.workflows import (
     TableMigration,
     MigrateExternalTablesCTAS,
@@ -54,18 +61,120 @@ def test_migrate_ctas_views(run_workflow):
     ctx.workspace_client.catalogs.list.assert_called()
 
 
+_migration_workflows: Sequence[type[Workflow]] = (
+    TableMigration,
+    MigrateHiveSerdeTablesInPlace,
+    MigrateExternalTablesCTAS,
+    ScanTablesInMounts,
+    MigrateTablesInMounts,
+)
+
+
+@pytest.mark.parametrize(
+    "task",
+    [getattr(workflow, "verify_progress_tracking_prerequisites") for workflow in _migration_workflows],
+    ids=[workflow.__name__ for workflow in _migration_workflows],
+)
+def test_with_valid_prerequisites(ws, run_workflow, task) -> None:
+    ws.metastores.current.return_value = MetastoreAssignment(metastore_id="test", workspace_id=123456789)
+    ws.catalogs.get.return_value = CatalogInfo()
+    ws.jobs.list_runs.return_value = [BaseRun(state=RunState(result_state=RunResultState.SUCCESS))]
+    run_workflow(task, workspace_client=ws)
+    # run_workflow will raise RuntimeError if the prerequisites could not be verified.
+
+
+@pytest.mark.parametrize(
+    "task",
+    [getattr(workflow, "verify_progress_tracking_prerequisites") for workflow in _migration_workflows],
+    ids=[workflow.__name__ for workflow in _migration_workflows],
+)
+def test_with_invalid_prerequisites(ws, run_workflow, task) -> None:
+    """All invalid prerequisites permutations are tested for `VerifyProgressTracking` separately."""
+    ws.metastores.current.return_value = None
+    with pytest.raises(RuntimeWarning, match="Metastore not attached to workspace."):
+        run_workflow(task, workspace_client=ws)
+
+
 @pytest.mark.parametrize(
     "workflow",
-    [
-        TableMigration,
-        MigrateHiveSerdeTablesInPlace,
-        MigrateExternalTablesCTAS,
-        ScanTablesInMounts,
-        MigrateTablesInMounts,
-    ],
+    # Special case here for ScanTablesInMounts, handled below.
+    [workflow for workflow in _migration_workflows if workflow is not ScanTablesInMounts],
 )
 def test_update_migration_status(run_workflow, workflow) -> None:
-    """Migration status is refreshed by deleting and showing new tables"""
-    ctx = run_workflow(getattr(workflow, "update_migration_status"))
-    assert ctx.sql_backend.has_rows_written_for("hive_metastore.ucx.migration_status")
-    assert "SHOW DATABASES" in ctx.sql_backend.queries
+    """Migration status is refreshed by updating the table inventory, migration status and history log."""
+    task1 = getattr(workflow, "update_table_inventory")
+    task2 = getattr(workflow, "update_migration_status")
+    task3 = getattr(workflow, "update_tables_history_log")
+    assert task1.__name__ in task2.__task__.depends_on and task2.__name__ in task3.__task__.depends_on
+
+    # Refresh tables inventory.
+    ctx1 = run_workflow(task1)
+    assert "SHOW DATABASES" in ctx1.sql_backend.queries
+    assert ctx1.sql_backend.has_rows_written_for("hive_metastore.ucx.tables")
+
+    # Given the tables inventory, refresh the migration status of the tables.
+    ctx2 = run_workflow(task2, named_parameters={"parent_run_id": "53"})
+    assert "SELECT * FROM `hive_metastore`.`ucx`.`tables`" in ctx2.sql_backend.queries
+    assert ctx2.sql_backend.has_rows_written_for("hive_metastore.ucx.migration_status")
+
+    # Given the tables inventory and migration status snapshot, update the historical log.
+    ctx3 = run_workflow(task3, named_parameters={"parent_run_id": "53"})
+    assert "SELECT * FROM `hive_metastore`.`ucx`.`tables`" in ctx3.sql_backend.queries
+    assert "SELECT * FROM `hive_metastore`.`ucx`.`migration_status`" in ctx3.sql_backend.queries
+    assert ctx3.sql_backend.has_rows_written_for("`ucx`.`multiworkspace`.`historical`")
+
+
+def test_scan_tables_in_mounts_update_migration_status(run_workflow) -> None:
+    """Migration status is refreshed by updating the migration status and history log."""
+    task1 = ScanTablesInMounts.update_migration_status
+    task2 = ScanTablesInMounts.update_tables_history_log
+    assert task1.__name__ in getattr(task2, "__task__").depends_on
+
+    # Given the tables inventory, refresh the migration status of the tables.
+    ctx1 = run_workflow(task1, named_parameters={"parent_run_id": "53"})
+    assert "SELECT * FROM `hive_metastore`.`ucx`.`tables`" in ctx1.sql_backend.queries
+    assert ctx1.sql_backend.has_rows_written_for("hive_metastore.ucx.migration_status")
+
+    # Given the tables inventory and migration status snapshot, update the historical log.
+    ctx2 = run_workflow(task2, named_parameters={"parent_run_id": "53"})
+    assert "SELECT * FROM `hive_metastore`.`ucx`.`tables`" in ctx2.sql_backend.queries
+    assert "SELECT * FROM `hive_metastore`.`ucx`.`migration_status`" in ctx2.sql_backend.queries
+    assert ctx2.sql_backend.has_rows_written_for("`ucx`.`multiworkspace`.`historical`")
+
+
+@pytest.mark.parametrize(
+    "task",
+    [getattr(workflow, "record_workflow_run") for workflow in _migration_workflows],
+    ids=[workflow.__name__ for workflow in _migration_workflows],
+)
+def test_migration_record_workflow_run(run_workflow, task) -> None:
+    """Verify that we log the workflow run."""
+    start_time = dt.datetime.now(dt.timezone.utc).replace(microsecond=0)
+    context_replacements = {
+        "named_parameters": {
+            "workflow": "the_workflow",
+            "job_id": "12345",
+            "parent_run_id": "456",
+            "attempt": "1",
+            "start_time": start_time.isoformat(),
+        },
+    }
+
+    ctx = run_workflow(task, **context_replacements)
+
+    rows = ctx.sql_backend.rows_written_for("ucx.multiworkspace.workflow_runs", "append")
+
+    rows_as_dict = [{k: v for k, v in rows.asDict().items() if k != 'finished_at'} for rows in rows]
+    assert rows_as_dict == [
+        {
+            "started_at": start_time,
+            # finished_at: checked below.
+            "workspace_id": 123,
+            "workflow_name": "the_workflow",
+            "workflow_id": 12345,
+            "workflow_run_id": 456,
+            "workflow_run_attempt": 1,
+        }
+    ]
+    # Finish-time must be indistinguishable from or later than the start time.
+    assert all(row["started_at"] <= row["finished_at"] for row in rows)
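Closing note, not part of the patch: a minimal sketch of the status hand-off the change above introduces, using only types and calls that appear in the diff (`TableMigrationIndex` accepting an iterable of `TableMigrationStatus`, and `is_migrated` driving the remained-hive-metastore-table warning); the literal values mirror the unit-test fixtures and are illustrative only.

from databricks.labs.ucx.hive_metastore.table_migration_status import (
    TableMigrationIndex,
    TableMigrationStatus,
)

# One table with a recorded destination, one still living only in the Hive metastore.
statuses = [
    TableMigrationStatus("schema1", "table1", "ucx_default", "db1_dst", "dst_table1"),
    TableMigrationStatus("schema1", "table3", None, None, None),
]

# Built directly from an in-memory snapshot, as update_migration_status now does,
# instead of forcing a refresh through index(force_refresh=True).
index = TableMigrationIndex(statuses)
assert index.is_migrated("schema1", "table1")
assert not index.is_migrated("schema1", "table3")  # would be warned about as remaining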