Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safely stop(close replication-producer) and remove replicator #152

Merged
merged 1 commit into from
Jan 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,8 @@ public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog)
return disconnectFuture;
}

if (producer != null && state.compareAndSet(State.Started, State.Stopping)) {
if (producer != null && (state.compareAndSet(State.Starting, State.Stopping)
|| state.compareAndSet(State.Started, State.Stopping))) {
log.info("[{}][{} -> {}] Disconnect replicator at position {} with backlog {}", topicName, localCluster,
remoteCluster, cursor.getMarkDeletedPosition(), cursor.getNumberOfEntriesInBacklog());
return closeProducerAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ public class PersistentTopic implements Topic, AddEntryCallback {
public final String replicatorPrefix;

private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;

private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;

public static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault());

Expand Down Expand Up @@ -544,6 +546,22 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
return closeFuture;
}

private CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
log.info("[{}] Policies updated successfully", topic);
result.complete(null);
}).exceptionally(th -> {
log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", topic, th.getMessage(),
POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, th);
brokerService.executor().schedule(this::checkReplicationAndRetryOnFailure,
POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, TimeUnit.SECONDS);
result.completeExceptionally(th);
return null;
});
return result;
}

@Override
public CompletableFuture<Void> checkReplication() {
DestinationName name = DestinationName.get(topic);
Expand Down Expand Up @@ -665,15 +683,13 @@ public void deleteCursorComplete(Object ctx) {

@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to delete cursor {}", topic, name);
// Connect the producers back
replicators.get(remoteCluster).startProducer();
log.error("[{}] Failed to delete cursor {} {}", topic, name, exception.getMessage(), exception);
future.completeExceptionally(new PersistenceException(exception));
}
}, null);

}).exceptionally(e -> {
log.error("[{}] Failed to close replication producer {}", topic, name);
log.error("[{}] Failed to close replication producer {} {}", topic, name, e.getMessage(), e);
future.completeExceptionally(e);
return null;
});
Expand Down Expand Up @@ -1082,7 +1098,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
producers.forEach(Producer::checkPermissions);
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
checkMessageExpiry();
return checkReplication();
return checkReplicationAndRetryOnFailure();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.mockito.Mockito;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -54,6 +57,7 @@
import com.yahoo.pulsar.client.api.Producer;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.impl.PulsarClientImpl;
import com.yahoo.pulsar.client.impl.ProducerImpl;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -604,6 +608,68 @@ public Void call() throws Exception {
}
}
}

/**
* It verifies that: if it fails while removing replicator-cluster-cursor: it should not restart the replicator and
* it should have cleaned up from the list
*
* @throws Exception
*/
@Test
public void testDeleteReplicatorFailure() throws Exception {
log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
final String topicName = "persistent://pulsar/global/ns/repltopicbatch";
final DestinationName dest = DestinationName.get(topicName);
MessageProducer producer1 = new MessageProducer(url1, dest);
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName);
final String replicatorClusterName = topic.getReplicators().keys().get(0);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
CountDownLatch latch = new CountDownLatch(1);
// delete cursor already : so next time if topic.removeReplicator will get exception but then it should
// remove-replicator from the list even with failure
ledger.asyncDeleteCursor("pulsar.repl." + replicatorClusterName, new DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
latch.countDown();
}

@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
}
}, null);
latch.await();

Method removeReplicator = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
removeReplicator.setAccessible(true);
// invoke removeReplicator : it fails as cursor is not present: but still it should remove the replicator from
// list without restarting it
CompletableFuture<Void> result = (CompletableFuture<Void>) removeReplicator.invoke(topic,
replicatorClusterName);
result.thenApply((v) -> {
assertNull(topic.getPersistentReplicator(replicatorClusterName));
return null;
});
}

@Test
public void testReplicatorProducerClosing() throws Exception {
log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
final String topicName = "persistent://pulsar/global/ns/repltopicbatch";
final DestinationName dest = DestinationName.get(topicName);
MessageProducer producer1 = new MessageProducer(url1, dest);
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName);
final String replicatorClusterName = topic.getReplicators().keys().get(0);
PersistentReplicator replicator = topic.getPersistentReplicator(replicatorClusterName);
pulsar2.close();
pulsar3.close();
replicator.disconnect(false);
Thread.sleep(100);
Field field = PersistentReplicator.class.getDeclaredField("producer");
field.setAccessible(true);
ProducerImpl producer = (ProducerImpl) field.get(replicator);
assertNull(producer);
}

private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,15 @@ public static BundlesData defaultBundle() {
bundle.setBoundaries(boundaries);
return bundle;
}

@Override
public String toString() {
return Objects.toStringHelper(this).add("auth_policies", auth_policies)
.add("replication_clusters", replication_clusters).add("bundles", bundles)
.add("backlog_quota_map", backlog_quota_map).add("persistence", persistence)
.add("latency_stats_sample_rate", latency_stats_sample_rate)
.add("message_ttl_in_seconds", message_ttl_in_seconds).add("retention_policies", retention_policies)
.add("deleted", deleted).toString();
}

}