Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connect dispatcher again on reset cursor complete #250

Merged
merged 3 commits into from
Feb 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,29 @@ public interface Dispatcher {

boolean canUnsubscribe(Consumer consumer);

CompletableFuture<Void> disconnect();
/**
* mark dispatcher closed to stop new incoming requests and disconnect all consumers
*
* @return
*/
CompletableFuture<Void> close();

/**
* disconnect all consumers
*
* @return
*/
CompletableFuture<Void> disconnectAllConsumers();

/**
* mark dispatcher open to serve new incoming requests
*/
void reset();

SubType getType();

void redeliverUnacknowledgedMessages(Consumer consumer);

void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
Expand All @@ -36,12 +37,12 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.broker.service.BrokerServiceException;
import com.yahoo.pulsar.broker.service.Consumer;
import com.yahoo.pulsar.broker.service.Dispatcher;
import com.yahoo.pulsar.broker.service.BrokerServiceException;
import com.yahoo.pulsar.client.impl.Backoff;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.utils.CopyOnWriteArrayList;

/**
Expand All @@ -68,6 +69,11 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
private int totalAvailablePermits = 0;
private int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
private static final int FALSE = 0;
private static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> IS_CLOSED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "isClosed");
private volatile int isClosed = FALSE;

enum ReadType {
Normal, Replay
Expand All @@ -83,6 +89,10 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso

@Override
public synchronized void addConsumer(Consumer consumer) {
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer ", name, consumer);
consumer.disconnect();
}
if (consumerList.isEmpty()) {
if (havePendingRead || havePendingReplayRead) {
// There is a pending read from previous run. We must wait for it to complete and then rewind
Expand Down Expand Up @@ -210,7 +220,13 @@ public synchronized boolean canUnsubscribe(Consumer consumer) {
}

@Override
public synchronized CompletableFuture<Void> disconnect() {
public CompletableFuture<Void> close() {
IS_CLOSED_UPDATER.set(this, TRUE);
return disconnectAllConsumers();
}

@Override
public synchronized CompletableFuture<Void> disconnectAllConsumers() {
closeFuture = new CompletableFuture<>();
if (consumerList.isEmpty()) {
closeFuture.complete(null);
Expand All @@ -223,6 +239,11 @@ public synchronized CompletableFuture<Void> disconnect() {
return closeFuture;
}

@Override
public void reset() {
IS_CLOSED_UPDATER.set(this, FALSE);
}

@Override
public SubType getType() {
return SubType.Shared;
Expand Down Expand Up @@ -400,7 +421,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
* @return nextAvailableConsumer
*/
public Consumer getNextConsumer() {
if (consumerList.isEmpty() || closeFuture != null) {
if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) {
// abort read if no consumers are connected or if disconnect is initiated
return null;
}
Expand Down Expand Up @@ -466,7 +487,7 @@ public Consumer getNextConsumer() {
* @return
*/
private boolean isAtleastOneConsumerAvailable() {
if (consumerList.isEmpty() || closeFuture != null) {
if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) {
// abort read if no consumers are connected or if disconnect is initiated
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
Expand Down Expand Up @@ -60,6 +61,11 @@ public final class PersistentDispatcherSingleActiveConsumer implements Dispatche
private static final int MaxReadBatchSize = 100;
private int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
private static final int FALSE = 0;
private static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<PersistentDispatcherSingleActiveConsumer> IS_CLOSED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherSingleActiveConsumer.class, "isClosed");
private volatile int isClosed = FALSE;

public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
PersistentTopic topic) {
Expand Down Expand Up @@ -99,6 +105,10 @@ private void pickAndScheduleActiveConsumer() {

@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer ", this.topic.getName(), consumer);
consumer.disconnect();
}
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
throw new ConsumerBusyException("Exclusive consumer is already connected");
}
Expand Down Expand Up @@ -149,14 +159,20 @@ public synchronized boolean canUnsubscribe(Consumer consumer) {
return (consumers.size() == 1) && Objects.equals(consumer, ACTIVE_CONSUMER_UPDATER.get(this));
}

@Override
public CompletableFuture<Void> close() {
IS_CLOSED_UPDATER.set(this, TRUE);
return disconnectAllConsumers();
}

/**
* Disconnect all consumers on this dispatcher (server side close). This triggers channelInactive on the inbound
* handler which calls dispatcher.removeConsumer(), where the closeFuture is completed
*
* @return
*/
@Override
public synchronized CompletableFuture<Void> disconnect() {
public synchronized CompletableFuture<Void> disconnectAllConsumers() {
closeFuture = new CompletableFuture<>();

if (!consumers.isEmpty()) {
Expand All @@ -171,6 +187,11 @@ public synchronized CompletableFuture<Void> disconnect() {
return closeFuture;
}

@Override
public void reset() {
IS_CLOSED_UPDATER.set(this, FALSE);
}

@Override
public synchronized void readEntriesComplete(final List<Entry> entries, Object obj) {
Consumer readConsumer = (Consumer) obj;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public void findEntryComplete(Position position, Object ctx) {
return;
}

dispatcher.disconnect().whenComplete((aVoid, throwable) -> {
dispatcher.disconnectAllConsumers().whenComplete((aVoid, throwable) -> {
if (throwable != null) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable);
Expand Down Expand Up @@ -471,13 +471,13 @@ public synchronized CompletableFuture<Void> disconnect() {
// block any further consumers on this subscription
IS_FENCED_UPDATER.set(this, TRUE);

(dispatcher != null ? dispatcher.disconnect() : CompletableFuture.completedFuture(null))
(dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null))
.thenCompose(v -> close()).thenRun(() -> {
log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName);
disconnectFuture.complete(null);
}).exceptionally(exception -> {
IS_FENCED_UPDATER.set(this, FALSE);

dispatcher.reset();
log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName,
exception);
disconnectFuture.completeExceptionally(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,25 @@
package com.yahoo.pulsar.client.impl;

import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand All @@ -38,16 +46,18 @@
import com.yahoo.pulsar.client.api.Consumer;
import com.yahoo.pulsar.client.api.ConsumerConfiguration;
import com.yahoo.pulsar.client.api.Message;
import com.yahoo.pulsar.client.api.MessageListener;
import com.yahoo.pulsar.client.api.Producer;
import com.yahoo.pulsar.client.api.ProducerConfiguration;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.api.SubscriptionType;
import com.yahoo.pulsar.client.impl.HandlerBase.State;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.policies.data.RetentionPolicies;
import com.yahoo.pulsar.common.util.collections.ConcurrentLongHashMap;

public class BrokerClientIntegrationTest extends ProducerConsumerBase {
Expand Down Expand Up @@ -337,5 +347,124 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws
batchProducer.close();
log.info("-- Exiting {} test --", methodName);
}


@Test(timeOut = 10000, dataProvider = "subType")
public void testResetCursor(SubscriptionType subType) throws Exception {
final RetentionPolicies policy = new RetentionPolicies(60, 52 * 1024);
final DestinationName destName = DestinationName.get("persistent://my-property/use/my-ns/unacked-topic");
final int warmup = 20;
final int testSize = 150;
final List<Message> received = new ArrayList<Message>();
final ConsumerConfiguration consConfig = new ConsumerConfiguration();
final String subsId = "sub";

final NavigableMap<Long, TimestampEntryCount> publishTimeIdMap = new ConcurrentSkipListMap<>();

consConfig.setSubscriptionType(subType);
consConfig.setMessageListener((MessageListener) (Consumer consumer, Message msg) -> {
try {
synchronized (received) {
received.add(msg);
}
consumer.acknowledge(msg);
long publishTime = ((MessageImpl) msg).getPublishTime();
System.out.println(" publish time is " + publishTime + "," + msg.getMessageId());
TimestampEntryCount timestampEntryCount = publishTimeIdMap.computeIfAbsent(publishTime,
(k) -> new TimestampEntryCount(publishTime));
timestampEntryCount.incrementAndGet();
} catch (final PulsarClientException e) {
System.out.println("Failed to ack!");
}
});

admin.namespaces().setRetention(destName.getNamespace(), policy);

Consumer consumer = pulsarClient.subscribe(destName.toString(), subsId, consConfig);
final Producer producer = pulsarClient.createProducer(destName.toString());

log.info("warm up started for " + destName.toString());
// send warmup msgs
byte[] msgBytes = new byte[1000];
for (Integer i = 0; i < warmup; i++) {
producer.send(msgBytes);
}
log.info("warm up finished.");

// sleep to ensure receiving of msgs
for (int n = 0; n < 10 && received.size() < warmup; n++) {
Thread.sleep(100);
}

// validate received msgs
Assert.assertEquals(received.size(), warmup);
received.clear();

// publish testSize num of msgs
System.out.println("Sending more messages.");
for (Integer n = 0; n < testSize; n++) {
producer.send(msgBytes);
Thread.sleep(1);
}
log.info("Sending more messages done.");

Thread.sleep(3000);

long begints = publishTimeIdMap.firstEntry().getKey();
long endts = publishTimeIdMap.lastEntry().getKey();
// find reset timestamp
long timestamp = (endts - begints) / 2 + begints;
timestamp = publishTimeIdMap.floorKey(timestamp);

NavigableMap<Long, TimestampEntryCount> expectedMessages = new ConcurrentSkipListMap<>();
expectedMessages.putAll(publishTimeIdMap.tailMap(timestamp, true));

received.clear();

log.info("reset cursor to " + timestamp + " for topic " + destName.toString() + " for subs " + subsId);
System.out.println("issuing admin operation on " + admin.getServiceUrl().toString());
List<String> subList = admin.persistentTopics().getSubscriptions(destName.toString());
for (String subs : subList) {
log.info("got sub " + subs);
}
publishTimeIdMap.clear();
// reset the cursor to this timestamp
Assert.assertTrue(subList.contains(subsId));
admin.persistentTopics().resetCursor(destName.toString(), subsId, timestamp);

consumer = pulsarClient.subscribe(destName.toString(), subsId, consConfig);
Thread.sleep(3000);
int totalExpected = 0;
for (TimestampEntryCount tec : expectedMessages.values()) {
totalExpected += tec.numMessages;
}
// validate that replay happens after the timestamp
Assert.assertTrue(publishTimeIdMap.firstEntry().getKey() >= timestamp);
consumer.close();
producer.close();
// validate that expected and received counts match
int totalReceived = 0;
for (TimestampEntryCount tec : publishTimeIdMap.values()) {
totalReceived += tec.numMessages;
}
Assert.assertEquals(totalReceived, totalExpected, "did not receive all messages on replay after reset");
}

private static class TimestampEntryCount {
private final long timestamp;
private int numMessages;

public TimestampEntryCount(long ts) {
this.numMessages = 0;
this.timestamp = ts;
}

public int incrementAndGet() {
return ++numMessages;
}

public long getTimestamp() {
return timestamp;
}
}

}