Fix ack-broken entries succeeding in ensemble change unsetting
Currently, in `LedgerHandle.unsetSuccessAndSendWriteRequest`,
`LedgerHandle.sendAddSuccessCallbacks` could be called by
`PendingAddOp.unsetSuccessAndSendWriteRequest` **before** all pending
adds have been evaluated. This lets entries that met the ack requirement in the
old ensemble, but have not yet been re-evaluated, succeed in the new ensemble.

Fixes #3005.
kezhuw committed Jul 30, 2022
1 parent 4ee8c04 commit 5c1014a
Showing 3 changed files with 133 additions and 17 deletions.
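
The core of the fix, visible in the `LedgerHandle` hunk below, is to flush the add-success callbacks once from `LedgerHandle.unsetSuccessAndSendWriteRequest` after every pending add has been re-evaluated, instead of relying on `PendingAddOp.unsetSuccessAndSendWriteRequest` to flush them while later ops are still unexamined. A minimal sketch of that ordering, using simplified placeholder types rather than the real BookKeeper classes:

```java
// Hedged sketch of the ordering change, not the actual BookKeeper implementation.
import java.util.List;
import java.util.Set;

class EnsembleChangeOrderingSketch {

    interface PendingAdd {
        // Drop the "success" mark for this bookie index and resend the write
        // if the index belongs to the entry's write set.
        void unsetSuccessAndSendWriteRequest(List<String> newEnsemble, int bookieIndex);
    }

    private final List<PendingAdd> pendingAddOps;

    EnsembleChangeOrderingSketch(List<PendingAdd> pendingAddOps) {
        this.pendingAddOps = pendingAddOps;
    }

    void unsetSuccessAndSendWriteRequest(List<String> newEnsemble, Set<Integer> replacedBookieIndexes) {
        for (PendingAdd op : pendingAddOps) {
            for (int bookieIndex : replacedBookieIndexes) {
                op.unsetSuccessAndSendWriteRequest(newEnsemble, bookieIndex);
            }
        }
        // Flush completions only after *all* pending adds have been re-evaluated,
        // not from inside the per-entry unset path. This is the ordering the fix enforces.
        sendAddSuccessCallbacks();
    }

    void sendAddSuccessCallbacks() {
        // In the real LedgerHandle this acknowledges, in order, every entry whose
        // ack quorum is satisfied and that is no longer blocked by the ensemble change.
    }
}
```

The sketch only shows the call ordering; quorum accounting, write-set checks, and error handling stay in the real `LedgerHandle` and `PendingAddOp`.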
LedgerHandle.java
@@ -2015,6 +2015,20 @@ void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, final Set<Integer>
pendingAddOp.unsetSuccessAndSendWriteRequest(ensemble, bookieIndex);
}
}
// Some entries could fulfill the ack requirement both before and after the ensemble change.
// We need to invoke #sendAddSuccessCallbacks() for such entries because
// they may have already completed, but they are just waiting for the ensemble
// change to complete.
// E.g.
// ensemble (A, B, C, D), entry k is written to (A, B, D). An ensemble change
// happens to replace C with E. Entry k does not complete until C is
// replaced with E successfully. When the ensemble change completes, it tries
// to unset entry k. C however is not in k's write set, so no entry is written
// again, and no one triggers #sendAddSuccessCallbacks. Consequently, k never
// completes.
//
// We call sendAddSuccessCallbacks() after unsetting all pending adds to cover this case.
sendAddSuccessCallbacks();
}

void registerOperationFailureOnBookie(BookieId bookie, long entryId) {
PendingAddOp.java
@@ -196,24 +196,11 @@ void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
// now
return;
}
// Suppose that unset doesn't happen on the write set of an entry. In this
// case we don't need to resend the write request upon an ensemble change.
// We do need to invoke #sendAddSuccessCallbacks() for such entries because
// they may have already completed, but they are just waiting for the ensemble
// to change.
// E.g.
// ensemble (A, B, C, D), entry k is written to (A, B, D). An ensemble change
// happens to replace C with E. Entry k does not complete until C is
// replaced with E successfully. When the ensemble change completes, it tries
// to unset entry k. C however is not in k's write set, so no entry is written
// again, and no one triggers #sendAddSuccessCallbacks. Consequently, k never
// completes.
//
// We call sendAddSuccessCallback when unsetting to cover this case.
DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(entryId);
try {
if (!writeSet.contains(bookieIndex)) {
lh.sendAddSuccessCallbacks();
// This bookie doesn't belong to the write set of this entry. In this
// case we don't need to resend the write request upon an ensemble change.
return;
}
} finally {
BookieWriteLedgerTest.java
@@ -28,6 +28,12 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import com.google.common.collect.Lists;
import io.netty.buffer.AbstractByteBufAllocator;
@@ -48,6 +54,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.bookkeeper.bookie.BookieException;
@@ -58,6 +66,8 @@
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.api.WriteAdvHandle;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
@@ -68,6 +78,7 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -127,6 +138,13 @@ public BookieWriteLedgerTest() {
baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
}

private byte[] randomPayload() {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
return entry.array();
}

/**
* Verify writes when a few bookies fail in the last ensemble, forcing
* ensemble reformation.
@@ -695,7 +713,6 @@ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries1.add(entry.array());
try {
lh.addEntry(3, entry.array());
fail("AddEntry is called after the close of LedgerHandle,"
@@ -849,6 +866,104 @@ public void testLedgerCreateAdvWithLedgerIdInLoop2() throws Exception {
}
}

@Test
public void testWrittenEntriesBlockedByEnsembleChanging() throws Throwable {
// This test constructs a scenario:
// * Ensemble size is two, write/ack quorum size is one. Let's assume
// initial ensemble members are b0 and b1.
// * Write four entries e0, e1, e2 and e3. The distribution algorithm will
// write e0 and e2 to b0, e1 and e3 to b1.
// * e1 completes its write. Bookie b1 crashes. An ensemble change is initiated.
// e0 completes its write but is blocked from success by the ongoing ensemble change.
// * Ensemble changing completes. The ledger will unset success and resend
// write requests if needed.
//
// In this scenario, after the ensemble change all pending entries should eventually
// succeed. In particular, e1 should unset its success and resend its write.
int numberEntries = 4;

// Trigger e3's sending only after bookie b1 failed.
CompletableFuture<Void> b1FailedFuture = new CompletableFuture<>();

// Intercept the ensemble change and trigger e0's sending so that the successfully written
// entries e0, e1 and e3 are blocked by the ensemble change.
CompletableFuture<InvocationOnMock> ensembleChangingFuture = new CompletableFuture<>();

// Start ensemble changing work after e0's write completes.
CompletableFuture<Void> e0WrittenFuture = new CompletableFuture<>();

// Kill bookie b1 after e1 is written.
CompletableFuture<Void> e1WrittenFuture = new CompletableFuture<>();

CountDownLatch entriesAddedLatch = new CountDownLatch(numberEntries);

lh = bkc.createLedger(2, 1, digestType, ledgerPassword);

ClientContext clientCtx = lh.clientCtx;
OrderedExecutor executor = clientCtx.getMainWorkerPool();
OrderedExecutor spiedExecutor = spy(executor);

ClientContext spiedClientCtx = spy(clientCtx);
doReturn(spiedExecutor).when(spiedClientCtx).getMainWorkerPool();
Whitebox.setInternalState(lh, "clientCtx", spiedClientCtx);

lh = spy(lh);
doAnswer(invocation -> {
ensembleChangingFuture.complete(invocation);
return null;
}).when(lh).ensembleChangeLoop(any(), any());

doAnswer(invocation -> {
Object callable = invocation.getArgument(1);
if (callable instanceof PendingAddOp) {
PendingAddOp addOp = (PendingAddOp) callable;
if (addOp.entryId == 0) {
ensembleChangingFuture.join();
PendingAddOp spiedAddOp = spy(addOp);
doAnswer(invocation1 -> {
e0WrittenFuture.complete(null);
return invocation1.callRealMethod();
}).when(spiedAddOp).writeComplete(anyInt(), anyLong(), anyLong(), any(), any());
executor.executeOrdered(lh.ledgerId, spiedAddOp);
} else if (addOp.entryId == 3) {
b1FailedFuture.join();
} else if (addOp.entryId == 1) {
PendingAddOp spiedAddOp = spy(addOp);
doAnswer(invocation1 -> {
e1WrittenFuture.complete(null);
return invocation1.callRealMethod();
}).when(spiedAddOp).writeComplete(anyInt(), anyLong(), anyLong(), any(), any());
executor.executeOrdered(lh.ledgerId, spiedAddOp);
}
}
return invocation.callRealMethod();
}).when(spiedExecutor).executeOrdered(anyLong(), any(SafeRunnable.class));

ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < numberEntries; i++) {
entries1.add(new byte[0]);
executorService.execute(() -> {
byte[] bytes = randomPayload();
CompletableFuture<Long> appending = lh.appendAsync(bytes);
long entryId = appending.join();
int entryIndex = (int) entryId;
entries1.set(entryIndex, bytes);
entriesAddedLatch.countDown();
});
}

e1WrittenFuture.join();
BookieId b1 = lh.getCurrentEnsemble().get(1);
killBookie(b1);
b1FailedFuture.complete(null);
e0WrittenFuture.join();
ensembleChangingFuture.join().callRealMethod();

entriesAddedLatch.await();

readEntries(lh, entries1);
}

/**
* Verify asynchronous writing when there are a few bookie failures in the last ensemble.
*/
@@ -1437,7 +1552,7 @@ public void testLedgerMetadataTest() throws Exception {
}

private void readEntries(LedgerHandle lh, List<byte[]> entries) throws InterruptedException, BKException {
ls = lh.readEntries(0, numEntriesToWrite - 1);
ls = lh.readEntries(0, entries.size() - 1);
int index = 0;
while (ls.hasMoreElements()) {
ByteBuffer origbb = ByteBuffer.wrap(entries.get(index++));
