Skip to content

Commit

Permalink
Fix corrupted block causing reads to fail
Browse files Browse the repository at this point in the history
Cherry-pick of existing commit.
orig-pr: #17564
orig-commit: 87ddfd4
orig-commit-author: Bowen Ding <6999708+dbw9580@users.noreply.github.com>

			pr-link: #17594
			change-id: cid-2758ff97e5016c1aae7ededb244e919233ae6d3b
  • Loading branch information
alluxio-bot authored Jun 12, 2023
1 parent e9c0c5a commit de322c3
Show file tree
Hide file tree
Showing 6 changed files with 400 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@
public class BlockDoesNotExistRuntimeException extends NotFoundRuntimeException {

/**
* Constructs a new exception with the specified detail message and cause.
* Constructs a new exception with the specified block ID.
*
* @param blockId block id
*/
public BlockDoesNotExistRuntimeException(long blockId) {
super(MessageFormat.format("BlockMeta not found for blockId {0,number,#}", blockId));
}

/**
* Constructs a new exception with the specified block ID and cause.
*
* @param blockId block id
* @param cause why the block is not found
*/
public BlockDoesNotExistRuntimeException(long blockId, Throwable cause) {
super(MessageFormat.format("Block {0,number,#} not found", blockId), cause);
}
}
19 changes: 19 additions & 0 deletions core/common/src/main/java/alluxio/util/io/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,25 @@ public static void delete(String path) {
}
}

/**
* Deletes the file or directory, if it exists.
*
* @param path pathname string of file or directory
*/
public static void deleteIfExists(String path) {
try {
Files.deleteIfExists(Paths.get(path));
} catch (java.nio.file.InvalidPathException e) {
throw new InvalidArgumentRuntimeException(e);
} catch (DirectoryNotEmptyException e) {
throw new FailedPreconditionRuntimeException(e);
} catch (SecurityException e) {
throw new PermissionDeniedRuntimeException(e);
} catch (IOException e) {
throw new UnknownRuntimeException(e);
}
}

