Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 15750: PIP-105: Store Subscription properties #15757

Merged
merged 5 commits into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
Expand Down Expand Up @@ -79,6 +80,20 @@ enum IndividualDeletedEntries {
*/
Map<String, Long> getProperties();

/**
* Return any properties that were associated with the cursor.
*/
Map<String, String> getCursorProperties();

/**
* Updates the properties.
* @param cursorProperties
* @return a handle to the result of the operation
*/
default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
return CompletableFuture.completedFuture(null);
}

/**
* Add a property associated with the last stored position.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,13 @@ ManagedCursor openCursor(String name, InitialPosition initialPosition) throws In
* @param properties
* user defined properties that will be attached to the first position of the cursor, if the open
* operation will trigger the creation of the cursor.
* @param cursorProperties
* the properties for the Cursor
* @return the ManagedCursor
* @throws ManagedLedgerException
*/
ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties)
ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
Map<String, String> cursorProperties)
throws InterruptedException, ManagedLedgerException;

/**
Expand Down Expand Up @@ -337,13 +340,15 @@ ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionNam
* @param initialPosition
* the cursor will be set at lastest position or not when first created
* default is <b>true</b>
* @param cursorProperties
* the properties for the Cursor
* @param callback
* callback object
* @param ctx
* opaque context
*/
void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
OpenCursorCallback callback, Object ctx);
Map<String, String> cursorProperties, OpenCursorCallback callback, Object ctx);

