Skip to content

Commit

Permalink
connect dispatcher again on reset cursor complete
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Feb 27, 2017
1 parent ae381b4 commit 149ac5f
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,18 @@ public interface Dispatcher {

boolean canUnsubscribe(Consumer consumer);

/**
* disconnect all consumers and mark dispatcher closed to stop new incoming requests
*
* @return
*/
CompletableFuture<Void> disconnect();

/**
* mark dispatcher open to serve new incoming requests
*/
void connect();

SubType getType();

void redeliverUnacknowledgedMessages(Consumer consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso

@Override
public synchronized void addConsumer(Consumer consumer) {
if (closeFuture != null) {
log.warn("[{}] Dispatcher is already closed. Closing consumer ", name, consumer);
consumer.disconnect();
}
if (consumerList.isEmpty()) {
if (havePendingRead || havePendingReplayRead) {
// There is a pending read from previous run. We must wait for it to complete and then rewind
Expand Down Expand Up @@ -223,6 +227,11 @@ public synchronized CompletableFuture<Void> disconnect() {
return closeFuture;
}

@Override
public synchronized void connect() {
closeFuture = null;
}

@Override
public SubType getType() {
return SubType.Shared;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ private void pickAndScheduleActiveConsumer() {

@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
if (closeFuture != null) {
log.warn("[{}] Dispatcher is already closed. Closing consumer ", this.topic.getName(), consumer);
consumer.disconnect();
}
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
throw new ConsumerBusyException("Exclusive consumer is already connected");
}
Expand Down Expand Up @@ -171,6 +175,11 @@ public synchronized CompletableFuture<Void> disconnect() {
return closeFuture;
}

@Override
public synchronized void connect() {
closeFuture = null;
}

@Override
public synchronized void readEntriesComplete(final List<Entry> entries, Object obj) {
Consumer readConsumer = (Consumer) obj;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ public void findEntryComplete(Position position, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable);
}
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
unfenceSubscriptionAndConnectDispatcher();
future.completeExceptionally(new SubscriptionBusyException("Failed to disconnect consumers from subscription"));
return;
}
Expand All @@ -349,15 +349,15 @@ public void resetComplete(Object ctx) {
log.debug("[{}][{}] Successfully reset subscription to timestamp {}", topicName, subName,
timestamp);
}
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
unfenceSubscriptionAndConnectDispatcher();
future.complete(null);
}

@Override
public void resetFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] Failed to reset subscription to timestamp {}", topicName, subName, timestamp,
exception);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
unfenceSubscriptionAndConnectDispatcher();
// todo - retry on InvalidCursorPositionException
// or should we just ask user to retry one more time?
if (exception instanceof InvalidCursorPositionException) {
Expand Down Expand Up @@ -476,8 +476,7 @@ public synchronized CompletableFuture<Void> disconnect() {
log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName);
disconnectFuture.complete(null);
}).exceptionally(exception -> {
IS_FENCED_UPDATER.set(this, FALSE);

unfenceSubscriptionAndConnectDispatcher();
log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName,
exception);
disconnectFuture.completeExceptionally(exception);
Expand Down Expand Up @@ -561,6 +560,11 @@ public double getExpiredMessageRate() {
return expiryMonitor.getMessageExpiryRate();
}

private void unfenceSubscriptionAndConnectDispatcher() {
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
dispatcher.connect();
}

public PersistentSubscriptionStats getStats() {
PersistentSubscriptionStats subStats = new PersistentSubscriptionStats();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,25 @@
package com.yahoo.pulsar.client.impl;

import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand All @@ -38,16 +46,18 @@
import com.yahoo.pulsar.client.api.Consumer;
import com.yahoo.pulsar.client.api.ConsumerConfiguration;
import com.yahoo.pulsar.client.api.Message;
import com.yahoo.pulsar.client.api.MessageListener;
import com.yahoo.pulsar.client.api.Producer;
import com.yahoo.pulsar.client.api.ProducerConfiguration;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.api.SubscriptionType;
import com.yahoo.pulsar.client.impl.HandlerBase.State;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.policies.data.RetentionPolicies;
import com.yahoo.pulsar.common.util.collections.ConcurrentLongHashMap;

public class BrokerClientIntegrationTest extends ProducerConsumerBase {
Expand Down Expand Up @@ -337,5 +347,125 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws
batchProducer.close();
log.info("-- Exiting {} test --", methodName);
}


@Test(timeOut = 10000)
public void testResetCursor() throws Exception {
final RetentionPolicies policy = new RetentionPolicies(60, 52 * 1024);
final DestinationName destName = DestinationName.get("persistent://my-property/use/my-ns/unacked-topic");
final int warmup = 20;
final int testSize = 150;
final List<Message> received = new ArrayList<Message>();
final ConsumerConfiguration consConfig = new ConsumerConfiguration();
final String subsId = "sub";

final NavigableMap<Long, TimestampEntryCount> publishTimeIdMap = new ConcurrentSkipListMap<>();

consConfig.setSubscriptionType(SubscriptionType.Shared);
consConfig.setMessageListener((MessageListener) (Consumer consumer, Message msg) -> {
try {
synchronized (received) {
received.add(msg);
}
consumer.acknowledge(msg);
long publishTime = ((MessageImpl) msg).getPublishTime();
System.out.println(" publish time is " + publishTime + "," + msg.getMessageId());
TimestampEntryCount timestampEntryCount = publishTimeIdMap.computeIfAbsent(publishTime,
(k) -> new TimestampEntryCount(publishTime));
timestampEntryCount.incrementAndGet();
} catch (final PulsarClientException e) {
System.out.println("Failed to ack!");
}
});

admin.namespaces().setRetention(destName.getNamespace(), policy);

Consumer consumer = pulsarClient.subscribe(destName.toString(), subsId, consConfig);
final Producer producer = pulsarClient.createProducer(destName.toString());

log.info("warm up started for " + destName.toString());
// send warmup msgs
byte[] msgBytes = new byte[1000];
for (Integer i = 0; i < warmup; i++) {
producer.send(msgBytes);
}
log.info("warm up finished.");

// sleep to ensure receiving of msgs
for (int n = 0; n < 10 && received.size() < warmup; n++) {
Thread.sleep(100);
}

// validate received msgs
Assert.assertEquals(received.size(), warmup);
received.clear();

// publish testSize num of msgs
System.out.println("Sending more messages.");
for (Integer n = 0; n < testSize; n++) {
producer.send(msgBytes);
Thread.sleep(1);
}
log.info("Sending more messages done.");

Thread.sleep(3000);

long begints = publishTimeIdMap.firstEntry().getKey();
long endts = publishTimeIdMap.lastEntry().getKey();
// find reset timestamp
long timestamp = (endts - begints) / 2 + begints;
timestamp = publishTimeIdMap.floorKey(timestamp);

NavigableMap<Long, TimestampEntryCount> expectedMessages = new ConcurrentSkipListMap<>();
expectedMessages.putAll(publishTimeIdMap.tailMap(timestamp, true));

received.clear();

log.info("reset cursor to " + timestamp + " for topic " + destName.toString() + " for subs " + subsId);
System.out.println("issuing admin operation on " + admin.getServiceUrl().toString());
List<String> subList = admin.persistentTopics().getSubscriptions(destName.toString());
for (String subs : subList) {
log.info("got sub " + subs);
}
consumer.close();
publishTimeIdMap.clear();
// reset the cursor to this timestamp
Assert.assertTrue(subList.contains(subsId));
admin.persistentTopics().resetCursor(destName.toString(), subsId, timestamp);

consumer = pulsarClient.subscribe(destName.toString(), subsId, consConfig);
Thread.sleep(3000);
int totalExpected = 0;
for (TimestampEntryCount tec : expectedMessages.values()) {
totalExpected += tec.numMessages;
}
// validate that replay happens after the timestamp
Assert.assertTrue(publishTimeIdMap.firstEntry().getKey() >= timestamp);
consumer.unsubscribe();
producer.close();
// validate that expected and received counts match
int totalReceived = 0;
for (TimestampEntryCount tec : publishTimeIdMap.values()) {
totalReceived += tec.numMessages;
}
Assert.assertEquals(totalReceived, totalExpected, "did not receive all messages on replay after reset");
}

private static class TimestampEntryCount {
private final long timestamp;
private int numMessages;

public TimestampEntryCount(long ts) {
this.numMessages = 0;
this.timestamp = ts;
}

public int incrementAndGet() {
return ++numMessages;
}

public long getTimestamp() {
return timestamp;
}
}

}

0 comments on commit 149ac5f

Please sign in to comment.