Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #3005: Fix ack broken entry succeed in ensemble change unsetting #3041

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2015,6 +2015,20 @@ void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, final Set<Integer>
pendingAddOp.unsetSuccessAndSendWriteRequest(ensemble, bookieIndex);
}
}
// Some entries could fulfill ack requirement before and after ensemble changed.
// We need to invoke #sendAddSuccessCallbacks() for such entries because
// they may have already completed, but they are just waiting for the ensemble
// change to complete.
// E.g.
// ensemble (A, B, C, D), entry k is written to (A, B, D). An ensemble change
// happens to replace C with E. Entry k does not complete until C is
// replaced with E successfully. When the ensemble change completes, it tries
// to unset entry k. C however is not in k's write set, so no entry is written
// again, and no one triggers #sendAddSuccessCallbacks. Consequently, k never
// completes.
//
// We call #sendAddSuccessCallbacks() once, after unsetting all pending adds, to cover this case.
sendAddSuccessCallbacks();
Copy link
Member

@StevenLuMT StevenLuMT Jul 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For multiple pendingAddOps requests, do we just call sendAddSuccessCallbacks one time?
Is that right? @kezhuw

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sendAddSuccessCallbacks has a loop itself, so it is unnecessary to call it multiple times after we unset all pending operations.

The key point this PR is trying to solve is that an eager sendAddSuccessCallbacks call per pendingAddOp could make later pendingAddOps, which have not been unset yet, complete spuriously.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok,nice

}

void registerOperationFailureOnBookie(BookieId bookie, long entryId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,24 +196,11 @@ void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
// now
return;
}
// Suppose that unset doesn't happen on the write set of an entry. In this
// case we don't need to resend the write request upon an ensemble change.
// We do need to invoke #sendAddSuccessCallbacks() for such entries because
// they may have already completed, but they are just waiting for the ensemble
// to change.
// E.g.
// ensemble (A, B, C, D), entry k is written to (A, B, D). An ensemble change
// happens to replace C with E. Entry k does not complete until C is
// replaced with E successfully. When the ensemble change completes, it tries
// to unset entry k. C however is not in k's write set, so no entry is written
// again, and no one triggers #sendAddSuccessCallbacks. Consequently, k never
// completes.
//
// We call #sendAddSuccessCallbacks() when unsetting to cover this case.
DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(entryId);
try {
if (!writeSet.contains(bookieIndex)) {
lh.sendAddSuccessCallbacks();
// This bookie doesn't belong to the write set of this entry. In this
// case we don't need to resend the write request upon an ensemble change.
return;
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import com.google.common.collect.Lists;
import io.netty.buffer.AbstractByteBufAllocator;
Expand All @@ -48,6 +54,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.bookkeeper.bookie.BookieException;
Expand All @@ -58,6 +66,8 @@
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.api.WriteAdvHandle;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
Expand All @@ -68,6 +78,7 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -127,6 +138,13 @@ public BookieWriteLedgerTest() {
baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
}

/**
 * Build a fresh 4-byte entry payload holding one random int.
 *
 * @return a new 4-byte array containing {@code rng.nextInt(maxInt)} encoded
 *         in ByteBuffer's default byte order
 */
private byte[] randomPayload() {
    // Write straight into the array we hand back; wrapping avoids the
    // separate allocate()/position(0)/array() steps.
    byte[] payload = new byte[4];
    ByteBuffer.wrap(payload).putInt(rng.nextInt(maxInt));
    return payload;
}

/**
* Verify write when few bookie failures in last ensemble and forcing
* ensemble reformation.
Expand Down Expand Up @@ -695,7 +713,6 @@ public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries1.add(entry.array());
try {
lh.addEntry(3, entry.array());
fail("AddEntry is called after the close of LedgerHandle,"
Expand Down Expand Up @@ -849,6 +866,104 @@ public void testLedgerCreateAdvWithLedgerIdInLoop2() throws Exception {
}
}

/**
 * Verify that entries which are pending while an ensemble change is in
 * progress all complete once the ensemble change finishes, and that the
 * written data can be read back.
 */
@Test
public void testWrittenEntriesBlockedByEnsembleChanging() throws Throwable {
// This test constructs a scenario:
// * Ensemble size is two, write/ack quorum size is one. Let's assume
// initial ensemble members are b0 and b1.
// * Write four entries e0, e1, .. e3. The distribution algorithm will
// write e0 and e2 to b0, e1 and e3 to b1.
// * e1 completes its write. Bookie b1 crashes. Ensemble changing initiates.
// e0 completes its write but is blocked from success due to ensemble changing.
// * Ensemble changing completes. The ledger will unset success and resend
// write requests if needed.
//
// In this scenario, after ensemble changed, all pending entries should proceed
// to success finally. Especially, e1 should unset its success and resend its write.
int numberEntries = 4;

// Trigger e3's sending only after bookie b1 failed.
CompletableFuture<Void> b1FailedFuture = new CompletableFuture<>();

// Intercept ensemble changing and trigger e0's sending so that successfully written
// entries e0, e1 and e3 are blocked by ensemble changing.
CompletableFuture<InvocationOnMock> ensembleChangingFuture = new CompletableFuture<>();

// Start ensemble changing work after e0's write completes.
CompletableFuture<Void> e0WrittenFuture = new CompletableFuture<>();

// Kill bookie b1 after e1 written.
CompletableFuture<Void> e1WrittenFuture = new CompletableFuture<>();

// Counted down once per successfully appended entry; awaited at the end of the test.
CountDownLatch entriesAddedLatch = new CountDownLatch(numberEntries);

lh = bkc.createLedger(2, 1, digestType, ledgerPassword);

// Swap the ledger handle's client context for a spy whose main worker pool is
// itself a spy, so that executeOrdered() submissions can be intercepted below.
ClientContext clientCtx = lh.clientCtx;
OrderedExecutor executor = clientCtx.getMainWorkerPool();
OrderedExecutor spiedExecutor = spy(executor);

ClientContext spiedClientCtx = spy(clientCtx);
doReturn(spiedExecutor).when(spiedClientCtx).getMainWorkerPool();
Whitebox.setInternalState(lh, "clientCtx", spiedClientCtx);

// Spy the ledger handle and capture the ensembleChangeLoop() invocation instead of
// running it; the real method is invoked explicitly near the end of the test.
lh = spy(lh);
doAnswer(invocation -> {
ensembleChangingFuture.complete(invocation);
return null;
}).when(lh).ensembleChangeLoop(any(), any());

// Intercept tasks submitted to the spied executor and gate PendingAddOps by entry id.
doAnswer(invocation -> {
Object callable = invocation.getArgument(1);
if (callable instanceof PendingAddOp) {
PendingAddOp addOp = (PendingAddOp) callable;
if (addOp.entryId == 0) {
// Hold e0 back until the ensemble change has been intercepted, then submit a
// spied copy that signals e0WrittenFuture when writeComplete() is called.
ensembleChangingFuture.join();
PendingAddOp spiedAddOp = spy(addOp);
doAnswer(invocation1 -> {
e0WrittenFuture.complete(null);
return invocation1.callRealMethod();
}).when(spiedAddOp).writeComplete(anyInt(), anyLong(), anyLong(), any(), any());
executor.executeOrdered(lh.ledgerId, spiedAddOp);
} else if (addOp.entryId == 3) {
// Hold e3 back until the test thread reports that b1 has been killed.
b1FailedFuture.join();
} else if (addOp.entryId == 1) {
// Submit a spied copy of e1 that signals e1WrittenFuture when
// writeComplete() is called.
PendingAddOp spiedAddOp = spy(addOp);
doAnswer(invocation1 -> {
e1WrittenFuture.complete(null);
return invocation1.callRealMethod();
}).when(spiedAddOp).writeComplete(anyInt(), anyLong(), anyLong(), any(), any());
executor.executeOrdered(lh.ledgerId, spiedAddOp);
}
}
// NOTE(review): for e0 and e1 the original op is still submitted here, in addition
// to the spied copy submitted above - confirm this double submission is intended.
return invocation.callRealMethod();
}).when(spiedExecutor).executeOrdered(anyLong(), any(SafeRunnable.class));

// Each append runs on its own pool thread; presumably this keeps the blocking
// join() calls in the stub above off the test thread - TODO confirm.
// NOTE(review): executorService is never shut down in this method.
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < numberEntries; i++) {
// Reserve a slot per entry so results can be stored by entry id, whatever
// order the appends complete in.
entries1.add(new byte[0]);
executorService.execute(() -> {
byte[] bytes = randomPayload();
CompletableFuture<Long> appending = lh.appendAsync(bytes);
long entryId = appending.join();
int entryIndex = (int) entryId;
entries1.set(entryIndex, bytes);
entriesAddedLatch.countDown();
});
}

// Drive the scenario: wait for e1's write to complete, kill b1, release e3, wait for
// e0's write to complete, and only then run the real ensemble change.
e1WrittenFuture.join();
BookieId b1 = lh.getCurrentEnsemble().get(1);
killBookie(b1);
b1FailedFuture.complete(null);
e0WrittenFuture.join();
ensembleChangingFuture.join().callRealMethod();

// All four appends must complete; this await has no timeout.
entriesAddedLatch.await();

readEntries(lh, entries1);
}

/**
* Verify asynchronous writing when few bookie failures in last ensemble.
*/
Expand Down Expand Up @@ -1437,7 +1552,7 @@ public void testLedgerMetadataTest() throws Exception {
}

private void readEntries(LedgerHandle lh, List<byte[]> entries) throws InterruptedException, BKException {
ls = lh.readEntries(0, numEntriesToWrite - 1);
ls = lh.readEntries(0, entries.size() - 1);
int index = 0;
while (ls.hasMoreElements()) {
ByteBuffer origbb = ByteBuffer.wrap(entries.get(index++));
Expand Down