
Commit

Complete message replay immediately if there is no entry to replay, which will not block reads from other consumers (#103)
rdhabalia authored Nov 4, 2016
1 parent 1f05368 commit 15683fd
Showing 3 changed files with 95 additions and 0 deletions.
@@ -803,6 +803,14 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
return result.entries;
}

/**
 * Async replays the given positions:
 * a. filters out already-acknowledged messages before reading
 * b. reads the remaining entries asynchronously and passes them to the given ReadEntriesCallback
 * c. returns the already-acknowledged positions that were not replayed, so the caller (Dispatcher)
 *    can remove them from its replay list and will not try to replay them again
 *
 */
@Override
public Set<? extends Position> asyncReplayEntries(final Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx) {
List<Entry> entries = Lists.newArrayListWithExpectedSize(positions.size());
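The hunk above shows only the new Javadoc and the method signature; the body of asyncReplayEntries is collapsed. The following is a minimal, hedged sketch of the filtering step the Javadoc describes, not the actual ManagedCursorImpl code; the isAlreadyAcked predicate and the generic position type are assumptions made for illustration.

import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Illustrative sketch only: split the requested replay positions into positions that were
// already acknowledged (returned to the caller) and positions that still need to be read
// back asynchronously and handed to the ReadEntriesCallback.
final class ReplayFilterSketch {

    static <P> Set<P> partitionForReplay(Set<P> positions, Predicate<P> isAlreadyAcked, Set<P> positionsToRead) {
        Set<P> alreadyAcked = new HashSet<>();
        for (P position : positions) {
            if (isAlreadyAcked.test(position)) {
                // never read back; the dispatcher can drop it from its replay list
                alreadyAcked.add(position);
            } else {
                // still needs to be read and redelivered
                positionsToRead.add(position);
            }
        }
        return alreadyAcked;
    }
}

If positionsToRead ends up empty, no read is scheduled and no callback will ever fire, which is exactly the case the dispatcher change in the next hunk has to handle.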
@@ -173,6 +173,12 @@ private void readMoreEntries() {
ReadType.Replay);
// clear already acked positions from replay bucket
messagesToReplay.removeAll(deletedMessages);
// if all of the entries were already acked and have been cleared from messagesToReplay, try to read the
// next entries, because the ReadEntriesCallback will never be invoked for them
if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
readMoreEntries();
}
} else if (!havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
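To make the change easier to follow, here is a hedged, self-contained reconstruction of the replay branch of readMoreEntries based only on the hunks above; the Cursor interface, the callback shape, and everything outside the shown lines are assumptions, not the actual broker code.

import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Illustrative sketch only: if every position handed to asyncReplayEntries was already
// acked, the read callback never fires, so the dispatcher must clear its pending-replay
// flag itself and immediately fall through to a normal read instead of staying blocked.
final class ReplayDispatchSketch<P> {

    interface Cursor<P> {
        // returns the already-acked positions that will NOT be read back
        Set<P> asyncReplayEntries(Set<P> positions, Consumer<Object> readEntriesCallback);
    }

    private final Set<P> messagesToReplay = new HashSet<>();
    private boolean havePendingReplayRead;

    void readMoreEntries(Cursor<P> cursor) {
        if (havePendingReplayRead) {
            // a replay read is already in flight; wait for its callback
            return;
        }
        if (!messagesToReplay.isEmpty()) {
            Set<P> messagesToReplayNow = new HashSet<>(messagesToReplay);
            havePendingReplayRead = true;
            Set<P> deletedMessages = cursor.asyncReplayEntries(messagesToReplayNow, this::readEntriesComplete);
            // clear already acked positions from the replay bucket
            messagesToReplay.removeAll(deletedMessages);
            if (messagesToReplayNow.size() - deletedMessages.size() == 0) {
                // nothing was scheduled for read, so the callback will never be called:
                // reset the flag and move on to the normal read path right away
                havePendingReplayRead = false;
                readMoreEntries(cursor);
            }
        } else {
            // normal read path for new messages (elided in this sketch)
        }
    }

    private void readEntriesComplete(Object entries) {
        havePendingReplayRead = false;
        // dispatch the replayed entries to consumers (elided)
    }
}

Before this change the size check was absent, so once every replay position turned out to be already acked the callback never reset havePendingReplayRead and further reads stayed blocked, which is what the commit message refers to.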
@@ -1129,4 +1129,85 @@ public void testMessageRedelivery() throws Exception {
producer.close();
}

/**
 * Verify:
 * 1. Broker should not replay already acknowledged messages
 * 2. Dispatcher should not get stuck while dispatching new messages because of an earlier
 *    replay of invalid/already-acked messages
 *
 * @throws Exception
 */
@Test
public void testMessageReplay() throws Exception {

final String topicName = "persistent://prop/use/ns-abc/topic2";
final String subName = "sub2";

Message msg;
int totalMessages = 10;
int replayIndex = totalMessages / 2;

ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Shared);
conf.setReceiverQueueSize(1);

Consumer consumer = pulsarClient.subscribe(topicName, subName, conf);
Producer producer = pulsarClient.createProducer(topicName);

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
assertNotNull(topicRef);
PersistentSubscription subRef = topicRef.getPersistentSubscription(subName);
PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef
.getDispatcher();
Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToReplay");
replayMap.setAccessible(true);
TreeSet<PositionImpl> messagesToReplay = Sets.newTreeSet();

assertNotNull(subRef);

// (1) Produce messages
for (int i = 0; i < totalMessages; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}

MessageIdImpl firstAckedMsg = null;
// (2) Consume and ack all messages, remembering the first acked message id
for (int i = 0; i < totalMessages; i++) {
msg = consumer.receive();
consumer.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
if (i == 0) {
firstAckedMsg = msgId;
}
if (i < replayIndex) {
// (3) accumulate acked messages for replay
messagesToReplay.add(new PositionImpl(msgId.getLedgerId(), msgId.getEntryId()));
}
}

// (4) redelivery: should redeliver only unacked messages
Thread.sleep(1000);

replayMap.set(dispatcher, messagesToReplay);
// (a) redelivery with only already-acked messages should clear the messagesToReplay bucket
dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0));
assertEquals(messagesToReplay.size(), 0);

// (b) fill the messagesToReplay bucket with an already acked entry again, then publish a new msg and try to read it
messagesToReplay.add(new PositionImpl(firstAckedMsg.getLedgerId(), firstAckedMsg.getEntryId()));
replayMap.set(dispatcher, messagesToReplay);
// send new message
final String testMsg = "testMsg";
producer.send(testMsg.getBytes());
// consumer should be able to receive only the new message and not the already acked one
dispatcher.consumerFlow(dispatcher.getConsumers().get(0), 1);
msg = consumer.receive(1, TimeUnit.SECONDS);
assertNotNull(msg);
assertEquals(msg.getData(), testMsg.getBytes());

consumer.close();
producer.close();
}

}
