Skip to content

Commit

Permalink
Fix a bug that creates 0 byte block file mistakenly
Browse files Browse the repository at this point in the history
Cherry-pick of existing commit.
orig-pr: #17497
orig-commit: 812855f
orig-commit-author: Bin Fan <fanbin103@gmail.com>
			pr-link: #17596
			change-id: cid-c10726a1e917ba139168e5b8bcf4c8bc567a2b01
  • Loading branch information
Xenorith authored Jun 12, 2023
1 parent de322c3 commit 7f5abd7
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,32 +187,36 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse
BlockReader reader = mUnderFileSystemBlockStore.createBlockReader(sessionId, blockId, offset,
positionShort, options);
BlockReader blockReader = new DelegatingBlockReader(reader,
() -> closeUfsBlock(sessionId, blockId));
() -> closeUfsBlock(sessionId, blockId, true));
Metrics.WORKER_ACTIVE_CLIENTS.inc();
return blockReader;
} catch (Exception e) {
try {
closeUfsBlock(sessionId, blockId);
closeUfsBlock(sessionId, blockId, false);
} catch (Exception ee) {
LOG.warn("Failed to close UFS block", ee);
}
String errorMessage = format("Failed to read from UFS, sessionId=%d, "
+ "blockId=%d, offset=%d, positionShort=%s, options=%s: %s",
sessionId, blockId, offset, positionShort, options, e);
sessionId, blockId, offset, positionShort, options, e.toString());
if (e instanceof FileNotFoundException) {
throw new NotFoundException(errorMessage, e);
}
throw new UnavailableException(errorMessage, e);
}
}

private void closeUfsBlock(long sessionId, long blockId)
private void closeUfsBlock(long sessionId, long blockId, boolean successful)
throws IOException {
try {
mUnderFileSystemBlockStore.closeBlock(sessionId, blockId);
Optional<TempBlockMeta> tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId);
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
commitBlock(sessionId, blockId, false);
if (successful) {
commitBlock(sessionId, blockId, false);
} else {
abortBlock(sessionId, blockId);
}
} else {
// When getTempBlockMeta() return null, such as a block readType NO_CACHE writeType THROUGH.
// Counter will not be decrement in the commitblock().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ public InputStream open(String path, OpenOptions options) throws IOException {
LOG.debug("Using original API to HDFS");
return new HdfsUnderFileInputStream(inputStream);
} catch (IOException e) {
LOG.warn("{} try to open {} : {}", retryPolicy.getAttemptCount(), path, e.toString());
LOG.debug("{} try to open {} : {}", retryPolicy.getAttemptCount(), path, e.toString());
te = e;
if (options.getRecoverFailedOpen() && dfs != null && e.getMessage().toLowerCase()
.startsWith("cannot obtain block length for")) {
Expand All @@ -694,7 +694,12 @@ public InputStream open(String path, OpenOptions options) throws IOException {
}
}
}
throw te;
if (te != null) {
LOG.error("{} failed attempts to open \"{}\" with last error:",
retryPolicy.getAttemptCount(), path, te);
throw te;
}
throw new IllegalStateException("Exceeded the number of retry attempts with no exception");
}

@Override
Expand Down

0 comments on commit 7f5abd7

Please sign in to comment.