Skip to content

Commit

Permalink
[fix] [broker] Make ManagedLedger and ManagedCursor suit the custom p…
Browse files Browse the repository at this point in the history
…lugin. (apache#268)(apache#276)
  • Loading branch information
horizonzy authored and lhotari committed Aug 6, 2024
1 parent bc9e8a0 commit a78cc75
Show file tree
Hide file tree
Showing 51 changed files with 1,145 additions and 664 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Optional;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl;

/**
* Definition of all the callbacks used for the ManagedLedger asynchronous API.
Expand All @@ -48,7 +47,7 @@ interface OpenReadOnlyCursorCallback {
}

interface OpenReadOnlyManagedLedgerCallback {
void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl managedLedger, Object ctx);
void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedger managedLedger, Object ctx);

void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;

/**
* A ManagedCursor is a persisted cursor inside a ManagedLedger.
Expand All @@ -45,6 +46,8 @@
@InterfaceStability.Stable
public interface ManagedCursor {

String CURSOR_INTERNAL_PROPERTY_PREFIX = "#pulsar.internal.";

@SuppressWarnings("checkstyle:javadoctype")
enum FindPositionConstraint {
SearchActiveEntries, SearchAllAvailableEntries
Expand Down Expand Up @@ -885,4 +888,16 @@ default boolean periodicRollover() {
default ManagedCursorAttributes getManagedCursorAttributes() {
return new ManagedCursorAttributes(this);
}

ManagedLedgerInternalStats.CursorStats getCursorStats();

boolean isMessageDeleted(Position position);

ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) throws ManagedLedgerException;

long[] getBatchPositionAckSet(Position position);

int applyMaxSizeCap(int maxEntries, long maxSizeBytes);

void updateReadStats(int readEntriesCount, long readEntriesSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
*/
package org.apache.bookkeeper.mledger;

import com.google.common.collect.Range;
import io.netty.buffer.ByteBuf;
import java.util.Collections;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
Expand All @@ -35,6 +39,7 @@
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;

/**
Expand Down Expand Up @@ -374,6 +379,8 @@ void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, L
*/
long getNumberOfEntries();

long getNumberOfEntries(Range<Position> range);

/**
* Get the total number of active entries for this managed ledger.
*
Expand Down Expand Up @@ -703,4 +710,40 @@ default void skipNonRecoverableLedger(long ledgerId){}
default ManagedLedgerAttributes getManagedLedgerAttributes() {
return new ManagedLedgerAttributes(this);
}

void asyncReadEntry(Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx);

/**
* Get all the managed ledgers.
*/
NavigableMap<Long, LedgerInfo> getLedgersInfo();

Position getNextValidPosition(Position position);

Position getPreviousPosition(Position position);

long getEstimatedBacklogSize(Position position);

Position getPositionAfterN(Position startPosition, long n, PositionBound startRange);

int getPendingAddEntriesCount();

long getCacheSize();

CompletableFuture<String> getLedgerMetadata(long ledgerId);

ManagedLedgerInternalStats getInternalStats();

default CompletableFuture<Set<String>> getLedgerLocations(long ledgerId) {
return CompletableFuture.completedFuture(Collections.emptySet());
}

default CompletableFuture<Position> getLastDispatchablePosition(final Predicate<Entry> predicate,
final Position startPosition) {
return CompletableFuture.completedFuture(PositionFactory.EARLIEST);
}

void dropBacklogForTimeLimit(BacklogQuota quota);

Position getFirstPosition();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;

/**
* A factory to open/create managed ledgers and delete them.
Expand Down Expand Up @@ -233,4 +235,14 @@ void asyncDelete(String name, CompletableFuture<ManagedLedgerConfig> mlConfigFut
* @return properties of this managedLedger.
*/
CompletableFuture<Map<String, String>> getManagedLedgerPropertiesAsync(String name);

Map<String, ManagedLedger> getManagedLedgers();

ManagedLedgerFactoryMXBean getCacheStats();


void estimateUnloadedTopicBacklog(PersistentOfflineTopicStats offlineTopicStats, TopicName topicName,
boolean accurate, Object ctx) throws Exception;

ManagedLedgerFactoryConfig getConfig();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

public enum PositionBound {
// define boundaries for position based seeks and searches
startIncluded, startExcluded
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import java.util.Map;

public interface ReadOnlyManagedLedger {

void asyncReadEntry(Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx);

long getNumberOfEntries();

ReadOnlyCursor createReadOnlyCursor(Position position);

Map<String, String> getProperties();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.MetaStore;
import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl;

public class ReadOnlyManagedLedgerImplWrapper implements ReadOnlyManagedLedger {

private final ReadOnlyManagedLedgerImpl readOnlyManagedLedger;

public ReadOnlyManagedLedgerImplWrapper(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
String name) {
this.readOnlyManagedLedger =
new ReadOnlyManagedLedgerImpl(factory, bookKeeper, store, config, scheduledExecutor, name);
}

public CompletableFuture<Void> initialize() {
return readOnlyManagedLedger.initialize();
}

@Override
public void asyncReadEntry(Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
readOnlyManagedLedger.asyncReadEntry(position, callback, ctx);
}

@Override
public long getNumberOfEntries() {
return readOnlyManagedLedger.getNumberOfEntries();
}

@Override
public ReadOnlyCursor createReadOnlyCursor(Position position) {
return readOnlyManagedLedger.createReadOnlyCursor(position);
}

@Override
public Map<String, String> getProperties() {
return readOnlyManagedLedger.getProperties();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionBound;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.ScanOutcome;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
Expand All @@ -98,6 +98,8 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
Expand All @@ -124,8 +126,6 @@ public class ManagedCursorImpl implements ManagedCursor {
protected final ManagedLedgerImpl ledger;
private final String name;

public static final String CURSOR_INTERNAL_PROPERTY_PREFIX = "#pulsar.internal.";

private volatile Map<String, String> cursorProperties;
private final BookKeeper.DigestType digestType;

Expand Down Expand Up @@ -1769,7 +1769,7 @@ public void asyncSkipEntries(int numEntriesToSkip, IndividualDeletedEntries dele
}

asyncMarkDelete(ledger.getPositionAfterN(markDeletePosition, numEntriesToSkip + numDeletedMessages,
ManagedLedgerImpl.PositionBound.startExcluded), new MarkDeleteCallback() {
PositionBound.startExcluded), new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
callback.skipEntriesComplete(ctx);
Expand Down Expand Up @@ -3485,6 +3485,7 @@ public Position processIndividuallyDeletedMessagesAndGetMarkDeletedPosition(
return mdp;
}

@Override
public boolean isMessageDeleted(Position position) {
lock.readLock().lock();
try {
Expand All @@ -3496,6 +3497,7 @@ public boolean isMessageDeleted(Position position) {
}

//this method will return a copy of the position's ack set
@Override
public long[] getBatchPositionAckSet(Position position) {
if (batchDeletedIndexes != null) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.get(position);
Expand Down Expand Up @@ -3618,6 +3620,7 @@ public ManagedCursorMXBean getStats() {
return this.mbean;
}

@Override
public void updateReadStats(int readEntriesCount, long readEntriesSize) {
this.entriesReadCount += readEntriesCount;
this.entriesReadSize += readEntriesSize;
Expand Down Expand Up @@ -3650,6 +3653,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}, null);
}

@Override
public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
return maxEntries;
Expand Down Expand Up @@ -3712,6 +3716,7 @@ public ManagedLedgerConfig getConfig() {
/***
* Create a non-durable cursor and copy the ack stats.
*/
@Override
public ManagedCursor duplicateNonDurableCursor(String nonDurableCursorName) throws ManagedLedgerException {
NonDurableCursorImpl newNonDurableCursor =
(NonDurableCursorImpl) ledger.newNonDurableCursor(getMarkDeletedPosition(), nonDurableCursorName);
Expand Down Expand Up @@ -3746,4 +3751,24 @@ public ManagedCursorAttributes getManagedCursorAttributes() {
}
return ATTRIBUTES_UPDATER.updateAndGet(this, old -> old != null ? old : new ManagedCursorAttributes(this));
}

@Override
public ManagedLedgerInternalStats.CursorStats getCursorStats() {
ManagedLedgerInternalStats.CursorStats cs = new ManagedLedgerInternalStats.CursorStats();
cs.markDeletePosition = getMarkDeletedPosition().toString();
cs.readPosition = getReadPosition().toString();
cs.waitingReadOp = hasPendingReadRequest();
cs.pendingReadOps = getPendingReadOpsCount();
cs.messagesConsumedCounter = getMessagesConsumedCounter();
cs.cursorLedger = getCursorLedger();
cs.cursorLedgerLastEntry = getCursorLedgerLastEntry();
cs.individuallyDeletedMessages = getIndividuallyDeletedMessages();
cs.lastLedgerSwitchTimestamp = DateFormatter.format(getLastLedgerSwitchTimestamp());
cs.state = getState();
cs.active = isActive();
cs.numberOfEntriesSinceFirstNotAckedMessage = getNumberOfEntriesSinceFirstNotAckedMessage();
cs.totalNonContiguousDeletedMessagesRange = getTotalNonContiguousDeletedMessagesRange();
cs.properties = getProperties();
return cs;
}
}
Loading

0 comments on commit a78cc75

Please sign in to comment.