Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Dashboard widget to show the list of cluster policies along with DBR version #1013

Merged
merged 32 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7390c52
Cluster Policy Count
prajin-29 Mar 6, 2024
7144e16
Cluster Policy Count
prajin-29 Mar 6, 2024
dcaf034
Merge branch 'main' into feature/cluster_policy
prajin-29 Mar 6, 2024
da78acb
Merge branch 'main' into feature/cluster_policy
prajin-29 Mar 7, 2024
1b0946a
checking cluster policy
prajin-29 Mar 7, 2024
105a673
checking cluster policy
prajin-29 Mar 7, 2024
05f9715
checking cluster policy
prajin-29 Mar 7, 2024
6cd87fb
checking cluster policy
prajin-29 Mar 7, 2024
2b34e8c
checking cluster policy
prajin-29 Mar 7, 2024
2a4e733
checking cluster policy
prajin-29 Mar 7, 2024
3059f90
checking cluster policy
prajin-29 Mar 7, 2024
0a7b2dc
checking cluster policy
prajin-29 Mar 7, 2024
7c21bf2
changing based on review comments
prajin-29 Mar 8, 2024
69a8e19
changing based on review comments
prajin-29 Mar 8, 2024
6923b5e
Added Unit Test case
prajin-29 Mar 8, 2024
7f6d8b5
Added Unit Test case
prajin-29 Mar 8, 2024
74888dc
Added Unit Test case
prajin-29 Mar 11, 2024
49d2cae
Added Unit Test case
prajin-29 Mar 11, 2024
2d0e600
Adding unit test case for failure scenarios
prajin-29 Mar 11, 2024
e340f7d
Adding integration Test
prajin-29 Mar 11, 2024
a1fec9a
Merge branch 'main' into feature/cluster_policy
prajin-29 Mar 11, 2024
c174761
Resolving the unit test comments
prajin-29 Mar 11, 2024
499a9df
Resolving the unit test comments
prajin-29 Mar 11, 2024
df7ea2b
Resolving the integration test
prajin-29 Mar 12, 2024
b481af3
Resolving the integration test and unit test cases
prajin-29 Mar 12, 2024
b7e0c80
Merge branch 'main' into feature/cluster_policy
prajin-29 Mar 12, 2024
a7c5baa
Updated integration test with failure scenarios
prajin-29 Mar 13, 2024
859f7c0
Adding Upgrade script for cluster table
prajin-29 Mar 13, 2024
2df818d
Making the CI pass
prajin-29 Mar 14, 2024
28e56aa
Merge branch 'main' into feature/cluster_policy
prajin-29 Mar 14, 2024
cac127c
Merged the code with latest changes
prajin-29 Mar 14, 2024
ec3a667
Updating the sqlExecution using workspace_client api
prajin-29 Mar 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions src/databricks/labs/ucx/assessment/clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class ClusterInfo:
cluster_id: str
success: int
failures: str
spark_version: str | None = None
policy_id: str | None = None
nfx marked this conversation as resolved.
Show resolved Hide resolved
cluster_name: str | None = None
creator: str | None = None

Expand Down Expand Up @@ -155,6 +157,8 @@ def _assess_clusters(self, all_clusters):
cluster_info = ClusterInfo(
cluster_id=cluster.cluster_id if cluster.cluster_id else "",
cluster_name=cluster.cluster_name,
policy_id=cluster.policy_id,
spark_version=cluster.spark_version,
creator=cluster.creator_user_name,
success=1,
failures="[]",
Expand All @@ -171,3 +175,58 @@ def snapshot(self) -> Iterable[ClusterInfo]:
def _try_fetch(self) -> Iterable[ClusterInfo]:
for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
yield ClusterInfo(*row)


@dataclass
class PolicyInfo:
    # Inventory record for one cluster policy, persisted to the `policies` table.
    # Field order matters: `_try_fetch` rebuilds rows positionally via PolicyInfo(*row).
    policy_id: str
    policy_name: str
    success: int  # 1 = no UC-compatibility findings, 0 = at least one failure recorded
    failures: str  # JSON-encoded list of incompatibility findings
    spark_version: str | None = None  # JSON-encoded `spark_version` constraint from the policy definition, if any
    policy_description: str | None = None
    creator: str | None = None


class PoliciesCrawler(CrawlerBase[PolicyInfo], CheckClusterMixin):
    """Crawls all workspace cluster policies and records UC-compatibility findings.

    Results are persisted as `PolicyInfo` rows in the `$inventory.policies` table.
    """

    def __init__(self, ws: WorkspaceClient, sbe: SqlBackend, schema):
        super().__init__(sbe, "hive_metastore", schema, "policies", PolicyInfo)
        self._ws = ws

    def _crawl(self) -> Iterable[PolicyInfo]:
        all_policies = list(self._ws.cluster_policies.list())
        return list(self._assess_policies(all_policies))

    def _assess_policies(self, all_policies) -> Iterable[PolicyInfo]:
        for policy in all_policies:
            if policy.policy_id is None:
                # Without a policy_id we have no primary key to store the record under.
                continue
            failures: list[str] = []
            failures.extend(self._check_cluster_policy(policy.policy_id, "policy"))
            try:
                # The `spark_version` constraint is a nested object inside the policy
                # definition; re-serialize it so it fits a plain string column.
                spark_version = json.dumps(json.loads(policy.definition)["spark_version"])
            except KeyError:
                # Policy does not constrain the Spark version.
                spark_version = None
            policy_info = PolicyInfo(
                policy_id=policy.policy_id,
                policy_description=policy.description,
                policy_name=policy.name,
                spark_version=spark_version,
                success=1,
                failures="[]",
                creator=policy.creator_user_name,
            )
            if failures:
                policy_info.success = 0
                policy_info.failures = json.dumps(failures)
            yield policy_info

    def snapshot(self) -> Iterable[PolicyInfo]:
        return self._snapshot(self._try_fetch, self._crawl)

    def _try_fetch(self) -> Iterable[PolicyInfo]:
        # Rows are mapped positionally, so PolicyInfo field order must match the table.
        for row in self._fetch(f"SELECT * FROM {self._schema}.{self._table}"):
            yield PolicyInfo(*row)
3 changes: 2 additions & 1 deletion src/databricks/labs/ucx/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

from databricks.labs.ucx.__about__ import __version__
from databricks.labs.ucx.assessment.azure import AzureServicePrincipalInfo
from databricks.labs.ucx.assessment.clusters import ClusterInfo
from databricks.labs.ucx.assessment.clusters import ClusterInfo, PolicyInfo
from databricks.labs.ucx.assessment.init_scripts import GlobalInitScriptInfo
from databricks.labs.ucx.assessment.jobs import JobInfo, SubmitRunInfo
from databricks.labs.ucx.assessment.pipelines import PipelineInfo
Expand Down Expand Up @@ -162,6 +162,7 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
functools.partial(table, "workspace_objects", WorkspaceObjectInfo),
functools.partial(table, "permissions", Permissions),
functools.partial(table, "submit_runs", SubmitRunInfo),
functools.partial(table, "policies", PolicyInfo),
],
)
deployer.deploy_view("objects", "queries/views/objects.sql")
Expand Down
4 changes: 3 additions & 1 deletion src/databricks/labs/ucx/mixins/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,9 @@ def create(*, name: str | None = None, **kwargs):
name = f"sdk-{make_random(4)}"
if "definition" not in kwargs:
kwargs["definition"] = json.dumps(
{"spark_conf.spark.databricks.delta.preview.enabled": {"type": "fixed", "value": "true"}}
{
"spark_conf.spark.databricks.delta.preview.enabled": {"type": "fixed", "value": "true"},
}
)
cluster_policy = ws.cluster_policies.create(name, **kwargs)
logger.info(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- viz type=table, name=Cluster Policies, columns=policy_name,cluster_dbr_version,policy_spark_version
-- widget title=Cluster Policies, row=8, col=0, size_x=3, size_y=8
-- Joins clusters to their policies so a customer can compare, side by side, the
-- DBR version clusters actually run with the spark_version the policy pins.
SELECT DISTINCT
  policy_name,
  cluster.spark_version AS cluster_dbr_version,
  policy.spark_version AS policy_spark_version
FROM $inventory.clusters AS cluster
JOIN $inventory.policies AS policy
  ON cluster.policy_id = policy.policy_id
16 changes: 15 additions & 1 deletion src/databricks/labs/ucx/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler
from databricks.labs.ucx.assessment.clusters import ClustersCrawler
from databricks.labs.ucx.assessment.clusters import ClustersCrawler, PoliciesCrawler
from databricks.labs.ucx.assessment.init_scripts import GlobalInitScriptCrawler
from databricks.labs.ucx.assessment.jobs import JobsCrawler, SubmitRunsCrawler
from databricks.labs.ucx.assessment.pipelines import PipelinesCrawler
Expand Down Expand Up @@ -154,6 +154,19 @@ def assess_incompatible_submit_runs(cfg: WorkspaceConfig, ws: WorkspaceClient, s
crawler.snapshot()


@task("assessment")
def crawl_cluster_policies(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
    """Scans every cluster policy in the workspace and gathers the relevant details.

    In particular it flags:
    - cluster policies with a Databricks Runtime (DBR) version earlier than 11.3

    The findings for all matching policies are persisted in the
    `$inventory.policies` table."""
    PoliciesCrawler(ws, sql_backend, cfg.inventory_database).snapshot()


@task("assessment", cloud="azure")
def assess_azure_service_principals(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBackend):
"""This module scans through all the clusters configurations, cluster policies, job cluster configurations,
Expand Down Expand Up @@ -237,6 +250,7 @@ def crawl_groups(cfg: WorkspaceConfig, ws: WorkspaceClient, sql_backend: SqlBack
assess_jobs,
assess_incompatible_submit_runs,
assess_clusters,
crawl_cluster_policies,
assess_azure_service_principals,
assess_pipelines,
assess_global_init_scripts,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# pylint: disable=invalid-name

import logging
from typing import Any

from databricks.labs.blueprint.installation import Installation
from databricks.sdk import WorkspaceClient

from databricks.labs.ucx.assessment.clusters import ClusterInfo
from databricks.labs.ucx.config import WorkspaceConfig
from databricks.labs.ucx.framework.crawlers import (
SchemaDeployer,
StatementExecutionBackend,
)

logger = logging.getLogger(__name__)


def upgrade(installation: Installation, ws: WorkspaceClient):
    """Recreate the `clusters` inventory table so it picks up the newly added columns.

    The table is dropped and redeployed rather than altered because
    `ClusterInfo(*row)` maps columns positionally and the new `spark_version` /
    `policy_id` fields sit in the middle of the dataclass; `ALTER TABLE ... ADD
    COLUMNS` would only append them at the end. Data is repopulated the next
    time the assessment runs.
    """
    config = installation.load(WorkspaceConfig)
    sql_backend = StatementExecutionBackend(ws=ws, warehouse_id=config.warehouse_id)
    # Destructive on purpose: only the inventory copy is dropped, never customer data.
    logger.info(f"Dropping and recreating hive_metastore.{config.inventory_database}.clusters")
    sql_backend.execute(sql=f"DROP TABLE IF EXISTS hive_metastore.{config.inventory_database}.clusters")
    deploy_schema = SchemaDeployer(sql_backend=sql_backend, inventory_schema=config.inventory_database, mod=Any)
    deploy_schema.deploy_table("clusters", ClusterInfo)
31 changes: 30 additions & 1 deletion tests/integration/assessment/test_clusters.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import json
from datetime import timedelta

from databricks.sdk.errors import NotFound
from databricks.sdk.retries import retried
from databricks.sdk.service.compute import DataSecurityMode

from databricks.labs.ucx.assessment.clusters import ClustersCrawler
from databricks.labs.ucx.assessment.clusters import ClustersCrawler, PoliciesCrawler

from .test_assessment import _SPARK_CONF

Expand Down Expand Up @@ -36,3 +37,31 @@ def test_cluster_crawler_no_isolation(ws, make_cluster, inventory_schema, sql_ba

assert len(results) == 1
assert results[0].failures == '["No isolation shared clusters not supported in UC"]'


@retried(on=[NotFound], timeout=timedelta(minutes=6))
def test_policy_crawler(ws, make_cluster_policy, inventory_schema, sql_backend):
    """End-to-end: PoliciesCrawler picks up freshly created policies and flags
    the one that embeds Azure service-principal credentials."""
    spark_version = {'type': 'fixed', 'value': '14.3.x-scala2.12'}
    created_policy = make_cluster_policy(
        name="test_policy_check",
        definition=json.dumps({"spark_version": spark_version}),
    )
    policy_definition = {
        "spark_version": spark_version,
        "spark_conf.fs.azure.account.auth.type": {"type": "fixed", "value": "OAuth", "hidden": True},
    }
    created_policy_2 = make_cluster_policy(name="test_policy_check2", definition=json.dumps(policy_definition))

    policy_crawler = PoliciesCrawler(ws=ws, sbe=sql_backend, schema=inventory_schema)
    created_ids = {created_policy.policy_id, created_policy_2.policy_id}
    # Snapshot order is not guaranteed; filter to our two policies and sort by
    # name so the positional assertions below are deterministic.
    results = sorted(
        (policy for policy in policy_crawler.snapshot() if policy.policy_id in created_ids),
        key=lambda policy: policy.policy_name,
    )

    assert len(results) == 2
    assert results[0].policy_name == "test_policy_check"
    assert results[0].success == 1
    assert results[0].failures == "[]"
    assert results[0].spark_version == json.dumps(spark_version)

    assert results[1].policy_name == "test_policy_check2"
    assert results[1].success == 0
    assert results[1].failures == '["Uses azure service principal credentials config in policy."]'
3 changes: 3 additions & 0 deletions tests/unit/assessment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def _load_fixture(filename: str):
BaseRun: '../assessment/jobruns',
ClusterDetails: '../assessment/clusters',
PipelineStateInfo: '../assessment/pipelines',
Policy: '../assessment/policies',
}


Expand Down Expand Up @@ -77,11 +78,13 @@ def workspace_client_mock(
pipeline_ids: list[str] | None = None,
job_ids: list[str] | None = None,
jobruns_ids: list[str] | None = None,
policy_ids: list[str] | None = None,
warehouse_config="single-config.json",
secret_exists=True,
):
ws = create_autospec(WorkspaceClient)
ws.clusters.list.return_value = _id_list(ClusterDetails, cluster_ids)
ws.cluster_policies.list.return_value = _id_list(Policy, policy_ids)
ws.cluster_policies.get = _cluster_policy
ws.pipelines.list_pipelines.return_value = _id_list(PipelineStateInfo, pipeline_ids)
ws.pipelines.get = _pipeline
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"policy_id": "123",
"definition": "{\"azure_attributes.availability\": {\"type\": \"fixed\", \"value\": \"ON_DEMAND_AZURE\", \"hidden\": true}}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"policy_id": "single-user-with-spn-policyid",
"definition": "{\"spark_version\": {\"type\": \"unlimited\",\"defaultValue\": \"auto:latest-ml\"},\"spark_conf.fs.azure.account.auth.type\": {\"type\": \"fixed\", \"value\": \"OAuth\", \"hidden\": true}}",
"policy_family_definition_overrides": {
"spark_conf.fs.azure.account.auth.type": {
"type": "fixed",
"value": "OAuth",
"hidden": true
},
"not.a.type": {
"type": "fixed",
"value": "not.a.matching.type",
"hidden": true
},
"not.a.matching.type": {
"type": "fixed",
"value": "https://login.microsoftonline.com/1234ededed/oauth2/token",
"hidden": true
}
},
"name": "test_policy",
"description": "test",
"creator_user_name": "test_creator"
}
58 changes: 57 additions & 1 deletion tests/unit/assessment/test_clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
from databricks.sdk.service.compute import AutoScale, ClusterDetails, ClusterSource

from databricks.labs.ucx.assessment.azure import AzureServicePrincipalCrawler
from databricks.labs.ucx.assessment.clusters import ClusterInfo, ClustersCrawler
from databricks.labs.ucx.assessment.clusters import (
ClusterInfo,
ClustersCrawler,
PoliciesCrawler,
)

from ..framework.mocks import MockBackend
from . import workspace_client_mock
Expand Down Expand Up @@ -112,8 +116,10 @@ def test_cluster_without_owner_should_have_empty_creator_name():
assert result == [
ClusterInfo(
cluster_id="simplest-autoscale",
policy_id="single-user-with-spn",
nfx marked this conversation as resolved.
Show resolved Hide resolved
cluster_name="Simplest Shared Autoscale",
creator=None,
spark_version="13.3.x-cpu-ml-scala2.12",
success=1,
failures='[]',
)
Expand Down Expand Up @@ -169,3 +175,53 @@ def test_unsupported_clusters():
result_set = list(crawler.snapshot())
assert len(result_set) == 1
assert result_set[0].failures == '["cluster type not supported : LEGACY_PASSTHROUGH"]'


def test_policy_crawler():
    """Crawler flags the policy that embeds Azure SPN credentials and drops the
    fixture that has no policy_id."""
    ws = workspace_client_mock(
        policy_ids=['single-user-with-spn', 'single-user-with-spn-policyid', 'single-user-with-spn-no-sparkversion'],
    )

    crawler = PoliciesCrawler(ws, MockBackend(), "ucx")
    result_set = list(crawler.snapshot())

    # Only two of the three fixtures yield rows — presumably one lacks a
    # policy_id and is skipped by _assess_policies; confirm against fixtures.
    assert len(result_set) == 2
    failures = json.loads(result_set[0].failures)
    assert "Uses azure service principal credentials config in policy." in failures


def test_policy_try_fetch():
    """A stored row is mapped back onto a PolicyInfo positionally by _try_fetch."""
    ws = workspace_client_mock(policy_ids=['single-user-with-spn-policyid'])
    stored_spark_version = json.dumps({"type": "unlimited", "defaultValue": "auto:latest-ml"})
    stored_row = (
        "single-user-with-spn-policyid",
        "test_policy",
        1,
        "[]",
        stored_spark_version,
        "test",
        "test_creator",
    )
    mock_backend = MockBackend(rows={r"SELECT \* FROM ucx.policies": [stored_row]})
    crawler = PoliciesCrawler(ws, mock_backend, "ucx")

    result_set = list(crawler.snapshot())

    assert len(result_set) == 1
    policy = result_set[0]
    assert policy.policy_id == "single-user-with-spn-policyid"
    assert policy.policy_name == "test_policy"
    assert policy.spark_version == stored_spark_version
    assert policy.policy_description == "test"
    assert policy.creator == "test_creator"


def test_policy_without_failure():
    """A policy with no spark_version constraint still crawls without failures."""
    ws = workspace_client_mock(policy_ids=['single-user-with-spn-no-sparkversion'])
    results = list(PoliciesCrawler(ws, MockBackend(), "ucx").snapshot())
    assert results[0].failures == '[]'
Loading