Skip to content

Commit

Permalink
Only disconnect consumers on cursor-reset
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Feb 28, 2017
1 parent 943f74f commit a32b7ae
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,14 @@ public interface Dispatcher {
boolean canUnsubscribe(Consumer consumer);

/**
* disconnect all consumers and mark dispatcher closed to stop new incoming requests
* mark dispatcher closed to stop new incoming requests and disconnect all consumers
*
* @return
*/
CompletableFuture<Void> close();

/**
* disconnect all consumers
*
* @return
*/
Expand All @@ -57,4 +64,5 @@ public interface Dispatcher {
void redeliverUnacknowledgedMessages(Consumer consumer);

void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
Expand All @@ -36,12 +37,12 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.broker.service.BrokerServiceException;
import com.yahoo.pulsar.broker.service.Consumer;
import com.yahoo.pulsar.broker.service.Dispatcher;
import com.yahoo.pulsar.broker.service.BrokerServiceException;
import com.yahoo.pulsar.client.impl.Backoff;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.util.Codec;
import com.yahoo.pulsar.utils.CopyOnWriteArrayList;

/**
Expand All @@ -68,6 +69,11 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn
private int totalAvailablePermits = 0;
private int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
private static final int FALSE = 0;
private static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> IS_CLOSED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "isClosed");
private volatile int isClosed = FALSE;

enum ReadType {
Normal, Replay
Expand All @@ -83,7 +89,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso

@Override
public synchronized void addConsumer(Consumer consumer) {
if (closeFuture != null) {
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer ", name, consumer);
consumer.disconnect();
}
Expand Down Expand Up @@ -213,6 +219,12 @@ public synchronized boolean canUnsubscribe(Consumer consumer) {
return consumerList.size() == 1 && consumerSet.contains(consumer);
}

@Override
public CompletableFuture<Void> close() {
IS_CLOSED_UPDATER.set(this, TRUE);
return disconnectAllConsumers();
}

@Override
public synchronized CompletableFuture<Void> disconnectAllConsumers() {
closeFuture = new CompletableFuture<>();
Expand All @@ -228,8 +240,8 @@ public synchronized CompletableFuture<Void> disconnectAllConsumers() {
}

@Override
public synchronized void reset() {
closeFuture = null;
public void reset() {
IS_CLOSED_UPDATER.set(this, FALSE);
}

@Override
Expand Down Expand Up @@ -409,7 +421,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
* @return nextAvailableConsumer
*/
public Consumer getNextConsumer() {
if (consumerList.isEmpty() || closeFuture != null) {
if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) {
// abort read if no consumers are connected or if disconnect is initiated
return null;
}
Expand Down Expand Up @@ -475,7 +487,7 @@ public Consumer getNextConsumer() {
* @return
*/
private boolean isAtleastOneConsumerAvailable() {
if (consumerList.isEmpty() || closeFuture != null) {
if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) {
// abort read if no consumers are connected or if disconnect is initiated
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
Expand Down Expand Up @@ -60,6 +61,11 @@ public final class PersistentDispatcherSingleActiveConsumer implements Dispatche
private static final int MaxReadBatchSize = 100;
private int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
private static final int FALSE = 0;
private static final int TRUE = 1;
private static final AtomicIntegerFieldUpdater<PersistentDispatcherSingleActiveConsumer> IS_CLOSED_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherSingleActiveConsumer.class, "isClosed");
private volatile int isClosed = FALSE;

public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
PersistentTopic topic) {
Expand Down Expand Up @@ -99,7 +105,7 @@ private void pickAndScheduleActiveConsumer() {

@Override
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
if (closeFuture != null) {
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer ", this.topic.getName(), consumer);
consumer.disconnect();
}
Expand Down Expand Up @@ -153,6 +159,12 @@ public synchronized boolean canUnsubscribe(Consumer consumer) {
return (consumers.size() == 1) && Objects.equals(consumer, ACTIVE_CONSUMER_UPDATER.get(this));
}

@Override
public CompletableFuture<Void> close() {
IS_CLOSED_UPDATER.set(this, TRUE);
return disconnectAllConsumers();
}

/**
* Disconnect all consumers on this dispatcher (server side close). This triggers channelInactive on the inbound
* handler which calls dispatcher.removeConsumer(), where the closeFuture is completed
Expand All @@ -176,8 +188,8 @@ public synchronized CompletableFuture<Void> disconnectAllConsumers() {
}

@Override
public synchronized void reset() {
closeFuture = null;
public void reset() {
IS_CLOSED_UPDATER.set(this, FALSE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ public void findEntryComplete(Position position, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable);
}
clearFencingState();
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
future.completeExceptionally(new SubscriptionBusyException("Failed to disconnect consumers from subscription"));
return;
}
Expand All @@ -349,15 +349,15 @@ public void resetComplete(Object ctx) {
log.debug("[{}][{}] Successfully reset subscription to timestamp {}", topicName, subName,
timestamp);
}
clearFencingState();
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
future.complete(null);
}

@Override
public void resetFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] Failed to reset subscription to timestamp {}", topicName, subName, timestamp,
exception);
clearFencingState();
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
// todo - retry on InvalidCursorPositionException
// or should we just ask user to retry one more time?
if (exception instanceof InvalidCursorPositionException) {
Expand Down Expand Up @@ -471,12 +471,13 @@ public synchronized CompletableFuture<Void> disconnect() {
// block any further consumers on this subscription
IS_FENCED_UPDATER.set(this, TRUE);

(dispatcher != null ? dispatcher.disconnectAllConsumers() : CompletableFuture.completedFuture(null))
(dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null))
.thenCompose(v -> close()).thenRun(() -> {
log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName);
disconnectFuture.complete(null);
}).exceptionally(exception -> {
clearFencingState();
IS_FENCED_UPDATER.set(this, FALSE);
dispatcher.reset();
log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName,
exception);
disconnectFuture.completeExceptionally(exception);
Expand Down Expand Up @@ -560,11 +561,6 @@ public double getExpiredMessageRate() {
return expiryMonitor.getMessageExpiryRate();
}

private void clearFencingState() {
IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
dispatcher.reset();
}

public PersistentSubscriptionStats getStats() {
PersistentSubscriptionStats subStats = new PersistentSubscriptionStats();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,8 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws
log.info("-- Exiting {} test --", methodName);
}

@Test(timeOut = 10000)
public void testResetCursor() throws Exception {
@Test(timeOut = 10000, dataProvider = "subType")
public void testResetCursor(SubscriptionType subType) throws Exception {
final RetentionPolicies policy = new RetentionPolicies(60, 52 * 1024);
final DestinationName destName = DestinationName.get("persistent://my-property/use/my-ns/unacked-topic");
final int warmup = 20;
Expand All @@ -360,7 +360,7 @@ public void testResetCursor() throws Exception {

final NavigableMap<Long, TimestampEntryCount> publishTimeIdMap = new ConcurrentSkipListMap<>();

consConfig.setSubscriptionType(SubscriptionType.Shared);
consConfig.setSubscriptionType(subType);
consConfig.setMessageListener((MessageListener) (Consumer consumer, Message msg) -> {
try {
synchronized (received) {
Expand Down Expand Up @@ -426,7 +426,6 @@ public void testResetCursor() throws Exception {
for (String subs : subList) {
log.info("got sub " + subs);
}
consumer.close();
publishTimeIdMap.clear();
// reset the cursor to this timestamp
Assert.assertTrue(subList.contains(subsId));
Expand All @@ -440,7 +439,7 @@ public void testResetCursor() throws Exception {
}
// validate that replay happens after the timestamp
Assert.assertTrue(publishTimeIdMap.firstEntry().getKey() >= timestamp);
consumer.unsubscribe();
consumer.close();
producer.close();
// validate that expected and received counts match
int totalReceived = 0;
Expand Down

0 comments on commit a32b7ae

Please sign in to comment.