diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java index be8d00922fccc..d996380db0a58 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java @@ -617,7 +617,7 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) return closeProducerAsync(); } else { // If there's already a reconnection happening, signal to close it whenever it's ready - STATE_UPDATER.set(this, State.Stopped); + STATE_UPDATER.set(this, State.Stopping); } return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java index 4e750e86eb139..6fc13a9b3aa88 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java @@ -237,6 +237,9 @@ public void addProducer(Producer producer) throws BrokerServiceException { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(), USAGE_COUNT_UPDATER.get(this)); } + + // Start replication producers if not already + startReplProducers(); } finally { lock.readLock().unlock(); }