Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add table progress encoder #3083

Merged
merged 15 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 8 additions & 21 deletions src/databricks/labs/ucx/contexts/workflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from databricks.labs.blueprint.installation import Installation
from databricks.labs.lsql.backends import RuntimeBackend, SqlBackend
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationStatus
from databricks.sdk import WorkspaceClient, core

from databricks.labs.ucx.__about__ import __version__
Expand All @@ -21,13 +20,14 @@
from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.contexts.application import GlobalContext
from databricks.labs.ucx.hive_metastore import TablesInMounts, TablesCrawler
from databricks.labs.ucx.hive_metastore.grants import Grant
from databricks.labs.ucx.hive_metastore.table_size import TableSizeCrawler
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler, Table
from databricks.labs.ucx.hive_metastore.tables import FasterTableScanCrawler
from databricks.labs.ucx.hive_metastore.udfs import Udf
from databricks.labs.ucx.installer.logs import TaskRunWarningRecorder
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.jobs import JobsProgressEncoder
from databricks.labs.ucx.progress.tables import TableProgressEncoder
from databricks.labs.ucx.progress.workflow_runs import WorkflowRunRecorder

# As with GlobalContext, service factories unavoidably have a lot of public methods.
Expand Down Expand Up @@ -188,11 +188,10 @@ def policies_progress(self) -> ProgressEncoder[PolicyInfo]:
)

