Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data corrupted issue when loading from UFS #18500

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions core/common/src/main/java/alluxio/worker/block/io/BlockReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,17 @@ public BlockReader() {
* @return an informational string of the location the reader is reading from
*/
public abstract String getLocation();

/**
* Abort the reader. This method should be called when an exception occurs during execution
* or when the operation is cancelled by the user.
*/
public void abort() throws IOException {
}

/**
* Commit the reader in case there is any uncommitted block cached from UFS.
*/
public void commit() throws IOException {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,32 @@
public class DelegatingBlockReader extends BlockReader {
private final BlockReader mBlockReader;
private final Closer mCloser;
private final IOFunction mCommitter;
private final IOFunction mAborter;

/**
* Default constructor for the abstract reader implementations.
* @param blockReader block reader
* @param closeable closer
* @param closer closer
*/
public DelegatingBlockReader(BlockReader blockReader, Closeable closeable) {
public DelegatingBlockReader(BlockReader blockReader, Closeable closer) {
this(blockReader, closer, null, null);
}

/**
* Default constructor for the abstract reader implementations.
* @param blockReader block reader
* @param closer closer
* @param committer committer
* @param aborter aborter
*/
public DelegatingBlockReader(BlockReader blockReader, Closeable closer,
IOFunction committer, IOFunction aborter) {
mCloser = Closer.create();
mBlockReader = mCloser.register(blockReader);
mCloser.register(closeable);
mCloser.register(closer);
mCommitter = committer;
mAborter = aborter;
}

/**
Expand Down Expand Up @@ -83,4 +99,18 @@ public String toString() {
public void close() throws IOException {
mCloser.close();
}

@Override
public void abort() throws IOException {
if (mAborter != null) {
mAborter.call();
}
}

@Override
public void commit() throws IOException {
if (mCommitter != null) {
mCommitter.call();
}
}
}
25 changes: 25 additions & 0 deletions core/common/src/main/java/alluxio/worker/block/io/IOFunction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.worker.block.io;

import java.io.IOException;

/**
* A callable for IO functions.
*/
public interface IOFunction {
/**
* Compute with some io functions.
* @throws IOException
*/
void call() throws IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks throwing IOE here makes the catch clause tricky.
Actually we are not really handling the IOE when abort fails, e.g. in following code like

catch (Exception e) {
        reader.abort();
        if (e instanceof IOException) {
          throw (IOException) e;
        }
        throw new RuntimeException(e);
      }
}

For simplicity, how about just use Runnable interface (without being able to throw IOE)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had tried Runnable, the abort and commit method will wrap the IOE into RuntimeException, which might break the current exception handling chain. I'm not sure how the callers of the method I modified are handling the IOE. To be safe, I still prefer to keep current logic, if it's an IOE, I still throw an IOE.

}
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,24 @@ private CacheResult cacheBlockFromUfs(long blockId, long blockSize,
Protocol.OpenUfsBlockOptions openUfsBlockOptions) throws IOException {
try (BlockReader reader = mBlockWorker.createUfsBlockReader(
Sessions.CACHE_UFS_SESSION_ID, blockId, 0, false, openUfsBlockOptions)) {
// Read the entire block, caching to block store will be handled internally in UFS block store
// when close the reader.
// Note that, we read from UFS with a smaller buffer to avoid high pressure on heap
// memory when concurrent async requests are received and thus trigger GC.
long offset = 0;
while (offset < blockSize) {
long bufferSize = Math.min(8L * Constants.MB, blockSize - offset);
reader.read(offset, bufferSize);
offset += bufferSize;
try {
// Read the entire block, caching to block store will be handled internally in UFS block
// store when close the reader.
// Note that, we read from UFS with a smaller buffer to avoid high pressure on heap
// memory when concurrent async requests are received and thus trigger GC.
long offset = 0;
while (offset < blockSize) {
long bufferSize = Math.min(8L * Constants.MB, blockSize - offset);
reader.read(offset, bufferSize);
offset += bufferSize;
}
reader.commit();
} catch (Exception e) {
reader.abort();
if (e instanceof IOException) {
throw (IOException) e;
}
throw new RuntimeException(e);
}
Comment on lines +274 to 292
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Readability: don't need another try but we can just add catch clause to the original try (BlockReader reader = mBlockWorker.createUfsBlockReader(.

try (BlockReader reader = mBlockWorker.createUfsBlockReader(
        Sessions.CACHE_UFS_SESSION_ID, blockId, 0, false, openUfsBlockOptions)) {
  // keep the same 
} catch (Exception e) {
  // handle exception
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, I had tried that actually, but I cannot call reader.abort() inside the catch clause because the it's out the scope of reader's lifecycle.

}
return CacheResult.SUCCEED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,14 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse
BlockReader reader = mUnderFileSystemBlockStore.createBlockReader(sessionId, blockId, offset,
positionShort, options);
BlockReader blockReader = new DelegatingBlockReader(reader,
() -> closeUfsBlock(sessionId, blockId, true));
() -> closeUfsBlock(sessionId, blockId),
() -> commitUfsBlock(sessionId, blockId),
() -> abortUfsBlock(sessionId, blockId));
Metrics.WORKER_ACTIVE_CLIENTS.inc();
return blockReader;
} catch (Exception e) {
try {
closeUfsBlock(sessionId, blockId, false);
closeUfsBlock(sessionId, blockId);
} catch (Exception ee) {
LOG.warn("Failed to close UFS block", ee);
}
Expand All @@ -206,18 +208,12 @@ public BlockReader createUfsBlockReader(long sessionId, long blockId, long offse
}
}

private void closeUfsBlock(long sessionId, long blockId, boolean successful)
private void closeUfsBlock(long sessionId, long blockId)
throws IOException {
try {
mUnderFileSystemBlockStore.closeBlock(sessionId, blockId);
Optional<TempBlockMeta> tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId);
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
if (successful) {
commitBlock(sessionId, blockId, false);
} else {
abortBlock(sessionId, blockId);
}
} else {
if (!tempBlockMeta.isPresent() || tempBlockMeta.get().getSessionId() != sessionId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can merge the if conditions

// When getTempBlockMeta() return null, such as a block readType NO_CACHE writeType THROUGH.
// Counter will not be decrement in the commitblock().
// So we should decrement counter here.
Expand All @@ -230,6 +226,30 @@ private void closeUfsBlock(long sessionId, long blockId, boolean successful)
}
}

private void abortUfsBlock(long sessionId, long blockId) throws IOException {
mUnderFileSystemBlockStore.closeBlock(sessionId, blockId);
Optional<TempBlockMeta> tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId);
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we have a log warn if this condition does not meet?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call, just added

abortBlock(sessionId, blockId);
} else {
LOG.warn("Skipping abort of UFS block due to missing temp block "
+ "metadata or mismatched session ID. Block ID: {}, Session ID: {}",
blockId, sessionId);
}
}

private void commitUfsBlock(long sessionId, long blockId) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: naming seems a bit weird since we are commiting Alluxio blocks

mUnderFileSystemBlockStore.closeBlock(sessionId, blockId);
Optional<TempBlockMeta> tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId);
if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
commitBlock(sessionId, blockId, false);
} else {
LOG.warn("Skipping commit of UFS block due to missing temp block "
+ "metadata or mismatched session ID. Block ID: {}, Session ID: {}",
blockId, sessionId);
}
}

@Override
public BlockWriter createBlockWriter(long sessionId, long blockId) throws IOException {
return mLocalBlockStore.createBlockWriter(sessionId, blockId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -550,12 +551,23 @@ protected DataBuffer getDataBuffer(BlockReadRequestContext context, long offset,
try {
while (buf.writableBytes() > 0 && blockReader.transferTo(buf) != -1) {
}
blockReader.commit();
return new NettyDataBuffer(buf.retain());
} catch (Exception e) {
blockReader.abort();
throw e;
} finally {
buf.release();
}
} else {
ByteBuffer buffer = blockReader.read(offset, len);
ByteBuffer buffer;
try {
buffer = blockReader.read(offset, len);
blockReader.commit();
} catch (IOException e) {
blockReader.abort();
throw e;
}
return new NettyDataBuffer(Unpooled.wrappedBuffer(buffer));
}
default:
Expand Down
Loading