Skip to content

Commit

Permalink
Safely stop(close replication-producer) and remove replicator (#159)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored Jan 4, 2017
1 parent 62f759e commit c10ae7c
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,8 @@ public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog)
return disconnectFuture;
}

if (producer != null && state.compareAndSet(State.Started, State.Stopping)) {
if (producer != null && (state.compareAndSet(State.Starting, State.Stopping)
|| state.compareAndSet(State.Started, State.Stopping))) {
log.info("[{}][{} -> {}] Disconnect replicator at position {} with backlog {}", topicName, localCluster,
remoteCluster, cursor.getMarkDeletedPosition(), cursor.getNumberOfEntriesInBacklog());
return closeProducerAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
public final String replicatorPrefix;

private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;

private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;

public static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault());

Expand Down Expand Up @@ -662,15 +664,13 @@ public void deleteCursorComplete(Object ctx) {

@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to delete cursor {}", topic, name);
// Connect the producers back
replicators.get(remoteCluster).startProducer();
log.error("[{}] Failed to delete cursor {} {}", topic, name, exception.getMessage(), exception);
future.completeExceptionally(new PersistenceException(exception));
}
}, null);

}).exceptionally(e -> {
log.error("[{}] Failed to close replication producer {}", topic, name);
log.error("[{}] Failed to close replication producer {} {}", topic, name, e.getMessage(), e);
future.completeExceptionally(e);
return null;
});
Expand Down Expand Up @@ -1071,7 +1071,19 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
producers.forEach(Producer::checkPermissions);
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
checkMessageExpiry();
return checkReplication();
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
log.info("Policies updated successfully {}", data);
result.complete(null);
}).exceptionally(th -> {
log.error("Policies update failed {} {}, scheduled retry in {} seconds", data, th.getMessage(),
POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, th);
brokerService.executor().schedule(this::checkReplication, POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS,
TimeUnit.SECONDS);
result.completeExceptionally(th);
return null;
});
return result;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -48,6 +53,7 @@
import com.yahoo.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import com.yahoo.pulsar.client.api.MessageBuilder;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.impl.ProducerImpl;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -563,6 +569,68 @@ public Void call() throws Exception {
}
}
}

/**
* It verifies that: if it fails while removing replicator-cluster-cursor: it should not restart the replicator and
* it should have cleaned up from the list
*
* @throws Exception
*/
@Test
public void testDeleteReplicatorFailure2() throws Exception {
log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
final String topicName = "persistent://pulsar/global/ns/repltopicbatch";
final DestinationName dest = DestinationName.get(topicName);
MessageProducer producer1 = new MessageProducer(url1, dest);
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName);
final String replicatorClusterName = topic.getReplicators().keys().get(0);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
CountDownLatch latch = new CountDownLatch(1);
// delete cursor already : so next time if topic.removeReplicator will get exception but then it should
// remove-replicator from the list even with failure
ledger.asyncDeleteCursor("pulsar.repl." + replicatorClusterName, new DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
latch.countDown();
}

@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
}
}, null);
latch.await();

Method removeReplicator = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
removeReplicator.setAccessible(true);
// invoke removeReplicator : it fails as cursor is not present: but still it should remove the replicator from
// list without restarting it
CompletableFuture<Void> result = (CompletableFuture<Void>) removeReplicator.invoke(topic,
replicatorClusterName);
result.thenApply((v) -> {
assertNull(topic.getPersistentReplicator(replicatorClusterName));
return null;
});
}

@Test
public void testReplicatorProducerClosing() throws Exception {
log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
final String topicName = "persistent://pulsar/global/ns/repltopicbatch";
final DestinationName dest = DestinationName.get(topicName);
MessageProducer producer1 = new MessageProducer(url1, dest);
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName);
final String replicatorClusterName = topic.getReplicators().keys().get(0);
PersistentReplicator replicator = topic.getPersistentReplicator(replicatorClusterName);
pulsar2.close();
pulsar3.close();
replicator.disconnect(false);
Thread.sleep(100);
Field field = PersistentReplicator.class.getDeclaredField("producer");
field.setAccessible(true);
ProducerImpl producer = (ProducerImpl) field.get(replicator);
assertNull(producer);
}

private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

Expand Down

0 comments on commit c10ae7c

Please sign in to comment.