diff --git a/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java b/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java index 26b2757c885c..3b2f4a17e568 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java @@ -183,18 +183,18 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse BlockReader reader = mUnderFileSystemBlockStore.createBlockReader(sessionId, blockId, offset, positionShort, options); BlockReader blockReader = new DelegatingBlockReader(reader, - () -> closeUfsBlock(sessionId, blockId)); + () -> closeUfsBlock(sessionId, blockId, true)); Metrics.WORKER_ACTIVE_CLIENTS.inc(); return blockReader; } catch (Exception e) { try { - closeUfsBlock(sessionId, blockId); + closeUfsBlock(sessionId, blockId, false); } catch (Exception ee) { LOG.warn("Failed to close UFS block", ee); } String errorMessage = format("Failed to read from UFS, sessionId=%d, " + "blockId=%d, offset=%d, positionShort=%s, options=%s: %s", - sessionId, blockId, offset, positionShort, options, e); + sessionId, blockId, offset, positionShort, options, e.toString()); if (e instanceof FileNotFoundException) { throw new NotFoundException(errorMessage, e); } @@ -202,13 +202,17 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse } } - private void closeUfsBlock(long sessionId, long blockId) + private void closeUfsBlock(long sessionId, long blockId, boolean successful) throws IOException { try { mUnderFileSystemBlockStore.closeBlock(sessionId, blockId); Optional tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId); if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) { - commitBlock(sessionId, blockId, false); + if (successful) { + commitBlock(sessionId, blockId, false); + } else { + abortBlock(sessionId, blockId); + } } else { // When getTempBlockMeta() return null, such as a block readType NO_CACHE writeType THROUGH. // Counter will not be decrement in the commitblock(). diff --git a/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java b/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java index 2c876fa0d979..aa672d4dd248 100755 --- a/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java +++ b/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java @@ -684,7 +684,7 @@ public InputStream open(String path, OpenOptions options) throws IOException { LOG.debug("Using original API to HDFS"); return new HdfsUnderFileInputStream(inputStream); } catch (IOException e) { - LOG.warn("{} try to open {} : {}", retryPolicy.getAttemptCount(), path, e.toString()); + LOG.debug("{} try to open {} : {}", retryPolicy.getAttemptCount(), path, e.toString()); te = e; if (options.getRecoverFailedOpen() && dfs != null && e.getMessage().toLowerCase() .startsWith("cannot obtain block length for")) { @@ -711,7 +711,12 @@ public InputStream open(String path, OpenOptions options) throws IOException { } } } - throw te; + if (te != null) { + LOG.error("{} failed attempts to open \"{}\" with last error:", + retryPolicy.getAttemptCount(), path, te); + throw te; + } + throw new IllegalStateException("Exceeded the number of retry attempts with no exception"); } @Override