/**
* Get a list of all the cursors reading from this ManagedLedger.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
Expand All @@ -116,6 +117,7 @@ public class ManagedCursorImpl implements ManagedCursor {
protected final ManagedLedgerConfig config;
protected final ManagedLedgerImpl ledger;
private final String name;
private volatile Map<String, String> cursorProperties;
private final BookKeeper.DigestType digestType;

protected volatile PositionImpl markDeletePosition;
Expand Down Expand Up @@ -280,6 +282,7 @@ public interface VoidCallback {

ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName) {
this.bookkeeper = bookkeeper;
this.cursorProperties = Collections.emptyMap();
this.config = config;
this.ledger = ledger;
this.name = cursorName;
Expand Down Expand Up @@ -313,6 +316,52 @@ public Map<String, Long> getProperties() {
return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
}

@Override
public Map<String, String> getCursorProperties() {
return cursorProperties;
}

@Override
public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
ManagedCursorInfo copy = ManagedCursorInfo
.newBuilder(info)
.clearCursorProperties()
.addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
.build();
ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
name, copy, stat, new MetaStoreCallback<>() {
@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
name, cursorProperties);
ManagedCursorImpl.this.cursorProperties = cursorProperties;
cursorLedgerStat = stat;
updateCursorPropertiesResult.complete(result);
}

@Override
public void operationFailed(MetaStoreException e) {
log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
name, cursorProperties, e);
updateCursorPropertiesResult.completeExceptionally(e);
}
});
}

@Override
public void operationFailed(MetaStoreException e) {
log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
name, cursorProperties, e);
updateCursorPropertiesResult.completeExceptionally(e);
}
});
return updateCursorPropertiesResult;
}

@Override
public boolean putProperty(String key, Long value) {
if (lastMarkDeleteEntry != null) {
Expand Down Expand Up @@ -361,6 +410,18 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
cursorLedgerStat = stat;
lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;


Map<String, String> recoveredCursorProperties = Collections.emptyMap();
if (info.getCursorPropertiesCount() > 0) {
// Recover properties map
recoveredCursorProperties = Maps.newHashMap();
eolivelli marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < info.getCursorPropertiesCount(); i++) {
StringProperty property = info.getCursorProperties(i);
recoveredCursorProperties.put(property.getName(), property.getValue());
}
}
cursorProperties = recoveredCursorProperties;

if (info.getCursorsLedgerId() == -1L) {
// There is no cursor ledger to read the last position from. It means the cursor has been properly
// closed and the last mark-delete position is stored in the ManagedCursorInfo itself.
Expand All @@ -380,7 +441,7 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
}
}

recoveredCursor(recoveredPosition, recoveredProperties, null);
recoveredCursor(recoveredPosition, recoveredProperties, recoveredCursorProperties, null);
callback.operationComplete();
} else {
// Need to proceed and read the last entry in the specified ledger to find out the last position
Expand Down Expand Up @@ -410,7 +471,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
BKException.getMessage(rc));
// Rewind to oldest entry available
initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback);
return;
} else if (rc != BKException.Code.OK) {
log.warn("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
Expand All @@ -426,7 +487,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
log.warn("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger",
ledger.getName(), ledgerId, name);
// Rewind to last cursor snapshot available
initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
}

Expand All @@ -438,7 +499,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
log.error("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
// Rewind to oldest entry available
initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
} else if (rc1 != BKException.Code.OK) {
log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
Expand Down Expand Up @@ -476,7 +537,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
&& positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
}
recoveredCursor(position, recoveredProperties, lh);
recoveredCursor(position, recoveredProperties, cursorProperties, lh);
callback.operationComplete();
}, null);
};
Expand Down Expand Up @@ -547,6 +608,7 @@ private void recoverBatchDeletedIndexes (
}

private void recoveredCursor(PositionImpl position, Map<String, Long> properties,
Map<String, String> cursorProperties,
LedgerHandle recoveredFromCursorLedger) {
// if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
// we need to move to the next existing ledger
Expand All @@ -564,7 +626,7 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
position = ledger.getLastPosition();
}
log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position);

this.cursorProperties = cursorProperties;
messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));
markDeletePosition = position;
persistentMarkDeletePosition = position;
Expand All @@ -577,8 +639,9 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
STATE_UPDATER.set(this, State.NoLedger);
}

void initialize(PositionImpl position, Map<String, Long> properties, final VoidCallback callback) {
recoveredCursor(position, properties, null);
void initialize(PositionImpl position, Map<String, Long> properties, Map<String, String> cursorProperties,
final VoidCallback callback) {
recoveredCursor(position, properties, cursorProperties, null);
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
Expand Down Expand Up @@ -2392,6 +2455,7 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio
.setLastActive(lastActive); //

info.addAllProperties(buildPropertiesMap(properties));
info.addAllCursorProperties(buildStringPropertiesMap(cursorProperties));
if (persistIndividualDeletedMessageRanges) {
info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
if (config.isDeletionAtBatchIndexLevelEnabled()) {
Expand Down Expand Up @@ -2605,7 +2669,7 @@ private CompletableFuture<Void> deleteLedgerAsync(LedgerHandle ledgerHandle) {
}


private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
private static List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
if (properties.isEmpty()) {
return Collections.emptyList();
}
Expand All @@ -2619,6 +2683,20 @@ private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
return longProperties;
}

private static List<StringProperty> buildStringPropertiesMap(Map<String, String> properties) {
if (properties == null || properties.isEmpty()) {
return Collections.emptyList();
}

List<StringProperty> stringProperties = Lists.newArrayList();
eolivelli marked this conversation as resolved.
Show resolved Hide resolved
properties.forEach((name, value) -> {
StringProperty sp = StringProperty.newBuilder().setName(name).setValue(value).build();
stringProperties.add(sp);
});

return stringProperties;
}

private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
lock.readLock().lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,11 +845,12 @@ public ManagedCursor openCursor(String cursorName) throws InterruptedException,
@Override
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition)
throws InterruptedException, ManagedLedgerException {
return openCursor(cursorName, initialPosition, Collections.emptyMap());
return openCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap());
eolivelli marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties)
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties,
Map<String, String> cursorProperties)
throws InterruptedException, ManagedLedgerException {
final CountDownLatch counter = new CountDownLatch(1);
class Result {
Expand All @@ -858,7 +859,7 @@ class Result {
}
final Result result = new Result();

asyncOpenCursor(cursorName, initialPosition, properties, new OpenCursorCallback() {
asyncOpenCursor(cursorName, initialPosition, properties, cursorProperties, new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
result.cursor = cursor;
Expand Down Expand Up @@ -893,12 +894,14 @@ public void asyncOpenCursor(final String cursorName, final OpenCursorCallback ca
@Override
public void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
final OpenCursorCallback callback, final Object ctx) {
this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), callback, ctx);
this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap(),
callback, ctx);
}

@Override
public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
Map<String, Long> properties, final OpenCursorCallback callback, final Object ctx) {
Map<String, Long> properties, Map<String, String> cursorProperties,
final OpenCursorCallback callback, final Object ctx) {
try {
checkManagedLedgerIsOpen();
checkFenced();
Expand Down Expand Up @@ -932,7 +935,7 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP
CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
uninitializedCursors.put(cursorName, cursorFuture);
PositionImpl position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition();
cursor.initialize(position, properties, new VoidCallback() {
cursor.initialize(position, properties, cursorProperties, new VoidCallback() {
@Override
public void operationComplete() {
log.info("[{}] Opened new cursor: {}", name, cursor);
Expand Down
9 changes: 9 additions & 0 deletions managed-ledger/src/main/proto/MLDataFormats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ message LongProperty {
required int64 value = 2;
}

message StringProperty {
required string name = 1;
required string value = 2;
}

message ManagedCursorInfo {
// If the ledger id is -1, then the mark-delete position is
// the one from the (ledgerId, entryId) snapshot below
Expand All @@ -123,6 +128,10 @@ message ManagedCursorInfo {

// Store which index in the batch message has been deleted
repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;

// Additional custom properties associated with
// the cursor
repeated StringProperty cursorProperties = 8;
}

enum CompressionType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public Map<String, Long> getProperties() {
return Collections.emptyMap();
}

@Override
public Map<String, String> getCursorProperties() {
return Collections.emptyMap();
}

@Override
public boolean putProperty(String key, Long value) {
return false;
Expand Down
Loading