From 4655285d536136d3cb13be4507b7e44a62741f6c Mon Sep 17 00:00:00 2001 From: Michael Marshall Date: Mon, 15 Aug 2022 15:43:28 -0500 Subject: [PATCH] [fix][broker] Duplicate ByteBuffer when Caching Backlogged Consumers --- .../impl/cache/RangeEntryCacheImpl.java | 4 +- .../mledger/impl/EntryCacheTest.java | 63 ++++++++++++++++--- 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index b118dd2df8377..0d29194a88395 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -309,7 +309,9 @@ private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boo entriesToReturn.add(entry); totalSize += entry.getLength(); if (shouldCacheEntry) { - insert(entry); + EntryImpl cacheEntry = EntryImpl.create(entry); + insert(cacheEntry); + cacheEntry.release(); } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java index f54cdd5b62048..c1ee5a1083a8b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java @@ -20,7 +20,6 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -34,17 +33,18 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import lombok.Cleanup; import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.cache.EntryCache; import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; -import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -66,6 +66,7 @@ public void testRead() throws Exception { when(lh.getId()).thenReturn((long) 0); EntryCacheManager cacheManager = factory.getEntryCacheManager(); + @Cleanup(value = "clear") EntryCache entryCache = cacheManager.getEntryCache(ml); byte[] data = new byte[10]; @@ -98,6 +99,7 @@ public void testReadMissingBefore() throws Exception { when(lh.getId()).thenReturn((long) 0); EntryCacheManager cacheManager = factory.getEntryCacheManager(); + @Cleanup(value = "clear") EntryCache entryCache = cacheManager.getEntryCache(ml); byte[] data = new byte[10]; @@ -126,6 +128,7 @@ public void testReadMissingAfter() throws Exception { when(lh.getId()).thenReturn((long) 0); EntryCacheManager cacheManager = factory.getEntryCacheManager(); + @Cleanup(value = "clear") EntryCache entryCache = cacheManager.getEntryCache(ml); byte[] data = new byte[10]; @@ -154,6 +157,7 @@ public void testReadMissingMiddle() throws Exception { when(lh.getId()).thenReturn((long) 0); EntryCacheManager cacheManager = factory.getEntryCacheManager(); + @Cleanup(value = "clear") EntryCache entryCache = cacheManager.getEntryCache(ml); byte[] data = new byte[10]; @@ -183,6 +187,7 @@ public void testReadMissingMultiple() throws Exception { when(lh.getId()).thenReturn((long) 0); EntryCacheManager cacheManager = factory.getEntryCacheManager(); + @Cleanup(value = "clear") EntryCache entryCache = cacheManager.getEntryCache(ml); byte[] data = new byte[10]; @@ -206,6 +211,53 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { counter.await(); } + @Test + public void testCachedReadReturnsDifferentByteBuffer() throws Exception { + ReadHandle lh = getLedgerHandle(); + when(lh.getId()).thenReturn((long) 0); + + EntryCacheManager cacheManager = factory.getEntryCacheManager(); + @Cleanup(value = "clear") + EntryCache entryCache = cacheManager.getEntryCache(ml); + + CompletableFuture> cacheMissFutureEntries = new CompletableFuture<>(); + + entryCache.asyncReadEntry(lh, 0, 1, true, new ReadEntriesCallback() { + public void readEntriesComplete(List entries, Object ctx) { + cacheMissFutureEntries.complete(entries); + } + + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + cacheMissFutureEntries.completeExceptionally(exception); + } + }, null); + + List cacheMissEntries = cacheMissFutureEntries.get(); + // Ensure first entry is 0 and + assertEquals(cacheMissEntries.size(), 2); + assertEquals(cacheMissEntries.get(0).getEntryId(), 0); + assertEquals(cacheMissEntries.get(0).getDataBuffer().readerIndex(), 0); + + // Move the reader index to simulate consumption + cacheMissEntries.get(0).getDataBuffer().readerIndex(10); + + CompletableFuture> cacheHitFutureEntries = new CompletableFuture<>(); + + entryCache.asyncReadEntry(lh, 0, 1, true, new ReadEntriesCallback() { + public void readEntriesComplete(List entries, Object ctx) { + cacheHitFutureEntries.complete(entries); + } + + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + cacheHitFutureEntries.completeExceptionally(exception); + } + }, null); + + List cacheHitEntries = cacheHitFutureEntries.get(); + assertEquals(cacheHitEntries.get(0).getEntryId(), 0); + assertEquals(cacheHitEntries.get(0).getDataBuffer().readerIndex(), 0); + } + @Test(timeOut = 5000) public void testReadWithError() throws Exception { final ReadHandle lh = getLedgerHandle(); @@ -218,6 +270,7 @@ public void testReadWithError() throws Exception { }).when(lh).readAsync(anyLong(), anyLong()); EntryCacheManager cacheManager = factory.getEntryCacheManager(); + @Cleanup(value = "clear") EntryCache entryCache = cacheManager.getEntryCache(ml); byte[] data = new byte[10]; @@ -239,10 +292,6 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { static ReadHandle getLedgerHandle() { final ReadHandle lh = mock(ReadHandle.class); - final LedgerEntry ledgerEntry = mock(LedgerEntry.class, Mockito.CALLS_REAL_METHODS); - doReturn(Unpooled.wrappedBuffer(new byte[10])).when(ledgerEntry).getEntryBuffer(); - doReturn((long) 10).when(ledgerEntry).getLength(); - doAnswer((invocation) -> { Object[] args = invocation.getArguments(); long firstEntry = (Long) args[0]; @@ -250,7 +299,7 @@ static ReadHandle getLedgerHandle() { List entries = new ArrayList<>(); for (int i = 0; i <= (lastEntry - firstEntry); i++) { - entries.add(ledgerEntry); + entries.add(LedgerEntryImpl.create(0, i, 10, Unpooled.wrappedBuffer(new byte[10]))); } LedgerEntries ledgerEntries = mock(LedgerEntries.class); doAnswer((invocation2) -> entries.iterator()).when(ledgerEntries).iterator();