Skip to content

Commit

Permalink
NIFI-11889 Added Record-oriented Transmission to PutTCP
Browse files Browse the repository at this point in the history
This closes apache#7554
Signed-off-by: Paul Grey <greyp@apache.org>
  • Loading branch information
exceptionfactory authored and greyp9 committed Aug 4, 2023
1 parent 63c72bd commit 5c577be
Show file tree
Hide file tree
Showing 4 changed files with 358 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,21 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor

public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
.name("Hostname")
.description("The ip address or hostname of the destination.")
.description("Destination hostname or IP address")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("localhost")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor PORT = new PropertyDescriptor
.Builder().name("Port")
.description("The port on the destination.")

public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("Port")
.description("Destination port number")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();

public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Max Size of Socket Send Buffer")
.description("The maximum size of the socket send buffer that should be used. This is a suggestion to the Operating System " +
Expand All @@ -76,8 +78,9 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
.defaultValue("1 MB")
.required(true)
.build();
public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor
.Builder().name("Idle Connection Expiration")

public static final PropertyDescriptor IDLE_EXPIRATION = new PropertyDescriptor.Builder()
.name("Idle Connection Expiration")
.description("The amount of time a connection should be held open without being used before closing the connection. A value of 0 seconds will disable this feature.")
.required(true)
.defaultValue("15 seconds")
Expand All @@ -89,13 +92,14 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
// not added to the properties by default since not all processors may need them
public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
.Builder().name("Protocol")
public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor.Builder()
.name("Protocol")
.description("The protocol for communication.")
.required(true)
.allowableValues(TCP_VALUE, UDP_VALUE)
.defaultValue(TCP_VALUE.getValue())
.build();

public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Message Delimiter")
.description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
Expand All @@ -109,6 +113,7 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies the character set of the data being sent.")
Expand All @@ -117,6 +122,7 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();

public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Timeout")
.description("The timeout for connecting to and communicating with the destination. Does not apply to UDP")
Expand All @@ -125,6 +131,7 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();

public static final PropertyDescriptor OUTGOING_MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Outgoing Message Delimiter")
.description("Specifies the delimiter to use when sending messages out over the same TCP stream. The delimiter is appended to each FlowFile message "
Expand All @@ -135,17 +142,18 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

/**
 * Required property restricted to the values "true" and "false" (default "false") that selects
 * whether each FlowFile's content is sent on an individual connection.
 */
public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new PropertyDescriptor.Builder()
.name("Connection Per FlowFile")
.description("Specifies whether to send each FlowFile's content on an individual connection.")
.required(true)
.defaultValue("false")
.allowableValues("true", "false")
.build();

public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " +
"messages will be sent over a secure connection.")
.description("Specifies the SSL Context Service to enable TLS socket communication")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
Expand All @@ -154,6 +162,7 @@ public abstract class AbstractPutEventProcessor<T> extends AbstractSessionFactor
.name("success")
.description("FlowFiles that are sent successfully to the destination are sent out this relationship.")
.build();

public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed to send to the destination are sent out this relationship.")
Expand Down Expand Up @@ -414,14 +423,14 @@ public synchronized void completeSession() {
}

if (successfulRanges.isEmpty() && failedRanges.isEmpty()) {
getLogger().info("Completed processing {} but sent 0 FlowFiles", new Object[] {flowFile});
getLogger().info("Completed processing {} but sent 0 FlowFiles", flowFile);
session.transfer(flowFile, REL_SUCCESS);
session.commitAsync();
return;
}

if (successfulRanges.isEmpty()) {
getLogger().error("Failed to send {}; routing to 'failure'; last failure reason reported was {};", new Object[] {flowFile, lastFailureReason});
getLogger().error("Failed to send {}; routing to 'failure'; last failure reason reported was {};", flowFile, lastFailureReason);
final FlowFile penalizedFlowFile = session.penalize(flowFile);
session.transfer(penalizedFlowFile, REL_FAILURE);
session.commitAsync();
Expand All @@ -432,7 +441,7 @@ public synchronized void completeSession() {
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime);
session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + successfulRanges.size() + " messages;", transferMillis);
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} messages for {} in {} millis", new Object[] {successfulRanges.size(), flowFile, transferMillis});
getLogger().info("Successfully sent {} messages for {} in {} millis", successfulRanges.size(), flowFile, transferMillis);
session.commitAsync();
return;
}
Expand All @@ -444,7 +453,7 @@ public synchronized void completeSession() {
transferRanges(failedRanges, REL_FAILURE);
session.remove(flowFile);
getLogger().error("Successfully sent {} messages, but failed to send {} messages; the last error received was {}",
new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason});
successfulRanges.size(), failedRanges.size(), lastFailureReason);
session.commitAsync();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
Expand All @@ -33,33 +35,89 @@
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
import org.apache.nifi.processors.standard.property.TransmissionStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StopWatch;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@CapabilityDescription("The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. "
+ "By default, the FlowFiles are transmitted over the same TCP connection (or pool of TCP connections if multiple input threads are configured). "
+ "To assist the TCP server with determining message boundaries, an optional \"Outgoing Message Delimiter\" string can be configured which is appended "
+ "to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional \"Connection Per FlowFile\" parameter can be "
+ "specified to change the behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile "
+ "is received and closed after the FlowFile has been sent. This option should only be used for low message volume scenarios, otherwise the platform " + "may run out of TCP sockets.")
@CapabilityDescription("Sends serialized FlowFiles or Records over TCP to a configurable destination with optional support for TLS")
@InputRequirement(Requirement.INPUT_REQUIRED)
@SeeAlso({ListenTCP.class, PutUDP.class})
@Tags({ "remote", "egress", "put", "tcp" })
@SupportsBatching
@WritesAttributes({
@WritesAttribute(attribute = PutTCP.RECORD_COUNT_TRANSMITTED, description = "Count of records transmitted to configured destination address")
})
public class PutTCP extends AbstractPutEventProcessor<InputStream> {

/**
 * Name of the FlowFile attribute that onTrigger sets to the number of records sent;
 * the FlowFile-oriented path records a count of 0.
 */
public static final String RECORD_COUNT_TRANSMITTED = "record.count.transmitted";

/**
 * Required property selecting how input FlowFiles are read and transmitted. Allowable values come from
 * the TransmissionStrategy enum and the default is FLOWFILE_ORIENTED.
 */
static final PropertyDescriptor TRANSMISSION_STRATEGY = new PropertyDescriptor.Builder()
.name("Transmission Strategy")
.displayName("Transmission Strategy")
.description("Specifies the strategy used for reading input FlowFiles and transmitting messages to the destination socket address")
.required(true)
.allowableValues(TransmissionStrategy.class)
.defaultValue(TransmissionStrategy.FLOWFILE_ORIENTED.getValue())
.build();

/**
 * Copy of the inherited CHARSET descriptor, additionally declared as dependent on the
 * Transmission Strategy being FLOWFILE_ORIENTED.
 */
static final PropertyDescriptor DEPENDENT_CHARSET = new PropertyDescriptor.Builder()
// Copy the base descriptor first, then add the dependency on top of the copied settings
.fromPropertyDescriptor(CHARSET)
.dependsOn(TRANSMISSION_STRATEGY, TransmissionStrategy.FLOWFILE_ORIENTED.getValue())
.build();

/**
 * Copy of the inherited OUTGOING_MESSAGE_DELIMITER descriptor, additionally declared as dependent on the
 * Transmission Strategy being FLOWFILE_ORIENTED.
 */
static final PropertyDescriptor DEPENDENT_OUTGOING_MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
// Copy the base descriptor first, then add the dependency on top of the copied settings
.fromPropertyDescriptor(OUTGOING_MESSAGE_DELIMITER)
.dependsOn(TRANSMISSION_STRATEGY, TransmissionStrategy.FLOWFILE_ORIENTED.getValue())
.build();

/**
 * Required RecordReaderFactory Controller Service used by sendRecords to read Records from the input FlowFile.
 * Declared as dependent on the Transmission Strategy being RECORD_ORIENTED.
 */
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("Record Reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for reading Records from input FlowFiles")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.dependsOn(TRANSMISSION_STRATEGY, TransmissionStrategy.RECORD_ORIENTED.getValue())
.build();

/**
 * Required RecordSetWriterFactory Controller Service used by sendRecords to serialize each Record before sending.
 * Declared as dependent on the Transmission Strategy being RECORD_ORIENTED.
 */
static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("Record Writer")
.displayName("Record Writer")
.description("Specifies the Controller Service to use for writing Records to the configured socket address")
.identifiesControllerService(RecordSetWriterFactory.class)
.required(true)
.dependsOn(TRANSMISSION_STRATEGY, TransmissionStrategy.RECORD_ORIENTED.getValue())
.build();

/**
 * Unmodifiable list of the PutTCP-specific property descriptors, built once and returned
 * by getAdditionalProperties().
 */
private static final List<PropertyDescriptor> ADDITIONAL_PROPERTIES = Collections.unmodifiableList(Arrays.asList(
CONNECTION_PER_FLOWFILE,
SSL_CONTEXT_SERVICE,
TRANSMISSION_STRATEGY,
DEPENDENT_OUTGOING_MESSAGE_DELIMITER,
DEPENDENT_CHARSET,
RECORD_READER,
RECORD_WRITER
));

@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
return Arrays.asList(CONNECTION_PER_FLOWFILE,
OUTGOING_MESSAGE_DELIMITER,
TIMEOUT,
SSL_CONTEXT_SERVICE,
CHARSET);
return ADDITIONAL_PROPERTIES;
}

