Added crawler for Azure Service principals used for direct storage access (#305)

Fixes #249
dipankarkush-db authored Sep 28, 2023
1 parent 2e8a880 commit 82bdd33
Showing 2 changed files with 425 additions and 4 deletions.
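
For orientation, a minimal sketch (not part of this commit) of how the new PipelinesCrawler introduced below might be invoked. The SQL-backend wiring and the "ucx" inventory schema name are assumptions: substitute whichever SqlBackend implementation from the ucx framework you actually use.

from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.assessment.crawlers import PipelinesCrawler

ws = WorkspaceClient()  # authenticates from the environment / config profile
sbe = ...  # placeholder: any SqlBackend implementation from the ucx framework
crawler = PipelinesCrawler(ws, sbe, "ucx")  # "ucx" is an assumed inventory schema

# snapshot() returns previously persisted rows if present; otherwise it crawls all
# pipelines and flags any whose configuration carries Azure service principal credentials.
for pipeline in crawler.snapshot():
    if pipeline.success == 0:
        print(pipeline.pipeline_name, pipeline.failures)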
88 changes: 88 additions & 0 deletions src/databricks/labs/ucx/assessment/crawlers.py
@@ -1,4 +1,5 @@
import json
import re
from dataclasses import dataclass

from databricks.sdk import WorkspaceClient
@@ -13,6 +14,17 @@
"spark.databricks.hive.metastore.glueCatalog.enabled",
]

_AZURE_SP_CONF = [
"fs.azure.account.key",
"fs.azure.account.auth.type",
"fs.azure.account.oauth.provider.type",
"fs.azure.account.oauth2.client.id",
"fs.azure.account.oauth2.client.secret",
"fs.azure.account.oauth2.client.endpoint",
]

_AZURE_SP_CONF_FAILURE_MSG = "Uses azure service principal credentials config in "


@dataclass
class JobInfo:
@@ -32,6 +44,23 @@ class ClusterInfo:
failures: str


@dataclass
class PipelineInfo:
pipeline_id: str
pipeline_name: str
creator_name: str
success: int
failures: str


def _azure_sp_conf_present_check(config: dict) -> bool:
for key in config.keys():
for conf in _AZURE_SP_CONF:
if re.search(conf, key):
return True
return False


def spark_version_compatibility(spark_version: str) -> str:
first_comp_custom_rt = 3
first_comp_custom_x = 2
@@ -51,6 +80,37 @@ def spark_version_compatibility(spark_version: str) -> str:
return "supported"


class PipelinesCrawler(CrawlerBase):
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
super().__init__(sbe, "hive_metastore", schema, "pipelines")
self._ws = ws

def _crawl(self) -> list[PipelineInfo]:
all_pipelines = list(self._ws.pipelines.list_pipelines())
return list(self._assess_pipelines(all_pipelines))

def _assess_pipelines(self, all_pipelines):
for pipeline in all_pipelines:
pipeline_info = PipelineInfo(pipeline.pipeline_id, pipeline.name, pipeline.creator_user_name, 1, "")
failures = []
pipeline_config = self._ws.pipelines.get(pipeline.pipeline_id).spec.configuration
if pipeline_config:
if _azure_sp_conf_present_check(pipeline_config):
failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} pipeline.")

pipeline_info.failures = json.dumps(failures)
if len(failures) > 0:
pipeline_info.success = 0
yield pipeline_info

def snapshot(self) -> list[PipelineInfo]:
return self._snapshot(self._try_fetch, self._crawl)

def _try_fetch(self) -> list[PipelineInfo]:
for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
yield PipelineInfo(*row)


class ClustersCrawler(CrawlerBase):
def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
super().__init__(sbe, "hive_metastore", schema, "clusters")
@@ -78,6 +138,20 @@ def _assess_clusters(self, all_clusters):
for value in cluster.spark_conf.values():
if "dbfs:/mnt" in value or "/dbfs/mnt" in value:
failures.append(f"using DBFS mount in configuration: {value}")

# Checking if Azure cluster config is present in spark config
if _azure_sp_conf_present_check(cluster.spark_conf):
failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")

# Checking if Azure cluster config is present in cluster policies
if cluster.policy_id:
policy = self._ws.cluster_policies.get(cluster.policy_id)
if _azure_sp_conf_present_check(json.loads(policy.definition)):
failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")
if policy.policy_family_definition_overrides:
if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)):
failures.append(f"{_AZURE_SP_CONF_FAILURE_MSG} cluster.")

cluster_info.failures = json.dumps(failures)
if len(failures) > 0:
cluster_info.success = 0
@@ -139,6 +213,20 @@ def _assess_jobs(self, all_jobs: list[BaseJob], all_clusters_by_id) -> list[JobInfo]:
for value in cluster_config.spark_conf.values():
if "dbfs:/mnt" in value or "/dbfs/mnt" in value:
job_assessment[job.job_id].add(f"using DBFS mount in configuration: {value}")

# Checking if Azure cluster config is present in spark config
if _azure_sp_conf_present_check(cluster_config.spark_conf):
job_assessment[job.job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")

# Checking if Azure cluster config is present in cluster policies
if cluster_config.policy_id:
policy = self._ws.cluster_policies.get(cluster_config.policy_id)
if _azure_sp_conf_present_check(json.loads(policy.definition)):
job_assessment[job.job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")
if policy.policy_family_definition_overrides:
if _azure_sp_conf_present_check(json.loads(policy.policy_family_definition_overrides)):
job_assessment[job.job_id].add(f"{_AZURE_SP_CONF_FAILURE_MSG} Job cluster.")

for job_key in job_details.keys():
job_details[job_key].failures = json.dumps(list(job_assessment[job_key]))
if len(job_assessment[job_key]) > 0:

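For reference, a small self-contained illustration of the key matching performed by the new _azure_sp_conf_present_check helper. The function body is copied from the diff above; the storage account name and the policy snippet are made-up examples.

import json
import re

_AZURE_SP_CONF = [
    "fs.azure.account.key",
    "fs.azure.account.auth.type",
    "fs.azure.account.oauth.provider.type",
    "fs.azure.account.oauth2.client.id",
    "fs.azure.account.oauth2.client.secret",
    "fs.azure.account.oauth2.client.endpoint",
]

def _azure_sp_conf_present_check(config: dict) -> bool:
    # a key is flagged if any of the known Azure service principal settings occurs anywhere in it
    for key in config.keys():
        for conf in _AZURE_SP_CONF:
            if re.search(conf, key):
                return True
    return False

# Spark conf set for direct ABFSS access with a service principal (account name is made up)
spark_conf = {
    "fs.azure.account.oauth2.client.id.mystorage.dfs.core.windows.net": "<application-id>",
    "spark.databricks.delta.preview.enabled": "true",
}
assert _azure_sp_conf_present_check(spark_conf)

# Cluster policy definitions prefix Spark settings with "spark_conf.", so the
# substring-style re.search still matches them.
policy_definition = json.dumps({
    "spark_conf.fs.azure.account.auth.type": {"type": "fixed", "value": "OAuth"},
})
assert _azure_sp_conf_present_check(json.loads(policy_definition))

# A config without any of the listed keys is not flagged
assert not _azure_sp_conf_present_check({"spark.master": "local[*]"})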