Skip to content

Commit

Permalink
Fix a bug that creates 0 byte block file mistakenly
Browse files Browse the repository at this point in the history
Fix a bug that may create a 0-byte block file on the worker when there is an issue reading a file from the UFS.
Also fix the logging, which is too spammy when HdfsUnderFileSystem fails to read a UFS file.

When we are caching a file (asynchronously) and the file no longer exists on the UFS (perhaps it was modified out of band),
an exception will be thrown from `UnderFileSystemBlockStore.createBlockReader`.
In its exception-handling path, we treated this case the same as a normal close and committed the temp block.
This commit fixes that by aborting the temp block instead in error cases.

Besides, the exception message in `createUfsBlockReader` is constructed incorrectly: it also embeds the stack trace in the error message. This is fixed as well.

In addition, suppress the warn log on the HDFS UFS when attempting to read a file, demoting it to debug level, and only log the last error. Previously each failed attempt produced a warning like the following:

```
2023-05-17 06:43:13,039 WARN  UfsInputStreamCache - Failed to create a new cached ufs instream of file id 6321787562360831 and path hdfs://nameservice1/user/hive/warehouse/some/table/period_name_desc=2023-17/period_end_date=2023-03-31/000008_0
java.util.concurrent.ExecutionException: java.io.FileNotFoundException: File does not exist: /user/hive/warehouse/some/table/period_name_desc=2023-17/period_end_date=2023-03-31/000008_0
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2168)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2138)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2049)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:583)
	at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:94)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2278)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2274)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2272)

	at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:588)
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:547)
	at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:113)
	at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:244)
	at com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2317)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2283)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049)
	at com.google.common.cache.LocalCache.get(LocalCache.java:3966)
	at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863)
	at alluxio.worker.block.UfsInputStreamCache.acquire(UfsInputStreamCache.java:227)
	at alluxio.worker.block.UnderFileSystemBlockReader.updateUnderFileSystemInputStream(UnderFileSystemBlockReader.java:373)
	at alluxio.worker.block.UnderFileSystemBlockReader.init(UnderFileSystemBlockReader.java:194)
	at alluxio.worker.block.UnderFileSystemBlockReader.create(UnderFileSystemBlockReader.java:137)
	at alluxio.worker.block.UnderFileSystemBlockStore.createBlockReader(UnderFileSystemBlockStore.java:306)
	at alluxio.worker.block.MonoBlockStore.createUfsBlockReader(MonoBlockStore.java:199)
	at alluxio.worker.block.DefaultBlockWorker.createUfsBlockReader(DefaultBlockWorker.java:413)
	at alluxio.worker.block.CacheRequestManager.cacheBlockFromUfs(CacheRequestManager.java:261)
	at alluxio.worker.block.CacheRequestManager.cacheBlock(CacheRequestManager.java:239)
	at alluxio.worker.block.CacheRequestManager.access$000(CacheRequestManager.java:56)
	at alluxio.worker.block.CacheRequestManager$CacheTask.call(CacheRequestManager.java:210)
	at alluxio.worker.block.CacheRequestManager$CacheTask.call(CacheRequestManager.java:164)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at alluxio.worker.grpc.GrpcExecutors$ImpersonateThreadPoolExecutor.lambda$execute$0(GrpcExecutors.java:159)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
```

No

			pr-link: Alluxio#17497
			change-id: cid-92e96a46cf67606c3087115cf065a8470d929421
  • Loading branch information
apc999 authored and Xenorith committed May 30, 2023
1 parent 5307320 commit d9a7999
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,30 +171,46 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse
try {
BlockReader reader = mUnderFileSystemBlockStore.createBlockReader(sessionId, blockId, offset,
positionShort, options);
<<<<<<< HEAD
return new DelegatingBlockReader(reader, () -> closeUfsBlock(sessionId, blockId));
||||||| parent of 812855fe50 (Fix a bug that creates 0 byte block file mistakenly)
BlockReader blockReader = new DelegatingBlockReader(reader,
() -> closeUfsBlock(sessionId, blockId));
Metrics.WORKER_ACTIVE_CLIENTS.inc();
return blockReader;
=======
BlockReader blockReader = new DelegatingBlockReader(reader,
() -> closeUfsBlock(sessionId, blockId, true));
Metrics.WORKER_ACTIVE_CLIENTS.inc();
return blockReader;
>>>>>>> 812855fe50 (Fix a bug that creates 0 byte block file mistakenly)
} catch (Exception e) {
try {
closeUfsBlock(sessionId, blockId);
closeUfsBlock(sessionId, blockId, false);
} catch (Exception ee) {
LOG.warn("Failed to close UFS block", ee);
}
String errorMessage = format("Failed to read from UFS, sessionId=%d, "
+ "blockId=%d, offset=%d, positionShort=%s, options=%s: %s",
sessionId, blockId, offset, positionShort, options, e);
sessionId, blockId, offset, positionShort, options, e.toString());
if (e instanceof FileNotFoundException) {
throw new NotFoundException(errorMessage, e);
}
throw new UnavailableException(errorMessage, e);
}
}

private void closeUfsBlock(long sessionId, long blockId)
private void closeUfsBlock(long sessionId, long blockId, boolean successful)
throws IOException {
try {
mUnderFileSystemBlockStore.closeBlock(sessionId, blockId);
Optional<TempBlockMeta> tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId);
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
commitBlock(sessionId, blockId, false);
if (successful) {
commitBlock(sessionId, blockId, false);
} else {
abortBlock(sessionId, blockId);
}
} else {
// When getTempBlockMeta() return null, such as a block readType NO_CACHE writeType THROUGH.
// Counter will not be decrement in the commitblock().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ public InputStream open(String path, OpenOptions options) throws IOException {
LOG.debug("Using original API to HDFS");
return new HdfsUnderFileInputStream(inputStream);
} catch (IOException e) {
LOG.warn("{} try to open {} : {}", retryPolicy.getAttemptCount(), path, e.toString());
LOG.debug("{} try to open {} : {}", retryPolicy.getAttemptCount(), path, e.toString());
te = e;
if (options.getRecoverFailedOpen() && dfs != null && e.getMessage().toLowerCase()
.startsWith("cannot obtain block length for")) {
Expand All @@ -694,7 +694,12 @@ public InputStream open(String path, OpenOptions options) throws IOException {
}
}
}
throw te;
if (te != null) {
LOG.error("{} failed attempts to open \"{}\" with last error:",
retryPolicy.getAttemptCount(), path, te);
throw te;
}
throw new IllegalStateException("Exceeded the number of retry attempts with no exception");
}

@Override
Expand Down

0 comments on commit d9a7999

Please sign in to comment.