From f267bbf8a0ff688134faa8b2db3e0caa63d713fd Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 20 Sep 2024 12:33:07 -0400 Subject: [PATCH] NIFI-13777: Fixed stateless group startup around Controller Services by changing the startup logic in two ways. First, we enable Controller Services before we create the Stateless Flow. Secondly, we do not bother enabling the ephemeral Controller Services that are created during the Stateless Flow creation/synchronization. This ensures that if Processors make use of a Controller Service during its @OnScheduled method (which is part of the Stateless Flow initialization process) that the Controller Service is Enabled. It also avoids calling @OnEnabled methods of Controller Services that will never actually be used, as Processors make use of the existing Controller Services when running within standard NiFi --- .../controller/StandardProcessorNode.java | 4 +-- .../StandardStatelessGroupNodeFactory.java | 4 ++- .../scheduling/StandardProcessScheduler.java | 21 ++++++++++++-- .../groups/StandardStatelessGroupNode.java | 16 +++++++++- .../org/apache/nifi/web/api/FlowResource.java | 29 +++++++++---------- .../stateless/flow/StatelessDataflow.java | 6 ++-- ...tatelessDataflowInitializationContext.java | 24 +++++++++++++++ .../stateless/bootstrap/RunStatelessFlow.java | 8 ++++- .../scheduling/StatelessProcessScheduler.java | 1 - .../stateless/flow/StandardStatelessFlow.java | 22 ++++++++++---- .../nifi/stateless/StatelessSystemIT.java | 8 ++++- 11 files changed, 109 insertions(+), 34 deletions(-) create mode 100644 nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflowInitializationContext.java diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 
a48aca3be0c4..52874421f995 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -1472,7 +1472,7 @@ private void run(final ScheduledExecutorService taskScheduler, final long admini final Processor processor = processorRef.get().getProcessor(); final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor, new StandardLoggingContext(StandardProcessorNode.this)); - LOG.info("Starting {}", this); + LOG.debug("Starting {}", this); ScheduledState currentState; boolean starting; @@ -1804,7 +1804,7 @@ public CompletableFuture stop(final ProcessScheduler processScheduler, fin final SchedulingAgent schedulingAgent, final LifecycleState lifecycleState, final boolean triggerLifecycleMethods) { final Processor processor = processorRef.get().getProcessor(); - LOG.info("Stopping processor: {}", this); + LOG.debug("Stopping processor: {}", this); desiredState = ScheduledState.STOPPED; final CompletableFuture future = new CompletableFuture<>(); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java index 6c4919e10a3e..46e76c84071c 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardStatelessGroupNodeFactory.java @@ -74,7 +74,6 @@ import org.apache.nifi.stateless.repository.StatelessProvenanceRepository; import 
org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory; -import javax.net.ssl.SSLContext; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.Collections; @@ -89,6 +88,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BooleanSupplier; import java.util.function.Supplier; +import javax.net.ssl.SSLContext; public class StandardStatelessGroupNodeFactory implements StatelessGroupNodeFactory { private final FlowController flowController; @@ -296,9 +296,11 @@ public Future> fetch(final Set bundleCoordinates, child.synchronizeFlow(versionedExternalFlow, synchronizationOptions, flowMappingOptions); child.setParent(group); + return child; } + private FlowEngine lazyInitializeThreadPool(final AtomicReference reference, final Supplier factory) { FlowEngine threadPool = reference.get(); if (threadPool == null) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 715c04cbdf7a..a1323deece57 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -62,6 +62,7 @@ import java.lang.reflect.InvocationTargetException; import java.net.URL; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; @@ -438,18 +439,32 @@ public void trigger() { // Start each Processor. This won't trigger the Processor to run because the Execution Engine will be Stateless. // However, it will transition its scheduled state to the appropriate value. 
+ LOG.debug("All Controller Services for {} have been enabled; starting Processors and ports.", groupNode); groupNode.getProcessGroup().findAllProcessors().forEach(proc -> startProcessor(proc, false)); groupNode.getProcessGroup().findAllInputPorts().forEach(port -> startConnectable(port)); groupNode.getProcessGroup().findAllOutputPorts().forEach(port -> startConnectable(port)); - groupNode.getProcessGroup().findAllControllerServices().forEach(service -> enableControllerService(service)); getSchedulingAgent(groupNode).schedule(groupNode, lifecycleState); future.complete(null); } }; - LOG.info("Starting {}", groupNode); - groupNode.start(componentMonitoringThreadPool, callback, lifecycleState); + // Enable all of the Controller Services. Once they have all become enabled, we will start the Process Group + // We have to enable the Controller Services first in case any of the Processors use a Controller Service in its + // @OnScheduled method. While a new copy of the Processor is created for each Concurrent Task in the stateless group, + // we do not use a separate copy of the Controller Service, because doing so would cause problems for services that + // perform functions such as caching or connection pooling. 
+ final List> serviceStartFutures = new ArrayList<>(); + for (final ControllerServiceNode serviceNode : groupNode.getProcessGroup().findAllControllerServices()) { + serviceStartFutures.add(enableControllerService(serviceNode)); + } + + final CompletableFuture allServiceStartFutures = CompletableFuture.allOf(serviceStartFutures.toArray(new CompletableFuture[0])); + allServiceStartFutures.thenRun(() -> { + LOG.info("Starting {}", groupNode); + groupNode.start(componentMonitoringThreadPool, callback, lifecycleState); + }); + return future; } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java index 0c3b458b0b72..70d4af831de1 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java @@ -56,6 +56,7 @@ import org.apache.nifi.stateless.engine.StatelessProcessContextFactory; import org.apache.nifi.stateless.flow.StandardDataflowDefinition; import org.apache.nifi.stateless.flow.StandardStatelessFlow; +import org.apache.nifi.stateless.flow.StatelessDataflowInitializationContext; import org.apache.nifi.stateless.flow.TransactionThresholds; import org.apache.nifi.stateless.repository.RepositoryContextFactory; import org.apache.nifi.util.FormatUtils; @@ -355,7 +356,20 @@ public void yield(final ProcessorNode procNode) { lifecycleStateManager, Duration.of(10, ChronoUnit.SECONDS)); - dataflow.initialize(); + // We don't want to enable Controller Services because we want to use the actual Controller Services that exist within the + // Standard NiFi instance, not the ephemeral ones that are created during the initialization of the Stateless Group. 
+ // This may not matter for Controller Services such as Record Readers and Writers, but it can matter for other types + // of Controller Services, such as connection pooling services. We don't want to create a new instance of a Connection Pool + // for each Concurrent Task in a Stateless Group, for example. + // Since we will not be using the ephemeral Controller Services, we also do not want to enable them. + final StatelessDataflowInitializationContext initializationContext = new StatelessDataflowInitializationContext() { + @Override + public boolean isEnableControllerServices() { + return false; + } + }; + + dataflow.initialize(initializationContext); return dataflow; } diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java index 610328a35242..31359c3f5993 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java @@ -29,7 +29,20 @@ import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.servlet.ServletContext; import jakarta.servlet.http.HttpServletRequest; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DefaultValue; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.HttpMethod; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.StreamingOutput; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.authorization.Authorizer; import org.apache.nifi.authorization.RequestAction; @@ -142,20 
+155,6 @@ import org.apache.nifi.web.api.request.FlowMetricsRegistry; import org.apache.nifi.web.api.request.IntegerParameter; import org.apache.nifi.web.api.request.LongParameter; - -import jakarta.ws.rs.Consumes; -import jakarta.ws.rs.DefaultValue; -import jakarta.ws.rs.GET; -import jakarta.ws.rs.HttpMethod; -import jakarta.ws.rs.PUT; -import jakarta.ws.rs.Path; -import jakarta.ws.rs.PathParam; -import jakarta.ws.rs.Produces; -import jakarta.ws.rs.QueryParam; -import jakarta.ws.rs.core.HttpHeaders; -import jakarta.ws.rs.core.MediaType; -import jakarta.ws.rs.core.Response; -import jakarta.ws.rs.core.StreamingOutput; import org.apache.nifi.web.servlet.shared.RequestUriBuilder; import org.apache.nifi.web.util.PaginationHelper; import org.springframework.beans.factory.annotation.Autowired; @@ -946,7 +945,7 @@ public Response scheduleComponents( ) @PathParam("id") String id, @Parameter( - description = "The request to schedule or unschedule. If the comopnents in the request are not specified, all authorized components will be considered.", + description = "The request to schedule or unschedule. If the components in the request are not specified, all authorized components will be considered.", required = true ) final ScheduleComponentsEntity requestScheduleComponentsEntity) { diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java index d92f4fefa2a5..1e74e082226d 100644 --- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java +++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflow.java @@ -31,7 +31,7 @@ public interface StatelessDataflow { * Triggers the dataflow to run, returning a DataflowTrigger that can be used to wait for the result. Uses the {@link DataflowTriggerContext#IMPLICIT_CONTEXT}. 
* @return a DataflowTrigger that can be used to wait for the result * - * @throws IllegalStateException if called before {@link #initialize()} is called. + * @throws IllegalStateException if called before {@link #initialize(StatelessDataflowInitializationContext)} is called. */ default DataflowTrigger trigger() { return trigger(DataflowTriggerContext.IMPLICIT_CONTEXT); @@ -43,7 +43,7 @@ default DataflowTrigger trigger() { * @param triggerContext the trigger context to use * @return a DataflowTrigger that can be used to wait for the result * - * @throws IllegalStateException if called before {@link #initialize()} is called. + * @throws IllegalStateException if called before {@link #initialize(StatelessDataflowInitializationContext)} is called. */ DataflowTrigger trigger(DataflowTriggerContext triggerContext); @@ -66,7 +66,7 @@ default DataflowTrigger trigger() { * This method MUST be called prior to calling {@link #trigger()}. *

*/ - void initialize(); + void initialize(StatelessDataflowInitializationContext initializationContext); default void shutdown() { shutdown(true, false); diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflowInitializationContext.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflowInitializationContext.java new file mode 100644 index 000000000000..5ce335d6e801 --- /dev/null +++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/flow/StatelessDataflowInitializationContext.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.stateless.flow; + +public interface StatelessDataflowInitializationContext { + + boolean isEnableControllerServices(); + +} diff --git a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java index 43187b629f50..2b637dd6ec45 100644 --- a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java +++ b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/bootstrap/RunStatelessFlow.java @@ -24,6 +24,7 @@ import org.apache.nifi.stateless.flow.DataflowDefinition; import org.apache.nifi.stateless.flow.DataflowTrigger; import org.apache.nifi.stateless.flow.StatelessDataflow; +import org.apache.nifi.stateless.flow.StatelessDataflowInitializationContext; import org.apache.nifi.stateless.flow.StatelessDataflowValidation; import org.apache.nifi.stateless.flow.TriggerResult; import org.slf4j.Logger; @@ -92,7 +93,12 @@ public static StatelessDataflow createDataflow(final StatelessEngineConfiguratio final DataflowDefinition dataflowDefinition = bootstrap.parseDataflowDefinition(flowDefinitionFile, parameterOverrides); final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition); - dataflow.initialize(); + dataflow.initialize(new StatelessDataflowInitializationContext() { + @Override + public boolean isEnableControllerServices() { + return true; + } + }); final StatelessDataflowValidation validation = dataflow.performValidation(); if (!validation.isValid()) { diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java index 
c01988e27af1..58bd8366ece2 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java @@ -147,7 +147,6 @@ public void onTaskComplete() { final Supplier processContextSupplier = () -> processContextFactory.createProcessContext(procNode); procNode.start(componentMonitoringThreadPool, ADMINISTRATIVE_YIELD_MILLIS, this.processorStartTimeoutMillis, processContextSupplier, callback, failIfStopping, true); return future; - } @Override diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java index b18fa42664b0..b62c8a0bf018 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java @@ -230,7 +230,7 @@ public static boolean isTerminalPort(final Connectable connectable) { } @Override - public void initialize() { + public void initialize(final StatelessDataflowInitializationContext initializationContext) { if (initialized) { logger.debug("{} initialize() was called, but dataflow has already been initialized. Returning without doing anything.", this); return; @@ -248,9 +248,14 @@ public void initialize() { // before proceeding any further. 
try { final long serviceEnableStart = System.currentTimeMillis(); - enableControllerServices(rootGroup); - waitForServicesEnabled(rootGroup); + if (initializationContext.isEnableControllerServices()) { + enableControllerServices(rootGroup); + waitForServicesEnabled(rootGroup); + } else { + logger.debug("Skipping Controller Service enablement because initializationContext.isEnableControllerServices() returned false"); + } + final long serviceEnableMillis = System.currentTimeMillis() - serviceEnableStart; // Perform validation again so that any processors that reference controller services that were just @@ -474,9 +479,14 @@ private void startProcessors(final ProcessGroup processGroup) { try { future.get(this.componentEnableTimeoutMillis, TimeUnit.MILLISECONDS); } catch (final Exception e) { - final String validationErrors = performValidation().toString(); - throw new IllegalStateException("Processor " + processor + " has not fully enabled. Current Validation Status is " - + processor.getValidationStatus() + ". All validation errors: " + validationErrors); + final StatelessDataflowValidation validation = performValidation(); + if (validation.isValid()) { + throw new IllegalStateException("Processor " + processor + " is valid but has not fully started", e); + } else { + final String validationErrors = performValidation().toString(); + throw new IllegalStateException("Processor " + processor + " has not fully started. Current Validation Status is " + + processor.getValidationStatus() + ". 
All validation errors: " + validationErrors); + } } final long millis = System.currentTimeMillis() - start; diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java index 411704ffc87f..8bc146d08957 100644 --- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java +++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java @@ -33,6 +33,7 @@ import org.apache.nifi.stateless.engine.StatelessEngineConfiguration; import org.apache.nifi.stateless.flow.DataflowDefinition; import org.apache.nifi.stateless.flow.StatelessDataflow; +import org.apache.nifi.stateless.flow.StatelessDataflowInitializationContext; import org.apache.nifi.stateless.flow.TransactionThresholds; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -232,7 +233,12 @@ public TransactionThresholds getTransactionThresholds() { final StatelessBootstrap bootstrap = StatelessBootstrap.bootstrap(getEngineConfiguration()); final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition); - dataflow.initialize(); + dataflow.initialize(new StatelessDataflowInitializationContext() { + @Override + public boolean isEnableControllerServices() { + return true; + } + }); createdFlows.add(dataflow); return dataflow;