Skip to content

Commit

Permalink
Replicator-producer creation should be idempotent (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Jan 9, 2017
1 parent efd21ba commit e2d8c2e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ CompletableFuture<Void> startReplicator(String remoteCluster) {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
replicators.put(remoteCluster, new PersistentReplicator(PersistentTopic.this, cursor, localCluster,
replicators.computeIfAbsent(remoteCluster, r -> new PersistentReplicator(PersistentTopic.this, cursor, localCluster,
remoteCluster, brokerService));
future.complete(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,20 @@
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -44,10 +48,12 @@
import com.yahoo.pulsar.broker.namespace.OwnershipCache;
import com.yahoo.pulsar.broker.service.persistent.PersistentReplicator;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.client.admin.PulsarAdminException;
import com.yahoo.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import com.yahoo.pulsar.client.api.ClientConfiguration;
import com.yahoo.pulsar.client.api.MessageBuilder;
import com.yahoo.pulsar.client.api.Producer;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.impl.PulsarClientImpl;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -155,6 +161,46 @@ public Void call() throws Exception {

// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
}

@Test
public void testConcurrentReplicator() throws Exception {

log.info("--- Starting ReplicatorTest::testConfigChange ---");

final DestinationName dest = DestinationName.get(String.format("persistent://pulsar/global/ns1/topic-%d", 0));
ClientConfiguration conf = new ClientConfiguration();
conf.setStatsInterval(0, TimeUnit.SECONDS);
Producer producer = PulsarClient.create(url1.toString(), conf).createProducer(dest.toString());
producer.close();

PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString()).get();

PulsarClientImpl pulsarClient = spy((PulsarClientImpl) pulsar1.getBrokerService().getReplicationClient("r3"));
final Method startRepl = PersistentTopic.class.getDeclaredMethod("startReplicator", String.class);
startRepl.setAccessible(true);

Field replClientField = BrokerService.class.getDeclaredField("replicationClients");
replClientField.setAccessible(true);
ConcurrentOpenHashMap<String, PulsarClient> replicationClients = (ConcurrentOpenHashMap<String, PulsarClient>) replClientField
.get(pulsar1.getBrokerService());
replicationClients.put("r3", pulsarClient);

ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
try {
startRepl.invoke(topic, "r3");
} catch (Exception e) {
fail("setting replicator failed", e);
}
});
}
Thread.sleep(3000);

Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.anyString(), Mockito.anyObject(),
Mockito.anyString());

}

@Test(enabled = false)
public void testConfigChangeNegativeCases() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ void setup() throws Exception {
new PropertyAdmin(Lists.newArrayList("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3")));
admin1.namespaces().createNamespace("pulsar/global/ns");
admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Lists.newArrayList("r1", "r2", "r3"));
admin1.namespaces().createNamespace("pulsar/global/ns1");
admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns1", Lists.newArrayList("r1", "r2"));

assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString());
assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
Expand Down

0 comments on commit e2d8c2e

Please sign in to comment.