diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index bc2541c802e63..6ad1697adfc39 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -103,7 +103,10 @@ public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQ break; case producer_exception: case producer_request_hold: - disconnectProducers(persistentTopic); + if (!advanceSlowestSystemCursor(persistentTopic)) { + // The slowest is not a system cursor. Disconnecting producers to put backpressure. + disconnectProducers(persistentTopic); + } break; default: break; @@ -268,4 +271,27 @@ private void disconnectProducers(PersistentTopic persistentTopic) { }); } + + /** + * Advances the slowest cursor if that is a system cursor. + * + * @param persistentTopic + * @return true if the slowest cursor is a system cursor + */ + private boolean advanceSlowestSystemCursor(PersistentTopic persistentTopic) { + + ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + ManagedCursor slowestConsumer = mLedger.getSlowestConsumer(); + if (slowestConsumer == null) { + return false; + } + + if (PersistentTopic.isDedupCursorName(slowestConsumer.getName())) { + persistentTopic.getMessageDeduplication().takeSnapshot(); + return true; + } + + // We may need to check other system cursors here : replicator, compaction + return false; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 490be4a8876fc..d1b4b74945f8e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; @@ -130,6 +131,9 @@ public MessageDupUnknownException() { private final String replicatorPrefix; + + private final AtomicBoolean snapshotTaking = new AtomicBoolean(false); + public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, ManagedLedger managedLedger) { this.pulsar = pulsar; this.topic = topic; @@ -406,6 +410,11 @@ private void takeSnapshot(Position position) { if (log.isDebugEnabled()) { log.debug("[{}] Taking snapshot of sequence ids map", topic.getName()); } + + if (!snapshotTaking.compareAndSet(false, true)) { + return; + } + Map snapshot = new TreeMap<>(); highestSequencedPersisted.forEach((producerName, sequenceId) -> { if (snapshot.size() < maxNumberOfProducers) { @@ -420,11 +429,13 @@ public void markDeleteComplete(Object ctx) { log.debug("[{}] Stored new deduplication snapshot at {}", topic.getName(), position); } lastSnapshotTimestamp = System.currentTimeMillis(); + snapshotTaking.set(false); } @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { log.warn("[{}] Failed to store new deduplication snapshot at {}", topic.getName(), position); + snapshotTaking.set(false); } }, null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 593f741ce39d7..ffb6828c8c311 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -194,6 +194,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal private final TopicName shadowSourceTopic; static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup"; + + public static boolean isDedupCursorName(String name) { + return DEDUPLICATION_CURSOR_NAME.equals(name); + } private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch"; private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index f3463ee121d75..0ac5fdaef1599 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -127,8 +127,8 @@ void setup() throws Exception { config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER); config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); - config.setSystemTopicEnabled(false); - config.setTopicLevelPoliciesEnabled(false); + config.setSystemTopicEnabled(true); + config.setTopicLevelPoliciesEnabled(true); config.setForceDeleteNamespaceAllowed(true); pulsar = new PulsarService(config); @@ -1169,8 +1169,13 @@ public void testProducerException() throws Exception { assertTrue(gotException, "backlog exceeded exception did not occur"); } - @Test - public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception { + @DataProvider(name = "dedupTestSet") + public static Object[][] dedupTestSet() { + return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; + } + + @Test(dataProvider = "dedupTestSet") + public void testProducerExceptionAndThenUnblockSizeQuota(boolean dedupTestSet) throws Exception { assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"), new HashMap<>()); admin.namespaces().setBacklogQuota("prop/quotahold", @@ -1186,9 +1191,12 @@ public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception { boolean gotException = false; Consumer consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); - byte[] content = new byte[1024]; Producer producer = createProducer(client, topic1); + + admin.topicPolicies().setDeduplicationStatus(topic1, dedupTestSet); + Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); + for (int i = 0; i < 10; i++) { producer.send(content); } @@ -1207,6 +1215,7 @@ public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception { } assertTrue(gotException, "backlog exceeded exception did not occur"); + assertFalse(producer.isConnected()); // now remove backlog and ensure that producer is unblocked; TopicStats stats = getTopicStats(topic1); @@ -1223,14 +1232,33 @@ public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception { Exception sendException = null; gotException = false; try { - for (int i = 0; i < 5; i++) { + for (int i = 0; i < 10; i++) { producer.send(content); + Message msg = consumer.receive(); + consumer.acknowledge(msg); } } catch (Exception e) { gotException = true; sendException = e; } + Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); assertFalse(gotException, "unable to publish due to " + sendException); + + gotException = false; + long lastDisconnectedTimestamp = producer.getLastDisconnectedTimestamp(); + try { + // try to send over backlog quota and make sure it passes + producer.send(content); + producer.send(content); + } catch (PulsarClientException ce) { + assertTrue(ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException + || ce instanceof PulsarClientException.TimeoutException, ce.getMessage()); + gotException = true; + sendException = ce; + } + assertFalse(gotException, "unable to publish due to " + sendException); + assertEquals(lastDisconnectedTimestamp, producer.getLastDisconnectedTimestamp()); + } @Test