Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data corrupted issue when loading from UFS #18564

Merged
merged 1 commit into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,11 @@ String createBlock(long sessionId, long blockId, int tier,
throws BlockAlreadyExistsException, WorkerOutOfSpaceException, IOException;

/**
* @param sessionId the id of the session to get this file
* @param blockId the id of the block
*
* @return metadata of the block or null if the temp block does not exist
*/
@Nullable
TempBlockMeta getTempBlockMeta(long sessionId, long blockId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually not sure we want to change the interface here, we better just update the place we invoke the method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

many places, we do not have a reference to the sessionId, that's why I changed it..

TempBlockMeta getTempBlockMeta(long blockId);

/**
* Creates a {@link BlockWriter} for an existing temporary block which is already created by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,11 @@ BlockMeta getBlockMeta(long sessionId, long blockId, long lockId)
/**
* Gets the temp metadata of a specific block from local storage.
*
* @param sessionId the id of the session to get this file
* @param blockId the id of the block
* @return metadata of the block or null if the temp block does not exist
*/
@Nullable
TempBlockMeta getTempBlockMeta(long sessionId, long blockId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

TempBlockMeta getTempBlockMeta(long blockId);

/**
* Commits a temporary block to the local store. After commit, the block will be available in this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import alluxio.util.network.NetworkAddressUtils;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import alluxio.worker.block.meta.TempBlockMeta;

import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -272,10 +273,26 @@ private boolean cacheBlockFromUfs(long blockId, long blockSize,
reader.read(offset, bufferSize);
offset += bufferSize;
}
if (isTempBlockMetaPresent(blockId)) {
mBlockWorker.commitBlock(Sessions.CACHE_UFS_SESSION_ID, blockId, false);
}
} catch (Exception e) {
LOG.warn("Failed to async cache block {} from UFS on reading the block:", blockId, e);
if (isTempBlockMetaPresent(blockId)) {
mBlockWorker.abortBlock(Sessions.CACHE_UFS_SESSION_ID, blockId);
}
return false;
}
return true;
}

private boolean isTempBlockMetaPresent(long blockId) {
TempBlockMeta tempBlockMeta =
mBlockWorker.getTempBlockMeta(blockId);
return tempBlockMeta != null
&& tempBlockMeta.getSessionId() == Sessions.CACHE_UFS_SESSION_ID;
}

/**
* Caches the block at best effort from a remote worker (possibly from UFS indirectly).
*
Expand Down Expand Up @@ -305,7 +322,7 @@ private boolean cacheBlockFromRemoteWorker(long blockId, long blockSize,
return true;
} catch (AlluxioException | IOException e) {
LOG.warn("Failed to async cache block {} from remote worker ({}) on copying the block: {}",
blockId, sourceAddress, e.toString());
blockId, sourceAddress, e);
try {
mBlockWorker.abortBlock(Sessions.CACHE_WORKER_SESSION_ID, blockId);
} catch (AlluxioException | IOException ee) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,8 @@ public String createBlock(long sessionId, long blockId, int tier,
}

@Override
public TempBlockMeta getTempBlockMeta(long sessionId, long blockId) {
return mLocalBlockStore.getTempBlockMeta(sessionId, blockId);
public TempBlockMeta getTempBlockMeta(long blockId) {
return mLocalBlockStore.getTempBlockMeta(blockId);
}

@Override
Expand Down Expand Up @@ -546,6 +546,10 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse
});
} catch (Exception e) {
try {
TempBlockMeta tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId);
if (tempBlockMeta != null && tempBlockMeta.getSessionId() == sessionId) {
abortBlock(sessionId, blockId);
}
closeUfsBlock(sessionId, blockId);
} catch (Exception ee) {
LOG.warn("Failed to close UFS block", ee);
Expand Down Expand Up @@ -651,7 +655,7 @@ public void closeUfsBlock(long sessionId, long blockId)
throws BlockAlreadyExistsException, IOException, WorkerOutOfSpaceException {
try {
mUnderFileSystemBlockStore.closeReaderOrWriter(sessionId, blockId);
if (mLocalBlockStore.getTempBlockMeta(sessionId, blockId) != null) {
if (mLocalBlockStore.getTempBlockMeta(blockId) != null) {
try {
commitBlock(sessionId, blockId, false);
} catch (BlockDoesNotExistException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ public BlockMeta getBlockMeta(long sessionId, long blockId, long lockId)
}

@Override
public TempBlockMeta getTempBlockMeta(long sessionId, long blockId) {
LOG.debug("getTempBlockMeta: sessionId={}, blockId={}", sessionId, blockId);
public TempBlockMeta getTempBlockMeta(long blockId) {
LOG.debug("getTempBlockMeta: blockId={}", blockId);
try (LockResource r = new LockResource(mMetadataReadLock)) {
return mMetaManager.getTempBlockMetaOrNull(blockId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import alluxio.wire.BlockReadRequest;
import alluxio.worker.block.BlockWorker;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.meta.TempBlockMeta;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
Expand Down Expand Up @@ -456,14 +457,15 @@ private void runInternal() {
}
if (error != null) {
try {
completeRequest(mContext);
boolean success = !cancel && error == null;
completeRequest(mContext, success);
} catch (Exception e) {
LOG.error("Failed to close the request.", e);
}
replyError(error);
} else if (eof || cancel) {
try {
completeRequest(mContext);
completeRequest(mContext, eof);
} catch (Exception e) {
LogUtils.warnWithException(LOG, "Exception occurred while completing read request, "
+ "EOF/CANCEL sessionId: {}. {}", mContext.getRequest().getSessionId(),
Expand All @@ -483,12 +485,33 @@ private void runInternal() {
* state it may have accumulated.
*
* @param context context of the request to complete
* @param success whether the request was successful
*/
private void completeRequest(BlockReadRequestContext context) throws Exception {
private void completeRequest(BlockReadRequestContext context, boolean success)
throws Exception {
BlockReader reader = context.getBlockReader();
long sessionId = context.getRequest().getSessionId();
long blockId = context.getRequest().getId();
try {
if (reader != null) {
reader.close();
if (!success) {
TempBlockMeta tempBlockMeta =
mWorker.getTempBlockMeta(blockId);
// we need to cancel the block at the final step here to avoid mistakenly commit block.
if (tempBlockMeta != null && tempBlockMeta.getSessionId() == sessionId) {
mWorker.abortBlock(sessionId, blockId);
}
reader.close();
}
else {
// have to close the reader here to avoid commit block if the block is not fully read.
reader.close();
TempBlockMeta tempBlockMeta =
mWorker.getTempBlockMeta(blockId);
if (tempBlockMeta != null && tempBlockMeta.getSessionId() == sessionId) {
mWorker.commitBlock(sessionId, blockId, false);
}
}
}
} finally {
context.setBlockReader(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ private void transferToUfsBlock(BlockWriteRequestContext context, long pos) thro

long sessionId = context.getRequest().getSessionId();
long blockId = context.getRequest().getId();
TempBlockMeta block = mWorker.getTempBlockMeta(sessionId, blockId);
TempBlockMeta block = mWorker.getTempBlockMeta(blockId);
if (block == null) {
throw new NotFoundException("block " + blockId + " not found");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void abortBlock() throws Exception {
long sessionId = mRandom.nextLong();
mBlockWorker.createBlock(sessionId, blockId, 0, Constants.MEDIUM_MEM, 1);
mBlockWorker.abortBlock(sessionId, blockId);
assertNull(mBlockWorker.getTempBlockMeta(sessionId, blockId));
assertNull(mBlockWorker.getTempBlockMeta(blockId));
}

@Test
Expand Down Expand Up @@ -200,7 +200,7 @@ public void getTempBlockWriter() throws Exception {
mBlockWorker.createBlock(sessionId, blockId, 0, Constants.MEDIUM_MEM, 1);
try (BlockWriter blockWriter = mBlockWorker.createBlockWriter(sessionId, blockId)) {
blockWriter.append(BufferUtils.getIncreasingByteBuffer(10));
TempBlockMeta meta = mBlockWorker.getTempBlockMeta(sessionId, blockId);
TempBlockMeta meta = mBlockWorker.getTempBlockMeta(blockId);
assertEquals(Constants.MEDIUM_MEM, meta.getBlockLocation().mediumType());
}
mBlockWorker.abortBlock(sessionId, blockId);
Expand Down Expand Up @@ -338,7 +338,7 @@ public void requestSpace() throws Exception {
mBlockWorker.createBlock(sessionId, blockId, 1, "", initialBytes);
mBlockWorker.requestSpace(sessionId, blockId, additionalBytes);
assertEquals(initialBytes + additionalBytes,
mBlockWorker.getTempBlockMeta(sessionId, blockId).getBlockSize());
mBlockWorker.getTempBlockMeta(blockId).getBlockSize());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public String createBlock(long sessionId, long blockId, int tier,

@Nullable
@Override
public TempBlockMeta getTempBlockMeta(long sessionId, long blockId) {
public TempBlockMeta getTempBlockMeta(long blockId) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void before() throws Exception {
}

private void checkTempBlock(long start, long length) throws Exception {
Assert.assertNotNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID));
Assert.assertNotNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID));
mAlluxioBlockStore.commitBlock(SESSION_ID, BLOCK_ID, false);
long lockId = mAlluxioBlockStore.lockBlock(SESSION_ID, BLOCK_ID);
BlockReader reader = mAlluxioBlockStore.getBlockReader(SESSION_ID, BLOCK_ID, lockId);
Expand Down Expand Up @@ -128,7 +128,7 @@ public void readPartialBlock() throws Exception {
assertTrue(BufferUtils.equalIncreasingByteBuffer(0, (int) TEST_BLOCK_SIZE - 1, buffer));
mReader.close();
// partial block should not be cached
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID));
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID));
}

@Test
Expand All @@ -140,7 +140,7 @@ public void offset() throws Exception {
.equalIncreasingByteBuffer(2, (int) TEST_BLOCK_SIZE - 2, buffer));
mReader.close();
// partial block should not be cached
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID));
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID));
}

