Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unack-message threshold to restrict consumer for receiving messages without acknowledging-msg up to the threshold #48

Merged
merged 1 commit into from
Oct 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ tlsTrustCertsFilePath=
# Accept untrusted TLS certificate from client
tlsAllowInsecureConnection=false

# Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending
# messages to consumer once, this limit reaches until consumer starts acknowledging messages back.
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000

### --- Authentication --- ###

# Enable authentication
Expand Down
5 changes: 5 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ clientLibraryVersionCheckAllowUnversioned=true
# to service discovery health checks
statusFilePath=/usr/local/apache/htdocs

# Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending
# messages to consumer once, this limit reaches until consumer starts acknowledging messages back
# Using a value of 0, is disabling unackeMessage limit check and consumer can receive messages without any restriction
maxUnackedMessagesPerConsumer=50000

### --- Authentication --- ###

# Enable authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ public class ServiceConfiguration {
// Path for the file used to determine the rotation status for the broker
// when responding to service discovery health checks
private String statusFilePath;
// Max number of unacknowledged messages allowed to receive messages by a consumer on a shared subscription. Broker will stop sending
// messages to consumer once, this limit reaches until consumer starts acknowledging messages back
// Using a value of 0, is disabling unackedMessage-limit check and consumer can receive messages without any restriction
private int maxUnackedMessagesPerConsumer = 50000;

/***** --- TLS --- ****/
// Enable TLS
Expand Down Expand Up @@ -404,6 +408,14 @@ public String getStatusFilePath() {
public void setStatusFilePath(String statusFilePath) {
this.statusFilePath = statusFilePath;
}

public int getMaxUnackedMessagesPerConsumer() {
return maxUnackedMessagesPerConsumer;
}

public void setMaxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer) {
this.maxUnackedMessagesPerConsumer = maxUnackedMessagesPerConsumer;
}

