diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 883bf5553e307..5752034a79a76 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -265,7 +265,8 @@ public SendMessageInfo sendMessages(final List entries, SendListener list if (i == (entries.size() - 1)) { promise = writePromise; } - ctx.write(Commands.newMessage(consumerId, messageId, metadataAndPayload), promise); + int redeliveryCount = subscription.getDispatcher().getRedeliveryTracker().getRedeliveryCount(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())); + ctx.write(Commands.newMessage(consumerId, messageId, redeliveryCount, metadataAndPayload), promise); messageId.recycle(); messageIdBuilder.recycle(); entry.release(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java index 0d7b7d6e55dab..43f65cd73b985 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java @@ -70,4 +70,6 @@ public interface Dispatcher { void addUnAckedMessages(int unAckMessages); + RedeliveryTracker getRedeliveryTracker(); + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java new file mode 100644 index 0000000000000..99b38ccf2fb76 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.bookkeeper.mledger.Position; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class InMemoryRedeliveryTracker implements RedeliveryTracker { + + private ConcurrentHashMap trackerCache = new ConcurrentHashMap<>(16); + + @Override + public int incrementAndGetRedeliveryCount(Position position) { + trackerCache.putIfAbsent(position, new AtomicInteger(0)); + return trackerCache.get(position).incrementAndGet(); + } + + @Override + public int getRedeliveryCount(Position position) { + return trackerCache.getOrDefault(position, new AtomicInteger(0)).get(); + } + + @Override + public void remove(Position position) { + trackerCache.remove(position); + } + + @Override + public void removeBatch(List positions) { + if (positions != null) { + positions.forEach(this::remove); + } + } + + @Override + public void clear() { + trackerCache.clear(); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java new file mode 100644 index 0000000000000..0f2e54a542824 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.bookkeeper.mledger.Position; + +import java.util.List; + +public interface RedeliveryTracker { + + int incrementAndGetRedeliveryCount(Position position); + + int getRedeliveryCount(Position position); + + void remove(Position position); + + void removeBatch(List positions); + + void clear(); +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java new file mode 100644 index 0000000000000..521417f53544a --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.bookkeeper.mledger.Position; + +import java.util.List; + +public class RedeliveryTrackerDisabled implements RedeliveryTracker { + + public static final RedeliveryTrackerDisabled REDELIVERY_TRACKER_DISABLED = new RedeliveryTrackerDisabled(); + + private RedeliveryTrackerDisabled() {} + + @Override + public int incrementAndGetRedeliveryCount(Position position) { + return 0; + } + + @Override + public int getRedeliveryCount(Position position) { + return 0; + } + + @Override + public void remove(Position position) { + // no-op + } + + @Override + public void removeBatch(List positions) { + // no-op + } + + @Override + public void clear() { + // no-op + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java index f40564bf69c0f..2067a80d8efc9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java @@ -31,6 +31,8 @@ import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.RedeliveryTracker; +import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.utils.CopyOnWriteArrayList; @@ -54,6 +56,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher private volatile int totalAvailablePermits = 0; private final ServiceConfiguration serviceConfig; + private final RedeliveryTracker redeliveryTracker; public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) { this.topic = topic; @@ -61,6 +64,7 @@ public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscr this.name = topic.getName() + " / " + subscription.getName(); this.msgDrop = new Rate(); this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); + this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; } @Override @@ -178,6 +182,11 @@ public SubType getType() { return SubType.Shared; } + @Override + public RedeliveryTracker getRedeliveryTracker() { + return redeliveryTracker; + } + @Override public void sendMessages(List entries) { Consumer consumer = TOTAL_AVAILABLE_PERMITS_UPDATER.get(this) > 0 ? getNextConsumer() : null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java index 2083cc7864fe7..787fb00a940ed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java @@ -29,6 +29,8 @@ import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.RedeliveryTracker; +import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; @@ -40,6 +42,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD private final Rate msgDrop; private final Subscription subscription; private final ServiceConfiguration serviceConfig; + private final RedeliveryTracker redeliveryTracker; public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex, NonPersistentTopic topic, Subscription subscription) { @@ -48,6 +51,7 @@ public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int this.subscription = subscription; this.msgDrop = new Rate(); this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); + this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; } @Override @@ -117,6 +121,11 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { // No-op } + @Override + public RedeliveryTracker getRedeliveryTracker() { + return redeliveryTracker; + } + @Override protected void scheduleReadOnActiveConsumer() { // No-op diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 17c5db7a6a819..5198a139ec536 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -44,6 +44,8 @@ import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Consumer.SendMessageInfo; import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.RedeliveryTracker; +import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; @@ -69,6 +71,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu private CompletableFuture closeFuture = null; private ConcurrentLongPairSet messagesToReplay; + private final RedeliveryTracker redeliveryTracker; private boolean havePendingRead = false; private boolean havePendingReplayRead = false; @@ -97,6 +100,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso this.name = topic.getName() + " / " + Codec.decode(cursor.getName()); this.topic = topic; this.messagesToReplay = new ConcurrentLongPairSet(512, 2); + this.redeliveryTracker = new InMemoryRedeliveryTracker(); this.readBatchSize = MaxReadBatchSize; this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration() .getMaxUnackedMessagesPerSubscription(); @@ -556,7 +560,10 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { - positions.forEach(position -> messagesToReplay.add(position.getLedgerId(), position.getEntryId())); + positions.forEach(position -> { + messagesToReplay.add(position.getLedgerId(), position.getEntryId()); + redeliveryTracker.incrementAndGetRedeliveryCount(position); + }); if (log.isDebugEnabled()) { log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions); } @@ -624,5 +631,10 @@ public DispatchRateLimiter getDispatchRateLimiter() { return dispatchRateLimiter; } + @Override + public RedeliveryTracker getRedeliveryTracker() { + return redeliveryTracker; + } + private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 9ab7b877c0815..89dfb473313f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -39,6 +39,8 @@ import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.RedeliveryTracker; +import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.TopicName; @@ -62,6 +64,8 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp private final ServiceConfiguration serviceConfig; private ScheduledFuture readOnActiveConsumerTask = null; + private final RedeliveryTracker redeliveryTracker; + public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex, PersistentTopic topic) { super(subscriptionType, partitionIndex, topic.getName()); @@ -72,6 +76,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su this.readBatchSize = MaxReadBatchSize; this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); this.dispatchRateLimiter = null; + this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; } protected void scheduleReadOnActiveConsumer() { @@ -307,6 +312,7 @@ private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consu @Override public void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { // We cannot redeliver single messages to single consumers to preserve ordering. + positions.forEach(redeliveryTracker::incrementAndGetRedeliveryCount); redeliverUnacknowledgedMessages(consumer); } @@ -485,5 +491,10 @@ public void addUnAckedMessages(int unAckMessages) { // No-op } + @Override + public RedeliveryTracker getRedeliveryTracker() { + return redeliveryTracker; + } + private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index dac9f055d704c..669c6d41d457b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -195,6 +195,7 @@ public void acknowledgeMessage(List positions, AckType ackType, Map closeAsync() { } @Override - void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) { + void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf headersAndPayload, ClientCnx cnx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, subscription, messageId.getEntryId(), messageId.getLedgerId(), messageId.getPartition()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java new file mode 100644 index 0000000000000..114152af07dc5 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertNull; + +public class DeadLetterTopicTest extends ProducerConsumerBase { + + private static final Logger log = LoggerFactory.getLogger(DeadLetterTopicTest.class); + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testDeadLetterTopic() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic"; + + final int maxRedeliveryCount = 2; + + final int sendMessages = 100; + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + + for (int i = 0; i < sendMessages; i++) { + producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + + producer.close(); + + + + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(3, TimeUnit.SECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Consumer deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscribe(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + + Consumer checkConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS); + if (checkMessage != null) { + log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData())); + } + assertNull(checkMessage); + + checkConsumer.close(); + } + + @Test + public void testDeadLetterTopicWithMultiTopic() throws Exception { + final String topic1 = "persistent://my-property/my-ns/dead-letter-topic-1"; + final String topic2 = "persistent://my-property/my-ns/dead-letter-topic-2"; + + final int maxRedeliveryCount = 2; + + int sendMessages = 100; + + Producer producer1 = pulsarClient.newProducer(Schema.BYTES) + .topic(topic1) + .create(); + + Producer producer2 = pulsarClient.newProducer(Schema.BYTES) + .topic(topic2) + .create(); + + for (int i = 0; i < sendMessages; i++) { + producer1.send(String.format("Hello Pulsar [%d]", i).getBytes()); + producer2.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + + sendMessages = sendMessages * 2; + + producer1.close(); + producer2.close(); + + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic1, topic2) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(3, TimeUnit.SECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build()) + .receiverQueueSize(100) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Consumer deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-topic-1-my-subscription-DLQ", "persistent://my-property/my-ns/dead-letter-topic-2-my-subscription-DLQ") + .subscriptionName("my-subscription") + .subscribe(); + + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + + deadLetterConsumer.close(); + consumer.close(); + + Consumer checkConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic1, topic2) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS); + if (checkMessage != null) { + log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData())); + } + assertNull(checkMessage); + + checkConsumer.close(); + } + + @Test + public void testDeadLetterTopicByCustomTopicName() throws Exception { + final String topic = "persistent://my-property/my-ns/dead-letter-topic"; + final int maxRedeliveryCount = 2; + final int sendMessages = 100; + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + for (int i = 0; i < sendMessages; i++) { + producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); + } + producer.close(); + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(3, TimeUnit.SECONDS) + .receiverQueueSize(100) + .deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(maxRedeliveryCount) + .deadLetterTopic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ") + .build()) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + Consumer deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ") + .subscriptionName("my-subscription") + .subscribe(); + int totalReceived = 0; + do { + Message message = consumer.receive(); + log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + totalReceived++; + } while (totalReceived < sendMessages * (maxRedeliveryCount + 1)); + int totalInDeadLetter = 0; + do { + Message message = deadLetterConsumer.receive(); + log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData())); + deadLetterConsumer.acknowledge(message); + totalInDeadLetter++; + } while (totalInDeadLetter < sendMessages); + deadLetterConsumer.close(); + consumer.close(); + Consumer checkConsumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + Message checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS); + if (checkMessage != null) { + log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData())); + } + assertNull(checkMessage); + checkConsumer.close(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java index 85dbb245b9013..de1b88a21f965 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java @@ -73,7 +73,7 @@ public void testCompactedOutMessages() throws Exception { = (ConsumerImpl) pulsarClient.newConsumer().topic(topic1) .subscriptionName("my-subscriber-name").subscribe()) { // shove it in the sideways - consumer.receiveIndividualMessagesFromBatch(metadata, batchBuffer, + consumer.receiveIndividualMessagesFromBatch(metadata, 0, batchBuffer, MessageIdData.newBuilder().setLedgerId(1234) .setEntryId(567).build(), consumer.cnx()); Message m = consumer.receive(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index da1cbb5d17b9c..d94175846ad34 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -338,4 +338,28 @@ public interface ConsumerBuilder extends Cloneable { * @return consumer builder. */ ConsumerBuilder intercept(ConsumerInterceptor ...interceptors); + + /** + * Set dead letter policy for consumer + * + * By default some message will redelivery so many times possible, even to the extent that it can be never stop. + * By using dead letter mechanism messages will has the max redelivery count, when message exceeding the maximum + * number of redeliveries, message will send to the Dead Letter Topic and acknowledged automatic. + * + * You can enable the dead letter mechanism by setting dead letter policy. + * example: + *
+     * client.newConsumer()
+     *          .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build())
+     *          .subscribe();
+     * 
+ * Default dead letter topic name is {TopicName}-{Subscription}-DLQ. + * To setting a custom dead letter topic name + *
+     * client.newConsumer()
+     *          .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("your-topic-name").build())
+     *          .subscribe();
+     * 
+ */ + ConsumerBuilder deadLetterPolicy(DeadLetterPolicy deadLetterPolicy); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java new file mode 100644 index 0000000000000..52a2a23b7fc1f --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import lombok.Builder; +import lombok.Data; + +@Builder +@Data +public class DeadLetterPolicy { + + private int maxRedeliverCount; + + private String deadLetterTopic; + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 7287194f62c8d..9306a8248bdb3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -284,7 +284,7 @@ protected void handleMessage(CommandMessage cmdMessage, ByteBuf headersAndPayloa } ConsumerImpl consumer = consumers.get(cmdMessage.getConsumerId()); if (consumer != null) { - consumer.messageReceived(cmdMessage.getMessageId(), headersAndPayload, this); + consumer.messageReceived(cmdMessage.getMessageId(), cmdMessage.getRedeliveryCount(), headersAndPayload, this); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 2095babbb1de8..103bb5e847aec 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.common.util.FutureUtil; import com.google.common.collect.Lists; @@ -256,6 +257,12 @@ public ConsumerBuilder intercept(ConsumerInterceptor... interceptors) { return this; } + @Override + public ConsumerBuilder deadLetterPolicy(DeadLetterPolicy deadLetterPolicy) { + conf.setDeadLetterPolicy(deadLetterPolicy); + return this; + } + public ConsumerConfigurationData getConf() { return conf; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index a0a231902fbac..b7f9918d92f29 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -27,14 +27,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; import io.netty.buffer.ByteBuf; import io.netty.util.Timeout; import java.io.IOException; import java.util.ArrayList; -import static java.util.Base64.getEncoder; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -43,6 +41,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -54,11 +53,13 @@ import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.ConsumerStats; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; @@ -81,6 +82,7 @@ import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,6 +135,12 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final String topicNameWithoutPartition; + private ConcurrentHashMap>> possibleSendToDeadLetterTopicMessages; + + private DeadLetterPolicy deadLetterPolicy; + + private Producer deadLetterProducer; + enum SubscriptionMode { // Make the subscription to be backed by a durable cursor that will retain messages and persist the current // position @@ -205,6 +213,21 @@ enum SubscriptionMode { NonPersistentAcknowledgmentGroupingTracker.of(); } + if (conf.getDeadLetterPolicy() != null) { + possibleSendToDeadLetterTopicMessages = new ConcurrentHashMap<>(); + if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) { + this.deadLetterPolicy = DeadLetterPolicy.builder() + .maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount()) + .deadLetterTopic(conf.getDeadLetterPolicy().getDeadLetterTopic()) + .build(); + } else { + this.deadLetterPolicy = DeadLetterPolicy.builder() + .maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount()) + .deadLetterTopic(String.format("%s-%s-DLQ", topic, subscription)) + .build(); + } + } + topicNameWithoutPartition = topicName.getPartitionedTopicName(); grabCnx(); @@ -233,6 +256,9 @@ public CompletableFuture unsubscribeAsync() { cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> { cnx.removeConsumer(consumerId); unAckedMessageTracker.close(); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.clear(); + } client.cleanupConsumer(ConsumerImpl.this); log.info("[{}][{}] Successfully unsubscribed from topic", topic, subscription); unsubscribeFuture.complete(null); @@ -448,9 +474,16 @@ private CompletableFuture sendAcknowledge(MessageId messageId, AckType ack stats.incrementNumAcksSent(batchMessageId.getBatchSize()); unAckedMessageTracker.remove(new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(), batchMessageId.getPartitionIndex())); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.remove(new MessageIdImpl(batchMessageId.getLedgerId(), + batchMessageId.getEntryId(), batchMessageId.getPartitionIndex())); + } } else { // increment counter by 1 for non-batch msg unAckedMessageTracker.remove(msgId); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.remove(msgId); + } stats.incrementNumAcksSent(1); } onAcknowledge(messageId, null); @@ -479,7 +512,9 @@ public void connectionOpened(final ClientCnx cnx) { synchronized (this) { currentSize = incomingMessages.size(); startMessageId = clearReceiverQueue(); - unAckedMessageTracker.clear(); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.clear(); + } } boolean isDurable = subscriptionMode == SubscriptionMode.Durable; @@ -623,6 +658,9 @@ public void connectionFailed(PulsarClientException exception) { public CompletableFuture closeAsync() { if (getState() == State.Closing || getState() == State.Closed) { unAckedMessageTracker.close(); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.clear(); + } return CompletableFuture.completedFuture(null); } @@ -630,6 +668,9 @@ public CompletableFuture closeAsync() { log.info("[{}] [{}] Closed Consumer (not connected)", topic, subscription); setState(State.Closed); unAckedMessageTracker.close(); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.clear(); + } client.cleanupConsumer(this); return CompletableFuture.completedFuture(null); } @@ -651,6 +692,9 @@ public CompletableFuture closeAsync() { log.info("[{}] [{}] Closed consumer", topic, subscription); setState(State.Closed); unAckedMessageTracker.close(); + if (possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.clear(); + } closeFuture.complete(null); client.cleanupConsumer(this); // fail all pending-receive futures to notify application @@ -697,7 +741,7 @@ void activeConsumerChanged(boolean isActive) { }); } - void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) { + void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf headersAndPayload, ClientCnx cnx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(), messageId.getEntryId()); @@ -756,6 +800,7 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) { final MessageImpl message = new MessageImpl<>(msgId, msgMetadata, uncompressedPayload, createEncryptionContext(msgMetadata), cnx, schema); + uncompressedPayload.release(); msgMetadata.recycle(); @@ -765,6 +810,9 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it. // if asyncReceive is waiting then notify callback without adding to incomingMessages queue unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); + if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { + possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(), Collections.singletonList(message)); + } if (!pendingReceives.isEmpty()) { notifyPendingReceivedCallback(message, null); } else if (conf.getReceiverQueueSize() != 0 || waitingOnReceiveForZeroQueueSize) { @@ -791,7 +839,7 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC }); } else { // handle batch message enqueuing; uncompressed payload has all messages in batch - receiveIndividualMessagesFromBatch(msgMetadata, uncompressedPayload, messageId, cnx); + receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount, uncompressedPayload, messageId, cnx); } uncompressedPayload.release(); msgMetadata.recycle(); @@ -881,7 +929,7 @@ private void triggerZeroQueueSizeListener(final Message message) { }); } - void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf uncompressedPayload, + void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx) { int batchSize = msgMetadata.getNumMessagesInBatch(); @@ -890,7 +938,10 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc getPartitionIndex()); BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize); unAckedMessageTracker.add(batchMessage); - + List> possibleToDeadLetter = null; + if (deadLetterPolicy != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { + possibleToDeadLetter = new ArrayList<>(); + } int skippedMessages = 0; try { for (int i = 0; i < batchSize; ++i) { @@ -930,6 +981,9 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc final MessageImpl message = new MessageImpl<>(batchMessageIdImpl, msgMetadata, singleMessageMetadataBuilder.build(), singleMessagePayload, createEncryptionContext(msgMetadata), cnx, schema); + if (possibleToDeadLetter != null) { + possibleToDeadLetter.add(message); + } lock.readLock().lock(); try { if (pendingReceives.isEmpty()) { @@ -947,6 +1001,10 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName); discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError); } + if (possibleToDeadLetter != null && possibleSendToDeadLetterTopicMessages != null) { + possibleSendToDeadLetterTopicMessages.put(batchMessage, possibleToDeadLetter); + } + if (log.isDebugEnabled()) { log.debug("[{}] [{}] enqueued messages in batch. queue size - {}, available queue size - {}", subscription, consumerName, incomingMessages.size(), incomingMessages.remainingCapacity()); @@ -1184,6 +1242,8 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { MessageIdData.Builder builder = MessageIdData.newBuilder(); batches.forEach(ids -> { List messageIdDatas = ids.stream().map(messageId -> { + // process message possible to dead letter topic + processPossibleToDLQ(messageId); // attempt to remove message from batchMessageAckTracker builder.setPartition(messageId.getPartitionIndex()); builder.setLedgerId(messageId.getLedgerId()); @@ -1212,6 +1272,43 @@ public void redeliverUnacknowledgedMessages(Set messageIds) { } } + private void processPossibleToDLQ(MessageIdImpl messageId) { + List> deadLetterMessages = null; + if (possibleSendToDeadLetterTopicMessages != null) { + if (messageId instanceof BatchMessageIdImpl) { + deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), + getPartitionIndex())); + } else { + deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(messageId); + } + } + if (deadLetterMessages != null) { + if (deadLetterProducer == null) { + try { + deadLetterProducer = client.newProducer(schema) + .topic(this.deadLetterPolicy.getDeadLetterTopic()) + .blockIfQueueFull(false) + .create(); + } catch (Exception e) { + log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e); + } + } + if (deadLetterProducer != null) { + try { + for (MessageImpl message : deadLetterMessages) { + deadLetterProducer.newMessage() + .value(message.getValue()) + .properties(message.getProperties()) + .send(); + } + acknowledge(messageId); + } catch (Exception e) { + log.error("Send to dead letter topic exception with topic: {}, messageId: {}", deadLetterProducer.getTopic(), messageId, e); + } + } + } + } + @Override public void seek(MessageId messageId) throws PulsarClientException { try { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 47ede41de6000..a0fd493494c20 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.DeadLetterPolicy; @Data public class ConsumerConfigurationData implements Serializable, Cloneable { @@ -82,6 +83,8 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { private int patternAutoDiscoveryPeriod = 1; + private DeadLetterPolicy deadLetterPolicy; + @JsonIgnore public String getSingleTopic() { checkArgument(topicNames.size() == 1); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index b5e2406c9ea27..c94482d0227fa 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -282,10 +282,13 @@ public static MessageMetadata parseMessageMetadata(ByteBuf buffer) { } } - public static ByteBufPair newMessage(long consumerId, MessageIdData messageId, ByteBuf metadataAndPayload) { + public static ByteBufPair newMessage(long consumerId, MessageIdData messageId, int redeliveryCount, ByteBuf metadataAndPayload) { CommandMessage.Builder msgBuilder = CommandMessage.newBuilder(); msgBuilder.setConsumerId(consumerId); msgBuilder.setMessageId(messageId); + if (redeliveryCount > 0) { + msgBuilder.setRedeliveryCount(redeliveryCount); + } CommandMessage msg = msgBuilder.build(); BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder(); BaseCommand cmd = cmdBuilder.setType(Type.MESSAGE).setMessage(msg).build(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index edc7e9b118468..c6ff6c1668b75 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -13542,6 +13542,10 @@ public interface CommandMessageOrBuilder // required .pulsar.proto.MessageIdData message_id = 2; boolean hasMessageId(); org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getMessageId(); + + // optional uint32 redelivery_count = 3 [default = 0]; + boolean hasRedeliveryCount(); + int getRedeliveryCount(); } public static final class CommandMessage extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -13598,9 +13602,20 @@ public org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getMessageId() return messageId_; } + // optional uint32 redelivery_count = 3 [default = 0]; + public static final int REDELIVERY_COUNT_FIELD_NUMBER = 3; + private int redeliveryCount_; + public boolean hasRedeliveryCount() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public int getRedeliveryCount() { + return redeliveryCount_; + } + private void initFields() { consumerId_ = 0L; messageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); + redeliveryCount_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -13637,6 +13652,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeMessage(2, messageId_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeUInt32(3, redeliveryCount_); + } } private int memoizedSerializedSize = -1; @@ -13653,6 +13671,10 @@ public int getSerializedSize() { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeMessageSize(2, messageId_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeUInt32Size(3, redeliveryCount_); + } memoizedSerializedSize = size; return size; } @@ -13770,6 +13792,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000001); messageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); bitField0_ = (bitField0_ & ~0x00000002); + redeliveryCount_ = 0; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -13811,6 +13835,10 @@ public org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage buildPartial( to_bitField0_ |= 0x00000002; } result.messageId_ = messageId_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.redeliveryCount_ = redeliveryCount_; result.bitField0_ = to_bitField0_; return result; } @@ -13823,6 +13851,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandMes if (other.hasMessageId()) { mergeMessageId(other.getMessageId()); } + if (other.hasRedeliveryCount()) { + setRedeliveryCount(other.getRedeliveryCount()); + } return this; } @@ -13879,6 +13910,11 @@ public Builder mergeFrom( subBuilder.recycle(); break; } + case 24: { + bitField0_ |= 0x00000004; + redeliveryCount_ = input.readUInt32(); + break; + } } } } @@ -13949,6 +13985,27 @@ public Builder clearMessageId() { return this; } + // optional uint32 redelivery_count = 3 [default = 0]; + private int redeliveryCount_ ; + public boolean hasRedeliveryCount() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public int getRedeliveryCount() { + return redeliveryCount_; + } + public Builder setRedeliveryCount(int value) { + bitField0_ |= 0x00000004; + redeliveryCount_ = value; + + return this; + } + public Builder clearRedeliveryCount() { + bitField0_ = (bitField0_ & ~0x00000004); + redeliveryCount_ = 0; + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandMessage) } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 779a27bc1f027..c50f0728e4961 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -335,6 +335,7 @@ message CommandSendError { message CommandMessage { required uint64 consumer_id = 1; required MessageIdData message_id = 2; + optional uint32 redelivery_count = 3 [default = 0]; } message CommandAck {