Skip to content

Commit

Permalink
refactor: finish replacement
Browse files Browse the repository at this point in the history
  • Loading branch information
doctrino committed Dec 20, 2024
1 parent ad695be commit 2b1b642
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 28 deletions.
38 changes: 12 additions & 26 deletions cognite_toolkit/_cdf_tk/loaders/_resource_loaders/auth_loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ class _ReplaceMethod:
"""This is a small helper class used in the
lookup and replace in the ACL scoped ids"""

verify_method: Callable[[str, bool, str], int]
operation: str
lookup_method: Callable[[str, bool], int]
id_name: str


Expand Down Expand Up @@ -198,7 +197,7 @@ def get_dependent_items(cls, item: dict) -> Iterable[tuple[type[ResourceLoader],
yield loader, id_

def _substitute_scope_ids(self, group: dict, is_dry_run: bool) -> dict:
replace_method_by_acl = self._create_replace_method_by_acl_and_scope(is_dry_run)
replace_method_by_acl = self._create_replace_method_by_acl_and_scope()

for capability in group.get("capabilities", []):
for acl, values in capability.items():
Expand All @@ -216,56 +215,43 @@ def _substitute_scope_ids(self, group: dict, is_dry_run: bool) -> dict:
continue
if ids := scope.get(scope_name, {}).get(replace_method.id_name, []):
values["scope"][scope_name][replace_method.id_name] = [
replace_method.verify_method(ext_id, is_dry_run, replace_method.operation)
if isinstance(ext_id, str)
else ext_id
replace_method.lookup_method(ext_id, is_dry_run) if isinstance(ext_id, str) else ext_id
for ext_id in ids
]
return group

@classmethod
def _create_replace_method_by_acl_and_scope(
cls, ToolGlobals: CDFToolConfig
) -> dict[tuple[str, str] | str, _ReplaceMethod]:
def _create_replace_method_by_acl_and_scope(self) -> dict[tuple[str, str] | str, _ReplaceMethod]:
source = {
(cap.DataSetsAcl, cap.DataSetsAcl.Scope.ID): _ReplaceMethod(
ToolGlobals.verify_dataset,
operation="replace datasetExternalId with dataSetId in group",
self.client.lookup.data_sets.id,
id_name="ids",
),
(cap.ExtractionPipelinesAcl, cap.ExtractionPipelinesAcl.Scope.ID): _ReplaceMethod(
ToolGlobals.verify_extraction_pipeline,
operation="replace extractionPipelineExternalId with extractionPipelineId in group",
self.client.lookup.extraction_pipelines.id,
id_name="ids",
),
(cap.LocationFiltersAcl, cap.LocationFiltersAcl.Scope.ID): _ReplaceMethod(
ToolGlobals.verify_locationfilter,
operation="replace locationFilterExternalId with locationFilterId in group",
self.client.lookup.location_filters.id,
id_name="ids",
),
(cap.SecurityCategoriesAcl, cap.SecurityCategoriesAcl.Scope.ID): _ReplaceMethod(
ToolGlobals.verify_security_categories,
operation="replace securityCategoryExternalId with securityCategoryId in group",
self.client.lookup.security_categories.id,
id_name="ids",
),
(cap.TimeSeriesAcl, cap.TimeSeriesAcl.Scope.ID): _ReplaceMethod(
ToolGlobals.verify_timeseries,
operation="replace timeSeriesExternalId with timeSeriesId in group",
self.client.lookup.time_series.id,
id_name="ids",
),
cap.DataSetScope: _ReplaceMethod(
ToolGlobals.verify_dataset,
operation="replace datasetExternalId with dataSetId in group",
self.client.lookup.data_sets.id,
id_name="ids",
),
cap.ExtractionPipelineScope: _ReplaceMethod(
ToolGlobals.verify_extraction_pipeline,
operation="replace extractionPipelineExternalId with extractionPipelineId in group",
self.client.lookup.extraction_pipelines.id,
id_name="ids",
),
cap.AssetRootIDScope: _ReplaceMethod(
ToolGlobals.verify_dataset,
operation="replace rootAssetExternalId with rootAssetId in group",
self.client.lookup.assets.id,
id_name="rootIds",
),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def get_dependent_items(cls, item: dict) -> Iterable[tuple[type[ResourceLoader],
if "dataSetExternalId" in item:
yield DataSetsLoader, item["dataSetExternalId"]

def load_resource_file(
def load_resource_file( # type: ignore[override]
self, filepath: Path, ToolGlobals: CDFToolConfig, is_dry_run: bool
) -> FunctionWrite | FunctionWriteList | None:
if filepath.parent.name != self.folder_name:
Expand All @@ -125,7 +125,9 @@ def load_resource(
self.extra_configs[func["externalId"]] = {}
if func.get("dataSetExternalId") is not None:
ds_external_id = func.pop("dataSetExternalId")
self.extra_configs[func["externalId"]]["dataSetId"] = self.client.lookup.data_sets.id(ds_external_id, is_dry_run)
self.extra_configs[func["externalId"]]["dataSetId"] = self.client.lookup.data_sets.id(
ds_external_id, is_dry_run
)
if "fileId" not in func:
# The fileID is required for the function to be created, but in the `.create` method
# we first create that file and then set the fileID.
Expand Down

0 comments on commit 2b1b642

Please sign in to comment.