Fix data corruption issue when loading from UFS
beinan committed Jan 29, 2024
1 parent e939a39 commit 8a89d03
Showing 6 changed files with 124 additions and 23 deletions.
13 changes: 13 additions & 0 deletions core/common/src/main/java/alluxio/worker/block/io/BlockReader.java
@@ -82,4 +82,17 @@ public BlockReader() {
* @return an informational string of the location the reader is reading from
*/
public abstract String getLocation();

/**
* Abort the reader. This method should be called when an exception occurs during execution
* or when the operation is cancelled by the user.
*/
public void abort() throws IOException {
}

/**
* Commit the reader, finalizing any uncommitted block cached from the UFS.
*/
public void commit() throws IOException {
}
}
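For context, a minimal sketch (not part of the commit) of the protocol these two hooks enable for callers: commit() only after the block has been read completely, abort() on any failure so partially cached data is discarded, and close() in every case via try-with-resources. newUfsBlockReader() and readWholeBlock() are hypothetical placeholders for the caller's own reader creation and read loop.

    try (BlockReader reader = newUfsBlockReader()) {
      try {
        readWholeBlock(reader);   // placeholder for the chunked read loop
        reader.commit();          // publish the cached UFS block only after a full read
      } catch (IOException e) {
        reader.abort();           // discard any partially cached data
        throw e;
      }
    }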
@@ -25,16 +25,32 @@
public class DelegatingBlockReader extends BlockReader {
private final BlockReader mBlockReader;
private final Closer mCloser;
private final IOFunction mCommitter;
private final IOFunction mAborter;

/**
* Default constructor for the abstract reader implementations.
* @param blockReader block reader
* @param closeable closer
* @param closer closer
*/
public DelegatingBlockReader(BlockReader blockReader, Closeable closeable) {
public DelegatingBlockReader(BlockReader blockReader, Closeable closer) {
this(blockReader, closer, null, null);
}

/**
* Constructor that additionally takes commit and abort callbacks.
* @param blockReader block reader
* @param closer closer
* @param committer callback invoked by {@link #commit()}
* @param aborter callback invoked by {@link #abort()}
*/
public DelegatingBlockReader(BlockReader blockReader, Closeable closer,
IOFunction committer, IOFunction aborter) {
mCloser = Closer.create();
mBlockReader = mCloser.register(blockReader);
mCloser.register(closeable);
mCloser.register(closer);
mCommitter = committer;
mAborter = aborter;
}

/**
@@ -83,4 +99,18 @@ public String toString() {
public void close() throws IOException {
mCloser.close();
}

@Override
public void abort() throws IOException {
if (mAborter != null) {
mAborter.call();
}
}

@Override
public void commit() throws IOException {
if (mCommitter != null) {
mCommitter.call();
}
}
}
25 changes: 25 additions & 0 deletions core/common/src/main/java/alluxio/worker/block/io/IOFunction.java
@@ -0,0 +1,25 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.worker.block.io;

import java.io.IOException;

/**
* A callable that performs an I/O operation and may throw an {@code IOException}.
*/
public interface IOFunction {
/**
* Performs the I/O operation.
* @throws IOException if the I/O operation fails
*/
void call() throws IOException;
}
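Because IOFunction has a single no-argument method, the committer and aborter can be supplied as lambdas when wrapping a raw reader in a DelegatingBlockReader, which is how the worker change further down wires them up. A minimal sketch, assuming closeUfsBlock, commitUfsBlock and abortUfsBlock are methods available to the caller:

    // Closeable and IOFunction each have a single abstract method that throws IOException,
    // so every callback below can be written as a lambda.
    BlockReader wrapped = new DelegatingBlockReader(
        rawReader,
        () -> closeUfsBlock(sessionId, blockId),    // Closeable, run when the reader is closed
        () -> commitUfsBlock(sessionId, blockId),   // IOFunction, run on commit()
        () -> abortUfsBlock(sessionId, blockId));   // IOFunction, run on abort()

Compared with the previous wiring, where the closer always called closeUfsBlock(sessionId, blockId, true) and therefore committed the temp block even after a failed read, commit and abort are now explicit, so a partially read block can be discarded; this appears to be the corruption the commit title refers to.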
@@ -271,15 +271,24 @@ private CacheResult cacheBlockFromUfs(long blockId, long blockSize,
Protocol.OpenUfsBlockOptions openUfsBlockOptions) throws IOException {
try (BlockReader reader = mBlockWorker.createUfsBlockReader(
Sessions.CACHE_UFS_SESSION_ID, blockId, 0, false, openUfsBlockOptions)) {
// Read the entire block, caching to block store will be handled internally in UFS block store
// when close the reader.
// Note that, we read from UFS with a smaller buffer to avoid high pressure on heap
// memory when concurrent async requests are received and thus trigger GC.
long offset = 0;
while (offset < blockSize) {
long bufferSize = Math.min(8L * Constants.MB, blockSize - offset);
reader.read(offset, bufferSize);
offset += bufferSize;
try {
// Read the entire block; caching to the block store is handled internally by the UFS block
// store when the reader is closed.
// Note that we read from the UFS with a smaller buffer to avoid high pressure on heap
// memory when concurrent async requests are received, which would otherwise trigger GC.
long offset = 0;
while (offset < blockSize) {
long bufferSize = Math.min(8L * Constants.MB, blockSize - offset);
reader.read(offset, bufferSize);
offset += bufferSize;
}
reader.commit();
} catch (Exception e) {
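// Abort so that a partially read block is never committed to the cache.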
reader.abort();
if (e instanceof IOException) {
throw (IOException) e;
}
throw new RuntimeException(e);
}
}
return CacheResult.SUCCEED;
@@ -187,12 +187,14 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse
BlockReader reader = mUnderFileSystemBlockStore.createBlockReader(sessionId, blockId, offset,
positionShort, options);
BlockReader blockReader = new DelegatingBlockReader(reader,
() -> closeUfsBlock(sessionId, blockId, true));
() -> closeUfsBlock(sessionId, blockId),
() -> commitUfsBlock(sessionId, blockId),
() -> abortUfsBlock(sessionId, blockId));
Metrics.WORKER_ACTIVE_CLIENTS.inc();
return blockReader;
} catch (Exception e) {
try {
closeUfsBlock(sessionId, blockId, false);
closeUfsBlock(sessionId, blockId);
} catch (Exception ee) {
LOG.warn("Failed to close UFS block", ee);
}
@@ -206,18 +208,12 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse
}
}

private void closeUfsBlock(long sessionId, long blockId, boolean successful)
private void closeUfsBlock(long sessionId, long blockId)
throws IOException {
try {
mUnderFileSystemBlockStore.closeBlock(sessionId, blockId);
Optional<TempBlockMeta> tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId);
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
if (successful) {
commitBlock(sessionId, blockId, false);
} else {
abortBlock(sessionId, blockId);
}
} else {
if (!tempBlockMeta.isPresent() || tempBlockMeta.get().getSessionId() != sessionId) {
// When getTempBlockMeta() returns an empty Optional, such as for a block with readType NO_CACHE
// and writeType THROUGH, the counter will not be decremented in commitBlock(),
// so we should decrement it here.
@@ -230,6 +226,22 @@ private void closeUfsBlock(long sessionId, long blockId, boolean successful)
}
}

private void abortUfsBlock(long sessionId, long blockId) throws IOException {
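// Release the UFS block, then abort this session's temp block so no partial data is committed.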
mUnderFileSystemBlockStore.closeBlock(sessionId, blockId);
Optional<TempBlockMeta> tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId);
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
abortBlock(sessionId, blockId);
}
}

private void commitUfsBlock(long sessionId, long blockId) throws IOException {
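// Release the UFS block, then commit this session's temp block to the local block store.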
mUnderFileSystemBlockStore.closeBlock(sessionId, blockId);
Optional<TempBlockMeta> tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId);
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
commitBlock(sessionId, blockId, false);
}
}

@Override
public BlockWriter createBlockWriter(long sessionId, long blockId) throws IOException {
return mLocalBlockStore.createBlockWriter(sessionId, blockId);
@@ -54,6 +54,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -550,12 +551,23 @@ protected DataBuffer getDataBuffer(BlockReadRequestContext context, long offset,
try {
while (buf.writableBytes() > 0 && blockReader.transferTo(buf) != -1) {
}
blockReader.commit();
return new NettyDataBuffer(buf.retain());
} catch (Exception e) {
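// Abort the reader so any partially cached data is discarded rather than committed.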
blockReader.abort();
throw e;
} finally {
buf.release();
}
} else {
ByteBuffer buffer = blockReader.read(offset, len);
ByteBuffer buffer;
try {
buffer = blockReader.read(offset, len);
blockReader.commit();
} catch (IOException e) {
blockReader.abort();
throw e;
}
return new NettyDataBuffer(Unpooled.wrappedBuffer(buffer));
}
default:
