Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data corruption, Port [#18525] to branch-2.10 #18546

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import alluxio.util.network.NetworkAddressUtils;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import alluxio.worker.block.meta.TempBlockMeta;

import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -38,6 +39,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -268,8 +270,8 @@ private CacheResult cacheBlock(CacheRequest request) throws IOException, Alluxio
*/
private CacheResult cacheBlockFromUfs(long blockId, long blockSize,
Protocol.OpenUfsBlockOptions openUfsBlockOptions) throws IOException {
try (BlockReader reader = mBlockWorker.createUfsBlockReader(
Sessions.CACHE_UFS_SESSION_ID, blockId, 0, false, openUfsBlockOptions)) {
try (BlockReader reader = mBlockWorker.createUfsBlockReader(Sessions.CACHE_UFS_SESSION_ID,
blockId, 0, false, openUfsBlockOptions)) {
// Read the entire block, caching to block store will be handled internally in UFS block store
// when close the reader.
// Note that, we read from UFS with a smaller buffer to avoid high pressure on heap
Expand All @@ -280,10 +282,26 @@ private CacheResult cacheBlockFromUfs(long blockId, long blockSize,
reader.read(offset, bufferSize);
offset += bufferSize;
}
if (isTempBlockMetaPresent(blockId)) {
mBlockWorker.commitBlock(Sessions.CACHE_UFS_SESSION_ID, blockId, false);
}
} catch (Exception e) {
LOG.warn("Failed to async cache block {} from UFS on reading the block:", blockId, e);
if (isTempBlockMetaPresent(blockId)) {
mBlockWorker.abortBlock(Sessions.CACHE_UFS_SESSION_ID, blockId);
}
return CacheResult.FAILED;
}
return CacheResult.SUCCEED;
}

private boolean isTempBlockMetaPresent(long blockId) {
Optional<TempBlockMeta> tempBlockMeta =
mBlockWorker.getBlockStore().getTempBlockMeta(blockId);
return tempBlockMeta.isPresent()
&& tempBlockMeta.get().getSessionId() == Sessions.CACHE_UFS_SESSION_ID;
}

/**
* Caches the block at best effort from a remote worker (possibly from UFS indirectly).
*
Expand Down Expand Up @@ -313,7 +331,7 @@ private CacheResult cacheBlockFromRemoteWorker(long blockId, long blockSize,
return CacheResult.SUCCEED;
} catch (IllegalStateException | IOException e) {
LOG.warn("Failed to async cache block {} from remote worker ({}) on copying the block: {}",
blockId, sourceAddress, e.toString());
blockId, sourceAddress, e);
try {
mBlockWorker.abortBlock(Sessions.CACHE_WORKER_SESSION_ID, blockId);
} catch (IOException ee) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,16 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse
BlockReader reader = mUnderFileSystemBlockStore.createBlockReader(sessionId, blockId, offset,
positionShort, options);
BlockReader blockReader = new DelegatingBlockReader(reader,
() -> closeUfsBlock(sessionId, blockId, true));
() -> closeUfsBlock(sessionId, blockId));
Metrics.WORKER_ACTIVE_CLIENTS.inc();
return blockReader;
} catch (Exception e) {
try {
closeUfsBlock(sessionId, blockId, false);
Optional<TempBlockMeta> tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId);
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
abortBlock(sessionId, blockId);
}
closeUfsBlock(sessionId, blockId);
} catch (Exception ee) {
LOG.warn("Failed to close UFS block", ee);
}
Expand All @@ -206,24 +210,16 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse
}
}

private void closeUfsBlock(long sessionId, long blockId, boolean successful)
private void closeUfsBlock(long sessionId, long blockId)
throws IOException {
try {
mUnderFileSystemBlockStore.closeBlock(sessionId, blockId);
Optional<TempBlockMeta> tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId);
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
if (successful) {
commitBlock(sessionId, blockId, false);
} else {
abortBlock(sessionId, blockId);
}
} else {
if (!mLocalBlockStore.hasTempBlockMeta(blockId) && mUnderFileSystemBlockStore.isNoCache(
sessionId, blockId)) {
// When getTempBlockMeta() return null, such as a block readType NO_CACHE writeType THROUGH.
// Counter will not be decrement in the commitblock().
// So we should decrement counter here.
if (mUnderFileSystemBlockStore.isNoCache(sessionId, blockId)) {
DefaultBlockWorker.Metrics.WORKER_ACTIVE_CLIENTS.dec();
}
Metrics.WORKER_ACTIVE_CLIENTS.dec();
}
} finally {
mUnderFileSystemBlockStore.releaseAccess(sessionId, blockId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import alluxio.worker.block.BlockStoreType;
import alluxio.worker.block.DefaultBlockWorker;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.meta.TempBlockMeta;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
Expand All @@ -55,6 +56,7 @@
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -464,14 +466,15 @@ private void runInternal() {
}
if (error != null) {
try {
completeRequest(mContext);
completeRequest(mContext, false);
} catch (Exception e) {
LOG.error("Failed to close the request.", e);
}
replyError(error);
} else if (eof || cancel) {
try {
completeRequest(mContext);
boolean success = !cancel;
completeRequest(mContext, success);
} catch (Exception e) {
LogUtils.warnWithException(LOG, "Exception occurred while completing read request, "
+ "EOF/CANCEL sessionId: {}. {}", mContext.getRequest().getSessionId(),
Expand All @@ -491,12 +494,33 @@ private void runInternal() {
* state it may have accumulated.
*
* @param context context of the request to complete
* @param success whether the request was successful
*/
private void completeRequest(BlockReadRequestContext context) throws Exception {
private void completeRequest(BlockReadRequestContext context, boolean success)
throws Exception {
BlockReader reader = context.getBlockReader();
long sessionId = context.getRequest().getSessionId();
long blockId = context.getRequest().getId();
try {
if (reader != null) {
reader.close();
if (!success) {
Optional<TempBlockMeta> tempBlockMeta =
mWorker.getBlockStore().getTempBlockMeta(blockId);
// we need to cancel the block at the final step here to avoid mistakenly commit block.
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
mWorker.abortBlock(sessionId, blockId);
}
reader.close();
}
else {
// have to close the reader here to avoid commit block if the block is not fully read.
reader.close();
Optional<TempBlockMeta> tempBlockMeta =
mWorker.getBlockStore().getTempBlockMeta(blockId);
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
mWorker.commitBlock(sessionId, blockId, false);
}
}
}
} finally {
context.setBlockReader(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ public void getFallBackUfsReader() throws Exception {

// read a whole block
assertArrayEquals(data, reader.read(0, ufsBlockSize).array());
mBlockWorker.commitBlock(sessionId, blockId, false);
reader.close();

// after closing, the ufs block should be cached locally
Expand Down
2 changes: 2 additions & 0 deletions tests/src/test/java/alluxio/client/fs/TtlIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ public void expireManyAfterFree() throws Exception {
}
CommonUtils.sleepMs(2 * TTL_INTERVAL_MS);
HeartbeatScheduler.execute(HeartbeatContext.MASTER_TTL_CHECK);
// Sleep for a while to make sure the delete operations are done.
CommonUtils.sleepMs(4 * TTL_INTERVAL_MS);
for (int i = 0; i < numFiles; i++) {
if (i % 2 == 0) {
if (i % 20 != 0) {
Expand Down
Loading