public boolean isTlsEnabled() {
return tlsEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.ConsumerStats;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashSet;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
Expand All @@ -63,17 +63,22 @@ public class Consumer {
// increase its availability
private final AtomicInteger messagePermits = new AtomicInteger(0);

private final ConcurrentOpenHashSet<PositionImpl> pendingAcks;
private final ConcurrentOpenHashMap<PositionImpl, Integer> pendingAcks;

private final ConsumerStats stats;

private final int maxUnackedMessages;
private AtomicInteger unackedMessages = new AtomicInteger(0);
private volatile boolean blockedConsumerOnUnackedMsgs = false;

public Consumer(Subscription subscription, SubType subType, long consumerId, String consumerName, ServerCnx cnx,
String appId) throws BrokerServiceException {
public Consumer(Subscription subscription, SubType subType, long consumerId, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId) throws BrokerServiceException {

this.subscription = subscription;
this.subType = subType;
this.consumerId = consumerId;
this.consumerName = consumerName;
this.maxUnackedMessages = maxUnackedMessages;
this.cnx = cnx;
this.msgOut = new Rate();
this.appId = appId;
Expand All @@ -84,7 +89,7 @@ public Consumer(Subscription subscription, SubType subType, long consumerId, Str
stats.connectedSince = DATE_FORMAT.format(new Date(System.currentTimeMillis()));

if (subType == SubType.Shared) {
this.pendingAcks = new ConcurrentOpenHashSet<PositionImpl>(256, 2);
this.pendingAcks = new ConcurrentOpenHashMap<PositionImpl, Integer>(256, 2);
} else {
// We don't need to keep track of pending acks if the subscription is not shared
this.pendingAcks = null;
Expand Down Expand Up @@ -156,6 +161,12 @@ public ChannelPromise sendMessages(final List<Entry> entries) {
return writePromise;
}

private void incrementUnackedMessages(int ackedMessages) {
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages.addAndGet(ackedMessages) >= maxUnackedMessages) {
blockedConsumerOnUnackedMsgs = true;
}
}

int getBatchSizeforEntry(ByteBuf metadataAndPayload) {
try {
// save the reader index and restore after parsing
Expand Down Expand Up @@ -191,12 +202,13 @@ void updatePermitsAndPendingAcks(final List<Entry> entries) {
}
if (pendingAcks != null) {
PositionImpl pos = PositionImpl.get((PositionImpl) entry.getPosition());
pendingAcks.add(pos);
pendingAcks.put(pos, batchSize);
}
permitsToReduce += batchSize;
}

// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
int permits = messagePermits.addAndGet(-permitsToReduce);
incrementUnackedMessages(permitsToReduce);
if (permits < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] message permits dropped below 0 - {}", subscription, consumerId, permits);
Expand Down Expand Up @@ -266,23 +278,46 @@ void messageAcked(CommandAck ack) {
} else {
subscription.acknowledgeMessage(position, ack.getAckType());
}

}

void flowPermits(int additionalNumberOfMessages) {
checkArgument(additionalNumberOfMessages > 0);

// block shared consumer when unacked-messages reaches limit
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages.get() >= maxUnackedMessages) {
blockedConsumerOnUnackedMsgs = true;
}

int oldPermits = messagePermits.getAndAdd(additionalNumberOfMessages);
if (!blockedConsumerOnUnackedMsgs) {
subscription.consumerFlow(this, additionalNumberOfMessages);
}
if (log.isDebugEnabled()) {
log.debug("[{}] Added more flow control message permits {} (old was: {})", this, additionalNumberOfMessages,
oldPermits);
log.debug("[{}] Added more flow control message permits {} (old was: {})", this,
additionalNumberOfMessages, oldPermits);
}
subscription.consumerFlow(this, additionalNumberOfMessages);
}

public int getAvailablePermits() {
return messagePermits.get();
}

public boolean isBlocked() {
return blockedConsumerOnUnackedMsgs;
}

/**
* Checks if consumer-blocking on unAckedMessages is allowed for below conditions:<br/>
* a. consumer must have Shared-subscription<br/>
* b. {@link maxUnackedMessages} value > 0
*
* @return
*/
private boolean shouldBlockConsumerOnUnackMsgs() {
return SubType.Shared.equals(subType) && maxUnackedMessages > 0;
}

public void updateRates() {
msgOut.calculateRate();
stats.msgRateOut = msgOut.getRate();
Expand All @@ -291,6 +326,7 @@ public void updateRates() {

public ConsumerStats getStats() {
stats.availablePermits = getAvailablePermits();
stats.unackedMessages = unackedMessages.get();
return stats;
}

Expand Down Expand Up @@ -338,22 +374,44 @@ public int hashCode() {
* @param position
*/
private void removePendingAcks(PositionImpl position) {
if (!pendingAcks.remove(position)) {
Consumer ackOwnedConsumer = null;
if (pendingAcks.get(position) == null) {
for (Consumer consumer : subscription.getConsumers()) {
if (!consumer.equals(this) && consumer.getPendingAcks().remove(position)) {
if (!consumer.equals(this) && consumer.getPendingAcks().get(position) != null) {
ackOwnedConsumer = consumer;
break;
}
}
} else {
ackOwnedConsumer = this;
}

// remove pending message from appropriate consumer and unblock unAckMsg-flow if requires
if (ackOwnedConsumer != null) {
int totalAckedMsgs = ackOwnedConsumer.getPendingAcks().remove(position);
// unblock consumer-throttling when receives half of maxUnackedMessages => consumer can start again
// consuming messages
if (ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()
&& ((ackOwnedConsumer.unackedMessages.addAndGet(-totalAckedMsgs) == (maxUnackedMessages / 2))
&& ackOwnedConsumer.blockedConsumerOnUnackedMsgs)) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
subscription.consumerFlow(ackOwnedConsumer, ackOwnedConsumer.messagePermits.get());
}
}
}

public ConcurrentOpenHashSet<PositionImpl> getPendingAcks() {
public ConcurrentOpenHashMap<PositionImpl, Integer> getPendingAcks() {
return pendingAcks;
}

private static final Logger log = LoggerFactory.getLogger(Consumer.class);

public void redeliverUnacknowledgedMessages() {

// cleanup unackedMessage bucket and redeliver those unack-msgs again
unackedMessages.set(0);
blockedConsumerOnUnackedMsgs = false;
// redeliver unacked-msgs
subscription.redeliverUnacknowledgedMessages(this);
if (pendingAcks != null) {
pendingAcks.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.yahoo.pulsar.broker.service.persistent;

import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
Expand Down Expand Up @@ -118,7 +119,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer are left, reading more entries", name);
}
consumer.getPendingAcks().forEach(pendingMessages -> {
consumer.getPendingAcks().forEach((pendingMessages, totalMsg) -> {
messagesToReplay.add(pendingMessages);
});
totalAvailablePermits -= consumer.getAvailablePermits();
Expand Down Expand Up @@ -148,7 +149,7 @@ public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfM
}

private void readMoreEntries() {
if (totalAvailablePermits > 0) {
if (totalAvailablePermits > 0 && isUnblockedConsumerAvailable()) {
int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);

if (!messagesToReplay.isEmpty()) {
Expand Down Expand Up @@ -257,7 +258,7 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
}

while (entriesToDispatch > 0 && totalAvailablePermits > 0) {
while (entriesToDispatch > 0 && totalAvailablePermits > 0 && isUnblockedConsumerAvailable()) {
Consumer c = getNextConsumer();
if (c == null) {
// Do nothing, cursor will be rewind at reconnection
Expand Down Expand Up @@ -352,12 +353,45 @@ private Consumer getNextConsumer() {
if (consumerIndex >= consumerList.size()) {
consumerIndex = 0;
}
return consumerList.get(consumerIndex++);

// find next available unblocked consumer
int unblockedConsumerIndex = consumerIndex;
do {
if (!consumerList.get(unblockedConsumerIndex).isBlocked()) {
consumerIndex = unblockedConsumerIndex;
return consumerList.get(consumerIndex++);
}
if (++unblockedConsumerIndex >= consumerList.size()) {
unblockedConsumerIndex = 0;
}
} while (unblockedConsumerIndex != consumerIndex);

// not found unblocked consumer
return null;
}

/**
* returns true only if {@link consumerList} has atleast one unblocked consumer
*
* @return
*/
private boolean isUnblockedConsumerAvailable() {
if (consumerList.isEmpty() || closeFuture != null) {
// abort read if no consumers are connected or if disconnect is initiated
return false;
}
Iterator<Consumer> consumerIterator = consumerList.iterator();
while (consumerIterator.hasNext()) {
if (!consumerIterator.next().isBlocked()) {
return true;
}
}
return false;
}

@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
consumer.getPendingAcks().forEach(pendingMessages -> {
consumer.getPendingAcks().forEach((pendingMessages, totalMsg) -> {
messagesToReplay.add(pendingMessages);
});
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,9 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
try {
PersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new PersistentSubscription(PersistentTopic.this, cursor));

Consumer consumer = new Consumer(subscription, subType, consumerId, consumerName, cnx,

Consumer consumer = new Consumer(subscription, subType, consumerId, consumerName,
brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer(), cnx,
cnx.getRole());
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
Expand Down Expand Up @@ -835,6 +836,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
destStatsStream.writePair("address", consumerStats.address);
destStatsStream.writePair("consumerName", consumerStats.consumerName);
destStatsStream.writePair("availablePermits", consumerStats.availablePermits);
destStatsStream.writePair("unackedMessages", consumerStats.unackedMessages);
destStatsStream.writePair("connectedSince", consumerStats.connectedSince);
destStatsStream.writePair("msgRateOut", consumerStats.msgRateOut);
destStatsStream.writePair("msgThroughputOut", consumerStats.msgThroughputOut);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void testAddRemoveConsumer() throws Exception {

// 2. Add consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, "Cons1"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
assertTrue(consumers.get(0).consumerName() == consumer1.consumerName());
Expand All @@ -209,15 +209,15 @@ public void testAddRemoveConsumer() throws Exception {

// 5. Add another consumer which does not change active consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 2 /* consumer id */, "Cons2"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
assertEquals(3, consumers.size());

// 6. Add a consumer which changes active consumer
Consumer consumer0 = new Consumer(sub, SubType.Exclusive, 0 /* consumer id */, "Cons0"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer0.consumerName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ public void testAddRemoveConsumer() throws Exception {

// 1. simple add consumer
Consumer consumer = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, "Cons1"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
sub.addConsumer(consumer);
assertTrue(sub.getDispatcher().isConsumerConnected());

Expand Down Expand Up @@ -402,7 +402,7 @@ public void testUbsubscribeRaceConditions() throws Exception {
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentSubscription sub = new PersistentSubscription(topic, cursorMock);
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, "Cons1"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
sub.addConsumer(consumer1);

doAnswer(new Answer<Object>() {
Expand All @@ -424,7 +424,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
try {
Thread.sleep(10); /* delay to ensure that the ubsubscribe gets executed first */
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 2 /* consumer id */, "Cons2"/* consumer name */,
serverCnx, "myrole-1");
50000, serverCnx, "myrole-1");
} catch (BrokerServiceException e) {
assertTrue(e instanceof BrokerServiceException.SubscriptionFencedException);
}
Expand Down
Loading