diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java index 76b8ff4fbdac3..cf5ec93f3dc38 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -137,4 +138,23 @@ public interface TableViewBuilder { * @return the {@link TableViewBuilder} builder instance */ TableViewBuilder cryptoFailureAction(ConsumerCryptoFailureAction action); + + + /** + * Set the message listeners. + * If {@link TableViewBuilder#existingMessageListeners} are not specified, these listeners are used for both + * existing and tailing(future) messages in the topic. + * @param listeners message listeners + * @return the {@link TableViewBuilder} builder instance + */ + TableViewBuilder messageListeners(BiConsumer... listeners); + + /** + * Set the message listeners separately for existing messages in the topic. + * @param listeners message listeners + * @return + */ + TableViewBuilder existingMessageListeners(BiConsumer... listeners); + + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java index e0a47a70b1c8b..315369a6ef4c6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import lombok.NonNull; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; @@ -109,4 +110,16 @@ public TableViewBuilder cryptoFailureAction(ConsumerCryptoFailureAction actio conf.setCryptoFailureAction(action); return this; } + + @Override + public TableViewBuilder messageListeners(BiConsumer... messageListeners) { + conf.setMessageListeners(messageListeners); + return this; + } + + @Override + public TableViewBuilder existingMessageListeners(BiConsumer... existingMessageListeners) { + conf.setExistingMessageListeners(existingMessageListeners); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java index b27f8137b14a4..df8869814c854 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import java.io.Serializable; +import java.util.function.BiConsumer; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; @@ -37,6 +38,9 @@ public class TableViewConfigurationData implements Serializable, Cloneable { private CryptoKeyReader cryptoKeyReader = null; private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL; + private BiConsumer[] messageListeners; + private BiConsumer[] existingMessageListeners; + @Override public TableViewConfigurationData clone() { try { @@ -45,6 +49,8 @@ public TableViewConfigurationData clone() { clone.setAutoUpdatePartitionsSeconds(autoUpdatePartitionsSeconds); clone.setSubscriptionName(subscriptionName); clone.setTopicCompactionStrategyClassName(topicCompactionStrategyClassName); + clone.setMessageListeners(messageListeners); + clone.setExistingMessageListeners(existingMessageListeners); return clone; } catch (CloneNotSupportedException e) { throw new AssertionError(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index d5d4174ee10a9..4cfa35e4d8a70 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -56,6 +56,10 @@ public class TableViewImpl implements TableView { private final CompletableFuture> reader; private final List> listeners; + + private final List> existingMessageListeners; + + private final ReentrantLock listenersMutex; private final boolean isPersistentTopic; private TopicCompactionStrategy compactionStrategy; @@ -86,6 +90,7 @@ public class TableViewImpl implements TableView { this.data = new ConcurrentHashMap<>(); this.immutableData = Collections.unmodifiableMap(data); this.listeners = new ArrayList<>(); + this.existingMessageListeners = new ArrayList<>(); this.listenersMutex = new ReentrantLock(); this.compactionStrategy = TopicCompactionStrategy.load(TABLE_VIEW_TAG, conf.getTopicCompactionStrategyClassName()); @@ -108,6 +113,17 @@ public class TableViewImpl implements TableView { } readerBuilder.cryptoFailureAction(conf.getCryptoFailureAction()); + if (conf.getExistingMessageListeners() != null && conf.getExistingMessageListeners().length > 0) { + for (BiConsumer listener : conf.getExistingMessageListeners()) { + existingMessageListeners.add(listener); + } + } + + if (conf.getMessageListeners() != null && conf.getMessageListeners().length > 0) { + for (BiConsumer listener : conf.getMessageListeners()) { + listeners.add(listener); + } + } this.reader = readerBuilder.createAsync(); } @@ -202,7 +218,7 @@ public void close() throws PulsarClientException { } } - private void handleMessage(Message msg) { + private void handleMessage(Message msg, boolean handleExistingMessage) { lastReadPositions.put(msg.getTopicName(), msg.getMessageId()); try { if (msg.hasKey()) { @@ -238,6 +254,10 @@ private void handleMessage(Message msg) { data.put(key, cur); } + List> listeners = + handleExistingMessage && !existingMessageListeners.isEmpty() ? + this.existingMessageListeners : this.listeners; + for (BiConsumer listener : listeners) { try { listener.accept(key, cur); @@ -357,7 +377,7 @@ private void readAllExistingMessages(Reader reader, CompletableFuture f messagesRead.incrementAndGet(); String topicName = msg.getTopicName(); MessageId messageId = msg.getMessageId(); - handleMessage(msg); + handleMessage(msg, true); if (!checkFreshTask(maxMessageIds, future, messageId, topicName)) { readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds); @@ -389,7 +409,7 @@ private void readAllExistingMessages(Reader reader, CompletableFuture f private void readTailMessages(Reader reader) { reader.readNextAsync() .thenAccept(msg -> { - handleMessage(msg); + handleMessage(msg, false); readTailMessages(reader); }).exceptionally(ex -> { if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {