diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java index 6fc13a9b3aa88..06659b9b2f710 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java @@ -267,8 +267,26 @@ private boolean hasRemoteProducers() { return foundRemote.get(); } - private void startReplProducers() { - replicators.forEach((region, replicator) -> replicator.startProducer()); + public void startReplProducers() { + // read repl-cluster from policies to avoid restart of replicator which are in process of disconnect and close + try { + Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache() + .get(AdminResource.path("policies", DestinationName.get(topic).getNamespace())) + .orElseThrow(() -> new KeeperException.NoNodeException()); + if (policies.replication_clusters != null) { + Set configuredClusters = Sets.newTreeSet(policies.replication_clusters); + replicators.forEach((region, replicator) -> { + if (configuredClusters.contains(region)) { + replicator.startProducer(); + } + }); + } + } catch (Exception e) { + if (log.isDebugEnabled()) { + log.debug("[{}] Error getting policies while starting repl-producers {}", topic, e.getMessage()); + } + replicators.forEach((region, replicator) -> replicator.startProducer()); + } } public CompletableFuture stopReplProducers() { diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java index d3e7517af32de..e8347fba9a4b7 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java @@ -22,6 +22,7 @@ import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -32,7 +33,9 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.lang.reflect.Method; import java.net.InetSocketAddress; +import java.net.URL; import java.util.ArrayList; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -57,8 +60,11 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.zookeeper.ZooKeeper; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; @@ -69,12 +75,15 @@ import com.yahoo.pulsar.broker.PulsarService; import com.yahoo.pulsar.broker.ServiceConfiguration; +import com.yahoo.pulsar.broker.admin.AdminResource; import com.yahoo.pulsar.broker.cache.ConfigurationCacheService; import com.yahoo.pulsar.broker.namespace.NamespaceService; import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; +import com.yahoo.pulsar.broker.service.persistent.PersistentReplicator; import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription; import com.yahoo.pulsar.broker.service.persistent.PersistentTopic; +import com.yahoo.pulsar.client.api.PulsarClient; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -828,4 +837,62 @@ public void testFailoverSubscription() throws Exception { assertNull(topic2.getPersistentSubscription(successSubName)); } + + /** + * {@link PersistentReplicator.removeReplicator} doesn't remove replicator in atomic way and does in multiple step: + * 1. disconnect replicator producer + *

+ * 2. close cursor + *

+ * 3. remove from replicator-list. + *

+ * + * If we try to startReplicationProducer before step-c finish then it should not avoid restarting repl-producer. + * + * @throws Exception + */ + @Test + public void testAtomicReplicationRemoval() throws Exception { + final String globalTopicName = "persistent://prop/global/ns-abc/successTopic"; + String localCluster = "local"; + String remoteCluster = "remote"; + final ManagedLedger ledgerMock = mock(ManagedLedger.class); + doNothing().when(ledgerMock).asyncDeleteCursor(anyObject(), anyObject(), anyObject()); + doReturn(new ArrayList()).when(ledgerMock).getCursors(); + + PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService); + String remoteReplicatorName = topic.replicatorPrefix + "." + remoteCluster; + ConcurrentOpenHashMap replicatorMap = topic.getReplicators(); + ; + final URL brokerUrl = new URL( + "http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort()); + PulsarClient client = PulsarClient.create(brokerUrl.toString()); + ManagedCursor cursor = mock(ManagedCursorImpl.class); + doReturn(remoteCluster).when(cursor).getName(); + brokerService.getReplicationClients().put(remoteCluster, client); + PersistentReplicator replicator = spy( + new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService)); + replicatorMap.put(remoteReplicatorName, replicator); + + // step-1 remove replicator : it will disconnect the producer but it will wait for callback to be completed + Method removeMethod = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class); + removeMethod.setAccessible(true); + removeMethod.invoke(topic, remoteReplicatorName); + + // step-2 now, policies doesn't have removed replication cluster so, it should not invoke "startProducer" of the + // replicator + when(pulsar.getConfigurationCache().policiesCache() + .get(AdminResource.path("policies", DestinationName.get(globalTopicName).getNamespace()))) + .thenReturn(Optional.of(new Policies())); + // try to start replicator again + topic.startReplProducers(); + // verify: replicator.startProducer is not invoked + verify(replicator, Mockito.times(0)).startProducer(); + + // step-3 : complete the callback to remove replicator from the list + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteCursorCallback.class); + Mockito.verify(ledgerMock).asyncDeleteCursor(anyObject(), captor.capture(), anyObject()); + DeleteCursorCallback callback = captor.getValue(); + callback.deleteCursorComplete(null); + } }