Skip to content

Commit

Permalink
[fix][broker] Duplicate ByteBuffer when Caching Backlogged Consumers (a…
Browse files Browse the repository at this point in the history
…pache#17105)

Fixes apache#16979 

### Motivation

apache#12258 introduced caching for backlogged consumers. When caching the entry, it is important to duplicate the `ByteBuffer` so that the reader index is not shared. The current code has a race condition where the `ByteBuffer` reference in the cache is shared with the dispatcher. When another consumer reads from the cache, the cache calls `duplicate()` on the shared `ByteBuffer`, which copies the current reader index, which might not be 0 if the original dispatcher read data from the `ByteBuffer`.

Note: it seems like the caching `insert` method creates (or recycles) more `EntryImpl` instances than is really necessary. Changing that is outside this PR's scope, so I am going to leave it as is.

### Modifications

* Create a new `Entry` before inserting it into the cache.
* Add a new test to the `EntryCacheTest`. The test fails before this change and passes after it.
* Update the `EntryCacheTest` mocking so that it returns unique entries when mocking reads from the bookkeeper. Before, all returned `LedgerEntry` objects had ledgerId 0 and entryId 0, which messed with the caching for the new test.

### Verifying this change

This change includes a test that failed before the PR and passes after it.

### Documentation

- [x] `doc-not-needed`
  • Loading branch information
michaeljmarshall authored Aug 16, 2022
1 parent e23a4c7 commit 76f4195
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,9 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo
entriesToReturn.add(entry);
totalSize += entry.getLength();
if (shouldCacheEntry) {
insert(entry);
EntryImpl cacheEntry = EntryImpl.create(entry);
insert(cacheEntry);
cacheEntry.release();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
Expand All @@ -34,17 +33,18 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand All @@ -66,6 +66,7 @@ public void testRead() throws Exception {
when(lh.getId()).thenReturn((long) 0);

EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);

byte[] data = new byte[10];
Expand Down Expand Up @@ -98,6 +99,7 @@ public void testReadMissingBefore() throws Exception {
when(lh.getId()).thenReturn((long) 0);

EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);

byte[] data = new byte[10];
Expand Down Expand Up @@ -126,6 +128,7 @@ public void testReadMissingAfter() throws Exception {
when(lh.getId()).thenReturn((long) 0);

EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);

byte[] data = new byte[10];
Expand Down Expand Up @@ -154,6 +157,7 @@ public void testReadMissingMiddle() throws Exception {
when(lh.getId()).thenReturn((long) 0);

EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);

byte[] data = new byte[10];
Expand Down Expand Up @@ -183,6 +187,7 @@ public void testReadMissingMultiple() throws Exception {
when(lh.getId()).thenReturn((long) 0);

EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);

byte[] data = new byte[10];
Expand All @@ -206,6 +211,53 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
counter.await();
}

@Test
public void testCachedReadReturnsDifferentByteBuffer() throws Exception {
ReadHandle lh = getLedgerHandle();
when(lh.getId()).thenReturn((long) 0);

EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);

CompletableFuture<List<Entry>> cacheMissFutureEntries = new CompletableFuture<>();

entryCache.asyncReadEntry(lh, 0, 1, true, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
cacheMissFutureEntries.complete(entries);
}

public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
cacheMissFutureEntries.completeExceptionally(exception);
}
}, null);

List<Entry> cacheMissEntries = cacheMissFutureEntries.get();
// Ensure first entry is 0 and
assertEquals(cacheMissEntries.size(), 2);
assertEquals(cacheMissEntries.get(0).getEntryId(), 0);
assertEquals(cacheMissEntries.get(0).getDataBuffer().readerIndex(), 0);

// Move the reader index to simulate consumption
cacheMissEntries.get(0).getDataBuffer().readerIndex(10);

CompletableFuture<List<Entry>> cacheHitFutureEntries = new CompletableFuture<>();

entryCache.asyncReadEntry(lh, 0, 1, true, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
cacheHitFutureEntries.complete(entries);
}

public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
cacheHitFutureEntries.completeExceptionally(exception);
}
}, null);

List<Entry> cacheHitEntries = cacheHitFutureEntries.get();
assertEquals(cacheHitEntries.get(0).getEntryId(), 0);
assertEquals(cacheHitEntries.get(0).getDataBuffer().readerIndex(), 0);
}

@Test(timeOut = 5000)
public void testReadWithError() throws Exception {
final ReadHandle lh = getLedgerHandle();
Expand All @@ -218,6 +270,7 @@ public void testReadWithError() throws Exception {
}).when(lh).readAsync(anyLong(), anyLong());

EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);

byte[] data = new byte[10];
Expand All @@ -239,18 +292,14 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

static ReadHandle getLedgerHandle() {
final ReadHandle lh = mock(ReadHandle.class);
final LedgerEntry ledgerEntry = mock(LedgerEntry.class, Mockito.CALLS_REAL_METHODS);
doReturn(Unpooled.wrappedBuffer(new byte[10])).when(ledgerEntry).getEntryBuffer();
doReturn((long) 10).when(ledgerEntry).getLength();

doAnswer((invocation) -> {
Object[] args = invocation.getArguments();
long firstEntry = (Long) args[0];
long lastEntry = (Long) args[1];

List<LedgerEntry> entries = new ArrayList<>();
for (int i = 0; i <= (lastEntry - firstEntry); i++) {
entries.add(ledgerEntry);
entries.add(LedgerEntryImpl.create(0, i, 10, Unpooled.wrappedBuffer(new byte[10])));
}
LedgerEntries ledgerEntries = mock(LedgerEntries.class);
doAnswer((invocation2) -> entries.iterator()).when(ledgerEntries).iterator();
Expand Down

0 comments on commit 76f4195

Please sign in to comment.