Skip to content

Commit

Permalink
[improve][client] Add messageListeners and existingMessageListeners i…
Browse files Browse the repository at this point in the history
…n TableViewBuilder
  • Loading branch information
heesung-sn committed Aug 12, 2024
1 parent 06a2f5c commit f9529df
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Expand Down Expand Up @@ -137,4 +138,23 @@ public interface TableViewBuilder<T> {
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);


/**
* Set the message listeners.
* If {@link TableViewBuilder#existingMessageListeners} are not specified, these listeners are used for both
* existing and tailing(future) messages in the topic.
* @param listeners message listeners
* @return the {@link TableViewBuilder} builder instance
*/
TableViewBuilder<T> messageListeners(BiConsumer<String, T>... listeners);

/**
* Set the message listeners separately for existing messages in the topic.
* @param listeners message listeners
* @return
*/
TableViewBuilder<T> existingMessageListeners(BiConsumer<String, T>... listeners);


}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
Expand Down Expand Up @@ -109,4 +110,16 @@ public TableViewBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction actio
conf.setCryptoFailureAction(action);
return this;
}

@Override
public TableViewBuilder<T> messageListeners(BiConsumer<String, T>... messageListeners) {
conf.setMessageListeners(messageListeners);
return this;
}

@Override
public TableViewBuilder<T> existingMessageListeners(BiConsumer<String, T>... existingMessageListeners) {
conf.setExistingMessageListeners(existingMessageListeners);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import java.io.Serializable;
import java.util.function.BiConsumer;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
Expand All @@ -37,6 +38,9 @@ public class TableViewConfigurationData implements Serializable, Cloneable {
private CryptoKeyReader cryptoKeyReader = null;
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;

private BiConsumer[] messageListeners;
private BiConsumer[] existingMessageListeners;

@Override
public TableViewConfigurationData clone() {
try {
Expand All @@ -45,6 +49,8 @@ public TableViewConfigurationData clone() {
clone.setAutoUpdatePartitionsSeconds(autoUpdatePartitionsSeconds);
clone.setSubscriptionName(subscriptionName);
clone.setTopicCompactionStrategyClassName(topicCompactionStrategyClassName);
clone.setMessageListeners(messageListeners);
clone.setExistingMessageListeners(existingMessageListeners);
return clone;
} catch (CloneNotSupportedException e) {
throw new AssertionError();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public class TableViewImpl<T> implements TableView<T> {
private final CompletableFuture<Reader<T>> reader;

private final List<BiConsumer<String, T>> listeners;

private final List<BiConsumer<String, T>> existingMessageListeners;


private final ReentrantLock listenersMutex;
private final boolean isPersistentTopic;
private TopicCompactionStrategy<T> compactionStrategy;
Expand Down Expand Up @@ -86,6 +90,7 @@ public class TableViewImpl<T> implements TableView<T> {
this.data = new ConcurrentHashMap<>();
this.immutableData = Collections.unmodifiableMap(data);
this.listeners = new ArrayList<>();
this.existingMessageListeners = new ArrayList<>();
this.listenersMutex = new ReentrantLock();
this.compactionStrategy =
TopicCompactionStrategy.load(TABLE_VIEW_TAG, conf.getTopicCompactionStrategyClassName());
Expand All @@ -108,6 +113,17 @@ public class TableViewImpl<T> implements TableView<T> {
}

readerBuilder.cryptoFailureAction(conf.getCryptoFailureAction());
if (conf.getExistingMessageListeners() != null && conf.getExistingMessageListeners().length > 0) {
for (BiConsumer<String, T> listener : conf.getExistingMessageListeners()) {
existingMessageListeners.add(listener);
}
}

if (conf.getMessageListeners() != null && conf.getMessageListeners().length > 0) {
for (BiConsumer<String, T> listener : conf.getMessageListeners()) {
listeners.add(listener);
}
}

this.reader = readerBuilder.createAsync();
}
Expand Down Expand Up @@ -202,7 +218,7 @@ public void close() throws PulsarClientException {
}
}

private void handleMessage(Message<T> msg) {
private void handleMessage(Message<T> msg, boolean handleExistingMessage) {
lastReadPositions.put(msg.getTopicName(), msg.getMessageId());
try {
if (msg.hasKey()) {
Expand Down Expand Up @@ -238,6 +254,10 @@ private void handleMessage(Message<T> msg) {
data.put(key, cur);
}

List<BiConsumer<String, T>> listeners =
handleExistingMessage && !existingMessageListeners.isEmpty() ?
this.existingMessageListeners : this.listeners;

for (BiConsumer<String, T> listener : listeners) {
try {
listener.accept(key, cur);
Expand Down Expand Up @@ -357,7 +377,7 @@ private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Void> f
messagesRead.incrementAndGet();
String topicName = msg.getTopicName();
MessageId messageId = msg.getMessageId();
handleMessage(msg);
handleMessage(msg, true);
if (!checkFreshTask(maxMessageIds, future, messageId, topicName)) {
readAllExistingMessages(reader, future, startTime,
messagesRead, maxMessageIds);
Expand Down Expand Up @@ -389,7 +409,7 @@ private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Void> f
private void readTailMessages(Reader<T> reader) {
reader.readNextAsync()
.thenAccept(msg -> {
handleMessage(msg);
handleMessage(msg, false);
readTailMessages(reader);
}).exceptionally(ex -> {
if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
Expand Down

0 comments on commit f9529df

Please sign in to comment.