Update table-migration workflows to also capture updated migration progress into the history log #3239

Open
wants to merge 52 commits into
base: main
Changes from 51 commits
Commits
905fefc
Remove some unnecessary code.
asnare Nov 6, 2024
c23c38a
Update table history logger to not trigger a refresh of the migration…
asnare Nov 6, 2024
9ebb26c
Consistent import.
asnare Nov 7, 2024
e861963
Update the various table-migration workflows to also update the histo…
asnare Nov 7, 2024
cfe1486
All workflows update the logs table.
asnare Nov 7, 2024
145cd7d
Table migration workflows also update the tables inventory (at the end).
asnare Nov 7, 2024
5289582
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 11, 2024
6e5e4ae
Switch to multi-line f""" """-string.
asnare Nov 11, 2024
778ad10
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 11, 2024
1ce8e05
Fix mock return value for crawler snapshot.
asnare Nov 11, 2024
78787df
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 12, 2024
788789f
Switch to specialisation (limited to TableProgressEncoder) for ensuri…
asnare Nov 12, 2024
891b3b7
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 12, 2024
f9bf219
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 13, 2024
da3b15b
Back out changes relating to the way the migration-status information…
asnare Nov 13, 2024
ac8586a
Back out more changes that are either not needed or made on other PRs.
asnare Nov 13, 2024
4b3717f
Remove comment that is no longer relevant.
asnare Nov 13, 2024
c80a9c6
Verify prerequisites for updating the migration-progress prior to the…
asnare Nov 13, 2024
f638cb5
No need to mention the assessment; we won't reach this point of the w…
asnare Nov 13, 2024
2d398f4
Use TODO marker instead of warning to highlight what we'd prefer to h…
asnare Nov 13, 2024
df9f689
Merge branch 'main' into more-workflow-history-snapshots
nfx Nov 13, 2024
4e579fd
Merge branch 'main' into more-workflow-history-snapshots
nfx Nov 18, 2024
e4d4220
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 18, 2024
14cdc77
Merge branch 'main' into more-workflow-history-snapshots
nfx Nov 18, 2024
1a32bd7
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 19, 2024
1df72bf
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 20, 2024
dbafbac
Ensure the assessment has finished before table-migration runs, and t…
asnare Nov 20, 2024
839ac76
Ensure the progress-migration catalog is configured.
asnare Nov 20, 2024
862b5bf
Remove unused fixture.
asnare Nov 20, 2024
54d06f7
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 20, 2024
12a1d9f
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 21, 2024
7774a78
Merge branch 'main' into more-workflow-history-snapshots
asnare Nov 25, 2024
a074fd7
Remove unused fixture.
asnare Nov 25, 2024
034c91b
Split over several lines to make debugging easier.
asnare Nov 25, 2024
dd74dd9
Refactor for debugging.
asnare Nov 25, 2024
4dde220
Fix test to invoke the workflow its verifying.
asnare Nov 25, 2024
007e23b
Merge branch 'main' into more-workflow-history-snapshots
asnare Dec 3, 2024
2dbc2e4
Adjust the debugging convenience.
asnare Dec 3, 2024
375276e
Merge branch 'main' into more-workflow-history-snapshots
asnare Dec 4, 2024
e30d2cc
Configure test with new infrastructure.
asnare Dec 4, 2024
c777794
Fix linting problems.
asnare Dec 4, 2024
b763886
Merge branch 'main' into more-workflow-history-snapshots
asnare Dec 9, 2024
c2127a5
Merge branch 'main' into more-workflow-history-snapshots
asnare Dec 9, 2024
babbc69
Rename task to verify progress-tracking prerequisites.
asnare Dec 9, 2024
1e45dc0
Formatting.
asnare Dec 9, 2024
c966766
Rename method to better indicate purpose.
asnare Dec 9, 2024
5e653de
Inline a local variable.
asnare Dec 9, 2024
ade16ae
Remove misleading TODO marker in lieu of #3422.
asnare Dec 10, 2024
bad20f2
Use alternate plural spelling.
asnare Dec 10, 2024
c13b423
Merge branch 'main' into more-workflow-history-snapshots
asnare Dec 11, 2024
7aa937f
Merge branch 'main' into more-workflow-history-snapshots
asnare Dec 11, 2024
fe34a3b
Use context consistently
JCZuurmond Dec 12, 2024
4 changes: 2 additions & 2 deletions docs/table_persistence.md
@@ -8,7 +8,7 @@ Table utilization per workflow:

| Table | Generate Assessment | Update Migration Progress | Migrate Groups | Migrate External Tables | Upgrade Jobs | Migrate tables | Migrate Data Reconciliation |
|--------------------------|---------------------|---------------------------|----------------|-------------------------|--------------|----------------|-----------------------------|
| tables | RW | RW | | RO | | RO | |
| tables | RW | RW | | RW | | RW | |
| grants | RW | RW | | RW | | RW | |
| mounts | RW | | | RO | RO | RO | |
| permissions | RW | | RW | RO | | RO | |
@@ -30,7 +30,7 @@ Table utilization per workflow:
| query_problems | RW | RW | | | | | |
| workflow_problems | RW | RW | | | | | |
| udfs | RW | RW | RO | | | | |
| logs | RW | | RW | RW | | RW | RW |
| logs | RW | RW | RW | RW | RW | RW | RW |
| recon_results | | | | | | | RW |

**RW** - Read/Write, the job generates or updates the table.<br/>
15 changes: 8 additions & 7 deletions src/databricks/labs/ucx/hive_metastore/table_migrate.py
@@ -2,6 +2,7 @@
import logging
import re
from collections import defaultdict
from collections.abc import Iterable
from functools import partial, cached_property

from databricks.labs.blueprint.parallel import Threads
@@ -18,8 +19,11 @@
TableMapping,
TableToMigrate,
)

from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatusRefresher, TableMigrationIndex
from databricks.labs.ucx.hive_metastore.table_migration_status import (
TableMigrationStatusRefresher,
TableMigrationStatus,
TableMigrationIndex,
)
from databricks.labs.ucx.hive_metastore.tables import (
MigrationCount,
Table,
@@ -56,14 +60,11 @@ def __init__(
self._migrate_grants = migrate_grants
self._external_locations = external_locations

def get_remaining_tables(self) -> list[Table]:
migration_index = self.index(force_refresh=True)
table_rows = []
def warn_about_remaining_non_migrated_tables(self, migration_statuses: Iterable[TableMigrationStatus]) -> None:
migration_index = TableMigrationIndex(migration_statuses)
for crawled_table in self._tables_crawler.snapshot():
if not migration_index.is_migrated(crawled_table.database, crawled_table.name):
table_rows.append(crawled_table)
logger.warning(f"remained-hive-metastore-table: {crawled_table.key}")
return table_rows

def index(self, *, force_refresh: bool = False):
return self._migration_status_refresher.index(force_refresh=force_refresh)
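The refactored method above no longer refreshes the migration index itself; it builds an index from statuses the caller passes in and logs one warning per table that has not been migrated. A minimal, self-contained sketch of that pattern follows. `CrawledTable`, `MigrationStatus`, `MigrationIndex`, and `warn_about_remaining` are hypothetical stand-ins written for illustration, not the UCX classes, and the sketch returns the remaining keys only so the behaviour can be checked (the method in the diff returns `None`).

```python
import logging
from collections.abc import Iterable
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class CrawledTable:
    """Hypothetical stand-in for a crawled Hive metastore table."""

    database: str
    name: str

    @property
    def key(self) -> str:
        return f"hive_metastore.{self.database}.{self.name}".lower()


@dataclass(frozen=True)
class MigrationStatus:
    """Hypothetical stand-in for one migration-status record."""

    src_schema: str
    src_table: str
    dst_table: Optional[str] = None  # None means "not migrated yet" (assumption)


class MigrationIndex:
    """Hypothetical index keyed on (schema, table), built in a single pass."""

    def __init__(self, statuses: Iterable[MigrationStatus]) -> None:
        self._index = {(s.src_schema, s.src_table): s for s in statuses}

    def is_migrated(self, schema: str, table: str) -> bool:
        status = self._index.get((schema, table))
        return status is not None and status.dst_table is not None


def warn_about_remaining(
    tables: Iterable[CrawledTable], statuses: Iterable[MigrationStatus]
) -> list[str]:
    """Log a warning per non-migrated table and return their keys."""
    index = MigrationIndex(statuses)
    remaining = []
    for table in tables:
        if not index.is_migrated(table.database, table.name):
            logger.warning(f"remained-hive-metastore-table: {table.key}")
            remaining.append(table.key)
    return remaining
```

Passing the statuses in, rather than refreshing them inside the method, lets the calling task decide when the (potentially slow) refresh happens and reuse its result.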
@@ -85,7 +85,7 @@ def __init__(self, ws: WorkspaceClient, sql_backend: SqlBackend, schema, tables_
self._tables_crawler = tables_crawler

def index(self, *, force_refresh: bool = False) -> TableMigrationIndex:
return TableMigrationIndex(list(self.snapshot(force_refresh=force_refresh)))
return TableMigrationIndex(self.snapshot(force_refresh=force_refresh))

def get_seen_tables(self) -> dict[str, str]:
seen_tables: dict[str, str] = {}
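The change to `index()` in this hunk drops the `list(...)` wrapper around the snapshot. That is safe whenever the receiving constructor iterates its argument only once. The sketch below illustrates the general point with a hypothetical `KeyedIndex` class and a hypothetical lazy `snapshot()` generator; whether `TableMigrationIndex` consumes its input this way is not shown in this diff and is assumed here.

```python
from collections.abc import Iterable, Iterator
from typing import Optional


class KeyedIndex:
    """Hypothetical index that consumes its input exactly once."""

    def __init__(self, records: Iterable[tuple[str, str]]) -> None:
        # The dict comprehension walks the input a single time, so a list,
        # a tuple, or a generator all work without an intermediate copy.
        self._by_key = {key: value for key, value in records}

    def get(self, key: str) -> Optional[str]:
        return self._by_key.get(key)


def snapshot() -> Iterator[tuple[str, str]]:
    """Hypothetical snapshot that yields records lazily."""
    yield ("sales.orders", "main.sales.orders")
    yield ("sales.refunds", "main.sales.refunds")


# Both forms build the same index; the second avoids materialising a list.
from_list = KeyedIndex(list(snapshot()))
from_generator = KeyedIndex(snapshot())
```

If a constructor iterated its argument more than once, a generator would be exhausted after the first pass, and the explicit `list(...)` would still be needed.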
220 changes: 200 additions & 20 deletions src/databricks/labs/ucx/hive_metastore/workflows.py
@@ -1,3 +1,5 @@
import datetime as dt

from databricks.labs.ucx.assessment.workflows import Assessment
from databricks.labs.ucx.contexts.workflow_task import RuntimeContext
from databricks.labs.ucx.framework.tasks import Workflow, job_task
@@ -57,10 +59,53 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW)

@job_task(job_cluster="user_isolation", depends_on=[migrate_views])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
@job_task(job_cluster="user_isolation")
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))
Review comment (Member): This forces the UCX catalog to be created before table migration, whereas it was not a prerequisite before.


@job_task(
depends_on=[
convert_managed_table,
migrate_external_tables_sync,
migrate_dbfs_root_delta_tables,
migrate_dbfs_root_non_delta_tables,
migrate_views,
verify_progress_tracking_prerequisites,
],
)
def update_table_inventory(self, ctx: RuntimeContext) -> None:
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
# UC-enabled, so we cannot both snapshot and update the history log from the same location.
# Step 1 of 3: Just refresh the tables inventory.
ctx.tables_crawler.snapshot(force_refresh=True)

@job_task(depends_on=[verify_progress_tracking_prerequisites, update_table_inventory], job_cluster="user_isolation")
def update_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)

@job_task(
depends_on=[verify_progress_tracking_prerequisites, update_migration_status], job_cluster="user_isolation"
)
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory and migration status."""
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
# history log.
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
tables_snapshot = ctx.tables_crawler.snapshot()
# Note: encoding the Table records will trigger loading of the migration-status data.
ctx.tables_progress.append_inventory_snapshot(tables_snapshot)

@job_task(
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
)
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()


class MigrateHiveSerdeTablesInPlace(Workflow):
@@ -86,10 +131,44 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW)

@job_task(job_cluster="user_isolation", depends_on=[migrate_views])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
@job_task(job_cluster="user_isolation")
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))

@job_task(depends_on=[verify_progress_tracking_prerequisites, migrate_views])
def update_table_inventory(self, ctx: RuntimeContext) -> None:
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
# UC-enabled, so we cannot both snapshot and update the history log from the same location.
# Step 1 of 3: Just refresh the tables inventory.
ctx.tables_crawler.snapshot(force_refresh=True)

@job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory])
def update_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)

@job_task(
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
)
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory and migration status."""
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
# history log.
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
tables_snapshot = ctx.tables_crawler.snapshot()
# Note: encoding the Table records will trigger loading of the migration-status data.
ctx.tables_progress.append_inventory_snapshot(tables_snapshot)

@job_task(
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
)
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()


class MigrateExternalTablesCTAS(Workflow):
@@ -120,10 +199,51 @@ def migrate_views(self, ctx: RuntimeContext):
"""
ctx.tables_migrator.migrate_tables(what=What.VIEW)

@job_task(job_cluster="user_isolation", depends_on=[migrate_views])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
@job_task(job_cluster="user_isolation")
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))

@job_task(
depends_on=[
verify_progress_tracking_prerequisites,
migrate_views,
migrate_hive_serde_ctas,
migrate_other_external_ctas,
]
)
def update_table_inventory(self, ctx: RuntimeContext) -> None:
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
# UC-enabled, so we cannot both snapshot and update the history log from the same location.
# Step 1 of 3: Just refresh the tables inventory.
ctx.tables_crawler.snapshot(force_refresh=True)

@job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory])
def update_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)

@job_task(
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
)
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory and migration status."""
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
# history log.
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
tables_snapshot = ctx.tables_crawler.snapshot()
# Note: encoding the Table records will trigger loading of the migration-status data.
ctx.tables_progress.append_inventory_snapshot(tables_snapshot)

@job_task(
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
)
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()


class ScanTablesInMounts(Workflow):
Expand All @@ -137,10 +257,36 @@ def scan_tables_in_mounts_experimental(self, ctx: RuntimeContext):
replacing any existing content that might be present."""
ctx.tables_in_mounts.snapshot(force_refresh=True)

@job_task(job_cluster="user_isolation", depends_on=[scan_tables_in_mounts_experimental])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
@job_task(job_cluster="user_isolation")
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))

@job_task(
job_cluster="user_isolation",
depends_on=[verify_progress_tracking_prerequisites, scan_tables_in_mounts_experimental],
)
def update_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)

@job_task(
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
)
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory and migration status."""
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
tables_snapshot = ctx.tables_crawler.snapshot()
# Note: encoding the Table records will trigger loading of the migration-status data.
ctx.tables_progress.append_inventory_snapshot(tables_snapshot)

@job_task(
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
)
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()


class MigrateTablesInMounts(Workflow):
Expand All @@ -152,7 +298,41 @@ def migrate_tables_in_mounts_experimental(self, ctx: RuntimeContext):
"""[EXPERIMENTAL] This workflow migrates `delta tables stored in mount points` to Unity Catalog using a Create Table statement."""
ctx.tables_migrator.migrate_tables(what=What.TABLE_IN_MOUNT)

@job_task(job_cluster="user_isolation", depends_on=[migrate_tables_in_mounts_experimental])
def update_migration_status(self, ctx: RuntimeContext):
"""Refresh the migration status to present it in the dashboard."""
ctx.tables_migrator.get_remaining_tables()
@job_task(job_cluster="user_isolation")
def verify_progress_tracking_prerequisites(self, ctx: RuntimeContext) -> None:
"""Verify the prerequisites for running this job on the table migration cluster are fulfilled."""
ctx.verify_progress_tracking.verify(timeout=dt.timedelta(hours=1))

@job_task(depends_on=[verify_progress_tracking_prerequisites, migrate_tables_in_mounts_experimental])
def update_table_inventory(self, ctx: RuntimeContext) -> None:
"""Refresh the tables inventory, prior to updating the migration status of all the tables."""
# The table inventory cannot be (quickly) crawled from the table_migration cluster, and the main cluster is not
# UC-enabled, so we cannot both snapshot and update the history log from the same location.
# Step 1 of 3: Just refresh the tables inventory.
ctx.tables_crawler.snapshot(force_refresh=True)

@job_task(job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_table_inventory])
def update_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
# Step 2 of 3: Refresh the migration status of all the tables (updated in the previous step on the main cluster.)
updated_migration_progress = ctx.migration_status_refresher.snapshot(force_refresh=True)
ctx.tables_migrator.warn_about_remaining_non_migrated_tables(updated_migration_progress)

@job_task(
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_migration_status]
)
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory and migration status."""
# Step 3 of 3: Assuming (due to depends-on) the inventory and migration status were refreshed, capture into the
# history log.
# TODO: Avoid triggering implicit refresh here if either the table or migration-status inventory is empty.
tables_snapshot = ctx.tables_crawler.snapshot()
# Note: encoding the Table records will trigger loading of the migration-status data.
ctx.tables_progress.append_inventory_snapshot(tables_snapshot)

@job_task(
job_cluster="user_isolation", depends_on=[verify_progress_tracking_prerequisites, update_tables_history_log]
)
def record_workflow_run(self, ctx: RuntimeContext) -> None:
"""Record the workflow run of this workflow."""
ctx.workflow_run_recorder.record()