Skip to content

Commit

Permalink
Solace Read connector: data classes and mapper (#31637)
Browse files Browse the repository at this point in the history
* Add data classes

* Remove obvious tests of AutoValue classes

* Remove @DefaultSchema and @SchemaFieldNumber annotations
  • Loading branch information
bzablocki authored Jun 21, 2024
1 parent c6c3fd0 commit 3914ad0
Show file tree
Hide file tree
Showing 2 changed files with 302 additions and 18 deletions.
1 change: 1 addition & 0 deletions sdks/java/io/solace/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ dependencies {
implementation library.java.joda_time
implementation library.java.solace
implementation project(":sdks:java:extensions:avro")
implementation library.java.vendored_grpc_1_60_1
implementation library.java.avro
permitUnusedDeclared library.java.avro
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,23 @@
*/
package org.apache.beam.sdk.io.solace.data;

import com.google.auto.value.AutoValue;
import com.solacesystems.jcsmp.BytesXMLMessage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A record to be written to a Solace topic.
*
* <p>You need to transform to {@link Solace.Record} to be able to write to Solace. For that, you
* can use the {@link Solace.Record.Builder} provided with this class.
*
* <p>For instance, to create a record, use the following code:
*
* <pre>{@code
* Solace.Record record = Solace.Record.builder()
* .setMessageId(messageId)
* .setSenderTimestamp(timestampMillis)
* .setPayload(payload)
* .build();
* }</pre>
*
* Setting the message id and the timestamp is mandatory.
* Provides core data models and utilities for working with Solace messages in the context of Apache
* Beam pipelines. This class includes representations for Solace topics, queues, destinations, and
* message records, as well as a utility for converting Solace messages into Beam-compatible
* records.
*/
public class Solace {

/** Represents a Solace queue. */
public static class Queue {
private final String name;

Expand All @@ -52,7 +49,7 @@ public String getName() {
return name;
}
}

/** Represents a Solace topic. */
public static class Topic {
private final String name;

Expand All @@ -68,4 +65,290 @@ public String getName() {
return name;
}
}
/** Represents a Solace destination type. */
public enum DestinationType {
TOPIC,
QUEUE,
UNKNOWN
}

/** Represents a Solace message destination (either a Topic or a Queue). */
@AutoValue
public abstract static class Destination {
/**
* Gets the name of the destination.
*
* @return The destination name.
*/
public abstract String getName();

/**
* Gets the type of the destination (TOPIC, QUEUE or UNKNOWN).
*
* @return The destination type.
*/
public abstract DestinationType getType();

static Builder builder() {
return new AutoValue_Solace_Destination.Builder();
}

@AutoValue.Builder
abstract static class Builder {
abstract Builder setName(String name);

abstract Builder setType(DestinationType type);

abstract Destination build();
}
}

/** Represents a Solace message record with its associated metadata. */
@AutoValue
public abstract static class Record {
/**
* Gets the unique identifier of the message, a string for an application-specific message
* identifier.
*
* <p>Mapped from {@link BytesXMLMessage#getApplicationMessageId()}
*
* @return The message ID, or null if not available.
*/
public abstract @Nullable String getMessageId();

/**
* Gets the payload of the message as a ByteString.
*
* <p>Mapped from {@link BytesXMLMessage#getBytes()}
*
* @return The message payload.
*/
public abstract ByteString getPayload();
/**
* Gets the destination (topic or queue) to which the message was sent.
*
* <p>Mapped from {@link BytesXMLMessage#getDestination()}
*
* @return The destination, or null if not available.
*/
public abstract @Nullable Destination getDestination();

/**
* Gets the message expiration time in milliseconds since the Unix epoch.
*
* <p>A value of 0 indicates the message does not expire.
*
* <p>Mapped from {@link BytesXMLMessage#getExpiration()}
*
* @return The expiration timestamp.
*/
public abstract long getExpiration();

/**
* Gets the priority level of the message (0-255, higher is more important). -1 if not set.
*
* <p>Mapped from {@link BytesXMLMessage#getPriority()}
*
* @return The message priority.
*/
public abstract int getPriority();

/**
* Indicates whether the message has been redelivered due to a prior delivery failure.
*
* <p>Mapped from {@link BytesXMLMessage#getRedelivered()}
*
* @return True if redelivered, false otherwise.
*/
public abstract boolean getRedelivered();

/**
* Gets the destination to which replies to this message should be sent.
*
* <p>Mapped from {@link BytesXMLMessage#getReplyTo()}
*
* @return The reply-to destination, or null if not specified.
*/
public abstract @Nullable Destination getReplyTo();

/**
* Gets the timestamp (in milliseconds since the Unix epoch) when the message was received by
* the Solace broker.
*
* <p>Mapped from {@link BytesXMLMessage#getReceiveTimestamp()}
*
* @return The timestamp.
*/
public abstract long getReceiveTimestamp();

/**
* Gets the timestamp (in milliseconds since the Unix epoch) when the message was sent by the
* sender. Can be null if not provided.
*
* @return The sender timestamp, or null if not available.
*/
public abstract @Nullable Long getSenderTimestamp();

/**
* Gets the sequence number of the message (if applicable).
*
* <p>Mapped from {@link BytesXMLMessage#getSequenceNumber()}
*
* @return The sequence number, or null if not available.
*/
public abstract @Nullable Long getSequenceNumber();

/**
* The number of milliseconds before the message is discarded or moved to Dead Message Queue. A
* value of 0 means the message will never expire. The default value is 0.
*
* <p>Mapped from {@link BytesXMLMessage#getTimeToLive()}
*
* @return The time-to-live value.
*/
public abstract long getTimeToLive();

/**
* Gets the ID for the message within its replication group (if applicable).
*
* <p>Mapped from {@link BytesXMLMessage#getReplicationGroupMessageId()}
*
* <p>The ID for a particular message is only guaranteed to be the same for a particular copy of
* a message on a particular queue or topic endpoint within a replication group. The same
* message on different queues or topic endpoints within the same replication group may or may
* not have the same replication group message ID. See more at <a
* href="https://docs.solace.com/API/API-Developer-Guide/Detecting-Duplicate-Mess.htm">https://docs.solace.com/API/API-Developer-Guide/Detecting-Duplicate-Mess.htm</a>
*
* @return The replication group message ID, or null if not present.
*/
public abstract @Nullable String getReplicationGroupMessageId();
/**
* Gets the attachment data of the message as a ByteString, if any. This might represent files
* or other binary content associated with the message.
*
* <p>Mapped from {@link BytesXMLMessage#getAttachmentByteBuffer()}
*
* @return The attachment data, or an empty ByteString if no attachment is present.
*/
public abstract ByteString getAttachmentBytes();

static Builder builder() {
return new AutoValue_Solace_Record.Builder();
}

@AutoValue.Builder
abstract static class Builder {
abstract Builder setMessageId(@Nullable String messageId);

abstract Builder setPayload(ByteString payload);

abstract Builder setDestination(@Nullable Destination destination);

abstract Builder setExpiration(long expiration);

abstract Builder setPriority(int priority);

abstract Builder setRedelivered(boolean redelivered);

abstract Builder setReplyTo(@Nullable Destination replyTo);

abstract Builder setReceiveTimestamp(long receiveTimestamp);

abstract Builder setSenderTimestamp(@Nullable Long senderTimestamp);

abstract Builder setSequenceNumber(@Nullable Long sequenceNumber);

abstract Builder setTimeToLive(long timeToLive);

abstract Builder setReplicationGroupMessageId(@Nullable String replicationGroupMessageId);

abstract Builder setAttachmentBytes(ByteString attachmentBytes);

abstract Record build();
}
}
/**
* A utility class for mapping {@link BytesXMLMessage} instances to {@link Solace.Record} objects.
* This simplifies the process of converting raw Solace messages into a format suitable for use
* within Apache Beam pipelines.
*/
public static class SolaceRecordMapper {
private static final Logger LOG = LoggerFactory.getLogger(SolaceRecordMapper.class);
/**
* Maps a {@link BytesXMLMessage} (if not null) to a {@link Solace.Record}.
*
* <p>Extracts relevant information from the message, including payload, metadata, and
* destination details.
*
* @param msg The Solace message to map.
* @return A Solace Record representing the message, or null if the input message was null.
*/
public static @Nullable Record map(@Nullable BytesXMLMessage msg) {
if (msg == null) {
return null;
}

ByteArrayOutputStream payloadBytesStream = new ByteArrayOutputStream();
if (msg.getContentLength() != 0) {
try {
payloadBytesStream.write(msg.getBytes());
} catch (IOException e) {
LOG.error("Could not write bytes from the BytesXMLMessage to the Solace.record.", e);
}
}

ByteArrayOutputStream attachmentBytesStream = new ByteArrayOutputStream();
if (msg.getAttachmentContentLength() != 0) {
try {
attachmentBytesStream.write(msg.getAttachmentByteBuffer().array());
} catch (IOException e) {
LOG.error(
"Could not AttachmentByteBuffer from the BytesXMLMessage to the Solace.record.", e);
}
}

Destination replyTo = getDestination(msg.getCorrelationId(), msg.getReplyTo());
Destination destination = getDestination(msg.getCorrelationId(), msg.getDestination());

return Record.builder()
.setMessageId(msg.getApplicationMessageId())
.setPayload(ByteString.copyFrom(payloadBytesStream.toByteArray()))
.setDestination(destination)
.setExpiration(msg.getExpiration())
.setPriority(msg.getPriority())
.setRedelivered(msg.getRedelivered())
.setReplyTo(replyTo)
.setReceiveTimestamp(msg.getReceiveTimestamp())
.setSenderTimestamp(msg.getSenderTimestamp())
.setSequenceNumber(msg.getSequenceNumber())
.setTimeToLive(msg.getTimeToLive())
.setReplicationGroupMessageId(
msg.getReplicationGroupMessageId() != null
? msg.getReplicationGroupMessageId().toString()
: null)
.setAttachmentBytes(ByteString.copyFrom(attachmentBytesStream.toByteArray()))
.build();
}

private static @Nullable Destination getDestination(
String msgId, com.solacesystems.jcsmp.Destination originalDestinationField) {
if (originalDestinationField == null) {
return null;
}
Destination.Builder destinationBuilder =
Destination.builder().setName(originalDestinationField.getName());
if (originalDestinationField instanceof com.solacesystems.jcsmp.Topic) {
destinationBuilder.setType(DestinationType.TOPIC);
} else if (originalDestinationField instanceof com.solacesystems.jcsmp.Queue) {
destinationBuilder.setType(DestinationType.QUEUE);
} else {
LOG.error(
"SolaceIO: Unknown destination type type for message {}, setting to {}",
msgId,
DestinationType.UNKNOWN.name());
destinationBuilder.setType(DestinationType.UNKNOWN);
}
return destinationBuilder.build();
}
}
}

0 comments on commit 3914ad0

Please sign in to comment.