@Override
Expand All @@ -70,22 +128,21 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
return;
}

final TransmissionStrategy transmissionStrategy = TransmissionStrategy.valueOf(context.getProperty(TRANSMISSION_STRATEGY).getValue());
final StopWatch stopWatch = new StopWatch(true);
try {
session.read(flowFile, inputStream -> {
InputStream inputStreamEvent = inputStream;
final int recordCount;
if (TransmissionStrategy.RECORD_ORIENTED == transmissionStrategy) {
recordCount = sendRecords(context, session, flowFile);

final String delimiter = getOutgoingMessageDelimiter(context, flowFile);
if (delimiter != null) {
final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
inputStreamEvent = new DelimitedInputStream(inputStream, delimiter.getBytes(charSet));
}

eventSender.sendEvent(inputStreamEvent);
});
} else {
sendFlowFile(context, session, flowFile);
recordCount = 0;
}

session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
final FlowFile processedFlowFile = session.putAttribute(flowFile, RECORD_COUNT_TRANSMITTED, Integer.toString(recordCount));
session.getProvenanceReporter().send(processedFlowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(processedFlowFile, REL_SUCCESS);
session.commitAsync();
} catch (final Exception e) {
getLogger().error("Send Failed {}", flowFile, e);
Expand All @@ -104,4 +161,64 @@ protected String getProtocol(final ProcessContext context) {
protected NettyEventSenderFactory<InputStream> getNettyEventSenderFactory(final String hostname, final int port, final String protocol) {
// The protocol argument is not consulted here: the factory is always created with TransportProtocol.TCP
return new StreamingNettyEventSenderFactory(getLogger(), hostname, port, TransportProtocol.TCP);
}

/**
 * Sends the content of the FlowFile to the event sender as a single stream.
 * When an Outgoing Message Delimiter is configured, the content stream is wrapped in a
 * DelimitedInputStream built from the delimiter bytes encoded with the configured Character Set.
 *
 * @param context Process Context used to resolve the delimiter and character set
 * @param session Process Session used to read the FlowFile content
 * @param flowFile FlowFile whose content is sent
 */
private void sendFlowFile(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
    session.read(flowFile, contentStream -> {
        final String outgoingDelimiter = getOutgoingMessageDelimiter(context, flowFile);
        if (outgoingDelimiter == null) {
            // No delimiter configured: send the content stream unchanged
            eventSender.sendEvent(contentStream);
            return;
        }

        final String charsetName = context.getProperty(CHARSET).getValue();
        final byte[] delimiterBytes = outgoingDelimiter.getBytes(Charset.forName(charsetName));
        eventSender.sendEvent(new DelimitedInputStream(contentStream, delimiterBytes));
    });
}

/**
 * Reads Records from the FlowFile using the configured Record Reader and sends each Record as its own event,
 * serialized through the configured Record Writer.
 *
 * @param context Process Context used to look up the Record Reader and Record Writer services
 * @param session Process Session used to read the FlowFile content
 * @param flowFile FlowFile containing the Records to send
 * @return number of Records passed to the event sender
 */
private int sendRecords(final ProcessContext context, final ProcessSession session, final FlowFile flowFile) {
    final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    final RecordSetWriterFactory recordWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);

    // Counter is updated inside the read callback, so it needs a mutable holder that the lambda can capture
    final AtomicInteger transmitted = new AtomicInteger();

    session.read(flowFile, contentStream -> {
        try (
            RecordReader reader = recordReaderFactory.createRecordReader(flowFile, contentStream, getLogger());
            ReusableByteArrayInputStream eventStream = new ReusableByteArrayInputStream();
            ByteArrayOutputStream serialized = new ByteArrayOutputStream();
            RecordSetWriter writer = recordWriterFactory.createWriter(getLogger(), reader.getSchema(), serialized, flowFile)
        ) {
            for (Record current = reader.nextRecord(); current != null; current = reader.nextRecord()) {
                // Serialize one Record and flush so the bytes are available in the in-memory buffer
                writer.write(current);
                writer.flush();

                // Hand the serialized bytes to the reusable stream, send, then clear the buffer for the next Record
                eventStream.setBuffer(serialized.toByteArray());
                eventSender.sendEvent(eventStream);
                serialized.reset();

                transmitted.incrementAndGet();
            }
        } catch (final SchemaNotFoundException | MalformedRecordException e) {
            // The read callback only permits IOException, so reader failures are wrapped
            throw new IOException("Record reading failed", e);
        }
    });

    return transmitted.get();
}

/**
 * ByteArrayInputStream whose backing array can be replaced, allowing a single stream instance
 * to be reused for a sequence of byte arrays instead of allocating a new stream for each one.
 */
private static class ReusableByteArrayInputStream extends ByteArrayInputStream {

    /**
     * Creates the stream over an empty array; callers supply content through setBuffer.
     */
    private ReusableByteArrayInputStream() {
        super(new byte[0]);
    }

    /**
     * Replaces the backing array and rewinds the stream to the start of the new content.
     *
     * @param buffer bytes to expose through this stream; must not be null
     */
    private void setBuffer(final byte[] buffer) {
        // Read the length first so a null buffer fails before any stream state is changed
        final int length = buffer.length;
        this.buf = buffer;
        this.pos = 0;
        // Clear any mark left over from the previous buffer; a stale mark could exceed the new count
        // and cause reset() to position the stream beyond the end of the new content
        this.mark = 0;
        this.count = length;
    }
}
}
Loading

0 comments on commit 5c577be

Please sign in to comment.