Skip to content

Commit

Permalink
Safely stop(close replication-producer) and remove replicator (#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Jan 10, 2017
1 parent 12f896e commit a6186b2
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,8 @@ public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog)
return disconnectFuture;
}

if (producer != null && state.compareAndSet(State.Started, State.Stopping)) {
if (producer != null && (state.compareAndSet(State.Starting, State.Stopping)
|| state.compareAndSet(State.Started, State.Stopping))) {
log.info("[{}][{} -> {}] Disconnect replicator at position {} with backlog {}", topicName, localCluster,
remoteCluster, cursor.getMarkDeletedPosition(), cursor.getNumberOfEntriesInBacklog());
return closeProducerAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
public final String replicatorPrefix;

private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;

private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;

public static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault());

Expand Down Expand Up @@ -544,6 +546,22 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
return closeFuture;
}

private CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
log.info("[{}] Policies updated successfully", topic);
result.complete(null);
}).exceptionally(th -> {
log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", topic, th.getMessage(),
POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, th);
brokerService.executor().schedule(this::checkReplicationAndRetryOnFailure,
POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, TimeUnit.SECONDS);
result.completeExceptionally(th);
return null;
});
return result;
}

@Override
public CompletableFuture<Void> checkReplication() {
DestinationName name = DestinationName.get(topic);
Expand Down Expand Up @@ -665,15 +683,13 @@ public void deleteCursorComplete(Object ctx) {

@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to delete cursor {}", topic, name);
// Connect the producers back
replicators.get(remoteCluster).startProducer();
log.error("[{}] Failed to delete cursor {} {}", topic, name, exception.getMessage(), exception);
future.completeExceptionally(new PersistenceException(exception));
}
}, null);

}).exceptionally(e -> {
log.error("[{}] Failed to close replication producer {}", topic, name);
log.error("[{}] Failed to close replication producer {} {}", topic, name, e.getMessage(), e);
future.completeExceptionally(e);
return null;
});
Expand Down Expand Up @@ -1082,7 +1098,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
producers.forEach(Producer::checkPermissions);
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
checkMessageExpiry();
return checkReplication();
return checkReplicationAndRetryOnFailure();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.mockito.Mockito;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -54,6 +57,7 @@
import com.yahoo.pulsar.client.api.Producer;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.impl.PulsarClientImpl;
import com.yahoo.pulsar.client.impl.ProducerImpl;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -604,6 +608,68 @@ public Void call() throws Exception {
}
}
}

/**
* It verifies that: if it fails while removing replicator-cluster-cursor: it should not restart the replicator and
* it should have cleaned up from the list
*
* @throws Exception
*/
@Test
public void testDeleteReplicatorFailure() throws Exception {
log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
final String topicName = "persistent://pulsar/global/ns/repltopicbatch";
final DestinationName dest = DestinationName.get(topicName);
MessageProducer producer1 = new MessageProducer(url1, dest);
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName);
final String replicatorClusterName = topic.getReplicators().keys().get(0);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
CountDownLatch latch = new CountDownLatch(1);
// delete cursor already : so next time if topic.removeReplicator will get exception but then it should
// remove-replicator from the list even with failure
ledger.asyncDeleteCursor("pulsar.repl." + replicatorClusterName, new DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
latch.countDown();
}

@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
}
}, null);
latch.await();

Method removeReplicator = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
removeReplicator.setAccessible(true);
// invoke removeReplicator : it fails as cursor is not present: but still it should remove the replicator from
// list without restarting it
CompletableFuture<Void> result = (CompletableFuture<Void>) removeReplicator.invoke(topic,
replicatorClusterName);
result.thenApply((v) -> {
assertNull(topic.getPersistentReplicator(replicatorClusterName));
return null;
});
}

@Test
public void testReplicatorProducerClosing() throws Exception {
log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
final String topicName = "persistent://pulsar/global/ns/repltopicbatch";
final DestinationName dest = DestinationName.get(topicName);
MessageProducer producer1 = new MessageProducer(url1, dest);
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName);
final String replicatorClusterName = topic.getReplicators().keys().get(0);
PersistentReplicator replicator = topic.getPersistentReplicator(replicatorClusterName);
pulsar2.close();
pulsar3.close();
replicator.disconnect(false);
Thread.sleep(100);
Field field = PersistentReplicator.class.getDeclaredField("producer");
field.setAccessible(true);
ProducerImpl producer = (ProducerImpl) field.get(replicator);
assertNull(producer);
}

private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,15 @@ public static BundlesData defaultBundle() {
bundle.setBoundaries(boundaries);
return bundle;
}

@Override
public String toString() {
return Objects.toStringHelper(this).add("auth_policies", auth_policies)
.add("replication_clusters", replication_clusters).add("bundles", bundles)
.add("backlog_quota_map", backlog_quota_map).add("persistence", persistence)
.add("latency_stats_sample_rate", latency_stats_sample_rate)
.add("message_ttl_in_seconds", message_ttl_in_seconds).add("retention_policies", retention_policies)
.add("deleted", deleted).toString();
}

}

0 comments on commit a6186b2

Please sign in to comment.