Skip to content

Commit

Permalink
Add unack-message threshold to restrict consumer for receiving messag…
Browse files Browse the repository at this point in the history
…es without acknowledging-msg up to the threshold
  • Loading branch information
rdhabalia committed Oct 7, 2016
1 parent bb35716 commit 82bf1fc
Show file tree
Hide file tree
Showing 10 changed files with 686 additions and 26 deletions.
5 changes: 5 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ tlsTrustCertsFilePath=
# Accept untrusted TLS certificate from client
tlsAllowInsecureConnection=false

# Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending
# messages to consumer once, this limit reaches until consumer starts acknowledging messages back.
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000

### --- Authentication --- ###

# Enable authentication
Expand Down
5 changes: 5 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ clientLibraryVersionCheckAllowUnversioned=true
# to service discovery health checks
statusFilePath=/usr/local/apache/htdocs

# Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending
# messages to consumer once, this limit reaches until consumer starts acknowledging messages back
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000

### --- Authentication --- ###

# Enable authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ public class ServiceConfiguration {
// Path for the file used to determine the rotation status for the broker
// when responding to service discovery health checks
private String statusFilePath;
// Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending
// messages to consumer once, this limit reaches until consumer starts acknowledging messages back
// Using a value of 0, is disabling unackedMessage-limit check and consumer can receive messages without any restriction
private int maxUnackedMessagesPerConsumer = 50000;

/***** --- TLS --- ****/
// Enable TLS
Expand Down Expand Up @@ -404,6 +408,14 @@ public String getStatusFilePath() {
public void setStatusFilePath(String statusFilePath) {
this.statusFilePath = statusFilePath;
}

public int getMaxUnackedMessagesPerConsumer() {
return maxUnackedMessagesPerConsumer;
}

public void setMaxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer) {
this.maxUnackedMessagesPerConsumer = maxUnackedMessagesPerConsumer;
}

