Skip to content

Commit

Permalink
Fix data corrupted issue when loading from UFS
Browse files Browse the repository at this point in the history
This fixes a bug that could leave corrupted data files when loading from UFS.
Previously, #17497 attempted to solve this issue, but it only covered the case of creating a UFS reader.
Note that we only handle the block reader, so this may not fix the `paged Block Reader` in 2.x.

Please clarify why the changes are needed. For instance,
  1. If you propose a new API, clarify the use case for a new API.
  2. If you fix a bug, describe the bug.

Please list the user-facing changes introduced by your change, including
  1. change in user-facing APIs
  2. addition or removal of property keys
  3. webui

			pr-link: #18525
			change-id: cid-bbba0feb29231e70750e5e79da5f405bb591d47a
  • Loading branch information
jja725 authored and yuzhu committed Apr 1, 2024
1 parent b6f99f6 commit f237505
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,11 @@ String createBlock(long sessionId, long blockId, int tier,
throws BlockAlreadyExistsException, WorkerOutOfSpaceException, IOException;

/**
* @param sessionId the id of the session to get this file
* @param blockId the id of the block
*
* @return metadata of the block or null if the temp block does not exist
*/
@Nullable
TempBlockMeta getTempBlockMeta(long sessionId, long blockId);
TempBlockMeta getTempBlockMeta(long blockId);

/**
* Creates a {@link BlockWriter} for an existing temporary block which is already created by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,11 @@ BlockMeta getBlockMeta(long sessionId, long blockId, long lockId)
/**
* Gets the temp metadata of a specific block from local storage.
*
* @param sessionId the id of the session to get this file
* @param blockId the id of the block
* @return metadata of the block or null if the temp block does not exist
*/
@Nullable
TempBlockMeta getTempBlockMeta(long sessionId, long blockId);
TempBlockMeta getTempBlockMeta(long blockId);

/**
* Commits a temporary block to the local store. After commit, the block will be available in this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import alluxio.util.network.NetworkAddressUtils;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import alluxio.worker.block.meta.TempBlockMeta;

import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -272,10 +273,26 @@ private boolean cacheBlockFromUfs(long blockId, long blockSize,
reader.read(offset, bufferSize);
offset += bufferSize;
}
if (isTempBlockMetaPresent(blockId)) {
mBlockWorker.commitBlock(Sessions.CACHE_UFS_SESSION_ID, blockId, false);
}
} catch (Exception e) {
LOG.warn("Failed to async cache block {} from UFS on reading the block:", blockId, e);
if (isTempBlockMetaPresent(blockId)) {
mBlockWorker.abortBlock(Sessions.CACHE_UFS_SESSION_ID, blockId);
}
return false;
}
return true;
}

/**
 * Checks whether temp block metadata exists for the given block and belongs to the
 * UFS caching session.
 *
 * @param blockId the id of the block to look up
 * @return true if temp block metadata is found and its session id equals
 *         {@code Sessions.CACHE_UFS_SESSION_ID}, false otherwise
 */
private boolean isTempBlockMetaPresent(long blockId) {
  TempBlockMeta meta = mBlockWorker.getTempBlockMeta(blockId);
  if (meta == null) {
    // No temp block is registered under this block id.
    return false;
  }
  // A temp block exists; only report it when it is owned by the UFS caching session.
  return meta.getSessionId() == Sessions.CACHE_UFS_SESSION_ID;
}

/**
* Caches the block at best effort from a remote worker (possibly from UFS indirectly).
*
Expand Down Expand Up @@ -305,7 +322,7 @@ private boolean cacheBlockFromRemoteWorker(long blockId, long blockSize,
return true;
} catch (AlluxioException | IOException e) {
LOG.warn("Failed to async cache block {} from remote worker ({}) on copying the block: {}",
blockId, sourceAddress, e.toString());
blockId, sourceAddress, e);
try {
mBlockWorker.abortBlock(Sessions.CACHE_WORKER_SESSION_ID, blockId);
} catch (AlluxioException | IOException ee) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ public String createBlock(long sessionId, long blockId, int tier,
}

@Override
public TempBlockMeta getTempBlockMeta(long sessionId, long blockId) {
return mLocalBlockStore.getTempBlockMeta(sessionId, blockId);
public TempBlockMeta getTempBlockMeta(long blockId) {
return mLocalBlockStore.getTempBlockMeta(blockId);
}

@Override
Expand Down Expand Up @@ -546,6 +546,10 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse
});
} catch (Exception e) {
try {
TempBlockMeta tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId);
if (tempBlockMeta != null && tempBlockMeta.getSessionId() == sessionId) {
abortBlock(sessionId, blockId);
}
closeUfsBlock(sessionId, blockId);
} catch (Exception ee) {
LOG.warn("Failed to close UFS block", ee);
Expand Down Expand Up @@ -651,7 +655,7 @@ public void closeUfsBlock(long sessionId, long blockId)
throws BlockAlreadyExistsException, IOException, WorkerOutOfSpaceException {
try {
mUnderFileSystemBlockStore.closeReaderOrWriter(sessionId, blockId);
if (mLocalBlockStore.getTempBlockMeta(sessionId, blockId) != null) {
if (mLocalBlockStore.getTempBlockMeta(blockId) != null) {
try {
commitBlock(sessionId, blockId, false);
} catch (BlockDoesNotExistException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ public BlockMeta getBlockMeta(long sessionId, long blockId, long lockId)
}

@Override
public TempBlockMeta getTempBlockMeta(long sessionId, long blockId) {
LOG.debug("getTempBlockMeta: sessionId={}, blockId={}", sessionId, blockId);
public TempBlockMeta getTempBlockMeta(long blockId) {
LOG.debug("getTempBlockMeta: blockId={}", blockId);
try (LockResource r = new LockResource(mMetadataReadLock)) {
return mMetaManager.getTempBlockMetaOrNull(blockId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import alluxio.wire.BlockReadRequest;
import alluxio.worker.block.BlockWorker;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.meta.TempBlockMeta;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
Expand All @@ -50,6 +51,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -456,14 +458,15 @@ private void runInternal() {
}
if (error != null) {
try {
completeRequest(mContext);
boolean success = !cancel && error == null;
completeRequest(mContext, success);
} catch (Exception e) {
LOG.error("Failed to close the request.", e);
}
replyError(error);
} else if (eof || cancel) {
try {
completeRequest(mContext);
completeRequest(mContext, eof);
} catch (Exception e) {
LogUtils.warnWithException(LOG, "Exception occurred while completing read request, "
+ "EOF/CANCEL sessionId: {}. {}", mContext.getRequest().getSessionId(),
Expand All @@ -483,12 +486,33 @@ private void runInternal() {
* state it may have accumulated.
*
* @param context context of the request to complete
* @param success whether the request was successful
*/
private void completeRequest(BlockReadRequestContext context) throws Exception {
private void completeRequest(BlockReadRequestContext context, boolean success)
throws Exception {
BlockReader reader = context.getBlockReader();
long sessionId = context.getRequest().getSessionId();
long blockId = context.getRequest().getId();
try {
if (reader != null) {
reader.close();
if (!success) {
TempBlockMeta tempBlockMeta =
mWorker.getTempBlockMeta(blockId);
// we need to cancel the block at the final step here to avoid mistakenly commit block.
if (tempBlockMeta != null && tempBlockMeta.getSessionId() == sessionId) {
mWorker.abortBlock(sessionId, blockId);
}
reader.close();
}
else {
// have to close the reader here to avoid commit block if the block is not fully read.
reader.close();
TempBlockMeta tempBlockMeta =
mWorker.getTempBlockMeta(blockId);
if (tempBlockMeta != null && tempBlockMeta.getSessionId() == sessionId) {
mWorker.commitBlock(sessionId, blockId, false);
}
}
}
} finally {
context.setBlockReader(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ private void transferToUfsBlock(BlockWriteRequestContext context, long pos) thro

long sessionId = context.getRequest().getSessionId();
long blockId = context.getRequest().getId();
TempBlockMeta block = mWorker.getTempBlockMeta(sessionId, blockId);
TempBlockMeta block = mWorker.getTempBlockMeta(blockId);
if (block == null) {
throw new NotFoundException("block " + blockId + " not found");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void abortBlock() throws Exception {
long sessionId = mRandom.nextLong();
mBlockWorker.createBlock(sessionId, blockId, 0, Constants.MEDIUM_MEM, 1);
mBlockWorker.abortBlock(sessionId, blockId);
assertNull(mBlockWorker.getTempBlockMeta(sessionId, blockId));
assertNull(mBlockWorker.getTempBlockMeta(blockId));
}

@Test
Expand Down Expand Up @@ -200,7 +200,7 @@ public void getTempBlockWriter() throws Exception {
mBlockWorker.createBlock(sessionId, blockId, 0, Constants.MEDIUM_MEM, 1);
try (BlockWriter blockWriter = mBlockWorker.createBlockWriter(sessionId, blockId)) {
blockWriter.append(BufferUtils.getIncreasingByteBuffer(10));
TempBlockMeta meta = mBlockWorker.getTempBlockMeta(sessionId, blockId);
TempBlockMeta meta = mBlockWorker.getTempBlockMeta(blockId);
assertEquals(Constants.MEDIUM_MEM, meta.getBlockLocation().mediumType());
}
mBlockWorker.abortBlock(sessionId, blockId);
Expand Down Expand Up @@ -338,7 +338,7 @@ public void requestSpace() throws Exception {
mBlockWorker.createBlock(sessionId, blockId, 1, "", initialBytes);
mBlockWorker.requestSpace(sessionId, blockId, additionalBytes);
assertEquals(initialBytes + additionalBytes,
mBlockWorker.getTempBlockMeta(sessionId, blockId).getBlockSize());
mBlockWorker.getTempBlockMeta(blockId).getBlockSize());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public String createBlock(long sessionId, long blockId, int tier,

@Nullable
@Override
public TempBlockMeta getTempBlockMeta(long sessionId, long blockId) {
public TempBlockMeta getTempBlockMeta(long blockId) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void before() throws Exception {
}

private void checkTempBlock(long start, long length) throws Exception {
Assert.assertNotNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID));
Assert.assertNotNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID));
mAlluxioBlockStore.commitBlock(SESSION_ID, BLOCK_ID, false);
long lockId = mAlluxioBlockStore.lockBlock(SESSION_ID, BLOCK_ID);
BlockReader reader = mAlluxioBlockStore.getBlockReader(SESSION_ID, BLOCK_ID, lockId);
Expand Down Expand Up @@ -128,7 +128,7 @@ public void readPartialBlock() throws Exception {
assertTrue(BufferUtils.equalIncreasingByteBuffer(0, (int) TEST_BLOCK_SIZE - 1, buffer));
mReader.close();
// partial block should not be cached
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID));
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID));
}

@Test
Expand All @@ -140,7 +140,7 @@ public void offset() throws Exception {
.equalIncreasingByteBuffer(2, (int) TEST_BLOCK_SIZE - 2, buffer));
mReader.close();
// partial block should not be cached
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID));
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID));
}

