diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index e4317d49ae052..b9d014a784afd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -803,6 +803,14 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         return result.entries;
     }
 
+    /**
+     * Asynchronously replays the given positions:
+     * a. filters out already-acked messages before reading
+     * b. reads the remaining entries asynchronously and passes them to the given ReadEntriesCallback
+     * c. returns all already-acked messages that were not replayed, so the caller (Dispatcher) can remove
+     *    them from its replay-list and will not try to replay them again
+     *
+     */
     @Override
     public Set asyncReplayEntries(final Set positions, ReadEntriesCallback callback, Object ctx) {
         List entries = Lists.newArrayListWithExpectedSize(positions.size());
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6f1caf8dc448b..832b1a5e17b5f 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -173,6 +173,12 @@ private void readMoreEntries() {
                     ReadType.Replay);
             // clear already acked positions from replay bucket
            messagesToReplay.removeAll(deletedMessages);
+            // if all the entries were already acked and cleared from messagesToReplay, trigger the next
+            // read here, as the readEntriesComplete callback will never be called
+            if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
+                havePendingReplayRead = false;
+                readMoreEntries();
+            }
         } else if (!havePendingRead) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java
index 32f41f1f0d689..4cfcaede75af0 100644
--- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -1129,4 +1129,85 @@ public void testMessageRedelivery() throws Exception {
         producer.close();
     }
 
+    /**
+     * Verify:
+     * 1. Broker should not replay already acknowledged messages
+     * 2. Dispatcher should not get stuck while dispatching new messages due to a previous replay
+     *    of invalid/already-acked messages
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMessageReplay() throws Exception {
+
+        final String topicName = "persistent://prop/use/ns-abc/topic2";
+        final String subName = "sub2";
+
+        Message msg;
+        int totalMessages = 10;
+        int replayIndex = totalMessages / 2;
+
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        conf.setReceiverQueueSize(1);
+
+        Consumer consumer = pulsarClient.subscribe(topicName, subName, conf);
+        Producer producer = pulsarClient.createProducer(topicName);
+
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
+        assertNotNull(topicRef);
+        PersistentSubscription subRef = topicRef.getPersistentSubscription(subName);
+        PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef
+                .getDispatcher();
+        Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToReplay");
+        replayMap.setAccessible(true);
+        TreeSet messagesToReplay = Sets.newTreeSet();
+
+        assertNotNull(subRef);
+
+        // (1) Produce messages
+        for (int i = 0; i < totalMessages; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        MessageIdImpl firstAckedMsg = null;
+        // (2) Consume and ack all messages, remembering the id of the first acked message
+        for (int i = 0; i < totalMessages; i++) {
+            msg = consumer.receive();
+            consumer.acknowledge(msg);
+            MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+            if (i == 0) {
+                firstAckedMsg = msgId;
+            }
+            if (i < replayIndex) {
+                // (3) accumulate acked messages for replay
+                messagesToReplay.add(new PositionImpl(msgId.getLedgerId(), msgId.getEntryId()));
+            }
+        }
+
+        // (4) redelivery: should redeliver only unacked messages
+        Thread.sleep(1000);
+
+        replayMap.set(dispatcher, messagesToReplay);
+        // (a) redelivery with only already-acked messages should clear the messagesToReplay bucket
+        dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));
+        assertEquals(messagesToReplay.size(), 0);
+
+        // (b) fill the messagesToReplay bucket with an already-acked entry again, then publish and read a new msg
+        messagesToReplay.add(new PositionImpl(firstAckedMsg.getLedgerId(), firstAckedMsg.getEntryId()));
+        replayMap.set(dispatcher, messagesToReplay);
+        // send new message
+        final String testMsg = "testMsg";
+        producer.send(testMsg.getBytes());
+        // consumer should be able to receive only the new message and not the already-acked one
+        dispatcher.consumerFlow(dispatcher.getConsumers().get(0), 1);
+        msg = consumer.receive(1, TimeUnit.SECONDS);
+        assertNotNull(msg);
+        assertEquals(msg.getData(), testMsg.getBytes());
+
+        consumer.close();
+        producer.close();
+    }
+
 }
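
Note (not part of the patch): the sketch below is a minimal, self-contained Java illustration of the control flow this change introduces, under a deliberately simplified dispatcher model. The class ReplayFlowSketch, its fields, and the isAlreadyAcked predicate are hypothetical stand-ins for PersistentDispatcherMultipleConsumers, messagesToReplay, and the cursor's deleted-position check; only the shape of the fix (filter acked positions out of the replay set, and re-trigger a read when everything was filtered, since no read callback will arrive) mirrors the diff above.

import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustrative sketch only -- all names here are hypothetical, not the patch's code.
public class ReplayFlowSketch {

    // Positions the dispatcher still wants to replay (stand-in for messagesToReplay).
    private final TreeSet<Long> messagesToReplay = new TreeSet<>();
    // Stand-in for the cursor's knowledge of already-acked (deleted) positions.
    private final Predicate<Long> isAlreadyAcked;
    // Tracked only to mirror the real dispatcher's state flag.
    private boolean havePendingReplayRead;

    public ReplayFlowSketch(Predicate<Long> isAlreadyAcked) {
        this.isAlreadyAcked = isAlreadyAcked;
    }

    public void addToReplay(long position) {
        messagesToReplay.add(position);
    }

    // Mirrors the shape of readMoreEntries(): replay pending positions if any, else do a normal read.
    public void readMoreEntries() {
        if (!messagesToReplay.isEmpty()) {
            Set<Long> messagesToReplayNow = new TreeSet<>(messagesToReplay);
            havePendingReplayRead = true;

            // Models asyncReplayEntries(): already-acked positions are filtered out and returned
            // to the caller instead of being read.
            Set<Long> deletedMessages = messagesToReplayNow.stream()
                    .filter(isAlreadyAcked)
                    .collect(Collectors.toSet());
            // clear already acked positions from the replay bucket
            messagesToReplay.removeAll(deletedMessages);

            // Core of the fix: if every position was already acked, no read was issued and no read
            // callback will ever fire, so reset the flag and try to read again right away.
            if (messagesToReplayNow.size() - deletedMessages.size() == 0) {
                havePendingReplayRead = false;
                readMoreEntries();
            } else {
                System.out.println("replaying " + (messagesToReplayNow.size() - deletedMessages.size())
                        + " position(s)");
            }
        } else {
            System.out.println("scheduling a normal read for new messages");
        }
    }

    public static void main(String[] args) {
        // Treat every position below 100 as already acked.
        ReplayFlowSketch dispatcher = new ReplayFlowSketch(pos -> pos < 100);
        dispatcher.addToReplay(5L);
        dispatcher.addToReplay(7L);
        // Without the fix the replay path would stop here waiting for a callback that never comes;
        // with it, the sketch falls through and prints the "normal read" line.
        dispatcher.readMoreEntries();
    }
}

Running main prints the "normal read" line instead of stalling, which is the same behavior the new testMessageReplay test asserts at the broker level when the replay bucket contains only already-acked positions.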