diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java index 1e1ca9a2ca477..a5d56e3786960 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -455,4 +456,13 @@ public void redeliverUnacknowledgedMessages() { } } + + public void redeliverUnacknowledgedMessages(List messageIds) { + List pendingPositions = messageIds.stream() + .map(messageIdData -> PositionImpl.get(messageIdData.getLedgerId(), messageIdData.getEntryId())) + .filter(position -> pendingAcks.remove(position) != null) + .collect(Collectors.toList()); + + subscription.redeliverUnacknowledgedMessages(this, pendingPositions); + } } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Dispatcher.java index 1e068a80d136d..4d6f7c0abca0a 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Dispatcher.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Dispatcher.java @@ -15,10 +15,13 @@ */ package com.yahoo.pulsar.broker.service; +import java.util.List; import java.util.concurrent.CompletableFuture; +import com.yahoo.pulsar.common.api.proto.PulsarApi; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import com.yahoo.pulsar.utils.CopyOnWriteArrayList; +import org.apache.bookkeeper.mledger.impl.PositionImpl; public interface Dispatcher { void addConsumer(Consumer consumer) throws BrokerServiceException; @@ -43,4 +46,6 @@ public interface Dispatcher { SubType getType(); void redeliverUnacknowledgedMessages(Consumer consumer); + + void redeliverUnacknowledgedMessages(Consumer consumer, List positions); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java index 68304426b4b1d..da185f03a2298 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java @@ -455,7 +455,12 @@ protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessa CompletableFuture consumerFuture = consumers.get(redeliver.getConsumerId()); if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { - consumerFuture.getNow(null).redeliverUnacknowledgedMessages(); + Consumer consumer = consumerFuture.getNow(null); + if (redeliver.getMessageIdsCount() > 0 && consumer.subType() == SubType.Shared) { + consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList()); + } else { + consumer.redeliverUnacknowledgedMessages(); + } } } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Subscription.java index 180477c90ce0e..e1c588d8dc8f3 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Subscription.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import com.yahoo.pulsar.common.api.proto.PulsarApi; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -59,4 +60,6 @@ public interface Subscription { void expireMessages(int messageTTLInSeconds); void redeliverUnacknowledgedMessages(Consumer consumer); + + void redeliverUnacknowledgedMessages(Consumer consumer, List positions); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 57383ed9b3063..6f1caf8dc448b 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import com.yahoo.pulsar.common.api.proto.PulsarApi; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -29,6 +30,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -403,5 +405,14 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { readMoreEntries(); } + @Override + public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { + messagesToReplay.addAll(positions); + if (log.isDebugEnabled()) { + log.debug("[{}] Redelivering unacknowledged messages for consumer ", consumer); + } + readMoreEntries(); + } + private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 47b9acb75df84..597f2d0acc45e 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -30,6 +30,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -262,6 +263,12 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { } + @Override + public void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { + // We cannot redeliver single messages to single consumers to preserve ordering. + redeliverUnacknowledgedMessages(consumer); + } + private void readMoreEntries(Consumer consumer) { int availablePermits = consumer.getAvailablePermits(); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java index ede14b32503fd..13166342f79db 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java @@ -15,10 +15,12 @@ */ package com.yahoo.pulsar.broker.service.persistent; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import com.yahoo.pulsar.common.api.proto.PulsarApi; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -586,5 +588,10 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { dispatcher.redeliverUnacknowledgedMessages(consumer); } + @Override + public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { + dispatcher.redeliverUnacknowledgedMessages(consumer, positions); + } + private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class); } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java new file mode 100644 index 0000000000000..ad9ef4775c5b2 --- /dev/null +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java @@ -0,0 +1,451 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; + +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.yahoo.pulsar.broker.service.BrokerTestBase; +import com.yahoo.pulsar.client.api.Consumer; +import com.yahoo.pulsar.client.api.ConsumerConfiguration; +import com.yahoo.pulsar.client.api.Message; +import com.yahoo.pulsar.client.api.Producer; +import com.yahoo.pulsar.client.api.ProducerConfiguration; +import com.yahoo.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import com.yahoo.pulsar.client.api.SubscriptionType; + +public class PerMessageUnAcknowledgedRedeliveryTest extends BrokerTestBase { + private static final long testTimeout = 90000; // 1.5 min + private static final Logger log = LoggerFactory.getLogger(PerMessageUnAcknowledgedRedeliveryTest.class); + private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2); + + @Override + @BeforeMethod + public void setup() throws Exception { + super.internalSetup(); + } + + @Override + @AfterMethod + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = testTimeout) + public void testSharedAckedNormalTopic() throws Exception { + String key = "testSharedAckedNormalTopic"; + final String topicName = "persistent://prop/use/ns-abc/topic-" + key; + final String subscriptionName = "my-ex-subscription-" + key; + final String messagePredicate = "my-message-" + key + "-"; + final int totalMessages = 15; + + // 1. producer connect + Producer producer = pulsarClient.createProducer(topicName); + + // 2. Create consumer + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setReceiverQueueSize(50); + conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); + conf.setSubscriptionType(SubscriptionType.Shared); + Consumer consumer = pulsarClient.subscribe(topicName, subscriptionName, conf); + + // 3. producer publish messages + for (int i = 0; i < totalMessages / 3; i++) { + String message = messagePredicate + i; + log.info("Producer produced: " + message); + producer.send(message.getBytes()); + } + + // 4. Receiver receives the message, doesn't ack + Message message = consumer.receive(); + while (message != null) { + String data = new String(message.getData()); + log.info("Consumer received : " + data); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + long size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 5); + + // 5. producer publish more messages + for (int i = 0; i < totalMessages / 3; i++) { + String m = messagePredicate + i; + log.info("Producer produced: " + m); + producer.send(m.getBytes()); + } + + // 6. Receiver receives the message, ack them + message = consumer.receive(); + int received = 0; + while (message != null) { + received++; + String data = new String(message.getData()); + log.info("Consumer received : " + data); + consumer.acknowledge(message); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 5); + assertEquals(received, 5); + + // 7. Simulate ackTimeout + ((ConsumerImpl) consumer).getUnAckedMessageTracker().toggle(); + + // 8. producer publish more messages + for (int i = 0; i < totalMessages / 3; i++) { + String m = messagePredicate + i; + log.info("Producer produced: " + m); + producer.send(m.getBytes()); + } + + // 9. Receiver receives the message, doesn't ack + message = consumer.receive(); + while (message != null) { + String data = new String(message.getData()); + log.info("Consumer received : " + data); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 10); + + Thread.sleep(ackTimeOutMillis); + + // 10. Receiver receives redelivered messages + message = consumer.receive(); + int redelivered = 0; + while (message != null) { + redelivered++; + String data = new String(message.getData()); + log.info("Consumer received : " + data); + consumer.acknowledge(message); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + assertEquals(redelivered, 5); + size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 5); + } + + @Test(timeOut = testTimeout) + public void testExclusiveAckedNormalTopic() throws Exception { + String key = "testExclusiveAckedNormalTopic"; + final String topicName = "persistent://prop/use/ns-abc/topic-" + key; + final String subscriptionName = "my-ex-subscription-" + key; + final String messagePredicate = "my-message-" + key + "-"; + final int totalMessages = 15; + + // 1. producer connect + Producer producer = pulsarClient.createProducer(topicName); + + // 2. Create consumer + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setReceiverQueueSize(50); + conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); + conf.setSubscriptionType(SubscriptionType.Exclusive); + Consumer consumer = pulsarClient.subscribe(topicName, subscriptionName, conf); + + // 3. producer publish messages + for (int i = 0; i < totalMessages / 3; i++) { + String message = messagePredicate + i; + log.info("Producer produced: " + message); + producer.send(message.getBytes()); + } + + // 4. Receiver receives the message, doesn't ack + Message message = consumer.receive(); + while (message != null) { + String data = new String(message.getData()); + log.info("Consumer received : " + data); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + long size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 5); + + // 5. producer publish more messages + for (int i = 0; i < totalMessages / 3; i++) { + String m = messagePredicate + i; + log.info("Producer produced: " + m); + producer.send(m.getBytes()); + } + + // 6. Receiver receives the message, ack them + message = consumer.receive(); + int received = 0; + while (message != null) { + received++; + String data = new String(message.getData()); + log.info("Consumer received : " + data); + consumer.acknowledge(message); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 5); + assertEquals(received, 5); + + // 7. Simulate ackTimeout + ((ConsumerImpl) consumer).getUnAckedMessageTracker().toggle(); + + // 8. producer publish more messages + for (int i = 0; i < totalMessages / 3; i++) { + String m = messagePredicate + i; + log.info("Producer produced: " + m); + producer.send(m.getBytes()); + } + + // 9. Receiver receives the message, doesn't ack + message = consumer.receive(); + while (message != null) { + String data = new String(message.getData()); + log.info("Consumer received : " + data); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 10); + + Thread.sleep(ackTimeOutMillis); + + // 10. Receiver receives redelivered messages + message = consumer.receive(); + int redelivered = 0; + while (message != null) { + redelivered++; + String data = new String(message.getData()); + log.info("Consumer received : " + data); + consumer.acknowledge(message); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + assertEquals(redelivered, 10); + size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 0); + } + + @Test(timeOut = testTimeout) + public void testFailoverAckedNormalTopic() throws Exception { + String key = "testFailoverAckedNormalTopic"; + final String topicName = "persistent://prop/use/ns-abc/topic-" + key; + final String subscriptionName = "my-ex-subscription-" + key; + final String messagePredicate = "my-message-" + key + "-"; + final int totalMessages = 15; + + // 1. producer connect + Producer producer = pulsarClient.createProducer(topicName); + + // 2. Create consumer + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setReceiverQueueSize(50); + conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); + conf.setSubscriptionType(SubscriptionType.Failover); + Consumer consumer = pulsarClient.subscribe(topicName, subscriptionName, conf); + + // 3. producer publish messages + for (int i = 0; i < totalMessages / 3; i++) { + String message = messagePredicate + i; + log.info("Producer produced: " + message); + producer.send(message.getBytes()); + } + + // 4. Receiver receives the message, doesn't ack + Message message = consumer.receive(); + while (message != null) { + String data = new String(message.getData()); + log.info("Consumer received : " + data); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + long size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 5); + + // 5. producer publish more messages + for (int i = 0; i < totalMessages / 3; i++) { + String m = messagePredicate + i; + log.info("Producer produced: " + m); + producer.send(m.getBytes()); + } + + // 6. Receiver receives the message, ack them + message = consumer.receive(); + int received = 0; + while (message != null) { + received++; + String data = new String(message.getData()); + log.info("Consumer received : " + data); + consumer.acknowledge(message); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 5); + assertEquals(received, 5); + + // 7. Simulate ackTimeout + ((ConsumerImpl) consumer).getUnAckedMessageTracker().toggle(); + + // 8. producer publish more messages + for (int i = 0; i < totalMessages / 3; i++) { + String m = messagePredicate + i; + log.info("Producer produced: " + m); + producer.send(m.getBytes()); + } + + // 9. Receiver receives the message, doesn't ack + message = consumer.receive(); + while (message != null) { + String data = new String(message.getData()); + log.info("Consumer received : " + data); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 10); + + Thread.sleep(ackTimeOutMillis); + + // 10. Receiver receives redelivered messages + message = consumer.receive(); + int redelivered = 0; + while (message != null) { + redelivered++; + String data = new String(message.getData()); + log.info("Consumer received : " + data); + consumer.acknowledge(message); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + assertEquals(redelivered, 10); + size = ((ConsumerImpl) consumer).getUnAckedMessageTracker().size(); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 0); + } + + private static long getUnackedMessagesCountInPartitionedConsumer(Consumer c) { + return ((PartitionedConsumerImpl) c).getConsumers().stream() + .mapToLong(consumer -> consumer.getUnAckedMessageTracker().size()).sum(); + } + + @Test(timeOut = testTimeout) + public void testSharedAckedPartitionedTopic() throws Exception { + String key = "testSharedAckedPartitionedTopic"; + final String topicName = "persistent://prop/use/ns-abc/topic-" + key; + final String subscriptionName = "my-ex-subscription-" + key; + final String messagePredicate = "my-message-" + key + "-"; + final int totalMessages = 15; + final int numberOfPartitions = 3; + admin.persistentTopics().createPartitionedTopic(topicName, numberOfPartitions); + + // 1. producer connect + ProducerConfiguration prodConfig = new ProducerConfiguration(); + prodConfig.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); + Producer producer = pulsarClient.createProducer(topicName, prodConfig); + + // 2. Create consumer + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setReceiverQueueSize(50); + conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS); + conf.setSubscriptionType(SubscriptionType.Shared); + Consumer consumer = pulsarClient.subscribe(topicName, subscriptionName, conf); + + // 3. producer publish messages + for (int i = 0; i < totalMessages / 3; i++) { + String message = messagePredicate + i; + log.info("Producer produced: " + message); + producer.send(message.getBytes()); + } + + // 4. Receiver receives the message, doesn't ack + Message message = consumer.receive(); + while (message != null) { + String data = new String(message.getData()); + log.info("Consumer received : " + data); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + + long size = getUnackedMessagesCountInPartitionedConsumer(consumer); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 5); + + // 5. producer publish more messages + for (int i = 0; i < totalMessages / 3; i++) { + String m = messagePredicate + i; + log.info("Producer produced: " + m); + producer.send(m.getBytes()); + } + + // 6. Receiver receives the message, ack them + message = consumer.receive(); + int received = 0; + while (message != null) { + received++; + String data = new String(message.getData()); + log.info("Consumer received : " + data); + consumer.acknowledge(message); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + size = getUnackedMessagesCountInPartitionedConsumer(consumer); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 5); + assertEquals(received, 5); + + // 7. Simulate ackTimeout + ((PartitionedConsumerImpl) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle()); + + // 8. producer publish more messages + for (int i = 0; i < totalMessages / 3; i++) { + String m = messagePredicate + i; + log.info("Producer produced: " + m); + producer.send(m.getBytes()); + } + + // 9. Receiver receives the message, doesn't ack + message = consumer.receive(); + while (message != null) { + String data = new String(message.getData()); + log.info("Consumer received : " + data); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + size = getUnackedMessagesCountInPartitionedConsumer(consumer); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 10); + + Thread.sleep(ackTimeOutMillis); + + // 10. Receiver receives redelivered messages + message = consumer.receive(); + int redelivered = 0; + while (message != null) { + redelivered++; + String data = new String(message.getData()); + log.info("Consumer received : " + data); + consumer.acknowledge(message); + message = consumer.receive(10, TimeUnit.MILLISECONDS); + } + assertEquals(redelivered, 5); + size = getUnackedMessagesCountInPartitionedConsumer(consumer); + log.info(key + " Unacked Message Tracker size is " + size); + assertEquals(size, 5); + } +} diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerBase.java index 21393b4d9cd49..1246477bb3e4e 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerBase.java @@ -15,6 +15,7 @@ */ package com.yahoo.pulsar.client.impl; +import java.util.List; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -23,16 +24,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import com.yahoo.pulsar.client.api.*; import org.apache.commons.codec.digest.DigestUtils; import com.google.common.collect.Queues; -import com.yahoo.pulsar.client.api.Consumer; -import com.yahoo.pulsar.client.api.ConsumerConfiguration; -import com.yahoo.pulsar.client.api.Message; -import com.yahoo.pulsar.client.api.MessageId; -import com.yahoo.pulsar.client.api.MessageListener; -import com.yahoo.pulsar.client.api.PulsarClientException; -import com.yahoo.pulsar.client.api.SubscriptionType; import com.yahoo.pulsar.client.util.FutureUtil; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; @@ -309,4 +304,12 @@ public String getTopic() { public String getSubscription() { return subscription; } + + /** + * Redelivers the given unacknowledged messages. In Failover mode, the request is ignored if the consumer is not + * active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all + * the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection + * breaks, the messages are redelivered after reconnect. + */ + protected abstract void redeliverUnacknowledgedMessages(List messageIds); } diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java index 3c1f5718d5a16..a03f2b1dcce7a 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.BitSet; +import java.util.List; import java.util.NavigableMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; @@ -29,7 +30,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +41,7 @@ import com.yahoo.pulsar.client.api.Message; import com.yahoo.pulsar.client.api.MessageId; import com.yahoo.pulsar.client.api.PulsarClientException; +import com.yahoo.pulsar.client.api.SubscriptionType; import com.yahoo.pulsar.client.util.FutureUtil; import com.yahoo.pulsar.common.api.Commands; import com.yahoo.pulsar.common.api.PulsarDecoder; @@ -60,6 +64,7 @@ import static com.yahoo.pulsar.common.api.Commands.hasChecksum; public class ConsumerImpl extends ConsumerBase { + private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000; private final long consumerId; @@ -887,6 +892,40 @@ public void redeliverUnacknowledgedMessages() { } } + @Override + public void redeliverUnacknowledgedMessages(List messageIds) { + if (conf.getSubscriptionType() != SubscriptionType.Shared) { + // We cannot redeliver single messages if subscription type is not Shared + redeliverUnacknowledgedMessages(); + return; + } + ClientCnx cnx = cnx(); + if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getNumber()) { + List> batches = Lists.partition(messageIds, MAX_REDELIVER_UNACKNOWLEDGED); + MessageIdData.Builder builder = MessageIdData.newBuilder(); + batches.forEach(ids -> { + List messageIdDatas = ids.stream() + .map(messageId -> { + builder.setPartition(messageId.getPartitionIndex()); + builder.setLedgerId(messageId.getLedgerId()); + builder.setEntryId(messageId.getEntryId()); + return builder.build(); + }).collect(Collectors.toList()); + ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdDatas); + cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()); + messageIdDatas.forEach(MessageIdData::recycle); + }); + builder.recycle(); + return; + } + if (cnx == null || (state.get() == State.Connecting)) { + log.warn("[{}] Client Connection needs to be establised for redelivery of unacknowledged messages", this); + } else { + log.warn("[{}] Reconnecting the client to redeliver the messages.", this); + cnx.ctx().close(); + } + } + @Override public ConsumerStats getStats() { if (stats instanceof ConsumerStatsDisabled) { diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java index dae3bf1411398..694afb7c2b296 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PartitionedConsumerImpl.java @@ -17,6 +17,7 @@ import static com.google.common.base.Preconditions.checkArgument; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -365,9 +366,24 @@ public void redeliverUnacknowledgedMessages() { } } + @Override + public void redeliverUnacknowledgedMessages(List messageIds) { + for (ConsumerImpl c : consumers) { + List consumerMessageIds = new ArrayList<>(); + messageIds.removeIf(messageId -> { + if (messageId.getPartitionIndex() == c.getPartitionIndex()) { + consumerMessageIds.add(messageId); + return true; + } + return false; + }); + c.redeliverUnacknowledgedMessages(consumerMessageIds); + } + } + /** * helper method that returns current state of data structure used to track acks for batch messages - * + * * @return true if all batch messages have been acknowledged */ public boolean isBatchingAckTrackerEmpty() { @@ -378,6 +394,10 @@ public boolean isBatchingAckTrackerEmpty() { return state; } + List getConsumers() { + return consumers; + } + @Override public synchronized ConsumerStats getStats() { if (stats == null) { @@ -391,5 +411,4 @@ public synchronized ConsumerStats getStats() { } private static final Logger log = LoggerFactory.getLogger(PartitionedConsumerImpl.class); - } diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/UnAckedMessageTracker.java index f41e2d86245a7..89c2e3e27e66b 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/UnAckedMessageTracker.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/UnAckedMessageTracker.java @@ -16,6 +16,8 @@ package com.yahoo.pulsar.client.impl; import java.io.Closeable; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -83,10 +85,13 @@ public void start(PulsarClientImpl client, ConsumerBase consumerBase, long ackTi this.stop(); timeout = client.timer().newTimeout(new TimerTask() { @Override - public void run(Timeout timeout) throws Exception { + public void run(Timeout t) throws Exception { if (isAckTimeout()) { log.warn("[{}] {} messages have timed-out", consumerBase, oldOpenSet.size()); - consumerBase.redeliverUnacknowledgedMessages(); + List messageIds = new ArrayList<>(); + oldOpenSet.forEach(messageIds::add); + oldOpenSet.clear(); + consumerBase.redeliverUnacknowledgedMessages(messageIds); } toggle(); timeout = client.timer().newTimeout(this, ackTimeoutMillis, TimeUnit.MILLISECONDS); @@ -94,7 +99,7 @@ public void run(Timeout timeout) throws Exception { }, ackTimeoutMillis, TimeUnit.MILLISECONDS); } - private void toggle() { + void toggle() { writeLock.lock(); try { ConcurrentOpenHashSet temp = currentSet; diff --git a/pulsar-common/generate_protobuf.sh b/pulsar-common/generate_protobuf.sh index d6077397573d5..a32384b22d797 100755 --- a/pulsar-common/generate_protobuf.sh +++ b/pulsar-common/generate_protobuf.sh @@ -1,3 +1,4 @@ +#!/bin/bash # # Copyright 2016 Yahoo Inc. # diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/Commands.java index 64f6ab3532060..201ad85ee564d 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/Commands.java @@ -16,6 +16,7 @@ package com.yahoo.pulsar.common.api; import java.io.IOException; +import java.util.List; import com.google.protobuf.ByteString; import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; @@ -388,6 +389,19 @@ public static ByteBuf newRedeliverUnacknowledgedMessages(long consumerId) { return res; } + public static ByteBuf newRedeliverUnacknowledgedMessages(long consumerId, List messageIds) { + CommandRedeliverUnacknowledgedMessages.Builder redeliverBuilder = CommandRedeliverUnacknowledgedMessages + .newBuilder(); + redeliverBuilder.setConsumerId(consumerId); + redeliverBuilder.addAllMessageIds(messageIds); + CommandRedeliverUnacknowledgedMessages redeliver = redeliverBuilder.build(); + ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.REDELIVER_UNACKNOWLEDGED_MESSAGES) + .setRedeliverUnacknowledgedMessages(redeliverBuilder)); + redeliver.recycle(); + redeliverBuilder.recycle(); + return res; + } + private final static ByteBuf cmdPing = serializeWithSize( BaseCommand.newBuilder().setType(Type.PING).setPing(CommandPing.getDefaultInstance())); diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java index 82e262c0cf98a..f2044caa8768f 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java @@ -9525,6 +9525,12 @@ public interface CommandRedeliverUnacknowledgedMessagesOrBuilder // required uint64 consumer_id = 1; boolean hasConsumerId(); long getConsumerId(); + + // repeated .com.yahoo.pulsar.common.api.proto.MessageIdData message_ids = 2; + java.util.List + getMessageIdsList(); + com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData getMessageIds(int index); + int getMessageIdsCount(); } public static final class CommandRedeliverUnacknowledgedMessages extends com.google.protobuf.GeneratedMessageLite @@ -9571,8 +9577,30 @@ public long getConsumerId() { return consumerId_; } + // repeated .com.yahoo.pulsar.common.api.proto.MessageIdData message_ids = 2; + public static final int MESSAGE_IDS_FIELD_NUMBER = 2; + private java.util.List messageIds_; + public java.util.List getMessageIdsList() { + return messageIds_; + } + public java.util.List + getMessageIdsOrBuilderList() { + return messageIds_; + } + public int getMessageIdsCount() { + return messageIds_.size(); + } + public com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData getMessageIds(int index) { + return messageIds_.get(index); + } + public com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdDataOrBuilder getMessageIdsOrBuilder( + int index) { + return messageIds_.get(index); + } + private void initFields() { consumerId_ = 0L; + messageIds_ = java.util.Collections.emptyList(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -9583,6 +9611,12 @@ public final boolean isInitialized() { memoizedIsInitialized = 0; return false; } + for (int i = 0; i < getMessageIdsCount(); i++) { + if (!getMessageIds(i).isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -9598,6 +9632,9 @@ public void writeTo(com.yahoo.pulsar.common.util.protobuf.ByteBufCodedOutputStre if (((bitField0_ & 0x00000001) == 0x00000001)) { output.writeUInt64(1, consumerId_); } + for (int i = 0; i < messageIds_.size(); i++) { + output.writeMessage(2, messageIds_.get(i)); + } } private int memoizedSerializedSize = -1; @@ -9610,6 +9647,10 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeUInt64Size(1, consumerId_); } + for (int i = 0; i < messageIds_.size(); i++) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(2, messageIds_.get(i)); + } memoizedSerializedSize = size; return size; } @@ -9725,6 +9766,8 @@ public Builder clear() { super.clear(); consumerId_ = 0L; bitField0_ = (bitField0_ & ~0x00000001); + messageIds_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000002); return this; } @@ -9762,6 +9805,11 @@ public com.yahoo.pulsar.common.api.proto.PulsarApi.CommandRedeliverUnacknowledge to_bitField0_ |= 0x00000001; } result.consumerId_ = consumerId_; + if (((bitField0_ & 0x00000002) == 0x00000002)) { + messageIds_ = java.util.Collections.unmodifiableList(messageIds_); + bitField0_ = (bitField0_ & ~0x00000002); + } + result.messageIds_ = messageIds_; result.bitField0_ = to_bitField0_; return result; } @@ -9771,6 +9819,16 @@ public Builder mergeFrom(com.yahoo.pulsar.common.api.proto.PulsarApi.CommandRede if (other.hasConsumerId()) { setConsumerId(other.getConsumerId()); } + if (!other.messageIds_.isEmpty()) { + if (messageIds_.isEmpty()) { + messageIds_ = other.messageIds_; + bitField0_ = (bitField0_ & ~0x00000002); + } else { + ensureMessageIdsIsMutable(); + messageIds_.addAll(other.messageIds_); + } + + } return this; } @@ -9779,6 +9837,12 @@ public final boolean isInitialized() { return false; } + for (int i = 0; i < getMessageIdsCount(); i++) { + if (!getMessageIds(i).isInitialized()) { + + return false; + } + } return true; } @@ -9809,6 +9873,12 @@ public Builder mergeFrom( consumerId_ = input.readUInt64(); break; } + case 18: { + com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder subBuilder = com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder(); + input.readMessage(subBuilder, extensionRegistry); + addMessageIds(subBuilder.buildPartial()); + break; + } } } } @@ -9836,6 +9906,95 @@ public Builder clearConsumerId() { return this; } + // repeated .com.yahoo.pulsar.common.api.proto.MessageIdData message_ids = 2; + private java.util.List messageIds_ = + java.util.Collections.emptyList(); + private void ensureMessageIdsIsMutable() { + if (!((bitField0_ & 0x00000002) == 0x00000002)) { + messageIds_ = new java.util.ArrayList(messageIds_); + bitField0_ |= 0x00000002; + } + } + + public java.util.List getMessageIdsList() { + return java.util.Collections.unmodifiableList(messageIds_); + } + public int getMessageIdsCount() { + return messageIds_.size(); + } + public com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData getMessageIds(int index) { + return messageIds_.get(index); + } + public Builder setMessageIds( + int index, com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData value) { + if (value == null) { + throw new NullPointerException(); + } + ensureMessageIdsIsMutable(); + messageIds_.set(index, value); + + return this; + } + public Builder setMessageIds( + int index, com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder builderForValue) { + ensureMessageIdsIsMutable(); + messageIds_.set(index, builderForValue.build()); + + return this; + } + public Builder addMessageIds(com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData value) { + if (value == null) { + throw new NullPointerException(); + } + ensureMessageIdsIsMutable(); + messageIds_.add(value); + + return this; + } + public Builder addMessageIds( + int index, com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData value) { + if (value == null) { + throw new NullPointerException(); + } + ensureMessageIdsIsMutable(); + messageIds_.add(index, value); + + return this; + } + public Builder addMessageIds( + com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder builderForValue) { + ensureMessageIdsIsMutable(); + messageIds_.add(builderForValue.build()); + + return this; + } + public Builder addMessageIds( + int index, com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder builderForValue) { + ensureMessageIdsIsMutable(); + messageIds_.add(index, builderForValue.build()); + + return this; + } + public Builder addAllMessageIds( + java.lang.Iterable values) { + ensureMessageIdsIsMutable(); + super.addAll(values, messageIds_); + + return this; + } + public Builder clearMessageIds() { + messageIds_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000002); + + return this; + } + public Builder removeMessageIds(int index) { + ensureMessageIdsIsMutable(); + messageIds_.remove(index); + + return this; + } + // @@protoc_insertion_point(builder_scope:com.yahoo.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages) } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 5de4523979483..f8856e11c2158 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -140,7 +140,7 @@ message CommandProducer { message CommandSend { required uint64 producer_id = 1; required uint64 sequence_id = 2; - optional int32 num_messages = 3 [default = 1];; + optional int32 num_messages = 3 [default = 1]; } message CommandSendReceipt { @@ -209,6 +209,7 @@ message CommandCloseConsumer { message CommandRedeliverUnacknowledgedMessages { required uint64 consumer_id = 1; + repeated MessageIdData message_ids = 2; } message CommandSuccess {