Skip to content

Commit

Permalink
🦺 add failsafe permission get operations (#65)
Browse files Browse the repository at this point in the history
* add failsafe switches

* add raiser for non-handled cases
  • Loading branch information
renardeinside authored Aug 2, 2023
1 parent 7a62801 commit 46a1dc3
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 37 deletions.
79 changes: 53 additions & 26 deletions src/uc_migration_toolkit/managers/inventory/inventorizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from uc_migration_toolkit.providers.config import provider as config_provider
from uc_migration_toolkit.providers.groups_info import MigrationGroupsProvider
from uc_migration_toolkit.providers.logger import logger
from uc_migration_toolkit.utils import ThreadedExecution
from uc_migration_toolkit.utils import ProgressReporter, ThreadedExecution

InventoryObject = TypeVar("InventoryObject")

Expand Down Expand Up @@ -56,6 +56,18 @@ class StandardInventorizer(BaseInventorizer[InventoryObject]):
def logical_object_types(self) -> list[LogicalObjectType]:
return [self._logical_object_type]

@staticmethod
def safe_get_permissions(request_object_type: RequestObjectType, object_id: str) -> ObjectPermissions | None:
    """Fetch permissions for one object, tolerating missing or inaccessible objects.

    Args:
        request_object_type: Object type passed through to ``provider.ws.get_permissions``.
        object_id: Identifier of the object whose permissions are requested.

    Returns:
        Whatever ``provider.ws.get_permissions`` returns, or ``None`` when the
        call fails with one of the tolerated error codes (a warning is logged
        in that case).

    Raises:
        DatabricksError: Re-raised unchanged for any error code that is not tolerated.
    """
    # Error codes that mean "skip this object" rather than "abort the run".
    skippable_error_codes = ("RESOURCE_DOES_NOT_EXIST", "RESOURCE_NOT_FOUND", "PERMISSION_DENIED")
    try:
        return provider.ws.get_permissions(request_object_type, object_id)
    except DatabricksError as e:
        if e.error_code not in skippable_error_codes:
            # Bare ``raise`` propagates the active exception with its original traceback.
            raise
        logger.warning(f"Could not get permissions for {request_object_type} {object_id} due to {e.error_code}")
        return None

def __init__(
self,
logical_object_type: LogicalObjectType,
Expand All @@ -68,7 +80,7 @@ def __init__(
self._request_object_type = request_object_type
self._listing_function = listing_function
self._id_attribute = id_attribute
self._permissions_function = permissions_function if permissions_function else provider.ws.get_permissions
self._permissions_function = permissions_function if permissions_function else self.safe_get_permissions
self._objects: list[InventoryObject] = []

@property
Expand All @@ -80,23 +92,24 @@ def preload(self):
self._objects = list(self._listing_function())
logger.info(f"Object metadata prepared for {len(self._objects)} objects.")

def _process_single_object(self, _object: InventoryObject) -> PermissionsInventoryItem:
def _process_single_object(self, _object: InventoryObject) -> PermissionsInventoryItem | None:
object_id = str(getattr(_object, self._id_attribute))
permissions = self._permissions_function(self._request_object_type, object_id)
inventory_item = PermissionsInventoryItem(
object_id=object_id,
logical_object_type=self._logical_object_type,
request_object_type=self._request_object_type,
raw_object_permissions=json.dumps(permissions.as_dict()),
)
return inventory_item
if permissions:
inventory_item = PermissionsInventoryItem(
object_id=object_id,
logical_object_type=self._logical_object_type,
request_object_type=self._request_object_type,
raw_object_permissions=json.dumps(permissions.as_dict()),
)
return inventory_item

def inventorize(self):
logger.info(f"Fetching permissions for {len(self._objects)} objects...")

executables = [partial(self._process_single_object, _object) for _object in self._objects]
threaded_execution = ThreadedExecution[PermissionsInventoryItem](executables)
collected = threaded_execution.run()
collected = [item for item in threaded_execution.run() if item is not None]
logger.info(f"Permissions fetched for {len(collected)} objects of type {self._request_object_type}")
return collected

Expand Down Expand Up @@ -203,12 +216,13 @@ class WorkspaceInventorizer(BaseInventorizer[InventoryObject]):
def logical_object_types(self) -> list[LogicalObjectType]:
return [LogicalObjectType.NOTEBOOK, LogicalObjectType.DIRECTORY, LogicalObjectType.REPO, LogicalObjectType.FILE]

def __init__(self):
def __init__(self, start_path: str | None = "/"):
self.listing = WorkspaceListing(
provider.ws,
num_threads=config_provider.config.num_threads,
with_directories=False,
)
self._start_path = start_path

def preload(self):
pass
Expand Down Expand Up @@ -247,23 +261,36 @@ def _convert_result_to_permission_item(self, _object: ObjectInfo) -> Permissions
if not request_object_type:
return
else:
permissions = provider.ws.permissions.get(
request_object_type=request_object_type, request_object_id=_object.object_id
)

inventory_item = PermissionsInventoryItem(
object_id=str(_object.object_id),
logical_object_type=self.__convert_request_object_type_to_logical_type(request_object_type),
request_object_type=request_object_type,
raw_object_permissions=json.dumps(permissions.as_dict()),
)
return inventory_item
try:
permissions = provider.ws.get_permissions(
request_object_type=request_object_type, request_object_id=_object.object_id
)
except DatabricksError as e:
if e.error_code in ["PERMISSION_DENIED", "RESOURCE_NOT_FOUND"]:
logger.warning(f"Cannot load permissions for {_object.path} due to error {e.error_code}")
return
else:
raise e

if permissions:
inventory_item = PermissionsInventoryItem(
object_id=str(_object.object_id),
logical_object_type=self.__convert_request_object_type_to_logical_type(request_object_type),
request_object_type=request_object_type,
raw_object_permissions=json.dumps(permissions.as_dict()),
)
return inventory_item

def inventorize(self) -> list[PermissionsInventoryItem]:
self.listing.walk("/")
self.listing.walk(self._start_path)
executables = [partial(self._convert_result_to_permission_item, _object) for _object in self.listing.results]
results = ThreadedExecution[PermissionsInventoryItem | None](executables).run()
results = [result for result in results if result]
results = ThreadedExecution[PermissionsInventoryItem | None](
executables,
progress_reporter=ProgressReporter(
len(executables), "Fetching permissions for workspace objects - processed: "
),
).run()
results = [result for result in results if result] # empty filter
logger.info(f"Permissions fetched for {len(results)} workspace objects")
return results

Expand Down
4 changes: 2 additions & 2 deletions src/uc_migration_toolkit/managers/inventory/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def _scope_permissions_applicator(request_payload: SecretsPermissionRequestPaylo

@staticmethod
def _standard_permissions_applicator(request_payload: PermissionRequestPayload):
provider.ws.permissions.update(
provider.ws.update_permissions(
request_object_type=request_payload.request_object_type,
request_object_id=request_payload.object_id,
access_control_list=request_payload.access_control_list,
Expand Down Expand Up @@ -236,4 +236,4 @@ def apply_group_permissions(
logger.info(f"Applying {len(permission_payloads)} permissions")

self._apply_permissions_in_parallel(requests=permission_payloads)
logger.info("All permissions were applied")
logger.info(f"All permissions were applied for {destination} groups")
12 changes: 3 additions & 9 deletions src/uc_migration_toolkit/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,13 @@ def __init__(
self,
executables: list[ExecutableFunction],
num_threads: int | None = None,
done_callback: Callable[..., None] | None = None,
progress_reporter: ProgressReporter | None = None,
):
self._num_threads = num_threads if num_threads else config_provider.config.num_threads
self._executables = executables
self._futures = []
self._done_callback = (
done_callback if done_callback else self._prepare_default_done_callback(len(self._executables))
)

@staticmethod
def _prepare_default_done_callback(total_executables: int):
progress_reporter = ProgressReporter(total_executables)
return progress_reporter.progress_report
_reporter = ProgressReporter(len(executables)) if not progress_reporter else progress_reporter
self._done_callback = _reporter.progress_report

def run(self) -> list[ExecutableResult]:
logger.trace("Starting threaded execution")
Expand Down

0 comments on commit 46a1dc3

Please sign in to comment.