Skip to content

Commit

Permalink
[feat][broker] Add config to count filtered entries towards rate limi…
Browse files Browse the repository at this point in the history
…ts (apache#132)

Co-authored-by: Michael Marshall <mmarshall@apache.org>
  • Loading branch information
eolivelli and michaeljmarshall authored Sep 16, 2022
1 parent dff29cf commit 522dfd8
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 51 deletions.
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,12 @@ entryFilterNames=
# The directory for all the entry filter implementations
entryFiltersDirectory=

# Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled,
# only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and
# subscription level. When enabled, messages filtered out due to entry filter logic are counted towards
# each relevant rate limit.
dispatchThrottlingForFilteredEntriesEnabled=false

# Whether allow topic level entry filters policies overrides broker configuration.
allowOverrideEntryFilters=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean dispatcherDispatchMessagesInSubscriptionThread = true;

@FieldContext(
dynamic = false,
category = CATEGORY_SERVER,
doc = "Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled, "
+ "only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and "
+ "subscription level. When enabled, messages filtered out due to entry filter logic are counted towards "
+ "each relevant rate limit."
)
private boolean dispatchThrottlingForFilteredEntriesEnabled = false;

// <-- dispatcher read settings -->
@FieldContext(
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
int filteredMessageCount = 0;
int filteredEntryCount = 0;
long filteredBytesCount = 0;
List<Position> entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
List<PositionImpl> entriesToRedeliver = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Expand All @@ -145,12 +148,18 @@ public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
entriesToFiltered.add(entry.getPosition());
entries.set(i, null);
this.filterRejectedMsgs.add(entryMsgCnt);
filteredEntryCount++;
filteredMessageCount += entryMsgCnt;
filteredBytesCount += metadataAndPayload.readableBytes();
entry.release();
continue;
} else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
entriesToRedeliver.add((PositionImpl) entry.getPosition());
entries.set(i, null);
this.filterRescheduledMsgs.add(entryMsgCnt);
filteredEntryCount++;
filteredMessageCount += entryMsgCnt;
filteredBytesCount += metadataAndPayload.readableBytes();
entry.release();
continue;
}
Expand Down Expand Up @@ -233,6 +242,11 @@ public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e

}

if (serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) {
acquirePermitsForDeliveredMessages(subscription.getTopic(), cursor, filteredEntryCount,
filteredMessageCount, filteredBytesCount);
}

sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
Expand All @@ -245,6 +259,19 @@ private void individualAcknowledgeMessageIfNeeded(Position position, Map<String,
}
}

protected void acquirePermitsForDeliveredMessages(Topic topic, ManagedCursor cursor, long totalEntries,
long totalMessagesSent, long totalBytesSent) {
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled()
|| (cursor != null && !cursor.isActive())) {
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
topic.getBrokerDispatchRateLimiter().ifPresent(rateLimiter ->
rateLimiter.tryDispatchPermit(permits, totalBytesSent));
topic.getDispatchRateLimiter().ifPresent(rateLimter ->
rateLimter.tryDispatchPermit(permits, totalBytesSent));
getRateLimiter().ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(permits, totalBytesSent));
}
}

/**
* Determine whether the number of consumers on the subscription reaches the threshold.
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,20 +664,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
}
}

// acquire message-dispatch permits for already delivered messages
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getBrokerDispatchRateLimiter().isPresent()) {
topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
}
}
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

if (entriesToDispatch > 0) {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,23 +219,8 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e
redeliveryTracker, epoch)
.addListener(future -> {
if (future.isSuccess()) {
int permits = dispatchThrottlingOnBatchMessageEnabled ? entries.size()
: sendMessageInfo.getTotalMessages();
// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getBrokerDispatchRateLimiter().isPresent()) {
topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits,
sendMessageInfo.getTotalBytes());
}

if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits,
sendMessageInfo.getTotalBytes());
}
dispatchRateLimiter.ifPresent(rateLimiter ->
rateLimiter.tryDispatchPermit(permits,
sendMessageInfo.getTotalBytes()));
}
acquirePermitsForDeliveredMessages(topic, cursor, entries.size(),
sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());

// Schedule a new read batch operation only after the previous batch has been written to the socket.
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,19 +296,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
}

// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
if (topic.getBrokerDispatchRateLimiter().isPresent()) {
topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}

if (dispatchRateLimiter.isPresent()) {
dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
}
}
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

stuckConsumers.clear();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,24 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
Expand All @@ -60,8 +64,9 @@ public class AbstractBaseDispatcherTest {
@BeforeMethod
public void setup() throws Exception {
this.svcConfig = mock(ServiceConfiguration.class);
when(svcConfig.isDispatchThrottlingForFilteredEntriesEnabled()).thenReturn(true);
this.subscriptionMock = mock(PersistentSubscription.class);
this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig, null);
}

@Test
Expand Down Expand Up @@ -89,17 +94,24 @@ public void testFilterEntriesForConsumerOfEntryFilter() throws Exception {
EntryFilter.FilterResult.REJECT);
ImmutableMap<String, EntryFilterWithClassLoader> entryFilters = ImmutableMap.of("key", mockFilter);
when(mockTopic.getEntryFilters()).thenReturn(entryFilters);
DispatchRateLimiter subscriptionDispatchRateLimiter = mock(DispatchRateLimiter.class);

this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig,
subscriptionDispatchRateLimiter);

List<Entry> entries = new ArrayList<>();

entries.add(EntryImpl.create(1, 2, createMessage("message1", 1)));
Entry e = EntryImpl.create(1, 2, createMessage("message1", 1));
long expectedBytePermits = e.getLength();
entries.add(e);
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
//
int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false, null);

ManagedCursor cursor = mock(ManagedCursor.class);

int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, cursor, false, null);
assertEquals(size, 0);
verify(subscriptionDispatchRateLimiter).tryDispatchPermit(1, expectedBytePermits);
}

@Test
Expand Down Expand Up @@ -201,9 +213,18 @@ private ByteBuf createDelayedMessage(String message, int sequenceId) {

private static class AbstractBaseDispatcherTestHelper extends AbstractBaseDispatcher {

private final Optional<DispatchRateLimiter> dispatchRateLimiter;

protected AbstractBaseDispatcherTestHelper(Subscription subscription,
ServiceConfiguration serviceConfig) {
ServiceConfiguration serviceConfig,
DispatchRateLimiter rateLimiter) {
super(subscription, serviceConfig);
dispatchRateLimiter = Optional.ofNullable(rateLimiter);
}

@Override
public Optional<DispatchRateLimiter> getRateLimiter() {
return dispatchRateLimiter;
}

@Override
Expand Down

0 comments on commit 522dfd8

Please sign in to comment.