NIFI-11556: Added ability to use a Process Group as a Stateless Flow #7253
2060bc5 to 90733b9
Thanks for introducing this new feature @markap14! The changes cover a lot of ground, but most of the file adjustments are straightforward given the nature of the changes.
I only noted a couple of minor questions on initial code review. I will run through some additional runtime testing.
nifi-api/src/main/java/org/apache/nifi/components/PortFunction.java
nifi-api/src/main/java/org/apache/nifi/flow/ExecutionEngine.java
.../nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/SystemDiagnosticsSnapshotDTO.java
...k-core/src/main/java/org/apache/nifi/controller/serialization/VersionedFlowSynchronizer.java
...ifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
...-system-test-suite/src/test/java/org/apache/nifi/tests/system/registry/RegistryClientIT.java
Rebased to main to address conflicts, and squashed commits to make that rebase easier. Thanks for the feedback @exceptionfactory.
Thanks for the updates @markap14.
While going through some runtime testing, it looks like Controller Services remain in Enabled status when a Process Group is configured for Stateless execution. As a result, the Services cannot be disabled or changed.
42fb4e8 to 37dd87b
Force-pushed to rebase against main, due to conflicts.
Just experimenting with it. It appears the 'stateless flow timeout' is always shown on the process group settings, but presumably it only matters when the execution engine is 'stateless'. Perhaps we can ensure a tooltip shows up on the process group properties so a user can quickly see that the property is only relevant when the engine selection is stateless?
I tried to put a stateful group inside a stateless group, and it does block it, which is cool. But the error message buries the lede a bit: "Cannot change Execution Engine for StandardProcessGroup[identifier=87dcc86d-0189-1000-9176-d4a998c280f0,name=Stateful Group] to STANDARD because parent group StandardProcessGroup[identifier=87d912dd-0189-1000-13e8-b1b11eb1c2e6,name=Stateless Group] is configured to use the Stateless Engine. A Process Group using the Stateless Engine may be embedded within a Process Group using the Traditional Engine, but the reverse is not allowed." Instead, can we reverse the order of that output, be consistent about 'standard' vs. 'traditional', and simplify the language to avoid referring to the 'reverse' situation? For instance: "A process group using a standard engine may not be a child of a process group using the stateless engine. Cannot set PG[bla-standard] as a child of PG[bla2-stateless]."
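To make the suggested wording concrete, here is a minimal sketch of a nesting check that leads with the rule and then names the two groups. Everything in it (`Engine`, `GroupSketch`, `EngineNestingCheck`, `verifyCanSetEngine`) is a hypothetical stand-in for illustration and is not the framework code in this PR; it also assumes that checking the immediate parent is sufficient.

```java
import java.util.Optional;

/** Hypothetical stand-in for the execution engine setting; not the NiFi API enum. */
enum Engine { STANDARD, STATELESS }

/** Hypothetical, minimal model of a process group with a parent reference. */
final class GroupSketch {
    final String name;
    final Engine engine;
    final GroupSketch parent; // null for the root group

    GroupSketch(String name, Engine engine, GroupSketch parent) {
        this.name = name;
        this.engine = engine;
        this.parent = parent;
    }
}

final class EngineNestingCheck {
    /**
     * Returns an error message if the group may not use the requested engine
     * under its current parent, or an empty Optional if the change is allowed.
     */
    static Optional<String> verifyCanSetEngine(GroupSketch group, Engine requested) {
        GroupSketch parent = group.parent;
        if (requested == Engine.STANDARD && parent != null && parent.engine == Engine.STATELESS) {
            // State the rule first, then the specific groups involved.
            return Optional.of(String.format(
                "A Process Group using the Standard Engine may not be a child of a Process Group "
                    + "using the Stateless Engine. Cannot set %s as a child of %s.",
                group.name, parent.name));
        }
        return Optional.empty();
    }
}
```

Returning the message rather than throwing keeps the sketch easy to exercise; real code would more likely raise an exception at the point of the configuration change.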
On the root group I created a process group and selected the stateless engine. In that stateless group I created a GenerateFlowFile -> UpdateAttribute flow with a batch size of 500 and 1KB non-unique text objects. Based on error responses, I realized I cannot start/stop individual components and must instead do so at the group level. Ideally we would prevent those buttons from being used at all; otherwise the user experience is 'try and fail', which is often not fun. Ideally we improve that now or in time. When I started the group I realized I had the wrong scheduling period set: the default is 1 min, so a single flowfile went through. I changed it to '0 secs' and tried again. Nothing happened, and I noticed that the scheduling period had returned to '1 min', suggesting my changes aren't taking effect.
If I then change the flow to be traditional, with GenerateFlowFile feeding into a stateless group containing UpdateAttribute, the strange scheduling behavior remains. I cannot change from 1 min to 0 secs, as it keeps reverting to the old value. But when I start things it runs a couple of times and then stops. The behavior seems a good bit off even for a traditional flow. I did copy/paste from the stateless group to the traditional one, so maybe that is a factor.
Thanks for testing it out @joewitt. I agree that the message on that error could be improved. Will update that. As for the UI showing things that aren't really applicable: I agree, ideally we hide them. However, my UI skills are very much lacking, so I did what I felt was minimally necessary in terms of UI. I think we could actually go pretty far there, even removing the 'Scheduling' tab from processors in a Stateless Group that have incoming connections. I do feel that could be a follow-on Jira, though, to improve the UI and remove the elements that are not relevant in a Stateless group.
@joewitt I was able to replicate the issue around the scheduling period. It looks like this bug is actually present on the |
OK, pushed some changes @joewitt. Will now automatically show/hide the "Max Concurrent Tasks" and "Stateless Flow Timeout" settings based on the chosen Execution Engine. Also addressed wording of the error messages.
Had a scenario I figured would cause problems. Nope. It works perfectly: "Cannot change Execution Engine for StandardProcessGroup[identifier=888d6dd0-0189-1000-e88e-b0a720bf53e6,name=Stateless Group] while components are running. UpdateAttribute[id=888d916e-0189-1000-749a-0d0497e2b6e2] is currently running." Running a typical GenerateFlowFile -> UpdateAttribute flow in both standard and stateless, I'm seeing standard run at double the speed of stateless, which I think surprises me. Stateless has 5 concurrent threads whereas standard has 1 task each for generate and update. It seems like standard runs both generate and update at once, whereas stateless runs one then the other and never more than one. So it makes sense that speed is double in the standard case. Other note: we should deprecate the ExecuteStateless processor in the 1.x line. This will be superior to it in every way.
The above info was with run duration set above zero. With run duration at 0 on everything, the behavior seems to possibly flip, with standard being slower because backlogs on the connection hit 10K and slow it down.
I had a flow running in a PG that is traditional and a PG that is stateless. After restart the stateless PG did not automatically restart.
With provenance in Volatile mode I now see the stateless flow running with all 5 threads. I think my laptop will melt.
OK, again it did not restart the stateless group. So: build a stateless flow in NiFi. Have it running. Restart NiFi. It all comes back up running. Restart NiFi again without doing anything else and the stateless flow will come up not running. Traditional flows do run, though. I repeated that pattern twice, and it happened both times.
I see this in the logs once I tell NiFi to restart.
NiFi appears stuck running the integration tests/system tests in nifi-system-tests with mvn clean install -Pintegration-tests. It isn't moving and the stack dump shows
Thanks @joewitt. I haven't been able to replicate an issue with the system test freezing, but based on the thread dump it looks like the NiFi instance failed to start up. Perhaps due to a port conflict or something of that nature? I did track down the issue with the Stateless Group not starting up when NiFi is restarted, though. I pushed a fix for that. Great catch!
…ge; automatically show/hide the Stateless-specific settings for a ProcessGroup based on the chosen Execution Engine
…rt after restart, if there were two restarted in a row without changing the flow in any way
…Processors, Controller Services (even those not executed by the Stateless Engine) are stopped/disabled before considering the Stateless Group to be fully STOPPED.
…, ensure that we wait for the ports' active threads to complete before returning
…eck the result and if the queue still has data (because a Processor hasn't acknowledged the data, for example) then continue issuing request until the queue fully becomes empty.
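The last commit message above describes re-issuing a request until a queue is fully empty. A rough sketch of that retry pattern follows, under stated assumptions: `DrainQueueSketch`, `requestDrop`, and `dropUntilEmpty` are hypothetical names, a single request is assumed to remove only part of the queue, and a cap on the number of requests is added here so the loop cannot spin forever. None of this is taken from the PR's code.

```java
import java.util.Queue;

final class DrainQueueSketch {
    /**
     * Hypothetical single "drop" request: removes at most batchSize items
     * and reports how many items are still queued afterwards.
     */
    static int requestDrop(Queue<String> queue, int batchSize) {
        for (int i = 0; i < batchSize && !queue.isEmpty(); i++) {
            queue.poll();
        }
        return queue.size();
    }

    /**
     * Keeps issuing drop requests while the previous result shows data remaining,
     * up to maxRequests. Returns the number of requests that were issued.
     */
    static int dropUntilEmpty(Queue<String> queue, int batchSize, int maxRequests) {
        int requests = 0;
        int remaining = queue.size();
        while (remaining > 0 && requests < maxRequests) {
            remaining = requestDrop(queue, batchSize); // check the result of each request
            requests++;
        }
        return requests;
    }
}
```

The point of the pattern is that the caller inspects the outcome of each request rather than assuming one request empties the queue.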
Thanks for addressing several recent issues @markap14, this looks close to completion.
Running through several sample flows, the basic operations appear to be covered well.
I noticed one minor issue, where it is possible to set Max Concurrent Tasks to -1, which allows the Stateless group to be started, but it never runs. That could be addressed as a follow-on issue if needed.
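A guard against this could look like the short sketch below, which rejects any value less than 1 before the group is started. `ConcurrentTasksCheck` and `verifyMaxConcurrentTasks` are hypothetical names chosen for illustration, and where such a check would live in the framework is not implied.

```java
final class ConcurrentTasksCheck {
    /**
     * Rejects values that would let a Stateless group start without ever running.
     * Hypothetical helper; the message wording is illustrative only.
     */
    static void verifyMaxConcurrentTasks(int maxConcurrentTasks) {
        if (maxConcurrentTasks < 1) {
            throw new IllegalArgumentException(
                "Max Concurrent Tasks must be at least 1 but was " + maxConcurrentTasks);
        }
    }
}
```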
I observed one other issue, however, that seems worth correcting before merging. When configuring a nested group for Stateless execution, and having Processors in the nested group reference Controller Services in a parent group, the NiFi system does not shut down properly. The logs indicate application shutdown, and Jetty stops, but the process itself is still running, leaving the bootstrap handler to kill the NiFi process after the grace period. This seems to indicate a thread or executor that is not stopping properly. It appears to be related to the fact that Controller Services in a parent Standard-execution group are enabled, and disabling does not work as expected. This is not a problem when Controller Services are defined in the Stateless-execution group itself, as they are Disabled in terms of framework status. If we can track down this issue, I think this pull request should be ready to go.
...ifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java
Outdated
Show resolved
Hide resolved
… set less than 1 for stateless group; fixed typo in ProcessGroupDTO's docs; on shutdown, we may need to disable controller services asynchronously. At that point, the thread pool used to do so may already be shutdown. If so, catch this and create a new single-thread pool, disable the service, and immediately shutdown the pool. Also, if we fail to disable services on shutdown of a stateless flow, instead of throwing an Exception, just log it and move on - it doesn't make much sense for shutdown() to throw an Exception in that case.
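The shutdown handling described in that commit message can be sketched roughly as follows, using only `java.util.concurrent`. `ShutdownDisableSketch`, `disableOnShutdown`, and the 10-second wait are assumptions made for illustration; the actual change in the PR may be structured differently.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

final class ShutdownDisableSketch {
    /**
     * Runs disableTask on frameworkPool if it still accepts work. If the pool has
     * already been shut down, falls back to a temporary single-thread pool that is
     * shut down immediately after the task is submitted. Failures are logged and
     * reported through the return value instead of being thrown.
     */
    static boolean disableOnShutdown(ExecutorService frameworkPool, Runnable disableTask) {
        try {
            Future<?> future;
            try {
                future = frameworkPool.submit(disableTask);
            } catch (RejectedExecutionException e) {
                ExecutorService temporary = Executors.newSingleThreadExecutor();
                future = temporary.submit(disableTask);
                temporary.shutdown(); // the already-submitted task still runs to completion
            }
            future.get(10, TimeUnit.SECONDS); // assumed timeout, for illustration only
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while disabling services on shutdown");
            return false;
        } catch (Exception e) {
            // Log and move on: shutdown should not fail because a service could not be disabled.
            System.err.println("Failed to disable services on shutdown: " + e);
            return false;
        }
    }
}
```

Calling `shutdown()` right after `submit()` is safe because an orderly shutdown only refuses new tasks; previously submitted tasks are still executed.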
Thanks for the latest updates @markap14. The validation for Max Concurrent Tasks is working as expected.
On further testing, there still seems to be a timing issue with shutting down the system when a Stateless Process Group is running. When configuring 2 or 4 Max Concurrent Tasks for a Stateless group, the following exception occurs when trying to stop NiFi, which appears to indicate that the executor is unable to complete the job of disabling referenced Controller Services:
2023-08-08 12:15:36,071 INFO [Framework Task Thread Thread-3] o.a.n.s.flow.StandardStatelessFlow Disabling 1 Controller Services
2023-08-08 12:15:36,072 ERROR [Framework Task Thread Thread-3] org.apache.nifi.engine.FlowEngine Uncaught Exception in Runnable task
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@6228b4ee[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@554f4591[Wrapped task = org.apache.nifi.engine.FlowEngine$2@7a54fe16]] rejected from org.apache.nifi.engine.FlowEngine@34a20a50[Shutting down, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 6]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
at org.apache.nifi.engine.FlowEngine.schedule(FlowEngine.java:87)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:715)
at org.apache.nifi.controller.scheduling.StandardProcessScheduler.submitFrameworkTask(StandardProcessScheduler.java:148)
at org.apache.nifi.controller.service.StandardControllerServiceProvider.disableControllerServicesAsync(StandardControllerServiceProvider.java:405)
at org.apache.nifi.stateless.flow.StandardStatelessFlow.shutdown(StandardStatelessFlow.java:387)
at org.apache.nifi.controller.tasks.StatelessFlowTask.shutdown(StatelessFlowTask.java:150)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.apache.nifi.groups.StandardStatelessGroupNode.shutdownFlows(StandardStatelessGroupNode.java:315)
at org.apache.nifi.groups.StandardStatelessGroupNode.lambda$stop$3(StandardStatelessGroupNode.java:405)
at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
…tdownSeconds seconds for the components to stop before shutting down thread pools. This allows for asynchronous operations such as disableControllerServicesAsync to complete during shutdown. Updated StandardStatelessFlow so that on shutdown it catches more general Exception to ensure that shutdown succeeds
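The ordering described in that commit message, waiting a bounded time for components to stop and only then shutting down the thread pools, can be sketched as below. `GracefulShutdownSketch`, `awaitStoppedThenShutdown`, the `allStopped` supplier, and the 10 ms polling interval are all hypothetical; the sketch only illustrates why asynchronous work submitted during shutdown is no longer rejected.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class GracefulShutdownSketch {
    /**
     * Polls allStopped until it reports true or timeoutMillis elapses, then shuts
     * the pool down. Keeping the pool alive during the wait lets asynchronous
     * shutdown work (such as disabling services) be submitted without rejection.
     * Returns whether the components reported stopped before the timeout.
     */
    static boolean awaitStoppedThenShutdown(BooleanSupplier allStopped, long timeoutMillis, ExecutorService pool) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        boolean stopped = allStopped.getAsBoolean();
        while (!stopped && System.nanoTime() - deadline < 0) {
            try {
                Thread.sleep(10); // assumed polling interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            stopped = allStopped.getAsBoolean();
        }
        pool.shutdown(); // only now stop accepting new tasks
        return stopped;
    }
}
```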
I am just beginning to review this and it will take some time because it is a significant change. A couple of comments off the bat. Can you add "help" dialogs for the new properties for the configuration of a process group, specifically Execution Engine, Max Concurrent Tasks, and Stateless Flow Timeout? See the Log File Suffix property for an example. Can you add some information in the PR description to assist in evaluating this one? It is non-trivial to say the least, and not having any description of what was added/modified, why, or what areas to focus on makes reviewing even more challenging. Thanks!
The latest version has a clean shutdown with the Controller Service reference scenario described, thanks @markap14! I pushed a correction for the unused import; otherwise this looks ready to go pending successful builds.
Thanks for taking a look @markobean. As you noted, there is a lot here, but it builds on the core capabilities of Stateless execution. If you are not familiar with Stateless operation, or the ExecuteStateless Processor, it would be worth reviewing the associated documentation for those features. It is worth giving some consideration to how approachable this capability is for general users, so improving usability is worth doing. However, some of those items could be considered under a follow-on effort. One of the main concerns with this pull request is to ensure existing functionality works, and the new optional functionality operates within reasonable parameters.
Following another set of successful automated builds, this looks ready to merge. Thanks for the testing @joewitt, and thanks for resolving the various issues raised @markap14!
@markobean as mentioned, please feel free to review the details and raise additional Jira issues for improvements. Similar to the support for Python-based components, this has significant impact on the framework for NiFi 2.0, but we still have some time to make adjustments as we continue working on technical debt reduction.
+1 merging