Skip to content

Commit

Permalink
[fix] [broker] Producer is blocked on creation because backlog exceed…
Browse files Browse the repository at this point in the history
…ed on topic, when dedup is enabled and no producer is there (#20951)

(cherry picked from commit 30073db)
  • Loading branch information
heesung-sn authored and poorbarcode committed Aug 29, 2023
1 parent 25f76ef commit f68589e
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQ
break;
case producer_exception:
case producer_request_hold:
disconnectProducers(persistentTopic);
if (!advanceSlowestSystemCursor(persistentTopic)) {
// The slowest is not a system cursor. Disconnecting producers to put backpressure.
disconnectProducers(persistentTopic);
}
break;
default:
break;
Expand Down Expand Up @@ -268,4 +271,27 @@ private void disconnectProducers(PersistentTopic persistentTopic) {

});
}

/**
* Advances the slowest cursor if that is a system cursor.
*
* @param persistentTopic
* @return true if the slowest cursor is a system cursor
*/
private boolean advanceSlowestSystemCursor(PersistentTopic persistentTopic) {

ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
ManagedCursor slowestConsumer = mLedger.getSlowestConsumer();
if (slowestConsumer == null) {
return false;
}

if (PersistentTopic.isDedupCursorName(slowestConsumer.getName())) {
persistentTopic.getMessageDeduplication().takeSnapshot();
return true;
}

// We may need to check other system cursors here : replicator, compaction
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
Expand Down Expand Up @@ -130,6 +131,9 @@ public MessageDupUnknownException() {

private final String replicatorPrefix;


private final AtomicBoolean snapshotTaking = new AtomicBoolean(false);

public MessageDeduplication(PulsarService pulsar, PersistentTopic topic, ManagedLedger managedLedger) {
this.pulsar = pulsar;
this.topic = topic;
Expand Down Expand Up @@ -406,6 +410,11 @@ private void takeSnapshot(Position position) {
if (log.isDebugEnabled()) {
log.debug("[{}] Taking snapshot of sequence ids map", topic.getName());
}

if (!snapshotTaking.compareAndSet(false, true)) {
return;
}

Map<String, Long> snapshot = new TreeMap<>();
highestSequencedPersisted.forEach((producerName, sequenceId) -> {
if (snapshot.size() < maxNumberOfProducers) {
Expand All @@ -420,11 +429,13 @@ public void markDeleteComplete(Object ctx) {
log.debug("[{}] Stored new deduplication snapshot at {}", topic.getName(), position);
}
lastSnapshotTimestamp = System.currentTimeMillis();
snapshotTaking.set(false);
}

@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to store new deduplication snapshot at {}", topic.getName(), position);
snapshotTaking.set(false);
}
}, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private final TopicName shadowSourceTopic;

static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";

public static boolean isDedupCursorName(String name) {
return DEDUPLICATION_CURSOR_NAME.equals(name);
}
private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch";

private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ void setup() throws Exception {
config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
config.setSystemTopicEnabled(false);
config.setTopicLevelPoliciesEnabled(false);
config.setSystemTopicEnabled(true);
config.setTopicLevelPoliciesEnabled(true);
config.setForceDeleteNamespaceAllowed(true);

pulsar = new PulsarService(config);
Expand Down Expand Up @@ -1169,8 +1169,13 @@ public void testProducerException() throws Exception {
assertTrue(gotException, "backlog exceeded exception did not occur");
}

@Test
public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception {
@DataProvider(name = "dedupTestSet")
public static Object[][] dedupTestSet() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}

@Test(dataProvider = "dedupTestSet")
public void testProducerExceptionAndThenUnblockSizeQuota(boolean dedupTestSet) throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/quotahold"),
new HashMap<>());
admin.namespaces().setBacklogQuota("prop/quotahold",
Expand All @@ -1186,9 +1191,12 @@ public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception {
boolean gotException = false;

Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();

byte[] content = new byte[1024];
Producer<byte[]> producer = createProducer(client, topic1);

admin.topicPolicies().setDeduplicationStatus(topic1, dedupTestSet);
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);

for (int i = 0; i < 10; i++) {
producer.send(content);
}
Expand All @@ -1207,6 +1215,7 @@ public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception {
}

assertTrue(gotException, "backlog exceeded exception did not occur");
assertFalse(producer.isConnected());
// now remove backlog and ensure that producer is unblocked;

TopicStats stats = getTopicStats(topic1);
Expand All @@ -1223,14 +1232,33 @@ public void testProducerExceptionAndThenUnblockSizeQuota() throws Exception {
Exception sendException = null;
gotException = false;
try {
for (int i = 0; i < 5; i++) {
for (int i = 0; i < 10; i++) {
producer.send(content);
Message<?> msg = consumer.receive();
consumer.acknowledge(msg);
}
} catch (Exception e) {
gotException = true;
sendException = e;
}
Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
assertFalse(gotException, "unable to publish due to " + sendException);

gotException = false;
long lastDisconnectedTimestamp = producer.getLastDisconnectedTimestamp();
try {
// try to send over backlog quota and make sure it passes
producer.send(content);
producer.send(content);
} catch (PulsarClientException ce) {
assertTrue(ce instanceof PulsarClientException.ProducerBlockedQuotaExceededException
|| ce instanceof PulsarClientException.TimeoutException, ce.getMessage());
gotException = true;
sendException = ce;
}
assertFalse(gotException, "unable to publish due to " + sendException);
assertEquals(lastDisconnectedTimestamp, producer.getLastDisconnectedTimestamp());

}

@Test
Expand Down

0 comments on commit f68589e

Please sign in to comment.