Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safely close replicator and Prevent creation of duplicate replicator #175

Merged
merged 1 commit into from
Jan 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,22 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {

return closeFuture;
}

private CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
log.info("[{}] Policies updated successfully", topic);
result.complete(null);
}).exceptionally(th -> {
log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", topic, th.getMessage(),
POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, th);
brokerService.executor().schedule(this::checkReplicationAndRetryOnFailure,
POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, TimeUnit.SECONDS);
result.completeExceptionally(th);
return null;
});
return result;
}

@Override
public CompletableFuture<Void> checkReplication() {
Expand Down Expand Up @@ -632,8 +648,8 @@ CompletableFuture<Void> startReplicator(String remoteCluster) {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
replicators.put(remoteCluster, new PersistentReplicator(PersistentTopic.this, cursor, localCluster,
remoteCluster, brokerService));
replicators.computeIfAbsent(remoteCluster, r -> new PersistentReplicator(PersistentTopic.this, cursor,
localCluster, remoteCluster, brokerService));
future.complete(null);
}

Expand Down Expand Up @@ -1071,19 +1087,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
producers.forEach(Producer::checkPermissions);
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
checkMessageExpiry();
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
log.info("Policies updated successfully {}", data);
result.complete(null);
}).exceptionally(th -> {
log.error("Policies update failed {} {}, scheduled retry in {} seconds", data, th.getMessage(),
POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, th);
brokerService.executor().schedule(this::checkReplication, POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS,
TimeUnit.SECONDS);
result.completeExceptionally(th);
return null;
});
return result;
return checkReplicationAndRetryOnFailure();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
Expand All @@ -29,13 +28,16 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -51,9 +53,11 @@
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.client.admin.PulsarAdminException;
import com.yahoo.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import com.yahoo.pulsar.client.api.ClientConfiguration;
import com.yahoo.pulsar.client.api.MessageBuilder;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.impl.ProducerImpl;
import com.yahoo.pulsar.client.impl.PulsarClientImpl;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -631,6 +635,89 @@ public void testReplicatorProducerClosing() throws Exception {
ProducerImpl producer = (ProducerImpl) field.get(replicator);
assertNull(producer);
}

@Test
public void testConcurrentReplicator() throws Exception {

log.info("--- Starting ReplicatorTest::testConfigChange ---");

final DestinationName dest = DestinationName.get(String.format("persistent://pulsar/global/ns1/topic-%d", 0));
ClientConfiguration conf = new ClientConfiguration();
conf.setStatsInterval(0, TimeUnit.SECONDS);
ProducerImpl producer = (ProducerImpl) PulsarClient.create(url1.toString(), conf).createProducer(dest.toString());
producer.close();

PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(dest.toString()).get();

PulsarClientImpl pulsarClient = spy((PulsarClientImpl) pulsar1.getBrokerService().getReplicationClient("r3"));
final Method startRepl = PersistentTopic.class.getDeclaredMethod("startReplicator", String.class);
startRepl.setAccessible(true);

Field replClientField = BrokerService.class.getDeclaredField("replicationClients");
replClientField.setAccessible(true);
ConcurrentOpenHashMap<String, PulsarClient> replicationClients = (ConcurrentOpenHashMap<String, PulsarClient>) replClientField
.get(pulsar1.getBrokerService());
replicationClients.put("r3", pulsarClient);

ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
try {
startRepl.invoke(topic, "r3");
} catch (Exception e) {
fail("setting replicator failed", e);
}
});
}
Thread.sleep(3000);

Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.anyString(), Mockito.anyObject(),
Mockito.anyString());

}

/**
* It verifies that: if it fails while removing replicator-cluster-cursor: it should not restart the replicator and
* it should have cleaned up from the list
*
* @throws Exception
*/
@Test
public void testDeleteReplicatorFailure() throws Exception {
log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
final String topicName = "persistent://pulsar/global/ns/repltopicbatch";
final DestinationName dest = DestinationName.get(topicName);
MessageProducer producer1 = new MessageProducer(url1, dest);
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName);
final String replicatorClusterName = topic.getReplicators().keys().get(0);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
CountDownLatch latch = new CountDownLatch(1);
// delete cursor already : so next time if topic.removeReplicator will get exception but then it should
// remove-replicator from the list even with failure
ledger.asyncDeleteCursor("pulsar.repl." + replicatorClusterName, new DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
latch.countDown();
}

@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
}
}, null);
latch.await();

Method removeReplicator = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
removeReplicator.setAccessible(true);
// invoke removeReplicator : it fails as cursor is not present: but still it should remove the replicator from
// list without restarting it
CompletableFuture<Void> result = (CompletableFuture<Void>) removeReplicator.invoke(topic,
replicatorClusterName);
result.thenApply((v) -> {
assertNull(topic.getPersistentReplicator(replicatorClusterName));
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ void setup() throws Exception {
assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString());
assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(), url3.toString());
admin1.namespaces().createNamespace("pulsar/global/ns1");
admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns1", Lists.newArrayList("r1", "r2"));
/*
* assertEquals(admin2.clusters().getCluster("global").getServiceUrl(), "http://global:8080");
* assertEquals(admin2.properties().getPropertyAdmin("pulsar").getAdminRoles(), Lists.newArrayList("appid1",
Expand Down