Skip to content

Commit

Permalink
Message dispatching based on consumer priority-level (#165)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Feb 26, 2017
1 parent 82f9adb commit cd2d16e
Show file tree
Hide file tree
Showing 15 changed files with 380 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class Consumer {
private final String appId;

private final long consumerId;
private final int priorityLevel;
private final String consumerName;
private final Rate msgOut;
private final Rate msgRedeliver;
Expand Down Expand Up @@ -88,12 +89,13 @@ public class Consumer {
private volatile int unackedMessages = 0;
private volatile boolean blockedConsumerOnUnackedMsgs = false;

public Consumer(Subscription subscription, SubType subType, long consumerId, String consumerName,
public Consumer(Subscription subscription, SubType subType, long consumerId, int priorityLevel, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId) throws BrokerServiceException {

this.subscription = subscription;
this.subType = subType;
this.consumerId = consumerId;
this.priorityLevel = priorityLevel;
this.consumerName = consumerName;
this.maxUnackedMessages = maxUnackedMessages;
this.cnx = cnx;
Expand Down Expand Up @@ -476,8 +478,10 @@ private void removePendingAcks(PositionImpl position) {
public ConcurrentOpenHashMap<PositionImpl, Integer> getPendingAcks() {
return pendingAcks;
}

private static final Logger log = LoggerFactory.getLogger(Consumer.class);

public int getPriorityLevel() {
return priorityLevel;
}

public void redeliverUnacknowledgedMessages() {
// cleanup unackedMessage bucket and redeliver those unack-msgs again
Expand Down Expand Up @@ -526,4 +530,6 @@ public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
subscription.consumerFlow(this, numberOfBlockedPermits);
}
}

private static final Logger log = LoggerFactory.getLogger(Consumer.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
final long consumerId = subscribe.getConsumerId();
final SubType subType = subscribe.getSubType();
final String consumerName = subscribe.getConsumerName();
final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;

authorizationFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
Expand Down Expand Up @@ -279,7 +280,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
}

service.getTopic(topicName).thenCompose(
topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId, subType, consumerName))
topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId, subType, priorityLevel, consumerName))
.thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) {
log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public interface PublishCallback {
void removeProducer(Producer producer);

CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
String consumerName);
int priorityLevel, String consumerName);

CompletableFuture<Void> unsubscribe(String subName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,19 @@
*/
package com.yahoo.pulsar.broker.service.persistent;

import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.yahoo.pulsar.common.api.proto.PulsarApi;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -62,7 +59,7 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
private CompletableFuture<Void> closeFuture = null;
private TreeSet<PositionImpl> messagesToReplay;

private int consumerIndex = 0;
private int currentConsumerRoundRobinIndex = 0;
private boolean havePendingRead = false;
private boolean havePendingReplayRead = false;
private boolean shouldRewindBeforeReadingOrReplaying = false;
Expand Down Expand Up @@ -98,6 +95,7 @@ public synchronized void addConsumer(Consumer consumer) {
}

consumerList.add(consumer);
consumerList.sort((c1, c2) -> c1.getPriorityLevel() - c2.getPriorityLevel());
consumerSet.add(consumer);
}

Expand Down Expand Up @@ -351,28 +349,112 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
}, waitTimeMillis, TimeUnit.MILLISECONDS);

}

