diff --git a/core/common/src/main/java/alluxio/worker/block/BlockWorker.java b/core/common/src/main/java/alluxio/worker/block/BlockWorker.java index 143e002f0567..9503a8a3ee85 100644 --- a/core/common/src/main/java/alluxio/worker/block/BlockWorker.java +++ b/core/common/src/main/java/alluxio/worker/block/BlockWorker.java @@ -136,13 +136,11 @@ String createBlock(long sessionId, long blockId, int tier, throws BlockAlreadyExistsException, WorkerOutOfSpaceException, IOException; /** - * @param sessionId the id of the session to get this file * @param blockId the id of the block - * * @return metadata of the block or null if the temp block does not exist */ @Nullable - TempBlockMeta getTempBlockMeta(long sessionId, long blockId); + TempBlockMeta getTempBlockMeta(long blockId); /** * Creates a {@link BlockWriter} for an existing temporary block which is already created by diff --git a/core/server/worker/src/main/java/alluxio/worker/block/BlockStore.java b/core/server/worker/src/main/java/alluxio/worker/block/BlockStore.java index 29ae533a1a2a..528b61989244 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/BlockStore.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/BlockStore.java @@ -126,12 +126,11 @@ BlockMeta getBlockMeta(long sessionId, long blockId, long lockId) /** * Gets the temp metadata of a specific block from local storage. * - * @param sessionId the id of the session to get this file * @param blockId the id of the block * @return metadata of the block or null if the temp block does not exist */ @Nullable - TempBlockMeta getTempBlockMeta(long sessionId, long blockId); + TempBlockMeta getTempBlockMeta(long blockId); /** * Commits a temporary block to the local store. After commit, the block will be available in this diff --git a/core/server/worker/src/main/java/alluxio/worker/block/CacheRequestManager.java b/core/server/worker/src/main/java/alluxio/worker/block/CacheRequestManager.java index 8eec49d02467..5c0d8a154c59 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/CacheRequestManager.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/CacheRequestManager.java @@ -30,6 +30,7 @@ import alluxio.util.network.NetworkAddressUtils; import alluxio.worker.block.io.BlockReader; import alluxio.worker.block.io.BlockWriter; +import alluxio.worker.block.meta.TempBlockMeta; import com.codahale.metrics.Counter; import com.google.common.annotations.VisibleForTesting; @@ -272,10 +273,26 @@ private boolean cacheBlockFromUfs(long blockId, long blockSize, reader.read(offset, bufferSize); offset += bufferSize; } + if (isTempBlockMetaPresent(blockId)) { + mBlockWorker.commitBlock(Sessions.CACHE_UFS_SESSION_ID, blockId, false); + } + } catch (Exception e) { + LOG.warn("Failed to async cache block {} from UFS on reading the block:", blockId, e); + if (isTempBlockMetaPresent(blockId)) { + mBlockWorker.abortBlock(Sessions.CACHE_UFS_SESSION_ID, blockId); + } + return false; } return true; } + private boolean isTempBlockMetaPresent(long blockId) { + TempBlockMeta tempBlockMeta = + mBlockWorker.getTempBlockMeta(blockId); + return tempBlockMeta != null + && tempBlockMeta.getSessionId() == Sessions.CACHE_UFS_SESSION_ID; + } + /** * Caches the block at best effort from a remote worker (possibly from UFS indirectly). * @@ -305,7 +322,7 @@ private boolean cacheBlockFromRemoteWorker(long blockId, long blockSize, return true; } catch (AlluxioException | IOException e) { LOG.warn("Failed to async cache block {} from remote worker ({}) on copying the block: {}", - blockId, sourceAddress, e.toString()); + blockId, sourceAddress, e); try { mBlockWorker.abortBlock(Sessions.CACHE_WORKER_SESSION_ID, blockId); } catch (AlluxioException | IOException ee) { diff --git a/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java b/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java index 9a20a9d2019e..03425de678bd 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/DefaultBlockWorker.java @@ -397,8 +397,8 @@ public String createBlock(long sessionId, long blockId, int tier, } @Override - public TempBlockMeta getTempBlockMeta(long sessionId, long blockId) { - return mLocalBlockStore.getTempBlockMeta(sessionId, blockId); + public TempBlockMeta getTempBlockMeta(long blockId) { + return mLocalBlockStore.getTempBlockMeta(blockId); } @Override @@ -546,6 +546,10 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse }); } catch (Exception e) { try { + TempBlockMeta tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId); + if (tempBlockMeta != null && tempBlockMeta.getSessionId() == sessionId) { + abortBlock(sessionId, blockId); + } closeUfsBlock(sessionId, blockId); } catch (Exception ee) { LOG.warn("Failed to close UFS block", ee); @@ -651,7 +655,7 @@ public void closeUfsBlock(long sessionId, long blockId) throws BlockAlreadyExistsException, IOException, WorkerOutOfSpaceException { try { mUnderFileSystemBlockStore.closeReaderOrWriter(sessionId, blockId); - if (mLocalBlockStore.getTempBlockMeta(sessionId, blockId) != null) { + if (mLocalBlockStore.getTempBlockMeta(blockId) != null) { try { commitBlock(sessionId, blockId, false); } catch (BlockDoesNotExistException e) { diff --git a/core/server/worker/src/main/java/alluxio/worker/block/TieredBlockStore.java b/core/server/worker/src/main/java/alluxio/worker/block/TieredBlockStore.java index 45d0e3edcb6d..d0189ebe7785 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/TieredBlockStore.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/TieredBlockStore.java @@ -252,8 +252,8 @@ public BlockMeta getBlockMeta(long sessionId, long blockId, long lockId) } @Override - public TempBlockMeta getTempBlockMeta(long sessionId, long blockId) { - LOG.debug("getTempBlockMeta: sessionId={}, blockId={}", sessionId, blockId); + public TempBlockMeta getTempBlockMeta(long blockId) { + LOG.debug("getTempBlockMeta: blockId={}", blockId); try (LockResource r = new LockResource(mMetadataReadLock)) { return mMetaManager.getTempBlockMetaOrNull(blockId); } diff --git a/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java b/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java index 6af7c10609e9..8631e2673d6a 100644 --- a/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java +++ b/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java @@ -35,6 +35,7 @@ import alluxio.wire.BlockReadRequest; import alluxio.worker.block.BlockWorker; import alluxio.worker.block.io.BlockReader; +import alluxio.worker.block.meta.TempBlockMeta; import com.codahale.metrics.Counter; import com.codahale.metrics.Meter; @@ -50,6 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; @@ -456,14 +458,15 @@ private void runInternal() { } if (error != null) { try { - completeRequest(mContext); + boolean success = !cancel && error == null; + completeRequest(mContext, success); } catch (Exception e) { LOG.error("Failed to close the request.", e); } replyError(error); } else if (eof || cancel) { try { - completeRequest(mContext); + completeRequest(mContext, eof); } catch (Exception e) { LogUtils.warnWithException(LOG, "Exception occurred while completing read request, " + "EOF/CANCEL sessionId: {}. {}", mContext.getRequest().getSessionId(), @@ -483,12 +486,33 @@ private void runInternal() { * state it may have accumulated. * * @param context context of the request to complete + * @param success whether the request was successful */ - private void completeRequest(BlockReadRequestContext context) throws Exception { + private void completeRequest(BlockReadRequestContext context, boolean success) + throws Exception { BlockReader reader = context.getBlockReader(); + long sessionId = context.getRequest().getSessionId(); + long blockId = context.getRequest().getId(); try { if (reader != null) { - reader.close(); + if (!success) { + TempBlockMeta tempBlockMeta = + mWorker.getTempBlockMeta(blockId); + // we need to cancel the block at the final step here to avoid mistakenly commit block. + if (tempBlockMeta != null && tempBlockMeta.getSessionId() == sessionId) { + mWorker.abortBlock(sessionId, blockId); + } + reader.close(); + } + else { + // have to close the reader here to avoid commit block if the block is not fully read. + reader.close(); + TempBlockMeta tempBlockMeta = + mWorker.getTempBlockMeta(blockId); + if (tempBlockMeta != null && tempBlockMeta.getSessionId() == sessionId) { + mWorker.commitBlock(sessionId, blockId, false); + } + } } } finally { context.setBlockReader(null); diff --git a/core/server/worker/src/main/java/alluxio/worker/grpc/UfsFallbackBlockWriteHandler.java b/core/server/worker/src/main/java/alluxio/worker/grpc/UfsFallbackBlockWriteHandler.java index ac1caa572631..b41bf4b83af7 100644 --- a/core/server/worker/src/main/java/alluxio/worker/grpc/UfsFallbackBlockWriteHandler.java +++ b/core/server/worker/src/main/java/alluxio/worker/grpc/UfsFallbackBlockWriteHandler.java @@ -269,7 +269,7 @@ private void transferToUfsBlock(BlockWriteRequestContext context, long pos) thro long sessionId = context.getRequest().getSessionId(); long blockId = context.getRequest().getId(); - TempBlockMeta block = mWorker.getTempBlockMeta(sessionId, blockId); + TempBlockMeta block = mWorker.getTempBlockMeta(blockId); if (block == null) { throw new NotFoundException("block " + blockId + " not found"); } diff --git a/core/server/worker/src/test/java/alluxio/worker/block/DefaultBlockWorkerTest.java b/core/server/worker/src/test/java/alluxio/worker/block/DefaultBlockWorkerTest.java index 06729efd9609..dc8ca648eda6 100644 --- a/core/server/worker/src/test/java/alluxio/worker/block/DefaultBlockWorkerTest.java +++ b/core/server/worker/src/test/java/alluxio/worker/block/DefaultBlockWorkerTest.java @@ -142,7 +142,7 @@ public void abortBlock() throws Exception { long sessionId = mRandom.nextLong(); mBlockWorker.createBlock(sessionId, blockId, 0, Constants.MEDIUM_MEM, 1); mBlockWorker.abortBlock(sessionId, blockId); - assertNull(mBlockWorker.getTempBlockMeta(sessionId, blockId)); + assertNull(mBlockWorker.getTempBlockMeta(blockId)); } @Test @@ -200,7 +200,7 @@ public void getTempBlockWriter() throws Exception { mBlockWorker.createBlock(sessionId, blockId, 0, Constants.MEDIUM_MEM, 1); try (BlockWriter blockWriter = mBlockWorker.createBlockWriter(sessionId, blockId)) { blockWriter.append(BufferUtils.getIncreasingByteBuffer(10)); - TempBlockMeta meta = mBlockWorker.getTempBlockMeta(sessionId, blockId); + TempBlockMeta meta = mBlockWorker.getTempBlockMeta(blockId); assertEquals(Constants.MEDIUM_MEM, meta.getBlockLocation().mediumType()); } mBlockWorker.abortBlock(sessionId, blockId); @@ -338,7 +338,7 @@ public void requestSpace() throws Exception { mBlockWorker.createBlock(sessionId, blockId, 1, "", initialBytes); mBlockWorker.requestSpace(sessionId, blockId, additionalBytes); assertEquals(initialBytes + additionalBytes, - mBlockWorker.getTempBlockMeta(sessionId, blockId).getBlockSize()); + mBlockWorker.getTempBlockMeta(blockId).getBlockSize()); } @Test diff --git a/core/server/worker/src/test/java/alluxio/worker/block/NoopBlockWorker.java b/core/server/worker/src/test/java/alluxio/worker/block/NoopBlockWorker.java index 4b22975d4886..1ff81821f6cb 100644 --- a/core/server/worker/src/test/java/alluxio/worker/block/NoopBlockWorker.java +++ b/core/server/worker/src/test/java/alluxio/worker/block/NoopBlockWorker.java @@ -87,7 +87,7 @@ public String createBlock(long sessionId, long blockId, int tier, @Nullable @Override - public TempBlockMeta getTempBlockMeta(long sessionId, long blockId) { + public TempBlockMeta getTempBlockMeta(long blockId) { return null; } diff --git a/core/server/worker/src/test/java/alluxio/worker/block/UnderFileSystemBlockReaderTest.java b/core/server/worker/src/test/java/alluxio/worker/block/UnderFileSystemBlockReaderTest.java index e255b5a3cebf..377c961b0703 100644 --- a/core/server/worker/src/test/java/alluxio/worker/block/UnderFileSystemBlockReaderTest.java +++ b/core/server/worker/src/test/java/alluxio/worker/block/UnderFileSystemBlockReaderTest.java @@ -100,7 +100,7 @@ public void before() throws Exception { } private void checkTempBlock(long start, long length) throws Exception { - Assert.assertNotNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID)); + Assert.assertNotNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID)); mAlluxioBlockStore.commitBlock(SESSION_ID, BLOCK_ID, false); long lockId = mAlluxioBlockStore.lockBlock(SESSION_ID, BLOCK_ID); BlockReader reader = mAlluxioBlockStore.getBlockReader(SESSION_ID, BLOCK_ID, lockId); @@ -128,7 +128,7 @@ public void readPartialBlock() throws Exception { assertTrue(BufferUtils.equalIncreasingByteBuffer(0, (int) TEST_BLOCK_SIZE - 1, buffer)); mReader.close(); // partial block should not be cached - Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID)); + Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID)); } @Test @@ -140,7 +140,7 @@ public void offset() throws Exception { .equalIncreasingByteBuffer(2, (int) TEST_BLOCK_SIZE - 2, buffer)); mReader.close(); // partial block should not be cached - Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID)); + Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID)); } @Test @@ -168,7 +168,7 @@ public void readFullBlockNoCache() throws Exception { // read should succeed even if error is thrown when caching assertTrue(BufferUtils.equalIncreasingByteBuffer(0, (int) TEST_BLOCK_SIZE, buffer)); mReader.close(); - Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID)); + Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID)); } @Test @@ -182,7 +182,7 @@ public void readFullBlockRequestSpaceError() throws Exception { ByteBuffer buffer = mReader.read(0, TEST_BLOCK_SIZE); assertTrue(BufferUtils.equalIncreasingByteBuffer(0, (int) TEST_BLOCK_SIZE, buffer)); mReader.close(); - Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID)); + Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID)); } @Test @@ -195,7 +195,7 @@ public void readFullBlockRequestCreateBlockError() throws Exception { ByteBuffer buffer = mReader.read(0, TEST_BLOCK_SIZE); assertTrue(BufferUtils.equalIncreasingByteBuffer(0, (int) TEST_BLOCK_SIZE, buffer)); mReader.close(); - Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID)); + Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID)); } @Test @@ -232,7 +232,7 @@ public void transferPartialBlock() throws Exception { buf.release(); } // partial block should not be cached - Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID)); + Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID)); } @Test diff --git a/core/server/worker/src/test/java/alluxio/worker/grpc/UfsFallbackBlockWriteHandlerTest.java b/core/server/worker/src/test/java/alluxio/worker/grpc/UfsFallbackBlockWriteHandlerTest.java index 31563d5a4d33..09331a53a1f0 100644 --- a/core/server/worker/src/test/java/alluxio/worker/grpc/UfsFallbackBlockWriteHandlerTest.java +++ b/core/server/worker/src/test/java/alluxio/worker/grpc/UfsFallbackBlockWriteHandlerTest.java @@ -73,8 +73,8 @@ public AtomicReference getWorkerId() { } @Override - public TempBlockMeta getTempBlockMeta(long sessionId, long blockId) { - return mBlockStore.getTempBlockMeta(sessionId, blockId); + public TempBlockMeta getTempBlockMeta(long blockId) { + return mBlockStore.getTempBlockMeta(blockId); } }