Added `databricks labs ucx cluster-remap` command to remap legacy cluster configurations to UC-compatible access modes (#994)
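As a rough usage sketch (the remap flow and the `revert-cluster-remap` companion command are inferred from the commits and code below; exact prompts and output are not shown in this diff):

```
databricks labs ucx cluster-remap          # snapshot and remap selected clusters to UC access modes
databricks labs ucx revert-cluster-remap   # restore configs from backup/clusters/<cluster_id>.json
```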
Merged

Changes from all commits (34 commits):
321f09a (prajin-29) Adding command to Remap the cluster to UC
878ac55 (prajin-29) Adding command to Remap the cluster to UC
6dfa642 (prajin-29) Adding command to Remap the cluster to UC
89dc542 (prajin-29) Adding command to Remap the cluster to UC
54ca4db (prajin-29) Adding command to Remap the cluster to UC
421c31f (prajin-29) Adding command to Remap the cluster to UC
7f685e3 (prajin-29) Adding command to Remap the cluster to UC
a84421e (prajin-29) Adding command to Remap the cluster to UC
41ac6b6 (prajin-29) writing unit test
881d54a (prajin-29) writing unit test
81a2af0 (prajin-29) writing unit test
d62ae3f (prajin-29) Merge branch 'main' into feature/cluster_remap_command
8a4f46b (prajin-29) writing unit test
509f5bf (prajin-29) writing unit test
08a8072 (prajin-29) Adding Integration Testing
b8b96bb (prajin-29) Merge branch 'main' into feature/cluster_remap_command
8724726 (prajin-29) Creating revert cluster remap command
40b132f (prajin-29) Creating revert cluster remap command
9aea0f6 (prajin-29) Creating revert cluster remap command
5d2a388 (prajin-29) creating Unit Test cases
4e858f8 (prajin-29) creating Unit Test cases
126cd0a (prajin-29) Changing the logic for iterating to all the clusters
cc417fa (prajin-29) Changing the logic for cluster remap
2738e90 (prajin-29) Changing the logic for cluster remap
e283696 (prajin-29) Updating the Unit test
57c841b (prajin-29) Merge branch 'main' into feature/cluster_remap_command
05101b1 (prajin-29) Increasing the test coverage
51d45dd (prajin-29) Applying the review comments
53c5bcb (prajin-29) Applying the review comments
8b61365 (prajin-29) Applying the review comments
6e82ac8 (prajin-29) Applying the review comments
b65d1a5 (prajin-29) Using the API only once to fetch the ids and details
9df440a (prajin-29) Modifying the code based on the comments provided
23a1b68 (nfx) Merge branch 'main' into feature/cluster_remap_command
New module databricks.labs.ucx.workspace_access.clusters (+114 lines):
import logging

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.tui import Prompts
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import InvalidParameterValue
from databricks.sdk.service.compute import ClusterDetails, DataSecurityMode

logger = logging.getLogger(__name__)


class ClusterAccess:
    def __init__(self, installation: Installation, ws: WorkspaceClient, prompts: Prompts):
        self._ws = ws
        self._prompts = prompts
        self._installation = installation

    def list_cluster(self):
        clusters = [
            clusters
            for clusters in self._ws.clusters.list()
            if clusters.cluster_source is not None and clusters.cluster_source.name != "JOB"
        ]
        return clusters

    def _get_access_mode(self, access_mode: str):
        if access_mode in {"LEGACY_SINGLE_USER", "SINGLE_USER"}:
            return DataSecurityMode.SINGLE_USER
        return DataSecurityMode.USER_ISOLATION

    def map_cluster_to_uc(self, cluster_id: str, cluster_details: list[ClusterDetails]):
        if cluster_id != "<ALL>":
            cluster_ids = [x.strip() for x in cluster_id.split(",")]
            cluster_id_list = [cluster for cluster in cluster_details if cluster.cluster_id in cluster_ids]
        else:
            cluster_id_list = cluster_details
        spark_version = self._ws.clusters.select_spark_version(latest=True, long_term_support=True)
        for cluster in cluster_id_list:
            try:
                assert cluster.cluster_id is not None
                if cluster.data_security_mode is None:
                    raise InvalidParameterValue(f"Data security Mode is None for the cluster {cluster.cluster_id}")
                access_mode = self._get_access_mode(cluster.data_security_mode.name)
                self._installation.save(cluster, filename=f'backup/clusters/{cluster.cluster_id}.json')
                logger.info(f"Editing the cluster of cluster: {cluster.cluster_id} with access_mode as {access_mode}")
                self._ws.clusters.edit(
                    cluster_id=cluster.cluster_id,
                    cluster_name=cluster.cluster_name,
                    spark_version=spark_version,
                    num_workers=cluster.num_workers,
                    spark_conf=cluster.spark_conf,
                    spark_env_vars=cluster.spark_env_vars,
                    data_security_mode=access_mode,
                    node_type_id=cluster.node_type_id,
                    autoscale=cluster.autoscale,
                    policy_id=cluster.policy_id,
                    autotermination_minutes=cluster.autotermination_minutes,
                    custom_tags=cluster.custom_tags,
                    init_scripts=cluster.init_scripts,
                    cluster_log_conf=cluster.cluster_log_conf,
                    aws_attributes=cluster.aws_attributes,
                    ssh_public_keys=cluster.ssh_public_keys,
                    enable_elastic_disk=cluster.enable_elastic_disk,
                    cluster_source=cluster.cluster_source,
                    instance_pool_id=cluster.instance_pool_id,
                    enable_local_disk_encryption=cluster.enable_local_disk_encryption,
                    driver_instance_pool_id=cluster.driver_instance_pool_id,
                )
            except InvalidParameterValue as e:
                logger.warning(f"skipping cluster remapping: {e}")

    def revert_cluster_remap(self, cluster_ids: str, total_cluster_ids: list):
        if cluster_ids != "<ALL>":
            cluster_list = [x.strip() for x in cluster_ids.split(",")]
        else:
            cluster_list = total_cluster_ids
        logger.info(f"Reverting the configurations for the cluster {cluster_list}")
        for cluster in cluster_list:
            try:
                cluster_details = self._installation.load(ClusterDetails, filename=f"backup/clusters/{cluster}.json")
                if cluster_details.spark_version is None:
                    raise InvalidParameterValue(
                        f"Spark Version is not present in the config file for the cluster:{cluster}"
                    )
                if cluster_details.cluster_id is None:
                    raise InvalidParameterValue(
                        f"cluster Id is not present in the config file for the cluster:{cluster}"
                    )
                num_workers = cluster_details.num_workers if cluster_details.num_workers else 0
                self._ws.clusters.edit(
                    cluster_id=cluster_details.cluster_id,
                    cluster_name=cluster_details.cluster_name,
                    spark_version=cluster_details.spark_version,
                    num_workers=num_workers,
                    spark_conf=cluster_details.spark_conf,
                    spark_env_vars=cluster_details.spark_env_vars,
                    data_security_mode=cluster_details.data_security_mode,
                    node_type_id=cluster_details.node_type_id,
                    autoscale=cluster_details.autoscale,
                    policy_id=cluster_details.policy_id,
                    autotermination_minutes=cluster_details.autotermination_minutes,
                    custom_tags=cluster_details.custom_tags,
                    init_scripts=cluster_details.init_scripts,
                    cluster_log_conf=cluster_details.cluster_log_conf,
                    aws_attributes=cluster_details.aws_attributes,
                    ssh_public_keys=cluster_details.ssh_public_keys,
                    enable_elastic_disk=cluster_details.enable_elastic_disk,
                    cluster_source=cluster_details.cluster_source,
                    instance_pool_id=cluster_details.instance_pool_id,
                    enable_local_disk_encryption=cluster_details.enable_local_disk_encryption,
                    driver_instance_pool_id=cluster_details.driver_instance_pool_id,
                )
            except InvalidParameterValue as e:
                logger.warning(f"skipping cluster remapping: {e}")

Reviewer comment on the spark_version check in revert_cluster_remap: "spark version may be absent if cluster is using a policy, so this line is not necessary."
New unit tests for ClusterAccess (+124 lines):
from unittest.mock import create_autospec

from databricks.labs.blueprint.installation import Installation
from databricks.labs.blueprint.tui import MockPrompts
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import (
    ClusterDetails,
    ClusterSource,
    DataSecurityMode,
)

from databricks.labs.ucx.workspace_access.clusters import ClusterAccess


def test_map_cluster_to_uc(caplog):
    ws = create_autospec(WorkspaceClient)
    cluster_details = [
        ClusterDetails(
            cluster_id="123", cluster_name="test_cluster", data_security_mode=DataSecurityMode.LEGACY_SINGLE_USER
        )
    ]
    prompts = MockPrompts({})
    installation = create_autospec(Installation)
    installation.save.return_value = "a/b/c"
    cluster = ClusterAccess(installation, ws, prompts)
    with caplog.at_level('INFO'):
        cluster.map_cluster_to_uc(cluster_id="123", cluster_details=cluster_details)
    assert 'Editing the cluster of cluster: 123 with access_mode as DataSecurityMode.SINGLE_USER' in caplog.messages


def test_map_cluster_to_uc_shared(caplog):
    ws = create_autospec(WorkspaceClient)
    ws.clusters.list.return_value = [
        ClusterDetails(
            cluster_id="123",
            cluster_name="test_cluster",
            cluster_source=ClusterSource.UI,
            data_security_mode=DataSecurityMode.LEGACY_TABLE_ACL,
        ),
        ClusterDetails(cluster_id="1234", cluster_name="test_cluster", cluster_source=ClusterSource.JOB),
    ]
    cluster_details = [
        ClusterDetails(
            cluster_id="123",
            cluster_name="test_cluster",
            cluster_source=ClusterSource.UI,
            data_security_mode=DataSecurityMode.LEGACY_TABLE_ACL,
        ),
        ClusterDetails(cluster_id="1234", cluster_name="test_cluster", cluster_source=ClusterSource.JOB),
    ]
    prompts = MockPrompts({})
    installation = create_autospec(Installation)
    installation.save.return_value = "a/b/c"
    cluster = ClusterAccess(installation, ws, prompts)
    with caplog.at_level('INFO'):
        cluster.map_cluster_to_uc(cluster_id="<ALL>", cluster_details=cluster_details)
    assert (
        'Editing the cluster of cluster: 123 with access_mode as DataSecurityMode.USER_ISOLATION' in caplog.messages
    )


def test_list_clusters():
    ws = create_autospec(WorkspaceClient)
    ws.clusters.list.return_value = [
        ClusterDetails(cluster_id="123", cluster_name="test_cluster", cluster_source=ClusterSource.UI),
        ClusterDetails(cluster_id="1234", cluster_name="test_cluster1", cluster_source=ClusterSource.JOB),
    ]
    prompts = MockPrompts({})
    installation = create_autospec(Installation)
    installation.save.return_value = "a/b/c"
    cluster = ClusterAccess(installation, ws, prompts)
    cluster_list = cluster.list_cluster()
    assert cluster_list[0].cluster_id == "123"
    assert len(cluster_list) == 1


def test_map_cluster_to_uc_error(caplog):
    ws = create_autospec(WorkspaceClient)
    cluster_details = [ClusterDetails(cluster_id="123", cluster_name="test_cluster")]
    prompts = MockPrompts({})
    installation = create_autospec(Installation)
    installation.save.return_value = "a/b/c"
    cluster = ClusterAccess(installation, ws, prompts)
    with caplog.at_level('INFO'):
        cluster.map_cluster_to_uc(cluster_id="123", cluster_details=cluster_details)
    assert 'skipping cluster remapping: Data security Mode is None for the cluster 123' in caplog.messages


def test_revert_map_cluster_to_uc(caplog):
    ws = create_autospec(WorkspaceClient)
    installation = create_autospec(Installation)
    prompts = MockPrompts({})
    installation.load.return_value = ClusterDetails(
        cluster_id="123", cluster_name="test_cluster", spark_version="13.3.x-cpu-ml-scala2.12"
    )
    cluster = ClusterAccess(installation, ws, prompts)
    cluster.revert_cluster_remap(cluster_ids="123", total_cluster_ids=["123"])


def test_revert_all_cluster_to_uc(caplog):
    ws = create_autospec(WorkspaceClient)
    installation = create_autospec(Installation)
    prompts = MockPrompts({})
    installation.load.return_value = ClusterDetails(cluster_id="123", cluster_name="test_cluster")
    cluster = ClusterAccess(installation, ws, prompts)
    with caplog.at_level('INFO'):
        cluster.revert_cluster_remap(cluster_ids="<ALL>", total_cluster_ids=["123", "234"])
    assert "Reverting the configurations for the cluster ['123', '234']" in caplog.messages


def test_revert_cluster_to_uc_empty_cluster(caplog):
    ws = create_autospec(WorkspaceClient)
    installation = create_autospec(Installation)
    prompts = MockPrompts({})
    installation.load.return_value = ClusterDetails(
        cluster_name="test_cluster", spark_version="13.3.x-cpu-ml-scala2.12"
    )
    cluster = ClusterAccess(installation, ws, prompts)
    with caplog.at_level('INFO'):
        cluster.revert_cluster_remap(cluster_ids="123", total_cluster_ids=["123"])
    assert (
        'skipping cluster remapping: cluster Id is not present in the config file for the cluster:123'
        in caplog.messages
    )
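These are pure unit tests (WorkspaceClient and Installation are create_autospec mocks, so no workspace is contacted). Assuming the repository's standard test layout, they should run with plain pytest; the path below is an assumption:

```
pytest tests/unit/workspace_access/test_clusters.py
```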
Review comment: Is there a way to make this resilient to cluster API changes? The cluster UI and options are constantly changing. Can this code focus only on the specifics of credentials, Spark configs, and data security mode, and pass all the other configuration through? Will this code break frequently? [I don't know]
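One possible direction, sketched under assumptions rather than taken from this PR: ClusterDetails is a dataclass in the Python SDK, so the edit call could be assembled by intersecting its fields with the parameters clusters.edit() accepts, overriding only the values the remap actually decides. The edit_cluster_passthrough helper below is hypothetical:

```python
import dataclasses
import inspect

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.compute import ClusterDetails, DataSecurityMode


def edit_cluster_passthrough(
    ws: WorkspaceClient,
    cluster: ClusterDetails,
    access_mode: DataSecurityMode,
    spark_version: str,
) -> None:
    """Hypothetical helper: forward every ClusterDetails field that
    clusters.edit() accepts, overriding only what the remap changes."""
    edit_params = set(inspect.signature(ws.clusters.edit).parameters)
    kwargs = {
        field.name: getattr(cluster, field.name)
        for field in dataclasses.fields(cluster)
        if field.name in edit_params and getattr(cluster, field.name) is not None
    }
    # Only these values are actually decided by the remap logic
    kwargs.update(
        cluster_id=cluster.cluster_id,
        spark_version=spark_version,
        data_security_mode=access_mode,
    )
    ws.clusters.edit(**kwargs)
```

New fields the Clusters API grows would then flow through without touching this code, though combinations that edit() rejects would still surface as API errors.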