Skip to content

Commit

Permalink
Improved efficiency in KeyShared dispatcher (apache#7104)
Browse files Browse the repository at this point in the history
Instead of grouping the messages by same key, group them directly by same consumer in order to achieve max possible grouping when sending them. Also reuse the map for messages grouping as a thread local.
  • Loading branch information
merlimat authored Jun 4, 2020
1 parent d4dfcc9 commit 605868d
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchS

for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
if (entry == null) {
continue;
}

ByteBuf metadataAndPayload = entry.getDataBuffer();

MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,8 @@ public void removeConsumer(Consumer consumer) {

@Override
public Consumer select(byte[] stickyKey) {
return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
}
int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey);

@Override
public Consumer select(int hash) {
rwLock.readLock().lock();
try {
if (hashRing.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,7 @@ public synchronized void removeConsumer(Consumer consumer) {

@Override
public Consumer select(byte[] stickyKey) {
return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
}

public Consumer select(int hash) {
int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey);
if (rangeMap.size() > 0) {
int slot = hash % rangeSize;
return rangeMap.ceilingEntry(slot).getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public Consumer select(byte[] stickyKey) {
return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
}

public Consumer select(int hash) {
Consumer select(int hash) {
if (rangeMap.size() > 0) {
int slot = hash % rangeSize;
Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(slot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,4 @@ public interface StickyKeyConsumerSelector {
* @return consumer
*/
Consumer select(byte[] stickyKey);

/**
* Select a consumer by hash of the sticky they
* @param keyHash hash of sticky key
* @return
*/
Consumer select(int keyHash);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,21 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;

import io.netty.util.concurrent.FastThreadLocal;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.Murmur3_32Hash;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {

Expand Down Expand Up @@ -62,37 +61,42 @@ public SubType getType() {
return SubType.Key_Shared;
}

private static final FastThreadLocal<Map<Consumer, List<Entry>>> localGroupedEntries = new FastThreadLocal<Map<Consumer, List<Entry>>>() {
@Override
protected Map<Consumer, List<Entry>> initialValue() throws Exception {
return new HashMap<>();
}
};

@Override
public void sendMessages(List<Entry> entries) {
if (entries.size() > 0) {
final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
for (Entry entry : entries) {
int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
groupedEntries.putIfAbsent(key, new ArrayList<>());
groupedEntries.get(key).add(entry);
}
final Iterator<Map.Entry<Integer, List<Entry>>> iterator = groupedEntries.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
//TODO: None key policy
Consumer consumer = selector.select(entriesWithSameKey.getKey());
if (consumer != null) {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesWithSameKey.getValue().size());
filterEntriesForConsumer(entriesWithSameKey.getValue(), batchSizes, sendMessageInfo, null, null);
consumer.sendMessages(entriesWithSameKey.getValue(), batchSizes, null, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
} else {
entries.forEach(entry -> {
int totalMsgs = Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), subscription.toString(), -1);
if (totalMsgs > 0) {
msgDrop.recordEvent(totalMsgs);
}
entry.release();
});
}
}
if (!entries.isEmpty()) {
return;
}

if (consumerSet.isEmpty()) {
entries.forEach(Entry::release);
return;
}

final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
groupedEntries.clear();

for (Entry entry : entries) {
Consumer consumer = selector.select(peekStickyKey(entry.getDataBuffer()));
groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry);
}

for (Map.Entry<Consumer, List<Entry>> entriesByConsumer : groupedEntries.entrySet()) {
Consumer consumer = entriesByConsumer.getKey();
List<Entry> entriesForConsumer = entriesByConsumer.getValue();

SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForConsumer.size());
filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo, null, null);
consumer.sendMessages(entriesForConsumer, batchSizes, null, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -215,8 +214,8 @@ public synchronized void internalReadEntriesComplete(final List<Entry> entries,
Iterator<Entry> iterator = entries.iterator();
while (iterator.hasNext()) {
Entry entry = iterator.next();
int keyHash = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
Consumer consumer = stickyKeyConsumerSelector.select(keyHash);
byte[] key = peekStickyKey(entry.getDataBuffer());
Consumer consumer = stickyKeyConsumerSelector.select(key);
if (consumer == null || currentConsumer != consumer) {
iterator.remove();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
*/
package org.apache.pulsar.broker.service.persistent;

import io.netty.util.concurrent.FastThreadLocal;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -37,7 +38,6 @@
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -63,77 +63,89 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
selector.removeConsumer(consumer);
}

private static final FastThreadLocal<Map<Consumer, List<Entry>>> localGroupedEntries = new FastThreadLocal<Map<Consumer, List<Entry>>>() {
@Override
protected Map<Consumer, List<Entry>> initialValue() throws Exception {
return new HashMap<>();
}
};

@Override
protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
long totalMessagesSent = 0;
long totalBytesSent = 0;
int entriesCount = entries.size();

// Trigger read more messages
if (entries.size() == 0) {
if (entriesCount == 0) {
readMoreEntries();
return;
}
final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
for (Entry entry : entries) {
int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
groupedEntries.putIfAbsent(key, new ArrayList<>());
groupedEntries.get(key).add(entry);

if (consumerSet.isEmpty()) {
entries.forEach(Entry::release);
cursor.rewind();
return;
}
final Iterator<Map.Entry<Integer, List<Entry>>> iterator = groupedEntries.entrySet().iterator();

final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
groupedEntries.clear();

for (int i = 0; i < entriesCount; i++) {
Entry entry = entries.get(i);
Consumer c = selector.select(peekStickyKey(entry.getDataBuffer()));
groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
}

AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
while (iterator.hasNext() && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
//TODO: None key policy
Consumer consumer = selector.select(entriesWithSameKey.getKey());
if (consumer == null) {
// Do nothing, cursor will be rewind at reconnection
log.info("[{}] rewind because no available consumer found for key {} from total {}", name,
entriesWithSameKey.getKey(), consumerList.size());
entriesWithSameKey.getValue().forEach(Entry::release);
cursor.rewind();
return;
}

int availablePermits = consumer.isWritable() ? consumer.getAvailablePermits() : 1;
if (log.isDebugEnabled() && !consumer.isWritable()) {
log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {} ", topic.getName(), name,
consumer);
}
int messagesForC = Math.min(entriesWithSameKey.getValue().size(), availablePermits);
for (Map.Entry<Consumer, List<Entry>> current : groupedEntries.entrySet()) {
Consumer consumer = current.getKey();
List<Entry> entriesWithSameKey = current.getValue();
int entriesWithSameKeyCount = entriesWithSameKey.size();

int messagesForC = Math.min(entriesWithSameKeyCount, consumer.getAvailablePermits());
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} for key {} with messages num {}, read type is {}",
name, consumer.consumerName(), entriesWithSameKey.getKey(), messagesForC, readType);
log.debug("[{}] select consumer {} with messages num {}, read type is {}",
name, consumer.consumerName(), messagesForC, readType);
}

if (messagesForC < entriesWithSameKeyCount) {
// We are not able to push all the messages with given key to its consumer,
// so we discard for now and mark them for later redelivery
for (int i = messagesForC; i < entriesWithSameKeyCount; i++) {
Entry entry = entriesWithSameKey.get(i);
messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId());
entry.release();
entriesWithSameKey.set(i, null);
}
}

if (messagesForC > 0) {
// remove positions first from replay list first : sendMessages recycles entries
List<Entry> subList = new ArrayList<>(entriesWithSameKey.getValue().subList(0, messagesForC));
if (readType == ReadType.Replay) {
subList.forEach(entry -> messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId()));
for (int i = 0; i < messagesForC; i++) {
Entry entry = entriesWithSameKey.get(i);
messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId());
}
}

SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(subList.size());
EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
filterEntriesForConsumer(subList, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);

consumer.sendMessages(subList, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
getRedeliveryTracker()).addListener(future -> {
if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
readMoreEntries();
}
});

for (int i = 0; i < messagesForC; i++) {
entriesWithSameKey.getValue().remove(0);
}

TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();

if (entriesWithSameKey.getValue().size() == 0) {
iterator.remove();
}
}
}

Expand All @@ -147,21 +159,6 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
}
}

if (groupedEntries.size() > 0) {
int laterReplay = 0;
for (List<Entry> entryList : groupedEntries.values()) {
laterReplay += entryList.size();
entryList.forEach(entry -> {
messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId());
entry.release();
});
}
if (log.isDebugEnabled()) {
log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", name,
laterReplay);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void testSendMarkerMessage() {
}

ArgumentCaptor<Integer> totalMessagesCaptor = ArgumentCaptor.forClass(Integer.class);
verify(consumerMock, times(2)).sendMessages(
verify(consumerMock, times(1)).sendMessages(
anyList(),
any(EntryBatchSizes.class),
any(EntryBatchIndexesAcks.class),
Expand All @@ -152,8 +152,7 @@ public void testSendMarkerMessage() {
);

List<Integer> allTotalMessagesCaptor = totalMessagesCaptor.getAllValues();
Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 0);
Assert.assertEquals(allTotalMessagesCaptor.get(1).intValue(), 5);
Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
}

private ByteBuf createMessage(String message, int sequenceId) {
Expand Down

0 comments on commit 605868d

Please sign in to comment.