Skip to content

Commit

Permalink
PIP-105: Store Subscription properties
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 24, 2022
1 parent 3dbf1f5 commit c7be86a
Show file tree
Hide file tree
Showing 11 changed files with 103 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ enum IndividualDeletedEntries {
*/
Map<String, Long> getProperties();

/**
* Return any properties that were associated with the cursor.
*/
Map<String, String> getCursorProperties();

/**
* Add a property associated with the last stored position.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ ManagedCursor openCursor(String name, InitialPosition initialPosition) throws In
* @return the ManagedCursor
* @throws ManagedLedgerException
*/
ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties)
ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
Map<String, String> ledgerProperties)
throws InterruptedException, ManagedLedgerException;

/**
Expand Down Expand Up @@ -343,7 +344,7 @@ ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionNam
* opaque context
*/
void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
OpenCursorCallback callback, Object ctx);
Map<String, String> ledgerProperties, OpenCursorCallback callback, Object ctx);

/**
* Get a list of all the cursors reading from this ManagedLedger.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
Expand All @@ -116,6 +117,7 @@ public class ManagedCursorImpl implements ManagedCursor {
protected final ManagedLedgerConfig config;
protected final ManagedLedgerImpl ledger;
private final String name;
private Map<String, String> cursorProperties = Collections.emptyMap();
private final BookKeeper.DigestType digestType;

protected volatile PositionImpl markDeletePosition;
Expand Down Expand Up @@ -280,6 +282,7 @@ public interface VoidCallback {

ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName) {
this.bookkeeper = bookkeeper;
this.cursorProperties = Collections.emptyMap();
this.config = config;
this.ledger = ledger;
this.name = cursorName;
Expand Down Expand Up @@ -313,6 +316,11 @@ public Map<String, Long> getProperties() {
return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
}

@Override
public Map<String, String> getCursorProperties() {
return cursorProperties;
}

@Override
public boolean putProperty(String key, Long value) {
if (lastMarkDeleteEntry != null) {
Expand Down Expand Up @@ -361,6 +369,18 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
cursorLedgerStat = stat;
lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;


Map<String, String> recoveredCursorProperties = Collections.emptyMap();
if (info.getCursorPropertiesCount() > 0) {
// Recover properties map
recoveredCursorProperties = Maps.newHashMap();
for (int i = 0; i < info.getCursorPropertiesCount(); i++) {
StringProperty property = info.getCursorProperties(i);
recoveredCursorProperties.put(property.getName(), property.getValue());
}
}
cursorProperties = recoveredCursorProperties;

if (info.getCursorsLedgerId() == -1L) {
// There is no cursor ledger to read the last position from. It means the cursor has been properly
// closed and the last mark-delete position is stored in the ManagedCursorInfo itself.
Expand All @@ -380,7 +400,7 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
}
}

recoveredCursor(recoveredPosition, recoveredProperties, null);
recoveredCursor(recoveredPosition, recoveredProperties, recoveredCursorProperties, null);
callback.operationComplete();
} else {
// Need to proceed and read the last entry in the specified ledger to find out the last position
Expand Down Expand Up @@ -410,7 +430,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
BKException.getMessage(rc));
// Rewind to oldest entry available
initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback);
return;
} else if (rc != BKException.Code.OK) {
log.warn("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
Expand All @@ -426,7 +446,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
log.warn("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger",
ledger.getName(), ledgerId, name);
// Rewind to last cursor snapshot available
initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
}

Expand All @@ -438,7 +458,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
log.error("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
// Rewind to oldest entry available
initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
} else if (rc1 != BKException.Code.OK) {
log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
Expand Down Expand Up @@ -476,7 +496,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
&& positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
}
recoveredCursor(position, recoveredProperties, lh);
recoveredCursor(position, recoveredProperties, cursorProperties, lh);
callback.operationComplete();
}, null);
};
Expand Down Expand Up @@ -547,6 +567,7 @@ private void recoverBatchDeletedIndexes (
}

private void recoveredCursor(PositionImpl position, Map<String, Long> properties,
Map<String, String> cursorProperties,
LedgerHandle recoveredFromCursorLedger) {
// if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
// we need to move to the next existing ledger
Expand All @@ -564,7 +585,7 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
position = ledger.getLastPosition();
}
log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position);

this.cursorProperties = cursorProperties;
messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));
markDeletePosition = position;
persistentMarkDeletePosition = position;
Expand All @@ -577,8 +598,9 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
STATE_UPDATER.set(this, State.NoLedger);
}

void initialize(PositionImpl position, Map<String, Long> properties, final VoidCallback callback) {
recoveredCursor(position, properties, null);
void initialize(PositionImpl position, Map<String, Long> properties, Map<String, String> cursorProperties,
final VoidCallback callback) {
recoveredCursor(position, properties, cursorProperties, null);
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
Expand Down Expand Up @@ -2392,6 +2414,7 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio
.setLastActive(lastActive); //

info.addAllProperties(buildPropertiesMap(properties));
info.addAllCursorProperties(buildStringPropertiesMap(cursorProperties));
if (persistIndividualDeletedMessageRanges) {
info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
if (config.isDeletionAtBatchIndexLevelEnabled()) {
Expand Down Expand Up @@ -2605,7 +2628,7 @@ private CompletableFuture<Void> deleteLedgerAsync(LedgerHandle ledgerHandle) {
}


private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
private static List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
if (properties.isEmpty()) {
return Collections.emptyList();
}
Expand All @@ -2619,6 +2642,20 @@ private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
return longProperties;
}

private static List<StringProperty> buildStringPropertiesMap(Map<String, String> properties) {
if (properties == null || properties.isEmpty()) {
return Collections.emptyList();
}

List<StringProperty> stringProperties = Lists.newArrayList();
properties.forEach((name, value) -> {
StringProperty sp = StringProperty.newBuilder().setName(name).setValue(value).build();
stringProperties.add(sp);
});

return stringProperties;
}

private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
lock.readLock().lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,11 +845,12 @@ public ManagedCursor openCursor(String cursorName) throws InterruptedException,
@Override
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition)
throws InterruptedException, ManagedLedgerException {
return openCursor(cursorName, initialPosition, Collections.emptyMap());
return openCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap());
}

@Override
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties)
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties,
Map<String, String> ledgerProperties)
throws InterruptedException, ManagedLedgerException {
final CountDownLatch counter = new CountDownLatch(1);
class Result {
Expand All @@ -858,7 +859,7 @@ class Result {
}
final Result result = new Result();

asyncOpenCursor(cursorName, initialPosition, properties, new OpenCursorCallback() {
asyncOpenCursor(cursorName, initialPosition, properties, ledgerProperties, new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
result.cursor = cursor;
Expand Down Expand Up @@ -893,12 +894,14 @@ public void asyncOpenCursor(final String cursorName, final OpenCursorCallback ca
@Override
public void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
final OpenCursorCallback callback, final Object ctx) {
this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), callback, ctx);
this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap(),
callback, ctx);
}

@Override
public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
Map<String, Long> properties, final OpenCursorCallback callback, final Object ctx) {
Map<String, Long> properties, Map<String, String> cursorProperties,
final OpenCursorCallback callback, final Object ctx) {
try {
checkManagedLedgerIsOpen();
checkFenced();
Expand Down Expand Up @@ -932,7 +935,7 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP
CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
uninitializedCursors.put(cursorName, cursorFuture);
PositionImpl position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition();
cursor.initialize(position, properties, new VoidCallback() {
cursor.initialize(position, properties, cursorProperties, new VoidCallback() {
@Override
public void operationComplete() {
log.info("[{}] Opened new cursor: {}", name, cursor);
Expand Down
9 changes: 9 additions & 0 deletions managed-ledger/src/main/proto/MLDataFormats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ message LongProperty {
required int64 value = 2;
}

message StringProperty {
required string name = 1;
required string value = 2;
}

message ManagedCursorInfo {
// If the ledger id is -1, then the mark-delete position is
// the one from the (ledgerId, entryId) snapshot below
Expand All @@ -123,6 +128,10 @@ message ManagedCursorInfo {

// Store which index in the batch message has been deleted
repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;

// Additional custom properties associated with
// the cursor
repeated StringProperty cursorProperties = 8;
}

enum CompressionType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public Map<String, Long> getProperties() {
return Collections.emptyMap();
}

@Override
public Map<String, String> getCursorProperties() {
return Collections.emptyMap();
}

@Override
public boolean putProperty(String key, Long value) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,15 @@ void testPropertiesClose() throws Exception {
@Test(timeOut = 20000)
void testPropertiesRecoveryAfterCrash() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
ManagedCursor c1 = ledger.openCursor("c1");

Map<String, String> cursorProperties = new TreeMap<>();
cursorProperties.put("custom1", "one");
cursorProperties.put("custom2", "two");

ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, Collections.emptyMap(), cursorProperties);

assertEquals(c1.getProperties(), Collections.emptyMap());
assertEquals(c1.getCursorProperties(), cursorProperties);

ledger.addEntry("entry-1".getBytes());
ledger.addEntry("entry-2".getBytes());
Expand All @@ -99,6 +105,7 @@ void testPropertiesRecoveryAfterCrash() throws Exception {

assertEquals(c1.getMarkDeletedPosition(), p3);
assertEquals(c1.getProperties(), properties);
assertEquals(c1.getCursorProperties(), cursorProperties);

factory2.shutdown();
}
Expand Down Expand Up @@ -148,8 +155,13 @@ void testPropertiesAtCreation() throws Exception {
properties.put("b", 2L);
properties.put("c", 3L);

ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties);
Map<String, String> cursorProperties = new TreeMap<>();
cursorProperties.put("custom1", "one");
cursorProperties.put("custom2", "two");

ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties, cursorProperties);
assertEquals(c1.getProperties(), properties);
assertEquals(c1.getCursorProperties(), cursorProperties);

ledger.addEntry("entry-1".getBytes());

Expand All @@ -160,6 +172,7 @@ void testPropertiesAtCreation() throws Exception {
c1 = ledger.openCursor("c1");

assertEquals(c1.getProperties(), properties);
assertEquals(c1.getCursorProperties(), cursorProperties);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
} else {
final String subscriptionName = Codec.decode(cursor.getName());
subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor,
PersistentSubscription.isCursorFromReplicatedSubscription(cursor), null));
PersistentSubscription.isCursorFromReplicatedSubscription(cursor), cursor.getCursorProperties()));
// subscription-cursor gets activated by default: deactivate as there is no active subscription right
// now
subscriptions.get(subscriptionName).deactivateCursor();
Expand Down Expand Up @@ -878,7 +878,8 @@ private CompletableFuture<Subscription> getDurableSubscription(String subscripti

Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);

ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() {
ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, subscriptionProperties,
new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
return null;
}
}).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
any(OpenCursorCallback.class), any());
any(Map.class), any(OpenCursorCallback.class), any());

doAnswer(new Answer<Object>() {
@Override
Expand Down Expand Up @@ -2216,7 +2216,7 @@ public void testGetDurableSubscription() throws Exception {
doAnswer((Answer<Object>) invocationOnMock -> {
((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(mockCursor, null);
return null;
}).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any());
}).when(mockLedger).asyncOpenCursor(any(), any(), any(), any(), any(), any());
PersistentTopic topic = new PersistentTopic(successTopicName, mockLedger, brokerService);

CommandSubscribe cmd = new CommandSubscribe()
Expand Down
Loading

0 comments on commit c7be86a

Please sign in to comment.