diff --git a/src/databricks/labs/ucx/assessment/crawlers.py b/src/databricks/labs/ucx/assessment/crawlers.py
index 33372e25dc..56e5b6543a 100644
--- a/src/databricks/labs/ucx/assessment/crawlers.py
+++ b/src/databricks/labs/ucx/assessment/crawlers.py
@@ -1,13 +1,17 @@
 import json
+import logging
 import re
 from dataclasses import dataclass
 
 from databricks.sdk import WorkspaceClient
+from databricks.sdk.core import DatabricksError
 from databricks.sdk.service.compute import ClusterSource
 from databricks.sdk.service.jobs import BaseJob
 
 from databricks.labs.ucx.framework.crawlers import CrawlerBase, SqlBackend
 
+logger = logging.getLogger(__name__)
+
 INCOMPATIBLE_SPARK_CONFIG_KEYS = [
     "spark.databricks.passthrough.enabled",
     "spark.hadoop.javax.jdo.option.ConnectionURL",
@@ -145,12 +149,15 @@ def _assess_clusters(self, all_clusters):
 
             # Checking if Azure cluster config is present in cluster policies
             if cluster.policy_id:
-                policy = self._ws.cluster_policies.get(cluster.policy_id)
-                if _azure_sp_conf_present_check(json.loads(policy.definition)):
-                    failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")
-                if policy.policy_family_definition_overrides:
-                    if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)):
+                try:
+                    policy = self._ws.cluster_policies.get(cluster.policy_id)
+                    if _azure_sp_conf_present_check(json.loads(policy.definition)):
                         failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")
+                    if policy.policy_family_definition_overrides:
+                        if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)):
+                            failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")
+                except DatabricksError as err:
+                    logger.warning(f"Error retrieving cluster policy {cluster.policy_id}. Error: {err}")
 
             cluster_info.failures = json.dumps(failures)
             if len(failures) > 0:
@@ -220,12 +227,15 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> list[JobI
 
                 # Checking if Azure cluster config is present in cluster policies
                 if cluster_config.policy_id:
-                    policy = self._ws.cluster_policies.get(cluster_config.policy_id)
-                    if _azure_sp_conf_present_check(json.loads(policy.definition)):
-                        job_assessment[job.job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")
-                    if policy.policy_family_definition_overrides:
-                        if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)):
+                    try:
+                        policy = self._ws.cluster_policies.get(cluster_config.policy_id)
+                        if _azure_sp_conf_present_check(json.loads(policy.definition)):
                             job_assessment[job.job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")
+                        if policy.policy_family_definition_overrides:
+                            if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)):
+                                job_assessment[job.job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")
+                    except DatabricksError as err:
+                        logger.warning(f"Error retrieving cluster policy {cluster_config.policy_id}. Error: {err}")
 
         for job_key in job_details.keys():
             job_details[job_key].failures = json.dumps(list(job_assessment[job_key]))
diff --git a/tests/unit/assessment/test_assessment.py b/tests/unit/assessment/test_assessment.py
index 828dd10324..2df960bf33 100644
--- a/tests/unit/assessment/test_assessment.py
+++ b/tests/unit/assessment/test_assessment.py
@@ -1,5 +1,6 @@
 from unittest.mock import Mock
 
+from databricks.sdk.core import DatabricksError
 from databricks.sdk.service.compute import AutoScale, ClusterDetails, ClusterSource
 from databricks.sdk.service.jobs import BaseJob, JobSettings, NotebookTask, Task
 from databricks.sdk.service.pipelines import PipelineState, PipelineStateInfo
@@ -395,6 +396,24 @@ def test_cluster_assessment_cluster_policy_no_spark_conf(mocker):
     assert result_set1[0].success == 1
 
 
+def test_cluster_assessment_cluster_policy_not_found(mocker):
+    sample_clusters1 = [
+        ClusterDetails(
+            cluster_name="cluster1",
+            autoscale=AutoScale(min_workers=1, max_workers=6),
+            spark_context_id=5134472582179565315,
+            spark_env_vars=None,
+            policy_id="D96308F1BF0003A8",
+            spark_version="13.3.x-cpu-ml-scala2.12",
+            cluster_id="0915-190044-3dqy6751",
+        )
+    ]
+    ws = Mock()
+    ws.cluster_policies.get.side_effect = DatabricksError(error="NO_POLICY", error_code="NO_POLICY")
+    crawler = ClustersCrawler(ws, MockBackend(), "ucx")._assess_clusters(sample_clusters1)
+    list(crawler)
+
+
 def test_pipeline_assessment_with_config(mocker):
     sample_pipelines = [
         PipelineStateInfo(