diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index efd183d0bfb70..46ca0f14003b5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.annotation.InterfaceStability; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; @@ -79,6 +80,20 @@ enum IndividualDeletedEntries { */ Map getProperties(); + /** + * Return any properties that were associated with the cursor. + */ + Map getCursorProperties(); + + /** + * Updates the properties. + * @param cursorProperties + * @return a handle to the result of the operation + */ + default CompletableFuture setCursorProperties(Map cursorProperties) { + return CompletableFuture.completedFuture(null); + } + /** * Add a property associated with the last stored position. */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 1f6e0d3af4641..7196a3b4c039c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -240,10 +240,13 @@ ManagedCursor openCursor(String name, InitialPosition initialPosition) throws In * @param properties * user defined properties that will be attached to the first position of the cursor, if the open * operation will trigger the creation of the cursor. + * @param cursorProperties + * the properties for the Cursor * @return the ManagedCursor * @throws ManagedLedgerException */ - ManagedCursor openCursor(String name, InitialPosition initialPosition, Map properties) + ManagedCursor openCursor(String name, InitialPosition initialPosition, Map properties, + Map cursorProperties) throws InterruptedException, ManagedLedgerException; /** @@ -337,13 +340,15 @@ ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionNam * @param initialPosition * the cursor will be set at lastest position or not when first created * default is true + * @param cursorProperties + * the properties for the Cursor * @param callback * callback object * @param ctx * opaque context */ void asyncOpenCursor(String name, InitialPosition initialPosition, Map properties, - OpenCursorCallback callback, Object ctx); + Map cursorProperties, OpenCursorCallback callback, Object ctx); /** * Get a list of all the cursors reading from this ManagedLedger. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 0c3e46c1acd6f..3d558a231dba6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -91,6 +91,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.LongPairRangeSet; @@ -116,6 +117,7 @@ public class ManagedCursorImpl implements ManagedCursor { protected final ManagedLedgerConfig config; protected final ManagedLedgerImpl ledger; private final String name; + private volatile Map cursorProperties; private final BookKeeper.DigestType digestType; protected volatile PositionImpl markDeletePosition; @@ -280,6 +282,7 @@ public interface VoidCallback { ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName) { this.bookkeeper = bookkeeper; + this.cursorProperties = Collections.emptyMap(); this.config = config; this.ledger = ledger; this.name = cursorName; @@ -313,6 +316,52 @@ public Map getProperties() { return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap(); } + @Override + public Map getCursorProperties() { + return cursorProperties; + } + + @Override + public CompletableFuture setCursorProperties(Map cursorProperties) { + CompletableFuture updateCursorPropertiesResult = new CompletableFuture<>(); + ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() { + @Override + public void operationComplete(ManagedCursorInfo info, Stat stat) { + ManagedCursorInfo copy = ManagedCursorInfo + .newBuilder(info) + .clearCursorProperties() + .addAllCursorProperties(buildStringPropertiesMap(cursorProperties)) + .build(); + ledger.getStore().asyncUpdateCursorInfo(ledger.getName(), + name, copy, stat, new MetaStoreCallback<>() { + @Override + public void operationComplete(Void result, Stat stat) { + log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(), + name, cursorProperties); + ManagedCursorImpl.this.cursorProperties = cursorProperties; + cursorLedgerStat = stat; + updateCursorPropertiesResult.complete(result); + } + + @Override + public void operationFailed(MetaStoreException e) { + log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(), + name, cursorProperties, e); + updateCursorPropertiesResult.completeExceptionally(e); + } + }); + } + + @Override + public void operationFailed(MetaStoreException e) { + log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(), + name, cursorProperties, e); + updateCursorPropertiesResult.completeExceptionally(e); + } + }); + return updateCursorPropertiesResult; + } + @Override public boolean putProperty(String key, Long value) { if (lastMarkDeleteEntry != null) { @@ -361,6 +410,18 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) { cursorLedgerStat = stat; lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive; + + Map recoveredCursorProperties = Collections.emptyMap(); + if (info.getCursorPropertiesCount() > 0) { + // Recover properties map + recoveredCursorProperties = Maps.newHashMap(); + for (int i = 0; i < info.getCursorPropertiesCount(); i++) { + StringProperty property = info.getCursorProperties(i); + recoveredCursorProperties.put(property.getName(), property.getValue()); + } + } + cursorProperties = recoveredCursorProperties; + if (info.getCursorsLedgerId() == -1L) { // There is no cursor ledger to read the last position from. It means the cursor has been properly // closed and the last mark-delete position is stored in the ManagedCursorInfo itself. @@ -380,7 +441,7 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) { } } - recoveredCursor(recoveredPosition, recoveredProperties, null); + recoveredCursor(recoveredPosition, recoveredProperties, recoveredCursorProperties, null); callback.operationComplete(); } else { // Need to proceed and read the last entry in the specified ledger to find out the last position @@ -410,7 +471,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc)); // Rewind to oldest entry available - initialize(getRollbackPosition(info), Collections.emptyMap(), callback); + initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback); return; } else if (rc != BKException.Code.OK) { log.warn("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name, @@ -426,7 +487,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac log.warn("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger", ledger.getName(), ledgerId, name); // Rewind to last cursor snapshot available - initialize(getRollbackPosition(info), Collections.emptyMap(), callback); + initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); return; } @@ -438,7 +499,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac log.error("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc1)); // Rewind to oldest entry available - initialize(getRollbackPosition(info), Collections.emptyMap(), callback); + initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback); return; } else if (rc1 != BKException.Code.OK) { log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(), @@ -476,7 +537,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac && positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) { recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList()); } - recoveredCursor(position, recoveredProperties, lh); + recoveredCursor(position, recoveredProperties, cursorProperties, lh); callback.operationComplete(); }, null); }; @@ -547,6 +608,7 @@ private void recoverBatchDeletedIndexes ( } private void recoveredCursor(PositionImpl position, Map properties, + Map cursorProperties, LedgerHandle recoveredFromCursorLedger) { // if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty), // we need to move to the next existing ledger @@ -564,7 +626,7 @@ private void recoveredCursor(PositionImpl position, Map properties position = ledger.getLastPosition(); } log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position); - + this.cursorProperties = cursorProperties; messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition())); markDeletePosition = position; persistentMarkDeletePosition = position; @@ -577,8 +639,9 @@ private void recoveredCursor(PositionImpl position, Map properties STATE_UPDATER.set(this, State.NoLedger); } - void initialize(PositionImpl position, Map properties, final VoidCallback callback) { - recoveredCursor(position, properties, null); + void initialize(PositionImpl position, Map properties, Map cursorProperties, + final VoidCallback callback) { + recoveredCursor(position, properties, cursorProperties, null); if (log.isDebugEnabled()) { log.debug("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}", ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition); @@ -2392,6 +2455,7 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio .setLastActive(lastActive); // info.addAllProperties(buildPropertiesMap(properties)); + info.addAllCursorProperties(buildStringPropertiesMap(cursorProperties)); if (persistIndividualDeletedMessageRanges) { info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges()); if (config.isDeletionAtBatchIndexLevelEnabled()) { @@ -2605,7 +2669,7 @@ private CompletableFuture deleteLedgerAsync(LedgerHandle ledgerHandle) { } - private List buildPropertiesMap(Map properties) { + private static List buildPropertiesMap(Map properties) { if (properties.isEmpty()) { return Collections.emptyList(); } @@ -2619,6 +2683,20 @@ private List buildPropertiesMap(Map properties) { return longProperties; } + private static List buildStringPropertiesMap(Map properties) { + if (properties == null || properties.isEmpty()) { + return Collections.emptyList(); + } + + List stringProperties = Lists.newArrayList(); + properties.forEach((name, value) -> { + StringProperty sp = StringProperty.newBuilder().setName(name).setValue(value).build(); + stringProperties.add(sp); + }); + + return stringProperties; + } + private List buildIndividualDeletedMessageRanges() { lock.readLock().lock(); try { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 277e4df0244b8..f228c32a90eba 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -845,11 +845,12 @@ public ManagedCursor openCursor(String cursorName) throws InterruptedException, @Override public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition) throws InterruptedException, ManagedLedgerException { - return openCursor(cursorName, initialPosition, Collections.emptyMap()); + return openCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap()); } @Override - public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map properties) + public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map properties, + Map cursorProperties) throws InterruptedException, ManagedLedgerException { final CountDownLatch counter = new CountDownLatch(1); class Result { @@ -858,7 +859,7 @@ class Result { } final Result result = new Result(); - asyncOpenCursor(cursorName, initialPosition, properties, new OpenCursorCallback() { + asyncOpenCursor(cursorName, initialPosition, properties, cursorProperties, new OpenCursorCallback() { @Override public void openCursorComplete(ManagedCursor cursor, Object ctx) { result.cursor = cursor; @@ -893,12 +894,14 @@ public void asyncOpenCursor(final String cursorName, final OpenCursorCallback ca @Override public void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition, final OpenCursorCallback callback, final Object ctx) { - this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), callback, ctx); + this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap(), + callback, ctx); } @Override public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition, - Map properties, final OpenCursorCallback callback, final Object ctx) { + Map properties, Map cursorProperties, + final OpenCursorCallback callback, final Object ctx) { try { checkManagedLedgerIsOpen(); checkFenced(); @@ -932,7 +935,7 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP CompletableFuture cursorFuture = new CompletableFuture<>(); uninitializedCursors.put(cursorName, cursorFuture); PositionImpl position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition(); - cursor.initialize(position, properties, new VoidCallback() { + cursor.initialize(position, properties, cursorProperties, new VoidCallback() { @Override public void operationComplete() { log.info("[{}] Opened new cursor: {}", name, cursor); diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index 4671816c1a199..c4e502819fa9e 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -105,6 +105,11 @@ message LongProperty { required int64 value = 2; } +message StringProperty { + required string name = 1; + required string value = 2; +} + message ManagedCursorInfo { // If the ledger id is -1, then the mark-delete position is // the one from the (ledgerId, entryId) snapshot below @@ -123,6 +128,10 @@ message ManagedCursorInfo { // Store which index in the batch message has been deleted repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7; + + // Additional custom properties associated with + // the cursor + repeated StringProperty cursorProperties = 8; } enum CompressionType { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index a654b30e60b9e..05f34df47c1e9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -67,6 +67,11 @@ public Map getProperties() { return Collections.emptyMap(); } + @Override + public Map getCursorProperties() { + return Collections.emptyMap(); + } + @Override public boolean putProperty(String key, Long value) { return false; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java index 727f3850ad287..74db9d791f3e3 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java @@ -23,6 +23,8 @@ import java.util.Collections; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -75,9 +77,15 @@ void testPropertiesClose() throws Exception { @Test(timeOut = 20000) void testPropertiesRecoveryAfterCrash() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig()); - ManagedCursor c1 = ledger.openCursor("c1"); + + Map cursorProperties = new TreeMap<>(); + cursorProperties.put("custom1", "one"); + cursorProperties.put("custom2", "two"); + + ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, Collections.emptyMap(), cursorProperties); assertEquals(c1.getProperties(), Collections.emptyMap()); + assertEquals(c1.getCursorProperties(), cursorProperties); ledger.addEntry("entry-1".getBytes()); ledger.addEntry("entry-2".getBytes()); @@ -99,6 +107,7 @@ void testPropertiesRecoveryAfterCrash() throws Exception { assertEquals(c1.getMarkDeletedPosition(), p3); assertEquals(c1.getProperties(), properties); + assertEquals(c1.getCursorProperties(), cursorProperties); factory2.shutdown(); } @@ -148,8 +157,13 @@ void testPropertiesAtCreation() throws Exception { properties.put("b", 2L); properties.put("c", 3L); - ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties); + Map cursorProperties = new TreeMap<>(); + cursorProperties.put("custom1", "one"); + cursorProperties.put("custom2", "two"); + + ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties, cursorProperties); assertEquals(c1.getProperties(), properties); + assertEquals(c1.getCursorProperties(), cursorProperties); ledger.addEntry("entry-1".getBytes()); @@ -160,6 +174,50 @@ void testPropertiesAtCreation() throws Exception { c1 = ledger.openCursor("c1"); assertEquals(c1.getProperties(), properties); + assertEquals(c1.getCursorProperties(), cursorProperties); } + @Test + void testUpdateCursorProperties() throws Exception { + ManagedLedger ledger = factory.open("testUpdateCursorProperties", new ManagedLedgerConfig()); + + Map properties = new TreeMap<>(); + properties.put("a", 1L); + + Map cursorProperties = new TreeMap<>(); + cursorProperties.put("custom1", "one"); + cursorProperties.put("custom2", "two"); + + ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties, cursorProperties); + assertEquals(c1.getProperties(), properties); + assertEquals(c1.getCursorProperties(), cursorProperties); + + ledger.addEntry("entry-1".getBytes()); + + Map cursorPropertiesUpdated = new TreeMap<>(); + cursorPropertiesUpdated.put("custom1", "three"); + cursorPropertiesUpdated.put("custom2", "four"); + + c1.setCursorProperties(cursorPropertiesUpdated).get(10, TimeUnit.SECONDS); + + ledger.close(); + + // Reopen the managed ledger + ledger = factory.open("testUpdateCursorProperties", new ManagedLedgerConfig()); + c1 = ledger.openCursor("c1"); + + assertEquals(c1.getProperties(), properties); + assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated); + + // Create a new factory to force a managed ledger close and recovery + ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + // Reopen the managed ledger + ledger = factory2.open("testUpdateCursorProperties", new ManagedLedgerConfig()); + c1 = ledger.openCursor("c1"); + + assertEquals(c1.getProperties(), properties); + assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated); + + factory2.shutdown(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index b1ccb4d1eb0de..49b906b79594c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -103,6 +103,8 @@ default long getNumberOfEntriesDelayed() { Map getSubscriptionProperties(); + CompletableFuture updateSubscriptionProperties(Map subscriptionProperties); + default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) { // Default is no-op } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index ae49b3623ca12..a9777f5dd0df2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -68,7 +68,7 @@ public class NonPersistentSubscription implements Subscription { private final LongAdder bytesOutFromRemovedConsumers = new LongAdder(); private final LongAdder msgOutFromRemovedConsumer = new LongAdder(); - private final Map subscriptionProperties; + private volatile Map subscriptionProperties; // If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription. private final boolean isDurable; @@ -526,4 +526,15 @@ public void updateLastActive() { public Map getSubscriptionProperties() { return subscriptionProperties; } + + @Override + public CompletableFuture updateSubscriptionProperties(Map subscriptionProperties) { + if (subscriptionProperties == null || subscriptionProperties.isEmpty()) { + this.subscriptionProperties = Collections.emptyMap(); + } else { + this.subscriptionProperties = Collections.unmodifiableMap(subscriptionProperties); + } + return CompletableFuture.completedFuture(null); + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index ba2982df6275e..363224b11343e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -114,7 +113,7 @@ public class PersistentSubscription implements Subscription { private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache; private final PendingAckHandle pendingAckHandle; - private Map subscriptionProperties; + private volatile Map subscriptionProperties; private final LongAdder bytesOutFromRemovedConsumers = new LongAdder(); private final LongAdder msgOutFromRemovedConsumer = new LongAdder(); @@ -137,7 +136,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma } public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, - boolean replicated, Map subscriptionProperties) { + boolean replicated, Map subscriptionProperties) { this.topic = topic; this.cursor = cursor; this.topicName = topic.getName(); @@ -146,7 +145,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this); this.setReplicated(replicated); this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) - ? new HashMap<>() : Collections.unmodifiableMap(subscriptionProperties); + ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() && !isEventSystemTopic(TopicName.get(topicName))) { this.pendingAckHandle = new PendingAckHandleImpl(this); @@ -1094,6 +1093,20 @@ public Map getSubscriptionProperties() { return subscriptionProperties; } + @Override + public CompletableFuture updateSubscriptionProperties(Map subscriptionProperties) { + Map newSubscriptionProperties; + if (subscriptionProperties == null || subscriptionProperties.isEmpty()) { + newSubscriptionProperties = Collections.emptyMap(); + } else { + newSubscriptionProperties = Collections.unmodifiableMap(subscriptionProperties); + } + return cursor.setCursorProperties(newSubscriptionProperties) + .thenRun(() -> { + this.subscriptionProperties = newSubscriptionProperties; + }); + } + /** * Return a merged map that contains the cursor properties specified by used * (eg. when using compaction subscription) and the subscription properties. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 30de90ac1a7e1..e0a019fac3ae6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -75,7 +75,6 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.net.BookieId; -import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.namespace.NamespaceService; @@ -279,7 +278,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS } else { final String subscriptionName = Codec.decode(cursor.getName()); subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor, - PersistentSubscription.isCursorFromReplicatedSubscription(cursor), null)); + PersistentSubscription.isCursorFromReplicatedSubscription(cursor), + cursor.getCursorProperties())); // subscription-cursor gets activated by default: deactivate as there is no active subscription right // now subscriptions.get(subscriptionName).deactivateCursor(); @@ -876,7 +876,8 @@ private CompletableFuture getDurableSubscription(String subscripti Map properties = PersistentSubscription.getBaseCursorProperties(replicated); - ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() { + ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, subscriptionProperties, + new OpenCursorCallback() { @Override public void openCursorComplete(ManagedCursor cursor, Object ctx) { if (log.isDebugEnabled()) { @@ -896,11 +897,6 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) { return; } } - if (MapUtils.isEmpty(subscription.getSubscriptionProperties()) - && MapUtils.isNotEmpty(subscriptionProperties)) { - subscription.getSubscriptionProperties().putAll(subscriptionProperties); - } - if (replicated && !subscription.isReplicated()) { // Flip the subscription state subscription.setReplicated(replicated); @@ -979,11 +975,6 @@ private CompletableFuture getNonDurableSubscription(Stri return FutureUtil.failedFuture( new NotAllowedException("Durable subscription with the same name already exists.")); } - - if (MapUtils.isEmpty(subscription.getSubscriptionProperties()) - && MapUtils.isNotEmpty(subscriptionProperties)) { - subscription.getSubscriptionProperties().putAll(subscriptionProperties); - } } if (startMessageRollbackDurationSec > 0) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java index e0f5e0a77a946..12b742a01916f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/CreateSubscriptionTest.java @@ -213,10 +213,7 @@ public void addSubscriptionPropertiesTest(SubscriptionMode subscriptionMode) thr PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService() .getTopicReference(topic).get().getSubscription(subName); Map properties = subscription.getSubscriptionProperties(); - assertTrue(properties.containsKey("1")); - assertTrue(properties.containsKey("2")); - assertEquals(properties.get("1"), "1"); - assertEquals(properties.get("2"), "2"); + assertEquals(properties, map); // after updating mark delete position, the properties should still exist Producer producer = pulsarClient.newProducer().topic(topic).create(); @@ -232,10 +229,7 @@ public void addSubscriptionPropertiesTest(SubscriptionMode subscriptionMode) thr assertEquals(subscription.getCursor().getMarkDeletedPosition().getEntryId(), messageId.getEntryId()); }); properties = subscription.getSubscriptionProperties(); - assertTrue(properties.containsKey("1")); - assertTrue(properties.containsKey("2")); - assertEquals(properties.get("1"), "1"); - assertEquals(properties.get("2"), "2"); + assertEquals(properties, map); consumer.close(); producer.close(); @@ -249,10 +243,7 @@ public void addSubscriptionPropertiesTest(SubscriptionMode subscriptionMode) thr .getTopicReference(topic).get().getSubscription(subName); Awaitility.await().untilAsserted(() -> { Map properties2 = subscription2.getSubscriptionProperties(); - assertTrue(properties2.containsKey("1")); - assertTrue(properties2.containsKey("2")); - assertEquals(properties2.get("1"), "1"); - assertEquals(properties2.get("2"), "2"); + assertEquals(properties2, map); }); consumer2.close(); @@ -264,13 +255,11 @@ public void addSubscriptionPropertiesTest(SubscriptionMode subscriptionMode) thr .receiverQueueSize(1) .subscriptionProperties(map3).subscriptionName(subName).subscribe(); Map properties3 = subscription.getSubscriptionProperties(); - assertTrue(properties3.containsKey("1")); - assertTrue(properties3.containsKey("2")); - assertEquals(properties3.get("1"), "1"); - assertEquals(properties3.get("2"), "2"); + assertEquals(properties3, map); consumer3.close(); - //restart and create a new consumer with new properties, the new properties should be updated + //restart and create a new consumer with new properties, the new properties must not be updated + // for a Durable subscription, but for a NonDurable subscription we pick up the new values restartBroker(); Consumer consumer4 = pulsarClient.newConsumer().subscriptionMode(subscriptionMode) .topic(topic).receiverQueueSize(1) @@ -278,10 +267,12 @@ public void addSubscriptionPropertiesTest(SubscriptionMode subscriptionMode) thr PersistentSubscription subscription4 = (PersistentSubscription) pulsar.getBrokerService() .getTopicReference(topic).get().getSubscription(subName); Map properties4 = subscription4.getSubscriptionProperties(); - assertTrue(properties4.containsKey("3")); - assertTrue(properties4.containsKey("4")); - assertEquals(properties4.get("3"), "3"); - assertEquals(properties4.get("4"), "4"); + if (subscriptionMode == SubscriptionMode.Durable) { + assertEquals(properties4, map); + } else { + assertEquals(properties4, map3); + + } consumer4.close(); @@ -294,26 +285,28 @@ public void addSubscriptionPropertiesTest(SubscriptionMode subscriptionMode) thr .getTopicReference(topic).get().getSubscription(subName); properties4 = subscription4.getSubscriptionProperties(); if (subscriptionMode == SubscriptionMode.Durable) { - assertTrue(properties4.containsKey("3")); - assertTrue(properties4.containsKey("4")); - assertEquals(properties4.get("3"), "3"); - assertEquals(properties4.get("4"), "4"); + assertEquals(properties4, map); } else { assertTrue(properties4.isEmpty()); } consumer4.close(); - //restart broker, it won't get any properties + //restart broker, properties for Durable subscription are reloaded from Metadata restartBroker(); consumer4 = pulsarClient.newConsumer().topic(topic).subscriptionMode(subscriptionMode) .receiverQueueSize(1) .subscriptionName(subName).subscribe(); subscription4 = (PersistentSubscription) pulsar.getBrokerService() .getTopicReference(topic).get().getSubscription(subName); - assertEquals(subscription4.getSubscriptionProperties().size(), 0); + properties4 = subscription4.getSubscriptionProperties(); + if (subscriptionMode == SubscriptionMode.Durable) { + assertEquals(properties4, map); + } else { + assertTrue(properties4.isEmpty()); + } consumer4.close(); - //restart broker and create a new consumer with new properties, the properties will be updated + //restart broker and create a new consumer with new properties, the properties will not be updated restartBroker(); consumer4 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1) .subscriptionMode(subscriptionMode) @@ -321,16 +314,17 @@ public void addSubscriptionPropertiesTest(SubscriptionMode subscriptionMode) thr .subscriptionName(subName).subscribe(); PersistentSubscription subscription5 = (PersistentSubscription) pulsar.getBrokerService() .getTopicReference(topic).get().getSubscription(subName); - properties = subscription5.getSubscriptionProperties(); - assertTrue(properties.containsKey("1")); - assertTrue(properties.containsKey("2")); - assertEquals(properties.get("1"), "1"); - assertEquals(properties.get("2"), "2"); - consumer4.close(); + properties4 = subscription5.getSubscriptionProperties(); + + // for the NonDurable subscription here we have the same properties because they + // are sent by the Consumer + assertEquals(properties4, map); + consumer4.close(); String subNameShared = "my-sub-shared"; Map mapShared = new HashMap<>(); + mapShared.put("6", "7"); // open two consumers with a Shared Subscription Consumer consumerShared1 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1) .subscriptionMode(subscriptionMode) @@ -342,26 +336,25 @@ public void addSubscriptionPropertiesTest(SubscriptionMode subscriptionMode) thr properties = subscriptionShared.getSubscriptionProperties(); assertEquals(properties, mapShared); - // add a new consumer, the properties are updated because they were empty - mapShared = new HashMap<>(); - mapShared.put("6", "7"); - mapShared.put("8", "9"); + // add a new consumer, the properties are not updated + Map mapShared2 = new HashMap<>(); + mapShared2.put("8", "9"); Consumer consumerShared2 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1) .subscriptionMode(subscriptionMode) .subscriptionType(SubscriptionType.Shared) - .subscriptionProperties(mapShared) + .subscriptionProperties(mapShared2) .subscriptionName(subNameShared).subscribe(); properties = subscriptionShared.getSubscriptionProperties(); assertEquals(properties, mapShared); - // add a third consumer, the properties are NOT updated because they are not empty - Map mapShared2 = new HashMap<>(); - mapShared2.put("10", "11"); + // add a third consumer, the properties are NOT updated + Map mapShared3 = new HashMap<>(); + mapShared3.put("10", "11"); Consumer consumerShared3 = pulsarClient.newConsumer().topic(topic).receiverQueueSize(1) .subscriptionMode(subscriptionMode) .subscriptionType(SubscriptionType.Shared) - .subscriptionProperties(mapShared2) + .subscriptionProperties(mapShared3) .subscriptionName(subNameShared).subscribe(); properties = subscriptionShared.getSubscriptionProperties(); @@ -373,6 +366,65 @@ public void addSubscriptionPropertiesTest(SubscriptionMode subscriptionMode) thr consumerShared3.close(); } + @Test + public void subscriptionModePersistedTest() throws Exception { + String topic = "persistent://my-property/my-ns/topic" + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topic); + Map map = new HashMap<>(); + map.put("1", "1"); + map.put("2", "2"); + String subName = "my-sub"; + pulsarClient.newConsumer() + .subscriptionMode(SubscriptionMode.Durable) + .topic(topic) + .subscriptionProperties(map) + .subscriptionName(subName) + .subscribe() + .close(); + PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService() + .getTopicReference(topic).get().getSubscription(subName); + Map properties = subscription.getSubscriptionProperties(); + assertTrue(properties.containsKey("1")); + assertTrue(properties.containsKey("2")); + assertEquals(properties.get("1"), "1"); + assertEquals(properties.get("2"), "2"); + + Map subscriptionPropertiesFromAdmin = + admin.topics().getStats(topic).getSubscriptions().get(subName).getSubscriptionProperties(); + assertEquals(map, subscriptionPropertiesFromAdmin); + + // unload the topic + admin.topics().unload(topic); + + // verify that the properties are still there + subscriptionPropertiesFromAdmin = + admin.topics().getStats(topic).getSubscriptions().get(subName).getSubscriptionProperties(); + assertEquals(map, subscriptionPropertiesFromAdmin); + + + // create a new subscription, initially properties are empty + String subName2 = "my-sub2"; + admin.topics().createSubscription(topic, subName2, MessageId.latest); + + subscriptionPropertiesFromAdmin = + admin.topics().getStats(topic).getSubscriptions().get(subName2).getSubscriptionProperties(); + assertTrue(subscriptionPropertiesFromAdmin.isEmpty()); + + // create a consumer, this is not allowed to update the properties + pulsarClient.newConsumer() + .subscriptionMode(SubscriptionMode.Durable) + .topic(topic) + .subscriptionProperties(map) + .subscriptionName(subName2) + .subscribe() + .close(); + + // verify that the properties are not changed + subscriptionPropertiesFromAdmin = + admin.topics().getStats(topic).getSubscriptions().get(subName2).getSubscriptionProperties(); + assertTrue(subscriptionPropertiesFromAdmin.isEmpty()); + } + @Test public void createSubscriptionBySpecifyingStringPosition() throws IOException, PulsarAdminException { final int numberOfMessages = 5; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 331772b7cdce9..a9559ac96b93d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1561,11 +1561,11 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null); + ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null); return null; } }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class), - any(OpenCursorCallback.class), any()); + any(Map.class), any(OpenCursorCallback.class), any()); doAnswer(new Answer() { @Override @@ -2214,9 +2214,9 @@ public void testGetDurableSubscription() throws Exception { return null; }).when(mockLedger).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any()); doAnswer((Answer) invocationOnMock -> { - ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(mockCursor, null); + ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(mockCursor, null); return null; - }).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any()); + }).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any(), any()); PersistentTopic topic = new PersistentTopic(successTopicName, mockLedger, brokerService); CommandSubscribe cmd = new CommandSubscribe() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index e8fa8c6063923..cffc82bcf73c6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -1686,20 +1686,20 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { doAnswer((Answer) invocationOnMock -> { Thread.sleep(300); - ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null); + ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null); return null; }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any()); doAnswer((Answer) invocationOnMock -> { Thread.sleep(300); - ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null); + ((OpenCursorCallback) invocationOnMock.getArguments()[4]).openCursorComplete(cursorMock, null); return null; - }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class), + }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class), any(Map.class), any(OpenCursorCallback.class), any()); doAnswer((Answer) invocationOnMock -> { Thread.sleep(300); - ((OpenCursorCallback) invocationOnMock.getArguments()[2]) + ((OpenCursorCallback) invocationOnMock.getArguments()[3]) .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null); return null; }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(OpenCursorCallback.class), any()); @@ -1709,7 +1709,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { ((OpenCursorCallback) invocationOnMock.getArguments()[3]) .openCursorFailed(new ManagedLedgerException("Managed ledger failure"), null); return null; - }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class), + }).when(ledgerMock).asyncOpenCursor(matches(".*fail.*"), any(InitialPosition.class), any(Map.class), any(Map.class), any(OpenCursorCallback.class), any()); doAnswer((Answer) invocationOnMock -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java index 81ad811f43ce5..b2edbda885546 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java @@ -69,10 +69,13 @@ protected void cleanup() throws Exception { } public void testFilter() throws Exception { - + Map map = new HashMap<>(); + map.put("1","1"); + map.put("2","2"); String topic = "persistent://prop/ns-abc/topic" + UUID.randomUUID(); String subName = "sub"; Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionProperties(map) .subscriptionName(subName).subscribe(); // mock entry filters PersistentSubscription subscription = (PersistentSubscription) pulsar.getBrokerService() @@ -132,9 +135,6 @@ public void testFilter() throws Exception { }); consumer.close(); - Map map = new HashMap<>(); - map.put("1","1"); - map.put("2","2"); consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionProperties(map) .subscriptionName(subName).subscribe(); for (int i = 0; i < 10; i++) { diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java index 4ad872919e3f3..3eaf276c3c574 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java @@ -105,7 +105,8 @@ public ManagedCursor openCursor(String name, CommandSubscribe.InitialPosition in @Override public ManagedCursor openCursor(String name, CommandSubscribe.InitialPosition initialPosition, - Map properties) throws InterruptedException, ManagedLedgerException { + Map properties, Map cursorProperties) + throws InterruptedException, ManagedLedgerException { return null; } @@ -155,7 +156,8 @@ public void asyncOpenCursor(String name, CommandSubscribe.InitialPosition initia @Override public void asyncOpenCursor(String name, CommandSubscribe.InitialPosition initialPosition, - Map properties, AsyncCallbacks.OpenCursorCallback callback, Object ctx) { + Map properties, Map cursorProperties, + AsyncCallbacks.OpenCursorCallback callback, Object ctx) { }