@cached_property
def grants_progress(self) -> ProgressEncoder[Grant]:
return ProgressEncoder(
def grants_progress(self) -> GrantProgressEncoder:
return GrantProgressEncoder(
self.sql_backend,
self.grant_ownership,
Grant,
self.parent_run_id,
self.workspace_id,
self.config.ucx_catalog,
Expand Down Expand Up @@ -221,23 +220,11 @@ def pipelines_progress(self) -> ProgressEncoder[PipelineInfo]:
)

@cached_property
def tables_progress(self) -> ProgressEncoder[Table]:
return ProgressEncoder(
def tables_progress(self) -> TableProgressEncoder:
return TableProgressEncoder(
self.sql_backend,
self.table_ownership,
Table,
self.parent_run_id,
self.workspace_id,
self.config.ucx_catalog,
)

@cached_property
def historical_table_migration_log(self) -> ProgressEncoder[TableMigrationStatus]:
# TODO: merge into tables_progress
return ProgressEncoder(
self.sql_backend,
self.table_migration_ownership,
TableMigrationStatus,
self.migration_status_refresher.index(force_refresh=False),
self.parent_run_id,
self.workspace_id,
self.config.ucx_catalog,
Expand Down
18 changes: 16 additions & 2 deletions src/databricks/labs/ucx/progress/grants.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
from dataclasses import replace

from databricks.labs.ucx.hive_metastore.grants import Grant
from databricks.labs.lsql.backends import SqlBackend

from databricks.labs.ucx.hive_metastore.grants import Grant, GrantOwnership
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.install import Historical


class GrantsProgressEncoder(ProgressEncoder[Grant]):
class GrantProgressEncoder(ProgressEncoder[Grant]):
"""Encoder class:Grant to class:History.

A failure for a grants implies it cannot be mapped to Unity Catalog.
"""

def __init__(
self,
sql_backend: SqlBackend,
ownership: GrantOwnership,
run_id: int,
workspace_id: int,
catalog: str,
schema: str = "multiworkspace",
table: str = "historical",
) -> None:
super().__init__(sql_backend, ownership, Grant, run_id, workspace_id, catalog, schema, table)

def _encode_record_as_historical(self, record: Grant) -> Historical:
historical = super()._encode_record_as_historical(record)
failures = []
Expand Down
54 changes: 54 additions & 0 deletions src/databricks/labs/ucx/progress/tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from dataclasses import replace

from databricks.labs.lsql.backends import SqlBackend

from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.hive_metastore.ownership import TableOwnership
from databricks.labs.ucx.progress.history import ProgressEncoder
from databricks.labs.ucx.progress.install import Historical


class TableProgressEncoder(ProgressEncoder[Table]):
"""Encoder class:Table to class:History.

A progress failure for a table means:
- the table is not migrated yet
- the associated grants have a failure
"""

def __init__(
self,
sql_backend: SqlBackend,
ownership: TableOwnership,
table_migration_index: TableMigrationIndex,
run_id: int,
workspace_id: int,
catalog: str,
schema: str = "multiworkspace",
table: str = "historical",
) -> None:
super().__init__(
sql_backend,
ownership,
Table,
run_id,
workspace_id,
catalog,
schema,
table,
)
self._table_migration_index = table_migration_index

def _encode_record_as_historical(self, record: Table) -> Historical:
"""Encode record as historical.

A table failure means that the table is pending migration. Grants are purposefully lef out, because a grant
might not be mappable to UC, like `READ_METADATA`, thus possibly resulting in false "pending migration" failure
for tables that are migrated to UC with their relevant grants also being migrated.
"""
historical = super()._encode_record_as_historical(record)
failures = []
if not self._table_migration_index.is_migrated(record.database, record.name):
failures.append("Pending migration")
return replace(historical, failures=historical.failures + failures)
17 changes: 7 additions & 10 deletions src/databricks/labs/ucx/progress/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ def crawl_tables(self, ctx: RuntimeContext) -> None:
ctx.tables_crawler.snapshot(force_refresh=True)

@job_task(depends_on=[verify_prerequisites, crawl_tables], job_cluster="table_migration")
def refresh_table_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not."""
ctx.migration_status_refresher.snapshot(force_refresh=True)

@job_task(
depends_on=[verify_prerequisites, crawl_tables, refresh_table_migration_status], job_cluster="table_migration"
)
def update_tables_history_log(self, ctx: RuntimeContext) -> None:
"""Update the history log with the latest tables inventory snapshot."""
# The table migration cluster is not legacy-ACL enabled, so we can't crawl from here.
Expand Down Expand Up @@ -136,16 +143,6 @@ def crawl_cluster_policies(self, ctx: RuntimeContext) -> None:
cluster_policies_snapshot = ctx.policies_crawler.snapshot(force_refresh=True)
history_log.append_inventory_snapshot(cluster_policies_snapshot)

@job_task(depends_on=[verify_prerequisites, crawl_tables, verify_prerequisites], job_cluster="table_migration")
def refresh_table_migration_status(self, ctx: RuntimeContext) -> None:
"""Scan the tables (and views) in the inventory and record whether each has been migrated or not.

The results of the scan are stored in the `$inventory.migration_status` inventory table.
"""
history_log = ctx.historical_table_migration_log
migration_status_snapshot = ctx.migration_status_refresher.snapshot(force_refresh=True)
history_log.append_inventory_snapshot(migration_status_snapshot)

@job_task(depends_on=[verify_prerequisites])
def assess_dashboards(self, ctx: RuntimeContext):
"""Scans all dashboards for migration issues in SQL code of embedded widgets.
Expand Down
12 changes: 5 additions & 7 deletions tests/unit/progress/test_grants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore.grants import Grant
from databricks.labs.ucx.progress.grants import GrantsProgressEncoder
from databricks.labs.ucx.progress.grants import GrantProgressEncoder


@pytest.mark.parametrize(
Expand All @@ -17,13 +17,12 @@
Grant("principal", "USAGE", "catalog"),
],
)
def test_grants_progress_encoder_no_failures(mock_backend, grant: Grant) -> None:
def test_grant_progress_encoder_no_failures(mock_backend, grant: Grant) -> None:
ownership = create_autospec(Ownership)
ownership.owner_of.return_value = "user"
encoder = GrantsProgressEncoder(
encoder = GrantProgressEncoder(
mock_backend,
ownership,
Grant,
run_id=1,
workspace_id=123456789,
catalog="test",
Expand All @@ -50,13 +49,12 @@ def test_grants_progress_encoder_no_failures(mock_backend, grant: Grant) -> None
),
],
)
def test_grants_progress_encoder_failures(mock_backend, grant: Grant, failure: str) -> None:
def test_grant_progress_encoder_failures(mock_backend, grant: Grant, failure: str) -> None:
ownership = create_autospec(Ownership)
ownership.owner_of.return_value = "user"
encoder = GrantsProgressEncoder(
encoder = GrantProgressEncoder(
mock_backend,
ownership,
Grant,
run_id=1,
workspace_id=123456789,
catalog="test",
Expand Down
62 changes: 62 additions & 0 deletions tests/unit/progress/test_tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from unittest.mock import create_autospec

import pytest

from databricks.labs.ucx.framework.owners import Ownership
from databricks.labs.ucx.framework.utils import escape_sql_identifier
from databricks.labs.ucx.hive_metastore.table_migration_status import TableMigrationIndex
from databricks.labs.ucx.hive_metastore.tables import Table
from databricks.labs.ucx.progress.grants import GrantProgressEncoder
from databricks.labs.ucx.progress.tables import TableProgressEncoder


@pytest.mark.parametrize(
"table",
[
Table("hive_metastore", "schema", "table", "MANAGED", "DELTA"),
],
)
def test_table_progress_encoder_no_failures(mock_backend, table: Table) -> None:
ownership = create_autospec(Ownership)
ownership.owner_of.return_value = "user"
table_migration_index = create_autospec(TableMigrationIndex)
table_migration_index.is_migrated.return_value = True
grant_progress_encoder = create_autospec(GrantProgressEncoder)
encoder = TableProgressEncoder(
mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test"
)

encoder.append_inventory_snapshot([table])

rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append")
assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
assert len(rows[0].failures) == 0
ownership.owner_of.assert_called_once()
table_migration_index.is_migrated.assert_called_with(table.database, table.name)
grant_progress_encoder.assert_not_called()


@pytest.mark.parametrize(
"table",
[
Table("hive_metastore", "schema", "table", "MANAGED", "DELTA"),
],
)
def test_table_progress_encoder_pending_migration_failure(mock_backend, table: Table) -> None:
ownership = create_autospec(Ownership)
ownership.owner_of.return_value = "user"
table_migration_index = create_autospec(TableMigrationIndex)
table_migration_index.is_migrated.return_value = False
grant_progress_encoder = create_autospec(GrantProgressEncoder)
encoder = TableProgressEncoder(
mock_backend, ownership, table_migration_index, run_id=1, workspace_id=123456789, catalog="test"
)

encoder.append_inventory_snapshot([table])

rows = mock_backend.rows_written_for(escape_sql_identifier(encoder.full_name), "append")
assert len(rows) > 0, f"No rows written for: {encoder.full_name}"
assert rows[0].failures == ["Pending migration"]
ownership.owner_of.assert_called_once()
table_migration_index.is_migrated.assert_called_with(table.database, table.name)
grant_progress_encoder.assert_not_called()
5 changes: 0 additions & 5 deletions tests/unit/progress/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@
RuntimeContext.policies_crawler,
RuntimeContext.policies_progress,
),
(
MigrationProgress.refresh_table_migration_status,
RuntimeContext.migration_status_refresher,
RuntimeContext.historical_table_migration_log,
),
),
)
def test_migration_progress_runtime_refresh(run_workflow, task, crawler, history_log) -> None:
Expand Down