diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Dispatcher.java index 4d6f7c0abca0a..dc5cb60ae0355 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Dispatcher.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Dispatcher.java @@ -41,7 +41,24 @@ public interface Dispatcher { boolean canUnsubscribe(Consumer consumer); - CompletableFuture disconnect(); + /** + * mark dispatcher closed to stop new incoming requests and disconnect all consumers + * + * @return + */ + CompletableFuture close(); + + /** + * disconnect all consumers + * + * @return + */ + CompletableFuture disconnectAllConsumers(); + + /** + * mark dispatcher open to serve new incoming requests + */ + void reset(); SubType getType(); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 832b1a5e17b5f..0fae0000c998c 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -15,14 +15,13 @@ */ package com.yahoo.pulsar.broker.service.persistent; -import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import com.yahoo.pulsar.common.api.proto.PulsarApi; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -30,7 +29,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; -import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,12 +37,12 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; -import com.yahoo.pulsar.common.util.Codec; +import com.yahoo.pulsar.broker.service.BrokerServiceException; import com.yahoo.pulsar.broker.service.Consumer; import com.yahoo.pulsar.broker.service.Dispatcher; -import com.yahoo.pulsar.broker.service.BrokerServiceException; import com.yahoo.pulsar.client.impl.Backoff; +import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; +import com.yahoo.pulsar.common.util.Codec; import com.yahoo.pulsar.utils.CopyOnWriteArrayList; /** @@ -71,6 +69,11 @@ public class PersistentDispatcherMultipleConsumers implements Dispatcher, ReadEn private int totalAvailablePermits = 0; private int readBatchSize; private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES); + private static final int FALSE = 0; + private static final int TRUE = 1; + private static final AtomicIntegerFieldUpdater IS_CLOSED_UPDATER = AtomicIntegerFieldUpdater + .newUpdater(PersistentDispatcherMultipleConsumers.class, "isClosed"); + private volatile int isClosed = FALSE; enum ReadType { Normal, Replay @@ -86,6 +89,10 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso @Override public synchronized void addConsumer(Consumer consumer) { + if (IS_CLOSED_UPDATER.get(this) == TRUE) { + log.warn("[{}] Dispatcher is already closed. Closing consumer ", name, consumer); + consumer.disconnect(); + } if (consumerList.isEmpty()) { if (havePendingRead || havePendingReplayRead) { // There is a pending read from previous run. We must wait for it to complete and then rewind @@ -212,7 +219,13 @@ public synchronized boolean canUnsubscribe(Consumer consumer) { } @Override - public synchronized CompletableFuture disconnect() { + public CompletableFuture close() { + IS_CLOSED_UPDATER.set(this, TRUE); + return disconnectAllConsumers(); + } + + @Override + public synchronized CompletableFuture disconnectAllConsumers() { closeFuture = new CompletableFuture<>(); if (consumerList.isEmpty()) { closeFuture.complete(null); @@ -225,6 +238,11 @@ public synchronized CompletableFuture disconnect() { return closeFuture; } + @Override + public void reset() { + IS_CLOSED_UPDATER.set(this, FALSE); + } + @Override public SubType getType() { return SubType.Shared; @@ -353,7 +371,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj } private Consumer getNextConsumer() { - if (consumerList.isEmpty() || closeFuture != null) { + if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) { // abort read if no consumers are connected or if disconnect is initiated return null; } @@ -384,7 +402,7 @@ private Consumer getNextConsumer() { * @return */ private boolean isAtleastOneConsumerAvailable() { - if (consumerList.isEmpty() || closeFuture != null) { + if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) { // abort read if no consumers are connected or if disconnect is initiated return false; } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 7b5bbb0470d09..d4b1fdf7be0ae 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -22,6 +22,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; @@ -60,6 +61,11 @@ public final class PersistentDispatcherSingleActiveConsumer implements Dispatche private static final int MaxReadBatchSize = 100; private int readBatchSize; private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES); + private static final int FALSE = 0; + private static final int TRUE = 1; + private static final AtomicIntegerFieldUpdater IS_CLOSED_UPDATER = AtomicIntegerFieldUpdater + .newUpdater(PersistentDispatcherSingleActiveConsumer.class, "isClosed"); + private volatile int isClosed = FALSE; public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex, PersistentTopic topic) { @@ -99,6 +105,10 @@ private void pickAndScheduleActiveConsumer() { @Override public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { + if (IS_CLOSED_UPDATER.get(this) == TRUE) { + log.warn("[{}] Dispatcher is already closed. Closing consumer ", this.topic.getName(), consumer); + consumer.disconnect(); + } if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) { throw new ConsumerBusyException("Exclusive consumer is already connected"); } @@ -149,6 +159,12 @@ public synchronized boolean canUnsubscribe(Consumer consumer) { return (consumers.size() == 1) && Objects.equals(consumer, ACTIVE_CONSUMER_UPDATER.get(this)); } + @Override + public CompletableFuture close() { + IS_CLOSED_UPDATER.set(this, TRUE); + return disconnectAllConsumers(); + } + /** * Disconnect all consumers on this dispatcher (server side close). This triggers channelInactive on the inbound * handler which calls dispatcher.removeConsumer(), where the closeFuture is completed @@ -156,7 +172,7 @@ public synchronized boolean canUnsubscribe(Consumer consumer) { * @return */ @Override - public synchronized CompletableFuture disconnect() { + public synchronized CompletableFuture disconnectAllConsumers() { closeFuture = new CompletableFuture<>(); if (!consumers.isEmpty()) { @@ -171,6 +187,11 @@ public synchronized CompletableFuture disconnect() { return closeFuture; } + @Override + public void reset() { + IS_CLOSED_UPDATER.set(this, FALSE); + } + @Override public synchronized void readEntriesComplete(final List entries, Object obj) { Consumer readConsumer = (Consumer) obj; diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java index 37a16ed6b640d..68a57115e54ed 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java @@ -332,7 +332,7 @@ public void findEntryComplete(Position position, Object ctx) { return; } - dispatcher.disconnect().whenComplete((aVoid, throwable) -> { + dispatcher.disconnectAllConsumers().whenComplete((aVoid, throwable) -> { if (throwable != null) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable); @@ -471,13 +471,13 @@ public synchronized CompletableFuture disconnect() { // block any further consumers on this subscription IS_FENCED_UPDATER.set(this, TRUE); - (dispatcher != null ? dispatcher.disconnect() : CompletableFuture.completedFuture(null)) + (dispatcher != null ? dispatcher.close() : CompletableFuture.completedFuture(null)) .thenCompose(v -> close()).thenRun(() -> { log.info("[{}][{}] Successfully disconnected and closed subscription", topicName, subName); disconnectFuture.complete(null); }).exceptionally(exception -> { IS_FENCED_UPDATER.set(this, FALSE); - + dispatcher.reset(); log.error("[{}][{}] Error disconnecting consumers from subscription", topicName, subName, exception); disconnectFuture.completeExceptionally(exception); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java index aedc085358382..e461dc7db00fd 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -16,17 +16,25 @@ package com.yahoo.pulsar.client.impl; import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; -import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableMap; import java.util.Set; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -38,16 +46,18 @@ import com.yahoo.pulsar.client.api.Consumer; import com.yahoo.pulsar.client.api.ConsumerConfiguration; import com.yahoo.pulsar.client.api.Message; +import com.yahoo.pulsar.client.api.MessageListener; import com.yahoo.pulsar.client.api.Producer; import com.yahoo.pulsar.client.api.ProducerConfiguration; import com.yahoo.pulsar.client.api.ProducerConsumerBase; import com.yahoo.pulsar.client.api.PulsarClient; +import com.yahoo.pulsar.client.api.PulsarClientException; import com.yahoo.pulsar.client.api.SubscriptionType; import com.yahoo.pulsar.client.impl.HandlerBase.State; import com.yahoo.pulsar.common.api.PulsarHandler; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.naming.NamespaceBundle; -import com.yahoo.pulsar.common.naming.NamespaceBundle; +import com.yahoo.pulsar.common.policies.data.RetentionPolicies; import com.yahoo.pulsar.common.util.collections.ConcurrentLongHashMap; public class BrokerClientIntegrationTest extends ProducerConsumerBase { @@ -338,4 +348,122 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws log.info("-- Exiting {} test --", methodName); } + @Test(timeOut = 10000, dataProvider = "subType") + public void testResetCursor(SubscriptionType subType) throws Exception { + final RetentionPolicies policy = new RetentionPolicies(60, 52 * 1024); + final DestinationName destName = DestinationName.get("persistent://my-property/use/my-ns/unacked-topic"); + final int warmup = 20; + final int testSize = 150; + final List received = new ArrayList(); + final ConsumerConfiguration consConfig = new ConsumerConfiguration(); + final String subsId = "sub"; + + final NavigableMap publishTimeIdMap = new ConcurrentSkipListMap<>(); + + consConfig.setSubscriptionType(subType); + consConfig.setMessageListener((MessageListener) (Consumer consumer, Message msg) -> { + try { + synchronized (received) { + received.add(msg); + } + consumer.acknowledge(msg); + long publishTime = ((MessageImpl) msg).getPublishTime(); + System.out.println(" publish time is " + publishTime + "," + msg.getMessageId()); + TimestampEntryCount timestampEntryCount = publishTimeIdMap.computeIfAbsent(publishTime, + (k) -> new TimestampEntryCount(publishTime)); + timestampEntryCount.incrementAndGet(); + } catch (final PulsarClientException e) { + System.out.println("Failed to ack!"); + } + }); + + admin.namespaces().setRetention(destName.getNamespace(), policy); + + Consumer consumer = pulsarClient.subscribe(destName.toString(), subsId, consConfig); + final Producer producer = pulsarClient.createProducer(destName.toString()); + + log.info("warm up started for " + destName.toString()); + // send warmup msgs + byte[] msgBytes = new byte[1000]; + for (Integer i = 0; i < warmup; i++) { + producer.send(msgBytes); + } + log.info("warm up finished."); + + // sleep to ensure receiving of msgs + for (int n = 0; n < 10 && received.size() < warmup; n++) { + Thread.sleep(100); + } + + // validate received msgs + Assert.assertEquals(received.size(), warmup); + received.clear(); + + // publish testSize num of msgs + System.out.println("Sending more messages."); + for (Integer n = 0; n < testSize; n++) { + producer.send(msgBytes); + Thread.sleep(1); + } + log.info("Sending more messages done."); + + Thread.sleep(3000); + + long begints = publishTimeIdMap.firstEntry().getKey(); + long endts = publishTimeIdMap.lastEntry().getKey(); + // find reset timestamp + long timestamp = (endts - begints) / 2 + begints; + timestamp = publishTimeIdMap.floorKey(timestamp); + + NavigableMap expectedMessages = new ConcurrentSkipListMap<>(); + expectedMessages.putAll(publishTimeIdMap.tailMap(timestamp, true)); + + received.clear(); + + log.info("reset cursor to " + timestamp + " for topic " + destName.toString() + " for subs " + subsId); + System.out.println("issuing admin operation on " + admin.getServiceUrl().toString()); + List subList = admin.persistentTopics().getSubscriptions(destName.toString()); + for (String subs : subList) { + log.info("got sub " + subs); + } + publishTimeIdMap.clear(); + // reset the cursor to this timestamp + Assert.assertTrue(subList.contains(subsId)); + admin.persistentTopics().resetCursor(destName.toString(), subsId, timestamp); + + consumer = pulsarClient.subscribe(destName.toString(), subsId, consConfig); + Thread.sleep(3000); + int totalExpected = 0; + for (TimestampEntryCount tec : expectedMessages.values()) { + totalExpected += tec.numMessages; + } + // validate that replay happens after the timestamp + Assert.assertTrue(publishTimeIdMap.firstEntry().getKey() >= timestamp); + consumer.close(); + producer.close(); + // validate that expected and received counts match + int totalReceived = 0; + for (TimestampEntryCount tec : publishTimeIdMap.values()) { + totalReceived += tec.numMessages; + } + Assert.assertEquals(totalReceived, totalExpected, "did not receive all messages on replay after reset"); + } + + private static class TimestampEntryCount { + private final long timestamp; + private int numMessages; + + public TimestampEntryCount(long ts) { + this.numMessages = 0; + this.timestamp = ts; + } + + public int incrementAndGet() { + return ++numMessages; + } + + public long getTimestamp() { + return timestamp; + } + } }