public boolean isTlsEnabled() {
return tlsEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.ConsumerStats;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashSet;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
Expand All @@ -63,17 +63,22 @@ public class Consumer {
// increase its availability
private final AtomicInteger messagePermits = new AtomicInteger(0);

private final ConcurrentOpenHashSet<PositionImpl> pendingAcks;
private final ConcurrentOpenHashMap<PositionImpl, Integer> pendingAcks;

private final ConsumerStats stats;

private final int maxUnackedMessages;
private AtomicInteger unackedMessages = new AtomicInteger(0);
private volatile boolean blockedConsumerOnUnackedMsgs = false;

public Consumer(Subscription subscription, SubType subType, long consumerId, String consumerName, ServerCnx cnx,
String appId) throws BrokerServiceException {
public Consumer(Subscription subscription, SubType subType, long consumerId, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId) throws BrokerServiceException {

this.subscription = subscription;
this.subType = subType;
this.consumerId = consumerId;
this.consumerName = consumerName;
this.maxUnackedMessages = maxUnackedMessages;
this.cnx = cnx;
this.msgOut = new Rate();
this.appId = appId;
Expand All @@ -84,7 +89,7 @@ public Consumer(Subscription subscription, SubType subType, long consumerId, Str
stats.connectedSince = DATE_FORMAT.format(new Date(System.currentTimeMillis()));

if (subType == SubType.Shared) {
this.pendingAcks = new ConcurrentOpenHashSet<PositionImpl>(256, 2);
this.pendingAcks = new ConcurrentOpenHashMap<PositionImpl, Integer>(256, 2);
} else {
// We don't need to keep track of pending acks if the subscription is not shared
this.pendingAcks = null;
Expand Down Expand Up @@ -156,6 +161,12 @@ public ChannelPromise sendMessages(final List<Entry> entries) {
return writePromise;
}

private void incrementUnackedMessages(int ackedMessages) {
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages.addAndGet(ackedMessages) >= maxUnackedMessages) {
blockedConsumerOnUnackedMsgs = true;
}
}

int getBatchSizeforEntry(ByteBuf metadataAndPayload) {
try {
// save the reader index and restore after parsing
Expand Down Expand Up @@ -191,12 +202,13 @@ void updatePermitsAndPendingAcks(final List<Entry> entries) {
}
if (pendingAcks != null) {
PositionImpl pos = PositionImpl.get((PositionImpl) entry.getPosition());
pendingAcks.add(pos);
pendingAcks.put(pos, batchSize);
}
permitsToReduce += batchSize;
}

// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
int permits = messagePermits.addAndGet(-permitsToReduce);
incrementUnackedMessages(permitsToReduce);
if (permits < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] message permits dropped below 0 - {}", subscription, consumerId, permits);
Expand Down Expand Up @@ -266,23 +278,46 @@ void messageAcked(CommandAck ack) {
} else {
subscription.acknowledgeMessage(position, ack.getAckType());
}

}

void flowPermits(int additionalNumberOfMessages) {
checkArgument(additionalNumberOfMessages > 0);

// block shared consumer when unacked-messages reaches limit
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages.get() >= maxUnackedMessages) {
blockedConsumerOnUnackedMsgs = true;
}

int oldPermits = messagePermits.getAndAdd(additionalNumberOfMessages);
if (!blockedConsumerOnUnackedMsgs) {
subscription.consumerFlow(this, additionalNumberOfMessages);
}
if (log.isDebugEnabled()) {
log.debug("[{}] Added more flow control message permits {} (old was: {})", this, additionalNumberOfMessages,
oldPermits);
log.debug("[{}] Added more flow control message permits {} (old was: {})", this,
additionalNumberOfMessages, oldPermits);
}
subscription.consumerFlow(this, additionalNumberOfMessages);
}

public int getAvailablePermits() {
return messagePermits.get();
}

public boolean isBlocked() {
return blockedConsumerOnUnackedMsgs;
}

/**
* Checks if consumer-blocking on unAckedMessages is allowed for below conditions:<br/>
* a. consumer must have Shared-subscription<br/>
* b. {@link maxUnackedMessages} value > 0
*
* @return
*/
private boolean shouldBlockConsumerOnUnackMsgs() {
return SubType.Shared.equals(subType) && maxUnackedMessages > 0;
}

public void updateRates() {
msgOut.calculateRate();
stats.msgRateOut = msgOut.getRate();
Expand All @@ -291,6 +326,7 @@ public void updateRates() {

public ConsumerStats getStats() {
stats.availablePermits = getAvailablePermits();
stats.unackedMessages = unackedMessages.get();
return stats;
}

Expand Down Expand Up @@ -338,22 +374,44 @@ public int hashCode() {
* @param position
*/
private void removePendingAcks(PositionImpl position) {
if (!pendingAcks.remove(position)) {
Consumer ackOwnedConsumer = null;
if (pendingAcks.get(position) == null) {
for (Consumer consumer : subscription.getConsumers()) {
if (!consumer.equals(this) && consumer.getPendingAcks().remove(position)) {
if (!consumer.equals(this) && consumer.getPendingAcks().get(position) != null) {
ackOwnedConsumer = consumer;
break;
}
}
} else {
ackOwnedConsumer = this;
}

// remove pending message from appropriate consumer and unblock unAckMsg-flow if requires
if (ackOwnedConsumer != null) {
int totalAckedMsgs = ackOwnedConsumer.getPendingAcks().remove(position);
// unblock consumer-throttling when receives half of maxUnackedMessages => consumer can start again
// consuming messages
if (ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()
&& ((ackOwnedConsumer.unackedMessages.addAndGet(-totalAckedMsgs) == (maxUnackedMessages / 2))
&& ackOwnedConsumer.blockedConsumerOnUnackedMsgs)) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
subscription.consumerFlow(ackOwnedConsumer, ackOwnedConsumer.messagePermits.get());
}
}
}

public ConcurrentOpenHashSet<PositionImpl> getPendingAcks() {
public ConcurrentOpenHashMap<PositionImpl, Integer> getPendingAcks() {
return pendingAcks;
}

private static final Logger log = LoggerFactory.getLogger(Consumer.class);

public void redeliverUnacknowledgedMessages() {

// cleanup unackedMessage bucket and redeliver those unack-msgs again
unackedMessages.set(0);
blockedConsumerOnUnackedMsgs = false;
// redeliver unacked-msgs
subscription.redeliverUnacknowledgedMessages(this);
if (pendingAcks != null) {
pendingAcks.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.yahoo.pulsar.broker.service.persistent;

import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
Expand Down Expand Up @@ -118,7 +119,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer are left, reading more entries", name);
}
consumer.getPendingAcks().forEach(pendingMessages -> {
consumer.getPendingAcks().forEach((pendingMessages, totalMsg) -> {
messagesToReplay.add(pendingMessages);
});
totalAvailablePermits -= consumer.getAvailablePermits();
Expand Down Expand Up @@ -148,7 +149,7 @@ public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfM
}

private void readMoreEntries() {
if (totalAvailablePermits > 0) {
if (totalAvailablePermits > 0 && isUnblockedConsumerAvailable()) {
int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);

if (!messagesToReplay.isEmpty()) {
Expand Down Expand Up @@ -257,7 +258,7 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
}

while (entriesToDispatch > 0 && totalAvailablePermits > 0) {
while (entriesToDispatch > 0 && totalAvailablePermits > 0 && isUnblockedConsumerAvailable()) {
Consumer c = getNextConsumer();
if (c == null) {
// Do nothing, cursor will be rewind at reconnection
Expand Down Expand Up @@ -352,12 +353,45 @@ private Consumer getNextConsumer() {
if (consumerIndex >= consumerList.size()) {
consumerIndex = 0;
}
return consumerList.get(consumerIndex++);

// find next available unblocked consumer
int unblockedConsumerIndex = consumerIndex;
do {
if (!consumerList.get(unblockedConsumerIndex).isBlocked()) {
consumerIndex = unblockedConsumerIndex;
return consumerList.get(consumerIndex++);
}
if (++unblockedConsumerIndex >= consumerList.size()) {
unblockedConsumerIndex = 0;
}
} while (unblockedConsumerIndex != consumerIndex);

// not found unblocked consumer
return null;
}

/**
* returns true only if {@link consumerList} has atleast one unblocked consumer
*
* @return
*/
private boolean isUnblockedConsumerAvailable() {
if (consumerList.isEmpty() || closeFuture != null) {
// abort read if no consumers are connected or if disconnect is initiated
return false;
}
Iterator<Consumer> consumerIterator = consumerList.iterator();
while (consumerIterator.hasNext()) {
if (!consumerIterator.next().isBlocked()) {
return true;
}
}
return false;
}

@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
consumer.getPendingAcks().forEach(pendingMessages -> {
consumer.getPendingAcks().forEach((pendingMessages, totalMsg) -> {
messagesToReplay.add(pendingMessages);
});
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,9 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
try {
PersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new PersistentSubscription(PersistentTopic.this, cursor));

Consumer consumer = new Consumer(subscription, subType, consumerId, consumerName, cnx,

Consumer consumer = new Consumer(subscription, subType, consumerId, consumerName,
brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer(), cnx,
cnx.getRole());
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
Expand Down Expand Up @@ -835,6 +836,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
destStatsStream.writePair("address", consumerStats.address);
destStatsStream.writePair("consumerName", consumerStats.consumerName);
destStatsStream.writePair("availablePermits", consumerStats.availablePermits);
destStatsStream.writePair("unackedMessages", consumerStats.unackedMessages);
destStatsStream.writePair("connectedSince", consumerStats.connectedSince);
destStatsStream.writePair("msgRateOut", consumerStats.msgRateOut);
destStatsStream.writePair("msgThroughputOut", consumerStats.msgThroughputOut);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void testAddRemoveConsumer() throws Exception {

// 2. Add consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, "Cons1"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertTrue(consumers.get(0).consumerName() == consumer1.consumerName());
Expand All @@ -209,15 +209,15 @@ public void testAddRemoveConsumer() throws Exception {

// 5. Add another consumer which does not change active consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 2 /* consumer id */, "Cons2"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
assertEquals(3, consumers.size());

// 6. Add a consumer which changes active consumer
Consumer consumer0 = new Consumer(sub, SubType.Exclusive, 0 /* consumer id */, "Cons0"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ public void testAddRemoveConsumer() throws Exception {

// 1. simple add consumer
Consumer consumer = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, "Cons1"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
sub.addConsumer(consumer);
assertTrue(sub.getDispatcher().isConsumerConnected());

Expand Down Expand Up @@ -402,7 +402,7 @@ public void testUbsubscribeRaceConditions() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, cursorMock);
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, "Cons1"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
sub.addConsumer(consumer1);

doAnswer(new Answer<Object>() {
Expand All @@ -424,7 +424,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
try {
Thread.sleep(10); /* delay to ensure that the ubsubscribe gets executed first */
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 2 /* consumer id */, "Cons2"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
} catch (BrokerServiceException e) {
assertTrue(e instanceof BrokerServiceException.SubscriptionFencedException);
}
Expand Down
Loading

0 comments on commit 82bf1fc

Please sign in to comment.