Skip to content

Commit

Permalink
NIFI-11746: Refactored to use a caching mechanism for nifi instances …
Browse files Browse the repository at this point in the history
…in system test. Also changed some properties around so that we can have both a clustered and a standalone instance running at the same time.
  • Loading branch information
markap14 committed Jun 23, 2023
1 parent ce62363 commit 3ef7a8f
Show file tree
Hide file tree
Showing 25 changed files with 443 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,6 @@ public void setProperty(final String propertyName, final String propertyValue) t
}
}

@Override
public void setFlowXmlGz(final File flowXmlGz) throws IOException {
for (final NiFiInstance instance : instances) {
instance.setFlowXmlGz(flowXmlGz);
}
}

@Override
public void setProperties(final Map<String, String> properties) throws IOException {
Expand All @@ -156,4 +150,15 @@ public void quarantineTroubleshootingInfo(final File directory, final Throwable
instance.quarantineTroubleshootingInfo(nodeDirectory, cause);
}
}

@Override
public boolean isAccessible() {
for (final NiFiInstance instance : instances) {
if (!instance.isAccessible()) {
return false;
}
}

return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class InstanceConfiguration {
private final File bootstrapConfigFile;
Expand Down Expand Up @@ -68,6 +69,27 @@ public Map<String, String> getNifiPropertiesOverrides() {
return nifiPropertiesOverrides;
}

@Override
public boolean equals(final Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

final InstanceConfiguration that = (InstanceConfiguration) other;
return autoStart == that.autoStart && unpackPythonExtensions == that.unpackPythonExtensions && Objects.equals(bootstrapConfigFile, that.bootstrapConfigFile)
&& Objects.equals(instanceDirectory, that.instanceDirectory) && Objects.equals(flowXmlGz, that.flowXmlGz)
&& Objects.equals(stateDirectory, that.stateDirectory) && Objects.equals(nifiPropertiesOverrides, that.nifiPropertiesOverrides);
}

@Override
public int hashCode() {
return Objects.hash(bootstrapConfigFile, instanceDirectory, flowXmlGz, stateDirectory, autoStart, nifiPropertiesOverrides, unpackPythonExtensions);
}

public static class Builder {
private File bootstrapConfigFile;
private File instanceDirectory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1289,9 +1289,9 @@ public DropRequestEntity emptyQueue(final String connectionId) throws NiFiClient
return requestEntity;
}

public RemoteProcessGroupEntity createRPG(final String parentGroupId, final SiteToSiteTransportProtocol transportProtocol) throws NiFiClientException, IOException {
public RemoteProcessGroupEntity createRPG(final String parentGroupId, final int httpPort, final SiteToSiteTransportProtocol transportProtocol) throws NiFiClientException, IOException {
final RemoteProcessGroupDTO component = new RemoteProcessGroupDTO();
component.setTargetUri("http://localhost:5671");
component.setTargetUri("http://localhost:" + httpPort);
component.setName(component.getTargetUri());
component.setTransportProtocol(transportProtocol.name());

Expand All @@ -1300,7 +1300,7 @@ public RemoteProcessGroupEntity createRPG(final String parentGroupId, final Site
entity.setRevision(createNewRevision());

final RemoteProcessGroupEntity rpg = nifiClient.getRemoteProcessGroupClient().createRemoteProcessGroup(parentGroupId, entity);
logger.info("Created Remote Process Group [id={}, protocol={}, url={}, parentGroupId={}] for Test [{}]", rpg.getId(), transportProtocol, parentGroupId, testName);
logger.info("Created Remote Process Group [id={}, protocol={}, url={}, parentGroupId={}] for Test [{}]", rpg.getId(), transportProtocol, component.getTargetUri(), parentGroupId, testName);
return rpg;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,6 @@ default void start() {
*/
void setProperty(String propertyName, String propertyValue) throws IOException;

/**
* Change the value of the flow that should be loaded on startup
* @param flowXmlGz the file that contains the flow that should be loaded on startup
*/
void setFlowXmlGz(final File flowXmlGz) throws IOException;

/**
* Change the values of the given properties in nifi.properties. Any property that is not present in the given map will remain unchanged. If the node is already running, this change will not take
* effect until the instance is stopped and started again.
Expand All @@ -118,4 +112,11 @@ default void start() {
* @throws IOException if unable to write the information
*/
void quarantineTroubleshootingInfo(final File directory, final Throwable failureCause) throws IOException;

/**
* Checks if able to communicate with the instance
*
* @return <code>true</code> if the instance is started and the REST API can be accessed, false otherwise
*/
boolean isAccessible();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.tests.system;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

public class NiFiInstanceCache {
private static final Logger logger = LoggerFactory.getLogger(NiFiInstanceCache.class);

private final CachedInstance standaloneInstance = new CachedInstance();
private final CachedInstance clusteredInstance = new CachedInstance();

public NiFiInstance createInstance(final NiFiInstanceFactory instanceFactory, final String testName, final boolean allowReuse) throws IOException {
final CachedInstance cachedInstance = instanceFactory.isClusteredInstance() ? clusteredInstance : standaloneInstance;

if (!allowReuse) {
logger.info("Will create new NiFi instance for {} because the test does not allow reuse of the created instance", testName);
cachedInstance.shutdown();
cachedInstance.setFactory(null, testName);

return instanceFactory.createInstance();
}

if (cachedInstance.getFactory() == null) {
cachedInstance.setFactory(instanceFactory, testName);
return cachedInstance.getOrCreateInstance();
}

if (Objects.equals(cachedInstance.getFactory(), instanceFactory)) {
logger.info("Will use cached Nifi instance that was created for test {} in order to run test {} if there is one", cachedInstance.getTestName(), testName);
return cachedInstance.getOrCreateInstance();
}

logger.info("Cached NiFi Instance created for test {} differs in configuration from what is necessary for test {}. Will shutdown cached instance.", cachedInstance.getTestName(), testName);
cachedInstance.shutdown();
cachedInstance.setFactory(instanceFactory, testName);
return cachedInstance.getOrCreateInstance();
}

public void shutdown() {
standaloneInstance.shutdown();
clusteredInstance.shutdown();
}

public void stopOrRecycle(final NiFiInstance nifiInstance) {
if (nifiInstance instanceof CachedNiFiInstance) {
((CachedNiFiInstance) nifiInstance).ensureProperState();
return;
}

nifiInstance.stop();
}

public void poison(final NiFiInstance nifiInstance) {
if (nifiInstance == null) {
return;
}

nifiInstance.stop();

standaloneInstance.poison(nifiInstance);
clusteredInstance.poison(nifiInstance);
}

private static class CachedInstance {
private NiFiInstanceFactory factory;
private CachedNiFiInstance instance;
private String testName;

public void setFactory(final NiFiInstanceFactory factory, final String testName) {
this.factory = factory;
this.testName = testName;
}

public String getTestName() {
return testName;
}

public NiFiInstanceFactory getFactory() {
return factory;
}

public void poison(final NiFiInstance toPoison) {
if (this.instance == null) {
return;
}

final NiFiInstance rawInstance = this.instance.getRawInstance();

if (Objects.equals(rawInstance, toPoison)) {
logger.info("{} has been poisoned. Will not reuse this NiFi instance", rawInstance);
this.instance = null;
}
}

public NiFiInstance getOrCreateInstance() throws IOException {
if (instance != null) {
return instance;
}

final NiFiInstance rawInstance = factory.createInstance();
this.instance = new CachedNiFiInstance(rawInstance);
return this.instance;
}

public void shutdown() {
if (instance != null) {
instance.stop();
}

instance = null;
}
}


private static class CachedNiFiInstance implements NiFiInstance {
private final NiFiInstance rawInstance;
private boolean envCreated = false;
private boolean started = false;
private boolean requireRestart;

public CachedNiFiInstance(final NiFiInstance rawInstance) {
this.rawInstance = rawInstance;
}

public NiFiInstance getRawInstance() {
return this.rawInstance;
}

public void ensureProperState() {
if (rawInstance.getNumberOfNodes() == 1) {
return;
}

if (!rawInstance.isAccessible()) {
logger.info("NiFi Instance {} is not accessible so will stop the instance to ensure proper state for the next test", rawInstance);
stop();
}
}

@Override
public boolean isAccessible() {
return rawInstance.isAccessible();
}

@Override
public void createEnvironment() throws IOException {
if (envCreated) {
return;
}

rawInstance.createEnvironment();
envCreated = true;
}

@Override
public void start(final boolean waitForCompletion) {
if (started && requireRestart) {
logger.info("Must restart NiFi Instance {} before use", rawInstance);

rawInstance.stop();
started = false;
requireRestart = false;
}

if (started) {
logger.info("NiFi Instance {} is already started", rawInstance);
return;
}

rawInstance.start(waitForCompletion);
started = true;
}

@Override
public void stop() {
logger.info("Stopping NiFi Instance {}", rawInstance);
started = false;
rawInstance.stop();
}

@Override
public boolean isClustered() {
return rawInstance.isClustered();
}

@Override
public int getNumberOfNodes() {
return rawInstance.getNumberOfNodes();
}

@Override
public int getNumberOfNodes(final boolean includeOnlyAutoStartInstances) {
return rawInstance.getNumberOfNodes(includeOnlyAutoStartInstances);
}

@Override
public NiFiInstance getNodeInstance(final int nodeIndex) {
return rawInstance.getNodeInstance(nodeIndex);
}

@Override
public Properties getProperties() throws IOException {
return rawInstance.getProperties();
}

@Override
public File getInstanceDirectory() {
return rawInstance.getInstanceDirectory();
}

@Override
public boolean isAutoStart() {
return rawInstance.isAutoStart();
}

@Override
public void setProperty(final String propertyName, final String propertyValue) throws IOException {
rawInstance.setProperty(propertyName, propertyValue);
requireRestart = true;

logger.info("Setting property {} on NiFi Instance {}. This will require that the instance be restarted.", propertyName, rawInstance);
}

@Override
public void setProperties(final Map<String, String> properties) throws IOException {
rawInstance.setProperties(properties);
requireRestart = true;

logger.info("Setting multiple properties on NiFi Instance {}. This will require that the instance be restarted.", rawInstance);
}

@Override
public void quarantineTroubleshootingInfo(final File directory, final Throwable failureCause) throws IOException {
rawInstance.quarantineTroubleshootingInfo(directory, failureCause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@

public interface NiFiInstanceFactory {
NiFiInstance createInstance() throws IOException;

boolean isClusteredInstance();
}
Loading

0 comments on commit 3ef7a8f

Please sign in to comment.