/**
* Deletes a file or a directory, recursively if it is a directory.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,23 @@ public BlockReader createBlockReader(long sessionId, long blockId, long offset,
boolean positionShort, Protocol.OpenUfsBlockOptions options)
throws IOException {
BlockReader reader;
Optional<? extends BlockMeta> blockMeta = mLocalBlockStore.getVolatileBlockMeta(blockId);
if (blockMeta.isPresent()) {
// first try reading from Alluxio cache
try {
reader = mLocalBlockStore.createBlockReader(sessionId, blockId, offset);
DefaultBlockWorker.Metrics.WORKER_ACTIVE_CLIENTS.inc();
} else {
return reader;
} catch (BlockDoesNotExistRuntimeException e) {
LOG.debug("Block {} does not exist in Alluxio cache: {}", blockId, e.getMessage());
// the block does not exist in Alluxio, try loading from UFS
boolean checkUfs = options != null && (options.hasUfsPath() || options.getBlockInUfsTier());
if (!checkUfs) {
throw new BlockDoesNotExistRuntimeException(blockId);
throw e;
}
// When the block does not exist in Alluxio but exists in UFS, try to open the UFS block.
reader = createUfsBlockReader(sessionId, blockId, offset, positionShort, options);
DefaultBlockWorker.Metrics.WORKER_ACTIVE_CLIENTS.inc();
return reader;
}
return reader;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -188,8 +193,35 @@ public BlockReader createBlockReader(long sessionId, long blockId, long offset)
blockLock.close();
throw new BlockDoesNotExistRuntimeException(blockId);
}
BlockMeta block = blockMeta.get();
try {
BlockReader reader = new StoreBlockReader(sessionId, blockMeta.get());
validateBlockIntegrityForRead(block);
} catch (IllegalStateException validationError) {
LOG.warn("Block {} is corrupted, removing it: {}",
blockId, validationError.getMessage());
// in case of a corrupted block, remove it and propagate the exception
// release the read lock because removeBlockInternal needs a write lock on the same block
blockLock.close();
// at this point we are not holding any lock, so two threads may attempt to remove the same
// block concurrently. This is fine as long as removeBlockInternal is no-op for a
// non-existing block.
try {
removeBlockInternal(sessionId, blockId, REMOVE_BLOCK_TIMEOUT_MS);
for (BlockStoreEventListener listener : mBlockStoreEventListeners) {
synchronized (listener) {
listener.onRemoveBlockByWorker(blockId);
listener.onRemoveBlock(blockId, block.getBlockLocation());
}
}
} catch (Exception removeBlockError) {
LOG.warn("Failed to remove a corrupted block {}", blockId, removeBlockError);
validationError.addSuppressed(removeBlockError);
}
throw new BlockDoesNotExistRuntimeException(blockId, validationError);
}

try {
BlockReader reader = new StoreBlockReader(sessionId, block);
((FileChannel) reader.getChannel()).position(offset);
accessBlock(sessionId, blockId);
return new DelegatingBlockReader(reader, blockLock);
Expand All @@ -200,6 +232,57 @@ public BlockReader createBlockReader(long sessionId, long blockId, long offset)
}
}

/**
* Validates the integrity of the block for reading:
* 1. the block file should exist
* 2. the length of the block file should match its BlockMeta
* If any of the above does not hold, this can be a result of corrupted block files
* due to faulty storage hardware, manual manipulation of the block files by admin,
* or a bug where the block was pre-maturely committed when it was not done writing.
*
* @param blockMeta the block meta acquired from meta data manager
* @throws IllegalStateException if the block is deemed corrupted
*/
public static void validateBlockIntegrityForRead(BlockMeta blockMeta)
throws IllegalStateException {
final long blockId = blockMeta.getBlockId();
final Path blockPath = Paths.get(blockMeta.getPath());
final BasicFileAttributes blockFileAttrs;
try {
blockFileAttrs = Files.readAttributes(blockPath, BasicFileAttributes.class);
} catch (NoSuchFileException e) {
throw new IllegalStateException(String.format(
"Block %s exists in block meta but actual physical block file %s does not exist",
blockId, blockPath));
} catch (IOException e) {
// cannot read file attributes, possibly due to bad permission or bad file type
LOG.debug("Cannot read file attributes for block {}", blockId, e);
throw new IllegalStateException(String.format(
"Cannot read attributes of file %s for block %s during validation", blockId, blockPath));
}
// need to check if file is a regular file, as for directories and device files the file length
// is unspecified
if (!blockFileAttrs.isRegularFile()) {
throw new IllegalStateException(String.format(
"Block file %s for block %s is not a regular file", blockPath, blockId));
}
final long actualLength = blockFileAttrs.size();
final long expectedLength = blockMeta.getBlockSize();
// check if the actual file length matches the expected length from block meta
if (actualLength != expectedLength) {
LOG.debug("Block {} is expected to be {} bytes, "
+ "but the actual block file length is {}", blockId, expectedLength, actualLength);
// Note: we only errors out on 0-sized blocks which are definitely not correct
// but if the size is not 0, we treat it as valid
if (actualLength == 0) {
throw new IllegalStateException(String.format(
"Block %s exists in block meta but the size from block meta does not match that of "
+ "the block file %s, expected block size = %d, actual block file length = %d",
blockId, blockPath, expectedLength, actualLength));
}
}
}

@Override
public TempBlockMeta createBlock(long sessionId, long blockId, AllocateOptions options) {
LOG.debug("createBlock: sessionId={}, blockId={}, options={}", sessionId, blockId, options);
Expand Down Expand Up @@ -820,7 +903,7 @@ private MoveBlockResult moveBlockInternal(long sessionId, long blockId,
* @param blockMeta block metadata
*/
private void removeBlockFileAndMeta(BlockMeta blockMeta) {
FileUtils.delete(blockMeta.getPath());
FileUtils.deleteIfExists(blockMeta.getPath());
mMetaManager.removeBlockMeta(blockMeta);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import alluxio.worker.block.BlockStore;
import alluxio.worker.block.BlockStoreLocation;
import alluxio.worker.block.DefaultBlockWorker;
import alluxio.worker.block.TieredBlockStore;
import alluxio.worker.block.meta.BlockMeta;

import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -86,6 +87,13 @@ public OpenLocalBlockResponse call() throws Exception {
if (!meta.isPresent()) {
throw new BlockDoesNotExistRuntimeException(mRequest.getBlockId());
}
try {
// assuming the underlying BlockStore is TieredBlockStore, as it's the only impl
// that allows short-circuit read
TieredBlockStore.validateBlockIntegrityForRead(meta.get());
} catch (IllegalStateException validationError) {
throw new BlockDoesNotExistRuntimeException(mRequest.getBlockId(), validationError);
}
if (mRequest.getPromote()) {
// TODO(calvin): Move this logic into BlockStore#moveBlockInternal if possible
// Because the move operation is expensive, we first check if the operation is necessary
Expand Down
Loading

0 comments on commit de322c3

Please sign in to comment.