diff --git a/src/databricks/labs/ucx/assessment/clusters.py b/src/databricks/labs/ucx/assessment/clusters.py index ddef444ff6..07d15bc832 100644 --- a/src/databricks/labs/ucx/assessment/clusters.py +++ b/src/databricks/labs/ucx/assessment/clusters.py @@ -37,6 +37,8 @@ class ClusterInfo: cluster_id: str success: int failures: str + spark_version: str | None = None + policy_id: str | None = None cluster_name: str | None = None creator: str | None = None @@ -156,6 +158,8 @@ def _assess_clusters(self, all_clusters): cluster_info = ClusterInfo( cluster_id=cluster.cluster_id if cluster.cluster_id else "", cluster_name=cluster.cluster_name, + policy_id=cluster.policy_id, + spark_version=cluster.spark_version, creator=cluster.creator_user_name, success=1, failures="[]", @@ -172,3 +176,58 @@ def snapshot(self) -> Iterable[ClusterInfo]: def _try_fetch(self) -> Iterable[ClusterInfo]: for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"): yield ClusterInfo(*row) + + +@dataclass +class PolicyInfo: + policy_id: str + policy_name: str + success: int + failures: str + spark_version: str | None = None + policy_description: str | None = None + creator: str | None = None + + +class PoliciesCrawler(CrawlerBase[PolicyInfo], CheckClusterMixin): + def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema): + super().__init__(sbe, "hive_metastore", schema, "policies", PolicyInfo) + self._ws = ws + + def _crawl(self) -> Iterable[PolicyInfo]: + all_policices = list(self._ws.cluster_policies.list()) + return list(self._assess_policies(all_policices)) + + def _assess_policies(self, all_policices): + for policy in all_policices: + failures: list[str] = [] + if policy.policy_id is None: + continue + failures.extend(self._check_cluster_policy(policy.policy_id, "policy")) + try: + spark_version = json.dumps(json.loads(policy.definition)["spark_version"]) + except KeyError: + spark_version = None + policy_name = policy.name + creator_name = policy.creator_user_name + + 
policy_info = PolicyInfo( + policy_id=policy.policy_id, + policy_description=policy.description, + policy_name=policy_name, + spark_version=spark_version, + success=1, + failures="[]", + creator=creator_name, + ) + if len(failures) > 0: + policy_info.success = 0 + policy_info.failures = json.dumps(failures) + yield policy_info + + def snapshot(self) -> Iterable[PolicyInfo]: + return self._snapshot(self._try_fetch, self._crawl) + + def _try_fetch(self) -> Iterable[PolicyInfo]: + for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"): + yield PolicyInfo(*row) diff --git a/src/databricks/labs/ucx/install.py b/src/databricks/labs/ucx/install.py index 0d4affdd0d..73b7c6da19 100644 --- a/src/databricks/labs/ucx/install.py +++ b/src/databricks/labs/ucx/install.py @@ -56,7 +56,7 @@ from databricks.labs.ucx.__about__ import __version__ from databricks.labs.ucx.assessment.azure import AzureServicePrincipalInfo -from databricks.labs.ucx.assessment.clusters import ClusterInfo +from databricks.labs.ucx.assessment.clusters import ClusterInfo, PolicyInfo from databricks.labs.ucx.assessment.init_scripts import GlobalInitScriptInfo from databricks.labs.ucx.assessment.jobs import JobInfo, SubmitRunInfo from databricks.labs.ucx.assessment.pipelines import PipelineInfo @@ -160,6 +160,7 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str): functools.partial(table, "workspace_objects", WorkspaceObjectInfo), functools.partial(table, "permissions", Permissions), functools.partial(table, "submit_runs", SubmitRunInfo), + functools.partial(table, "policies", PolicyInfo), functools.partial(table, "migration_status", MigrationStatus), ], ) diff --git a/src/databricks/labs/ucx/mixins/fixtures.py b/src/databricks/labs/ucx/mixins/fixtures.py index 9a400ae159..ae0d0999ed 100644 --- a/src/databricks/labs/ucx/mixins/fixtures.py +++ b/src/databricks/labs/ucx/mixins/fixtures.py @@ -647,7 +647,9 @@ def create(*, name: str | None = None, **kwargs): name = 
f"sdk-{make_random(4)}" if "definition" not in kwargs: kwargs["definition"] = json.dumps( - {"spark_conf.spark.databricks.delta.preview.enabled": {"type": "fixed", "value": "true"}} + { + "spark_conf.spark.databricks.delta.preview.enabled": {"type": "fixed", "value": "true"}, + } ) cluster_policy = ws.cluster_policies.create(name, **kwargs) logger.info( diff --git a/src/databricks/labs/ucx/queries/assessment/main/08_0_cluster_policies.sql b/src/databricks/labs/ucx/queries/assessment/main/08_0_cluster_policies.sql new file mode 100644 index 0000000000..f6284fa4be --- /dev/null +++ b/src/databricks/labs/ucx/queries/assessment/main/08_0_cluster_policies.sql @@ -0,0 +1,9 @@ +-- viz type=table, name=Cluster Policies, columns=policy_name,cluster_dbr_version,policy_spark_version +-- widget title=Cluster Policies, row=8, col=0, size_x=3, size_y=8 +SELECT + distinct policy_name, + cluster.spark_version as cluster_dbr_version, + policy.spark_version as policy_spark_version +FROM $inventory.clusters as cluster +JOIN $inventory.policies as policy +ON cluster.policy_id=policy.policy_id \ No newline at end of file diff --git a/src/databricks/labs/ucx/runtime.py b/src/databricks/labs/ucx/runtime.py index e6c72f8646..9419c6fdec 100644 --- a/src/databricks/labs/ucx/runtime.py +++ b/src/databricks/labs/ucx/runtime.py @@ -6,7 +6,7 @@ from databricks.sdk import WorkspaceClient from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler -from databricks.labs.ucx.assessment.clusters import ClustersCrawler +from databricks.labs.ucx.assessment.clusters import ClustersCrawler, PoliciesCrawler from databricks.labs.ucx.assessment.init_scripts import GlobalInitScriptCrawler from databricks.labs.ucx.assessment.jobs import JobsCrawler, SubmitRunsCrawler from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler @@ -154,6 +154,19 @@ def assess_incompatible_submit_runs(cfg: WorkspaceConfig, ws: WorkspaceClient, s crawler.snapshot() +@task("assessment") +def 
crawl_cluster_policies(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend): + """This module scans through all the Cluster Policies and gets the necessary information + + It looks for: + - Cluster Policies with Databricks Runtime (DBR) version earlier than 11.3 + + Subsequently, a list of all the policies with matching configurations is stored in the + `$inventory.policies` table.""" + crawler = PoliciesCrawler(ws, sql_backend, cfg.inventory_database) + crawler.snapshot() + + @task("assessment", cloud="azure") def assess_azure_service_principals(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend): """This module scans through all the clusters configurations, cluster policies, job cluster configurations, @@ -237,6 +250,7 @@ def crawl_groups(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBack assess_jobs, assess_incompatible_submit_runs, assess_clusters, + crawl_cluster_policies, assess_azure_service_principals, assess_pipelines, assess_global_init_scripts, diff --git a/src/databricks/labs/ucx/upgrades/v0.16.0_changing_cluster_table_schema.py b/src/databricks/labs/ucx/upgrades/v0.16.0_changing_cluster_table_schema.py new file mode 100644 index 0000000000..dfd7b3ec0a --- /dev/null +++ b/src/databricks/labs/ucx/upgrades/v0.16.0_changing_cluster_table_schema.py @@ -0,0 +1,17 @@ +# pylint: disable=invalid-name + +import logging + +from databricks.labs.blueprint.installation import Installation +from databricks.sdk import WorkspaceClient + +from databricks.labs.ucx.config import WorkspaceConfig + +logger = logging.getLogger(__name__) + + +def upgrade(installation: Installation, ws: WorkspaceClient): + config = installation.load(WorkspaceConfig) + warehouse_id = str(config.warehouse_id) + sql = f"ALTER TABLE hive_metastore.{config.inventory_database}.clusters ADD COLUMNS(policy_id string,spark_version string)" + ws.statement_execution.execute_statement(sql, warehouse_id=warehouse_id) diff --git 
a/tests/integration/assessment/test_clusters.py b/tests/integration/assessment/test_clusters.py index 61da777583..6855531421 100644 --- a/tests/integration/assessment/test_clusters.py +++ b/tests/integration/assessment/test_clusters.py @@ -1,10 +1,11 @@ +import json from datetime import timedelta from databricks.sdk.errors import NotFound from databricks.sdk.retries import retried from databricks.sdk.service.compute import DataSecurityMode -from databricks.labs.ucx.assessment.clusters import ClustersCrawler +from databricks.labs.ucx.assessment.clusters import ClustersCrawler, PoliciesCrawler from .test_assessment import _SPARK_CONF @@ -36,3 +37,31 @@ def test_cluster_crawler_no_isolation(ws, make_cluster, inventory_schema, sql_ba assert len(results) == 1 assert results[0].failures == '["No isolation shared clusters not supported in UC"]' + + +@retried(on=[NotFound], timeout=timedelta(minutes=6)) +def test_policy_crawler(ws, make_cluster_policy, inventory_schema, sql_backend): + created_policy = make_cluster_policy( + name="test_policy_check", + definition=json.dumps({"spark_version": {'type': 'fixed', 'value': '14.3.x-scala2.12'}}), + ) + policy_definition = { + "spark_version": {'type': 'fixed', 'value': '14.3.x-scala2.12'}, + "spark_conf.fs.azure.account.auth.type": {"type": "fixed", "value": "OAuth", "hidden": True}, + } + created_policy_2 = make_cluster_policy(name="test_policy_check2", definition=json.dumps(policy_definition)) + policy_crawler = PoliciesCrawler(ws=ws, sbe=sql_backend, schema=inventory_schema) + policies = policy_crawler.snapshot() + results = [] + for policy in policies: + if policy.policy_id in (created_policy.policy_id, created_policy_2.policy_id): + results.append(policy) + + assert results[0].policy_name == "test_policy_check" + assert results[0].success == 1 + assert results[0].failures == "[]" + assert results[0].spark_version == json.dumps({'type': 'fixed', 'value': '14.3.x-scala2.12'}) + + assert results[1].policy_name == 
"test_policy_check2" + assert results[1].success == 0 + assert results[1].failures == '["Uses azure service principal credentials config in policy."]' diff --git a/tests/unit/assessment/__init__.py b/tests/unit/assessment/__init__.py index c7ab912bee..36890d0d9d 100644 --- a/tests/unit/assessment/__init__.py +++ b/tests/unit/assessment/__init__.py @@ -40,6 +40,7 @@ def _load_fixture(filename: str): BaseRun: '../assessment/jobruns', ClusterDetails: '../assessment/clusters', PipelineStateInfo: '../assessment/pipelines', + Policy: '../assessment/policies', } @@ -77,11 +78,13 @@ def workspace_client_mock( pipeline_ids: list[str] | None = None, job_ids: list[str] | None = None, jobruns_ids: list[str] | None = None, + policy_ids: list[str] | None = None, warehouse_config="single-config.json", secret_exists=True, ): ws = create_autospec(WorkspaceClient) ws.clusters.list.return_value = _id_list(ClusterDetails, cluster_ids) + ws.cluster_policies.list.return_value = _id_list(Policy, policy_ids) ws.cluster_policies.get = _cluster_policy ws.pipelines.list_pipelines.return_value = _id_list(PipelineStateInfo, pipeline_ids) ws.pipelines.get = _pipeline diff --git a/tests/unit/assessment/policies/single-user-with-spn-no-sparkversion.json b/tests/unit/assessment/policies/single-user-with-spn-no-sparkversion.json new file mode 100644 index 0000000000..7dec562bc4 --- /dev/null +++ b/tests/unit/assessment/policies/single-user-with-spn-no-sparkversion.json @@ -0,0 +1,7 @@ +{ + "policy_id": "single-user-with-spn-no-sparkversion", + "name": "test_policy", + "definition": "{\"azure_attributes.availability\": {\"type\": \"fixed\", \"value\": \"ON_DEMAND_AZURE\", \"hidden\": true}}", + "policy_family_definition_overrides":{ + } +} \ No newline at end of file diff --git a/tests/unit/assessment/policies/single-user-with-spn-policyid.json b/tests/unit/assessment/policies/single-user-with-spn-policyid.json new file mode 100644 index 0000000000..5658fb818d --- /dev/null +++ 
b/tests/unit/assessment/policies/single-user-with-spn-policyid.json @@ -0,0 +1,24 @@ +{ + "policy_id": "single-user-with-spn-policyid", + "definition": "{\"spark_version\": {\"type\": \"unlimited\",\"defaultValue\": \"auto:latest-ml\"},\"spark_conf.fs.azure.account.auth.type\": {\"type\": \"fixed\", \"value\": \"OAuth\", \"hidden\": true}}", + "policy_family_definition_overrides": { + "spark_conf.fs.azure.account.auth.type": { + "type": "fixed", + "value": "OAuth", + "hidden": true + }, + "not.a.type": { + "type": "fixed", + "value": "not.a.matching.type", + "hidden": true + }, + "not.a.matching.type": { + "type": "fixed", + "value": "https://login.microsoftonline.com/1234ededed/oauth2/token", + "hidden": true + } + }, + "name": "test_policy", + "description": "test", + "creator_user_name": "test_creator" +} \ No newline at end of file diff --git a/tests/unit/assessment/test_clusters.py b/tests/unit/assessment/test_clusters.py index a487a64767..fc422db75e 100644 --- a/tests/unit/assessment/test_clusters.py +++ b/tests/unit/assessment/test_clusters.py @@ -8,7 +8,7 @@ from databricks.sdk.service.compute import AutoScale, ClusterDetails, ClusterSource from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler -from databricks.labs.ucx.assessment.clusters import ClustersCrawler +from databricks.labs.ucx.assessment.clusters import ClustersCrawler, PoliciesCrawler from databricks.labs.ucx.framework.crawlers import SqlBackend from . 
import workspace_client_mock @@ -109,10 +109,12 @@ def test_cluster_without_owner_should_have_empty_creator_name(): assert result == [ Row( cluster_id="simplest-autoscale", - success=1, - failures='[]', + policy_id="single-user-with-spn", cluster_name="Simplest Shared Autoscale", creator=None, + spark_version="13.3.x-cpu-ml-scala2.12", + success=1, + failures='[]', ) ] @@ -166,3 +168,54 @@ def test_unsupported_clusters(): result_set = list(crawler.snapshot()) assert len(result_set) == 1 assert result_set[0].failures == '["cluster type not supported : LEGACY_PASSTHROUGH"]' + + +def test_policy_crawler(): + ws = workspace_client_mock( + policy_ids=['single-user-with-spn', 'single-user-with-spn-policyid', 'single-user-with-spn-no-sparkversion'], + ) + + sql_backend = create_autospec(SqlBackend) + crawler = PoliciesCrawler(ws, sql_backend, "ucx") + result_set = list(crawler.snapshot()) + failures = json.loads(result_set[0].failures) + assert len(result_set) == 2 + assert "Uses azure service principal credentials config in policy." 
in failures + + +def test_policy_try_fetch(): + ws = workspace_client_mock(policy_ids=['single-user-with-spn-policyid']) + mock_backend = MockBackend( + rows={ + r"SELECT \* FROM ucx.policies": [ + ( + "single-user-with-spn-policyid", + "test_policy", + 1, + "[]", + json.dumps({"type": "unlimited", "defaultValue": "auto:latest-ml"}), + "test", + "test_creator", + ) + ] + } + ) + crawler = PoliciesCrawler(ws, mock_backend, "ucx") + result_set = list(crawler.snapshot()) + + assert len(result_set) == 1 + assert result_set[0].policy_id == "single-user-with-spn-policyid" + assert result_set[0].policy_name == "test_policy" + assert result_set[0].spark_version == json.dumps({"type": "unlimited", "defaultValue": "auto:latest-ml"}) + assert result_set[0].policy_description == "test" + assert result_set[0].creator == "test_creator" + + +def test_policy_without_failure(): + ws = workspace_client_mock( + policy_ids=['single-user-with-spn-no-sparkversion'], + ) + + crawler = PoliciesCrawler(ws, MockBackend(), "ucx") + result_set = list(crawler.snapshot()) + assert result_set[0].failures == '[]'