From 812855fe5040ae94bcdfb2ebd7fed233632fcf70 Mon Sep 17 00:00:00 2001 From: Bin Fan Date: Sat, 27 May 2023 02:45:28 +0800 Subject: [PATCH] Fix a bug that creates 0 byte block file mistakenly ### What changes are proposed in this pull request? Fix a bug that may create a 0-byte block file on the worker when there is an issue reading a file from the UFS. Also fix the logging, as it is too spammy when HdfsUnderFileSystem fails to read a UFS file. ### Why are the changes needed? When we are caching a file (async) and the file no longer exists on the UFS (perhaps modified out of band), an exception will be thrown from `UnderFileSystemBlockStore.createBlockReader`. In its exception-handling part, we treated this case the same as a normal close and committed the temp block. This commit fixes this by aborting the temp block instead in error cases. Besides, the exception message in `createUfsBlockReader` was constructed incorrectly by also attaching the stack trace to the errorMessage. This is also fixed. In addition, the warn log on the HDFS UFS when attempting to read a file is suppressed to debug level, and only the last error is shown. 
``` 2023-05-17 06:43:13,039 WARN UfsInputStreamCache - Failed to create a new cached ufs instream of file id 6321787562360831 and path hdfs://nameservice1/user/hive/warehouse/some/table/period_name_desc=2023-17/period_end_date=2023-03-31/000008_0 java.util.concurrent.ExecutionException: java.io.FileNotFoundException: File does not exist: /user/hive/warehouse/some/table/period_name_desc=2023-17/period_end_date=2023-03-31/000008_0 at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66) at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2168) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2138) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2049) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:583) at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:94) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2278) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2274) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2272) 
at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:588) at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:547) at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:113) at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:244) at com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2317) at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2283) at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159) at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049) at com.google.common.cache.LocalCache.get(LocalCache.java:3966) at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863) at alluxio.worker.block.UfsInputStreamCache.acquire(UfsInputStreamCache.java:227) at alluxio.worker.block.UnderFileSystemBlockReader.updateUnderFileSystemInputStream(UnderFileSystemBlockReader.java:373) at alluxio.worker.block.UnderFileSystemBlockReader.init(UnderFileSystemBlockReader.java:194) at alluxio.worker.block.UnderFileSystemBlockReader.create(UnderFileSystemBlockReader.java:137) at alluxio.worker.block.UnderFileSystemBlockStore.createBlockReader(UnderFileSystemBlockStore.java:306) at alluxio.worker.block.MonoBlockStore.createUfsBlockReader(MonoBlockStore.java:199) at alluxio.worker.block.DefaultBlockWorker.createUfsBlockReader(DefaultBlockWorker.java:413) at alluxio.worker.block.CacheRequestManager.cacheBlockFromUfs(CacheRequestManager.java:261) at alluxio.worker.block.CacheRequestManager.cacheBlock(CacheRequestManager.java:239) at alluxio.worker.block.CacheRequestManager.access$000(CacheRequestManager.java:56) at alluxio.worker.block.CacheRequestManager$CacheTask.call(CacheRequestManager.java:210) at alluxio.worker.block.CacheRequestManager$CacheTask.call(CacheRequestManager.java:164) at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at alluxio.worker.grpc.GrpcExecutors$ImpersonateThreadPoolExecutor.lambda$execute$0(GrpcExecutors.java:159) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) ``` ### Does this PR introduce any user facing changes? No pr-link: Alluxio/alluxio#17497 change-id: cid-92e96a46cf67606c3087115cf065a8470d929421 --- .../java/alluxio/worker/block/MonoBlockStore.java | 14 +++++++++----- .../alluxio/underfs/hdfs/HdfsUnderFileSystem.java | 9 +++++++-- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java b/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java index 26b2757c885c..3b2f4a17e568 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java @@ -183,18 +183,18 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse BlockReader reader = mUnderFileSystemBlockStore.createBlockReader(sessionId, blockId, offset, positionShort, options); BlockReader blockReader = new DelegatingBlockReader(reader, - () -> closeUfsBlock(sessionId, blockId)); + () -> closeUfsBlock(sessionId, blockId, true)); Metrics.WORKER_ACTIVE_CLIENTS.inc(); return blockReader; } catch (Exception e) { try { - closeUfsBlock(sessionId, blockId); + closeUfsBlock(sessionId, blockId, false); } catch (Exception ee) { LOG.warn("Failed to close UFS block", ee); } String errorMessage = format("Failed to read from UFS, sessionId=%d, " + "blockId=%d, offset=%d, positionShort=%s, options=%s: %s", - sessionId, blockId, offset, positionShort, options, e); + sessionId, blockId, offset, positionShort, options, e.toString()); if (e instanceof 
FileNotFoundException) { throw new NotFoundException(errorMessage, e); } @@ -202,13 +202,17 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse } } - private void closeUfsBlock(long sessionId, long blockId) + private void closeUfsBlock(long sessionId, long blockId, boolean successful) throws IOException { try { mUnderFileSystemBlockStore.closeBlock(sessionId, blockId); Optional tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId); if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) { - commitBlock(sessionId, blockId, false); + if (successful) { + commitBlock(sessionId, blockId, false); + } else { + abortBlock(sessionId, blockId); + } } else { // When getTempBlockMeta() return null, such as a block readType NO_CACHE writeType THROUGH. // Counter will not be decrement in the commitblock(). diff --git a/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java b/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java index 2c876fa0d979..aa672d4dd248 100755 --- a/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java +++ b/underfs/hdfs/src/main/java/alluxio/underfs/hdfs/HdfsUnderFileSystem.java @@ -684,7 +684,7 @@ public InputStream open(String path, OpenOptions options) throws IOException { LOG.debug("Using original API to HDFS"); return new HdfsUnderFileInputStream(inputStream); } catch (IOException e) { - LOG.warn("{} try to open {} : {}", retryPolicy.getAttemptCount(), path, e.toString()); + LOG.debug("{} try to open {} : {}", retryPolicy.getAttemptCount(), path, e.toString()); te = e; if (options.getRecoverFailedOpen() && dfs != null && e.getMessage().toLowerCase() .startsWith("cannot obtain block length for")) { @@ -711,7 +711,12 @@ public InputStream open(String path, OpenOptions options) throws IOException { } } } - throw te; + if (te != null) { + LOG.error("{} failed attempts to open \"{}\" with last error:", + retryPolicy.getAttemptCount(), 
path, te); + throw te; + } + throw new IllegalStateException("Exceeded the number of retry attempts with no exception"); } @Override