Skip to content

Commit

Permalink
Disconnect consumers without closing dispatcher on cursor-reset (#250)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Feb 28, 2017
1 parent e188ed5 commit 0337e61
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,24 @@ public interface Dispatcher {

boolean canUnsubscribe(Consumer consumer);

CompletableFuture<Void> disconnect();
/**
* mark dispatcher closed to stop new incoming requests and disconnect all consumers
*
* @return
*/
CompletableFuture<Void> close();

/**
* disconnect all consumers
*
* @return
*/
CompletableFuture<Void> disconnectAllConsumers();

/**
* mark dispatcher open to serve new incoming requests
*/
void reset();

SubType getType();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,20 @@
*/
package com.yahoo.pulsar.broker.service.persistent;

import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import com.yahoo.pulsar.common.api.proto.PulsarApi;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,12 +37,12 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.broker.service.BrokerServiceException;
import com.yahoo.pulsar.broker.service.Consumer;
import com.yahoo.pulsar.broker.service.Dispatcher;
import com.yahoo.pulsar.broker.service.BrokerServiceException;
import com.yahoo.pulsar.client.impl.Backoff;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.utils.CopyOnWriteArrayList;

/**
Expand All @@ -71,6 +69,11 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
private int totalAvailablePermits = 0;
private int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
private static final int FALSE = 0;
private static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> IS_CLOSED_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(PersistentDispatcherMultipleConsumers.class, "isClosed");
private volatile int isClosed = FALSE;

enum ReadType {
Normal, Replay
Expand All @@ -86,6 +89,10 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso

@Override
public synchronized void addConsumer(Consumer consumer) {
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer ", name, consumer);
consumer.disconnect();
}
if (consumerList.isEmpty()) {
if (havePendingRead || havePendingReplayRead) {
// There is a pending read from previous run. We must wait for it to complete and then rewind
Expand Down Expand Up @@ -212,7 +219,13 @@ public synchronized boolean canUnsubscribe(Consumer consumer) {
}

@Override
public synchronized CompletableFuture<Void> disconnect() {
public CompletableFuture<Void> close() {
IS_CLOSED_UPDATER.set(this, TRUE);
return disconnectAllConsumers();
}

@Override
public synchronized CompletableFuture<Void> disconnectAllConsumers() {
closeFuture = new CompletableFuture<>();
if (consumerList.isEmpty()) {
closeFuture.complete(null);
Expand All @@ -225,6 +238,11 @@ public synchronized CompletableFuture<Void> disconnect() {
return closeFuture;
}

@Override
public void reset() {
IS_CLOSED_UPDATER.set(this, FALSE);
}

@Override
public SubType getType() {
return SubType.Shared;
Expand Down Expand Up @@ -353,7 +371,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
}

private Consumer getNextConsumer() {
if (consumerList.isEmpty() || closeFuture != null) {
if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) {
// abort read if no consumers are connected or if disconnect is initiated
return null;
}
Expand Down Expand Up @@ -384,7 +402,7 @@ private Consumer getNextConsumer() {
* @return
*/
private boolean isAtleastOneConsumerAvailable() {
if (consumerList.isEmpty() || closeFuture != null) {
if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) {
// abort read if no consumers are connected or if disconnect is initiated
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
Expand Down Expand Up @@ -60,6 +61,11 @@ public final class PersistentDispatcherSingleActiveConsumer implements Dispatche
private static final int MaxReadBatchSize = 100;
private int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
private static final int FALSE = 0;
private static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<PersistentDispatcherSingleActiveConsumer> IS_CLOSED_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(PersistentDispatcherSingleActiveConsumer.class, "isClosed");
private volatile int isClosed = FALSE;

public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
PersistentTopic topic) {
Expand Down Expand Up @@ -99,6 +105,10 @@ private void pickAndScheduleActiveConsumer() {

@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer ", this.topic.getName(), consumer);
consumer.disconnect();
}
if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) {
throw new ConsumerBusyException("Exclusive consumer is already connected");
}
Expand Down Expand Up @@ -149,14 +159,20 @@ public synchronized boolean canUnsubscribe(Consumer consumer) {
return (consumers.size() == 1) && Objects.equals(consumer, ACTIVE_CONSUMER_UPDATER.get(this));
}

@Override
public CompletableFuture<Void> close() {
IS_CLOSED_UPDATER.set(this, TRUE);
return disconnectAllConsumers();
}

/**
* Disconnect all consumers on this dispatcher (server side close). This triggers channelInactive on the inbound
* handler which calls dispatcher.removeConsumer(), where the closeFuture is completed
*
* @return
*/
@Override
public synchronized CompletableFuture<Void> disconnect() {
public synchronized CompletableFuture<Void> disconnectAllConsumers() {
closeFuture = new CompletableFuture<>();

if (!consumers.isEmpty()) {
Expand All @@ -171,6 +187,11 @@ public synchronized CompletableFuture<Void> disconnect() {
return closeFuture;
}

@Override
public void reset() {
IS_CLOSED_UPDATER.set(this, FALSE);
}

@Override
public synchronized void readEntriesComplete(final List<Entry> entries, Object obj) {
Consumer readConsumer = (Consumer) obj;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public void findEntryComplete(Position position, Object ctx) {
return;
}

dispatcher.disconnect().whenComplete((aVoid, throwable) -> {
dispatcher.disconnectAllConsumers().whenComplete((aVoid, throwable) -> {
if (throwable != null) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable);
Expand Down Expand Up @@ -471,13 +471,13 @@ public synchronized CompletableFuture<Void> disconnect() {
// block any further consumers on this subscription
IS_FENCED_UPDATER.set(this, TRUE);

(dispatcher != null ? dispatcher.disconnect() : CompletableFuture.completedFuture(null))
(dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null))
.thenCompose(v -> close()).thenRun(() -> {
log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName);
disconnectFuture.complete(null);
}).exceptionally(exception -> {
IS_FENCED_UPDATER.set(this, FALSE);

dispatcher.reset();
log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName,
exception);
disconnectFuture.completeExceptionally(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,25 @@
package com.yahoo.pulsar.client.impl;

import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand All @@ -38,16 +46,18 @@
import com.yahoo.pulsar.client.api.Consumer;
import com.yahoo.pulsar.client.api.ConsumerConfiguration;
import com.yahoo.pulsar.client.api.Message;
import com.yahoo.pulsar.client.api.MessageListener;
import com.yahoo.pulsar.client.api.Producer;
import com.yahoo.pulsar.client.api.ProducerConfiguration;
import com.yahoo.pulsar.client.api.ProducerConsumerBase;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.api.SubscriptionType;
import com.yahoo.pulsar.client.impl.HandlerBase.State;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.naming.NamespaceBundle;
import com.yahoo.pulsar.common.policies.data.RetentionPolicies;
import com.yahoo.pulsar.common.util.collections.ConcurrentLongHashMap;

public class BrokerClientIntegrationTest extends ProducerConsumerBase {
Expand Down Expand Up @@ -338,4 +348,122 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws
log.info("-- Exiting {} test --", methodName);
}

@Test(timeOut = 10000, dataProvider = "subType")
public void testResetCursor(SubscriptionType subType) throws Exception {
final RetentionPolicies policy = new RetentionPolicies(60, 52 * 1024);
final DestinationName destName = DestinationName.get("persistent://my-property/use/my-ns/unacked-topic");
final int warmup = 20;
final int testSize = 150;
final List<Message> received = new ArrayList<Message>();
final ConsumerConfiguration consConfig = new ConsumerConfiguration();
final String subsId = "sub";

final NavigableMap<Long, TimestampEntryCount> publishTimeIdMap = new ConcurrentSkipListMap<>();

consConfig.setSubscriptionType(subType);
consConfig.setMessageListener((MessageListener) (Consumer consumer, Message msg) -> {
try {
synchronized (received) {
received.add(msg);
}
consumer.acknowledge(msg);
long publishTime = ((MessageImpl) msg).getPublishTime();
System.out.println(" publish time is " + publishTime + "," + msg.getMessageId());
TimestampEntryCount timestampEntryCount = publishTimeIdMap.computeIfAbsent(publishTime,
(k) -> new TimestampEntryCount(publishTime));
timestampEntryCount.incrementAndGet();
} catch (final PulsarClientException e) {
System.out.println("Failed to ack!");
}
});

admin.namespaces().setRetention(destName.getNamespace(), policy);

Consumer consumer = pulsarClient.subscribe(destName.toString(), subsId, consConfig);
final Producer producer = pulsarClient.createProducer(destName.toString());

log.info("warm up started for " + destName.toString());
// send warmup msgs
byte[] msgBytes = new byte[1000];
for (Integer i = 0; i < warmup; i++) {
producer.send(msgBytes);
}
log.info("warm up finished.");

// sleep to ensure receiving of msgs
for (int n = 0; n < 10 && received.size() < warmup; n++) {
Thread.sleep(100);
}

// validate received msgs
Assert.assertEquals(received.size(), warmup);
received.clear();

// publish testSize num of msgs
System.out.println("Sending more messages.");
for (Integer n = 0; n < testSize; n++) {
producer.send(msgBytes);
Thread.sleep(1);
}
log.info("Sending more messages done.");

Thread.sleep(3000);

long begints = publishTimeIdMap.firstEntry().getKey();
long endts = publishTimeIdMap.lastEntry().getKey();
// find reset timestamp
long timestamp = (endts - begints) / 2 + begints;
timestamp = publishTimeIdMap.floorKey(timestamp);

NavigableMap<Long, TimestampEntryCount> expectedMessages = new ConcurrentSkipListMap<>();
expectedMessages.putAll(publishTimeIdMap.tailMap(timestamp, true));

received.clear();

log.info("reset cursor to " + timestamp + " for topic " + destName.toString() + " for subs " + subsId);
System.out.println("issuing admin operation on " + admin.getServiceUrl().toString());
List<String> subList = admin.persistentTopics().getSubscriptions(destName.toString());
for (String subs : subList) {
log.info("got sub " + subs);
}
publishTimeIdMap.clear();
// reset the cursor to this timestamp
Assert.assertTrue(subList.contains(subsId));
admin.persistentTopics().resetCursor(destName.toString(), subsId, timestamp);

consumer = pulsarClient.subscribe(destName.toString(), subsId, consConfig);
Thread.sleep(3000);
int totalExpected = 0;
for (TimestampEntryCount tec : expectedMessages.values()) {
totalExpected += tec.numMessages;
}
// validate that replay happens after the timestamp
Assert.assertTrue(publishTimeIdMap.firstEntry().getKey() >= timestamp);
consumer.close();
producer.close();
// validate that expected and received counts match
int totalReceived = 0;
for (TimestampEntryCount tec : publishTimeIdMap.values()) {
totalReceived += tec.numMessages;
}
Assert.assertEquals(totalReceived, totalExpected, "did not receive all messages on replay after reset");
}

private static class TimestampEntryCount {
private final long timestamp;
private int numMessages;

public TimestampEntryCount(long ts) {
this.numMessages = 0;
this.timestamp = ts;
}

public int incrementAndGet() {
return ++numMessages;
}

public long getTimestamp() {
return timestamp;
}
}
}

0 comments on commit 0337e61

Please sign in to comment.