Skip to content

Commit

Permalink
NIFI-13777: Fixed stateless group startup around Controller Services …
Browse files Browse the repository at this point in the history
…by changing the startup logic in two ways. First, we enable Controller Services before we create the Stateless Flow. Secondly, we do not bother enabling the ephemeral Controller Services that are created during the Stateless Flow creation/synchronization. This ensures that if Processors make use of a Controller Service during its @OnScheduled method (which is part of the Stateless Flow initialization process) that the Controller Service is Enabled. It also avoids calling @onEnabled methods of Controller Services that will never actually be used, as Processors make use of the existing Controller Services when running within standard NiFi
  • Loading branch information
markap14 committed Sep 20, 2024
1 parent e155e6b commit f267bbf
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1472,7 +1472,7 @@ private void run(final ScheduledExecutorService taskScheduler, final long admini

final Processor processor = processorRef.get().getProcessor();
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor, new StandardLoggingContext(StandardProcessorNode.this));
LOG.info("Starting {}", this);
LOG.debug("Starting {}", this);

ScheduledState currentState;
boolean starting;
Expand Down Expand Up @@ -1804,7 +1804,7 @@ public CompletableFuture<Void> stop(final ProcessScheduler processScheduler, fin
final SchedulingAgent schedulingAgent, final LifecycleState lifecycleState, final boolean triggerLifecycleMethods) {

final Processor processor = processorRef.get().getProcessor();
LOG.info("Stopping processor: {}", this);
LOG.debug("Stopping processor: {}", this);
desiredState = ScheduledState.STOPPED;

final CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.apache.nifi.stateless.repository.StatelessProvenanceRepository;
import org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory;

import javax.net.ssl.SSLContext;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
Expand All @@ -89,6 +88,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;

public class StandardStatelessGroupNodeFactory implements StatelessGroupNodeFactory {
private final FlowController flowController;
Expand Down Expand Up @@ -296,9 +296,11 @@ public Future<Set<Bundle>> fetch(final Set<BundleCoordinate> bundleCoordinates,

child.synchronizeFlow(versionedExternalFlow, synchronizationOptions, flowMappingOptions);
child.setParent(group);

return child;
}


private FlowEngine lazyInitializeThreadPool(final AtomicReference<FlowEngine> reference, final Supplier<FlowEngine> factory) {
FlowEngine threadPool = reference.get();
if (threadPool == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -438,18 +439,32 @@ public void trigger() {

// Start each Processor. This won't trigger the Processor to run because the Execution Engine will be Stateless.
// However, it will transition its scheduled state to the appropriate value.
LOG.debug("All Controller Services for {} have been enabled; starting Processors and ports.", groupNode);
groupNode.getProcessGroup().findAllProcessors().forEach(proc -> startProcessor(proc, false));
groupNode.getProcessGroup().findAllInputPorts().forEach(port -> startConnectable(port));
groupNode.getProcessGroup().findAllOutputPorts().forEach(port -> startConnectable(port));
groupNode.getProcessGroup().findAllControllerServices().forEach(service -> enableControllerService(service));

getSchedulingAgent(groupNode).schedule(groupNode, lifecycleState);
future.complete(null);
}
};

LOG.info("Starting {}", groupNode);
groupNode.start(componentMonitoringThreadPool, callback, lifecycleState);
// Enable all of the Controller Services. Once they have all become enabled, we will start the Process Group
// We have to enable the Controller Services first in case any of the Processors use a Controller Service in its
// @OnScheduled method. While a new copy of the Processor is created for each Concurrent Task in the stateless group,
// we do not use a separate copy of the Controller Service, because doing so would cause problems for services that
// perform functions such as caching or connection pooling.
final List<CompletableFuture<?>> serviceStartFutures = new ArrayList<>();
for (final ControllerServiceNode serviceNode : groupNode.getProcessGroup().findAllControllerServices()) {
serviceStartFutures.add(enableControllerService(serviceNode));
}

final CompletableFuture<?> allServiceStartFutures = CompletableFuture.allOf(serviceStartFutures.toArray(new CompletableFuture<?>[0]));
allServiceStartFutures.thenRun(() -> {
LOG.info("Starting {}", groupNode);
groupNode.start(componentMonitoringThreadPool, callback, lifecycleState);
});

return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.nifi.stateless.engine.StatelessProcessContextFactory;
import org.apache.nifi.stateless.flow.StandardDataflowDefinition;
import org.apache.nifi.stateless.flow.StandardStatelessFlow;
import org.apache.nifi.stateless.flow.StatelessDataflowInitializationContext;
import org.apache.nifi.stateless.flow.TransactionThresholds;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.apache.nifi.util.FormatUtils;
Expand Down Expand Up @@ -355,7 +356,20 @@ public void yield(final ProcessorNode procNode) {
lifecycleStateManager,
Duration.of(10, ChronoUnit.SECONDS));

dataflow.initialize();
// We don't want to enable Controller Services because we want to use the actual Controller Services that exist within the
// Standard NiFi instance, not the ephemeral ones that created during the initialization of the Stateless Group.
// This may not matter for Controller Services such as Record Readers and Writers, but it can matter for other types
// of Controller Services, such as connection pooling services. We don't want to create a new instance of a Connection Pool
// for each Concurrent Task in a Stateless Group, for example.
// Since we will not be using the ephemeral Controller Services, we also do not want to enable them.
final StatelessDataflowInitializationContext initializationContext = new StatelessDataflowInitializationContext() {
@Override
public boolean isEnableControllerServices() {
return false;
}
};

dataflow.initialize(initializationContext);

return dataflow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,20 @@
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.servlet.ServletContext;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.HttpMethod;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.StreamingOutput;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
Expand Down Expand Up @@ -142,20 +155,6 @@
import org.apache.nifi.web.api.request.FlowMetricsRegistry;
import org.apache.nifi.web.api.request.IntegerParameter;
import org.apache.nifi.web.api.request.LongParameter;

import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.HttpMethod;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.StreamingOutput;
import org.apache.nifi.web.servlet.shared.RequestUriBuilder;
import org.apache.nifi.web.util.PaginationHelper;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -946,7 +945,7 @@ public Response scheduleComponents(
)
@PathParam("id") String id,
@Parameter(
description = "The request to schedule or unschedule. If the comopnents in the request are not specified, all authorized components will be considered.",
description = "The request to schedule or unschedule. If the components in the request are not specified, all authorized components will be considered.",
required = true
) final ScheduleComponentsEntity requestScheduleComponentsEntity) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface StatelessDataflow {
* Triggers the dataflow to run, returning a DataflowTrigger that can be used to wait for the result. Uses the {@link DataflowTriggerContext#IMPLICIT_CONTEXT}.
* @return a DataflowTrigger that can be used to wait for the result
*
* @throws IllegalStateException if called before {@link #initialize()} is called.
* @throws IllegalStateException if called before {@link #initialize(StatelessDataflowInitializationContext)} is called.
*/
default DataflowTrigger trigger() {
return trigger(DataflowTriggerContext.IMPLICIT_CONTEXT);
Expand All @@ -43,7 +43,7 @@ default DataflowTrigger trigger() {
* @param triggerContext the trigger context to use
* @return a DataflowTrigger that can be used to wait for the result
*
* @throws IllegalStateException if called before {@link #initialize()} is called.
* @throws IllegalStateException if called before {@link #initialize(StatelessDataflowInitializationContext)} is called.
*/
DataflowTrigger trigger(DataflowTriggerContext triggerContext);

Expand All @@ -66,7 +66,7 @@ default DataflowTrigger trigger() {
* This method MUST be called prior to calling {@link #trigger()}.
* </p>
*/
void initialize();
void initialize(StatelessDataflowInitializationContext initializationContext);

default void shutdown() {
shutdown(true, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.stateless.flow;

public interface StatelessDataflowInitializationContext {

boolean isEnableControllerServices();

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.StatelessDataflowInitializationContext;
import org.apache.nifi.stateless.flow.StatelessDataflowValidation;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.slf4j.Logger;
Expand Down Expand Up @@ -92,7 +93,12 @@ public static StatelessDataflow createDataflow(final StatelessEngineConfiguratio
final DataflowDefinition dataflowDefinition = bootstrap.parseDataflowDefinition(flowDefinitionFile, parameterOverrides);

final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition);
dataflow.initialize();
dataflow.initialize(new StatelessDataflowInitializationContext() {
@Override
public boolean isEnableControllerServices() {
return true;
}
});

final StatelessDataflowValidation validation = dataflow.performValidation();
if (!validation.isValid()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ public void onTaskComplete() {
final Supplier<ProcessContext> processContextSupplier = () -> processContextFactory.createProcessContext(procNode);
procNode.start(componentMonitoringThreadPool, ADMINISTRATIVE_YIELD_MILLIS, this.processorStartTimeoutMillis, processContextSupplier, callback, failIfStopping, true);
return future;

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public static boolean isTerminalPort(final Connectable connectable) {
}

@Override
public void initialize() {
public void initialize(final StatelessDataflowInitializationContext initializationContext) {
if (initialized) {
logger.debug("{} initialize() was called, but dataflow has already been initialized. Returning without doing anything.", this);
return;
Expand All @@ -248,9 +248,14 @@ public void initialize() {
// before proceeding any further.
try {
final long serviceEnableStart = System.currentTimeMillis();
enableControllerServices(rootGroup);

waitForServicesEnabled(rootGroup);
if (initializationContext.isEnableControllerServices()) {
enableControllerServices(rootGroup);
waitForServicesEnabled(rootGroup);
} else {
logger.debug("Skipping Controller Service enablement because initializationContext.isEnableControllerServices() returned false");
}

final long serviceEnableMillis = System.currentTimeMillis() - serviceEnableStart;

// Perform validation again so that any processors that reference controller services that were just
Expand Down Expand Up @@ -474,9 +479,14 @@ private void startProcessors(final ProcessGroup processGroup) {
try {
future.get(this.componentEnableTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (final Exception e) {
final String validationErrors = performValidation().toString();
throw new IllegalStateException("Processor " + processor + " has not fully enabled. Current Validation Status is "
+ processor.getValidationStatus() + ". All validation errors: " + validationErrors);
final StatelessDataflowValidation validation = performValidation();
if (validation.isValid()) {
throw new IllegalStateException("Processor " + processor + " is valid but has not fully started", e);
} else {
final String validationErrors = performValidation().toString();
throw new IllegalStateException("Processor " + processor + " has not fully started. Current Validation Status is "
+ processor.getValidationStatus() + ". All validation errors: " + validationErrors);
}
}

final long millis = System.currentTimeMillis() - start;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.StatelessDataflowInitializationContext;
import org.apache.nifi.stateless.flow.TransactionThresholds;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -232,7 +233,12 @@ public TransactionThresholds getTransactionThresholds() {

final StatelessBootstrap bootstrap = StatelessBootstrap.bootstrap(getEngineConfiguration());
final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition);
dataflow.initialize();
dataflow.initialize(new StatelessDataflowInitializationContext() {
@Override
public boolean isEnableControllerServices() {
return true;
}
});

createdFlows.add(dataflow);
return dataflow;
Expand Down

0 comments on commit f267bbf

Please sign in to comment.