From 905fefca3f688a71b23c15734aa54594f9bbbf8e Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 6 Nov 2024 18:02:41 +0100 Subject: [PATCH 01/32] Remove some unnecessary code. --- tests/unit/progress/test_tables.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/unit/progress/test_tables.py b/tests/unit/progress/test_tables.py index a859bf5c03..9b3ef962e3 100644 --- a/tests/unit/progress/test_tables.py +++ b/tests/unit/progress/test_tables.py @@ -6,7 +6,6 @@ from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex from databricks.labs.ucx.hive_metastore.tables import Table -from databricks.labs.ucx.progress.grants import GrantProgressEncoder from databricks.labs.ucx.progress.tables import TableProgressEncoder @@ -21,7 +20,6 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None: ownership.owner_of.return_value = "user" table_migration_index = create_autospec(TableMigrationIndex) table_migration_index.is_migrated.return_value = True - grant_progress_encoder = create_autospec(GrantProgressEncoder) encoder = TableProgressEncoder( mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test" ) @@ -33,7 +31,6 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None: assert len(rows[0].failures) == 0 ownership.owner_of.assert_called_once() table_migration_index.is_migrated.assert_called_with(table.database, table.name) - grant_progress_encoder.assert_not_called() @pytest.mark.parametrize( @@ -47,7 +44,6 @@ def test_table_progress_encoder_pending_migration_failure(mock_backend, table: T ownership.owner_of.return_value = "user" table_migration_index = create_autospec(TableMigrationIndex) table_migration_index.is_migrated.return_value = False - grant_progress_encoder = create_autospec(GrantProgressEncoder) encoder = TableProgressEncoder( mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test" ) @@ -59,4 +55,3 @@ def test_table_progress_encoder_pending_migration_failure(mock_backend, table: T assert rows[0].failures == ["Pending migration"] ownership.owner_of.assert_called_once() table_migration_index.is_migrated.assert_called_with(table.database, table.name) - grant_progress_encoder.assert_not_called() From c23c38a6bc4d34f9c52380e8740e147de618199a Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 6 Nov 2024 19:10:44 +0100 Subject: [PATCH 02/32] Update table history logger to not trigger a refresh of the migration-status index: this is supposed to be handled explicitly. 
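Context for the change: TableProgressEncoder now receives the migration-status refresher itself instead of a pre-built TableMigrationIndex, so constructing the workflow context no longer forces an index refresh. A _snapshot_context() hook is added to ProgressEncoder and held open for the duration of append_inventory_snapshot(); TableProgressEncoder uses it to build the index exactly once per snapshot, from the already-refreshed migration-status inventory. A condensed sketch of that override (a fragment of TableProgressEncoder; names and imports as in the diff below):

    @contextmanager
    def _snapshot_context(self) -> Generator[TableMigrationIndex, None, None]:
        # snapshot() reads the existing migration-status inventory; it does not force a refresh.
        yield TableMigrationIndex(self._migration_status_refresher.snapshot())

    def _encode_record_as_historical(self, record: Table, snapshot_context: TableMigrationIndex) -> Historical:
        historical = super()._encode_record_as_historical(record, snapshot_context=None)
        failures = [] if snapshot_context.is_migrated(record.database, record.name) else ["Pending migration"]
        return replace(historical, failures=historical.failures + failures)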
--- .../labs/ucx/contexts/workflow_task.py | 10 +++---- .../hive_metastore/table_migration_status.py | 4 +-- src/databricks/labs/ucx/progress/grants.py | 4 +-- src/databricks/labs/ucx/progress/history.py | 29 +++++++++++++++---- src/databricks/labs/ucx/progress/jobs.py | 4 +-- src/databricks/labs/ucx/progress/tables.py | 24 ++++++++++----- src/databricks/labs/ucx/progress/workflows.py | 2 +- tests/unit/progress/test_tables.py | 27 ++++++++++------- 8 files changed, 69 insertions(+), 35 deletions(-) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index a3656b290b..f9fea563ca 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -22,10 +22,10 @@ from databricks.labs.ucx.contexts.application import GlobalContext from databricks.labs.ucx.hive_metastore import TablesInMounts, TablesCrawler from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler -from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler +from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table from databricks.labs.ucx.hive_metastore.udfs import Udf from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder -from databricks.labs.ucx.progress.grants import GrantProgressEncoder +from databricks.labs.ucx.progress.grants import GrantProgressEncoder, Grant from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.jobs import JobsProgressEncoder from databricks.labs.ucx.progress.tables import TableProgressEncoder @@ -189,7 +189,7 @@ def policies_progress(self) -> ProgressEncoder[PolicyInfo]: ) @cached_property - def grants_progress(self) -> GrantProgressEncoder: + def grants_progress(self) -> ProgressEncoder[Grant]: return GrantProgressEncoder( self.sql_backend, self.grant_ownership, @@ -221,11 +221,11 @@ def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]: ) @cached_property - def tables_progress(self) -> TableProgressEncoder: + def tables_progress(self) -> ProgressEncoder[Table]: return TableProgressEncoder( self.sql_backend, self.table_ownership, - self.migration_status_refresher.index(force_refresh=False), + self.migration_status_refresher, self.parent_run_id, self.workspace_id, self.config.ucx_catalog, diff --git a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py index dde5f17790..c40e3d3c1e 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migration_status.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migration_status.py @@ -53,7 +53,7 @@ def key(self): class TableMigrationIndex: - def __init__(self, tables: list[TableMigrationStatus]): + def __init__(self, tables: Iterable[TableMigrationStatus]): self._index = {(ms.src_schema, ms.src_table): ms for ms in tables} def is_migrated(self, schema: str, table: str) -> bool: @@ -84,7 +84,7 @@ def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_ self._tables_crawler = tables_crawler def index(self, *, force_refresh: bool = False) -> TableMigrationIndex: - return TableMigrationIndex(list(self.snapshot(force_refresh=force_refresh))) + return TableMigrationIndex(self.snapshot(force_refresh=force_refresh)) def get_seen_tables(self) -> dict[str, str]: seen_tables: dict[str, str] = {} diff --git a/src/databricks/labs/ucx/progress/grants.py b/src/databricks/labs/ucx/progress/grants.py index 5807799fa2..a393d31d87 
100644 --- a/src/databricks/labs/ucx/progress/grants.py +++ b/src/databricks/labs/ucx/progress/grants.py @@ -25,8 +25,8 @@ def __init__( ) -> None: super().__init__(sql_backend, ownership, Grant, run_id, workspace_id, catalog, schema, table) - def _encode_record_as_historical(self, record: Grant) -> Historical: - historical = super()._encode_record_as_historical(record) + def _encode_record_as_historical(self, record: Grant, snapshot_context: None) -> Historical: + historical = super()._encode_record_as_historical(record, snapshot_context) failures = [] if not record.uc_grant_sql(): type_, key = record.this_type_and_key() diff --git a/src/databricks/labs/ucx/progress/history.py b/src/databricks/labs/ucx/progress/history.py index b1f2847807..b599e6cbad 100644 --- a/src/databricks/labs/ucx/progress/history.py +++ b/src/databricks/labs/ucx/progress/history.py @@ -4,8 +4,9 @@ import typing import json import logging +from contextlib import contextmanager from enum import Enum, EnumMeta -from collections.abc import Iterable, Sequence +from collections.abc import Iterable, Sequence, Generator from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints, final from databricks.labs.lsql.backends import SqlBackend @@ -282,11 +283,27 @@ def full_name(self) -> str: @final def append_inventory_snapshot(self, snapshot: Iterable[Record]) -> None: - history_records = [self._encode_record_as_historical(record) for record in snapshot] - logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.") - # This is the only writer, and the mode is 'append'. This is documented as conflict-free. - self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append") + with self._snapshot_context() as ctx: + history_records = [self._encode_record_as_historical(record, ctx) for record in snapshot] + logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.") + # This is the only writer, and the mode is 'append'. This is documented as conflict-free. + self._sql_backend.save_table( + escape_sql_identifier(self.full_name), history_records, Historical, mode="append" + ) + + SC: ClassVar = type(None) + + @contextmanager + def _snapshot_context(self) -> Generator[SC, None, None]: + """A context manager that is held open while a snapshot is underway. + + The context itself is passed as a parameter to :method:`_encode_record_as_historical`. As a manager, preparation + and cleanup can take place before and after the snapshot takes place. + """ + yield - def _encode_record_as_historical(self, record: Record) -> Historical: + def _encode_record_as_historical(self, record: Record, snapshot_context: SC) -> Historical: """Encode a snapshot record as a historical log entry.""" + # Snapshot context not needed with default implementation. 
+ _ = snapshot_context return self._encoder.to_historical(record) diff --git a/src/databricks/labs/ucx/progress/jobs.py b/src/databricks/labs/ucx/progress/jobs.py index 198139543c..17e1471569 100644 --- a/src/databricks/labs/ucx/progress/jobs.py +++ b/src/databricks/labs/ucx/progress/jobs.py @@ -48,7 +48,7 @@ def _job_problems(self) -> dict[int, list[str]]: index[job_problem.job_id].append(failure) return index - def _encode_record_as_historical(self, record: JobInfo) -> Historical: - historical = super()._encode_record_as_historical(record) + def _encode_record_as_historical(self, record: JobInfo, snapshot_context: None) -> Historical: + historical = super()._encode_record_as_historical(record, snapshot_context) failures = self._job_problems.get(int(record.job_id), []) return replace(historical, failures=historical.failures + failures) diff --git a/src/databricks/labs/ucx/progress/tables.py b/src/databricks/labs/ucx/progress/tables.py index 6dc76132e2..6550d1526f 100644 --- a/src/databricks/labs/ucx/progress/tables.py +++ b/src/databricks/labs/ucx/progress/tables.py @@ -1,9 +1,13 @@ +from collections.abc import Generator +from contextlib import contextmanager from dataclasses import replace +from typing import ClassVar from databricks.labs.lsql.backends import SqlBackend +from databricks.labs.ucx.framework.crawlers import CrawlerBase from databricks.labs.ucx.hive_metastore.tables import Table -from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex +from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationStatus from databricks.labs.ucx.hive_metastore.ownership import TableOwnership from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.install import Historical @@ -21,7 +25,7 @@ def __init__( self, sql_backend: SqlBackend, ownership: TableOwnership, - table_migration_index: TableMigrationIndex, + migration_status_refresher: CrawlerBase[TableMigrationStatus], run_id: int, workspace_id: int, catalog: str, @@ -38,17 +42,23 @@ def __init__( schema, table, ) - self._table_migration_index = table_migration_index + self._migration_status_refresher = migration_status_refresher - def _encode_record_as_historical(self, record: Table) -> Historical: + SC: ClassVar = TableMigrationIndex + + @contextmanager + def _snapshot_context(self) -> Generator[SC, None, None]: + yield TableMigrationIndex(self._migration_status_refresher.snapshot()) + + def _encode_record_as_historical(self, record: Table, snapshot_context: SC) -> Historical: """Encode record as historical. - A table failure means that the table is pending migration. Grants are purposefully lef out, because a grant + A table failure means that the table is pending migration. Grants are purposefully left out, because a grant might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure for tables that are migrated to UC with their relevant grants also being migrated. 
""" - historical = super()._encode_record_as_historical(record) + historical = super()._encode_record_as_historical(record, snapshot_context=None) failures = [] - if not self._table_migration_index.is_migrated(record.database, record.name): + if not snapshot_context.is_migrated(record.database, record.name): failures.append("Pending migration") return replace(historical, failures=historical.failures + failures) diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index 060b2fdccf..1380b25a5b 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -58,7 +58,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory snapshot.""" # The table migration cluster is not legacy-ACL enabled, so we can't crawl from here. # Step 2 of 2: Assuming (due to depends-on) the inventory was refreshed, capture into the history log. - # WARNING: this will fail if the inventory is empty, because it will then try to perform a crawl. + # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. history_log = ctx.tables_progress tables_snapshot = ctx.tables_crawler.snapshot() history_log.append_inventory_snapshot(tables_snapshot) diff --git a/tests/unit/progress/test_tables.py b/tests/unit/progress/test_tables.py index 9b3ef962e3..baf08de26d 100644 --- a/tests/unit/progress/test_tables.py +++ b/tests/unit/progress/test_tables.py @@ -4,7 +4,10 @@ from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier -from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex +from databricks.labs.ucx.hive_metastore.table_migration_status import ( + TableMigrationStatusRefresher, + TableMigrationStatus, +) from databricks.labs.ucx.hive_metastore.tables import Table from databricks.labs.ucx.progress.tables import TableProgressEncoder @@ -18,19 +21,21 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None: ownership = create_autospec(Ownership) ownership.owner_of.return_value = "user" - table_migration_index = create_autospec(TableMigrationIndex) - table_migration_index.is_migrated.return_value = True + migration_status_crawler = create_autospec(TableMigrationStatusRefresher) + migration_status_crawler.snapshot.return_value = ( + TableMigrationStatus(table.database, table.name, "main", "default", table.name, update_ts=None), + ) encoder = TableProgressEncoder( - mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test" + mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test" ) encoder.append_inventory_snapshot([table]) rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append") - assert len(rows) > 0, f"No rows written for: {encoder.full_name}" + assert rows, f"No rows written for: {encoder.full_name}" assert len(rows[0].failures) == 0 ownership.owner_of.assert_called_once() - table_migration_index.is_migrated.assert_called_with(table.database, table.name) + migration_status_crawler.snapshot.assert_called_once() @pytest.mark.parametrize( @@ -42,10 +47,12 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None: def test_table_progress_encoder_pending_migration_failure(mock_backend, table: Table) -> None: ownership = create_autospec(Ownership) 
ownership.owner_of.return_value = "user" - table_migration_index = create_autospec(TableMigrationIndex) - table_migration_index.is_migrated.return_value = False + migration_status_crawler = create_autospec(TableMigrationStatusRefresher) + migration_status_crawler.snapshot.return_value = ( + TableMigrationStatus(table.database, table.name), # No destination: therefore not yet migrated. + ) encoder = TableProgressEncoder( - mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test" + mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test" ) encoder.append_inventory_snapshot([table]) @@ -54,4 +61,4 @@ def test_table_progress_encoder_pending_migration_failure(mock_backend, table: T assert len(rows) > 0, f"No rows written for: {encoder.full_name}" assert rows[0].failures == ["Pending migration"] ownership.owner_of.assert_called_once() - table_migration_index.is_migrated.assert_called_with(table.database, table.name) + migration_status_crawler.snapshot.assert_called_once() From 9ebb26c6f2f6f4c77d231d04b6fc12ff847eb88f Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Thu, 7 Nov 2024 17:24:07 +0100 Subject: [PATCH 03/32] Consistent import. --- src/databricks/labs/ucx/progress/history.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/progress/history.py b/src/databricks/labs/ucx/progress/history.py index b599e6cbad..41ba114509 100644 --- a/src/databricks/labs/ucx/progress/history.py +++ b/src/databricks/labs/ucx/progress/history.py @@ -1,7 +1,6 @@ from __future__ import annotations import dataclasses import datetime as dt -import typing import json import logging from contextlib import contextmanager @@ -107,7 +106,7 @@ def _get_field_names_with_types(cls, klass: type[Record]) -> tuple[dict[str, typ # are produced automatically in a __future__.__annotations__ context). Unfortunately the dataclass mechanism # captures the type hints prior to resolution (which happens later in the class initialization process). # As such, we rely on dataclasses.fields() for the set of field names, but not the types which we fetch directly. - klass_type_hints = typing.get_type_hints(klass) + klass_type_hints = get_type_hints(klass) field_names = [field.name for field in dataclasses.fields(klass)] field_names_with_types = {field_name: klass_type_hints[field_name] for field_name in field_names} if "failures" not in field_names_with_types: From e861963eb148a532f3548ff5fbd8bff65d39c733 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Thu, 7 Nov 2024 17:25:36 +0100 Subject: [PATCH 04/32] Update the various table-migration workflows to also update the history log when refreshing the migration status at the end. 
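Instead of a single update_migration_status task, every table-migration workflow now finishes with a chain of tasks: refresh the tables inventory on the legacy-ACL "tacl" cluster, refresh the migration statuses on the UC-enabled "table_migration" cluster, capture both into the history log, and finally record the workflow run. Roughly, per workflow (a sketch with docstrings elided and the depends_on lists abbreviated; the exact task wiring is in the diff below):

    @job_task(depends_on=[migrate_views, setup_tacl], job_cluster="tacl")
    def update_table_inventory(self, ctx: RuntimeContext) -> None:
        ctx.tables_crawler.snapshot(force_refresh=True)  # step 1 of 3: refresh the tables inventory

    @job_task(depends_on=[update_table_inventory], job_cluster="table_migration")
    def update_migration_status(self, ctx: RuntimeContext) -> None:
        updated = ctx.migration_status_refresher.snapshot(force_refresh=True)  # step 2 of 3
        ctx.tables_migrator.check_remaining_tables(updated)

    @job_task(depends_on=[update_migration_status], job_cluster="table_migration")
    def update_tables_history_log(self, ctx: RuntimeContext) -> None:
        ctx.tables_progress.append_inventory_snapshot(ctx.tables_crawler.snapshot())  # step 3 of 3

    @job_task(job_cluster="table_migration", depends_on=[update_tables_history_log])
    def record_workflow_run(self, ctx: RuntimeContext) -> None:
        ctx.workflow_run_recorder.record()

TablesMigrator.get_remaining_tables() is replaced by check_remaining_tables(), which takes the freshly crawled migration statuses as input rather than forcing a refresh itself, and only logs the tables that remain on the Hive metastore.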
--- .../labs/ucx/hive_metastore/table_migrate.py | 16 +- .../labs/ucx/hive_metastore/workflows.py | 187 ++++++++++++++++-- .../hive_metastore/test_workflows.py | 64 ++++-- .../unit/hive_metastore/test_table_migrate.py | 91 +++++---- tests/unit/hive_metastore/test_workflows.py | 103 ++++++++-- 5 files changed, 360 insertions(+), 101 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index ca80030258..8b79e6c95a 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -2,6 +2,7 @@ import logging import re from collections import defaultdict +from collections.abc import Iterable from functools import partial, cached_property from databricks.labs.blueprint.parallel import Threads @@ -19,7 +20,11 @@ TableToMigrate, ) -from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher +from databricks.labs.ucx.hive_metastore.table_migration_status import ( + TableMigrationStatusRefresher, + TableMigrationStatus, + TableMigrationIndex, +) from databricks.labs.ucx.hive_metastore.tables import ( MigrationCount, Table, @@ -55,14 +60,11 @@ def __init__( self._migrate_grants = migrate_grants self._external_locations = external_locations - def get_remaining_tables(self) -> list[Table]: - self.index(force_refresh=True) - table_rows = [] + def check_remaining_tables(self, migration_status: Iterable[TableMigrationStatus]): + migration_index = TableMigrationIndex(migration_status) for crawled_table in self._tables_crawler.snapshot(): - if not self._is_migrated(crawled_table.database, crawled_table.name): - table_rows.append(crawled_table) + if not migration_index.is_migrated(crawled_table.database, crawled_table.name): logger.warning(f"remained-hive-metastore-table: {crawled_table.key}") - return table_rows def index(self, *, force_refresh: bool = False): return self._migration_status_refresher.index(force_refresh=force_refresh) diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index a07f8b6746..66818b7c74 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -57,10 +57,50 @@ def migrate_views(self, ctx: RuntimeContext): """ ctx.tables_migrator.migrate_tables(what=What.VIEW) - @job_task(job_cluster="table_migration", depends_on=[migrate_views]) - def update_migration_status(self, ctx: RuntimeContext): - """Refresh the migration status to present it in the dashboard.""" - ctx.tables_migrator.get_remaining_tables() + @job_task(job_cluster="tacl") + def setup_tacl(self, ctx: RuntimeContext): + """(Optimization) Allow the TACL job cluster to be started before we commence refreshing the tables inventory.""" + + @job_task( + depends_on=[ + convert_managed_table, + migrate_external_tables_sync, + migrate_dbfs_root_delta_tables, + migrate_dbfs_root_non_delta_tables, + migrate_views, + setup_tacl, + ], + job_cluster="tacl", + ) + def update_table_inventory(self, ctx: RuntimeContext) -> None: + """Refresh the tables inventory, prior to updating the migration status of all the tables.""" + # The TACL cluster is not UC-enabled, so we cannot the snapshot cannot be written immediately to the history log. + # Step 1 of 3: Just refresh the tables inventory. 
+ ctx.tables_crawler.snapshot(force_refresh=True) + + @job_task(depends_on=[update_table_inventory], job_cluster="table_migration") + def update_migration_status(self, ctx: RuntimeContext) -> None: + """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" + # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on a TACL cluster.) + updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) + ctx.tables_migrator.check_remaining_tables(updated_migration_progress) + + @job_task(depends_on=[update_migration_status], job_cluster="table_migration") + def update_tables_history_log(self, ctx: RuntimeContext) -> None: + """Update the history log with the latest tables inventory and migration status.""" + # The table migration cluster is not legacy-ACL enabled, so we can't crawl from here. + # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status was refreshed, capture into the + # history log. + # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. + history_log = ctx.tables_progress + tables_snapshot = ctx.tables_crawler.snapshot() + # Note: encoding the Table records will trigger loading of the migration-status data. + history_log.append_inventory_snapshot(tables_snapshot) + + @job_task(job_cluster="table_migration", depends_on=[update_tables_history_log]) + def record_workflow_run(self, ctx: RuntimeContext) -> None: + """Record the workflow run of this workflow.""" + ctx.workflow_run_recorder.record() class MigrateHiveSerdeTablesInPlace(Workflow): @@ -86,10 +126,40 @@ def migrate_views(self, ctx: RuntimeContext): """ ctx.tables_migrator.migrate_tables(what=What.VIEW) - @job_task(job_cluster="table_migration", depends_on=[migrate_views]) - def update_migration_status(self, ctx: RuntimeContext): - """Refresh the migration status to present it in the dashboard.""" - ctx.tables_migrator.get_remaining_tables() + @job_task(job_cluster="tacl") + def setup_tacl(self, ctx: RuntimeContext): + """(Optimization) Allow the TACL job cluster to be started before we commence refreshing the tables inventory.""" + + @job_task(depends_on=[migrate_hive_serde_in_place, migrate_views, setup_tacl], job_cluster="tacl") + def update_table_inventory(self, ctx: RuntimeContext) -> None: + """Refresh the tables inventory, prior to updating the migration status of all the tables.""" + # The TACL cluster is not UC-enabled, so we cannot the snapshot cannot be written immediately to the history log. + # Step 1 of 3: Just refresh the tables inventory. + ctx.tables_crawler.snapshot(force_refresh=True) + + @job_task(depends_on=[update_table_inventory], job_cluster="table_migration") + def update_migration_status(self, ctx: RuntimeContext) -> None: + """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" + # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on a TACL cluster.) + updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) + ctx.tables_migrator.check_remaining_tables(updated_migration_progress) + + @job_task(depends_on=[update_migration_status], job_cluster="table_migration") + def update_tables_history_log(self, ctx: RuntimeContext) -> None: + """Update the history log with the latest tables inventory and migration status.""" + # The table migration cluster is not legacy-ACL enabled, so we can't crawl from here. 
+ # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status was refreshed, capture into the + # history log. + # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. + history_log = ctx.tables_progress + tables_snapshot = ctx.tables_crawler.snapshot() + # Note: encoding the Table records will trigger loading of the migration-status data. + history_log.append_inventory_snapshot(tables_snapshot) + + @job_task(job_cluster="table_migration", depends_on=[update_tables_history_log]) + def record_workflow_run(self, ctx: RuntimeContext) -> None: + """Record the workflow run of this workflow.""" + ctx.workflow_run_recorder.record() class MigrateExternalTablesCTAS(Workflow): @@ -120,10 +190,42 @@ def migrate_views(self, ctx: RuntimeContext): """ ctx.tables_migrator.migrate_tables(what=What.VIEW) - @job_task(job_cluster="table_migration", depends_on=[migrate_views]) - def update_migration_status(self, ctx: RuntimeContext): - """Refresh the migration status to present it in the dashboard.""" - ctx.tables_migrator.get_remaining_tables() + @job_task(job_cluster="tacl") + def setup_tacl(self, ctx: RuntimeContext): + """(Optimization) Allow the TACL job cluster to be started before we commence refreshing the tables inventory.""" + + @job_task( + depends_on=[migrate_other_external_ctas, migrate_hive_serde_ctas, migrate_views, setup_tacl], job_cluster="tacl" + ) + def update_table_inventory(self, ctx: RuntimeContext) -> None: + """Refresh the tables inventory, prior to updating the migration status of all the tables.""" + # The TACL cluster is not UC-enabled, so we cannot the snapshot cannot be written immediately to the history log. + # Step 1 of 3: Just refresh the tables inventory. + ctx.tables_crawler.snapshot(force_refresh=True) + + @job_task(depends_on=[update_table_inventory], job_cluster="table_migration") + def update_migration_status(self, ctx: RuntimeContext) -> None: + """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" + # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on a TACL cluster.) + updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) + ctx.tables_migrator.check_remaining_tables(updated_migration_progress) + + @job_task(depends_on=[update_migration_status], job_cluster="table_migration") + def update_tables_history_log(self, ctx: RuntimeContext) -> None: + """Update the history log with the latest tables inventory and migration status.""" + # The table migration cluster is not legacy-ACL enabled, so we can't crawl from here. + # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status was refreshed, capture into the + # history log. + # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. + history_log = ctx.tables_progress + tables_snapshot = ctx.tables_crawler.snapshot() + # Note: encoding the Table records will trigger loading of the migration-status data. 
+ history_log.append_inventory_snapshot(tables_snapshot) + + @job_task(job_cluster="table_migration", depends_on=[update_tables_history_log]) + def record_workflow_run(self, ctx: RuntimeContext) -> None: + """Record the workflow run of this workflow.""" + ctx.workflow_run_recorder.record() class ScanTablesInMounts(Workflow): @@ -137,10 +239,25 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext): replacing any existing content that might be present.""" ctx.tables_in_mounts.snapshot(force_refresh=True) - @job_task(job_cluster="table_migration", depends_on=[scan_tables_in_mounts_experimental]) - def update_migration_status(self, ctx: RuntimeContext): - """Refresh the migration status to present it in the dashboard.""" - ctx.tables_migrator.get_remaining_tables() + @job_task(depends_on=[scan_tables_in_mounts_experimental], job_cluster="table_migration") + def update_migration_status(self, ctx: RuntimeContext) -> None: + """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" + updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) + ctx.tables_migrator.check_remaining_tables(updated_migration_progress) + + @job_task(depends_on=[update_migration_status], job_cluster="table_migration") + def update_tables_history_log(self, ctx: RuntimeContext) -> None: + """Update the history log with the latest tables inventory and migration status.""" + # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. + history_log = ctx.tables_progress + tables_snapshot = ctx.tables_crawler.snapshot() + # Note: encoding the Table records will trigger loading of the migration-status data. + history_log.append_inventory_snapshot(tables_snapshot) + + @job_task(job_cluster="table_migration", depends_on=[update_tables_history_log]) + def record_workflow_run(self, ctx: RuntimeContext) -> None: + """Record the workflow run of this workflow.""" + ctx.workflow_run_recorder.record() class MigrateTablesInMounts(Workflow): @@ -152,7 +269,37 @@ def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext): """[EXPERIMENTAL] This workflow migrates `delta tables stored in mount points` to Unity Catalog using a Create Table statement.""" ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT) - @job_task(job_cluster="table_migration", depends_on=[migrate_tables_in_mounts_experimental]) - def update_migration_status(self, ctx: RuntimeContext): - """Refresh the migration status to present it in the dashboard.""" - ctx.tables_migrator.get_remaining_tables() + @job_task(job_cluster="tacl") + def setup_tacl(self, ctx: RuntimeContext): + """(Optimization) Allow the TACL job cluster to be started before we commence refreshing the tables inventory.""" + + @job_task(depends_on=[migrate_tables_in_mounts_experimental, setup_tacl], job_cluster="tacl") + def update_table_inventory(self, ctx: RuntimeContext) -> None: + """Refresh the tables inventory, prior to updating the migration status of all the tables.""" + # The TACL cluster is not UC-enabled, so we cannot the snapshot cannot be written immediately to the history log. + # Step 1 of 3: Just refresh the tables inventory. 
+ ctx.tables_crawler.snapshot(force_refresh=True) + + @job_task(depends_on=[update_table_inventory], job_cluster="table_migration") + def update_migration_status(self, ctx: RuntimeContext) -> None: + """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" + # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on a TACL cluster.) + updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) + ctx.tables_migrator.check_remaining_tables(updated_migration_progress) + + @job_task(depends_on=[update_migration_status], job_cluster="table_migration") + def update_tables_history_log(self, ctx: RuntimeContext) -> None: + """Update the history log with the latest tables inventory and migration status.""" + # The table migration cluster is not legacy-ACL enabled, so we can't crawl from here. + # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status was refreshed, capture into the + # history log. + # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. + history_log = ctx.tables_progress + tables_snapshot = ctx.tables_crawler.snapshot() + # Note: encoding the Table records will trigger loading of the migration-status data. + history_log.append_inventory_snapshot(tables_snapshot) + + @job_task(job_cluster="table_migration", depends_on=[update_tables_history_log]) + def record_workflow_run(self, ctx: RuntimeContext) -> None: + """Record the workflow run of this workflow.""" + ctx.workflow_run_recorder.record() diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index ef124ace22..b5a5723c60 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -3,6 +3,7 @@ from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore.tables import Table +from databricks.labs.ucx.progress.install import ProgressTrackingInstallation @pytest.mark.parametrize( @@ -11,11 +12,11 @@ ("regular", "migrate-tables"), ("hiveserde", "migrate-external-hiveserde-tables-in-place-experimental"), ("hiveserde", "migrate-external-tables-ctas"), + # TODO: Some workflows are missing here, and also need to be included in the tests. ], indirect=("prepare_tables_for_migration",), ) def test_table_migration_job_refreshes_migration_status( - ws, installation_ctx, prepare_tables_for_migration, workflow, @@ -27,39 +28,60 @@ def test_table_migration_job_refreshes_migration_status( r".*Do you want to update the existing installation?.*": 'yes', }, ) - ctx.workspace_installation.run() - ctx.deployed_workflows.run_workflow(workflow) + ProgressTrackingInstallation(ctx.sql_backend, ctx.ucx_catalog).run() + + run_id = ctx.deployed_workflows.run_workflow(workflow) + assert installation_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}" - # Avoiding MigrationStatusRefresh as it will refresh the status before fetching - migration_status_query = f"SELECT * FROM {ctx.config.inventory_database}.migration_status" + # Avoiding MigrationStatusRefresh as it will refresh the status before fetching. 
+ migration_status_query = f"SELECT * FROM {ctx.migration_status_refresher.full_name}" migration_statuses = list(ctx.sql_backend.fetch(migration_status_query)) - if len(migration_statuses) == 0: + if not migration_statuses: ctx.deployed_workflows.relay_logs(workflow) - assert False, "No migration statuses found" + pytest.fail("No migration statuses found") - asserts = [] + problems = [] for table in tables.values(): migration_status = [] for status in migration_statuses: if status.src_schema == table.schema_name and status.src_table == table.name: migration_status.append(status) - assert_message_postfix = f" found for {table.table_type} {table.full_name}" - if len(migration_status) == 0: - asserts.append("No migration status" + assert_message_postfix) - elif len(migration_status) > 1: - asserts.append("Multiple migration statuses" + assert_message_postfix) - elif migration_status[0].dst_schema is None: - asserts.append("No destination schema" + assert_message_postfix) - elif migration_status[0].dst_table is None: - asserts.append("No destination table" + assert_message_postfix) - - assert_message = ( - "\n".join(asserts) + " given migration statuses " + "\n".join([str(status) for status in migration_statuses]) + match migration_status: + case []: + problems.append(f"No migration status found for {table.table_type} {table.full_name}") + case [_, _, *_]: + problems.append(f"Multiple migration statuses found for {table.table_type} {table.full_name}") + case [status] if status.dst_schema is None: + problems.append(f"No destination schema found for {table.table_type} {table.full_name}") + case [status] if status.dst_table is None: + problems.append(f"No destination table found for {table.table_type} {table.full_name}") + + failure_message = ( + "\n".join(problems) + " given migration statuses:\n" + "\n".join([str(status) for status in migration_statuses]) + ) + assert not problems, failure_message + + # Ensure that the workflow populated the `workflow_runs` table. + query = ( + f"SELECT 1 FROM {installation_ctx.ucx_catalog}.multiworkspace.workflow_runs\n" + f"WHERE workspace_id = {installation_ctx.workspace_id}\n" + f" AND workflow_run_id = {run_id}\n" + f"LIMIT 1\n" + ) + assert any(installation_ctx.sql_backend.fetch(query)), f"No workflow run captured: {query}" + + # Ensure that the history file has table records written to it that correspond to this run. 
+ query = ( + f"SELECT 1 from {installation_ctx.ucx_catalog}.multiworkspace.historical\n" + f"WHERE workspace_id = {installation_ctx.workspace_id}\n" + f" AND job_run_id = {run_id}\n" + f" AND object_type = 'Table'\n" + f"LIMIT 1\n" ) - assert len(asserts) == 0, assert_message + assert any(installation_ctx.sql_backend.fetch(query)), f"No snapshots captured to the history log: {query}" @pytest.mark.parametrize( diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index 5b7e7a66a4..f1932dc6c6 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -1375,63 +1375,70 @@ def test_revert_migrated_tables_failed(caplog, mock_pyspark): assert "Failed to revert table hive_metastore.test_schema1.test_table1: error" in caplog.text -def test_refresh_migration_status_published_remained_tables(caplog, mock_pyspark): - backend = MockBackend() +def test_refresh_migration_status_check_remaining_tables(ws, mock_backend, caplog) -> None: table_crawler = create_autospec(TablesCrawler) - client = mock_workspace_client() - table_crawler.snapshot.return_value = [ - Table( - object_type="EXTERNAL", - table_format="DELTA", - catalog="hive_metastore", - database="schema1", - name="table1", - location="s3://some_location/table1", - upgraded_to="ucx_default.db1_dst.dst_table1", + tables = ( + ( + Table( + object_type="EXTERNAL", + table_format="DELTA", + catalog="hive_metastore", + database="schema1", + name="table1", + location="s3://some_location/table1", + upgraded_to="ucx_default.db1_dst.dst_table1", + ) ), - Table( - object_type="EXTERNAL", - table_format="DELTA", - catalog="hive_metastore", - database="schema1", - name="table2", - location="s3://some_location/table2", - upgraded_to="ucx_default.db1_dst.dst_table2", + ( + Table( + object_type="EXTERNAL", + table_format="DELTA", + catalog="hive_metastore", + database="schema1", + name="table2", + location="s3://some_location/table2", + upgraded_to="ucx_default.db1_dst.dst_table2", + ) ), - Table( - object_type="EXTERNAL", - table_format="DELTA", - catalog="hive_metastore", - database="schema1", - name="table3", - location="s3://some_location/table3", + ( + Table( + object_type="EXTERNAL", + table_format="DELTA", + catalog="hive_metastore", + database="schema1", + name="table3", + location="s3://some_location/table3", + ) ), - ] - table_mapping = mock_table_mapping() - migration_status_refresher = create_autospec(TableMigrationStatusRefresher) - migration_index = TableMigrationIndex( - [ - TableMigrationStatus("schema1", "table1", "ucx_default", "db1_dst", "dst_table1"), - TableMigrationStatus("schema1", "table2", "ucx_default", "db1_dst", "dst_table2"), - ] ) - migration_status_refresher.index.return_value = migration_index + table_crawler.snapshot.return_value = tables + + def migration_status_from_table(table: Table) -> TableMigrationStatus: + catalog, schema, name = table.upgraded_to.split(".", maxsplit=2) if table.upgraded_to else (None, None, None) + return TableMigrationStatus(table.database, table.name, catalog, schema, name) + + migration_status_snapshot = tuple(migration_status_from_table(table) for table in tables) + migration_status_refresher = create_autospec(TableMigrationStatusRefresher) migrate_grants = create_autospec(MigrateGrants) external_locations = create_autospec(ExternalLocations) table_migrate = TablesMigrator( table_crawler, - client, - backend, - table_mapping, + ws, + mock_backend, + mock_table_mapping(), 
migration_status_refresher, migrate_grants, external_locations, ) + with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.hive_metastore"): - tables = table_migrate.get_remaining_tables() + table_migrate.check_remaining_tables(migration_status_snapshot) assert 'remained-hive-metastore-table: hive_metastore.schema1.table3' in caplog.messages - assert len(tables) == 1 and tables[0].key == "hive_metastore.schema1.table3" - migrate_grants.assert_not_called() + + table_crawler.snapshot.assert_called_once() + migration_status_refresher.index.assert_not_called() + migration_status_refresher.snapshot.assert_not_called() + migrate_grants.apply.assert_not_called() external_locations.resolve_mount.assert_not_called() diff --git a/tests/unit/hive_metastore/test_workflows.py b/tests/unit/hive_metastore/test_workflows.py index 7d2a05ec8f..c3810268b4 100644 --- a/tests/unit/hive_metastore/test_workflows.py +++ b/tests/unit/hive_metastore/test_workflows.py @@ -1,5 +1,9 @@ +import datetime as dt +from collections.abc import Sequence + import pytest +from databricks.labs.ucx.framework.tasks import Workflow from databricks.labs.ucx.hive_metastore.workflows import ( TableMigration, MigrateExternalTablesCTAS, @@ -54,18 +58,95 @@ def test_migrate_ctas_views(run_workflow): ctx.workspace_client.catalogs.list.assert_called() +_migration_workflows: Sequence[type[Workflow]] = ( + TableMigration, + MigrateHiveSerdeTablesInPlace, + MigrateExternalTablesCTAS, + ScanTablesInMounts, + MigrateTablesInMounts, +) + + @pytest.mark.parametrize( "workflow", - [ - TableMigration, - MigrateHiveSerdeTablesInPlace, - MigrateExternalTablesCTAS, - ScanTablesInMounts, - MigrateTablesInMounts, - ], + # Special case here for ScanTablesInMounts, handled below. + [workflow for workflow in _migration_workflows if workflow is not ScanTablesInMounts], ) def test_update_migration_status(run_workflow, workflow) -> None: - """Migration status is refreshed by deleting and showing new tables""" - ctx = run_workflow(getattr(workflow, "update_migration_status")) - assert ctx.sql_backend.has_rows_written_for("hive_metastore.ucx.migration_status") - assert "SHOW DATABASES" in ctx.sql_backend.queries + """Migration status is refreshed by updating the table inventory, migration status and history log.""" + task1 = getattr(workflow, "update_table_inventory") + task2 = getattr(workflow, "update_migration_status") + task3 = getattr(workflow, "update_tables_history_log") + assert task1.__name__ in task2.__task__.depends_on and task2.__name__ in task3.__task__.depends_on + + # Refresh tables inventory. + ctx1 = run_workflow(task1) + assert "SHOW DATABASES" in ctx1.sql_backend.queries + assert ctx1.sql_backend.has_rows_written_for("hive_metastore.ucx.tables") + + # Given the tables inventory, refresh the migration status of the tables. + ctx2 = run_workflow(task2, named_parameters={"parent_run_id": "53"}) + assert "SELECT * FROM `hive_metastore`.`ucx`.`tables`" in ctx2.sql_backend.queries + assert ctx2.sql_backend.has_rows_written_for("hive_metastore.ucx.migration_status") + + # Given the tables inventory and migration status snapshot, update the historical log. 
+ ctx3 = run_workflow(task3, named_parameters={"parent_run_id": "53"}) + assert "SELECT * FROM `hive_metastore`.`ucx`.`tables`" in ctx3.sql_backend.queries + assert "SELECT * FROM `hive_metastore`.`ucx`.`migration_status`" in ctx3.sql_backend.queries + assert ctx3.sql_backend.has_rows_written_for("`ucx`.`multiworkspace`.`historical`") + + +def test_scan_tables_in_mounts_update_migration_status(run_workflow) -> None: + """Migration status is refreshed by updating the migration status and history log.""" + task1 = ScanTablesInMounts.update_migration_status + task2 = ScanTablesInMounts.update_tables_history_log + assert task1.__name__ in getattr(task2, "__task__").depends_on + + # Given the tables inventory, refresh the migration status of the tables. + ctx1 = run_workflow(task1, named_parameters={"parent_run_id": "53"}) + assert "SELECT * FROM `hive_metastore`.`ucx`.`tables`" in ctx1.sql_backend.queries + assert ctx1.sql_backend.has_rows_written_for("hive_metastore.ucx.migration_status") + + # Given the tables inventory and migration status snapshot, update the historical log. + ctx2 = run_workflow(task2, named_parameters={"parent_run_id": "53"}) + assert "SELECT * FROM `hive_metastore`.`ucx`.`tables`" in ctx2.sql_backend.queries + assert "SELECT * FROM `hive_metastore`.`ucx`.`migration_status`" in ctx2.sql_backend.queries + assert ctx2.sql_backend.has_rows_written_for("`ucx`.`multiworkspace`.`historical`") + + +@pytest.mark.parametrize( + "task", + [getattr(workflow, "record_workflow_run") for workflow in _migration_workflows], + ids=[workflow.__name__ for workflow in _migration_workflows], +) +def test_migration_record_workflow_run(run_workflow, task) -> None: + """Verify that we log the workflow run.""" + start_time = dt.datetime.now(dt.timezone.utc).replace(microsecond=0) + context_replacements = { + "named_parameters": { + "workflow": "the_workflow", + "job_id": "12345", + "parent_run_id": "456", + "attempt": "1", + "start_time": start_time.isoformat(), + }, + } + + ctx = run_workflow(task, **context_replacements) + + rows = ctx.sql_backend.rows_written_for("ucx.multiworkspace.workflow_runs", "append") + + rows_as_dict = [{k: v for k, v in rows.asDict().items() if k != 'finished_at'} for rows in rows] + assert rows_as_dict == [ + { + "started_at": start_time, + # finished_at: checked below. + "workspace_id": 123, + "workflow_name": "the_workflow", + "workflow_id": 12345, + "workflow_run_id": 456, + "workflow_run_attempt": 1, + } + ] + # Finish-time must be indistinguishable from or later than the start time. + assert all(row["started_at"] <= row["finished_at"] for row in rows) From cfe1486aefec883ef5e000cf77b2a2722f87c1d3 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Thu, 7 Nov 2024 17:27:20 +0100 Subject: [PATCH 05/32] All workflows update the logs table. --- docs/table_persistence.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/table_persistence.md b/docs/table_persistence.md index 12cac2a2c6..26ed321369 100644 --- a/docs/table_persistence.md +++ b/docs/table_persistence.md @@ -30,7 +30,7 @@ Table utilization per workflow: | query_problems | RW | RW | | | | | | | workflow_problems | RW | RW | | | | | | | udfs | RW | RW | RO | | | | | -| logs | RW | | RW | RW | | RW | RW | +| logs | RW | RW | RW | RW | RW | RW | RW | | recon_results | | | | | | | RW | **RW** - Read/Write, the job generates or updates the table.
From 145cd7d5cb84f6905ada29fdafd6c87ae9896ae8 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Thu, 7 Nov 2024 17:29:30 +0100 Subject: [PATCH 06/32] Table migration workflows also update the tables inventory (at the end). --- docs/table_persistence.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/table_persistence.md b/docs/table_persistence.md index 26ed321369..b09c17d423 100644 --- a/docs/table_persistence.md +++ b/docs/table_persistence.md @@ -8,7 +8,7 @@ Table utilization per workflow: | Table | Generate Assessment | Update Migration Progress | Migrate Groups | Migrate External Tables | Upgrade Jobs | Migrate tables | Migrate Data Reconciliation | |--------------------------|---------------------|---------------------------|----------------|-------------------------|--------------|----------------|-----------------------------| -| tables | RW | RW | | RO | | RO | | +| tables | RW | RW | | RW | | RW | | | grants | RW | RW | | RW | | RW | | | mounts | RW | | | RO | RO | RO | | | permissions | RW | | RW | RO | | RO | | From 6e5e4aecef75b80e3821edf4aa5098dc8dddec42 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Mon, 11 Nov 2024 16:40:23 +0100 Subject: [PATCH 07/32] Switch to multi-line f""" """-string. --- .../hive_metastore/test_workflows.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index b5a5723c60..e31746a1dc 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -65,22 +65,22 @@ def test_table_migration_job_refreshes_migration_status( assert not problems, failure_message # Ensure that the workflow populated the `workflow_runs` table. - query = ( - f"SELECT 1 FROM {installation_ctx.ucx_catalog}.multiworkspace.workflow_runs\n" - f"WHERE workspace_id = {installation_ctx.workspace_id}\n" - f" AND workflow_run_id = {run_id}\n" - f"LIMIT 1\n" - ) + query = f""" + SELECT 1 FROM {installation_ctx.ucx_catalog}.multiworkspace.workflow_runs + WHERE workspace_id = {installation_ctx.workspace_id} + AND workflow_run_id = {run_id} + LIMIT 1 + """ assert any(installation_ctx.sql_backend.fetch(query)), f"No workflow run captured: {query}" # Ensure that the history file has table records written to it that correspond to this run. - query = ( - f"SELECT 1 from {installation_ctx.ucx_catalog}.multiworkspace.historical\n" - f"WHERE workspace_id = {installation_ctx.workspace_id}\n" - f" AND job_run_id = {run_id}\n" - f" AND object_type = 'Table'\n" - f"LIMIT 1\n" - ) + query = f""" + SELECT 1 from {installation_ctx.ucx_catalog}.multiworkspace.historical + WHERE workspace_id = {installation_ctx.workspace_id} + AND job_run_id = {run_id} + AND object_type = 'Table' + LIMIT 1 + """ assert any(installation_ctx.sql_backend.fetch(query)), f"No snapshots captured to the history log: {query}" From 1ce8e0544b0356fb2420aebddeab9426bd6a8178 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Mon, 11 Nov 2024 18:58:05 +0100 Subject: [PATCH 08/32] Fix mock return value for crawler snapshot. It should return all the tables as a snapshot. 
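The test previously wrapped each Table constructor in a redundant extra pair of parentheses; removing them makes it obvious that the mocked TablesCrawler.snapshot() yields a single tuple containing every Table record. Sketch of the resulting mock setup (table1..table3 are placeholders for the Table(...) records built in the test):

    table_crawler = create_autospec(TablesCrawler)
    table_crawler.snapshot.return_value = (table1, table2, table3)  # one tuple with all Table records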
--- .../unit/hive_metastore/test_table_migrate.py | 52 ++++++++----------- 1 file changed, 23 insertions(+), 29 deletions(-) diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index f1932dc6c6..2bd83be36e 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -1378,37 +1378,31 @@ def test_revert_migrated_tables_failed(caplog, mock_pyspark): def test_refresh_migration_status_check_remaining_tables(ws, mock_backend, caplog) -> None: table_crawler = create_autospec(TablesCrawler) tables = ( - ( - Table( - object_type="EXTERNAL", - table_format="DELTA", - catalog="hive_metastore", - database="schema1", - name="table1", - location="s3://some_location/table1", - upgraded_to="ucx_default.db1_dst.dst_table1", - ) + Table( + object_type="EXTERNAL", + table_format="DELTA", + catalog="hive_metastore", + database="schema1", + name="table1", + location="s3://some_location/table1", + upgraded_to="ucx_default.db1_dst.dst_table1", ), - ( - Table( - object_type="EXTERNAL", - table_format="DELTA", - catalog="hive_metastore", - database="schema1", - name="table2", - location="s3://some_location/table2", - upgraded_to="ucx_default.db1_dst.dst_table2", - ) + Table( + object_type="EXTERNAL", + table_format="DELTA", + catalog="hive_metastore", + database="schema1", + name="table2", + location="s3://some_location/table2", + upgraded_to="ucx_default.db1_dst.dst_table2", ), - ( - Table( - object_type="EXTERNAL", - table_format="DELTA", - catalog="hive_metastore", - database="schema1", - name="table3", - location="s3://some_location/table3", - ) + Table( + object_type="EXTERNAL", + table_format="DELTA", + catalog="hive_metastore", + database="schema1", + name="table3", + location="s3://some_location/table3", ), ) table_crawler.snapshot.return_value = tables From 788789fb8d161edc2bbed6f0e22fb16aeb3755ed Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Tue, 12 Nov 2024 10:59:04 +0100 Subject: [PATCH 09/32] Switch to specialisation (limited to TableProgressEncoder) for ensuring the migration index is available during encoding. 
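The generic snapshot-context hook added to ProgressEncoder earlier in this series is dropped again; instead TableProgressEncoder alone overrides append_inventory_snapshot() and builds the migration index itself before encoding. A condensed sketch of that override (logging elided; names as in the diff below):

    def append_inventory_snapshot(self, snapshot: Iterable[Table]) -> None:
        migration_index = TableMigrationIndex(self._migration_status_refresher.snapshot())
        history_records = [self._encode_table_as_historical(record, migration_index) for record in snapshot]
        # Only writer for this table; 'append' mode is documented as conflict-free.
        self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append")

To allow the override, the base method loses its @final marker; its behaviour for every other encoder is unchanged.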
--- src/databricks/labs/ucx/progress/grants.py | 4 +-- src/databricks/labs/ucx/progress/history.py | 31 +++++---------------- src/databricks/labs/ucx/progress/jobs.py | 4 +-- src/databricks/labs/ucx/progress/tables.py | 27 ++++++++++-------- 4 files changed, 26 insertions(+), 40 deletions(-) diff --git a/src/databricks/labs/ucx/progress/grants.py b/src/databricks/labs/ucx/progress/grants.py index a393d31d87..5807799fa2 100644 --- a/src/databricks/labs/ucx/progress/grants.py +++ b/src/databricks/labs/ucx/progress/grants.py @@ -25,8 +25,8 @@ def __init__( ) -> None: super().__init__(sql_backend, ownership, Grant, run_id, workspace_id, catalog, schema, table) - def _encode_record_as_historical(self, record: Grant, snapshot_context: None) -> Historical: - historical = super()._encode_record_as_historical(record, snapshot_context) + def _encode_record_as_historical(self, record: Grant) -> Historical: + historical = super()._encode_record_as_historical(record) failures = [] if not record.uc_grant_sql(): type_, key = record.this_type_and_key() diff --git a/src/databricks/labs/ucx/progress/history.py b/src/databricks/labs/ucx/progress/history.py index 41ba114509..e2072ec8aa 100644 --- a/src/databricks/labs/ucx/progress/history.py +++ b/src/databricks/labs/ucx/progress/history.py @@ -3,10 +3,9 @@ import datetime as dt import json import logging -from contextlib import contextmanager from enum import Enum, EnumMeta -from collections.abc import Iterable, Sequence, Generator -from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints, final +from collections.abc import Iterable, Sequence +from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints from databricks.labs.lsql.backends import SqlBackend @@ -280,29 +279,13 @@ def __init__( def full_name(self) -> str: return f"{self._catalog}.{self._schema}.{self._table}" - @final def append_inventory_snapshot(self, snapshot: Iterable[Record]) -> None: - with self._snapshot_context() as ctx: - history_records = [self._encode_record_as_historical(record, ctx) for record in snapshot] - logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.") - # This is the only writer, and the mode is 'append'. This is documented as conflict-free. - self._sql_backend.save_table( - escape_sql_identifier(self.full_name), history_records, Historical, mode="append" - ) - - SC: ClassVar = type(None) - - @contextmanager - def _snapshot_context(self) -> Generator[SC, None, None]: - """A context manager that is held open while a snapshot is underway. - - The context itself is passed as a parameter to :method:`_encode_record_as_historical`. As a manager, preparation - and cleanup can take place before and after the snapshot takes place. - """ - yield + history_records = [self._encode_record_as_historical(record) for record in snapshot] + logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.") + # This is the only writer, and the mode is 'append'. This is documented as conflict-free. + self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append") - def _encode_record_as_historical(self, record: Record, snapshot_context: SC) -> Historical: + def _encode_record_as_historical(self, record: Record) -> Historical: """Encode a snapshot record as a historical log entry.""" # Snapshot context not needed with default implementation. 
- _ = snapshot_context return self._encoder.to_historical(record) diff --git a/src/databricks/labs/ucx/progress/jobs.py b/src/databricks/labs/ucx/progress/jobs.py index 17e1471569..198139543c 100644 --- a/src/databricks/labs/ucx/progress/jobs.py +++ b/src/databricks/labs/ucx/progress/jobs.py @@ -48,7 +48,7 @@ def _job_problems(self) -> dict[int, list[str]]: index[job_problem.job_id].append(failure) return index - def _encode_record_as_historical(self, record: JobInfo, snapshot_context: None) -> Historical: - historical = super()._encode_record_as_historical(record, snapshot_context) + def _encode_record_as_historical(self, record: JobInfo) -> Historical: + historical = super()._encode_record_as_historical(record) failures = self._job_problems.get(int(record.job_id), []) return replace(historical, failures=historical.failures + failures) diff --git a/src/databricks/labs/ucx/progress/tables.py b/src/databricks/labs/ucx/progress/tables.py index 6550d1526f..87671e72c1 100644 --- a/src/databricks/labs/ucx/progress/tables.py +++ b/src/databricks/labs/ucx/progress/tables.py @@ -1,10 +1,10 @@ -from collections.abc import Generator -from contextlib import contextmanager +import logging +from collections.abc import Iterable from dataclasses import replace -from typing import ClassVar from databricks.labs.lsql.backends import SqlBackend from databricks.labs.ucx.framework.crawlers import CrawlerBase +from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore.tables import Table from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationStatus @@ -12,6 +12,8 @@ from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.install import Historical +logger = logging.getLogger(__name__) + class TableProgressEncoder(ProgressEncoder[Table]): """Encoder class:Table to class:History. @@ -44,21 +46,22 @@ def __init__( ) self._migration_status_refresher = migration_status_refresher - SC: ClassVar = TableMigrationIndex - - @contextmanager - def _snapshot_context(self) -> Generator[SC, None, None]: - yield TableMigrationIndex(self._migration_status_refresher.snapshot()) + def append_inventory_snapshot(self, snapshot: Iterable[Table]) -> None: + migration_index = TableMigrationIndex(self._migration_status_refresher.snapshot()) + history_records = [self._encode_table_as_historical(record, migration_index) for record in snapshot] + logger.debug(f"Appending {len(history_records)} {self._klass} table record(s) to history.") + # This is the only writer, and the mode is 'append'. This is documented as conflict-free. + self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append") - def _encode_record_as_historical(self, record: Table, snapshot_context: SC) -> Historical: - """Encode record as historical. + def _encode_table_as_historical(self, record: Table, migration_index: TableMigrationIndex) -> Historical: + """Encode a table record, enriching with the migration status. A table failure means that the table is pending migration. Grants are purposefully left out, because a grant might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure for tables that are migrated to UC with their relevant grants also being migrated. 
""" - historical = super()._encode_record_as_historical(record, snapshot_context=None) + historical = super()._encode_record_as_historical(record) failures = [] - if not snapshot_context.is_migrated(record.database, record.name): + if not migration_index.is_migrated(record.database, record.name): failures.append("Pending migration") return replace(historical, failures=historical.failures + failures) From da3b15b74c9225045a80e09c2841b934324a1c74 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 13 Nov 2024 15:52:35 +0100 Subject: [PATCH 10/32] Back out changes relating to the way the migration-status information is passed into the table encoder. This has been backed out to reduce clutter on the PR; they have been moved to PR #3270. --- .../labs/ucx/contexts/workflow_task.py | 10 +++--- src/databricks/labs/ucx/progress/history.py | 3 +- src/databricks/labs/ucx/progress/tables.py | 25 ++++----------- tests/unit/progress/test_tables.py | 32 +++++++++---------- 4 files changed, 28 insertions(+), 42 deletions(-) diff --git a/src/databricks/labs/ucx/contexts/workflow_task.py b/src/databricks/labs/ucx/contexts/workflow_task.py index f9fea563ca..a3656b290b 100644 --- a/src/databricks/labs/ucx/contexts/workflow_task.py +++ b/src/databricks/labs/ucx/contexts/workflow_task.py @@ -22,10 +22,10 @@ from databricks.labs.ucx.contexts.application import GlobalContext from databricks.labs.ucx.hive_metastore import TablesInMounts, TablesCrawler from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler -from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table +from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler from databricks.labs.ucx.hive_metastore.udfs import Udf from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder -from databricks.labs.ucx.progress.grants import GrantProgressEncoder, Grant +from databricks.labs.ucx.progress.grants import GrantProgressEncoder from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.jobs import JobsProgressEncoder from databricks.labs.ucx.progress.tables import TableProgressEncoder @@ -189,7 +189,7 @@ def policies_progress(self) -> ProgressEncoder[PolicyInfo]: ) @cached_property - def grants_progress(self) -> ProgressEncoder[Grant]: + def grants_progress(self) -> GrantProgressEncoder: return GrantProgressEncoder( self.sql_backend, self.grant_ownership, @@ -221,11 +221,11 @@ def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]: ) @cached_property - def tables_progress(self) -> ProgressEncoder[Table]: + def tables_progress(self) -> TableProgressEncoder: return TableProgressEncoder( self.sql_backend, self.table_ownership, - self.migration_status_refresher, + self.migration_status_refresher.index(force_refresh=False), self.parent_run_id, self.workspace_id, self.config.ucx_catalog, diff --git a/src/databricks/labs/ucx/progress/history.py b/src/databricks/labs/ucx/progress/history.py index e2072ec8aa..66af70d768 100644 --- a/src/databricks/labs/ucx/progress/history.py +++ b/src/databricks/labs/ucx/progress/history.py @@ -5,7 +5,7 @@ import logging from enum import Enum, EnumMeta from collections.abc import Iterable, Sequence -from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints +from typing import Any, ClassVar, Generic, Protocol, TypeVar, get_type_hints, final from databricks.labs.lsql.backends import SqlBackend @@ -279,6 +279,7 @@ def __init__( def full_name(self) -> str: return 
f"{self._catalog}.{self._schema}.{self._table}" + @final def append_inventory_snapshot(self, snapshot: Iterable[Record]) -> None: history_records = [self._encode_record_as_historical(record) for record in snapshot] logger.debug(f"Appending {len(history_records)} {self._klass} record(s) to history.") diff --git a/src/databricks/labs/ucx/progress/tables.py b/src/databricks/labs/ucx/progress/tables.py index 87671e72c1..548286d150 100644 --- a/src/databricks/labs/ucx/progress/tables.py +++ b/src/databricks/labs/ucx/progress/tables.py @@ -1,19 +1,13 @@ -import logging -from collections.abc import Iterable from dataclasses import replace from databricks.labs.lsql.backends import SqlBackend -from databricks.labs.ucx.framework.crawlers import CrawlerBase -from databricks.labs.ucx.framework.utils import escape_sql_identifier from databricks.labs.ucx.hive_metastore.tables import Table -from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex, TableMigrationStatus +from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex from databricks.labs.ucx.hive_metastore.ownership import TableOwnership from databricks.labs.ucx.progress.history import ProgressEncoder from databricks.labs.ucx.progress.install import Historical -logger = logging.getLogger(__name__) - class TableProgressEncoder(ProgressEncoder[Table]): """Encoder class:Table to class:History. @@ -27,7 +21,7 @@ def __init__( self, sql_backend: SqlBackend, ownership: TableOwnership, - migration_status_refresher: CrawlerBase[TableMigrationStatus], + table_migration_index: TableMigrationIndex, run_id: int, workspace_id: int, catalog: str, @@ -44,17 +38,10 @@ def __init__( schema, table, ) - self._migration_status_refresher = migration_status_refresher - - def append_inventory_snapshot(self, snapshot: Iterable[Table]) -> None: - migration_index = TableMigrationIndex(self._migration_status_refresher.snapshot()) - history_records = [self._encode_table_as_historical(record, migration_index) for record in snapshot] - logger.debug(f"Appending {len(history_records)} {self._klass} table record(s) to history.") - # This is the only writer, and the mode is 'append'. This is documented as conflict-free. - self._sql_backend.save_table(escape_sql_identifier(self.full_name), history_records, Historical, mode="append") + self._table_migration_index = table_migration_index - def _encode_table_as_historical(self, record: Table, migration_index: TableMigrationIndex) -> Historical: - """Encode a table record, enriching with the migration status. + def _encode_record_as_historical(self, record: Table) -> Historical: + """Encode record as historical. A table failure means that the table is pending migration. 
Grants are purposefully left out, because a grant might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure @@ -62,6 +49,6 @@ def _encode_table_as_historical(self, record: Table, migration_index: TableMigra """ historical = super()._encode_record_as_historical(record) failures = [] - if not migration_index.is_migrated(record.database, record.name): + if not self._table_migration_index.is_migrated(record.database, record.name): failures.append("Pending migration") return replace(historical, failures=historical.failures + failures) diff --git a/tests/unit/progress/test_tables.py b/tests/unit/progress/test_tables.py index baf08de26d..a859bf5c03 100644 --- a/tests/unit/progress/test_tables.py +++ b/tests/unit/progress/test_tables.py @@ -4,11 +4,9 @@ from databricks.labs.ucx.framework.owners import Ownership from databricks.labs.ucx.framework.utils import escape_sql_identifier -from databricks.labs.ucx.hive_metastore.table_migration_status import ( - TableMigrationStatusRefresher, - TableMigrationStatus, -) +from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex from databricks.labs.ucx.hive_metastore.tables import Table +from databricks.labs.ucx.progress.grants import GrantProgressEncoder from databricks.labs.ucx.progress.tables import TableProgressEncoder @@ -21,21 +19,21 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None: ownership = create_autospec(Ownership) ownership.owner_of.return_value = "user" - migration_status_crawler = create_autospec(TableMigrationStatusRefresher) - migration_status_crawler.snapshot.return_value = ( - TableMigrationStatus(table.database, table.name, "main", "default", table.name, update_ts=None), - ) + table_migration_index = create_autospec(TableMigrationIndex) + table_migration_index.is_migrated.return_value = True + grant_progress_encoder = create_autospec(GrantProgressEncoder) encoder = TableProgressEncoder( - mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test" + mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test" ) encoder.append_inventory_snapshot([table]) rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append") - assert rows, f"No rows written for: {encoder.full_name}" + assert len(rows) > 0, f"No rows written for: {encoder.full_name}" assert len(rows[0].failures) == 0 ownership.owner_of.assert_called_once() - migration_status_crawler.snapshot.assert_called_once() + table_migration_index.is_migrated.assert_called_with(table.database, table.name) + grant_progress_encoder.assert_not_called() @pytest.mark.parametrize( @@ -47,12 +45,11 @@ def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None: def test_table_progress_encoder_pending_migration_failure(mock_backend, table: Table) -> None: ownership = create_autospec(Ownership) ownership.owner_of.return_value = "user" - migration_status_crawler = create_autospec(TableMigrationStatusRefresher) - migration_status_crawler.snapshot.return_value = ( - TableMigrationStatus(table.database, table.name), # No destination: therefore not yet migrated. 
- ) + table_migration_index = create_autospec(TableMigrationIndex) + table_migration_index.is_migrated.return_value = False + grant_progress_encoder = create_autospec(GrantProgressEncoder) encoder = TableProgressEncoder( - mock_backend, ownership, migration_status_crawler, run_id=1, workspace_id=123456789, catalog="test" + mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test" ) encoder.append_inventory_snapshot([table]) @@ -61,4 +58,5 @@ def test_table_progress_encoder_pending_migration_failure(mock_backend, table: T assert len(rows) > 0, f"No rows written for: {encoder.full_name}" assert rows[0].failures == ["Pending migration"] ownership.owner_of.assert_called_once() - migration_status_crawler.snapshot.assert_called_once() + table_migration_index.is_migrated.assert_called_with(table.database, table.name) + grant_progress_encoder.assert_not_called() From ac8586a523019c0f0e9d32485a91a3326523bda6 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 13 Nov 2024 16:09:01 +0100 Subject: [PATCH 11/32] Back out more changes that are either not needed or made on other PRs. --- src/databricks/labs/ucx/progress/history.py | 3 ++- src/databricks/labs/ucx/progress/tables.py | 2 +- src/databricks/labs/ucx/progress/workflows.py | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/databricks/labs/ucx/progress/history.py b/src/databricks/labs/ucx/progress/history.py index 66af70d768..89639c7e86 100644 --- a/src/databricks/labs/ucx/progress/history.py +++ b/src/databricks/labs/ucx/progress/history.py @@ -1,6 +1,7 @@ from __future__ import annotations import dataclasses import datetime as dt +import typing import json import logging from enum import Enum, EnumMeta @@ -105,7 +106,7 @@ def _get_field_names_with_types(cls, klass: type[Record]) -> tuple[dict[str, typ # are produced automatically in a __future__.__annotations__ context). Unfortunately the dataclass mechanism # captures the type hints prior to resolution (which happens later in the class initialization process). # As such, we rely on dataclasses.fields() for the set of field names, but not the types which we fetch directly. - klass_type_hints = get_type_hints(klass) + klass_type_hints = typing.get_type_hints(klass) field_names = [field.name for field in dataclasses.fields(klass)] field_names_with_types = {field_name: klass_type_hints[field_name] for field_name in field_names} if "failures" not in field_names_with_types: diff --git a/src/databricks/labs/ucx/progress/tables.py b/src/databricks/labs/ucx/progress/tables.py index 548286d150..6dc76132e2 100644 --- a/src/databricks/labs/ucx/progress/tables.py +++ b/src/databricks/labs/ucx/progress/tables.py @@ -43,7 +43,7 @@ def __init__( def _encode_record_as_historical(self, record: Table) -> Historical: """Encode record as historical. - A table failure means that the table is pending migration. Grants are purposefully left out, because a grant + A table failure means that the table is pending migration. Grants are purposefully lef out, because a grant might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure for tables that are migrated to UC with their relevant grants also being migrated. 
""" diff --git a/src/databricks/labs/ucx/progress/workflows.py b/src/databricks/labs/ucx/progress/workflows.py index 1380b25a5b..060b2fdccf 100644 --- a/src/databricks/labs/ucx/progress/workflows.py +++ b/src/databricks/labs/ucx/progress/workflows.py @@ -58,7 +58,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory snapshot.""" # The table migration cluster is not legacy-ACL enabled, so we can't crawl from here. # Step 2 of 2: Assuming (due to depends-on) the inventory was refreshed, capture into the history log. - # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. + # WARNING: this will fail if the inventory is empty, because it will then try to perform a crawl. history_log = ctx.tables_progress tables_snapshot = ctx.tables_crawler.snapshot() history_log.append_inventory_snapshot(tables_snapshot) From 4b3717fa6591a6e6e4389e1b4e2d3f2b10339eab Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 13 Nov 2024 16:10:05 +0100 Subject: [PATCH 12/32] Remove comment that is no longer relevant. --- src/databricks/labs/ucx/progress/history.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/databricks/labs/ucx/progress/history.py b/src/databricks/labs/ucx/progress/history.py index 89639c7e86..b1f2847807 100644 --- a/src/databricks/labs/ucx/progress/history.py +++ b/src/databricks/labs/ucx/progress/history.py @@ -289,5 +289,4 @@ def append_inventory_snapshot(self, snapshot: Iterable[Record]) -> None: def _encode_record_as_historical(self, record: Record) -> Historical: """Encode a snapshot record as a historical log entry.""" - # Snapshot context not needed with default implementation. return self._encoder.to_historical(record) From c80a9c683c6d4c0734d69348f000250e55bd9a5e Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 13 Nov 2024 17:25:50 +0100 Subject: [PATCH 13/32] Verify prerequisites for updating the migration-progress prior to the tasks starting. --- .../labs/ucx/hive_metastore/workflows.py | 123 +++++++++++------- .../hive_metastore/test_workflows.py | 35 ++++- tests/unit/hive_metastore/test_workflows.py | 28 ++++ 3 files changed, 135 insertions(+), 51 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index 66818b7c74..d1311acb10 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -1,3 +1,5 @@ +import datetime as dt + from databricks.labs.ucx.assessment.workflows import Assessment from databricks.labs.ucx.contexts.workflow_task import RuntimeContext from databricks.labs.ucx.framework.tasks import Workflow, job_task @@ -57,9 +59,13 @@ def migrate_views(self, ctx: RuntimeContext): """ ctx.tables_migrator.migrate_tables(what=What.VIEW) - @job_task(job_cluster="tacl") - def setup_tacl(self, ctx: RuntimeContext): - """(Optimization) Allow the TACL job cluster to be started before we commence refreshing the tables inventory.""" + @job_task(job_cluster="table_migration") + def verify_prerequisites(self, ctx: RuntimeContext) -> None: + """Verify the prerequisites for running this job on the table migration cluster are fulfilled. + + We will wait up to 1 hour for the assessment run to finish if it is running or pending. 
+ """ + ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) @job_task( depends_on=[ @@ -68,28 +74,27 @@ def setup_tacl(self, ctx: RuntimeContext): migrate_dbfs_root_delta_tables, migrate_dbfs_root_non_delta_tables, migrate_views, - setup_tacl, + verify_prerequisites, ], - job_cluster="tacl", ) def update_table_inventory(self, ctx: RuntimeContext) -> None: """Refresh the tables inventory, prior to updating the migration status of all the tables.""" - # The TACL cluster is not UC-enabled, so we cannot the snapshot cannot be written immediately to the history log. + # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not + # UC-enabled, so we cannot both snapshot and update the history log from the same location. # Step 1 of 3: Just refresh the tables inventory. ctx.tables_crawler.snapshot(force_refresh=True) - @job_task(depends_on=[update_table_inventory], job_cluster="table_migration") + @job_task(depends_on=[verify_prerequisites, update_table_inventory], job_cluster="table_migration") def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" - # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on a TACL cluster.) + # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) ctx.tables_migrator.check_remaining_tables(updated_migration_progress) - @job_task(depends_on=[update_migration_status], job_cluster="table_migration") + @job_task(depends_on=[verify_prerequisites, update_migration_status], job_cluster="table_migration") def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" - # The table migration cluster is not legacy-ACL enabled, so we can't crawl from here. - # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status was refreshed, capture into the + # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the # history log. # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. history_log = ctx.tables_progress @@ -97,7 +102,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Note: encoding the Table records will trigger loading of the migration-status data. history_log.append_inventory_snapshot(tables_snapshot) - @job_task(job_cluster="table_migration", depends_on=[update_tables_history_log]) + @job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_tables_history_log]) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() @@ -126,29 +131,33 @@ def migrate_views(self, ctx: RuntimeContext): """ ctx.tables_migrator.migrate_tables(what=What.VIEW) - @job_task(job_cluster="tacl") - def setup_tacl(self, ctx: RuntimeContext): - """(Optimization) Allow the TACL job cluster to be started before we commence refreshing the tables inventory.""" + @job_task(job_cluster="table_migration") + def verify_prerequisites(self, ctx: RuntimeContext) -> None: + """Verify the prerequisites for running this job on the table migration cluster are fulfilled. 
- @job_task(depends_on=[migrate_hive_serde_in_place, migrate_views, setup_tacl], job_cluster="tacl") + We will wait up to 1 hour for the assessment run to finish if it is running or pending. + """ + ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) + + @job_task(depends_on=[verify_prerequisites, migrate_views]) def update_table_inventory(self, ctx: RuntimeContext) -> None: """Refresh the tables inventory, prior to updating the migration status of all the tables.""" - # The TACL cluster is not UC-enabled, so we cannot the snapshot cannot be written immediately to the history log. + # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not + # UC-enabled, so we cannot both snapshot and update the history log from the same location. # Step 1 of 3: Just refresh the tables inventory. ctx.tables_crawler.snapshot(force_refresh=True) - @job_task(depends_on=[update_table_inventory], job_cluster="table_migration") + @job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_table_inventory]) def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" - # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on a TACL cluster.) + # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) ctx.tables_migrator.check_remaining_tables(updated_migration_progress) - @job_task(depends_on=[update_migration_status], job_cluster="table_migration") + @job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_migration_status]) def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" - # The table migration cluster is not legacy-ACL enabled, so we can't crawl from here. - # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status was refreshed, capture into the + # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the # history log. # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. history_log = ctx.tables_progress @@ -156,7 +165,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Note: encoding the Table records will trigger loading of the migration-status data. history_log.append_inventory_snapshot(tables_snapshot) - @job_task(job_cluster="table_migration", depends_on=[update_tables_history_log]) + @job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_tables_history_log]) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() @@ -190,31 +199,33 @@ def migrate_views(self, ctx: RuntimeContext): """ ctx.tables_migrator.migrate_tables(what=What.VIEW) - @job_task(job_cluster="tacl") - def setup_tacl(self, ctx: RuntimeContext): - """(Optimization) Allow the TACL job cluster to be started before we commence refreshing the tables inventory.""" + @job_task(job_cluster="table_migration") + def verify_prerequisites(self, ctx: RuntimeContext) -> None: + """Verify the prerequisites for running this job on the table migration cluster are fulfilled. 
- @job_task( - depends_on=[migrate_other_external_ctas, migrate_hive_serde_ctas, migrate_views, setup_tacl], job_cluster="tacl" - ) + We will wait up to 1 hour for the assessment run to finish if it is running or pending. + """ + ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) + + @job_task(depends_on=[verify_prerequisites, migrate_views, migrate_hive_serde_ctas, migrate_other_external_ctas]) def update_table_inventory(self, ctx: RuntimeContext) -> None: """Refresh the tables inventory, prior to updating the migration status of all the tables.""" - # The TACL cluster is not UC-enabled, so we cannot the snapshot cannot be written immediately to the history log. + # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not + # UC-enabled, so cannot both snapshot and update the history log from the same location. # Step 1 of 3: Just refresh the tables inventory. ctx.tables_crawler.snapshot(force_refresh=True) - @job_task(depends_on=[update_table_inventory], job_cluster="table_migration") + @job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_table_inventory]) def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" - # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on a TACL cluster.) + # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) ctx.tables_migrator.check_remaining_tables(updated_migration_progress) - @job_task(depends_on=[update_migration_status], job_cluster="table_migration") + @job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_migration_status]) def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" - # The table migration cluster is not legacy-ACL enabled, so we can't crawl from here. - # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status was refreshed, capture into the + # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the # history log. # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. history_log = ctx.tables_progress @@ -222,7 +233,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Note: encoding the Table records will trigger loading of the migration-status data. 
history_log.append_inventory_snapshot(tables_snapshot) - @job_task(job_cluster="table_migration", depends_on=[update_tables_history_log]) + @job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_tables_history_log]) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() @@ -239,13 +250,21 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext): replacing any existing content that might be present.""" ctx.tables_in_mounts.snapshot(force_refresh=True) - @job_task(depends_on=[scan_tables_in_mounts_experimental], job_cluster="table_migration") + @job_task(job_cluster="table_migration") + def verify_prerequisites(self, ctx: RuntimeContext) -> None: + """Verify the prerequisites for running this job on the table migration cluster are fulfilled. + + We will wait up to 1 hour for the assessment run to finish if it is running or pending. + """ + ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) + + @job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, scan_tables_in_mounts_experimental]) def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) ctx.tables_migrator.check_remaining_tables(updated_migration_progress) - @job_task(depends_on=[update_migration_status], job_cluster="table_migration") + @job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_migration_status]) def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. @@ -254,7 +273,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Note: encoding the Table records will trigger loading of the migration-status data. history_log.append_inventory_snapshot(tables_snapshot) - @job_task(job_cluster="table_migration", depends_on=[update_tables_history_log]) + @job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_tables_history_log]) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() @@ -269,29 +288,33 @@ def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext): """[EXPERIMENTAL] This workflow migrates `delta tables stored in mount points` to Unity Catalog using a Create Table statement.""" ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT) - @job_task(job_cluster="tacl") - def setup_tacl(self, ctx: RuntimeContext): - """(Optimization) Allow the TACL job cluster to be started before we commence refreshing the tables inventory.""" + @job_task(job_cluster="table_migration") + def verify_prerequisites(self, ctx: RuntimeContext) -> None: + """Verify the prerequisites for running this job on the table migration cluster are fulfilled. + + We will wait up to 1 hour for the assessment run to finish if it is running or pending. 
+ """ + ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) - @job_task(depends_on=[migrate_tables_in_mounts_experimental, setup_tacl], job_cluster="tacl") + @job_task(depends_on=[verify_prerequisites, migrate_tables_in_mounts_experimental]) def update_table_inventory(self, ctx: RuntimeContext) -> None: """Refresh the tables inventory, prior to updating the migration status of all the tables.""" - # The TACL cluster is not UC-enabled, so we cannot the snapshot cannot be written immediately to the history log. + # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not + # UC-enabled, so we cannot both snapshot and update the history log from the same location. # Step 1 of 3: Just refresh the tables inventory. ctx.tables_crawler.snapshot(force_refresh=True) - @job_task(depends_on=[update_table_inventory], job_cluster="table_migration") + @job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_table_inventory]) def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" - # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on a TACL cluster.) + # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) ctx.tables_migrator.check_remaining_tables(updated_migration_progress) - @job_task(depends_on=[update_migration_status], job_cluster="table_migration") + @job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_migration_status]) def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" - # The table migration cluster is not legacy-ACL enabled, so we can't crawl from here. - # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status was refreshed, capture into the + # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the # history log. # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. history_log = ctx.tables_progress @@ -299,7 +322,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Note: encoding the Table records will trigger loading of the migration-status data. history_log.append_inventory_snapshot(tables_snapshot) - @job_task(job_cluster="table_migration", depends_on=[update_tables_history_log]) + @job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_tables_history_log]) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index e31746a1dc..e12b9ecd90 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -31,6 +31,12 @@ def test_table_migration_job_refreshes_migration_status( ctx.workspace_installation.run() ProgressTrackingInstallation(ctx.sql_backend, ctx.ucx_catalog).run() + # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete + # before we can test these workflows. 
+ installation_ctx.deployed_workflows.run_workflow("assessment") + assert installation_ctx.deployed_workflows.validate_step("assessment"), "Workflow failed: assessment" + + # The workflow under test. run_id = ctx.deployed_workflows.run_workflow(workflow) assert installation_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}" @@ -100,8 +106,14 @@ def test_table_migration_for_managed_table(ws, installation_ctx, prepare_tables_ r".*Do you want to update the existing installation?.*": 'yes', }, ) - ctx.workspace_installation.run() + + # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete + # before we can test the migration workflow. + installation_ctx.deployed_workflows.run_workflow("assessment") + assert installation_ctx.deployed_workflows.validate_step("assessment"), "Workflow failed: assessment" + + # The workflow under test. ctx.deployed_workflows.run_workflow(workflow) for table in tables.values(): @@ -126,6 +138,13 @@ def test_hiveserde_table_in_place_migration_job(ws, installation_ctx, prepare_ta }, ) ctx.workspace_installation.run() + + # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete + # before we can test the migration workflow. + installation_ctx.deployed_workflows.run_workflow("assessment") + assert installation_ctx.deployed_workflows.validate_step("assessment"), "Workflow failed: assessment" + + # The workflow under test. ctx.deployed_workflows.run_workflow("migrate-external-hiveserde-tables-in-place-experimental") # assert the workflow is successful assert ctx.deployed_workflows.validate_step("migrate-external-hiveserde-tables-in-place-experimental") @@ -146,6 +165,13 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables }, ) ctx.workspace_installation.run() + + # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete + # before we can test the migration workflow. + installation_ctx.deployed_workflows.run_workflow("assessment") + assert installation_ctx.deployed_workflows.validate_step("assessment"), "Workflow failed: assessment" + + # The workflow under test. ctx.deployed_workflows.run_workflow("migrate-external-tables-ctas") # assert the workflow is successful assert ctx.deployed_workflows.validate_step("migrate-external-tables-ctas") @@ -172,6 +198,13 @@ def test_table_migration_job_publishes_remaining_tables( table_format="UNKNOWN", ) installation_ctx.table_mapping.skip_table_or_view(dst_schema.name, second_table.name, load_table=lambda *_: table) + + # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete + # before we can test the migration workflow. + installation_ctx.deployed_workflows.run_workflow("assessment") + assert installation_ctx.deployed_workflows.validate_step("assessment"), "Workflow failed: assessment" + + # The workflow under test. 
installation_ctx.deployed_workflows.run_workflow("migrate-tables") assert installation_ctx.deployed_workflows.validate_step("migrate-tables") diff --git a/tests/unit/hive_metastore/test_workflows.py b/tests/unit/hive_metastore/test_workflows.py index c3810268b4..e0d4eaf84a 100644 --- a/tests/unit/hive_metastore/test_workflows.py +++ b/tests/unit/hive_metastore/test_workflows.py @@ -3,6 +3,9 @@ import pytest +from databricks.sdk.service.catalog import CatalogInfo, MetastoreAssignment +from databricks.sdk.service.jobs import BaseRun, RunResultState, RunState + from databricks.labs.ucx.framework.tasks import Workflow from databricks.labs.ucx.hive_metastore.workflows import ( TableMigration, @@ -67,6 +70,31 @@ def test_migrate_ctas_views(run_workflow): ) +@pytest.mark.parametrize( + "task", + [getattr(workflow, "verify_prerequisites") for workflow in _migration_workflows], + ids=[workflow.__name__ for workflow in _migration_workflows], +) +def test_with_valid_prerequisites(ws, run_workflow, task) -> None: + ws.metastores.current.return_value = MetastoreAssignment(metastore_id="test", workspace_id=123456789) + ws.catalogs.get.return_value = CatalogInfo() + ws.jobs.list_runs.return_value = [BaseRun(state=RunState(result_state=RunResultState.SUCCESS))] + run_workflow(task, workspace_client=ws) + # run_workflow will raise RuntimeError if the prerequisites could not be verified. + + +@pytest.mark.parametrize( + "task", + [getattr(workflow, "verify_prerequisites") for workflow in _migration_workflows], + ids=[workflow.__name__ for workflow in _migration_workflows], +) +def test_with_invalid_prerequisites(ws, run_workflow, task) -> None: + """All invalid prerequisites permutations are tested for `VerifyProgressTracking` separately.""" + ws.metastores.current.return_value = None + with pytest.raises(RuntimeWarning, match="Metastore not attached to workspace."): + run_workflow(task, workspace_client=ws) + + @pytest.mark.parametrize( "workflow", # Special case here for ScanTablesInMounts, handled below. From f638cb5c2a61cef418ff25981d1441bfcfb8543c Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 13 Nov 2024 17:32:20 +0100 Subject: [PATCH 14/32] No need to mention the assessment; we won't reach this point of the workflow if the assessment has not completed. --- .../labs/ucx/hive_metastore/workflows.py | 25 ++++--------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index d1311acb10..46f6fc8623 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -61,10 +61,7 @@ def migrate_views(self, ctx: RuntimeContext): @job_task(job_cluster="table_migration") def verify_prerequisites(self, ctx: RuntimeContext) -> None: - """Verify the prerequisites for running this job on the table migration cluster are fulfilled. - - We will wait up to 1 hour for the assessment run to finish if it is running or pending. - """ + """Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) @job_task( @@ -133,10 +130,7 @@ def migrate_views(self, ctx: RuntimeContext): @job_task(job_cluster="table_migration") def verify_prerequisites(self, ctx: RuntimeContext) -> None: - """Verify the prerequisites for running this job on the table migration cluster are fulfilled. 
- - We will wait up to 1 hour for the assessment run to finish if it is running or pending. - """ + """Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) @job_task(depends_on=[verify_prerequisites, migrate_views]) @@ -201,10 +195,7 @@ def migrate_views(self, ctx: RuntimeContext): @job_task(job_cluster="table_migration") def verify_prerequisites(self, ctx: RuntimeContext) -> None: - """Verify the prerequisites for running this job on the table migration cluster are fulfilled. - - We will wait up to 1 hour for the assessment run to finish if it is running or pending. - """ + """Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) @job_task(depends_on=[verify_prerequisites, migrate_views, migrate_hive_serde_ctas, migrate_other_external_ctas]) @@ -252,10 +243,7 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext): @job_task(job_cluster="table_migration") def verify_prerequisites(self, ctx: RuntimeContext) -> None: - """Verify the prerequisites for running this job on the table migration cluster are fulfilled. - - We will wait up to 1 hour for the assessment run to finish if it is running or pending. - """ + """Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) @job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, scan_tables_in_mounts_experimental]) @@ -290,10 +278,7 @@ def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext): @job_task(job_cluster="table_migration") def verify_prerequisites(self, ctx: RuntimeContext) -> None: - """Verify the prerequisites for running this job on the table migration cluster are fulfilled. - - We will wait up to 1 hour for the assessment run to finish if it is running or pending. - """ + """Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) @job_task(depends_on=[verify_prerequisites, migrate_tables_in_mounts_experimental]) From 2d398f49437bc1625fd271c51457301bfc3333a3 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 13 Nov 2024 17:33:03 +0100 Subject: [PATCH 15/32] Use TODO marker instead of warning to highlight what we'd prefer to happen here. --- src/databricks/labs/ucx/hive_metastore/workflows.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index 46f6fc8623..10b3bf4c4a 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -93,7 +93,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the # history log. - # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. + # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. 
history_log = ctx.tables_progress tables_snapshot = ctx.tables_crawler.snapshot() # Note: encoding the Table records will trigger loading of the migration-status data. @@ -153,7 +153,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the # history log. - # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. + # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. history_log = ctx.tables_progress tables_snapshot = ctx.tables_crawler.snapshot() # Note: encoding the Table records will trigger loading of the migration-status data. @@ -218,7 +218,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the # history log. - # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. + # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. history_log = ctx.tables_progress tables_snapshot = ctx.tables_crawler.snapshot() # Note: encoding the Table records will trigger loading of the migration-status data. @@ -255,7 +255,7 @@ def update_migration_status(self, ctx: RuntimeContext) -> None: @job_task(job_cluster="table_migration", depends_on=[verify_prerequisites, update_migration_status]) def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" - # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. + # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. history_log = ctx.tables_progress tables_snapshot = ctx.tables_crawler.snapshot() # Note: encoding the Table records will trigger loading of the migration-status data. @@ -301,7 +301,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the # history log. - # WARNING: this will fail if the tables inventory is empty, because it will then try to perform a crawl. + # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. history_log = ctx.tables_progress tables_snapshot = ctx.tables_crawler.snapshot() # Note: encoding the Table records will trigger loading of the migration-status data. 
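Annotation on the verify_prerequisites task wired into the workflows above: the task body is a single call, ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)), and the unit tests for it stub ws.metastores.current, ws.catalogs.get and ws.jobs.list_runs, expecting a RuntimeWarning such as "Metastore not attached to workspace." when a prerequisite is missing. The sketch below only illustrates that contract; it is not the project's actual VerifyProgressTracking implementation, and the constructor arguments (ucx_catalog, assessment_job_id) and the 60-second polling interval are assumptions made purely for illustration.

    import datetime as dt
    import time

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.jobs import RunResultState


    class VerifyProgressTracking:
        """Illustrative sketch: verify a metastore is attached, the UCX catalog is reachable,
        and the assessment workflow has completed successfully (waiting up to the timeout)."""

        def __init__(self, ws: WorkspaceClient, ucx_catalog: str, assessment_job_id: int) -> None:
            self._ws = ws
            self._ucx_catalog = ucx_catalog  # assumed argument, not taken from this patch series
            self._assessment_job_id = assessment_job_id  # assumed argument as well

        def verify(self, timeout: dt.timedelta = dt.timedelta(hours=1)) -> None:
            if not self._ws.metastores.current():
                raise RuntimeWarning("Metastore not attached to workspace.")
            self._ws.catalogs.get(self._ucx_catalog)  # raises NotFound if the UCX catalog is absent
            deadline = time.monotonic() + timeout.total_seconds()
            while time.monotonic() < deadline:
                runs = list(self._ws.jobs.list_runs(job_id=self._assessment_job_id, completed_only=True, limit=1))
                if runs and runs[0].state and runs[0].state.result_state == RunResultState.SUCCESS:
                    return  # assessment finished successfully; prerequisites are satisfied
                time.sleep(60)  # assessment still pending or running; poll again until the deadline
            raise RuntimeWarning("Assessment workflow did not complete successfully within the timeout.")

The important property for these workflows is that the check is a dependency of every history-log task on the table-migration job cluster, so a misconfigured workspace fails fast instead of part-way through the migration steps.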
From dbafbac304e0ec2b7d663767aac0e4c80d491025 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 20 Nov 2024 11:42:21 +0100 Subject: [PATCH 16/32] Ensure the assessment has finished before table-migration runs, and that the migration-progress installation has completed.` --- tests/integration/hive_metastore/test_ext_hms.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/integration/hive_metastore/test_ext_hms.py b/tests/integration/hive_metastore/test_ext_hms.py index f39bd44b91..4ec25fca1a 100644 --- a/tests/integration/hive_metastore/test_ext_hms.py +++ b/tests/integration/hive_metastore/test_ext_hms.py @@ -8,6 +8,8 @@ from databricks.sdk.errors import NotFound, InvalidParameterValue from databricks.sdk.retries import retried +from databricks.labs.ucx.progress.install import ProgressTrackingInstallation + logger = logging.getLogger(__name__) @@ -40,9 +42,14 @@ def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migratio r"Choose a cluster policy": "0", }, ) - ext_hms_ctx.workspace_installation.run() - ext_hms_ctx.deployed_workflows.run_workflow("migrate-tables") + ProgressTrackingInstallation(ext_hms_ctx.sql_backend, ext_hms_ctx.ucx_catalog).run() + + # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete + # before we can test the migration workflow. + ext_hms_ctx.deployed_workflows.run_workflow("assessment") + assert ext_hms_ctx.deployed_workflows.validate_step("assessment"), "Workflow failed: assessment" + # assert the workflow is successful assert ext_hms_ctx.deployed_workflows.validate_step("migrate-tables") From 839ac76647c0a66b6e375e81150b002da5617b00 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 20 Nov 2024 11:43:46 +0100 Subject: [PATCH 17/32] Ensure the progress-migration catalog is configured. --- tests/integration/hive_metastore/test_workflows.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index e12b9ecd90..0342b6d943 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -107,6 +107,7 @@ def test_table_migration_for_managed_table(ws, installation_ctx, prepare_tables_ }, ) ctx.workspace_installation.run() + ProgressTrackingInstallation(ctx.sql_backend, ctx.ucx_catalog).run() # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete # before we can test the migration workflow. @@ -138,6 +139,7 @@ def test_hiveserde_table_in_place_migration_job(ws, installation_ctx, prepare_ta }, ) ctx.workspace_installation.run() + ProgressTrackingInstallation(ctx.sql_backend, ctx.ucx_catalog).run() # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete # before we can test the migration workflow. @@ -165,6 +167,7 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables }, ) ctx.workspace_installation.run() + ProgressTrackingInstallation(ctx.sql_backend, ctx.ucx_catalog).run() # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete # before we can test the migration workflow. 
@@ -189,6 +192,7 @@ def test_table_migration_job_publishes_remaining_tables( ): tables, dst_schema = prepare_tables_for_migration installation_ctx.workspace_installation.run() + ProgressTrackingInstallation(installation_ctx.sql_backend, installation_ctx.ucx_catalog).run() second_table = list(tables.values())[1] table = Table( "hive_metastore", From 862b5bff37fabd60caebad57a2bd9ddbb2b0909f Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 20 Nov 2024 11:44:25 +0100 Subject: [PATCH 18/32] Remove unused fixture. --- tests/integration/hive_metastore/test_workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 0342b6d943..c8a03e229a 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -188,7 +188,7 @@ def test_hiveserde_table_ctas_migration_job(ws, installation_ctx, prepare_tables @pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True) def test_table_migration_job_publishes_remaining_tables( - ws, installation_ctx, sql_backend, prepare_tables_for_migration, caplog + installation_ctx, sql_backend, prepare_tables_for_migration, caplog ): tables, dst_schema = prepare_tables_for_migration installation_ctx.workspace_installation.run() From a074fd750d14616d07659a7cd28c154a64b04827 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Mon, 25 Nov 2024 14:02:33 +0100 Subject: [PATCH 19/32] Remove unused fixture. --- tests/integration/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 13007f0165..fbfe381ebd 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1223,7 +1223,7 @@ def prepare_regular_tables(context, external_csv, schema) -> dict[str, TableInfo @pytest.fixture def prepare_tables_for_migration( - ws, installation_ctx, make_catalog, make_random, make_mounted_location, env_or_skip, make_storage_dir, request + installation_ctx, make_catalog, make_random, make_mounted_location, env_or_skip, make_storage_dir, request ) -> tuple[dict[str, TableInfo], SchemaInfo]: # Here we use pytest indirect parametrization, so the test function can pass arguments to this fixture and the # arguments will be available in the request.param. If the argument is "hiveserde", we will prepare hiveserde From 034c91b1fec11bd1b5e93cfa771dd5486d9ccf36 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Mon, 25 Nov 2024 14:04:10 +0100 Subject: [PATCH 20/32] Split over several lines to make debugging easier. --- tests/integration/hive_metastore/test_workflows.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index c8a03e229a..2085d601be 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -33,12 +33,14 @@ def test_table_migration_job_refreshes_migration_status( # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete # before we can test these workflows. 
- installation_ctx.deployed_workflows.run_workflow("assessment") - assert installation_ctx.deployed_workflows.validate_step("assessment"), "Workflow failed: assessment" + installation_ctx.deployed_workflows.run_workflow("assessment", skip_job_wait=True) + assessment_completed_correctly = installation_ctx.deployed_workflows.validate_step("assessment") + assert assessment_completed_correctly, "Workflow failed: assessment" # The workflow under test. - run_id = ctx.deployed_workflows.run_workflow(workflow) - assert installation_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}" + run_id = ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=True) + workflow_completed_correctly = installation_ctx.deployed_workflows.validate_step("assessment") + assert workflow_completed_correctly, f"Workflow failed: {workflow}" # Avoiding MigrationStatusRefresh as it will refresh the status before fetching. migration_status_query = f"SELECT * FROM {ctx.migration_status_refresher.full_name}" From dd74dd9a139332d5eb615e6da3584f9ca8ba587e Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Mon, 25 Nov 2024 14:53:32 +0100 Subject: [PATCH 21/32] Refactor for debugging. --- tests/integration/hive_metastore/test_ext_hms.py | 5 +++-- tests/integration/hive_metastore/test_workflows.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/integration/hive_metastore/test_ext_hms.py b/tests/integration/hive_metastore/test_ext_hms.py index 4ec25fca1a..3d448acd03 100644 --- a/tests/integration/hive_metastore/test_ext_hms.py +++ b/tests/integration/hive_metastore/test_ext_hms.py @@ -47,8 +47,9 @@ def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migratio # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete # before we can test the migration workflow. - ext_hms_ctx.deployed_workflows.run_workflow("assessment") - assert ext_hms_ctx.deployed_workflows.validate_step("assessment"), "Workflow failed: assessment" + ext_hms_ctx.deployed_workflows.run_workflow("assessment", skip_job_wait=True) + assessment_completed_correctly = ext_hms_ctx.deployed_workflows.validate_step("assessment") + assert assessment_completed_correctly, "Workflow failed: assessment" # assert the workflow is successful assert ext_hms_ctx.deployed_workflows.validate_step("migrate-tables") diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 2085d601be..b86ed82677 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -39,7 +39,7 @@ def test_table_migration_job_refreshes_migration_status( # The workflow under test. run_id = ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=True) - workflow_completed_correctly = installation_ctx.deployed_workflows.validate_step("assessment") + workflow_completed_correctly = installation_ctx.deployed_workflows.validate_step(workflow) assert workflow_completed_correctly, f"Workflow failed: {workflow}" # Avoiding MigrationStatusRefresh as it will refresh the status before fetching. From 4dde2204bd8e628dd9dcf056c1c7417e5edff111 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Mon, 25 Nov 2024 14:53:46 +0100 Subject: [PATCH 22/32] Fix test to invoke the workflow its verifying. 
--- tests/integration/hive_metastore/test_ext_hms.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/integration/hive_metastore/test_ext_hms.py b/tests/integration/hive_metastore/test_ext_hms.py index 3d448acd03..7813079566 100644 --- a/tests/integration/hive_metastore/test_ext_hms.py +++ b/tests/integration/hive_metastore/test_ext_hms.py @@ -52,7 +52,9 @@ def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migratio assert assessment_completed_correctly, "Workflow failed: assessment" # assert the workflow is successful - assert ext_hms_ctx.deployed_workflows.validate_step("migrate-tables") + ext_hms_ctx.deployed_workflows.run_workflow("migrate-tables", skip_job_wait=True) + migrate_tables_completed_correctly = ext_hms_ctx.deployed_workflows.validate_step("migrate-tables") + assert migrate_tables_completed_correctly, "Workflow failed: migrate-tables" # assert the tables are migrated for table in tables.values(): From 2dbc2e40d3efb806a8331aefcf418296af9e08a1 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Tue, 3 Dec 2024 14:30:00 +0100 Subject: [PATCH 23/32] Adjust the debugging convenience. --- tests/integration/assessment/test_ext_hms.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/integration/assessment/test_ext_hms.py b/tests/integration/assessment/test_ext_hms.py index 0e43c80dfa..a742c116dd 100644 --- a/tests/integration/assessment/test_ext_hms.py +++ b/tests/integration/assessment/test_ext_hms.py @@ -40,8 +40,9 @@ def test_running_real_assessment_job_ext_hms( ext_hms_ctx.workspace_installation.run() workflow = "assessment" - ext_hms_ctx.deployed_workflows.run_workflow(workflow) - assert ext_hms_ctx.deployed_workflows.validate_step(workflow), f"Workflow failed: {workflow}" + ext_hms_ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=True) + workflow_completed_successfully = ext_hms_ctx.deployed_workflows.validate_step(workflow) + assert workflow_completed_successfully, f"Workflow failed: {workflow}" after = ext_hms_ctx.generic_permissions_support.load_as_dict("cluster-policies", cluster_policy.policy_id) assert ws_group.display_name in after, f"Group {ws_group.display_name} not found in cluster policy" From e30d2cc665897859919f8b9a672531d35f6ac9ce Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 4 Dec 2024 14:28:09 +0100 Subject: [PATCH 24/32] Configure test with new infrastructure. The 'main' cluster needs to: a) have an external-HMS attached; b) have an isolation mode that supports the JVM bridge. 
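A brief illustration of requirement (b): the table inventory crawl relies on Spark's JVM bridge rather than plain SQL, so the "main" cluster's access mode must expose py4j access to the driver JVM. The snippet below is only a sketch of that kind of access, under the assumption that the crawler reaches the Hive external catalog through the shared state; it is not code from this patch series, and _jsparkSession is a private PySpark attribute used here purely for illustration.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    # Reach into the driver JVM for the Hive external catalog. This py4j bridge is only
    # available on clusters whose access mode exposes the JVM, which is why the test needs
    # a dedicated non-UC "main" cluster with the external Hive metastore attached.
    external_catalog = spark._jsparkSession.sharedState().externalCatalog()
    databases = external_catalog.listDatabases()  # a Scala Seq proxied through py4j
    print(databases.size())

The UC-enabled cluster (mapped to "user_isolation" in the diff that follows) is still needed for the history-log tasks, which is why the test now overrides both cluster IDs.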
--- tests/integration/hive_metastore/test_ext_hms.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/integration/hive_metastore/test_ext_hms.py b/tests/integration/hive_metastore/test_ext_hms.py index 0f8820c28f..5f8bde3a75 100644 --- a/tests/integration/hive_metastore/test_ext_hms.py +++ b/tests/integration/hive_metastore/test_ext_hms.py @@ -21,15 +21,16 @@ def sql_backend(ws, env_or_skip) -> SqlBackend: @retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5)) @pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True) -def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migration, env_or_skip): - ext_hms_cluster_id = env_or_skip("TEST_EXT_HMS_CLUSTER_ID") +def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migration, env_or_skip) -> None: + ext_hms_main_cluster_id = env_or_skip("TEST_EXT_HMS_NOUC_CLUSTER_ID") + ext_hms_table_migration_cluster_id = env_or_skip("TEST_EXT_HMS_CLUSTER_ID") tables, dst_schema = prepare_tables_for_migration ext_hms_ctx = installation_ctx.replace( config_transform=lambda wc: dataclasses.replace( wc, override_clusters={ - "main": ext_hms_cluster_id, - "user_isolation": ext_hms_cluster_id, + "main": ext_hms_main_cluster_id, + "user_isolation": ext_hms_table_migration_cluster_id, }, ), extend_prompts={ From c777794227b24da7a2ab04c94a9a3561a3d081bf Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Wed, 4 Dec 2024 15:02:27 +0100 Subject: [PATCH 25/32] Fix linting problems. --- tests/integration/hive_metastore/test_ext_hms.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/integration/hive_metastore/test_ext_hms.py b/tests/integration/hive_metastore/test_ext_hms.py index 5f8bde3a75..26e08b80c7 100644 --- a/tests/integration/hive_metastore/test_ext_hms.py +++ b/tests/integration/hive_metastore/test_ext_hms.py @@ -22,15 +22,15 @@ def sql_backend(ws, env_or_skip) -> SqlBackend: @retried(on=[NotFound, InvalidParameterValue], timeout=timedelta(minutes=5)) @pytest.mark.parametrize('prepare_tables_for_migration', ['regular'], indirect=True) def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migration, env_or_skip) -> None: - ext_hms_main_cluster_id = env_or_skip("TEST_EXT_HMS_NOUC_CLUSTER_ID") - ext_hms_table_migration_cluster_id = env_or_skip("TEST_EXT_HMS_CLUSTER_ID") + main_cluster_id = env_or_skip("TEST_EXT_HMS_NOUC_CLUSTER_ID") + table_migration_cluster_id = env_or_skip("TEST_EXT_HMS_CLUSTER_ID") tables, dst_schema = prepare_tables_for_migration ext_hms_ctx = installation_ctx.replace( config_transform=lambda wc: dataclasses.replace( wc, override_clusters={ - "main": ext_hms_main_cluster_id, - "user_isolation": ext_hms_table_migration_cluster_id, + "main": main_cluster_id, + "user_isolation": table_migration_cluster_id, }, ), extend_prompts={ @@ -49,13 +49,13 @@ def test_migration_job_ext_hms(ws, installation_ctx, prepare_tables_for_migratio # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete # before we can test the migration workflow. 
ext_hms_ctx.deployed_workflows.run_workflow("assessment", skip_job_wait=True) - assessment_completed_correctly = ext_hms_ctx.deployed_workflows.validate_step("assessment") - assert assessment_completed_correctly, "Workflow failed: assessment" + workflow_completed_correctly = ext_hms_ctx.deployed_workflows.validate_step("assessment") + assert workflow_completed_correctly, "Workflow failed: assessment" # assert the workflow is successful ext_hms_ctx.deployed_workflows.run_workflow("migrate-tables", skip_job_wait=True) - migrate_tables_completed_correctly = ext_hms_ctx.deployed_workflows.validate_step("migrate-tables") - assert migrate_tables_completed_correctly, "Workflow failed: migrate-tables" + workflow_completed_correctly = ext_hms_ctx.deployed_workflows.validate_step("migrate-tables") + assert workflow_completed_correctly, "Workflow failed: migrate-tables" # assert the tables are migrated for table in tables.values(): From babbc69dc0a08073fbe4c344389820e033df7df0 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Mon, 9 Dec 2024 14:04:43 +0100 Subject: [PATCH 26/32] Rename task to verify progress-tracking prerequisites. --- .../labs/ucx/hive_metastore/workflows.py | 48 +++++++++---------- tests/unit/hive_metastore/test_workflows.py | 4 +- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index 40db431ae2..cc79ba81e0 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -60,7 +60,7 @@ def migrate_views(self, ctx: RuntimeContext): ctx.tables_migrator.migrate_tables(what=What.VIEW) @job_task(job_cluster="user_isolation") - def verify_prerequisites(self, ctx: RuntimeContext) -> None: + def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None: """Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) @@ -71,7 +71,7 @@ def verify_prerequisites(self, ctx: RuntimeContext) -> None: migrate_dbfs_root_delta_tables, migrate_dbfs_root_non_delta_tables, migrate_views, - verify_prerequisites, + verify_progress_tracking_prerequisites, ], ) def update_table_inventory(self, ctx: RuntimeContext) -> None: @@ -81,14 +81,14 @@ def update_table_inventory(self, ctx: RuntimeContext) -> None: # Step 1 of 3: Just refresh the tables inventory. ctx.tables_crawler.snapshot(force_refresh=True) - @job_task(depends_on=[verify_prerequisites, update_table_inventory], job_cluster="user_isolation") + @job_task(depends_on=[verify_progress_tracking_prerequisites, update_table_inventory], job_cluster="user_isolation") def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) 
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) ctx.tables_migrator.check_remaining_tables(updated_migration_progress) - @job_task(depends_on=[verify_prerequisites, update_migration_status], job_cluster="user_isolation") + @job_task(depends_on=[verify_progress_tracking_prerequisites, update_migration_status], job_cluster="user_isolation") def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the @@ -99,7 +99,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Note: encoding the Table records will trigger loading of the migration-status data. history_log.append_inventory_snapshot(tables_snapshot) - @job_task(job_cluster="user_isolation", depends_on=[verify_prerequisites, update_tables_history_log]) + @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() @@ -129,11 +129,11 @@ def migrate_views(self, ctx: RuntimeContext): ctx.tables_migrator.migrate_tables(what=What.VIEW) @job_task(job_cluster="user_isolation") - def verify_prerequisites(self, ctx: RuntimeContext) -> None: + def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None: """Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) - @job_task(depends_on=[verify_prerequisites, migrate_views]) + @job_task(depends_on=[verify_progress_tracking_prerequisites, migrate_views]) def update_table_inventory(self, ctx: RuntimeContext) -> None: """Refresh the tables inventory, prior to updating the migration status of all the tables.""" # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not @@ -141,14 +141,14 @@ def update_table_inventory(self, ctx: RuntimeContext) -> None: # Step 1 of 3: Just refresh the tables inventory. ctx.tables_crawler.snapshot(force_refresh=True) - @job_task(job_cluster="user_isolation", depends_on=[verify_prerequisites, update_table_inventory]) + @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory]) def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) 
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) ctx.tables_migrator.check_remaining_tables(updated_migration_progress) - @job_task(job_cluster="user_isolation", depends_on=[verify_prerequisites, update_migration_status]) + @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]) def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the @@ -159,7 +159,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Note: encoding the Table records will trigger loading of the migration-status data. history_log.append_inventory_snapshot(tables_snapshot) - @job_task(job_cluster="user_isolation", depends_on=[verify_prerequisites, update_tables_history_log]) + @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() @@ -194,11 +194,11 @@ def migrate_views(self, ctx: RuntimeContext): ctx.tables_migrator.migrate_tables(what=What.VIEW) @job_task(job_cluster="user_isolation") - def verify_prerequisites(self, ctx: RuntimeContext) -> None: + def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None: """Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) - @job_task(depends_on=[verify_prerequisites, migrate_views, migrate_hive_serde_ctas, migrate_other_external_ctas]) + @job_task(depends_on=[verify_progress_tracking_prerequisites, migrate_views, migrate_hive_serde_ctas, migrate_other_external_ctas]) def update_table_inventory(self, ctx: RuntimeContext) -> None: """Refresh the tables inventory, prior to updating the migration status of all the tables.""" # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not @@ -206,14 +206,14 @@ def update_table_inventory(self, ctx: RuntimeContext) -> None: # Step 1 of 3: Just refresh the tables inventory. ctx.tables_crawler.snapshot(force_refresh=True) - @job_task(job_cluster="user_isolation", depends_on=[verify_prerequisites, update_table_inventory]) + @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory]) def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) 
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) ctx.tables_migrator.check_remaining_tables(updated_migration_progress) - @job_task(job_cluster="user_isolation", depends_on=[verify_prerequisites, update_migration_status]) + @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]) def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the @@ -224,7 +224,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Note: encoding the Table records will trigger loading of the migration-status data. history_log.append_inventory_snapshot(tables_snapshot) - @job_task(job_cluster="user_isolation", depends_on=[verify_prerequisites, update_tables_history_log]) + @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() @@ -242,17 +242,17 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext): ctx.tables_in_mounts.snapshot(force_refresh=True) @job_task(job_cluster="user_isolation") - def verify_prerequisites(self, ctx: RuntimeContext) -> None: + def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None: """Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) - @job_task(job_cluster="user_isolation", depends_on=[verify_prerequisites, scan_tables_in_mounts_experimental]) + @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, scan_tables_in_mounts_experimental]) def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) ctx.tables_migrator.check_remaining_tables(updated_migration_progress) - @job_task(job_cluster="user_isolation", depends_on=[verify_prerequisites, update_migration_status]) + @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]) def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. @@ -261,7 +261,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Note: encoding the Table records will trigger loading of the migration-status data. 
history_log.append_inventory_snapshot(tables_snapshot) - @job_task(job_cluster="user_isolation", depends_on=[verify_prerequisites, update_tables_history_log]) + @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() @@ -277,11 +277,11 @@ def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext): ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT) @job_task(job_cluster="user_isolation") - def verify_prerequisites(self, ctx: RuntimeContext) -> None: + def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None: """Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) - @job_task(depends_on=[verify_prerequisites, migrate_tables_in_mounts_experimental]) + @job_task(depends_on=[verify_progress_tracking_prerequisites, migrate_tables_in_mounts_experimental]) def update_table_inventory(self, ctx: RuntimeContext) -> None: """Refresh the tables inventory, prior to updating the migration status of all the tables.""" # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not @@ -289,14 +289,14 @@ def update_table_inventory(self, ctx: RuntimeContext) -> None: # Step 1 of 3: Just refresh the tables inventory. ctx.tables_crawler.snapshot(force_refresh=True) - @job_task(job_cluster="user_isolation", depends_on=[verify_prerequisites, update_table_inventory]) + @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory]) def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) ctx.tables_migrator.check_remaining_tables(updated_migration_progress) - @job_task(job_cluster="user_isolation", depends_on=[verify_prerequisites, update_migration_status]) + @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]) def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the @@ -307,7 +307,7 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Note: encoding the Table records will trigger loading of the migration-status data. 
history_log.append_inventory_snapshot(tables_snapshot) - @job_task(job_cluster="user_isolation", depends_on=[verify_prerequisites, update_tables_history_log]) + @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() diff --git a/tests/unit/hive_metastore/test_workflows.py b/tests/unit/hive_metastore/test_workflows.py index e0d4eaf84a..d4dd6fda66 100644 --- a/tests/unit/hive_metastore/test_workflows.py +++ b/tests/unit/hive_metastore/test_workflows.py @@ -72,7 +72,7 @@ def test_migrate_ctas_views(run_workflow): @pytest.mark.parametrize( "task", - [getattr(workflow, "verify_prerequisites") for workflow in _migration_workflows], + [getattr(workflow, "verify_progress_tracking_prerequisites") for workflow in _migration_workflows], ids=[workflow.__name__ for workflow in _migration_workflows], ) def test_with_valid_prerequisites(ws, run_workflow, task) -> None: @@ -85,7 +85,7 @@ def test_with_valid_prerequisites(ws, run_workflow, task) -> None: @pytest.mark.parametrize( "task", - [getattr(workflow, "verify_prerequisites") for workflow in _migration_workflows], + [getattr(workflow, "verify_progress_tracking_prerequisites") for workflow in _migration_workflows], ids=[workflow.__name__ for workflow in _migration_workflows], ) def test_with_invalid_prerequisites(ws, run_workflow, task) -> None: From 1e45dc034e86537e5d194ee2f5d21711ca64fa59 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Mon, 9 Dec 2024 14:06:39 +0100 Subject: [PATCH 27/32] Formatting. --- .../labs/ucx/hive_metastore/workflows.py | 54 ++++++++++++++----- 1 file changed, 42 insertions(+), 12 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index cc79ba81e0..5752b94ce3 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -88,7 +88,9 @@ def update_migration_status(self, ctx: RuntimeContext) -> None: updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) ctx.tables_migrator.check_remaining_tables(updated_migration_progress) - @job_task(depends_on=[verify_progress_tracking_prerequisites, update_migration_status], job_cluster="user_isolation") + @job_task( + depends_on=[verify_progress_tracking_prerequisites, update_migration_status], job_cluster="user_isolation" + ) def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the @@ -99,7 +101,9 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Note: encoding the Table records will trigger loading of the migration-status data. 
history_log.append_inventory_snapshot(tables_snapshot) - @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]) + @job_task( + job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log] + ) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() @@ -148,7 +152,9 @@ def update_migration_status(self, ctx: RuntimeContext) -> None: updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) ctx.tables_migrator.check_remaining_tables(updated_migration_progress) - @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]) + @job_task( + job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status] + ) def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the @@ -159,7 +165,9 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Note: encoding the Table records will trigger loading of the migration-status data. history_log.append_inventory_snapshot(tables_snapshot) - @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]) + @job_task( + job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log] + ) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() @@ -198,7 +206,14 @@ def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None: """Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) - @job_task(depends_on=[verify_progress_tracking_prerequisites, migrate_views, migrate_hive_serde_ctas, migrate_other_external_ctas]) + @job_task( + depends_on=[ + verify_progress_tracking_prerequisites, + migrate_views, + migrate_hive_serde_ctas, + migrate_other_external_ctas, + ] + ) def update_table_inventory(self, ctx: RuntimeContext) -> None: """Refresh the tables inventory, prior to updating the migration status of all the tables.""" # The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not @@ -213,7 +228,9 @@ def update_migration_status(self, ctx: RuntimeContext) -> None: updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) ctx.tables_migrator.check_remaining_tables(updated_migration_progress) - @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]) + @job_task( + job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status] + ) def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the @@ -224,7 +241,9 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Note: encoding the Table records will trigger loading of the 
migration-status data. history_log.append_inventory_snapshot(tables_snapshot) - @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]) + @job_task( + job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log] + ) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() @@ -246,13 +265,18 @@ def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None: """Verify the prerequisites for running this job on the table migration cluster are fulfilled.""" ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1)) - @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, scan_tables_in_mounts_experimental]) + @job_task( + job_cluster="user_isolation", + depends_on=[verify_progress_tracking_prerequisites, scan_tables_in_mounts_experimental], + ) def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) ctx.tables_migrator.check_remaining_tables(updated_migration_progress) - @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]) + @job_task( + job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status] + ) def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. @@ -261,7 +285,9 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Note: encoding the Table records will trigger loading of the migration-status data. history_log.append_inventory_snapshot(tables_snapshot) - @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]) + @job_task( + job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log] + ) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() @@ -296,7 +322,9 @@ def update_migration_status(self, ctx: RuntimeContext) -> None: updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) ctx.tables_migrator.check_remaining_tables(updated_migration_progress) - @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]) + @job_task( + job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status] + ) def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the @@ -307,7 +335,9 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Note: encoding the Table records will trigger loading of the migration-status data. 
history_log.append_inventory_snapshot(tables_snapshot) - @job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]) + @job_task( + job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log] + ) def record_workflow_run(self, ctx: RuntimeContext) -> None: """Record the workflow run of this workflow.""" ctx.workflow_run_recorder.record() From c96676680a99f4bed4dd32f02faf168621108f06 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Mon, 9 Dec 2024 14:22:36 +0100 Subject: [PATCH 28/32] Rename method to better indicate purpose. --- .../labs/ucx/hive_metastore/table_migrate.py | 2 +- src/databricks/labs/ucx/hive_metastore/workflows.py | 10 +++++----- tests/unit/hive_metastore/test_table_migrate.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index 5cc6d4032f..7cfa394c23 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -60,7 +60,7 @@ def __init__( self._migrate_grants = migrate_grants self._external_locations = external_locations - def check_remaining_tables(self, migration_status: Iterable[TableMigrationStatus]) -> None: + def warn_about_remaining_non_migrated_tables(self, migration_status: Iterable[TableMigrationStatus]) -> None: migration_index = TableMigrationIndex(migration_status) for crawled_table in self._tables_crawler.snapshot(): if not migration_index.is_migrated(crawled_table.database, crawled_table.name): diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index 5752b94ce3..fc687c5459 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -86,7 +86,7 @@ def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) - ctx.tables_migrator.check_remaining_tables(updated_migration_progress) + ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress) @job_task( depends_on=[verify_progress_tracking_prerequisites, update_migration_status], job_cluster="user_isolation" @@ -150,7 +150,7 @@ def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) 
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) - ctx.tables_migrator.check_remaining_tables(updated_migration_progress) + ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress) @job_task( job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status] @@ -226,7 +226,7 @@ def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) - ctx.tables_migrator.check_remaining_tables(updated_migration_progress) + ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress) @job_task( job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status] @@ -272,7 +272,7 @@ def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None: def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) - ctx.tables_migrator.check_remaining_tables(updated_migration_progress) + ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress) @job_task( job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status] @@ -320,7 +320,7 @@ def update_migration_status(self, ctx: RuntimeContext) -> None: """Scan the tables (and views) in the inventory and record whether each has been migrated or not.""" # Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.) updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True) - ctx.tables_migrator.check_remaining_tables(updated_migration_progress) + ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress) @job_task( job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status] diff --git a/tests/unit/hive_metastore/test_table_migrate.py b/tests/unit/hive_metastore/test_table_migrate.py index 6d8eb42a3c..1712852585 100644 --- a/tests/unit/hive_metastore/test_table_migrate.py +++ b/tests/unit/hive_metastore/test_table_migrate.py @@ -1429,7 +1429,7 @@ def migration_status_from_table(table: Table) -> TableMigrationStatus: ) with caplog.at_level(logging.WARNING, logger="databricks.labs.ucx.hive_metastore"): - table_migrate.check_remaining_tables(migration_status_snapshot) + table_migrate.warn_about_remaining_non_migrated_tables(migration_status_snapshot) assert 'remained-hive-metastore-table: hive_metastore.schema1.table3' in caplog.messages table_crawler.snapshot.assert_called_once() From 5e653de6d41b7cc7997bb06e0582db1962fffe59 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Mon, 9 Dec 2024 14:30:28 +0100 Subject: [PATCH 29/32] Inline a local variable. There was a marginal benefit to ensuring the migration progress singleton could be initialized prior to loading the snapshot, but it wasn't really worth the eye-catching local. 
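In sketch form (mirroring the diff below), the change in each affected task is:

    # Before: the eye-catching local resolved ctx.tables_progress before taking the snapshot.
    history_log = ctx.tables_progress
    tables_snapshot = ctx.tables_crawler.snapshot()
    history_log.append_inventory_snapshot(tables_snapshot)

    # After: inlined. The migration-status data is still loaded (lazily) when the Table
    # records are encoded; only the point at which ctx.tables_progress is first resolved moves.
    tables_snapshot = ctx.tables_crawler.snapshot()
    ctx.tables_progress.append_inventory_snapshot(tables_snapshot)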
--- .../labs/ucx/hive_metastore/workflows.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/workflows.py b/src/databricks/labs/ucx/hive_metastore/workflows.py index fc687c5459..a17d37ddea 100644 --- a/src/databricks/labs/ucx/hive_metastore/workflows.py +++ b/src/databricks/labs/ucx/hive_metastore/workflows.py @@ -96,10 +96,9 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the # history log. # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. - history_log = ctx.tables_progress tables_snapshot = ctx.tables_crawler.snapshot() # Note: encoding the Table records will trigger loading of the migration-status data. - history_log.append_inventory_snapshot(tables_snapshot) + ctx.tables_progress.append_inventory_snapshot(tables_snapshot) @job_task( job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log] @@ -160,10 +159,9 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the # history log. # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. - history_log = ctx.tables_progress tables_snapshot = ctx.tables_crawler.snapshot() # Note: encoding the Table records will trigger loading of the migration-status data. - history_log.append_inventory_snapshot(tables_snapshot) + ctx.tables_progress.append_inventory_snapshot(tables_snapshot) @job_task( job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log] @@ -236,10 +234,9 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the # history log. # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. - history_log = ctx.tables_progress tables_snapshot = ctx.tables_crawler.snapshot() # Note: encoding the Table records will trigger loading of the migration-status data. - history_log.append_inventory_snapshot(tables_snapshot) + ctx.tables_progress.append_inventory_snapshot(tables_snapshot) @job_task( job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log] @@ -280,10 +277,9 @@ def update_migration_status(self, ctx: RuntimeContext) -> None: def update_tables_history_log(self, ctx: RuntimeContext) -> None: """Update the history log with the latest tables inventory and migration status.""" # TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. - history_log = ctx.tables_progress tables_snapshot = ctx.tables_crawler.snapshot() # Note: encoding the Table records will trigger loading of the migration-status data. - history_log.append_inventory_snapshot(tables_snapshot) + ctx.tables_progress.append_inventory_snapshot(tables_snapshot) @job_task( job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log] @@ -330,10 +326,9 @@ def update_tables_history_log(self, ctx: RuntimeContext) -> None: # Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the # history log. 
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty. - history_log = ctx.tables_progress tables_snapshot = ctx.tables_crawler.snapshot() # Note: encoding the Table records will trigger loading of the migration-status data. - history_log.append_inventory_snapshot(tables_snapshot) + ctx.tables_progress.append_inventory_snapshot(tables_snapshot) @job_task( job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log] From ade16aead0d347790f6864e22b0f79388722be18 Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Tue, 10 Dec 2024 13:21:48 +0100 Subject: [PATCH 30/32] Remove misleading TODO marker in lieu of #3422. --- tests/integration/hive_metastore/test_workflows.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index b86ed82677..43f2d19188 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -12,7 +12,6 @@ ("regular", "migrate-tables"), ("hiveserde", "migrate-external-hiveserde-tables-in-place-experimental"), ("hiveserde", "migrate-external-tables-ctas"), - # TODO: Some workflows are missing here, and also need to be included in the tests. ], indirect=("prepare_tables_for_migration",), ) From bad20f2d4db56d0aac5e7142f7574f2a4049534e Mon Sep 17 00:00:00 2001 From: Andrew Snare Date: Tue, 10 Dec 2024 13:22:56 +0100 Subject: [PATCH 31/32] Use alternate plural spelling. --- src/databricks/labs/ucx/hive_metastore/table_migrate.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/databricks/labs/ucx/hive_metastore/table_migrate.py b/src/databricks/labs/ucx/hive_metastore/table_migrate.py index 7cfa394c23..1e7cb4abcf 100644 --- a/src/databricks/labs/ucx/hive_metastore/table_migrate.py +++ b/src/databricks/labs/ucx/hive_metastore/table_migrate.py @@ -60,8 +60,8 @@ def __init__( self._migrate_grants = migrate_grants self._external_locations = external_locations - def warn_about_remaining_non_migrated_tables(self, migration_status: Iterable[TableMigrationStatus]) -> None: - migration_index = TableMigrationIndex(migration_status) + def warn_about_remaining_non_migrated_tables(self, migration_statuses: Iterable[TableMigrationStatus]) -> None: + migration_index = TableMigrationIndex(migration_statuses) for crawled_table in self._tables_crawler.snapshot(): if not migration_index.is_migrated(crawled_table.database, crawled_table.name): logger.warning(f"remained-hive-metastore-table: {crawled_table.key}") From fe34a3b6f83544db670cf71df9034684985c503e Mon Sep 17 00:00:00 2001 From: Cor Zuurmond Date: Thu, 12 Dec 2024 17:31:43 +0100 Subject: [PATCH 32/32] Use context consistently --- .../hive_metastore/test_workflows.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/integration/hive_metastore/test_workflows.py b/tests/integration/hive_metastore/test_workflows.py index 43f2d19188..9991960d94 100644 --- a/tests/integration/hive_metastore/test_workflows.py +++ b/tests/integration/hive_metastore/test_workflows.py @@ -32,13 +32,13 @@ def test_table_migration_job_refreshes_migration_status( # The assessment workflow is a prerequisite, and now verified by the workflow: it needs to successfully complete # before we can test these workflows. 
- installation_ctx.deployed_workflows.run_workflow("assessment", skip_job_wait=True) - assessment_completed_correctly = installation_ctx.deployed_workflows.validate_step("assessment") + ctx.deployed_workflows.run_workflow("assessment", skip_job_wait=True) + assessment_completed_correctly = ctx.deployed_workflows.validate_step("assessment") assert assessment_completed_correctly, "Workflow failed: assessment" # The workflow under test. run_id = ctx.deployed_workflows.run_workflow(workflow, skip_job_wait=True) - workflow_completed_correctly = installation_ctx.deployed_workflows.validate_step(workflow) + workflow_completed_correctly = ctx.deployed_workflows.validate_step(workflow) assert workflow_completed_correctly, f"Workflow failed: {workflow}" # Avoiding MigrationStatusRefresh as it will refresh the status before fetching. @@ -73,22 +73,22 @@ def test_table_migration_job_refreshes_migration_status( # Ensure that the workflow populated the `workflow_runs` table. query = f""" - SELECT 1 FROM {installation_ctx.ucx_catalog}.multiworkspace.workflow_runs - WHERE workspace_id = {installation_ctx.workspace_id} + SELECT 1 FROM {ctx.ucx_catalog}.multiworkspace.workflow_runs + WHERE workspace_id = {ctx.workspace_id} AND workflow_run_id = {run_id} LIMIT 1 """ - assert any(installation_ctx.sql_backend.fetch(query)), f"No workflow run captured: {query}" + assert any(ctx.sql_backend.fetch(query)), f"No workflow run captured: {query}" # Ensure that the history file has table records written to it that correspond to this run. query = f""" - SELECT 1 from {installation_ctx.ucx_catalog}.multiworkspace.historical - WHERE workspace_id = {installation_ctx.workspace_id} + SELECT 1 from {ctx.ucx_catalog}.multiworkspace.historical + WHERE workspace_id = {ctx.workspace_id} AND job_run_id = {run_id} AND object_type = 'Table' LIMIT 1 """ - assert any(installation_ctx.sql_backend.fetch(query)), f"No snapshots captured to the history log: {query}" + assert any(ctx.sql_backend.fetch(query)), f"No snapshots captured to the history log: {query}" @pytest.mark.parametrize(