diff --git a/core/common/src/main/java/alluxio/worker/block/io/BlockReader.java b/core/common/src/main/java/alluxio/worker/block/io/BlockReader.java index c1ce215923ca..0b19c70a4bdd 100644 --- a/core/common/src/main/java/alluxio/worker/block/io/BlockReader.java +++ b/core/common/src/main/java/alluxio/worker/block/io/BlockReader.java @@ -82,4 +82,17 @@ public BlockReader() { * @return an informational string of the location the reader is reading from */ public abstract String getLocation(); + + /** + * Abort the reader. This method should be called when an exception occurs during execution + * or when the operation is cancelled by the user. + */ + public void abort() throws IOException { + } + + /** + * Commit the reader in case there is any uncommitted block cached from UFS. + */ + public void commit() throws IOException { + } } diff --git a/core/common/src/main/java/alluxio/worker/block/io/DelegatingBlockReader.java b/core/common/src/main/java/alluxio/worker/block/io/DelegatingBlockReader.java index 19e20bcb3d5f..23938aaa48a0 100644 --- a/core/common/src/main/java/alluxio/worker/block/io/DelegatingBlockReader.java +++ b/core/common/src/main/java/alluxio/worker/block/io/DelegatingBlockReader.java @@ -25,16 +25,32 @@ public class DelegatingBlockReader extends BlockReader { private final BlockReader mBlockReader; private final Closer mCloser; + private final IOFunction mCommitter; + private final IOFunction mAborter; /** * Default constructor for the abstract reader implementations. * @param blockReader block reader - * @param closeable closer + * @param closer closer */ - public DelegatingBlockReader(BlockReader blockReader, Closeable closeable) { + public DelegatingBlockReader(BlockReader blockReader, Closeable closer) { + this(blockReader, closer, null, null); + } + + /** + * Default constructor for the abstract reader implementations. + * @param blockReader block reader + * @param closer closer + * @param committer committer + * @param aborter aborter + */ + public DelegatingBlockReader(BlockReader blockReader, Closeable closer, + IOFunction committer, IOFunction aborter) { mCloser = Closer.create(); mBlockReader = mCloser.register(blockReader); - mCloser.register(closeable); + mCloser.register(closer); + mCommitter = committer; + mAborter = aborter; } /** @@ -83,4 +99,18 @@ public String toString() { public void close() throws IOException { mCloser.close(); } + + @Override + public void abort() throws IOException { + if (mAborter != null) { + mAborter.call(); + } + } + + @Override + public void commit() throws IOException { + if (mCommitter != null) { + mCommitter.call(); + } + } } diff --git a/core/common/src/main/java/alluxio/worker/block/io/IOFunction.java b/core/common/src/main/java/alluxio/worker/block/io/IOFunction.java new file mode 100644 index 000000000000..8c53e9083aa4 --- /dev/null +++ b/core/common/src/main/java/alluxio/worker/block/io/IOFunction.java @@ -0,0 +1,25 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.worker.block.io; + +import java.io.IOException; + +/** + * A callable for IO functions. + */ +public interface IOFunction { + /** + * Compute with some io functions. + * @throws IOException + */ + void call() throws IOException; +} diff --git a/core/server/worker/src/main/java/alluxio/worker/block/CacheRequestManager.java b/core/server/worker/src/main/java/alluxio/worker/block/CacheRequestManager.java index 0c2bb7ebc9bc..b383589c5961 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/CacheRequestManager.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/CacheRequestManager.java @@ -271,15 +271,24 @@ private CacheResult cacheBlockFromUfs(long blockId, long blockSize, Protocol.OpenUfsBlockOptions openUfsBlockOptions) throws IOException { try (BlockReader reader = mBlockWorker.createUfsBlockReader( Sessions.CACHE_UFS_SESSION_ID, blockId, 0, false, openUfsBlockOptions)) { - // Read the entire block, caching to block store will be handled internally in UFS block store - // when close the reader. - // Note that, we read from UFS with a smaller buffer to avoid high pressure on heap - // memory when concurrent async requests are received and thus trigger GC. - long offset = 0; - while (offset < blockSize) { - long bufferSize = Math.min(8L * Constants.MB, blockSize - offset); - reader.read(offset, bufferSize); - offset += bufferSize; + try { + // Read the entire block, caching to block store will be handled internally in UFS block + // store when close the reader. + // Note that, we read from UFS with a smaller buffer to avoid high pressure on heap + // memory when concurrent async requests are received and thus trigger GC. + long offset = 0; + while (offset < blockSize) { + long bufferSize = Math.min(8L * Constants.MB, blockSize - offset); + reader.read(offset, bufferSize); + offset += bufferSize; + } + reader.commit(); + } catch (Exception e) { + reader.abort(); + if (e instanceof IOException) { + throw (IOException) e; + } + throw new RuntimeException(e); } } return CacheResult.SUCCEED; diff --git a/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java b/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java index 4ef1238dfd31..4d67cb41ecf2 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/MonoBlockStore.java @@ -187,12 +187,14 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse BlockReader reader = mUnderFileSystemBlockStore.createBlockReader(sessionId, blockId, offset, positionShort, options); BlockReader blockReader = new DelegatingBlockReader(reader, - () -> closeUfsBlock(sessionId, blockId, true)); + () -> closeUfsBlock(sessionId, blockId), + () -> commitUfsBlock(sessionId, blockId), + () -> abortUfsBlock(sessionId, blockId)); Metrics.WORKER_ACTIVE_CLIENTS.inc(); return blockReader; } catch (Exception e) { try { - closeUfsBlock(sessionId, blockId, false); + closeUfsBlock(sessionId, blockId); } catch (Exception ee) { LOG.warn("Failed to close UFS block", ee); } @@ -206,18 +208,12 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse } } - private void closeUfsBlock(long sessionId, long blockId, boolean successful) + private void closeUfsBlock(long sessionId, long blockId) throws IOException { try { mUnderFileSystemBlockStore.closeBlock(sessionId, blockId); Optional tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId); - if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) { - if (successful) { - commitBlock(sessionId, blockId, false); - } else { - abortBlock(sessionId, blockId); - } - } else { + if (!tempBlockMeta.isPresent() || tempBlockMeta.get().getSessionId() != sessionId) { // When getTempBlockMeta() return null, such as a block readType NO_CACHE writeType THROUGH. // Counter will not be decrement in the commitblock(). // So we should decrement counter here. @@ -230,6 +226,30 @@ private void closeUfsBlock(long sessionId, long blockId, boolean successful) } } + private void abortUfsBlock(long sessionId, long blockId) throws IOException { + mUnderFileSystemBlockStore.closeBlock(sessionId, blockId); + Optional tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId); + if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) { + abortBlock(sessionId, blockId); + } else { + LOG.warn("Skipping abort of UFS block due to missing temp block " + + "metadata or mismatched session ID. Block ID: {}, Session ID: {}", + blockId, sessionId); + } + } + + private void commitUfsBlock(long sessionId, long blockId) throws IOException { + mUnderFileSystemBlockStore.closeBlock(sessionId, blockId); + Optional tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId); + if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) { + commitBlock(sessionId, blockId, false); + } else { + LOG.warn("Skipping commit of UFS block due to missing temp block " + + "metadata or mismatched session ID. Block ID: {}, Session ID: {}", + blockId, sessionId); + } + } + @Override public BlockWriter createBlockWriter(long sessionId, long blockId) throws IOException { return mLocalBlockStore.createBlockWriter(sessionId, blockId); diff --git a/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java b/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java index af8310e262bb..db94fb33e7f4 100644 --- a/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java +++ b/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java @@ -54,6 +54,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -550,12 +551,23 @@ protected DataBuffer getDataBuffer(BlockReadRequestContext context, long offset, try { while (buf.writableBytes() > 0 && blockReader.transferTo(buf) != -1) { } + blockReader.commit(); return new NettyDataBuffer(buf.retain()); + } catch (Exception e) { + blockReader.abort(); + throw e; } finally { buf.release(); } } else { - ByteBuffer buffer = blockReader.read(offset, len); + ByteBuffer buffer; + try { + buffer = blockReader.read(offset, len); + blockReader.commit(); + } catch (IOException e) { + blockReader.abort(); + throw e; + } return new NettyDataBuffer(Unpooled.wrappedBuffer(buffer)); } default: