diff --git a/conf/broker.conf b/conf/broker.conf index 3630445ce4f1c..a4a1fa470cd4d 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -443,6 +443,12 @@ entryFilterNames= # The directory for all the entry filter implementations entryFiltersDirectory= +# Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled, +# only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and +# subscription level. When enabled, messages filtered out due to entry filter logic are counted towards +# each relevant rate limit. +dispatchThrottlingForFilteredEntriesEnabled=false + # Whether allow topic level entry filters policies overrides broker configuration. allowOverrideEntryFilters=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a9b3ab5212281..d22164b3fe975 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -957,6 +957,16 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private boolean dispatcherDispatchMessagesInSubscriptionThread = true; + @FieldContext( + dynamic = false, + category = CATEGORY_SERVER, + doc = "Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled, " + + "only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and " + + "subscription level. When enabled, messages filtered out due to entry filter logic are counted towards " + + "each relevant rate limit." + ) + private boolean dispatchThrottlingForFilteredEntriesEnabled = false; + // <-- dispatcher read settings --> @FieldContext( dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index f3032d3758b2d..3255f8d515db1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -121,6 +121,9 @@ public int filterEntriesForConsumer(Optional entryWrapper, int e long totalBytes = 0; int totalChunkedMessages = 0; int totalEntries = 0; + int filteredMessageCount = 0; + int filteredEntryCount = 0; + long filteredBytesCount = 0; List entriesToFiltered = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null; List entriesToRedeliver = CollectionUtils.isNotEmpty(entryFilters) ? new ArrayList<>() : null; for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) { @@ -145,12 +148,18 @@ public int filterEntriesForConsumer(Optional entryWrapper, int e entriesToFiltered.add(entry.getPosition()); entries.set(i, null); this.filterRejectedMsgs.add(entryMsgCnt); + filteredEntryCount++; + filteredMessageCount += entryMsgCnt; + filteredBytesCount += metadataAndPayload.readableBytes(); entry.release(); continue; } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) { entriesToRedeliver.add((PositionImpl) entry.getPosition()); entries.set(i, null); this.filterRescheduledMsgs.add(entryMsgCnt); + filteredEntryCount++; + filteredMessageCount += entryMsgCnt; + filteredBytesCount += metadataAndPayload.readableBytes(); entry.release(); continue; } @@ -233,6 +242,11 @@ public int filterEntriesForConsumer(Optional entryWrapper, int e } + if (serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) { + acquirePermitsForDeliveredMessages(subscription.getTopic(), cursor, filteredEntryCount, + filteredMessageCount, filteredBytesCount); + } + sendMessageInfo.setTotalMessages(totalMessages); sendMessageInfo.setTotalBytes(totalBytes); sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages); @@ -245,6 +259,19 @@ private void individualAcknowledgeMessageIfNeeded(Position position, Map + rateLimiter.tryDispatchPermit(permits, totalBytesSent)); + topic.getDispatchRateLimiter().ifPresent(rateLimter -> + rateLimter.tryDispatchPermit(permits, totalBytesSent)); + getRateLimiter().ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(permits, totalBytesSent)); + } + } + /** * Determine whether the number of consumers on the subscription reaches the threshold. * @return diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 0285719fe47c8..11ba88e611bc4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -664,20 +664,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis } } - // acquire message-dispatch permits for already delivered messages - long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent; - if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) { - if (topic.getBrokerDispatchRateLimiter().isPresent()) { - topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent); - } - if (topic.getDispatchRateLimiter().isPresent()) { - topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent); - } - - if (dispatchRateLimiter.isPresent()) { - dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent); - } - } + acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); if (entriesToDispatch > 0) { if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 5c8b046d2984c..18570ba57ecbb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -219,23 +219,8 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List e redeliveryTracker, epoch) .addListener(future -> { if (future.isSuccess()) { - int permits = dispatchThrottlingOnBatchMessageEnabled ? entries.size() - : sendMessageInfo.getTotalMessages(); - // acquire message-dispatch permits for already delivered messages - if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) { - if (topic.getBrokerDispatchRateLimiter().isPresent()) { - topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, - sendMessageInfo.getTotalBytes()); - } - - if (topic.getDispatchRateLimiter().isPresent()) { - topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, - sendMessageInfo.getTotalBytes()); - } - dispatchRateLimiter.ifPresent(rateLimiter -> - rateLimiter.tryDispatchPermit(permits, - sendMessageInfo.getTotalBytes())); - } + acquirePermitsForDeliveredMessages(topic, cursor, entries.size(), + sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes()); // Schedule a new read batch operation only after the previous batch has been written to the socket. topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 024ed8581ef1c..5eb553106e679 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -296,19 +296,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis } // acquire message-dispatch permits for already delivered messages - if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) { - long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent; - if (topic.getBrokerDispatchRateLimiter().isPresent()) { - topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent); - } - if (topic.getDispatchRateLimiter().isPresent()) { - topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent); - } - - if (dispatchRateLimiter.isPresent()) { - dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent); - } - } + acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); stuckConsumers.clear(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java index 82ecd9257b2f4..2277171cca585 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java @@ -20,20 +20,24 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; -import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.plugin.EntryFilter; @@ -60,8 +64,9 @@ public class AbstractBaseDispatcherTest { @BeforeMethod public void setup() throws Exception { this.svcConfig = mock(ServiceConfiguration.class); + when(svcConfig.isDispatchThrottlingForFilteredEntriesEnabled()).thenReturn(true); this.subscriptionMock = mock(PersistentSubscription.class); - this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig); + this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig, null); } @Test @@ -89,17 +94,24 @@ public void testFilterEntriesForConsumerOfEntryFilter() throws Exception { EntryFilter.FilterResult.REJECT); ImmutableMap entryFilters = ImmutableMap.of("key", mockFilter); when(mockTopic.getEntryFilters()).thenReturn(entryFilters); + DispatchRateLimiter subscriptionDispatchRateLimiter = mock(DispatchRateLimiter.class); - this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig); + this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig, + subscriptionDispatchRateLimiter); List entries = new ArrayList<>(); - entries.add(EntryImpl.create(1, 2, createMessage("message1", 1))); + Entry e = EntryImpl.create(1, 2, createMessage("message1", 1)); + long expectedBytePermits = e.getLength(); + entries.add(e); SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size()); - // - int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false, null); + + ManagedCursor cursor = mock(ManagedCursor.class); + + int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, cursor, false, null); assertEquals(size, 0); + verify(subscriptionDispatchRateLimiter).tryDispatchPermit(1, expectedBytePermits); } @Test @@ -201,9 +213,18 @@ private ByteBuf createDelayedMessage(String message, int sequenceId) { private static class AbstractBaseDispatcherTestHelper extends AbstractBaseDispatcher { + private final Optional dispatchRateLimiter; + protected AbstractBaseDispatcherTestHelper(Subscription subscription, - ServiceConfiguration serviceConfig) { + ServiceConfiguration serviceConfig, + DispatchRateLimiter rateLimiter) { super(subscription, serviceConfig); + dispatchRateLimiter = Optional.ofNullable(rateLimiter); + } + + @Override + public Optional getRateLimiter() { + return dispatchRateLimiter; } @Override