Skip to content

Commit

Permalink
Merge branch 'fix/pip-105-store-properties' into impl/pip-105-update-…
Browse files Browse the repository at this point in the history
…subscription-properties
  • Loading branch information
eolivelli committed May 25, 2022
2 parents 8aa88c6 + 72430a7 commit 77cdfcb
Show file tree
Hide file tree
Showing 15 changed files with 317 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
Expand Down Expand Up @@ -79,6 +80,20 @@ enum IndividualDeletedEntries {
*/
Map<String, Long> getProperties();

/**
* Return any properties that were associated with the cursor.
*/
Map<String, String> getCursorProperties();

/**
* Updates the properties.
* @param cursorProperties
* @return a handle to the result of the operation
*/
default CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
return CompletableFuture.completedFuture(null);
}

/**
* Add a property associated with the last stored position.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,13 @@ ManagedCursor openCursor(String name, InitialPosition initialPosition) throws In
* @param properties
* user defined properties that will be attached to the first position of the cursor, if the open
* operation will trigger the creation of the cursor.
* @param cursorProperties
* the properties for the Cursor
* @return the ManagedCursor
* @throws ManagedLedgerException
*/
ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties)
ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
Map<String, String> cursorProperties)
throws InterruptedException, ManagedLedgerException;

/**
Expand Down Expand Up @@ -337,13 +340,15 @@ ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionNam
* @param initialPosition
* the cursor will be set at lastest position or not when first created
* default is <b>true</b>
* @param cursorProperties
* the properties for the Cursor
* @param callback
* callback object
* @param ctx
* opaque context
*/
void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
OpenCursorCallback callback, Object ctx);
Map<String, String> cursorProperties, OpenCursorCallback callback, Object ctx);