@Test
Expand Down Expand Up @@ -168,7 +168,7 @@ public void readFullBlockNoCache() throws Exception {
// read should succeed even if error is thrown when caching
assertTrue(BufferUtils.equalIncreasingByteBuffer(0, (int) TEST_BLOCK_SIZE, buffer));
mReader.close();
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID));
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID));
}

@Test
Expand All @@ -182,7 +182,7 @@ public void readFullBlockRequestSpaceError() throws Exception {
ByteBuffer buffer = mReader.read(0, TEST_BLOCK_SIZE);
assertTrue(BufferUtils.equalIncreasingByteBuffer(0, (int) TEST_BLOCK_SIZE, buffer));
mReader.close();
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID));
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID));
}

@Test
Expand All @@ -195,7 +195,7 @@ public void readFullBlockRequestCreateBlockError() throws Exception {
ByteBuffer buffer = mReader.read(0, TEST_BLOCK_SIZE);
assertTrue(BufferUtils.equalIncreasingByteBuffer(0, (int) TEST_BLOCK_SIZE, buffer));
mReader.close();
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID));
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID));
}

@Test
Expand Down Expand Up @@ -232,7 +232,7 @@ public void transferPartialBlock() throws Exception {
buf.release();
}
// partial block should not be cached
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(SESSION_ID, BLOCK_ID));
Assert.assertNull(mAlluxioBlockStore.getTempBlockMeta(BLOCK_ID));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public AtomicReference<Long> getWorkerId() {
}

@Override
public TempBlockMeta getTempBlockMeta(long sessionId, long blockId) {
return mBlockStore.getTempBlockMeta(sessionId, blockId);
public TempBlockMeta getTempBlockMeta(long blockId) {
return mBlockStore.getTempBlockMeta(blockId);
}
}

Expand Down
Loading