@Test
Expand Down Expand Up @@ -168,7 +168,7 @@ public void readFullBlockNoCache() throws Exception {
// read should succeed even if error is thrown when caching
assertTrue(BufferUtils.equalIncreasingByteBuffer(0, (int) TEST_BLOCK_SIZE, buffer));
mReader.close();
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID));
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID));
}

@Test
Expand All @@ -182,7 +182,7 @@ public void readFullBlockRequestSpaceError() throws Exception {
ByteBuffer buffer = mReader.read(0, TEST_BLOCK_SIZE);
assertTrue(BufferUtils.equalIncreasingByteBuffer(0, (int) TEST_BLOCK_SIZE, buffer));
mReader.close();
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID));
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID));
}

@Test
Expand All @@ -195,7 +195,7 @@ public void readFullBlockRequestCreateBlockError() throws Exception {
ByteBuffer buffer = mReader.read(0, TEST_BLOCK_SIZE);
assertTrue(BufferUtils.equalIncreasingByteBuffer(0, (int) TEST_BLOCK_SIZE, buffer));
mReader.close();
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID));
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID));
}

@Test
Expand Down Expand Up @@ -232,7 +232,7 @@ public void transferPartialBlock() throws Exception {
buf.release();
}
// partial block should not be cached
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID));
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public AtomicReference<Long> getWorkerId() {
}

@Override
public TempBlockMeta getTempBlockMeta(long sessionId, long blockId) {
return mBlockStore.getTempBlockMeta(sessionId, blockId);
public TempBlockMeta getTempBlockMeta(long blockId) {
return mBlockStore.getTempBlockMeta(blockId);
}
}

Expand Down

0 comments on commit f237505

Please sign in to comment.