Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistently use WorkspaceClient from databricks.sdk #120

Merged
merged 5 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions src/databricks/labs/ucx/inventory/inventorizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
ObjectType,
SecretScope,
)
from ratelimit import limits, sleep_and_retry

from databricks.labs.ucx.inventory.listing import WorkspaceListing
from databricks.labs.ucx.inventory.types import (
Expand All @@ -23,7 +24,6 @@
PermissionsInventoryItem,
RequestObjectType,
)
from databricks.labs.ucx.providers.client import ImprovedWorkspaceClient
from databricks.labs.ucx.providers.groups_info import GroupMigrationState
from databricks.labs.ucx.utils import ProgressReporter, ThreadedExecution

Expand Down Expand Up @@ -57,7 +57,7 @@ def logical_object_types(self) -> list[LogicalObjectType]:

def __init__(
self,
ws: ImprovedWorkspaceClient,
ws: WorkspaceClient,
logical_object_type: LogicalObjectType,
request_object_type: RequestObjectType,
listing_function: Callable[..., Iterator[InventoryObject]],
Expand All @@ -72,9 +72,14 @@ def __init__(
self._permissions_function = permissions_function if permissions_function else self._safe_get_permissions
self._objects: list[InventoryObject] = []

@sleep_and_retry
@limits(calls=100, period=1)
def _get_permissions(self, request_object_type: RequestObjectType, request_object_id: str):
return self._ws.permissions.get(request_object_type=request_object_type, request_object_id=request_object_id)

def _safe_get_permissions(self, request_object_type: RequestObjectType, object_id: str) -> ObjectPermissions | None:
try:
permissions = self._ws.get_permissions(request_object_type, object_id)
permissions = self._get_permissions(request_object_type, object_id)
return permissions
except DatabricksError as e:
if e.error_code in ["RESOURCE_DOES_NOT_EXIST", "RESOURCE_NOT_FOUND", "PERMISSION_DENIED"]:
Expand Down Expand Up @@ -119,7 +124,7 @@ class TokensAndPasswordsInventorizer(BaseInventorizer[InventoryObject]):
def logical_object_types(self) -> list[LogicalObjectType]:
return [LogicalObjectType.TOKEN, LogicalObjectType.PASSWORD]

def __init__(self, ws: ImprovedWorkspaceClient):
def __init__(self, ws: WorkspaceClient):
self._ws = ws
self._tokens_acl = []
self._passwords_acl = []
Expand Down Expand Up @@ -188,7 +193,7 @@ class SecretScopeInventorizer(BaseInventorizer[InventoryObject]):
def logical_object_types(self) -> list[LogicalObjectType]:
return [LogicalObjectType.SECRET_SCOPE]

def __init__(self, ws: ImprovedWorkspaceClient):
def __init__(self, ws: WorkspaceClient):
self._ws = ws
self._scopes = ws.secrets.list_scopes()

Expand Down Expand Up @@ -221,7 +226,7 @@ class WorkspaceInventorizer(BaseInventorizer[InventoryObject]):
def logical_object_types(self) -> list[LogicalObjectType]:
return [LogicalObjectType.NOTEBOOK, LogicalObjectType.DIRECTORY, LogicalObjectType.REPO, LogicalObjectType.FILE]

def __init__(self, ws: ImprovedWorkspaceClient, num_threads=20, start_path: str | None = "/"):
def __init__(self, ws: WorkspaceClient, num_threads=20, start_path: str | None = "/"):
self._ws = ws
self.listing = WorkspaceListing(
ws,
Expand Down Expand Up @@ -262,13 +267,18 @@ def __convert_request_object_type_to_logical_type(request_object_type: RequestOb
case RequestObjectType.FILES:
return LogicalObjectType.FILE

@sleep_and_retry
@limits(calls=100, period=1)
def _get_permissions(self, request_object_type: RequestObjectType, request_object_id: str):
return self._ws.permissions.get(request_object_type=request_object_type, request_object_id=request_object_id)

def _convert_result_to_permission_item(self, _object: ObjectInfo) -> PermissionsInventoryItem | None:
request_object_type = self.__convert_object_type_to_request_type(_object)
if not request_object_type:
return
else:
try:
permissions = self._ws.get_permissions(
permissions = self._get_permissions(
request_object_type=request_object_type, request_object_id=_object.object_id
)
except DatabricksError as e:
Expand Down Expand Up @@ -306,7 +316,7 @@ class RolesAndEntitlementsInventorizer(BaseInventorizer[InventoryObject]):
def logical_object_types(self) -> list[LogicalObjectType]:
return [LogicalObjectType.ROLES, LogicalObjectType.ENTITLEMENTS]

def __init__(self, ws: ImprovedWorkspaceClient, migration_state: GroupMigrationState):
def __init__(self, ws: WorkspaceClient, migration_state: GroupMigrationState):
self._ws = ws
self._migration_state = migration_state
self._group_info: list[Group] = []
Expand Down Expand Up @@ -363,7 +373,7 @@ def inner() -> Iterator[ModelDatabricks]:

class Inventorizers:
@staticmethod
def provide(ws: ImprovedWorkspaceClient, migration_state: GroupMigrationState, num_threads: int):
def provide(ws: WorkspaceClient, migration_state: GroupMigrationState, num_threads: int):
return [
RolesAndEntitlementsInventorizer(ws, migration_state),
TokensAndPasswordsInventorizer(ws),
Expand Down
17 changes: 11 additions & 6 deletions src/databricks/labs/ucx/inventory/listing.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import datetime as dt
import logging
from collections.abc import Iterator
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import groupby

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.workspace import ObjectInfo, ObjectType

from databricks.labs.ucx.providers.client import ImprovedWorkspaceClient
from ratelimit import limits, sleep_and_retry

logger = logging.getLogger(__name__)


class WorkspaceListing:
def __init__(
self,
ws: ImprovedWorkspaceClient,
ws: WorkspaceClient,
num_threads: int,
*,
with_directories: bool = True,
Expand All @@ -39,12 +40,16 @@ def _progress_report(self, _):
f" rps: {rps:.3f}/sec"
)

@sleep_and_retry
@limits(calls=45, period=1) # safety value, can be 50 actually
def _list_workspace(self, path: str) -> Iterator[ObjectType]:
# TODO: remove, use SDK
return self._ws.workspace.list(path=path, recursive=False)

def _list_and_analyze(self, obj: ObjectInfo) -> (list[ObjectInfo], list[ObjectInfo]):
directories = []
others = []
grouped_iterator = groupby(
self._ws.list_workspace(obj.path), key=lambda x: x.object_type == ObjectType.DIRECTORY
)
grouped_iterator = groupby(self._list_workspace(obj.path), key=lambda x: x.object_type == ObjectType.DIRECTORY)
for is_directory, objects in grouped_iterator:
if is_directory:
directories.extend(list(objects))
Expand Down
67 changes: 63 additions & 4 deletions src/databricks/labs/ucx/inventory/permissions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
import random
import time
Expand All @@ -6,8 +7,10 @@
from functools import partial
from typing import Literal

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.iam import AccessControlRequest, Group, ObjectPermissions
from databricks.sdk.service.workspace import AclItem as SdkAclItem
from ratelimit import limits, sleep_and_retry
from tenacity import retry, stop_after_attempt, wait_fixed, wait_random

from databricks.labs.ucx.inventory.inventorizer import BaseInventorizer
Expand All @@ -19,7 +22,6 @@
RequestObjectType,
RolesAndEntitlements,
)
from databricks.labs.ucx.providers.client import ImprovedWorkspaceClient
from databricks.labs.ucx.providers.groups_info import GroupMigrationState
from databricks.labs.ucx.utils import ThreadedExecution, safe_get_acls

Expand Down Expand Up @@ -51,7 +53,7 @@ class RolesAndEntitlementsRequestPayload:

# TODO: this class has too many @staticmethod and they must not be such. write a unit test for this logic.
class PermissionManager:
def __init__(self, ws: ImprovedWorkspaceClient, inventory_table_manager: InventoryTableManager):
def __init__(self, ws: WorkspaceClient, inventory_table_manager: InventoryTableManager):
self._ws = ws
self.inventory_table_manager = inventory_table_manager
self._inventorizers = []
Expand Down Expand Up @@ -195,16 +197,30 @@ def _scope_permissions_applicator(self, request_payload: SecretsPermissionReques
f"Expected: {_acl_item.permission}. Actual: {applied_acls.permission}"
)

@sleep_and_retry
@limits(calls=30, period=1)
def _update_permissions(
self,
request_object_type: RequestObjectType,
request_object_id: str,
access_control_list: list[AccessControlRequest],
):
return self._ws.permissions.update(
request_object_type=request_object_type,
request_object_id=request_object_id,
access_control_list=access_control_list,
)

def _standard_permissions_applicator(self, request_payload: PermissionRequestPayload):
self._ws.update_permissions(
self._update_permissions(
request_object_type=request_payload.request_object_type,
request_object_id=request_payload.object_id,
access_control_list=request_payload.access_control_list,
)

def applicator(self, request_payload: AnyRequestPayload):
if isinstance(request_payload, RolesAndEntitlementsRequestPayload):
self._ws.apply_roles_and_entitlements(
self._apply_roles_and_entitlements(
group_id=request_payload.group_id,
roles=request_payload.payload.roles,
entitlements=request_payload.payload.entitlements,
Expand All @@ -216,6 +232,49 @@ def applicator(self, request_payload: AnyRequestPayload):
else:
logger.warning(f"Unsupported payload type {type(request_payload)}")

@sleep_and_retry
@limits(calls=10, period=1) # assumption
def _apply_roles_and_entitlements(self, group_id: str, roles: list, entitlements: list):
# TODO: move to other places, this won't be in SDK
op_schema = "urn:ietf:params:scim:api:messages:2.0:PatchOp"
schemas = []
operations = []

if entitlements:
schemas.append(op_schema)
entitlements_payload = {
"op": "add",
"path": "entitlements",
"value": entitlements,
}
operations.append(entitlements_payload)

if roles:
schemas.append(op_schema)
roles_payload = {
"op": "add",
"path": "roles",
"value": roles,
}
operations.append(roles_payload)

if operations:
request = {
"schemas": schemas,
"Operations": operations,
}
self._patch_workspace_group(group_id, request)

def _patch_workspace_group(self, group_id: str, payload: dict):
# TODO: replace usages
# self.groups.patch(group_id,
# schemas=[PatchSchema.URN_IETF_PARAMS_SCIM_API_MESSAGES_2_0_PATCH_OP],
# operations=[
# Patch(op=PatchOp.ADD, path='..', value='...')
# ])
path = f"/api/2.0/preview/scim/v2/Groups/{group_id}"
self._ws.api_client.do("PATCH", path, data=json.dumps(payload))

def _apply_permissions_in_parallel(
self,
requests: list[AnyRequestPayload],
Expand Down
32 changes: 28 additions & 4 deletions src/databricks/labs/ucx/managers/group.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import json
import logging
import typing
from functools import partial

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.iam import Group
from ratelimit import limits, sleep_and_retry

from databricks.labs.ucx.config import GroupsConfig
from databricks.labs.ucx.generic import StrEnum
from databricks.labs.ucx.providers.client import ImprovedWorkspaceClient
from databricks.labs.ucx.providers.groups_info import (
GroupMigrationState,
MigrationGroupInfo,
Expand All @@ -24,7 +26,7 @@ class GroupLevel(StrEnum):
class GroupManager:
SYSTEM_GROUPS: typing.ClassVar[list[str]] = ["users", "admins", "account users"]

def __init__(self, ws: ImprovedWorkspaceClient, groups: GroupsConfig):
def __init__(self, ws: WorkspaceClient, groups: GroupsConfig):
self._ws = ws
self.config = groups
self._migration_state: GroupMigrationState = GroupMigrationState()
Expand All @@ -39,9 +41,18 @@ def _find_eligible_groups(self) -> list[str]:
logger.info(f"Found {len(eligible_groups)} eligible groups")
return [g.display_name for g in eligible_groups]

@sleep_and_retry
@limits(calls=100, period=1) # assumption
def _list_account_level_groups(
self, filter: str, attributes: str | None = None, excluded_attributes: str | None = None # noqa: A002
) -> list[Group]:
query = {"filter": filter, "attributes": attributes, "excludedAttributes": excluded_attributes}
response = self._ws.api_client.do("GET", "/api/2.0/account/scim/v2/Groups", query=query)
return [Group.from_dict(v) for v in response.get("Resources", [])]

def _get_group(self, group_name, level: GroupLevel) -> Group | None:
# TODO: calling this can cause issues for SCIM backend, cache groups instead
method = self._ws.groups.list if level == GroupLevel.WORKSPACE else self._ws.list_account_level_groups
method = self._ws.groups.list if level == GroupLevel.WORKSPACE else self._list_account_level_groups
query_filter = f"displayName eq '{group_name}'"
attributes = ",".join(["id", "displayName", "meta", "entitlements", "roles", "members"])

Expand Down Expand Up @@ -99,7 +110,20 @@ def _replace_group(self, migration_info: MigrationGroupInfo):
else:
logger.warning(f"Workspace-level group {ws_group.display_name} does not exist, skipping")

self._ws.reflect_account_group_to_workspace(acc_group)
self._reflect_account_group_to_workspace(acc_group)

@sleep_and_retry
@limits(calls=5, period=1) # assumption
def _reflect_account_group_to_workspace(self, acc_group: Group) -> None:
logger.info(f"Reflecting group {acc_group.display_name} to workspace")

# TODO: add OpenAPI spec for it
principal_id = acc_group.id
permissions = ["USER"]
path = f"/api/2.0/preview/permissionassignments/principals/{principal_id}"
self._ws.api_client.do("PUT", path, data=json.dumps({"permissions": permissions}))

logger.info(f"Group {acc_group.display_name} successfully reflected to workspace")

# please keep the public methods below this line

Expand Down
Loading