/**
* Get a list of all the cursors reading from this ManagedLedger.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
Expand All @@ -116,6 +117,7 @@ public class ManagedCursorImpl implements ManagedCursor {
protected final ManagedLedgerConfig config;
protected final ManagedLedgerImpl ledger;
private final String name;
private volatile Map<String, String> cursorProperties;
private final BookKeeper.DigestType digestType;

protected volatile PositionImpl markDeletePosition;
Expand Down Expand Up @@ -280,6 +282,7 @@ public interface VoidCallback {

ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName) {
this.bookkeeper = bookkeeper;
this.cursorProperties = Collections.emptyMap();
this.config = config;
this.ledger = ledger;
this.name = cursorName;
Expand Down Expand Up @@ -313,6 +316,52 @@ public Map<String, Long> getProperties() {
return lastMarkDeleteEntry != null ? lastMarkDeleteEntry.properties : Collections.emptyMap();
}

@Override
public Map<String, String> getCursorProperties() {
return cursorProperties;
}

@Override
public CompletableFuture<Void> setCursorProperties(Map<String, String> cursorProperties) {
CompletableFuture<Void> updateCursorPropertiesResult = new CompletableFuture<>();
ledger.getStore().asyncGetCursorInfo(ledger.getName(), name, new MetaStoreCallback<>() {
@Override
public void operationComplete(ManagedCursorInfo info, Stat stat) {
ManagedCursorInfo copy = ManagedCursorInfo
.newBuilder(info)
.clearCursorProperties()
.addAllCursorProperties(buildStringPropertiesMap(cursorProperties))
.build();
ledger.getStore().asyncUpdateCursorInfo(ledger.getName(),
name, copy, stat, new MetaStoreCallback<>() {
@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}] Updated ledger cursor: {} properties {}", ledger.getName(),
name, cursorProperties);
ManagedCursorImpl.this.cursorProperties = cursorProperties;
cursorLedgerStat = stat;
updateCursorPropertiesResult.complete(result);
}

@Override
public void operationFailed(MetaStoreException e) {
log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
name, cursorProperties, e);
updateCursorPropertiesResult.completeExceptionally(e);
}
});
}

@Override
public void operationFailed(MetaStoreException e) {
log.error("[{}] Error while updating ledger cursor: {} properties {}", ledger.getName(),
name, cursorProperties, e);
updateCursorPropertiesResult.completeExceptionally(e);
}
});
return updateCursorPropertiesResult;
}

@Override
public boolean putProperty(String key, Long value) {
if (lastMarkDeleteEntry != null) {
Expand Down Expand Up @@ -361,6 +410,18 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
cursorLedgerStat = stat;
lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;


Map<String, String> recoveredCursorProperties = Collections.emptyMap();
if (info.getCursorPropertiesCount() > 0) {
// Recover properties map
recoveredCursorProperties = Maps.newHashMap();
for (int i = 0; i < info.getCursorPropertiesCount(); i++) {
StringProperty property = info.getCursorProperties(i);
recoveredCursorProperties.put(property.getName(), property.getValue());
}
}
cursorProperties = recoveredCursorProperties;

if (info.getCursorsLedgerId() == -1L) {
// There is no cursor ledger to read the last position from. It means the cursor has been properly
// closed and the last mark-delete position is stored in the ManagedCursorInfo itself.
Expand All @@ -380,7 +441,7 @@ public void operationComplete(ManagedCursorInfo info, Stat stat) {
}
}

recoveredCursor(recoveredPosition, recoveredProperties, null);
recoveredCursor(recoveredPosition, recoveredProperties, recoveredCursorProperties, null);
callback.operationComplete();
} else {
// Need to proceed and read the last entry in the specified ledger to find out the last position
Expand Down Expand Up @@ -410,7 +471,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
BKException.getMessage(rc));
// Rewind to oldest entry available
initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
initialize(getRollbackPosition(info), Collections.emptyMap(), Collections.emptyMap(), callback);
return;
} else if (rc != BKException.Code.OK) {
log.warn("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
Expand All @@ -426,7 +487,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
log.warn("[{}] Error reading from metadata ledger {} for consumer {}: No entries in ledger",
ledger.getName(), ledgerId, name);
// Rewind to last cursor snapshot available
initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
}

Expand All @@ -438,7 +499,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
log.error("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));
// Rewind to oldest entry available
initialize(getRollbackPosition(info), Collections.emptyMap(), callback);
initialize(getRollbackPosition(info), Collections.emptyMap(), cursorProperties, callback);
return;
} else if (rc1 != BKException.Code.OK) {
log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
Expand Down Expand Up @@ -476,7 +537,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
&& positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
}
recoveredCursor(position, recoveredProperties, lh);
recoveredCursor(position, recoveredProperties, cursorProperties, lh);
callback.operationComplete();
}, null);
};
Expand Down Expand Up @@ -547,6 +608,7 @@ private void recoverBatchDeletedIndexes (
}

private void recoveredCursor(PositionImpl position, Map<String, Long> properties,
Map<String, String> cursorProperties,
LedgerHandle recoveredFromCursorLedger) {
// if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
// we need to move to the next existing ledger
Expand All @@ -564,7 +626,7 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
position = ledger.getLastPosition();
}
log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position);

this.cursorProperties = cursorProperties;
messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));
markDeletePosition = position;
persistentMarkDeletePosition = position;
Expand All @@ -577,8 +639,9 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
STATE_UPDATER.set(this, State.NoLedger);
}

void initialize(PositionImpl position, Map<String, Long> properties, final VoidCallback callback) {
recoveredCursor(position, properties, null);
void initialize(PositionImpl position, Map<String, Long> properties, Map<String, String> cursorProperties,
final VoidCallback callback) {
recoveredCursor(position, properties, cursorProperties, null);
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}",
ledger.getName(), name, messagesConsumedCounter, markDeletePosition, readPosition);
Expand Down Expand Up @@ -2392,6 +2455,7 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio
.setLastActive(lastActive); //

info.addAllProperties(buildPropertiesMap(properties));
info.addAllCursorProperties(buildStringPropertiesMap(cursorProperties));
if (persistIndividualDeletedMessageRanges) {
info.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
if (config.isDeletionAtBatchIndexLevelEnabled()) {
Expand Down Expand Up @@ -2605,7 +2669,7 @@ private CompletableFuture<Void> deleteLedgerAsync(LedgerHandle ledgerHandle) {
}


private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
private static List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
if (properties.isEmpty()) {
return Collections.emptyList();
}
Expand All @@ -2619,6 +2683,20 @@ private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
return longProperties;
}

private static List<StringProperty> buildStringPropertiesMap(Map<String, String> properties) {
if (properties == null || properties.isEmpty()) {
return Collections.emptyList();
}

List<StringProperty> stringProperties = Lists.newArrayList();
properties.forEach((name, value) -> {
StringProperty sp = StringProperty.newBuilder().setName(name).setValue(value).build();
stringProperties.add(sp);
});

return stringProperties;
}

private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
lock.readLock().lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,11 +845,12 @@ public ManagedCursor openCursor(String cursorName) throws InterruptedException,
@Override
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition)
throws InterruptedException, ManagedLedgerException {
return openCursor(cursorName, initialPosition, Collections.emptyMap());
return openCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap());
}

@Override
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties)
public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition, Map<String, Long> properties,
Map<String, String> cursorProperties)
throws InterruptedException, ManagedLedgerException {
final CountDownLatch counter = new CountDownLatch(1);
class Result {
Expand All @@ -858,7 +859,7 @@ class Result {
}
final Result result = new Result();

asyncOpenCursor(cursorName, initialPosition, properties, new OpenCursorCallback() {
asyncOpenCursor(cursorName, initialPosition, properties, cursorProperties, new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
result.cursor = cursor;
Expand Down Expand Up @@ -893,12 +894,14 @@ public void asyncOpenCursor(final String cursorName, final OpenCursorCallback ca
@Override
public void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
final OpenCursorCallback callback, final Object ctx) {
this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), callback, ctx);
this.asyncOpenCursor(cursorName, initialPosition, Collections.emptyMap(), Collections.emptyMap(),
callback, ctx);
}

@Override
public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
Map<String, Long> properties, final OpenCursorCallback callback, final Object ctx) {
Map<String, Long> properties, Map<String, String> cursorProperties,
final OpenCursorCallback callback, final Object ctx) {
try {
checkManagedLedgerIsOpen();
checkFenced();
Expand Down Expand Up @@ -932,7 +935,7 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP
CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
uninitializedCursors.put(cursorName, cursorFuture);
PositionImpl position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition();
cursor.initialize(position, properties, new VoidCallback() {
cursor.initialize(position, properties, cursorProperties, new VoidCallback() {
@Override
public void operationComplete() {
log.info("[{}] Opened new cursor: {}", name, cursor);
Expand Down
9 changes: 9 additions & 0 deletions managed-ledger/src/main/proto/MLDataFormats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ message LongProperty {
required int64 value = 2;
}

message StringProperty {
required string name = 1;
required string value = 2;
}

message ManagedCursorInfo {
// If the ledger id is -1, then the mark-delete position is
// the one from the (ledgerId, entryId) snapshot below
Expand All @@ -123,6 +128,10 @@ message ManagedCursorInfo {

// Store which index in the batch message has been deleted
repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;

// Additional custom properties associated with
// the cursor
repeated StringProperty cursorProperties = 8;
}

enum CompressionType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public Map<String, Long> getProperties() {
return Collections.emptyMap();
}

@Override
public Map<String, String> getCursorProperties() {
return Collections.emptyMap();
}

@Override
public boolean putProperty(String key, Long value) {
return false;
Expand Down
Loading

0 comments on commit 77cdfcb

Please sign in to comment.