Skip to content

Commit

Permalink
Add table progress encoder (#3083)
Browse files Browse the repository at this point in the history
## Changes
Add pipeline progress encoder

### Linked issues

Resolves #3061
Resolves #3064

### Functionality

- [x] modified existing workflow: `migration-progress`

### Tests

- [x] added unit tests
  • Loading branch information
JCZuurmond authored Oct 28, 2024
1 parent 522f48a commit 6432a28
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 45 deletions.
29 changes: 8 additions & 21 deletions src/databricks/labs/ucx/contexts/workflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from databricks.labs.blueprint.installation import Installation
from databricks.labs.lsql.backends import RuntimeBackend, SqlBackend
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus
from databricks.sdk import WorkspaceClient, core

from databricks.labs.ucx.__about__ import __version__
Expand All @@ -21,13 +20,14 @@
from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.contexts.application import GlobalContext
from databricks.labs.ucx.hive_metastore import TablesInMounts, TablesCrawler
from databricks.labs.ucx.hive_metastore.grants import Grant
from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler
from databricks.labs.ucx.hive_metastore.udfs import Udf
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
from databricks.labs.ucx.progress.tables import TableProgressEncoder
from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder

# As with GlobalContext, service factories unavoidably have a lot of public methods.
Expand Down Expand Up @@ -188,11 +188,10 @@ def policies_progress(self) -> ProgressEncoder[PolicyInfo]:
)

@cached_property
def grants_progress(self) -> ProgressEncoder[Grant]:
return ProgressEncoder(
def grants_progress(self) -> GrantProgressEncoder:
return GrantProgressEncoder(
self.sql_backend,
self.grant_ownership,
Grant,
self.parent_run_id,
self.workspace_id,
self.config.ucx_catalog,
Expand Down Expand Up @@ -221,23 +220,11 @@ def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]:
)

