Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connect dispatcher again on reset cursor complete #250

Merged
merged 3 commits into from
Feb 28, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,17 @@ public interface Dispatcher {

boolean canUnsubscribe(Consumer consumer);

CompletableFuture<Void> disconnect();
/**
* disconnect all consumers and mark dispatcher closed to stop new incoming requests
*
* @return
*/
CompletableFuture<Void> disconnectAllConsumers();

/**
* mark dispatcher open to serve new incoming requests
*/
void reset();

SubType getType();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso

@Override
public synchronized void addConsumer(Consumer consumer) {
if (closeFuture != null) {
log.warn("[{}] Dispatcher is already closed. Closing consumer ", name, consumer);
consumer.disconnect();
}
if (consumerList.isEmpty()) {
if (havePendingRead || havePendingReplayRead) {
// There is a pending read from previous run. We must wait for it to complete and then rewind
Expand Down Expand Up @@ -210,7 +214,7 @@ public synchronized boolean canUnsubscribe(Consumer consumer) {
}

@Override
public synchronized CompletableFuture<Void> disconnect() {
public synchronized CompletableFuture<Void> disconnectAllConsumers() {
closeFuture = new CompletableFuture<>();
if (consumerList.isEmpty()) {
closeFuture.complete(null);
Expand All @@ -223,6 +227,11 @@ public synchronized CompletableFuture<Void> disconnect() {
return closeFuture;
}

@Override
public synchronized void reset() {
closeFuture = null;
}

@Override
public SubType getType() {
return SubType.Shared;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ private void pickAndScheduleActiveConsumer() {

@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
if (closeFuture != null) {
log.warn("[{}] Dispatcher is already closed. Closing consumer ", this.topic.getName(), consumer);
consumer.disconnect();
}
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
throw new ConsumerBusyException("Exclusive consumer is already connected");
}
Expand Down Expand Up @@ -156,7 +160,7 @@ public synchronized boolean canUnsubscribe(Consumer consumer) {
* @return
*/
@Override
public synchronized CompletableFuture<Void> disconnect() {
public synchronized CompletableFuture<Void> disconnectAllConsumers() {
closeFuture = new CompletableFuture<>();

if (!consumers.isEmpty()) {
Expand All @@ -171,6 +175,11 @@ public synchronized CompletableFuture<Void> disconnect() {
return closeFuture;
}

@Override
public synchronized void reset() {
closeFuture = null;
}

@Override
public synchronized void readEntriesComplete(final List<Entry> entries, Object obj) {
Consumer readConsumer = (Consumer) obj;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,12 @@ public void findEntryComplete(Position position, Object ctx) {
return;
}

dispatcher.disconnect().whenComplete((aVoid, throwable) -> {
dispatcher.disconnectAllConsumers().whenComplete((aVoid, throwable) -> {
if (throwable != null) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable);
}
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
clearFencingState();
future.completeExceptionally(new SubscriptionBusyException("Failed to disconnect consumers from subscription"));
return;
}
Expand All @@ -349,15 +349,15 @@ public void resetComplete(Object ctx) {
log.debug("[{}][{}] Successfully reset subscription to timestamp {}", topicName, subName,
timestamp);
}
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
clearFencingState();
future.complete(null);
}

@Override
public void resetFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] Failed to reset subscription to timestamp {}", topicName, subName, timestamp,
exception);
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
clearFencingState();
// todo - retry on InvalidCursorPositionException
// or should we just ask user to retry one more time?
if (exception instanceof InvalidCursorPositionException) {
Expand Down Expand Up @@ -471,13 +471,12 @@ public synchronized CompletableFuture<Void> disconnect() {
// block any further consumers on this subscription
IS_FENCED_UPDATER.set(this, TRUE);

(dispatcher != null ? dispatcher.disconnect() : CompletableFuture.completedFuture(null))
(dispatcher != null ? dispatcher.disconnectAllConsumers() : CompletableFuture.completedFuture(null))
.thenCompose(v -> close()).thenRun(() -> {
log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName);
disconnectFuture.complete(null);
}).exceptionally(exception -> {
IS_FENCED_UPDATER.set(this, FALSE);

clearFencingState();
log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName,
exception);
disconnectFuture.completeExceptionally(exception);
Expand Down Expand Up @@ -561,6 +560,11 @@ public double getExpiredMessageRate() {
return expiryMonitor.getMessageExpiryRate();
}

private void clearFencingState() {
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
dispatcher.reset();
}

public PersistentSubscriptionStats getStats() {
PersistentSubscriptionStats subStats = new PersistentSubscriptionStats();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,25 @@
package com.yahoo.pulsar.client.impl;

import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand All @@ -38,16 +46,18 @@
import com.yahoo.pulsar.client.api.Consumer;
import com.yahoo.pulsar.client.api.ConsumerConfiguration;
import com.yahoo.pulsar.client.api.Message;
import com.yahoo.pulsar.client.api.MessageListener;
import com.yahoo.pulsar.client.api.Producer;
import com.yahoo.pulsar.client.api.ProducerConfiguration;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.api.SubscriptionType;
import com.yahoo.pulsar.client.impl.HandlerBase.State;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.policies.data.RetentionPolicies;
import com.yahoo.pulsar.common.util.collections.ConcurrentLongHashMap;

public class BrokerClientIntegrationTest extends ProducerConsumerBase {
Expand Down Expand Up @@ -337,5 +347,125 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws
batchProducer.close();
log.info("-- Exiting {} test --", methodName);
}


@Test(timeOut = 10000)
public void testResetCursor() throws Exception {
final RetentionPolicies policy = new RetentionPolicies(60, 52 * 1024);
final DestinationName destName = DestinationName.get("persistent://my-property/use/my-ns/unacked-topic");
final int warmup = 20;
final int testSize = 150;
final List<Message> received = new ArrayList<Message>();
final ConsumerConfiguration consConfig = new ConsumerConfiguration();
final String subsId = "sub";

final NavigableMap<Long, TimestampEntryCount> publishTimeIdMap = new ConcurrentSkipListMap<>();

consConfig.setSubscriptionType(SubscriptionType.Shared);
consConfig.setMessageListener((MessageListener) (Consumer consumer, Message msg) -> {
try {
synchronized (received) {
received.add(msg);
}
consumer.acknowledge(msg);
long publishTime = ((MessageImpl) msg).getPublishTime();
System.out.println(" publish time is " + publishTime + "," + msg.getMessageId());
TimestampEntryCount timestampEntryCount = publishTimeIdMap.computeIfAbsent(publishTime,
(k) -> new TimestampEntryCount(publishTime));
timestampEntryCount.incrementAndGet();
} catch (final PulsarClientException e) {
System.out.println("Failed to ack!");
}
});

admin.namespaces().setRetention(destName.getNamespace(), policy);

Consumer consumer = pulsarClient.subscribe(destName.toString(), subsId, consConfig);
final Producer producer = pulsarClient.createProducer(destName.toString());

log.info("warm up started for " + destName.toString());
// send warmup msgs
byte[] msgBytes = new byte[1000];
for (Integer i = 0; i < warmup; i++) {
producer.send(msgBytes);
}
log.info("warm up finished.");

// sleep to ensure receiving of msgs
for (int n = 0; n < 10 && received.size() < warmup; n++) {
Thread.sleep(100);
}

// validate received msgs
Assert.assertEquals(received.size(), warmup);
received.clear();

// publish testSize num of msgs
System.out.println("Sending more messages.");
for (Integer n = 0; n < testSize; n++) {
producer.send(msgBytes);
Thread.sleep(1);
}
log.info("Sending more messages done.");

Thread.sleep(3000);

long begints = publishTimeIdMap.firstEntry().getKey();
long endts = publishTimeIdMap.lastEntry().getKey();
// find reset timestamp
long timestamp = (endts - begints) / 2 + begints;
timestamp = publishTimeIdMap.floorKey(timestamp);

NavigableMap<Long, TimestampEntryCount> expectedMessages = new ConcurrentSkipListMap<>();
expectedMessages.putAll(publishTimeIdMap.tailMap(timestamp, true));

received.clear();

log.info("reset cursor to " + timestamp + " for topic " + destName.toString() + " for subs " + subsId);
System.out.println("issuing admin operation on " + admin.getServiceUrl().toString());
List<String> subList = admin.persistentTopics().getSubscriptions(destName.toString());
for (String subs : subList) {
log.info("got sub " + subs);
}
consumer.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consumer close is not needed here, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's correct. fixed it.

publishTimeIdMap.clear();
// reset the cursor to this timestamp
Assert.assertTrue(subList.contains(subsId));
admin.persistentTopics().resetCursor(destName.toString(), subsId, timestamp);

consumer = pulsarClient.subscribe(destName.toString(), subsId, consConfig);
Thread.sleep(3000);
int totalExpected = 0;
for (TimestampEntryCount tec : expectedMessages.values()) {
totalExpected += tec.numMessages;
}
// validate that replay happens after the timestamp
Assert.assertTrue(publishTimeIdMap.firstEntry().getKey() >= timestamp);
consumer.unsubscribe();
producer.close();
// validate that expected and received counts match
int totalReceived = 0;
for (TimestampEntryCount tec : publishTimeIdMap.values()) {
totalReceived += tec.numMessages;
}
Assert.assertEquals(totalReceived, totalExpected, "did not receive all messages on replay after reset");
}

private static class TimestampEntryCount {
private final long timestamp;
private int numMessages;

public TimestampEntryCount(long ts) {
this.numMessages = 0;
this.timestamp = ts;
}

public int incrementAndGet() {
return ++numMessages;
}

public long getTimestamp() {
return timestamp;
}
}

}