Skip to content

Commit

Permalink
NIFI-3065 Per Process Group logging (#7315)
Browse files Browse the repository at this point in the history
* NIFI-3065 Per Process Group logging
  • Loading branch information
timeabarna authored Jun 24, 2023
1 parent 7d6af0d commit ba6797b
Show file tree
Hide file tree
Showing 55 changed files with 1,002 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class VersionedProcessGroup extends VersionedComponent {
private Long defaultBackPressureObjectThreshold;
private String defaultBackPressureDataSizeThreshold;

private String logFileSuffix;


@ApiModelProperty("The child Process Groups")
public Set<VersionedProcessGroup> getProcessGroups() {
Expand Down Expand Up @@ -205,4 +207,13 @@ public String getDefaultBackPressureDataSizeThreshold() {
public void setDefaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
}

@ApiModelProperty(value = "The log file suffix for this Process Group for dedicated logging.")
public String getLogFileSuffix() {
return logFileSuffix;
}

public void setLogFileSuffix(final String logFileSuffix) {
this.logFileSuffix = logFileSuffix;
}
}
6 changes: 6 additions & 0 deletions nifi-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ language governing permissions and limitations under the License. -->
<artifactId>slf4j-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-per-process-group-logging</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
Expand Down
1 change: 1 addition & 0 deletions nifi-assembly/src/main/assembly/common.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<include>*:logback-core</include>
<include>*:nifi-api</include>
<include>*:nifi-property-protection-api</include>
<include>*:nifi-per-process-group-logging</include>
</includes>
</dependencySet>

Expand Down
31 changes: 31 additions & 0 deletions nifi-commons/nifi-per-process-group-logging/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-commons</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-per-process-group-logging</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.logging;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.sift.Discriminator;
import org.slf4j.event.KeyValuePair;

public class NifiDiscriminator implements Discriminator<ILoggingEvent> {
private static final String KEY = "logFileSuffix";

private boolean started;

@Override
public String getDiscriminatingValue(final ILoggingEvent iLoggingEvent) {
for (KeyValuePair keyValuePair : iLoggingEvent.getKeyValuePairs()) {
if (keyValuePair.key.equals(getKey())) {
return keyValuePair.value.toString();
}
}
return null;
}

@Override
public String getKey() {
return KEY;
}

@Override
public void start() {
started = true;
}

@Override
public void stop() {
started = false;
}

@Override
public boolean isStarted() {
return started;
}
}
1 change: 1 addition & 0 deletions nifi-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<module>nifi-logging-utils</module>
<module>nifi-metrics</module>
<module>nifi-parameter</module>
<module>nifi-per-process-group-logging</module>
<module>nifi-property-encryptor</module>
<module>nifi-property-utils</module>
<module>nifi-properties</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class ProcessGroupDTO extends ComponentDTO {
private String defaultFlowFileExpiration;
private Long defaultBackPressureObjectThreshold;
private String defaultBackPressureDataSizeThreshold;
private String logFileSuffix;

private Integer runningCount;
private Integer stoppedCount;
Expand Down Expand Up @@ -403,4 +404,13 @@ public String getDefaultBackPressureDataSizeThreshold() {
public void setDefaultBackPressureDataSizeThreshold(final String defaultBackPressureDataSizeThreshold) {
this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
}

@ApiModelProperty(value = "The log file suffix for this Process Group for dedicated logging.")
public String getLogFileSuffix() {
return logFileSuffix;
}

public void setLogFileSuffix(final String logFileSuffix) {
this.logFileSuffix = logFileSuffix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.InstanceClassLoader;
import org.apache.nifi.nar.NarCloseable;
Expand Down Expand Up @@ -1572,7 +1573,7 @@ private void run(ScheduledExecutorService taskScheduler, long administrativeYiel
SchedulingAgentCallback schedulingAgentCallback, boolean failIfStopping, ScheduledState desiredState, ScheduledState scheduledState) {

final Processor processor = processorRef.get().getProcessor();
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor, new StandardLoggingContext(StandardProcessorNode.this));
LOG.info("Starting {}", this);

ScheduledState currentState;
Expand Down Expand Up @@ -1714,7 +1715,7 @@ private void initiateStart(final ScheduledExecutorService taskScheduler, final l
AtomicLong startupAttemptCount, final Supplier<ProcessContext> processContextFactory, final SchedulingAgentCallback schedulingAgentCallback) {

final Processor processor = getProcessor();
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor, new StandardLoggingContext(StandardProcessorNode.this));

// Completion Timestamp is set to MAX_VALUE because we don't want to timeout until the task has a chance to run.
final AtomicLong completionTimestampRef = new AtomicLong(Long.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.util.CharacterFilterUtils;
import org.apache.nifi.util.FormatUtils;
Expand Down Expand Up @@ -596,7 +597,8 @@ public void run() {

enablingAttemptCount.incrementAndGet();
if (enablingAttemptCount.get() == 120 || enablingAttemptCount.get() % 3600 == 0) {
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this,
new StandardLoggingContext(StandardControllerServiceNode.this));
componentLog.error("Encountering difficulty enabling. (Validation State is {}: {}). Will continue trying to enable.",
validationState, validationState.getValidationErrors());
}
Expand Down Expand Up @@ -635,7 +637,8 @@ public void run() {
future.completeExceptionally(e);

final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this,
new StandardLoggingContext(StandardControllerServiceNode.this));
componentLog.error("Failed to invoke @OnEnabled method", cause);
invokeDisable(configContext);

Expand Down Expand Up @@ -717,7 +720,7 @@ private void invokeDisable(ConfigurationContext configContext) {
LOG.debug("Successfully disabled {}", this);
} catch (Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this, new StandardLoggingContext(StandardControllerServiceNode.this));
componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
LOG.error("Failed to invoke @OnDisabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;

import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -52,7 +53,7 @@ private ComponentLog getLogger(final String componentId) {
final LogRepository repo = LogRepositoryFactory.getRepository(componentId);
final ComponentLog logger = (repo == null) ? null : repo.getLogger();
if (repo == null || logger == null) {
return new SimpleProcessLogger(componentId, this);
return new SimpleProcessLogger(componentId, this, new StandardLoggingContext(null));
}

return logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.nifi.parameter.ParameterParser;
import org.apache.nifi.parameter.ParameterTokenList;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.processor.StandardValidationContext;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.security.util.SslContextFactory;
Expand Down Expand Up @@ -341,7 +342,7 @@ private static StateProvider createStateProvider(
propertyMap.put(descriptor, new StandardPropertyValue(resourceContext, entry.getValue(),null, parameterLookup, variableRegistry));
}

final ComponentLog logger = new SimpleProcessLogger(providerConfig.getId(), provider);
final ComponentLog logger = new SimpleProcessLogger(providerConfig.getId(), provider, new StandardLoggingContext(null));
final StateProviderInitializationContext initContext = new StandardStateProviderInitializationContext(providerConfig.getId(), propertyMap, sslContext, logger);

synchronized (provider) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ private void synchronize(final ProcessGroup group, final VersionedProcessGroup p
group.setDefaultBackPressureObjectThreshold(proposed.getDefaultBackPressureObjectThreshold());
group.setDefaultBackPressureDataSizeThreshold(proposed.getDefaultBackPressureDataSizeThreshold());

if (group.getLogFileSuffix() == null || group.getLogFileSuffix().isEmpty()) {
group.setLogFileSuffix(proposed.getLogFileSuffix());
}

final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates();
if (remoteCoordinates == null) {
group.disconnectVersionControl(false);
Expand Down Expand Up @@ -1795,6 +1799,7 @@ public void synchronizeProcessGroupSettings(final ProcessGroup processGroup, fin
groupToUpdate.setComments(proposed.getComments());
groupToUpdate.setName(proposed.getName());
groupToUpdate.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
groupToUpdate.setLogFileSuffix(proposed.getLogFileSuffix());

if (processGroup == null) {
LOG.info("Successfully synchronized {} by adding it to the flow", groupToUpdate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ public final class StandardProcessGroup implements ProcessGroup {
private static final String DEFAULT_FLOWFILE_EXPIRATION = "0 sec";
private static final long DEFAULT_BACKPRESSURE_OBJECT = 10_000L;
private static final String DEFAULT_BACKPRESSURE_DATA_SIZE = "1 GB";
private static final Pattern INVALID_DIRECTORY_NAME_CHARACTERS = Pattern.compile("[\\s\\<\\>:\\'\\\"\\/\\\\\\|\\?\\*]");
private volatile String logFileSuffix;


public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
Expand Down Expand Up @@ -244,6 +246,7 @@ public StandardProcessGroup(final String id, final ControllerServiceProvider ser
this.defaultFlowFileExpiration = new AtomicReference<>();
this.defaultBackPressureObjectThreshold = new AtomicReference<>();
this.defaultBackPressureDataSizeThreshold = new AtomicReference<>();
this.logFileSuffix = null;

// save only the nifi properties needed, and account for the possibility those properties are missing
if (nifiProperties == null) {
Expand Down Expand Up @@ -4422,6 +4425,20 @@ public QueueSize getQueueSize() {
return new QueueSize(count, contentSize);
}

@Override
public String getLogFileSuffix() {
return logFileSuffix;
}

@Override
public void setLogFileSuffix(final String logFileSuffix) {
if (logFileSuffix != null && INVALID_DIRECTORY_NAME_CHARACTERS.matcher(logFileSuffix).find()) {
throw new IllegalArgumentException("Log file suffix can not contain the following characters: space, <, >, :, \', \", /, \\, |, ?, *");
} else {
this.logFileSuffix = logFileSuffix;
}
}

@Override
public String getDefaultBackPressureDataSizeThreshold() {
// Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, obtain from nifi properties.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.logging;

import java.util.Optional;

public interface LoggingContext {
/**
* @return the log file name suffix. This will be the discriminating value for the dedicated logging.
*/
Optional<String> getLogFileSuffix();

/**
* @return The key under which the discriminating value should be exported into the host environment.
*/
String getDiscriminatorKey();
}
Loading

0 comments on commit ba6797b

Please sign in to comment.