Skip to content

Commit

Permalink
Add end-point and CLI command for deleting an asset when unreferenced
Browse files Browse the repository at this point in the history
  • Loading branch information
bbende committed Aug 5, 2024
1 parent 0b43d4a commit 6977a96
Show file tree
Hide file tree
Showing 12 changed files with 241 additions and 20 deletions.
1 change: 1 addition & 0 deletions nifi-docs/src/main/asciidoc/toolkit-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ The following are available commands:
nifi create-asset
nifi list-assets
nifi get-asset
nifi delete-asset
nifi add-asset-reference
nifi remove-asset-reference
registry current-user
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface AssetManager {
void initialize(AssetManagerInitializationContext context);

/**
* Creates a new Asset with the given name and contents. If the replicate flag is set to true, the asset will be replicated to all nodes in the cluster.
* Creates a new Asset with the given name and contents.
* @param parameterContextId the id of the parameter context
* @param assetName the name of the asset
* @param contents the contents of the asset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ public NarManager narManager(@Autowired final NarPersistenceProvider narPersiste
* @return Asset Manager
*/
@Bean
public AssetManager assetManager(@Autowired final FlowController flowController) {
return flowController.getAssetManager();
public AssetManager assetManager() throws Exception {
return flowController().getAssetManager();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.nifi.web.api.entity.ActionEntity;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.AssetEntity;
import org.apache.nifi.web.api.entity.BulletinEntity;
import org.apache.nifi.web.api.entity.ComponentValidationResultEntity;
import org.apache.nifi.web.api.entity.ConfigurationAnalysisEntity;
Expand Down Expand Up @@ -2909,4 +2910,24 @@ ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingC
*/
NarSummaryEntity deleteNar(String identifier) throws IOException;

// ----------------------------------------
// Asset Manager methods
// ----------------------------------------

/**
* Verifies the given asset can be deleted from the given parameter context.
*
* @param parameterContextId the parameter context id
* @param assetId the asset id
*/
void verifyDeleteAsset(String parameterContextId, String assetId);

/**
* Deletes the given asset from the given parameter context.
*
* @param parameterContextId the parameter context id
* @param assetId the asset id
*/
AssetEntity deleteAsset(String parameterContextId, String assetId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@
import org.apache.nifi.web.api.entity.ActionEntity;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.AssetEntity;
import org.apache.nifi.web.api.entity.BulletinEntity;
import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
import org.apache.nifi.web.api.entity.ComponentValidationResultEntity;
Expand Down Expand Up @@ -6748,6 +6749,38 @@ public NarSummaryEntity deleteNar(final String identifier) throws IOException {
return entityFactory.createNarSummaryEntity(narSummaryDTO);
}

@Override
public void verifyDeleteAsset(final String parameterContextId, final String assetId) {
final ParameterContext parameterContext = parameterContextDAO.getParameterContext(parameterContextId);
final Set<String> referencingParameterNames = getReferencingParameterNames(parameterContext, assetId);
if (!referencingParameterNames.isEmpty()) {
final String joinedParametersNames = String.join(", ", referencingParameterNames);
throw new IllegalStateException("Unable to delete Asset [%s] because it is currently references by Parameters [%s]".formatted(assetId, joinedParametersNames));
}
}

private Set<String> getReferencingParameterNames(final ParameterContext parameterContext, final String assetId) {
final Set<String> referencingParameterNames = new HashSet<>();
for (final Parameter parameter : parameterContext.getParameters().values()) {
if (parameter.getReferencedAssets() != null) {
for (final Asset asset : parameter.getReferencedAssets()) {
if (asset.getIdentifier().equals(assetId)) {
referencingParameterNames.add(parameter.getDescriptor().getName());
}
}
}
}
return referencingParameterNames;
}

@Override
public AssetEntity deleteAsset(final String parameterContextId, final String assetId) {
verifyDeleteAsset(parameterContextId, assetId);
final Asset deletedAsset = assetManager.deleteAsset(assetId)
.orElseThrow(() -> new ResourceNotFoundException("Asset does not exist with id [%s]".formatted(assetId)));
return dtoFactory.createAssetEntity(deletedAsset);
}

private PermissionsDTO createPermissionDto(
final String id,
final org.apache.nifi.flow.ComponentType subjectComponentType,
Expand Down Expand Up @@ -7039,6 +7072,7 @@ public void setNarManager(final NarManager narManager) {
this.narManager = narManager;
}

@Autowired
public void setAssetManager(final AssetManager assetManager) {
this.assetManager = assetManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ public Response createAsset(
parameterContext.authorize(authorizer, RequestAction.WRITE, user);

// Verify READ and WRITE permissions for user, for every component that is affected
// This is necessary because this end-point may be called to replace the content of an asset that is referenced in a parameter that is already in use
affectedComponents.forEach(component -> parameterUpdateManager.authorizeAffectedComponent(component, lookup, user, true, true));
});

Expand Down Expand Up @@ -555,6 +556,86 @@ public Response getAssetContent(
.build();
}

@DELETE
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("{contextId}/assets/{assetId}")
@Operation(
summary = "Deletes an Asset from the given Parameter Context",
responses = @ApiResponse(content = @Content(schema = @Schema(implementation = AssetEntity.class))),
description = "This endpoint will create a new Asset in the given Parameter Context. The Asset will be created with the given name and the contents of the file that is uploaded. " +
"The Asset will be created in the given Parameter Context, and will be available for use by any component that references the Parameter Context.",
security = {
@SecurityRequirement(name = "Read - /parameter-contexts/{parameterContextId}"),
@SecurityRequirement(name = "Write - /parameter-contexts/{parameterContextId}"),
@SecurityRequirement(name = "Read - for every component that is affected by the update"),
@SecurityRequirement(name = "Write - for every component that is affected by the update"),
@SecurityRequirement(name = "Read - for every currently inherited parameter context")
}
)
@ApiResponses(
value = {
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
}
)
public Response deleteAsset(
@QueryParam(DISCONNECTED_NODE_ACKNOWLEDGED) @DefaultValue("false")
final Boolean disconnectedNodeAcknowledged,
@Parameter(description = "The ID of the Parameter Context")
@PathParam("contextId")
final String parameterContextId,
@Parameter(description = "The ID of the Asset")
@PathParam("assetId")
final String assetId
) {
if (StringUtils.isBlank(parameterContextId)) {
throw new IllegalArgumentException("Parameter context id is required");
}
if (StringUtils.isBlank(assetId)) {
throw new IllegalArgumentException("Asset id is required");
}

if (isReplicateRequest()) {
return replicate(HttpMethod.DELETE);
} else if (isDisconnectedFromCluster()) {
verifyDisconnectedNodeModification(disconnectedNodeAcknowledged);
}

// Get the context or throw ResourceNotFoundException
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final ParameterContextEntity contextEntity = serviceFacade.getParameterContext(parameterContextId, false, user);

final AssetDTO assetDTO = new AssetDTO();
assetDTO.setId(assetId);

final AssetEntity assetEntity = new AssetEntity();
assetEntity.setAsset(assetDTO);

// Need to call into service facade with a lock to ensure that the parameter context can't be updated to
// reference the asset being deleted at the same time that we are verifying no parameters reference it
return withWriteLock(
serviceFacade,
assetEntity,
lookup -> {
// Deletion of an asset will only be allowed when it is not referenced by any parameters, so we only need to
// authorize that the user has access to modify the context which is READ and WRITE on the context itself
final ParameterContext parameterContext = lookup.getParameterContext(contextEntity.getId());
parameterContext.authorize(authorizer, RequestAction.READ, user);
parameterContext.authorize(authorizer, RequestAction.WRITE, user);
},
() -> serviceFacade.verifyDeleteAsset(contextEntity.getId(), assetId),
requestEntity -> {
final String requestAssetId = requestEntity.getAsset().getId();
final AssetEntity deletedAsset = serviceFacade.deleteAsset(contextEntity.getId(), requestAssetId);
return generateOkResponse(deletedAsset).build();
}
);
}

@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -727,9 +808,11 @@ private void validateAssetReferences(final ParameterContextDTO parameterContextD
if (StringUtils.isBlank(referencedAsset.getId())) {
throw new IllegalArgumentException("Asset reference id cannot be blank");
}
final Optional<Asset> asset = assetManager.getAsset(referencedAsset.getId());
if (asset.isEmpty()) {
throw new IllegalArgumentException("Request contains a reference to an Asset (%s) that does not exist".formatted(referencedAsset));
final Asset asset = assetManager.getAsset(referencedAsset.getId())
.orElseThrow(() -> new IllegalArgumentException("Request contains a reference to an Asset (%s) that does not exist".formatted(referencedAsset)));
if (!asset.getParameterContextIdentifier().equals(parameterContextDto.getId())) {
throw new IllegalArgumentException("Request contains a reference to an Asset (%s) that does not exist in Parameter Context (%s)"
.formatted(referencedAsset, parameterContextDto.getId()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ public ParameterContext updateParameterContext(final ParameterContextDTO paramet
verifyUpdate(parameterContextDto, true);

final ParameterContext context = getParameterContext(parameterContextDto.getId());
final Set<String> originalReferencedAssets = getReferencedAssetIds(context);

if (parameterContextDto.getName() != null) {
verifyNoNamingConflict(parameterContextDto.getName(), parameterContextDto.getId());
Expand All @@ -306,14 +305,6 @@ public ParameterContext updateParameterContext(final ParameterContextDTO paramet
context.setInheritedParameterContexts(inheritedParameterContexts);
}

final Set<String> updatedReferencedAssets = getReferencedAssetIds(context);
final Set<String> removedReferences = new HashSet<>(originalReferencedAssets);
removedReferences.removeAll(updatedReferencedAssets);
for (final String removedReference : removedReferences) {
assetManager.deleteAsset(removedReference);
logger.info("Removed Asset {} because it is no longer being referenced", removedReference);
}

return context;
}

Expand All @@ -336,7 +327,7 @@ public List<ParameterContext> getInheritedParameterContexts(final ParameterConte
if (parameterContextDto.getInheritedParameterContexts() != null) {
inheritedParameterContexts.addAll(parameterContextDto.getInheritedParameterContexts().stream()
.map(entity -> flowManager.getParameterContextManager().getParameterContext(entity.getComponent().getId()))
.collect(Collectors.toList()));
.toList());
}

return inheritedParameterContexts;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,12 +725,21 @@ public void testAssetReference() throws NiFiClientException, IOException, Interr
final AssetsEntity assetListing = assertAssetListing(paramContext.getId(), 1);
assertAssetExists(asset, assetListing);

// Attempt to delete the asset should be prevented since it is still referenced
assertThrows(NiFiClientException.class, () -> getNifiClient().getParamContextClient().deleteAsset(paramContext.getId(), asset.getAsset().getId()));

// Change the parameter so that it no longer references the asset
final ParameterContextUpdateRequestEntity removeAssetUpdateRequest = getClientUtil().updateParameterContext(paramContext, Map.of("fileToIngest", "invalid"));

// Wait for the update to complete
getClientUtil().waitForParameterContextRequestToComplete(paramContext.getId(), removeAssetUpdateRequest.getRequest().getRequestId());

// Attempt to delete the asset should now succeed since no longer referenced
final AssetEntity deletedAssetEntity = getNifiClient().getParamContextClient().deleteAsset(paramContext.getId(), asset.getAsset().getId());
assertNotNull(deletedAssetEntity);
assertNotNull(deletedAssetEntity.getAsset());
assertEquals(asset.getAsset().getId(), deletedAssetEntity.getAsset().getId());

// Ensure that the directories no longer exist
waitFor(() -> !node1ContextDir.exists());

Expand Down Expand Up @@ -805,7 +814,7 @@ public void testAssetsRemovedWhenDeletingParameterContext() throws NiFiClientExc
}

@Test
public void testAssetsRemovedWhenRemovingReference() throws NiFiClientException, IOException, InterruptedException {
public void testAssetsRemainWhenRemovingReference() throws NiFiClientException, IOException, InterruptedException {
// Create context
final ParameterContextEntity paramContext = getClientUtil().createParameterContext("testAssetsRemovedWhenDeletingParameterContext",
Map.of("name", "foo", "fileToIngest", ""));
Expand Down Expand Up @@ -836,10 +845,10 @@ public void testAssetsRemovedWhenRemovingReference() throws NiFiClientException,
final ParameterContextUpdateRequestEntity removeReferenceUpdateRequest = getClientUtil().updateParameterAssetReferences(paramContext, Map.of("fileToIngest", List.of()));
getClientUtil().waitForParameterContextRequestToComplete(paramContext.getId(), removeReferenceUpdateRequest.getRequest().getRequestId());

// Verify the directory for the context's assets was removed
assertFalse(node1ContextDir.exists());
// Verify the directory for the context's assets was not removed
assertTrue(node1ContextDir.exists());
if (node2ContextDir != null) {
assertFalse(node2ContextDir.exists());
assertTrue(node2ContextDir.exists());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,7 @@ public interface ParamContextClient {

File getAssetContent(String contextId, String assetId, File outputDirectory) throws NiFiClientException, IOException;

AssetEntity deleteAsset(String contextId, String assetId) throws NiFiClientException, IOException;

AssetEntity deleteAsset(String contextId, String assetId, boolean disconnectedNodeAcknowledged) throws NiFiClientException, IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,31 @@ public File getAssetContent(final String contextId, final String assetId, final
}
});
}

@Override
public AssetEntity deleteAsset(final String contextId, final String assetId) throws NiFiClientException, IOException {
return deleteAsset(contextId, assetId, false);
}

@Override
public AssetEntity deleteAsset(final String contextId, final String assetId, final boolean disconnectedNodeAcknowledged)
throws NiFiClientException, IOException {
if (StringUtils.isBlank(contextId)) {
throw new IllegalArgumentException("Parameter context id cannot be null or blank");
}
if (StringUtils.isBlank(assetId)) {
throw new IllegalArgumentException("Asset id cannot be null or blank");
}
return executeAction("Error deleting asset", () -> {
WebTarget target = paramContextTarget.path("{context-id}/assets/{asset-id}")
.resolveTemplate("context-id", contextId)
.resolveTemplate("asset-id", assetId);

if (disconnectedNodeAcknowledged) {
target = target.queryParam("disconnectedNodeAcknowledged", "true");
}

return getRequestBuilder(target).delete(AssetEntity.class);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.CreateAsset;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.CreateParamContext;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.CreateParamProvider;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.DeleteAsset;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.GetAsset;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.ListAssets;
import org.apache.nifi.toolkit.cli.impl.command.nifi.params.RemoveAssetReference;
Expand Down Expand Up @@ -220,6 +221,7 @@ protected List<Command> createCommands() {
commands.add(new CreateAsset());
commands.add(new ListAssets());
commands.add(new GetAsset());
commands.add(new DeleteAsset());
commands.add(new AddAssetReference());
commands.add(new RemoveAssetReference());
return new ArrayList<>(commands);
Expand Down
Loading

0 comments on commit 6977a96

Please sign in to comment.