Skip to content

Commit

Permalink
refactor dispatcher's getting next consumer with consumer priority
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Apr 20, 2017
1 parent 7bf4197 commit 6dee55b
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -400,33 +400,19 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
*
* <pre>
* <b>Algorithm:</b>
* 1. sorted-list: consumers stored in sorted-list: max-priority stored first
* 1. consumerList: it stores consumers in sorted-list: max-priority stored first
* 2. currentConsumerRoundRobinIndex: it always stores last served consumer-index
* 3. resultingAvailableConsumerIndex: traversal index. we always start searching availableConsumer from the
* beginning of sorted-list and update resultingAvailableConsumerIndex according searching-traversal
*
* Each time getNextConsumer() is called:<p>
* 1. It always starts to traverse from the max-priority consumer (first element) from sorted-list
* 2. Consumers on same priority-level will be treated equally and it tries to pick one of them in round-robin manner
* 3. If consumer is not available on given priority-level then it will go to the next lower priority-level consumers
* 4. Optimization: <p>
* A. Consumers on same priority-level must be treated equally => dispatch message round-robin to them:
* [if Consumer of resultingAvailableConsumerIndex(current-traversal-index) has the same
* priority-level as consumer of currentConsumerRoundRobinIndex(last-Served-Consumer-Index)]
* <b>Dispatching in Round-Robin:</b> then it means we should do round-robin and skip all the consumers before
* currentConsumerRoundRobinIndex (as they are already served previously)
* a. if found: if we found availableConsumer on the same priority-level after currentConsumerRoundRobinIndex
* then return that consumer and update currentConsumerRoundRobinIndex with that consumer-index
* b. else not_found: if we don't find any consumer on that same-priority level after currentConsumerRoundRobinIndex
* - a. check skipped consumers: check skipped consumer (4.A.a) which are on index before than currentConsumerRoundRobinIndex
* - b. next priority-level: if not found in previous step: then it means no consumer available in prior level. So, move to
* next lower priority level and try to find next-available consumer as per 4.A
*
* 3. If consumer is not available on given priority-level then only it will go to the next lower priority-level consumers
* 4. Returns null in case it doesn't find any available consumer
* </pre>
*
* @return nextAvailableConsumer
*/
public Consumer getNextConsumer() {
private Consumer getNextConsumer() {
if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) {
// abort read if no consumers are connected or if disconnect is initiated
return null;
Expand All @@ -436,60 +422,102 @@ public Consumer getNextConsumer() {
currentConsumerRoundRobinIndex = 0;
}

// index of resulting consumer which will be returned
int resultingAvailableConsumerIndex = 0;
boolean scanFromBeginningIfCurrentConsumerNotAvailable = true;
int firstConsumerIndexOfCurrentPriorityLevel = -1;
do {
int priorityLevel = consumerList.get(currentConsumerRoundRobinIndex).getPriorityLevel()
- consumerList.get(resultingAvailableConsumerIndex).getPriorityLevel();
int currentRoundRobinConsumerPriority = consumerList.get(currentConsumerRoundRobinIndex).getPriorityLevel();

boolean isSamePriorityLevel = priorityLevel == 0;
// store first-consumer index with same-priority as currentConsumerRoundRobinIndex
if (isSamePriorityLevel && firstConsumerIndexOfCurrentPriorityLevel == -1) {
firstConsumerIndexOfCurrentPriorityLevel = resultingAvailableConsumerIndex;
// first find available-consumer on higher level unless currentIndex is not on highest level which is 0
if (currentRoundRobinConsumerPriority != 0) {
int higherPriorityConsumerIndex = getConsumerFromHigherPriority(currentRoundRobinConsumerPriority);
if (higherPriorityConsumerIndex != -1) {
currentConsumerRoundRobinIndex = higherPriorityConsumerIndex + 1;
return consumerList.get(higherPriorityConsumerIndex);
}
}

// currentIndex is already on highest level or couldn't find consumer on higher level so, find consumer on same or lower
// level
int availableConsumerIndex = getNextConsumerFromSameOrLowerLevel(currentConsumerRoundRobinIndex);
if (availableConsumerIndex != -1) {
currentConsumerRoundRobinIndex = availableConsumerIndex + 1;
return consumerList.get(availableConsumerIndex);
}

// skip already served same-priority-consumer to select consumer in round-robin manner
resultingAvailableConsumerIndex = (isSamePriorityLevel
&& currentConsumerRoundRobinIndex > resultingAvailableConsumerIndex)
? currentConsumerRoundRobinIndex : resultingAvailableConsumerIndex;

// if resultingAvailableConsumerIndex moved ahead of currentConsumerRoundRobinIndex: then we should
// check skipped consumer which had same priority as currentConsumerRoundRobinIndex consumer
boolean isLastConsumerBlocked = (resultingAvailableConsumerIndex == (consumerList.size() - 1)
&& !isConsumerAvailable(consumerList.get(resultingAvailableConsumerIndex)));
boolean shouldScanCurrentLevel = priorityLevel < 0
/* means moved to next lower-priority-level */ || isLastConsumerBlocked;
if (shouldScanCurrentLevel && scanFromBeginningIfCurrentConsumerNotAvailable) {
for (int i = firstConsumerIndexOfCurrentPriorityLevel; i < currentConsumerRoundRobinIndex; i++) {
Consumer nextConsumer = consumerList.get(i);
if (isConsumerAvailable(nextConsumer)) {
currentConsumerRoundRobinIndex = i + 1;
return nextConsumer;
}
// couldn't find available consumer
return null;
}

/**
* Finds index of first available consumer which has higher priority then given targetPriority
* @param targetPriority
* @return -1 if couldn't find any available consumer
*/
private int getConsumerFromHigherPriority(int targetPriority) {
for (int i = 0; i < currentConsumerRoundRobinIndex; i++) {
Consumer consumer = consumerList.get(i);
if (consumer.getPriorityLevel() < targetPriority) {
if (isConsumerAvailable(consumerList.get(i))) {
return i;
}
// now, we have scanned from the beginning: flip the flag to avoid scan again
scanFromBeginningIfCurrentConsumerNotAvailable = false;
} else {
break;
}
}
return -1;
}

/**
* Finds index of round-robin available consumer that present on same level as consumer on currentRoundRobinIndex if doesn't
* find consumer on same level then it finds first available consumer on lower priority level else returns index=-1
* if couldn't find any available consumer in the list
*
* @param currentRoundRobinIndex
* @return
*/
private int getNextConsumerFromSameOrLowerLevel(int currentRoundRobinIndex) {

int targetPriority = consumerList.get(currentRoundRobinIndex).getPriorityLevel();
// use to do round-robin if can't find consumer from currentRR to last-consumer in list
int scanIndex = currentRoundRobinIndex;
int endPriorityLevelIndex = currentRoundRobinIndex;
do {
Consumer scanConsumer = scanIndex < consumerList.size() ? consumerList.get(scanIndex)
: null /* reached to last consumer of list */;

Consumer nextConsumer = consumerList.get(resultingAvailableConsumerIndex);
if (isConsumerAvailable(nextConsumer)) {
currentConsumerRoundRobinIndex = resultingAvailableConsumerIndex + 1;
return nextConsumer;
// if reached to last consumer of list then check from beginning to currentRRIndex of the list
if (scanConsumer == null || scanConsumer.getPriorityLevel() != targetPriority) {
endPriorityLevelIndex = scanIndex; // last consumer on this level
scanIndex = getFirstConsumerIndexOfPriority(targetPriority);
} else {
if (isConsumerAvailable(scanConsumer)) {
return scanIndex;
}
scanIndex++;
}
if (++resultingAvailableConsumerIndex >= consumerList.size()) {
break;
} while (scanIndex != currentRoundRobinIndex);

// it means: didn't find consumer in the same priority-level so, check available consumer lower than this level
for (int i = endPriorityLevelIndex; i < consumerList.size(); i++) {
if (isConsumerAvailable(consumerList.get(i))) {
return i;
}
} while (true);
}

// not found unblocked consumer
if (log.isDebugEnabled()) {
log.debug("[{}] Couldn't find available consumer ", name);
return -1;
}

/**
* Finds index of first consumer in list which has same priority as given targetPriority
* @param targetPriority
* @return
*/
private int getFirstConsumerIndexOfPriority(int targetPriority) {
for (int i = 0; i < consumerList.size(); i++) {
if (consumerList.get(i).getPriorityLevel() == targetPriority) {
return i;
}
}
return null;
return -1;
}

/**
* returns true only if {@link consumerList} has atleast one unblocked consumer and have available permits
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.testng.AssertJUnit.assertTrue;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -416,7 +417,11 @@ public void testFewBlockedConsumerDifferentPriority2() throws Exception {
}

private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatcher) throws Exception {
Consumer consumer = dispatcher.getNextConsumer();

Method getNextConsumerMethod = PersistentDispatcherMultipleConsumers.class.getDeclaredMethod("getNextConsumer");
getNextConsumerMethod.setAccessible(true);
Consumer consumer = (Consumer) getNextConsumerMethod.invoke(dispatcher);

if (consumer != null) {
Field field = Consumer.class.getDeclaredField("MESSAGE_PERMITS_UPDATER");
field.setAccessible(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,7 @@ public void testShouldNotBlockConsumerIfRedeliverBeforeReceive() throws Exceptio
}

// (2) wait for consumer to receive messages
Thread.sleep(200);
Thread.sleep(1000);
assertEquals(consumer.numMessagesInQueue(), receiverQueueSize);

// (3) wait for messages to expire, we should've received more
Expand Down Expand Up @@ -1836,6 +1836,122 @@ public void testPriorityConsumer() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

/**
* <pre>
* Verifies Dispatcher dispatches messages properly with shared-subscription consumers with combination of blocked
* and unblocked consumers.
*
* 1. Dispatcher will have 5 consumers : c1, c2, c3, c4, c5.
* Out of which : c1,c2,c4,c5 will be blocked due to MaxUnackedMessages limit.
* 2. So, dispatcher should moves round-robin and make sure it delivers unblocked consumer : c3
* </pre>
*
* @throws Exception
*/
@Test(timeOut=5000)
public void testSharedSamePriorityConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
ConsumerConfiguration conf1 = new ConsumerConfiguration();
conf1.setSubscriptionType(SubscriptionType.Shared);
final int queueSize = 5;
conf1.setReceiverQueueSize(queueSize);
int maxUnAckMsgs = pulsar.getConfiguration().getMaxConcurrentLookupRequest();
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(queueSize);
Consumer c1 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
conf1);
Consumer c2 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
conf1);
ProducerConfiguration producerConf = new ProducerConfiguration();
Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic2", producerConf);
List<Future<MessageId>> futures = Lists.newArrayList();

// Asynchronously produce messages
final int totalPublishMessages = 500;
for (int i = 0; i < totalPublishMessages; i++) {
final String message = "my-message-" + i;
Future<MessageId> future = producer.sendAsync(message.getBytes());
futures.add(future);
}

log.info("Waiting for async publish to complete");
for (Future<MessageId> future : futures) {
future.get();
}

List<Message> messages = Lists.newArrayList();

// let consumer1 and consumer2 cosume messages up to the queue will be full
for (int i = 0; i < totalPublishMessages; i++) {
Message msg = c1.receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.add(msg);
} else {
break;
}
}
for (int i = 0; i < totalPublishMessages; i++) {
Message msg = c2.receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.add(msg);
} else {
break;
}
}

Assert.assertEquals(queueSize * 2, messages.size());

// create new consumers with the same priority
Consumer c3 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
conf1);
Consumer c4 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
conf1);
Consumer c5 = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic2", "my-subscriber-name",
conf1);

// c1 and c2 are blocked: so, let c3, c4 and c5 consume rest of the messages

for (int i = 0; i < totalPublishMessages; i++) {
Message msg = c4.receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.add(msg);
} else {
break;
}
}

for (int i = 0; i < totalPublishMessages; i++) {
Message msg = c5.receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.add(msg);
} else {
break;
}
}

for (int i = 0; i < totalPublishMessages; i++) {
Message msg = c3.receive(500, TimeUnit.MILLISECONDS);
if (msg != null) {
messages.add(msg);
c3.acknowledge(msg);
} else {
break;
}
}

// total messages must be consumed by all consumers
Assert.assertEquals(messages.size(), totalPublishMessages);

// Asynchronously acknowledge upto and including the last message
producer.close();
c1.close();
c2.close();
c3.close();
c4.close();
c5.close();
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnAckMsgs);
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testRedeliveryFailOverConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down

0 comments on commit 6dee55b

Please sign in to comment.