Fix data corruption issue when loading from UFS
Cherry-pick of existing commit.
orig-pr: #18525
orig-commit: 6507cf8
orig-commit-author: jja725 <jja725@gmail.com>

pr-link: #18546
change-id: cid-bbba0feb29231e70750e5e79da5f405bb591d47a
jja725 authored Mar 12, 2024
1 parent c699c63 commit cbbd5b9
Showing 5 changed files with 62 additions and 21 deletions.
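For orientation before the hunks below: the core of this change is that a block cached from the UFS is now committed or aborted explicitly by the caller, and only when the staged temp block belongs to the caller's own session, instead of being committed implicitly when the reader is closed. The following is a minimal, self-contained sketch of that rule; the interface and constant names are placeholders standing in for the Alluxio worker calls that appear in this diff (getTempBlockMeta, commitBlock, abortBlock), not part of the change itself.

import java.util.Optional;

/** Sketch of the commit/abort rule this commit applies around UFS block caching. */
public class UfsCacheCommitSketch {
  // Placeholder for Sessions.CACHE_UFS_SESSION_ID.
  static final long UFS_CACHE_SESSION_ID = -3L;

  /** Stand-in for alluxio.worker.block.meta.TempBlockMeta. */
  interface TempMeta {
    long getSessionId();
  }

  /** Stand-in for the block-worker calls used in this diff. */
  interface Worker {
    Optional<TempMeta> getTempBlockMeta(long blockId);
    void commitBlock(long sessionId, long blockId, boolean pinOnCreate);
    void abortBlock(long sessionId, long blockId);
  }

  /**
   * Commits the staged block on success and aborts it on failure, but only if a temp
   * block exists and was staged by the UFS-cache session itself.
   */
  static void finishUfsCaching(Worker worker, long blockId, boolean readSucceeded) {
    Optional<TempMeta> temp = worker.getTempBlockMeta(blockId);
    if (!temp.isPresent() || temp.get().getSessionId() != UFS_CACHE_SESSION_ID) {
      return; // nothing was staged by this session (e.g. a NO_CACHE read)
    }
    if (readSucceeded) {
      worker.commitBlock(UFS_CACHE_SESSION_ID, blockId, false);
    } else {
      worker.abortBlock(UFS_CACHE_SESSION_ID, blockId);
    }
  }
}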
@@ -29,6 +29,7 @@
import alluxio.util.network.NetworkAddressUtils;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import alluxio.worker.block.meta.TempBlockMeta;

import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
@@ -38,6 +39,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -268,8 +270,8 @@ private CacheResult cacheBlock(CacheRequest request) throws IOException, Alluxio
*/
private CacheResult cacheBlockFromUfs(long blockId, long blockSize,
Protocol.OpenUfsBlockOptions openUfsBlockOptions) throws IOException {
try (BlockReader reader = mBlockWorker.createUfsBlockReader(
Sessions.CACHE_UFS_SESSION_ID, blockId, 0, false, openUfsBlockOptions)) {
try (BlockReader reader = mBlockWorker.createUfsBlockReader(Sessions.CACHE_UFS_SESSION_ID,
blockId, 0, false, openUfsBlockOptions)) {
// Read the entire block; caching to the block store is handled internally by the UFS block
// store when the reader is closed.
// Note that we read from UFS with a smaller buffer to avoid high pressure on the heap
@@ -280,10 +282,26 @@ private CacheResult cacheBlockFromUfs(long blockId, long blockSize,
reader.read(offset, bufferSize);
offset += bufferSize;
}
if (isTempBlockMetaPresent(blockId)) {
mBlockWorker.commitBlock(Sessions.CACHE_UFS_SESSION_ID, blockId, false);
}
} catch (Exception e) {
LOG.warn("Failed to async cache block {} from UFS on reading the block:", blockId, e);
if (isTempBlockMetaPresent(blockId)) {
mBlockWorker.abortBlock(Sessions.CACHE_UFS_SESSION_ID, blockId);
}
return CacheResult.FAILED;
}
return CacheResult.SUCCEED;
}

private boolean isTempBlockMetaPresent(long blockId) {
Optional<TempBlockMeta> tempBlockMeta =
mBlockWorker.getBlockStore().getTempBlockMeta(blockId);
return tempBlockMeta.isPresent()
&& tempBlockMeta.get().getSessionId() == Sessions.CACHE_UFS_SESSION_ID;
}

/**
* Caches the block at best effort from a remote worker (possibly from UFS indirectly).
*
@@ -313,7 +331,7 @@ private CacheResult cacheBlockFromRemoteWorker(long blockId, long blockSize,
return CacheResult.SUCCEED;
} catch (IllegalStateException | IOException e) {
LOG.warn("Failed to async cache block {} from remote worker ({}) on copying the block: {}",
blockId, sourceAddress, e.toString());
blockId, sourceAddress, e);
try {
mBlockWorker.abortBlock(Sessions.CACHE_WORKER_SESSION_ID, blockId);
} catch (IOException ee) {
@@ -187,12 +187,16 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse
BlockReader reader = mUnderFileSystemBlockStore.createBlockReader(sessionId, blockId, offset,
positionShort, options);
BlockReader blockReader = new DelegatingBlockReader(reader,
() -> closeUfsBlock(sessionId, blockId, true));
() -> closeUfsBlock(sessionId, blockId));
Metrics.WORKER_ACTIVE_CLIENTS.inc();
return blockReader;
} catch (Exception e) {
try {
closeUfsBlock(sessionId, blockId, false);
Optional<TempBlockMeta> tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId);
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
abortBlock(sessionId, blockId);
}
closeUfsBlock(sessionId, blockId);
} catch (Exception ee) {
LOG.warn("Failed to close UFS block", ee);
}
@@ -206,24 +210,16 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse
}
}

private void closeUfsBlock(long sessionId, long blockId, boolean successful)
private void closeUfsBlock(long sessionId, long blockId)
throws IOException {
try {
mUnderFileSystemBlockStore.closeBlock(sessionId, blockId);
Optional<TempBlockMeta> tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId);
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
if (successful) {
commitBlock(sessionId, blockId, false);
} else {
abortBlock(sessionId, blockId);
}
} else {
if (!mLocalBlockStore.hasTempBlockMeta(blockId) && mUnderFileSystemBlockStore.isNoCache(
sessionId, blockId)) {
// When getTempBlockMeta() returns nothing (e.g. a block with readType NO_CACHE and
// writeType THROUGH), the counter is not decremented in commitBlock(),
// so we decrement it here.
if (mUnderFileSystemBlockStore.isNoCache(sessionId, blockId)) {
DefaultBlockWorker.Metrics.WORKER_ACTIVE_CLIENTS.dec();
}
Metrics.WORKER_ACTIVE_CLIENTS.dec();
}
} finally {
mUnderFileSystemBlockStore.releaseAccess(sessionId, blockId);
@@ -38,6 +38,7 @@
import alluxio.worker.block.BlockStoreType;
import alluxio.worker.block.DefaultBlockWorker;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.meta.TempBlockMeta;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
@@ -55,6 +56,7 @@
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
@@ -464,14 +466,15 @@ private void runInternal() {
}
if (error != null) {
try {
completeRequest(mContext);
completeRequest(mContext, false);
} catch (Exception e) {
LOG.error("Failed to close the request.", e);
}
replyError(error);
} else if (eof || cancel) {
try {
completeRequest(mContext);
boolean success = !cancel;
completeRequest(mContext, success);
} catch (Exception e) {
LogUtils.warnWithException(LOG, "Exception occurred while completing read request, "
+ "EOF/CANCEL sessionId: {}. {}", mContext.getRequest().getSessionId(),
@@ -491,12 +494,33 @@ private void runInternal() {
* state it may have accumulated.
*
* @param context context of the request to complete
* @param success whether the request was successful
*/
private void completeRequest(BlockReadRequestContext context) throws Exception {
private void completeRequest(BlockReadRequestContext context, boolean success)
throws Exception {
BlockReader reader = context.getBlockReader();
long sessionId = context.getRequest().getSessionId();
long blockId = context.getRequest().getId();
try {
if (reader != null) {
reader.close();
if (!success) {
Optional<TempBlockMeta> tempBlockMeta =
mWorker.getBlockStore().getTempBlockMeta(blockId);
// we need to cancel the block at the final step here to avoid mistakenly committing the block.
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
mWorker.abortBlock(sessionId, blockId);
}
reader.close();
}
else {
// have to close the reader here to avoid committing the block if it is not fully read.
reader.close();
Optional<TempBlockMeta> tempBlockMeta =
mWorker.getBlockStore().getTempBlockMeta(blockId);
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
mWorker.commitBlock(sessionId, blockId, false);
}
}
}
} finally {
context.setBlockReader(null);
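The ordering in the new completeRequest above is the subtle part: on failure the staged block is aborted before the reader is closed, so closing the reader can no longer end up committing a partially read block, while on success the reader is closed first and the commit happens only afterwards. A hedged sketch of that ordering, using the same kind of placeholder types as the earlier sketch rather than the real Alluxio classes:

import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;

/** Sketch of the close/commit ordering used when a block read request completes. */
public class CompleteRequestSketch {
  interface TempMeta { long getSessionId(); }

  interface Worker {
    Optional<TempMeta> getTempBlockMeta(long blockId);
    void commitBlock(long sessionId, long blockId, boolean pinOnCreate);
    void abortBlock(long sessionId, long blockId);
  }

  static void completeRequest(Worker worker, Closeable reader, long sessionId, long blockId,
      boolean success) throws IOException {
    if (!success) {
      // On failure, abort the staged block before closing the reader
      // (per the diff comment, to avoid mistakenly committing it).
      Optional<TempMeta> temp = worker.getTempBlockMeta(blockId);
      if (temp.isPresent() && temp.get().getSessionId() == sessionId) {
        worker.abortBlock(sessionId, blockId);
      }
      reader.close();
    } else {
      // On success, close the reader first, then commit the block if this session staged it.
      reader.close();
      Optional<TempMeta> temp = worker.getTempBlockMeta(blockId);
      if (temp.isPresent() && temp.get().getSessionId() == sessionId) {
        worker.commitBlock(sessionId, blockId, false);
      }
    }
  }
}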
@@ -425,6 +425,7 @@ public void getFallBackUfsReader() throws Exception {

// read a whole block
assertArrayEquals(data, reader.read(0, ufsBlockSize).array());
mBlockWorker.commitBlock(sessionId, blockId, false);
reader.close();

// after closing, the ufs block should be cached locally
2 changes: 2 additions & 0 deletions tests/src/test/java/alluxio/client/fs/TtlIntegrationTest.java
@@ -148,6 +148,8 @@ public void expireManyAfterFree() throws Exception {
}
CommonUtils.sleepMs(2 * TTL_INTERVAL_MS);
HeartbeatScheduler.execute(HeartbeatContext.MASTER_TTL_CHECK);
// Sleep for a while to make sure the delete operations are done.
CommonUtils.sleepMs(4 * TTL_INTERVAL_MS);
for (int i = 0; i < numFiles; i++) {
if (i % 2 == 0) {
if (i % 20 != 0) {
