From 8ae503624d1d896c7269ba1ae83d9b3c786a7d85 Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Fri, 21 Jan 2022 11:33:51 +0800 Subject: [PATCH] Fix ack broken entry succeed in ensemble change unsetting Currently, in `LedgerHandle.unsetSuccessAndSendWriteRequest`, `LedgerHandle.sendAddSuccessCallbacks` could be called by `PendingAddOp.unsetSuccessAndSendWriteRequest` **before** all pending adds are evaluated. This allows entries that met the ack requirement in the old ensemble, but have not been evaluated yet, to succeed in the new ensemble. Fixes #3005. --- .../bookkeeper/client/LedgerHandle.java | 14 ++ .../bookkeeper/client/PendingAddOp.java | 17 +-- .../client/BookieWriteLedgerTest.java | 120 +++++++++++++++++- 3 files changed, 134 insertions(+), 17 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 661499e96cb..39c3d0eaace 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -2002,6 +2002,20 @@ void unsetSuccessAndSendWriteRequest(List ensemble, final Set pendingAddOp.unsetSuccessAndSendWriteRequest(ensemble, bookieIndex); } } + // Some entries could fulfill ack requirement before and after ensemble changed. + // We need to invoke #sendAddSuccessCallbacks() for such entries because + // they may have already completed, but they are just waiting for the ensemble + // change to complete. + // E.g. + // ensemble (A, B, C, D), entry k is written to (A, B, D). An ensemble change + // happens to replace C with E. Entry k does not complete until C is + // replaced with E successfully. When the ensemble change completes, it tries + // to unset entry k. C however is not in k's write set, so no entry is written + // again, and no one triggers #sendAddSuccessCallbacks. Consequently, k never + // completes. 
+ // + // We call sendAddSuccessCallback after unsetting all pending adds to cover this case. + sendAddSuccessCallbacks(); } void registerOperationFailureOnBookie(BookieId bookie, long entryId) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java index 729d9fe077c..3fb7479dac0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java @@ -198,24 +198,11 @@ void unsetSuccessAndSendWriteRequest(List ensemble, int bookieIndex) { // now return; } - // Suppose that unset doesn't happen on the write set of an entry. In this - // case we don't need to resend the write request upon an ensemble change. - // We do need to invoke #sendAddSuccessCallbacks() for such entries because - // they may have already completed, but they are just waiting for the ensemble - // to change. - // E.g. - // ensemble (A, B, C, D), entry k is written to (A, B, D). An ensemble change - // happens to replace C with E. Entry k does not complete until C is - // replaced with E successfully. When the ensemble change completes, it tries - // to unset entry k. C however is not in k's write set, so no entry is written - // again, and no one triggers #sendAddSuccessCallbacks. Consequently, k never - // completes. - // - // We call sendAddSuccessCallback when unsetting t cover this case. DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(entryId); try { if (!writeSet.contains(bookieIndex)) { - lh.sendAddSuccessCallbacks(); + // This bookie doesn't belong to the write set of this entry. In this + // case we don't need to resend the write request upon an ensemble change. 
return; } } finally { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java index fb9f83a6aa3..a29292e2366 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java @@ -28,6 +28,13 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + import com.google.common.collect.Lists; import io.netty.buffer.AbstractByteBufAllocator; import io.netty.buffer.ByteBuf; @@ -47,6 +54,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.bookkeeper.bookie.BookieException; @@ -57,6 +66,8 @@ import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.client.api.WriteAdvHandle; +import org.apache.bookkeeper.common.util.OrderedExecutor; +import org.apache.bookkeeper.common.util.SafeRunnable; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.LedgerMetadataSerDe; import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory; @@ -67,6 +78,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; import 
org.powermock.reflect.Whitebox; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,6 +138,13 @@ public BookieWriteLedgerTest() { baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory); } + private byte[] randomPayload() { + ByteBuffer entry = ByteBuffer.allocate(4); + entry.putInt(rng.nextInt(maxInt)); + entry.position(0); + return entry.array(); + } + /** * Verify write when few bookie failures in last ensemble and forcing * ensemble reformation. @@ -694,7 +713,6 @@ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { entry = ByteBuffer.allocate(4); entry.putInt(rng.nextInt(maxInt)); entry.position(0); - entries1.add(entry.array()); try { lh.addEntry(3, entry.array()); fail("AddEntry is called after the close of LedgerHandle," @@ -848,6 +866,104 @@ public void testLedgerCreateAdvWithLedgerIdInLoop2() throws Exception { } } + @Test + public void testWrittenEntriesBlockedByEnsembleChanging() throws Throwable { + // This test constructs a scenario: + // * Ensemble size is two, write/ack quorum size is one. Let's assume + // initial ensemble members are b0 and b1. + // * Write four entries e0, e1, .. e3. The distribution algorithm will + // write e0 and e2 to b0, e1 and e3 to b1. + // * e1 completes its write. Bookie b1 crashes. Ensemble changing initiates. + // e0 completes its write but is blocked from success due to ensemble changing. + // * Ensemble changing completes. The ledger will unset success and resend + // write requests if needed. + // + // In this scenario, after ensemble changed, all pending entries should proceed + // to success finally. Especially, e1 should unset its success and resend its write. + int numberEntries = 4; + + // Trigger e3's sending only after bookie b1 failed. + CompletableFuture b1FailedFuture = new CompletableFuture<>(); + + // Intercept ensemble changing and trigger e0's sending so that successfully written + // entries e0, e1 and e3 are blocked by ensemble changing. 
+ CompletableFuture ensembleChangingFuture = new CompletableFuture<>(); + + // Start ensemble changing work after e0's write completes. + CompletableFuture e0WrittenFuture = new CompletableFuture<>(); + + // Kill bookie b1 after e1 is written. + CompletableFuture e1WrittenFuture = new CompletableFuture<>(); + + CountDownLatch entriesAddedLatch = new CountDownLatch(numberEntries); + + lh = bkc.createLedger(2, 1, digestType, ledgerPassword); + + ClientContext clientCtx = lh.clientCtx; + OrderedExecutor executor = clientCtx.getMainWorkerPool(); + OrderedExecutor spiedExecutor = spy(executor); + + ClientContext spiedClientCtx = spy(clientCtx); + doReturn(spiedExecutor).when(spiedClientCtx).getMainWorkerPool(); + Whitebox.setInternalState(lh, "clientCtx", spiedClientCtx); + + lh = spy(lh); + doAnswer(invocation -> { + ensembleChangingFuture.complete(invocation); + return null; + }).when(lh).ensembleChangeLoop(any(), any()); + + doAnswer(invocation -> { + Object callable = invocation.getArgument(1); + if (callable instanceof PendingAddOp) { + PendingAddOp addOp = (PendingAddOp) callable; + if (addOp.entryId == 0) { + ensembleChangingFuture.join(); + PendingAddOp spiedAddOp = spy(addOp); + doAnswer(invocation1 -> { + e0WrittenFuture.complete(null); + return invocation1.callRealMethod(); + }).when(spiedAddOp).writeComplete(anyInt(), anyLong(), anyLong(), any(), any()); + executor.executeOrdered(lh.ledgerId, spiedAddOp); + } else if (addOp.entryId == 3) { + b1FailedFuture.join(); + } else if (addOp.entryId == 1) { + PendingAddOp spiedAddOp = spy(addOp); + doAnswer(invocation1 -> { + e1WrittenFuture.complete(null); + return invocation1.callRealMethod(); + }).when(spiedAddOp).writeComplete(anyInt(), anyLong(), anyLong(), any(), any()); + executor.executeOrdered(lh.ledgerId, spiedAddOp); + } + } + return invocation.callRealMethod(); + }).when(spiedExecutor).executeOrdered(anyLong(), any(SafeRunnable.class)); + + ExecutorService executorService = Executors.newCachedThreadPool(); + 
for (int i = 0; i < numberEntries; i++) { + entries1.add(new byte[0]); + executorService.execute(() -> { + byte[] bytes = randomPayload(); + CompletableFuture appending = lh.appendAsync(bytes); + long entryId = appending.join(); + int entryIndex = (int) entryId; + entries1.set(entryIndex, bytes); + entriesAddedLatch.countDown(); + }); + } + + e1WrittenFuture.join(); + BookieId b1 = lh.getCurrentEnsemble().get(1); + killBookie(b1); + b1FailedFuture.complete(null); + e0WrittenFuture.join(); + ensembleChangingFuture.join().callRealMethod(); + + entriesAddedLatch.await(); + + readEntries(lh, entries1); + } + /** * Verify asynchronous writing when few bookie failures in last ensemble. */ @@ -1436,7 +1552,7 @@ public void testLedgerMetadataTest() throws Exception { } private void readEntries(LedgerHandle lh, List entries) throws InterruptedException, BKException { - ls = lh.readEntries(0, numEntriesToWrite - 1); + ls = lh.readEntries(0, entries.size() - 1); int index = 0; while (ls.hasMoreElements()) { ByteBuffer origbb = ByteBuffer.wrap(entries.get(index++));