From 7f5abd77adf203b9c9d04c34114d848f40dba9b4 Mon Sep 17 00:00:00 2001 From: Rico Chiu Date: Mon, 12 Jun 2023 10:11:23 -0700 Subject: [PATCH] Fix a bug that creates 0 byte block file mistakenly Cherry-pick of existing commit. orig-pr: Alluxio/alluxio#17497 orig-commit: Alluxio/alluxio@812855fe5040ae94bcdfb2ebd7fed233632fcf70 orig-commit-author: Bin Fan pr-link: Alluxio/alluxio#17596 change-id: cid-c10726a1e917ba139168e5b8bcf4c8bc567a2b01 --- .../java/alluxio/worker/block/MonoBlockStore.java | 14 +++++++++----- .../alluxio/underfs/hdfs/HdfsUnderFileSystem.java | 9 +++++++-- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java b/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java index fcdc8ae68643..5f18e4cac897 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java @@ -187,18 +187,18 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse BlockReader reader = mUnderFileSystemBlockStore.createBlockReader(sessionId, blockId, offset, positionShort, options); BlockReader blockReader = new DelegatingBlockReader(reader, - () -> closeUfsBlock(sessionId, blockId)); + () -> closeUfsBlock(sessionId, blockId, true)); Metrics.WORKER_ACTIVE_CLIENTS.inc(); return blockReader; } catch (Exception e) { try { - closeUfsBlock(sessionId, blockId); + closeUfsBlock(sessionId, blockId, false); } catch (Exception ee) { LOG.warn("Failed to close UFS block", ee); } String errorMessage = format("Failed to read from UFS, sessionId=%d, " + "blockId=%d, offset=%d, positionShort=%s, options=%s: %s", - sessionId, blockId, offset, positionShort, options, e); + sessionId, blockId, offset, positionShort, options, e.toString()); if (e instanceof FileNotFoundException) { throw new NotFoundException(errorMessage, e); } @@ -206,13 +206,17 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse } } - private void closeUfsBlock(long sessionId, long blockId) + private void closeUfsBlock(long sessionId, long blockId, boolean successful) throws IOException { try { mUnderFileSystemBlockStore.closeBlock(sessionId, blockId); Optional tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId); if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) { - commitBlock(sessionId, blockId, false); + if (successful) { + commitBlock(sessionId, blockId, false); + } else { + abortBlock(sessionId, blockId); + } } else { // When getTempBlockMeta() return null, such as a block readType NO_CACHE writeType THROUGH. // Counter will not be decrement in the commitblock(). diff --git a/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java b/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java index 630bb683addf..5acc359f53d8 100755 --- a/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java +++ b/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java @@ -667,7 +667,7 @@ public InputStream open(String path, OpenOptions options) throws IOException { LOG.debug("Using original API to HDFS"); return new HdfsUnderFileInputStream(inputStream); } catch (IOException e) { - LOG.warn("{} try to open {} : {}", retryPolicy.getAttemptCount(), path, e.toString()); + LOG.debug("{} try to open {} : {}", retryPolicy.getAttemptCount(), path, e.toString()); te = e; if (options.getRecoverFailedOpen() && dfs != null && e.getMessage().toLowerCase() .startsWith("cannot obtain block length for")) { @@ -694,7 +694,12 @@ public InputStream open(String path, OpenOptions options) throws IOException { } } } - throw te; + if (te != null) { + LOG.error("{} failed attempts to open \"{}\" with last error:", + retryPolicy.getAttemptCount(), path, te); + throw te; + } + throw new IllegalStateException("Exceeded the number of retry attempts with no exception"); } @Override