Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for dynamic write in MqttIO #32470

Merged
merged 8 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
* Added support for Zstd codec in SerializableAvroCodecFactory (Java) ([#32349](https://github.com/apache/beam/issues/32349))
* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528))
* Significantly improved performance of Kafka IO reads that enable [commitOffsetsInFinalize](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--) by removing the data reshuffle from SDF implementation. ([#31682](https://github.com/apache/beam/pull/31682)).
* Added support for dynamic writing in MqttIO (Java) ([#19376](https://github.com/apache/beam/issues/19376))
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Breaking Changes

Expand Down
139 changes: 110 additions & 29 deletions sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -99,6 +101,26 @@
* "my_topic"))
*
* }</pre>
*
* <h3>Dynamic Writing to a MQTT Broker</h3>
*
* <p>MqttIO also supports dynamic writing to multiple topics based on the data. You can specify a
* function to determine the target topic for each message. The following example demonstrates how
* to configure dynamic topic writing:
*
* <pre>{@code
* pipeline
* .apply(...) // Provide PCollection<InputT>
* .apply(
* MqttIO.<InputT>dynamicWrite()
* .withConnectionConfiguration(
* MqttIO.ConnectionConfiguration.create("tcp://host:11883"))
* .withTopicFn(<Function to determine the topic dynamically>)
* .withPayloadFn(<Function to extract the payload>));
* }</pre>
*
* <p>This dynamic writing capability allows for more flexible MQTT message routing based on the
* message content, enabling scenarios where messages are directed to different topics.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand All @@ -115,8 +137,16 @@ public static Read read() {
.build();
}

public static Write write() {
return new AutoValue_MqttIO_Write.Builder().setRetained(false).build();
public static Write<byte[]> write() {
return new AutoValue_MqttIO_Write.Builder<byte[]>()
.setRetained(false)
.setPayloadFn(SerializableFunctions.identity())
.setDynamic(false)
.build();
}

/**
 * Creates a {@link Write} transform configured for dynamic writing, where the destination topic
 * is computed per element rather than taken from the connection configuration.
 *
 * <p>The returned transform has {@code retained} set to {@code false} and {@code dynamic} set to
 * {@code true}. No topic or payload function is set here; {@link Write#expand} rejects a dynamic
 * write whose topic function or payload function is still missing.
 *
 * @param <InputT> the element type of the {@link PCollection} that will be written
 * @return a dynamic {@link Write} transform awaiting its connection configuration and functions
 */
public static <InputT> Write<InputT> dynamicWrite() {
  Write.Builder<InputT> dynamicBuilder = new AutoValue_MqttIO_Write.Builder<InputT>();
  dynamicBuilder.setRetained(false);
  dynamicBuilder.setDynamic(true);
  return dynamicBuilder.build();
}

private MqttIO() {}
Expand All @@ -127,7 +157,7 @@ public abstract static class ConnectionConfiguration implements Serializable {

abstract String getServerUri();

abstract String getTopic();
abstract @Nullable String getTopic();

abstract @Nullable String getClientId();

Expand Down Expand Up @@ -169,6 +199,11 @@ public static ConnectionConfiguration create(String serverUri, String topic) {
.build();
}

/**
 * Describes a connection to a MQTT broker by its URI only, leaving the topic unset.
 *
 * @param serverUri the URI of the MQTT broker; must not be {@code null}
 * @return a {@link ConnectionConfiguration} carrying only the given server URI
 * @throws IllegalArgumentException if {@code serverUri} is {@code null}
 */
public static ConnectionConfiguration create(String serverUri) {
  checkArgument(serverUri != null, "serverUri can not be null");
  AutoValue_MqttIO_ConnectionConfiguration.Builder configBuilder =
      new AutoValue_MqttIO_ConnectionConfiguration.Builder();
  configBuilder.setServerUri(serverUri);
  return configBuilder.build();
}

/** Set up the MQTT broker URI. */
public ConnectionConfiguration withServerUri(String serverUri) {
checkArgument(serverUri != null, "serverUri can not be null");
Expand Down Expand Up @@ -199,7 +234,7 @@ public ConnectionConfiguration withPassword(String password) {

private void populateDisplayData(DisplayData.Builder builder) {
builder.add(DisplayData.item("serverUri", getServerUri()));
builder.add(DisplayData.item("topic", getTopic()));
builder.addIfNotNull(DisplayData.item("topic", getTopic()));
builder.addIfNotNull(DisplayData.item("clientId", getClientId()));
builder.addIfNotNull(DisplayData.item("username", getUsername()));
}
Expand Down Expand Up @@ -278,6 +313,9 @@ public Read withMaxReadTime(Duration maxReadTime) {

@Override
public PCollection<byte[]> expand(PBegin input) {
checkArgument(connectionConfiguration() != null, "connectionConfiguration can not be null");
checkArgument(connectionConfiguration().getTopic() != null, "topic can not be null");

org.apache.beam.sdk.io.Read.Unbounded<byte[]> unbounded =
org.apache.beam.sdk.io.Read.from(new UnboundedMqttSource(this));

Expand Down Expand Up @@ -505,29 +543,50 @@ public UnboundedMqttSource getCurrentSource() {

/** A {@link PTransform} to write and send a message to a MQTT server. */
@AutoValue
public abstract static class Write extends PTransform<PCollection<byte[]>, PDone> {

public abstract static class Write<InputT> extends PTransform<PCollection<InputT>, PDone> {
abstract @Nullable ConnectionConfiguration connectionConfiguration();

abstract @Nullable SerializableFunction<InputT, String> topicFn();

abstract @Nullable SerializableFunction<InputT, byte[]> payloadFn();

abstract boolean dynamic();

abstract boolean retained();

abstract Builder builder();
abstract Builder<InputT> builder();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setConnectionConfiguration(ConnectionConfiguration configuration);
abstract static class Builder<InputT> {
abstract Builder<InputT> setConnectionConfiguration(ConnectionConfiguration configuration);

abstract Builder<InputT> setRetained(boolean retained);

abstract Builder<InputT> setTopicFn(SerializableFunction<InputT, String> topicFn);

abstract Builder setRetained(boolean retained);
abstract Builder<InputT> setPayloadFn(SerializableFunction<InputT, byte[]> payloadFn);

abstract Write build();
abstract Builder<InputT> setDynamic(boolean dynamic);

abstract Write<InputT> build();
}

/** Define MQTT connection configuration used to connect to the MQTT broker. */
public Write withConnectionConfiguration(ConnectionConfiguration configuration) {
public Write<InputT> withConnectionConfiguration(ConnectionConfiguration configuration) {
checkArgument(configuration != null, "configuration can not be null");
return builder().setConnectionConfiguration(configuration).build();
}

/**
 * Define the function used to compute the destination topic of each element.
 *
 * <p>Only allowed on a dynamic write, i.e. a transform whose {@code dynamic()} flag is {@code
 * true}, such as the one returned by {@link MqttIO#dynamicWrite()}.
 *
 * @param topicFn maps an input element to the topic it is published to; must not be {@code null}
 * @return The {@link Write} {@link PTransform} with the corresponding topic function.
 * @throws IllegalArgumentException if this write is not dynamic, or if {@code topicFn} is {@code
 *     null}
 */
public Write<InputT> withTopicFn(SerializableFunction<InputT, String> topicFn) {
  // Checked first so a non-dynamic write always reports the mode error, whatever the argument.
  checkArgument(dynamic(), "withTopicFn can not use in non-dynamic write");
  // Fail fast, as withConnectionConfiguration does, instead of deferring the error to expand().
  checkArgument(topicFn != null, "topicFn can not be null");
  return builder().setTopicFn(topicFn).build();
}

/**
 * Define the function used to extract the message payload from each element.
 *
 * <p>Only allowed on a dynamic write, i.e. a transform whose {@code dynamic()} flag is {@code
 * true}, such as the one returned by {@link MqttIO#dynamicWrite()}.
 *
 * @param payloadFn maps an input element to the bytes that are published; must not be {@code
 *     null}
 * @return The {@link Write} {@link PTransform} with the corresponding payload function.
 * @throws IllegalArgumentException if this write is not dynamic, or if {@code payloadFn} is
 *     {@code null}
 */
public Write<InputT> withPayloadFn(SerializableFunction<InputT, byte[]> payloadFn) {
  // Checked first so a non-dynamic write always reports the mode error, whatever the argument.
  checkArgument(dynamic(), "withPayloadFn can not use in non-dynamic write");
  // Fail fast, as withConnectionConfiguration does, instead of deferring the error to expand().
  checkArgument(payloadFn != null, "payloadFn can not be null");
  return builder().setPayloadFn(payloadFn).build();
}

/**
* Whether or not the publish message should be retained by the messaging engine. Sending a
* message with the retained set to {@code false} will clear the retained message from the
Expand All @@ -538,54 +597,76 @@ public Write withConnectionConfiguration(ConnectionConfiguration configuration)
* @param retained Whether or not the messaging engine should retain the message.
* @return The {@link Write} {@link PTransform} with the corresponding retained configuration.
*/
public Write withRetained(boolean retained) {
public Write<InputT> withRetained(boolean retained) {
return builder().setRetained(retained).build();
}

@Override
public PDone expand(PCollection<byte[]> input) {
input.apply(ParDo.of(new WriteFn(this)));
return PDone.in(input.getPipeline());
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
connectionConfiguration().populateDisplayData(builder);
builder.add(DisplayData.item("retained", retained()));
}

private static class WriteFn extends DoFn<byte[], Void> {
@Override
public PDone expand(PCollection<InputT> input) {
checkArgument(connectionConfiguration() != null, "connectionConfiguration can not be null");
if (dynamic()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since getTopic() being non-null or null corresponds to dynamic() being false or true, respectively, could we remove this property and just use the topic's nullness to determine the scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would also allow calls like MqttIO.write().withTopicPayloadFn(...), since both factory methods return the same Write class.
We cannot tell whether a Write object was created by write() or by dynamicWrite(), so whether the topic is null is not enough to determine the scenario.
The topic value can be modified externally, but the dynamic value cannot.
I would like to make sure that withTopicPayloadFn() and withPayloadFn() are only available when dynamic is true. What do you think? Please take a look at the following test code.

@Test
  public void testWriteWithTopicFn() {
    IllegalArgumentException exception =
        assertThrows(
            IllegalArgumentException.class, () -> MqttIO.write().withTopicFn(e -> "some topic"));

    assertEquals("withTopicFn can not use in non-dynamic write", exception.getMessage());
  }

  @Test
  public void testWriteWithPayloadFn() {
    final IllegalArgumentException exception =
        assertThrows(
            IllegalArgumentException.class, () -> MqttIO.write().withPayloadFn(e -> new byte[] {}));

    assertEquals("withPayloadFn can not use in non-dynamic write", exception.getMessage());
  }

checkArgument(
connectionConfiguration().getTopic() == null, "DynamicWrite can not have static topic");
checkArgument(topicFn() != null, "topicFn can not be null");
} else {
checkArgument(connectionConfiguration().getTopic() != null, "topic can not be null");
}
checkArgument(payloadFn() != null, "payloadFn can not be null");

input.apply(ParDo.of(new WriteFn<>(this)));
return PDone.in(input.getPipeline());
}

private static class WriteFn<InputT> extends DoFn<InputT, Void> {

private final Write spec;
twosom marked this conversation as resolved.
Show resolved Hide resolved
private final Write<InputT> spec;
private final SerializableFunction<InputT, String> topicFn;
private final SerializableFunction<InputT, byte[]> payloadFn;
private final boolean retained;

private transient MQTT client;
private transient BlockingConnection connection;

public WriteFn(Write spec) {
public WriteFn(Write<InputT> spec) {
this.spec = spec;
if (spec.dynamic()) {
this.topicFn = spec.topicFn();
} else {
String topic = spec.connectionConfiguration().getTopic();
this.topicFn = ignore -> topic;
}
this.payloadFn = spec.payloadFn();
this.retained = spec.retained();
}

@Setup
public void createMqttClient() throws Exception {
LOG.debug("Starting MQTT writer");
client = spec.connectionConfiguration().createClient();
this.client = this.spec.connectionConfiguration().createClient();
LOG.debug("MQTT writer client ID is {}", client.getClientId());
connection = createConnection(client);
this.connection = createConnection(client);
}

@ProcessElement
public void processElement(ProcessContext context) throws Exception {
byte[] payload = context.element();
InputT element = context.element();
byte[] payload = this.payloadFn.apply(element);
String topic = this.topicFn.apply(element);
LOG.debug("Sending message {}", new String(payload, StandardCharsets.UTF_8));
connection.publish(
spec.connectionConfiguration().getTopic(), payload, QoS.AT_LEAST_ONCE, false);
this.connection.publish(topic, payload, QoS.AT_LEAST_ONCE, this.retained);
}

@Teardown
public void closeMqttClient() throws Exception {
if (connection != null) {
if (this.connection != null) {
LOG.debug("Disconnecting MQTT connection (client ID {})", client.getClientId());
connection.disconnect();
this.connection.disconnect();
}
}
}
Expand Down
Loading
Loading