Skip to content

Commit

Permalink
NIFI-12991 Fixed Recursive Validation of Group Execution Engine (#9186)
Browse files Browse the repository at this point in the history
Fixed issue in which we recursively validate that we can change Execution Engine of a group, but without changing the top level, inherited groups can't actually be changed; instead, replaced logic to ensure that we can change the top level, that we don't validate the constraint that a STATELESS group cannot have a STATEFUL child, and then validate that all descendant groups can be updated, but without checking the descendants' parent groups.

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
markap14 authored Aug 20, 2024
1 parent 8919955 commit da9c296
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -4406,6 +4406,23 @@ public void verifyCanSetExecutionEngine(final ExecutionEngine executionEngine) {
}
}

// Ensure that we are not changing a parent to Stateless when a child is explicitly set to STANDARD.
if (resolvedProposedEngine == ExecutionEngine.STATELESS) {
for (final ProcessGroup descendant : findAllProcessGroups()) {
final ExecutionEngine descendantEngine = descendant.getExecutionEngine();
if (descendantEngine == ExecutionEngine.STANDARD) {
throw new IllegalStateException("A Process Group using the Stateless Engine may not have a child Process Group using the Standard Engine. Cannot set Execution Engine of " + this +
" to Stateless because it has a child Process Group " + descendant + " using the Standard Engine");
}
}
}

verifyCanUpdateExecutionEngine();
}

@Override
public void verifyCanUpdateExecutionEngine() {
// Ensure that no components are running / services enabled.
for (final ProcessorNode processor : getProcessors()) {
if (processor.isRunning()) {
throw new IllegalStateException("Cannot change Execution Engine for " + this + " while components are running. " + processor + " is currently running.");
Expand All @@ -4432,16 +4449,18 @@ public void verifyCanSetExecutionEngine(final ExecutionEngine executionEngine) {
}
}

// Ensure that there is no data queued.
for (final Connection connection : getConnections()) {
final boolean queueEmpty = connection.getFlowFileQueue().isEmpty();
if (!queueEmpty) {
throw new IllegalStateException("Cannot change Execution Engine for " + this + " while data is queued. " + connection + " has data queued.");
}
}

// Ensure that all descendants are in a good state for updating the execution engine.
for (final ProcessGroup child : getProcessGroups()) {
if (child.getExecutionEngine() == ExecutionEngine.INHERITED) {
child.verifyCanSetExecutionEngine(executionEngine);
child.verifyCanUpdateExecutionEngine();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1214,6 +1214,12 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
*/
void verifyCanSetExecutionEngine(ExecutionEngine executionEngine);

/**
* Verifies that the Process Group is in a state in which the Execution Engine can be changed.
* @throws IllegalStateException if the Execution Engine cannot be changed at this time
*/
void verifyCanUpdateExecutionEngine();

/**
* Sets the maximum number on concurrent tasks that can be run in this Process Group if using the Stateless Execution Engine
* @param maxConcurrentTasks the maximum number of concurrent tasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,10 @@ public ExecutionEngine resolveExecutionEngine() {
public void verifyCanSetExecutionEngine(final ExecutionEngine executionEngine) {
}

@Override
public void verifyCanUpdateExecutionEngine() {
}

@Override
public void setMaxConcurrentTasks(final int maxConcurrentTasks) {
}
Expand Down

0 comments on commit da9c296

Please sign in to comment.