Skip to content

Commit

Permalink
Fixed KeyShared consumers getting stuck on delivery (apache#7105)
Browse files Browse the repository at this point in the history
Motivation
If one consumer is slow at processing messages, it can prevent the other consumers from making progress on the topic. Instead, the dispatcher gets stuck in a loop in which it keeps trying to replay the same messages without being able to dispatch any of them.

The basic idea here is that we can make progress by continuing to read through the topic and dispatching the messages to the consumers that are free, at least for the keys that belong to them.
  • Loading branch information
merlimat authored Jun 5, 2020
1 parent 9b0098a commit 2f75d02
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata
}
}

private synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
if (!messagesToRedeliver.isEmpty()) {
return messagesToRedeliver.items(maxMessagesToRead,
(ledgerId, entryId) -> new PositionImpl(ledgerId, entryId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.util.concurrent.FastThreadLocal;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -30,13 +31,15 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.ReadType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,6 +48,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi

private final StickyKeyConsumerSelector selector;

private boolean isDispatcherStuckOnReplays = false;

PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Subscription subscription, StickyKeyConsumerSelector selector) {
super(topic, cursor, subscription);
Expand Down Expand Up @@ -159,6 +164,30 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
}
}

if (totalMessagesSent == 0) {
// This means, that all the messages we've just read cannot be dispatched right now.
// This condition can only happen when:
// 1. We have consumers ready to accept messages (otherwise the dispatch would not have been triggered)
// 2. All keys in the current set of messages are routing to consumers that are currently busy
//
// The solution here is to move on and read next batch of messages which might hopefully contain
// also keys meant for other consumers.
isDispatcherStuckOnReplays = true;
readMoreEntries();
}
}

/**
 * Selects the positions that should be replayed on the next read.
 *
 * <p>When the dispatcher has been flagged as stuck on replays, the flag is cleared and an empty
 * set is returned for this one call; otherwise the decision is delegated to the parent
 * implementation.
 *
 * @param maxMessagesToRead upper bound passed through to the parent implementation
 * @return an empty set right after a stuck-on-replay round, otherwise the parent's result
 */
protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
    if (!isDispatcherStuckOnReplays) {
        // Normal path: let the parent pick the messages to redeliver.
        return super.getMessagesToReplayNow(maxMessagesToRead);
    }

    // The previous round could not dispatch anything. Rather than replaying the same old messages
    // (whose target consumer might still be busy), skip the replay once so that the reader moves
    // forward on the topic, until the overall max-unacked messages limit kicks in.
    isDispatcherStuckOnReplays = false;
    return Collections.emptySet();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -392,6 +395,74 @@ public void testCannotUseAcknowledgeCumulative() throws PulsarClientException {
}
}

/**
 * Verifies that a Key_Shared subscription keeps delivering messages to the free consumers while
 * one consumer is blocked processing a "slow" key.
 *
 * <p>Ten consumers (each on its own client, receiver queue size 1) subscribe to the topic. The
 * listener sleeps for 10 seconds when it sees the slow key. One slow-key message is published
 * first, followed by {@code N} messages with random keys. After waiting 5 seconds, the number of
 * processed messages must be within {@code N * 0.9 +/- N * 0.3}.
 *
 * @param enableBatch whether the producer is created with batching enabled
 * @throws Exception if any client, producer or consumer operation fails
 */
@Test(dataProvider = "batch")
public void testMakingProgressWithSlowerConsumer(boolean enableBatch) throws Exception {
    this.conf.setSubscriptionKeySharedEnable(true);
    String topic = "testMakingProgressWithSlowerConsumer-" + UUID.randomUUID();

    String slowKey = "slowKey";

    List<PulsarClient> clients = new ArrayList<>();

    AtomicInteger receivedMessages = new AtomicInteger();

    // Everything that can throw runs inside the try block so the clients are always closed,
    // even when subscribing, publishing or the final assertion fails.
    try {
        for (int i = 0; i < 10; i++) {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl(brokerUrl.toString())
                    .build();
            clients.add(client);

            client.newConsumer(Schema.INT32)
                    .topic(topic)
                    .subscriptionName("key_shared")
                    .subscriptionType(SubscriptionType.Key_Shared)
                    .receiverQueueSize(1)
                    .messageListener((consumer, msg) -> {
                        try {
                            if (slowKey.equals(msg.getKey())) {
                                // Block the thread to simulate a slow consumer
                                Thread.sleep(10000);
                            }

                            receivedMessages.incrementAndGet();
                            consumer.acknowledge(msg);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    })
                    .subscribe();
        }

        @Cleanup
        Producer<Integer> producer = createProducer(topic, enableBatch);

        // First send the "slow key" so that 1 consumer will get stuck
        producer.newMessage()
                .key(slowKey)
                .value(-1)
                .send();

        int N = 1000;

        // Then send all the other keys
        for (int i = 0; i < N; i++) {
            producer.newMessage()
                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                    .value(i)
                    .send();
        }

        // Since only 1 out of 10 consumers is stuck, we should be able to receive ~90% messages,
        // plus or minus for some skew in the key distribution.
        Thread.sleep(5000);

        assertEquals((double) receivedMessages.get(), N * 0.9, N * 0.3);
    } finally {
        // Release every client that was created, whether or not the test passed.
        for (PulsarClient c : clients) {
            c.close();
        }
    }
}

private Producer<Integer> createProducer(String topic, boolean enableBatch) throws PulsarClientException {
Producer<Integer> producer = null;
if (enableBatch) {
Expand Down

0 comments on commit 2f75d02

Please sign in to comment.