Skip to content

Commit

Permalink
Update documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Feb 24, 2017
1 parent 90a8273 commit 3fd5ba9
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -349,23 +349,55 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
}, waitTimeMillis, TimeUnit.MILLISECONDS);

}

/**
* Broker will first dispatch messages to upper priority-level consumers if they
* <pre>
* Broker gives more priority while dispatching messages. Here, broker follows descending priorities. (eg:
* 0=max-priority, 1, 2,..)
* <p>
* Broker will first dispatch messages to max priority-level consumers if they
* have permits, else broker will consider next priority level consumers.
* Also on the same priority-level, it selects consumer in round-robin manner.
* </p>
* <p>
* If subscription has consumer-A with priorityLevel 1 and Consumer-B with priorityLevel 2 then broker will dispatch
* messages to only consumer-A until it runs out permit and then broker starts dispatching messages to Consumer-B.
*
* <p>
* Consumer PriorityLevel Permits
* C1 1 2
* C2 1 1
* C3 1 1
* C1 0 2
* C2 0 1
* C3 0 1
* C4 1 2
* C5 1 1
* Result of getNextConsumer(): C1, C2, C3, C1, C4, C5, C4
* @return
* </pre>
*
* <pre>
* <b>Algorithm:</b>
* 1. sorted-list: consumers stored in sorted-list: max-priority stored first
* 2. currentConsumerRoundRobinIndex: it always stores last served consumer-index
* 3. resultingAvailableConsumerIndex: traversal index. we always start searching availableConsumer from the
* beginning of sorted-list and update resultingAvailableConsumerIndex according searching-traversal
*
* Each time getNextConsumer() is called:<p>
* 1. It always starts to traverse from the max-priority consumer (first element) from sorted-list
* 2. Consumers on same priority-level will be treated equally and it tries to pick one of them in round-robin manner
* 3. If consumer is not available on given priority-level then it will go to the next lower priority-level consumers
* 4. Optimization: <p>
* A. Consumers on same priority-level must be treated equally => dispatch message round-robin to them:
* [if Consumer of resultingAvailableConsumerIndex(current-traversal-index) has the same
* priority-level as consumer of currentConsumerRoundRobinIndex(last-Served-Consumer-Index)]
* <b>Dispatching in Round-Robin:</b> then it means we should do round-robin and skip all the consumers before
* currentConsumerRoundRobinIndex (as they are already served previously)
* a. if found: if we found availableConsumer on the same priority-level after currentConsumerRoundRobinIndex
* then return that consumer and update currentConsumerRoundRobinIndex with that consumer-index
* b. else not_found: if we don't find any consumer on that same-priority level after currentConsumerRoundRobinIndex
* - a. check skipped consumers: check skipped consumer (4.A.a) which are on index before than currentConsumerRoundRobinIndex
* - b. next priority-level: if not found in previous step: then it means no consumer available in prior level. So, move to
* next lower priority level and try to find next-available consumer as per 4.A
*
* </pre>
*
* @return nextAvailableConsumer
*/
public Consumer getNextConsumer() {
if (consumerList.isEmpty() || closeFuture != null) {
Expand All @@ -376,8 +408,8 @@ public Consumer getNextConsumer() {
if (currentConsumerRoundRobinIndex >= consumerList.size()) {
currentConsumerRoundRobinIndex = 0;
}
// index of resulting consumer which will be returned

// index of resulting consumer which will be returned
int resultingAvailableConsumerIndex = 0;
boolean scanFromBeginningIfCurrentConsumerNotAvailable = true;
int firstConsumerIndexOfCurrentPriorityLevel = -1;
Expand All @@ -400,7 +432,8 @@ public Consumer getNextConsumer() {
// check skipped consumer which had same priority as currentConsumerRoundRobinIndex consumer
boolean isLastConsumerBlocked = (currentConsumerRoundRobinIndex == (consumerList.size() - 1)
&& !isConsumerAvailable(consumerList.get(currentConsumerRoundRobinIndex)));
boolean shouldScanCurrentLevel = priorityLevel < 0 || isLastConsumerBlocked;
boolean shouldScanCurrentLevel = priorityLevel < 0
/* means moved to next lower-priority-level */ || isLastConsumerBlocked;
if (shouldScanCurrentLevel && scanFromBeginningIfCurrentConsumerNotAvailable) {
for (int i = firstConsumerIndexOfCurrentPriorityLevel; i < currentConsumerRoundRobinIndex; i++) {
Consumer nextConsumer = consumerList.get(i);
Expand All @@ -409,6 +442,7 @@ public Consumer getNextConsumer() {
return nextConsumer;
}
}
// now, we have scanned from the beginning: flip the flag to avoid scan again
scanFromBeginningIfCurrentConsumerNotAvailable = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
Expand Down Expand Up @@ -309,10 +310,10 @@ public void testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() th

private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatcher) throws Exception {
Consumer consumer = dispatcher.getNextConsumer();
Field field = Consumer.class.getDeclaredField("messagePermits");
Field field = Consumer.class.getDeclaredField("MESSAGE_PERMITS_UPDATER");
field.setAccessible(true);
AtomicInteger messagePermits = (AtomicInteger) field.get(consumer);
messagePermits.decrementAndGet();
AtomicIntegerFieldUpdater<Consumer> messagePermits = (AtomicIntegerFieldUpdater) field.get(consumer);
messagePermits.decrementAndGet(consumer);
return consumer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,12 +646,12 @@ public void testDuplicateConcurrentSubscribeCommand() throws Exception {
doReturn(delayFuture).when(brokerService).getTopic(any(String.class));
// Create subscriber first time
ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, "test" /* consumer name */);
successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, "test" /* consumer name */);
channel.writeInbound(clientCommand);

// Create producer second time
clientCommand = Commands.newSubscribe(successTopicName, //
successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, "test" /* consumer name */);
successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, "test" /* consumer name */);
channel.writeInbound(clientCommand);

Object response = getResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,22 @@ public int getPriorityLevel() {

/**
* Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
* messages.
* </p>
* In Shared subscription mode, broker will first dispatch messages to upper priority-level consumers if they
* have permits, else broker will consider next priority level consumers.
* </p>
* If subscription has consumer-A with priorityLevel 1 and Consumer-B with priorityLevel 2 then broker will dispatch
* messages. Here, broker follows descending priorities. (eg: 0=max-priority, 1, 2,..) </br>
* In Shared subscription mode, broker will first dispatch messages to max priority-level consumers if they have
* permits, else broker will consider next priority level consumers. </br>
* If subscription has consumer-A with priorityLevel 0 and Consumer-B with priorityLevel 1 then broker will dispatch
* messages to only consumer-A until it runs out permit and then broker starts dispatching messages to Consumer-B.
*
* <pre>
* Consumer PriorityLevel Permits
* C1 0 2
* C2 0 1
* C3 0 1
* C4 1 2
* C5 1 1
* Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
* </pre>
*
* @param priorityLevel
*/
public void setPriorityLevel(int priorityLevel) {
Expand Down

0 comments on commit 3fd5ba9

Please sign in to comment.