private Consumer getNextConsumer() {

/**
* <pre>
* Broker gives more priority while dispatching messages. Here, broker follows descending priorities. (eg:
* 0=max-priority, 1, 2,..)
* <p>
* Broker will first dispatch messages to max priority-level consumers if they
* have permits, else broker will consider next priority level consumers.
* Also on the same priority-level, it selects consumer in round-robin manner.
* <p>
* If subscription has consumer-A with priorityLevel 1 and Consumer-B with priorityLevel 2 then broker will dispatch
* messages to only consumer-A until it runs out permit and then broker starts dispatching messages to Consumer-B.
* <p>
* Consumer PriorityLevel Permits
* C1 0 2
* C2 0 1
* C3 0 1
* C4 1 2
* C5 1 1
* Result of getNextConsumer(): C1, C2, C3, C1, C4, C5, C4
* </pre>
*
* <pre>
* <b>Algorithm:</b>
* 1. sorted-list: consumers stored in sorted-list: max-priority stored first
* 2. currentConsumerRoundRobinIndex: it always stores last served consumer-index
* 3. resultingAvailableConsumerIndex: traversal index. we always start searching availableConsumer from the
* beginning of sorted-list and update resultingAvailableConsumerIndex according searching-traversal
*
* Each time getNextConsumer() is called:<p>
* 1. It always starts to traverse from the max-priority consumer (first element) from sorted-list
* 2. Consumers on same priority-level will be treated equally and it tries to pick one of them in round-robin manner
* 3. If consumer is not available on given priority-level then it will go to the next lower priority-level consumers
* 4. Optimization: <p>
* A. Consumers on same priority-level must be treated equally => dispatch message round-robin to them:
* [if Consumer of resultingAvailableConsumerIndex(current-traversal-index) has the same
* priority-level as consumer of currentConsumerRoundRobinIndex(last-Served-Consumer-Index)]
* <b>Dispatching in Round-Robin:</b> then it means we should do round-robin and skip all the consumers before
* currentConsumerRoundRobinIndex (as they are already served previously)
* a. if found: if we found availableConsumer on the same priority-level after currentConsumerRoundRobinIndex
* then return that consumer and update currentConsumerRoundRobinIndex with that consumer-index
* b. else not_found: if we don't find any consumer on that same-priority level after currentConsumerRoundRobinIndex
* - a. check skipped consumers: check skipped consumer (4.A.a) which are on index before than currentConsumerRoundRobinIndex
* - b. next priority-level: if not found in previous step: then it means no consumer available in prior level. So, move to
* next lower priority level and try to find next-available consumer as per 4.A
*
* </pre>
*
* @return nextAvailableConsumer
*/
public Consumer getNextConsumer() {
if (consumerList.isEmpty() || closeFuture != null) {
// abort read if no consumers are connected or if disconnect is initiated
return null;
}

if (consumerIndex >= consumerList.size()) {
consumerIndex = 0;
if (currentConsumerRoundRobinIndex >= consumerList.size()) {
currentConsumerRoundRobinIndex = 0;
}

// find next available unblocked consumer
int unblockedConsumerIndex = consumerIndex;

// index of resulting consumer which will be returned
int resultingAvailableConsumerIndex = 0;
boolean scanFromBeginningIfCurrentConsumerNotAvailable = true;
int firstConsumerIndexOfCurrentPriorityLevel = -1;
do {
if (isConsumerAvailable(consumerList.get(unblockedConsumerIndex))) {
consumerIndex = unblockedConsumerIndex;
return consumerList.get(consumerIndex++);
int priorityLevel = consumerList.get(currentConsumerRoundRobinIndex).getPriorityLevel()
- consumerList.get(resultingAvailableConsumerIndex).getPriorityLevel();

boolean isSamePriorityLevel = priorityLevel == 0;
// store first-consumer index with same-priority as currentConsumerRoundRobinIndex
if (isSamePriorityLevel && firstConsumerIndexOfCurrentPriorityLevel == -1) {
firstConsumerIndexOfCurrentPriorityLevel = resultingAvailableConsumerIndex;
}

// skip already served same-priority-consumer to select consumer in round-robin manner
resultingAvailableConsumerIndex = (isSamePriorityLevel
&& currentConsumerRoundRobinIndex > resultingAvailableConsumerIndex)
? currentConsumerRoundRobinIndex : resultingAvailableConsumerIndex;

// if resultingAvailableConsumerIndex moved ahead of currentConsumerRoundRobinIndex: then we should
// check skipped consumer which had same priority as currentConsumerRoundRobinIndex consumer
boolean isLastConsumerBlocked = (currentConsumerRoundRobinIndex == (consumerList.size() - 1)
&& !isConsumerAvailable(consumerList.get(currentConsumerRoundRobinIndex)));
boolean shouldScanCurrentLevel = priorityLevel < 0
/* means moved to next lower-priority-level */ || isLastConsumerBlocked;
if (shouldScanCurrentLevel && scanFromBeginningIfCurrentConsumerNotAvailable) {
for (int i = firstConsumerIndexOfCurrentPriorityLevel; i < currentConsumerRoundRobinIndex; i++) {
Consumer nextConsumer = consumerList.get(i);
if (isConsumerAvailable(nextConsumer)) {
currentConsumerRoundRobinIndex = i + 1;
return nextConsumer;
}
}
// now, we have scanned from the beginning: flip the flag to avoid scan again
scanFromBeginningIfCurrentConsumerNotAvailable = false;
}

Consumer nextConsumer = consumerList.get(resultingAvailableConsumerIndex);
if (isConsumerAvailable(nextConsumer)) {
currentConsumerRoundRobinIndex = resultingAvailableConsumerIndex + 1;
return nextConsumer;
}
if (++unblockedConsumerIndex >= consumerList.size()) {
unblockedConsumerIndex = 0;
if (++resultingAvailableConsumerIndex >= consumerList.size()) {
break;
}
} while (unblockedConsumerIndex != consumerIndex);
} while (true);

// not found unblocked consumer
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ public void removeProducer(Producer producer) {

@Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, String consumerName) {
SubType subType, int priorityLevel, String consumerName) {

final CompletableFuture<Consumer> future = new CompletableFuture<>();

Expand Down Expand Up @@ -367,7 +367,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
PersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new PersistentSubscription(PersistentTopic.this, cursor));

Consumer consumer = new Consumer(subscription, subType, consumerId, consumerName,
Consumer consumer = new Consumer(subscription, subType, consumerId, priorityLevel, consumerName,
brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer(), cnx,
cnx.getRole());
subscription.addConsumer(consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
Expand All @@ -46,6 +49,7 @@
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand All @@ -60,6 +64,7 @@
import com.yahoo.pulsar.broker.service.Consumer;
import com.yahoo.pulsar.broker.service.BrokerService;
import com.yahoo.pulsar.broker.service.ServerCnx;
import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
Expand Down Expand Up @@ -191,7 +196,7 @@ public void testAddRemoveConsumer() throws Exception {
assertFalse(pdfc.isConsumerConnected());

// 2. Add consumer
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, "Cons1"/* consumer name */,
Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, 0, "Cons1"/* consumer name */,
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer1);
List<Consumer> consumers = pdfc.getConsumers();
Expand All @@ -208,15 +213,15 @@ public void testAddRemoveConsumer() throws Exception {
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());

// 5. Add another consumer which does not change active consumer
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 2 /* consumer id */, "Cons2"/* consumer name */,
Consumer consumer2 = new Consumer(sub, SubType.Exclusive, 2 /* consumer id */, 0, "Cons2"/* consumer name */,
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer2);
consumers = pdfc.getConsumers();
assertTrue(pdfc.getActiveConsumer().consumerName() == consumer1.consumerName());
assertEquals(3, consumers.size());

// 6. Add a consumer which changes active consumer
Consumer consumer0 = new Consumer(sub, SubType.Exclusive, 0 /* consumer id */, "Cons0"/* consumer name */,
Consumer consumer0 = new Consumer(sub, SubType.Exclusive, 0 /* consumer id */, 0, "Cons0"/* consumer name */,
50000, serverCnx, "myrole-1");
pdfc.addConsumer(consumer0);
consumers = pdfc.getConsumers();
Expand Down Expand Up @@ -257,6 +262,69 @@ public void testAddRemoveConsumer() throws Exception {
assertTrue(pdfc.canUnsubscribe(consumer1));

}

@Test
public void testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() throws Exception {

PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
Consumer consumer1 = createConsumer(0, 2, 1);
Consumer consumer2 = createConsumer(0, 2, 2);
Consumer consumer3 = createConsumer(0, 2, 3);
Consumer consumer4 = createConsumer(1, 2, 4);
Consumer consumer5 = createConsumer(1, 1, 5);
Consumer consumer6 = createConsumer(1, 2, 6);
Consumer consumer7 = createConsumer(2, 1, 7);
Consumer consumer8 = createConsumer(2, 1, 8);
Consumer consumer9 = createConsumer(2, 1, 9);
dispatcher.addConsumer(consumer1);
dispatcher.addConsumer(consumer2);
dispatcher.addConsumer(consumer3);
dispatcher.addConsumer(consumer4);
dispatcher.addConsumer(consumer5);
dispatcher.addConsumer(consumer6);
dispatcher.addConsumer(consumer7);
dispatcher.addConsumer(consumer8);
dispatcher.addConsumer(consumer9);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer1);
Assert.assertEquals(getNextConsumer(dispatcher), consumer2);
Assert.assertEquals(getNextConsumer(dispatcher), consumer3);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer5);
Assert.assertEquals(getNextConsumer(dispatcher), consumer6);
Assert.assertEquals(getNextConsumer(dispatcher), consumer4);
Assert.assertEquals(getNextConsumer(dispatcher), consumer6);
Assert.assertEquals(getNextConsumer(dispatcher), consumer7);
Assert.assertEquals(getNextConsumer(dispatcher), consumer8);
// in between add upper priority consumer with more permits
Consumer consumer10 = createConsumer(0, 2, 10);
dispatcher.addConsumer(consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer10);
Assert.assertEquals(getNextConsumer(dispatcher), consumer9);

}

private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatcher) throws Exception {
Consumer consumer = dispatcher.getNextConsumer();
Field field = Consumer.class.getDeclaredField("MESSAGE_PERMITS_UPDATER");
field.setAccessible(true);
AtomicIntegerFieldUpdater<Consumer> messagePermits = (AtomicIntegerFieldUpdater) field.get(consumer);
messagePermits.decrementAndGet(consumer);
return consumer;
}

private Consumer createConsumer(int priority, int permit, int id) throws BrokerServiceException {
Consumer consumer = new Consumer(null, SubType.Shared, id, priority, ""+id, 5000, serverCnx, "appId");
try {
consumer.flowPermits(permit);
} catch (Exception e) {
}
return consumer;
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherFailoverConsumerTest.class);

Expand Down
Loading

0 comments on commit cd2d16e

Please sign in to comment.