Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Duplicate ByteBuffer when Caching Backlogged Consumers #17105

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,9 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo
entriesToReturn.add(entry);
totalSize += entry.getLength();
if (shouldCacheEntry) {
insert(entry);
EntryImpl cacheEntry = EntryImpl.create(entry);
insert(cacheEntry);
cacheEntry.release();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
Expand All @@ -34,17 +33,18 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand All @@ -66,6 +66,7 @@ public void testRead() throws Exception {
when(lh.getId()).thenReturn((long) 0);

EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);

byte[] data = new byte[10];
Expand Down Expand Up @@ -98,6 +99,7 @@ public void testReadMissingBefore() throws Exception {
when(lh.getId()).thenReturn((long) 0);

EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);

byte[] data = new byte[10];
Expand Down Expand Up @@ -126,6 +128,7 @@ public void testReadMissingAfter() throws Exception {
when(lh.getId()).thenReturn((long) 0);

EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);

byte[] data = new byte[10];
Expand Down Expand Up @@ -154,6 +157,7 @@ public void testReadMissingMiddle() throws Exception {
when(lh.getId()).thenReturn((long) 0);

EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);

byte[] data = new byte[10];
Expand Down Expand Up @@ -183,6 +187,7 @@ public void testReadMissingMultiple() throws Exception {
when(lh.getId()).thenReturn((long) 0);

EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);

byte[] data = new byte[10];
Expand All @@ -206,6 +211,53 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
counter.await();
}

@Test
public void testCachedReadReturnsDifferentByteBuffer() throws Exception {
ReadHandle lh = getLedgerHandle();
when(lh.getId()).thenReturn((long) 0);

EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);

CompletableFuture<List<Entry>> cacheMissFutureEntries = new CompletableFuture<>();

entryCache.asyncReadEntry(lh, 0, 1, true, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
cacheMissFutureEntries.complete(entries);
}

public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
cacheMissFutureEntries.completeExceptionally(exception);
}
}, null);

List<Entry> cacheMissEntries = cacheMissFutureEntries.get();
// Ensure first entry is 0 and
assertEquals(cacheMissEntries.size(), 2);
assertEquals(cacheMissEntries.get(0).getEntryId(), 0);
assertEquals(cacheMissEntries.get(0).getDataBuffer().readerIndex(), 0);

// Move the reader index to simulate consumption
cacheMissEntries.get(0).getDataBuffer().readerIndex(10);

CompletableFuture<List<Entry>> cacheHitFutureEntries = new CompletableFuture<>();

entryCache.asyncReadEntry(lh, 0, 1, true, new ReadEntriesCallback() {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
cacheHitFutureEntries.complete(entries);
}

public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
cacheHitFutureEntries.completeExceptionally(exception);
}
}, null);

List<Entry> cacheHitEntries = cacheHitFutureEntries.get();
assertEquals(cacheHitEntries.get(0).getEntryId(), 0);
assertEquals(cacheHitEntries.get(0).getDataBuffer().readerIndex(), 0);
}

@Test(timeOut = 5000)
public void testReadWithError() throws Exception {
final ReadHandle lh = getLedgerHandle();
Expand All @@ -218,6 +270,7 @@ public void testReadWithError() throws Exception {
}).when(lh).readAsync(anyLong(), anyLong());

EntryCacheManager cacheManager = factory.getEntryCacheManager();
@Cleanup(value = "clear")
EntryCache entryCache = cacheManager.getEntryCache(ml);

byte[] data = new byte[10];
Expand All @@ -239,18 +292,14 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

static ReadHandle getLedgerHandle() {
final ReadHandle lh = mock(ReadHandle.class);
final LedgerEntry ledgerEntry = mock(LedgerEntry.class, Mockito.CALLS_REAL_METHODS);
doReturn(Unpooled.wrappedBuffer(new byte[10])).when(ledgerEntry).getEntryBuffer();
doReturn((long) 10).when(ledgerEntry).getLength();

doAnswer((invocation) -> {
Object[] args = invocation.getArguments();
long firstEntry = (Long) args[0];
long lastEntry = (Long) args[1];

List<LedgerEntry> entries = new ArrayList<>();
for (int i = 0; i <= (lastEntry - firstEntry); i++) {
entries.add(ledgerEntry);
entries.add(LedgerEntryImpl.create(0, i, 10, Unpooled.wrappedBuffer(new byte[10])));
}
LedgerEntries ledgerEntries = mock(LedgerEntries.class);
doAnswer((invocation2) -> entries.iterator()).when(ledgerEntries).iterator();
Expand Down