@cached_property
def tables_progress(self) -> ProgressEncoder[Table]:
return ProgressEncoder(
def tables_progress(self) -> TableProgressEncoder:
return TableProgressEncoder(
self.sql_backend,
self.table_ownership,
Table,
self.parent_run_id,
self.workspace_id,
self.config.ucx_catalog,
)

@cached_property
def historical_table_migration_log(self) -> ProgressEncoder[TableMigrationStatus]:
# TODO: merge into tables_progress
return ProgressEncoder(
self.sql_backend,
self.table_migration_ownership,
TableMigrationStatus,
self.migration_status_refresher.index(force_refresh=False),
self.parent_run_id,
self.workspace_id,
self.config.ucx_catalog,
Expand Down
18 changes: 16 additions & 2 deletions src/databricks/labs/ucx/progress/grants.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
from dataclasses import replace

from databricks.labs.ucx.hive_metastore.grants import Grant
from databricks.labs.lsql.backends import SqlBackend

from databricks.labs.ucx.hive_metastore.grants import Grant, GrantOwnership
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.install import Historical


class GrantsProgressEncoder(ProgressEncoder[Grant]):
class GrantProgressEncoder(ProgressEncoder[Grant]):
"""Encoder class:Grant to class:History.
A failure for a grants implies it cannot be mapped to Unity Catalog.
"""

def __init__(
self,
sql_backend: SqlBackend,
ownership: GrantOwnership,
run_id: int,
workspace_id: int,
catalog: str,
schema: str = "multiworkspace",
table: str = "historical",
) -> None:
super().__init__(sql_backend, ownership, Grant, run_id, workspace_id, catalog, schema, table)

def _encode_record_as_historical(self, record: Grant) -> Historical:
historical = super()._encode_record_as_historical(record)
failures = []
Expand Down
54 changes: 54 additions & 0 deletions src/databricks/labs/ucx/progress/tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from dataclasses import replace

from databricks.labs.lsql.backends import SqlBackend

from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.hive_metastore.ownership import TableOwnership
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.install import Historical


class TableProgressEncoder(ProgressEncoder[Table]):
"""Encoder class:Table to class:History.
A progress failure for a table means:
- the table is not migrated yet
- the associated grants have a failure
"""

def __init__(
self,
sql_backend: SqlBackend,
ownership: TableOwnership,
table_migration_index: TableMigrationIndex,
run_id: int,
workspace_id: int,
catalog: str,
schema: str = "multiworkspace",
table: str = "historical",
) -> None:
super().__init__(
sql_backend,
ownership,
Table,
run_id,
workspace_id,
catalog,
schema,
table,
)
self._table_migration_index = table_migration_index

def _encode_record_as_historical(self, record: Table) -> Historical:
"""Encode record as historical.
A table failure means that the table is pending migration. Grants are purposefully lef out, because a grant
might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure
for tables that are migrated to UC with their relevant grants also being migrated.
"""
historical = super()._encode_record_as_historical(record)
failures = []
if not self._table_migration_index.is_migrated(record.database, record.name):
failures.append("Pending migration")
return replace(historical, failures=historical.failures + failures)
17 changes: 7 additions & 10 deletions src/databricks/labs/ucx/progress/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ def crawl_tables(self, ctx: RuntimeContext) -> None:
ctx.tables_crawler.snapshot(force_refresh=True)

@job_task(depends_on=[verify_prerequisites, crawl_tables], job_cluster="table_migration")
def refresh_table_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
ctx.migration_status_refresher.snapshot(force_refresh=True)

@job_task(
depends_on=[verify_prerequisites, crawl_tables, refresh_table_migration_status], job_cluster="table_migration"
)
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory snapshot."""
# The table migration cluster is not legacy-ACL enabled, so we can't crawl from here.
Expand Down Expand Up @@ -136,16 +143,6 @@ def crawl_cluster_policies(self, ctx: RuntimeContext) -> None:
cluster_policies_snapshot = ctx.policies_crawler.snapshot(force_refresh=True)
history_log.append_inventory_snapshot(cluster_policies_snapshot)

@job_task(depends_on=[verify_prerequisites, crawl_tables, verify_prerequisites], job_cluster="table_migration")
def refresh_table_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not.
The results of the scan are stored in the `$inventory.migration_status` inventory table.
"""
history_log = ctx.historical_table_migration_log
migration_status_snapshot = ctx.migration_status_refresher.snapshot(force_refresh=True)
history_log.append_inventory_snapshot(migration_status_snapshot)

@job_task(depends_on=[verify_prerequisites])
def assess_dashboards(self, ctx: RuntimeContext):
"""Scans all dashboards for migration issues in SQL code of embedded widgets.
Expand Down
12 changes: 5 additions & 7 deletions tests/unit/progress/test_grants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore.grants import Grant
from databricks.labs.ucx.progress.grants import GrantsProgressEncoder
from databricks.labs.ucx.progress.grants import GrantProgressEncoder


@pytest.mark.parametrize(
Expand All @@ -17,13 +17,12 @@
Grant("principal", "USAGE", "catalog"),
],
)
def test_grants_progress_encoder_no_failures(mock_backend, grant: Grant) -> None:
def test_grant_progress_encoder_no_failures(mock_backend, grant: Grant) -> None:
ownership = create_autospec(Ownership)
ownership.owner_of.return_value = "user"
encoder = GrantsProgressEncoder(
encoder = GrantProgressEncoder(
mock_backend,
ownership,
Grant,
run_id=1,
workspace_id=123456789,
catalog="test",
Expand All @@ -50,13 +49,12 @@ def test_grants_progress_encoder_no_failures(mock_backend, grant: Grant) -> None
),
],
)
def test_grants_progress_encoder_failures(mock_backend, grant: Grant, failure: str) -> None:
def test_grant_progress_encoder_failures(mock_backend, grant: Grant, failure: str) -> None:
ownership = create_autospec(Ownership)
ownership.owner_of.return_value = "user"
encoder = GrantsProgressEncoder(
encoder = GrantProgressEncoder(
mock_backend,
ownership,
Grant,
run_id=1,
workspace_id=123456789,
catalog="test",
Expand Down
62 changes: 62 additions & 0 deletions tests/unit/progress/test_tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from unittest.mock import create_autospec

import pytest

from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
from databricks.labs.ucx.progress.tables import TableProgressEncoder


@pytest.mark.parametrize(
"table",
[
Table("hive_metastore", "schema", "table", "MANAGED", "DELTA"),
],
)
def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
ownership = create_autospec(Ownership)
ownership.owner_of.return_value = "user"
table_migration_index = create_autospec(TableMigrationIndex)
table_migration_index.is_migrated.return_value = True
grant_progress_encoder = create_autospec(GrantProgressEncoder)
encoder = TableProgressEncoder(
mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test"
)

encoder.append_inventory_snapshot([table])

rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append")
assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
assert len(rows[0].failures) == 0
ownership.owner_of.assert_called_once()
table_migration_index.is_migrated.assert_called_with(table.database, table.name)
grant_progress_encoder.assert_not_called()


@pytest.mark.parametrize(
"table",
[
Table("hive_metastore", "schema", "table", "MANAGED", "DELTA"),
],
)
def test_table_progress_encoder_pending_migration_failure(mock_backend, table: Table) -> None:
ownership = create_autospec(Ownership)
ownership.owner_of.return_value = "user"
table_migration_index = create_autospec(TableMigrationIndex)
table_migration_index.is_migrated.return_value = False
grant_progress_encoder = create_autospec(GrantProgressEncoder)
encoder = TableProgressEncoder(
mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test"
)

encoder.append_inventory_snapshot([table])

rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append")
assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
assert rows[0].failures == ["Pending migration"]
ownership.owner_of.assert_called_once()
table_migration_index.is_migrated.assert_called_with(table.database, table.name)
grant_progress_encoder.assert_not_called()
5 changes: 0 additions & 5 deletions tests/unit/progress/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@
RuntimeContext.policies_crawler,
RuntimeContext.policies_progress,
),
(
MigrationProgress.refresh_table_migration_status,
RuntimeContext.migration_status_refresher,
RuntimeContext.historical_table_migration_log,
),
),
)
def test_migration_progress_runtime_refresh(run_workflow, task, crawler, history_log) -> None:
Expand Down

0 comments on commit 6432a28

Please sign in to comment.