[pulsar-broker] Support caching to drain backlog consumers (apache#12258)

(cherry picked from commit 5869791)
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
nodece committed Sep 10, 2024
1 parent 85fd190 commit e85ff54
Showing 18 changed files with 305 additions and 21 deletions.
10 changes: 10 additions & 0 deletions conf/standalone.conf
@@ -746,6 +746,16 @@ managedLedgerAddEntryTimeoutSeconds=0
# Of course, using a smaller value may degrade consumption throughput. Default is 10ms.
managedLedgerNewEntriesCheckDelayInMillis=10

# Minimum cursors that must be in backlog state to cache and reuse the read entries.
# (Default = 0 to disable the backlog read cache)
managedLedgerMinimumBacklogCursorsForCaching=0

# Minimum backlog entries for any cursor before the broker starts caching reads.
managedLedgerMinimumBacklogEntriesForCaching=100

# Maximum backlog entry difference to prevent caching entries that can't be reused.
managedLedgerMaxBacklogBetweenCursorsForCaching=10000

# Use Open Range-Set to cache unacked messages
managedLedgerUnackedRangesOpenCacheSetEnabled=true

@@ -676,4 +676,9 @@ default void skipNonRecoverableLedger(long ledgerId){}
* roll over that ledger if inactive.
*/
void checkInactiveLedgerAndRollOver();

/**
* Check the cursors' backlog state and decide whether this managed ledger should cache entries for backlog reads.
*/
void checkCursorsToCacheEntries();
}
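The new checkCursorsToCacheEntries() hook is driven from the broker's periodic stats update later in this change. For readers embedding the managed-ledger library directly, here is a minimal sketch of driving it on a timer; the scheduler, interval, and class name are illustrative assumptions, not part of the patch:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.mledger.ManagedLedger;

public class BacklogCacheCheckTask {
    // Illustrative only: periodically re-evaluate whether backlog reads should be cached.
    public static void schedule(ScheduledExecutorService executor, ManagedLedger ledger) {
        executor.scheduleAtFixedRate(ledger::checkCursorsToCacheEntries, 1, 1, TimeUnit.MINUTES);
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // schedule(executor, ledger); // assuming a ManagedLedger instance named ledger is available
    }
}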
@@ -80,6 +80,9 @@ public class ManagedLedgerConfig {
@Getter
@Setter
private boolean cacheEvictionByMarkDeletedPosition = false;
private int minimumBacklogCursorsForCaching = 0;
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;

public boolean isCreateIfMissing() {
return createIfMissing;
@@ -674,4 +677,60 @@ public void setInactiveLedgerRollOverTime(int inactiveLedgerRollOverTimeMs, Time
this.inactiveLedgerRollOverTimeMs = (int) unit.toMillis(inactiveLedgerRollOverTimeMs);
}

/**
* Minimum number of backlogged cursors required before the broker caches read entries so they can be reused for
* other cursors' backlog reads. (Default = 0: the broker does not cache backlog reads)
*
* @return the minimum number of backlogged cursors required to enable caching
*/
public int getMinimumBacklogCursorsForCaching() {
return minimumBacklogCursorsForCaching;
}

/**
* Set the minimum number of backlogged cursors required before the broker caches read entries for reuse by
* other cursors' backlog reads.
*
* @param minimumBacklogCursorsForCaching
*/
public void setMinimumBacklogCursorsForCaching(int minimumBacklogCursorsForCaching) {
this.minimumBacklogCursorsForCaching = minimumBacklogCursorsForCaching;
}

/**
* Minimum backlog that must exist before the broker caches entries for backlog reads.
*
* @return the minimum backlog, in entries, required to enable caching
*/
public int getMinimumBacklogEntriesForCaching() {
return minimumBacklogEntriesForCaching;
}

/**
* Set the minimum backlog after which the broker starts caching backlog reads.
*
* @param minimumBacklogEntriesForCaching
*/
public void setMinimumBacklogEntriesForCaching(int minimumBacklogEntriesForCaching) {
this.minimumBacklogEntriesForCaching = minimumBacklogEntriesForCaching;
}

/**
* Maximum backlog gap allowed between backlogged cursors when caching, to avoid caching an entry that could be
* invalidated before another backlogged cursor can reuse it from the cache.
*
* @return the maximum backlog difference, in entries, allowed between backlogged cursors
*/
public int getMaxBacklogBetweenCursorsForCaching() {
return maxBacklogBetweenCursorsForCaching;
}

/**
* Set the maximum backlog distance between backlogged cursors, to avoid caching entries that will not be reused.
*
* @param maxBacklogBetweenCursorsForCaching
*/
public void setMaxBacklogBetweenCursorsForCaching(int maxBacklogBetweenCursorsForCaching) {
this.maxBacklogBetweenCursorsForCaching = maxBacklogBetweenCursorsForCaching;
}
}
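A minimal sketch of applying the three new settings together when building a ManagedLedgerConfig; the threshold values and the example class name are illustrative assumptions, not recommendations from this change:

import org.apache.bookkeeper.mledger.ManagedLedgerConfig;

public class BacklogCacheConfigExample {
    public static ManagedLedgerConfig buildConfig() {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        // Require at least two backlogged durable cursors before caching backlog reads.
        config.setMinimumBacklogCursorsForCaching(2);
        // Only count cursors with at least 1000 entries of backlog towards that threshold.
        config.setMinimumBacklogEntriesForCaching(1000);
        // Cursors more than 1000 entries apart are too far apart to share cached entries.
        config.setMaxBacklogBetweenCursorsForCaching(1000);
        return config;
    }
}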
@@ -199,6 +199,7 @@ public class ManagedCursorImpl implements ManagedCursor {
private long entriesReadSize;
private int individualDeletedMessagesSerializedSize;
private static final String COMPACTION_CURSOR_NAME = "__compaction";
private volatile boolean cacheReadEntry = false;

// active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger.
private volatile boolean isActive = false;
@@ -3360,9 +3361,17 @@ public void setState(State state) {
this.state = state;
}

public ManagedLedgerConfig getConfig() {
return getManagedLedger().getConfig();
public void setCacheReadEntry(boolean cacheReadEntry) {
this.cacheReadEntry = cacheReadEntry;
}

public boolean isCacheReadEntry() {
return cacheReadEntry;
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);

public ManagedLedgerConfig getConfig() {
return getManagedLedger().getConfig();
}
}
@@ -227,6 +227,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private long lastOffloadSuccessTimestamp = 0;
private long lastOffloadFailureTimestamp = 0;

private int minBacklogCursorsForCaching = 0;
private int minBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;

private static final Random random = new Random(System.currentTimeMillis());
private long maximumRolloverTimeMs;
protected final Supplier<Boolean> mlOwnershipChecker;
@@ -343,10 +347,13 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.maximumRolloverTimeMs = getMaximumRolloverTimeMs(config);
this.mlOwnershipChecker = mlOwnershipChecker;
this.propertiesMap = Maps.newHashMap();
this.inactiveLedgerRollOverTimeMs = config.getInactiveLedgerRollOverTimeMs();
if (config.getManagedLedgerInterceptor() != null) {
this.managedLedgerInterceptor = config.getManagedLedgerInterceptor();
}
this.inactiveLedgerRollOverTimeMs = config.getInactiveLedgerRollOverTimeMs();
this.minBacklogCursorsForCaching = config.getMinimumBacklogCursorsForCaching();
this.minBacklogEntriesForCaching = config.getMinimumBacklogEntriesForCaching();
this.maxBacklogBetweenCursorsForCaching = config.getMaxBacklogBetweenCursorsForCaching();
}

synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) {
@@ -2044,7 +2051,7 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
lastEntry);
}
asyncReadEntry(ledger, firstEntry, lastEntry, false, opReadEntry, opReadEntry.ctx);
asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry, opReadEntry.ctx);
}

protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) {
@@ -2061,18 +2068,20 @@ protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntr
}
}

protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean isSlowestReader,
OpReadEntry opReadEntry, Object ctx) {
protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, OpReadEntry opReadEntry,
Object ctx) {
if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
opReadEntry, readOpCount, createdTime, ctx);
lastReadCallback = readCallback;
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, readCallback, readOpCount);
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(),
readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, opReadEntry, ctx);
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, opReadEntry.cursor.isCacheReadEntry(), opReadEntry,
ctx);
}
}

@@ -4359,4 +4368,41 @@ public Position getTheSlowestNonDurationReadPosition() {
return theSlowestNonDurableReadPosition;
}


@Override
public void checkCursorsToCacheEntries() {
if (minBacklogCursorsForCaching < 1) {
return;
}
Iterator<ManagedCursor> it = cursors.iterator();
Map<ManagedCursorImpl, Long> cursorBacklogMap = new HashMap<>();
while (it.hasNext()) {
ManagedCursorImpl cursor = (ManagedCursorImpl) it.next();
if (cursor.isDurable()) {
cursorBacklogMap.put(cursor, cursor.getNumberOfEntries());
}
}
int cursorsInSameBacklogRange = 0;
for (java.util.Map.Entry<ManagedCursorImpl, Long> cursor : cursorBacklogMap.entrySet()) {
cursorsInSameBacklogRange = 0;
for (java.util.Map.Entry<ManagedCursorImpl, Long> other : cursorBacklogMap.entrySet()) {
if (cursor.equals(other)) {
continue;
}
long backlog = cursor.getValue();
// Skip cursors whose backlog is too small to benefit from caching.
if (backlog < minBacklogEntriesForCaching) {
continue;
}
// Count only cursors whose backlog is within maxBacklogBetweenCursorsForCaching of this one; an entry cached
// for a cursor much further ahead or behind is likely to be evicted before it can be reused.
if (Math.abs(backlog - other.getValue()) <= maxBacklogBetweenCursorsForCaching) {
cursorsInSameBacklogRange++;
}
}
cursor.getKey().setCacheReadEntry(cursorsInSameBacklogRange >= minBacklogCursorsForCaching);
if (log.isDebugEnabled()) {
log.info("{} Enabling cache read = {} for {}", name,
cursorsInSameBacklogRange >= minBacklogCursorsForCaching, cursor.getKey().getName());
}
}
}
}
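To make the grouping rule in checkCursorsToCacheEntries() concrete, here is a self-contained sketch that mirrors the same decision on plain numbers; the subscription names, backlog sizes, and helper class are hypothetical, not taken from the change:

import java.util.Map;

public class BacklogGroupingSketch {
    // Mirrors the decision above: a cursor gets cached reads only when enough other cursors
    // have a backlog within maxGap entries of its own backlog.
    static boolean shouldCache(String name, Map<String, Long> backlogs,
                               int minCursors, long minBacklog, long maxGap) {
        long backlog = backlogs.get(name);
        if (backlog < minBacklog) {
            return false;
        }
        int inRange = 0;
        for (Map.Entry<String, Long> other : backlogs.entrySet()) {
            if (other.getKey().equals(name)) {
                continue;
            }
            if (Math.abs(backlog - other.getValue()) <= maxGap) {
                inRange++;
            }
        }
        return inRange >= minCursors;
    }

    public static void main(String[] args) {
        Map<String, Long> backlogs = Map.of("sub-a", 5000L, "sub-b", 5400L, "sub-c", 80000L);
        // sub-a and sub-b are within 1000 entries of each other, so caching their reads pays off.
        System.out.println(shouldCache("sub-a", backlogs, 1, 1000, 1000)); // true
        // sub-c is tens of thousands of entries behind the others, so caching its reads would not help.
        System.out.println(shouldCache("sub-c", backlogs, 1, 1000, 1000)); // false
    }
}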
@@ -219,8 +219,11 @@ public void safeRun() {
EntryImpl entry = EntryImpl.create(ledger.getId(), entryId, data);
// EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling
// insert
ml.entryCache.insert(entry);
entry.release();
// The entry cache does not copy the data when the entry already exists in the cache.
// A backlog read may already have inserted this entry, so this insert can encounter a duplicate.
if (ml.entryCache.insert(entry)) {
entry.release();
}
}

PositionImpl lastEntry = PositionImpl.get(ledger.getId(), entryId);
@@ -91,14 +91,14 @@ public interface EntryCache extends Comparable<EntryCache> {
* the first entry to read (inclusive)
* @param lastEntry
* the last entry to read (inclusive)
* @param isSlowestReader
* whether the reader cursor is the most far behind in the stream
* @param shouldCacheEntry
* whether the read entry should be cached
* @param callback
* the callback object that will be notified when read is done
* @param ctx
* the context object
*/
void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
ReadEntriesCallback callback, Object ctx);

/**
@@ -98,6 +98,11 @@ public boolean insert(EntryImpl entry) {
entry.getLength());
}

PositionImpl position = entry.getPosition();
if (entries.exists(position)) {
return false;
}

ByteBuf cachedData;
if (copyEntries) {
cachedData = copyEntry(entry);
@@ -109,7 +114,6 @@
cachedData = entry.getDataBuffer().retain();
}

PositionImpl position = entry.getPosition();
EntryImpl cacheEntry = EntryImpl.create(position, cachedData);
cachedData.release();
if (entries.put(position, cacheEntry)) {
@@ -239,10 +243,10 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt
}

@Override
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
final ReadEntriesCallback callback, Object ctx) {
try {
asyncReadEntry0(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx);
asyncReadEntry0(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx);
} catch (Throwable t) {
log.warn("failed to read entries for {}--{}-{}", lh.getId(), firstEntry, lastEntry, t);
// invalidate all entries related to ledger from the cache (it might happen if entry gets corrupt
@@ -254,7 +258,7 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry,
final ReadEntriesCallback callback, Object ctx) {
final long ledgerId = lh.getId();
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
@@ -303,9 +307,11 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo
final List<EntryImpl> entriesToReturn = Lists.newArrayListWithExpectedSize(entriesToRead);
for (LedgerEntry e : ledgerEntries) {
EntryImpl entry = RangeEntryCacheManagerImpl.create(e, interceptor);

entriesToReturn.add(entry);
totalSize += entry.getLength();
if (shouldCacheEntry) {
// Cache the entry read from BookKeeper so other backlogged cursors in the same range can reuse it.
insert(entry);
}
}

manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
@@ -87,6 +87,10 @@ public boolean put(Key key, Value value) {
}
}

// Treat a null key as already present so that callers skip inserting it.
public boolean exists(Key key) {
return key != null ? entries.containsKey(key) : true;
}

public Value get(Key key) {
Value value = entries.get(key);
if (value == null) {
@@ -3152,7 +3152,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

}, null, PositionImpl.LATEST);
ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(),
false, opReadEntry, ctxStr);
opReadEntry, ctxStr);
retryStrategically((test) -> {
return responseException2.get() != null;
}, 5, 1000);
@@ -1988,13 +1988,35 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "If value is invalid or NONE, then save the ManagedLedgerInfo bytes data directly.")
private String managedLedgerInfoCompressionType = "NONE";


@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "ManagedCursorInfo compression type, option values (NONE, LZ4, ZLIB, ZSTD, SNAPPY). \n"
+ "If value is NONE, then save the ManagedCursorInfo bytes data directly.")
private String managedCursorInfoCompressionType = "NONE";

/*** --- Load balancer. --- ****/
@FieldContext(
dynamic = true,
category = CATEGORY_STORAGE_ML,
doc = "Minimum cursors that must be in backlog state to cache and reuse the read entries."
+ "(Default =0 to disable backlog reach cache)"
)
private int managedLedgerMinimumBacklogCursorsForCaching = 0;

@FieldContext(
dynamic = true,
category = CATEGORY_STORAGE_ML,
doc = "Minimum backlog entries for any cursor before start caching reads"
)
private int managedLedgerMinimumBacklogEntriesForCaching = 1000;

@FieldContext(
dynamic = true,
category = CATEGORY_STORAGE_ML,
doc = "Maximum backlog entry difference to prevent caching entries that can't be reused"
)
private int managedLedgerMaxBacklogBetweenCursorsForCaching = 1000;

/*** --- Load balancer. --- ****/
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "Enable load balancer"
)
@@ -1827,6 +1827,12 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
serviceConfig.isCacheEvictionByMarkDeletedPosition());
managedLedgerConfig.setMinimumBacklogCursorsForCaching(
serviceConfig.getManagedLedgerMinimumBacklogCursorsForCaching());
managedLedgerConfig.setMinimumBacklogEntriesForCaching(
serviceConfig.getManagedLedgerMinimumBacklogEntriesForCaching());
managedLedgerConfig.setMaxBacklogBetweenCursorsForCaching(
serviceConfig.getManagedLedgerMaxBacklogBetweenCursorsForCaching());

OffloadPoliciesImpl nsLevelOffloadPolicies =
(OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null);
@@ -139,6 +139,7 @@ public synchronized void updateStats(
// this task helps to activate inactive backlogged cursors which have caught up and
// connected, and also to deactivate active cursors which have fallen into backlog
topic.checkBackloggedCursors();
topic.checkCursorsToCacheEntries();
// check if topic is inactive and require ledger rollover
((PersistentTopic) topic).checkInactiveLedgers();
} else if (topic instanceof NonPersistentTopic) {
@@ -259,4 +260,4 @@ public void recordConnectionCreateSuccess() {
public void recordConnectionCreateFail() {
brokerOperabilityMetrics.recordConnectionCreateFail();
}
}
}