From cbbd5b9a57325a9da6686a4138bb48509a57a3ec Mon Sep 17 00:00:00 2001 From: jja725 Date: Tue, 12 Mar 2024 16:50:56 -0700 Subject: [PATCH] Fix data corrupted issue when loading from UFS Cherry-pick of existing commit. orig-pr: Alluxio/alluxio#18525 orig-commit: Alluxio/alluxio@6507cf8ecd2b5f91c4c7b1698b525d3482bed246 orig-commit-author: jja725 pr-link: Alluxio/alluxio#18546 change-id: cid-bbba0feb29231e70750e5e79da5f405bb591d47a --- .../worker/block/CacheRequestManager.java | 24 ++++++++++++-- .../alluxio/worker/block/MonoBlockStore.java | 24 ++++++-------- .../alluxio/worker/grpc/BlockReadHandler.java | 32 ++++++++++++++++--- .../worker/block/DefaultBlockWorkerTest.java | 1 + .../alluxio/client/fs/TtlIntegrationTest.java | 2 ++ 5 files changed, 62 insertions(+), 21 deletions(-) diff --git a/core/server/worker/src/main/java/alluxio/worker/block/CacheRequestManager.java b/core/server/worker/src/main/java/alluxio/worker/block/CacheRequestManager.java index 464feabc2356..69ea1b6adfc9 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/CacheRequestManager.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/CacheRequestManager.java @@ -29,6 +29,7 @@ import alluxio.util.network.NetworkAddressUtils; import alluxio.worker.block.io.BlockReader; import alluxio.worker.block.io.BlockWriter; +import alluxio.worker.block.meta.TempBlockMeta; import com.codahale.metrics.Counter; import com.google.common.annotations.VisibleForTesting; @@ -38,6 +39,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -268,8 +270,8 @@ private CacheResult cacheBlock(CacheRequest request) throws IOException, Alluxio */ private CacheResult cacheBlockFromUfs(long blockId, long blockSize, Protocol.OpenUfsBlockOptions openUfsBlockOptions) throws IOException { - try (BlockReader reader = mBlockWorker.createUfsBlockReader( - Sessions.CACHE_UFS_SESSION_ID, blockId, 0, false, openUfsBlockOptions)) { + try (BlockReader reader = mBlockWorker.createUfsBlockReader(Sessions.CACHE_UFS_SESSION_ID, + blockId, 0, false, openUfsBlockOptions)) { // Read the entire block, caching to block store will be handled internally in UFS block store // when close the reader. // Note that, we read from UFS with a smaller buffer to avoid high pressure on heap @@ -280,10 +282,26 @@ private CacheResult cacheBlockFromUfs(long blockId, long blockSize, reader.read(offset, bufferSize); offset += bufferSize; } + if (isTempBlockMetaPresent(blockId)) { + mBlockWorker.commitBlock(Sessions.CACHE_UFS_SESSION_ID, blockId, false); + } + } catch (Exception e) { + LOG.warn("Failed to async cache block {} from UFS on reading the block:", blockId, e); + if (isTempBlockMetaPresent(blockId)) { + mBlockWorker.abortBlock(Sessions.CACHE_UFS_SESSION_ID, blockId); + } + return CacheResult.FAILED; } return CacheResult.SUCCEED; } + private boolean isTempBlockMetaPresent(long blockId) { + Optional tempBlockMeta = + mBlockWorker.getBlockStore().getTempBlockMeta(blockId); + return tempBlockMeta.isPresent() + && tempBlockMeta.get().getSessionId() == Sessions.CACHE_UFS_SESSION_ID; + } + /** * Caches the block at best effort from a remote worker (possibly from UFS indirectly). * @@ -313,7 +331,7 @@ private CacheResult cacheBlockFromRemoteWorker(long blockId, long blockSize, return CacheResult.SUCCEED; } catch (IllegalStateException | IOException e) { LOG.warn("Failed to async cache block {} from remote worker ({}) on copying the block: {}", - blockId, sourceAddress, e.toString()); + blockId, sourceAddress, e); try { mBlockWorker.abortBlock(Sessions.CACHE_WORKER_SESSION_ID, blockId); } catch (IOException ee) { diff --git a/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java b/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java index 5f18e4cac897..d3d091b197a5 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java @@ -187,12 +187,16 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse BlockReader reader = mUnderFileSystemBlockStore.createBlockReader(sessionId, blockId, offset, positionShort, options); BlockReader blockReader = new DelegatingBlockReader(reader, - () -> closeUfsBlock(sessionId, blockId, true)); + () -> closeUfsBlock(sessionId, blockId)); Metrics.WORKER_ACTIVE_CLIENTS.inc(); return blockReader; } catch (Exception e) { try { - closeUfsBlock(sessionId, blockId, false); + Optional tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId); + if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) { + abortBlock(sessionId, blockId); + } + closeUfsBlock(sessionId, blockId); } catch (Exception ee) { LOG.warn("Failed to close UFS block", ee); } @@ -206,24 +210,16 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse } } - private void closeUfsBlock(long sessionId, long blockId, boolean successful) + private void closeUfsBlock(long sessionId, long blockId) throws IOException { try { mUnderFileSystemBlockStore.closeBlock(sessionId, blockId); - Optional tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId); - if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) { - if (successful) { - commitBlock(sessionId, blockId, false); - } else { - abortBlock(sessionId, blockId); - } - } else { + if (!mLocalBlockStore.hasTempBlockMeta(blockId) && mUnderFileSystemBlockStore.isNoCache( + sessionId, blockId)) { // When getTempBlockMeta() return null, such as a block readType NO_CACHE writeType THROUGH. // Counter will not be decrement in the commitblock(). // So we should decrement counter here. - if (mUnderFileSystemBlockStore.isNoCache(sessionId, blockId)) { - DefaultBlockWorker.Metrics.WORKER_ACTIVE_CLIENTS.dec(); - } + Metrics.WORKER_ACTIVE_CLIENTS.dec(); } } finally { mUnderFileSystemBlockStore.releaseAccess(sessionId, blockId); diff --git a/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java b/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java index bf0cf4c5603e..3e5abac7edc5 100644 --- a/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java +++ b/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java @@ -38,6 +38,7 @@ import alluxio.worker.block.BlockStoreType; import alluxio.worker.block.DefaultBlockWorker; import alluxio.worker.block.io.BlockReader; +import alluxio.worker.block.meta.TempBlockMeta; import com.codahale.metrics.Counter; import com.codahale.metrics.Meter; @@ -55,6 +56,7 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; @@ -464,14 +466,15 @@ private void runInternal() { } if (error != null) { try { - completeRequest(mContext); + completeRequest(mContext, false); } catch (Exception e) { LOG.error("Failed to close the request.", e); } replyError(error); } else if (eof || cancel) { try { - completeRequest(mContext); + boolean success = !cancel; + completeRequest(mContext, success); } catch (Exception e) { LogUtils.warnWithException(LOG, "Exception occurred while completing read request, " + "EOF/CANCEL sessionId: {}. {}", mContext.getRequest().getSessionId(), @@ -491,12 +494,33 @@ private void runInternal() { * state it may have accumulated. * * @param context context of the request to complete + * @param success whether the request was successful */ - private void completeRequest(BlockReadRequestContext context) throws Exception { + private void completeRequest(BlockReadRequestContext context, boolean success) + throws Exception { BlockReader reader = context.getBlockReader(); + long sessionId = context.getRequest().getSessionId(); + long blockId = context.getRequest().getId(); try { if (reader != null) { - reader.close(); + if (!success) { + Optional tempBlockMeta = + mWorker.getBlockStore().getTempBlockMeta(blockId); + // we need to cancel the block at the final step here to avoid mistakenly commit block. + if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) { + mWorker.abortBlock(sessionId, blockId); + } + reader.close(); + } + else { + // have to close the reader here to avoid commit block if the block is not fully read. + reader.close(); + Optional tempBlockMeta = + mWorker.getBlockStore().getTempBlockMeta(blockId); + if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) { + mWorker.commitBlock(sessionId, blockId, false); + } + } } } finally { context.setBlockReader(null); diff --git a/core/server/worker/src/test/java/alluxio/worker/block/DefaultBlockWorkerTest.java b/core/server/worker/src/test/java/alluxio/worker/block/DefaultBlockWorkerTest.java index fb33d9c8be1f..28540c0e100a 100644 --- a/core/server/worker/src/test/java/alluxio/worker/block/DefaultBlockWorkerTest.java +++ b/core/server/worker/src/test/java/alluxio/worker/block/DefaultBlockWorkerTest.java @@ -425,6 +425,7 @@ public void getFallBackUfsReader() throws Exception { // read a whole block assertArrayEquals(data, reader.read(0, ufsBlockSize).array()); + mBlockWorker.commitBlock(sessionId, blockId, false); reader.close(); // after closing, the ufs block should be cached locally diff --git a/tests/src/test/java/alluxio/client/fs/TtlIntegrationTest.java b/tests/src/test/java/alluxio/client/fs/TtlIntegrationTest.java index ec66859f78e5..cecb1d2ebaf6 100644 --- a/tests/src/test/java/alluxio/client/fs/TtlIntegrationTest.java +++ b/tests/src/test/java/alluxio/client/fs/TtlIntegrationTest.java @@ -148,6 +148,8 @@ public void expireManyAfterFree() throws Exception { } CommonUtils.sleepMs(2 * TTL_INTERVAL_MS); HeartbeatScheduler.execute(HeartbeatContext.MASTER_TTL_CHECK); + // Sleep for a while to make sure the delete operations are done. + CommonUtils.sleepMs(4 * TTL_INTERVAL_MS); for (int i = 0; i < numFiles; i++) { if (i % 2 == 0) { if (i % 20 != 0) {