Fix data corrupted issue when loading from UFS #18500
66c2662 to 8a89d03
Thanks for fixing this issue!
Comments left.
      try {
        // Read the entire block. Caching to the block store is handled internally by the
        // UFS block store when the reader is closed.
        // Note that we read from UFS with a smaller buffer to avoid putting high pressure on
        // heap memory, and thus triggering GC, when concurrent async requests are received.
        long offset = 0;
        while (offset < blockSize) {
          long bufferSize = Math.min(8L * Constants.MB, blockSize - offset);
          reader.read(offset, bufferSize);
          offset += bufferSize;
        }
        reader.commit();
      } catch (Exception e) {
        reader.abort();
        if (e instanceof IOException) {
          throw (IOException) e;
        }
        throw new RuntimeException(e);
      }
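To make the chunking arithmetic in the loop above concrete, here is a minimal standalone sketch. It does not touch any Alluxio class: `ChunkedReadSketch` and `chunkSizes` are hypothetical names introduced only for illustration, the "reads" are plain numbers, and 1 MB is assumed to be 1024 * 1024 bytes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone sketch; not part of the Alluxio code base.
class ChunkedReadSketch {
  /** Returns the sizes of the successive reads a bounded-buffer loop would issue. */
  static List<Long> chunkSizes(long blockSize, long maxBufferSize) {
    if (maxBufferSize <= 0) {
      throw new IllegalArgumentException("maxBufferSize must be positive");
    }
    List<Long> sizes = new ArrayList<>();
    long offset = 0;
    while (offset < blockSize) {
      // Never read past the end of the block, and never more than the buffer cap.
      long bufferSize = Math.min(maxBufferSize, blockSize - offset);
      sizes.add(bufferSize);
      offset += bufferSize;
    }
    return sizes;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L; // assumption: 1 MB = 1024 * 1024 bytes
    // A 20 MB block read with an 8 MB cap is split into 8 MB, 8 MB and 4 MB.
    System.out.println(chunkSizes(20 * mb, 8 * mb));
  }
}
```

The peak size of a single read is bounded by the cap regardless of the block size, which is the property the code comment relies on.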
Readability: we don't need another try block. We can just add a catch clause to the original try-with-resources statement:
try (BlockReader reader = mBlockWorker.createUfsBlockReader(
    Sessions.CACHE_UFS_SESSION_ID, blockId, 0, false, openUfsBlockOptions)) {
  // keep the same
} catch (Exception e) {
  // handle exception
}
Hmm, I actually tried that, but I cannot call reader.abort() inside the catch clause because the catch clause is outside the scope of the reader's lifecycle.
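The scoping constraint can be reproduced outside Alluxio with a small sketch. Everything here is hypothetical (`ReaderScopeSketch`, `FakeReader` and its methods are stand-ins, not the real `BlockReader` API). It illustrates two standard Java rules: a resource declared in a try-with-resources header is not visible in that statement's catch clause, and the resource is already closed by the time the catch clause runs.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone sketch; FakeReader is not the real Alluxio BlockReader.
class ReaderScopeSketch {
  /** Stand-in reader that records every call and always fails on read. */
  static class FakeReader implements Closeable {
    private final List<String> mEvents;

    FakeReader(List<String> events) {
      mEvents = events;
    }

    void read() throws IOException {
      mEvents.add("read");
      throw new IOException("simulated UFS failure");
    }

    void commit() {
      mEvents.add("commit");
    }

    void abort() {
      mEvents.add("abort");
    }

    @Override
    public void close() {
      mEvents.add("close");
    }
  }

  /** Catch clause attached directly to the try-with-resources statement. */
  static List<String> outerCatchOnly() {
    List<String> events = new ArrayList<>();
    try (FakeReader reader = new FakeReader(events)) {
      reader.read();
      reader.commit();
    } catch (IOException e) {
      // reader.abort();  // would not compile: 'reader' is out of scope here,
      //                  // and the resource has already been closed.
      events.add("catch");
    }
    return events;
  }

  /** Nested try inside the resource scope, so abort() runs before close(). */
  static List<String> innerTry() {
    List<String> events = new ArrayList<>();
    try (FakeReader reader = new FakeReader(events)) {
      try {
        reader.read();
        reader.commit();
      } catch (IOException e) {
        reader.abort();
        throw e;
      }
    } catch (IOException e) {
      events.add("catch");
    }
    return events;
  }

  public static void main(String[] args) {
    System.out.println(outerCatchOnly()); // [read, close, catch]
    System.out.println(innerTry());       // [read, abort, close, catch]
  }
}
```

In the nested variant, abort() is recorded before close(), which is the ordering the inner try block in the diff depends on.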
   * Compute with some io functions.
   * @throws IOException
   */
  void call() throws IOException;
It looks like throwing IOE here makes the catch clause tricky.
Actually, we are not really handling the IOE when abort fails, e.g. in code like the following:
  catch (Exception e) {
    reader.abort();
    if (e instanceof IOException) {
      throw (IOException) e;
    }
    throw new RuntimeException(e);
  }
}
For simplicity, how about just using the Runnable interface (which cannot throw IOE)?
I tried Runnable, but then the abort and commit methods would have to wrap the IOE in a RuntimeException, which might break the current exception-handling chain. I'm not sure how the callers of the methods I modified handle the IOE. To be safe, I prefer to keep the current logic: if it's an IOE, I still throw an IOE.
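The trade-off between the two options can be sketched as follows. This is an illustrative sketch only: `IoActionSketch`, `IoAction`, `runIo` and `runPlain` are hypothetical names, with `IoAction` mirroring the `void call() throws IOException` signature from the diff.

```java
import java.io.IOException;

// Hypothetical standalone sketch; the names below are not from the Alluxio code base.
class IoActionSketch {
  /** Functional interface that is allowed to throw IOException. */
  @FunctionalInterface
  interface IoAction {
    void call() throws IOException;
  }

  static void runIo(IoAction action) throws IOException {
    action.call();
  }

  static void runPlain(Runnable action) {
    action.run();
  }

  static void failingIo() throws IOException {
    throw new IOException("simulated abort failure");
  }

  /** With IoAction, the caller still sees the original checked IOException. */
  static String viaIoAction() {
    try {
      runIo(IoActionSketch::failingIo);
      return "no exception";
    } catch (IOException e) {
      return e.getClass().getSimpleName();
    }
  }

  /** With Runnable, the IOException must be wrapped, so the caller sees a RuntimeException. */
  static String viaRunnable() {
    try {
      runPlain(() -> {
        try {
          failingIo();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      });
      return "no exception";
    } catch (RuntimeException e) {
      return e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    System.out.println(viaIoAction()); // IOException
    System.out.println(viaRunnable()); // RuntimeException
  }
}
```

A caller that only has `catch (IOException e)` would handle the first case and miss the second, which is the compatibility concern raised above.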
  private void abortUfsBlock(long sessionId, long blockId) throws IOException {
    mUnderFileSystemBlockStore.closeBlock(sessionId, blockId);
    Optional<TempBlockMeta> tempBlockMeta = mLocalBlockStore.getTempBlockMeta(blockId);
    if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == sessionId) {
Shall we log a warning if this condition is not met?
Good call, just added.
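A minimal sketch of the guarded abort with a warning, under stated assumptions: `AbortGuardSketch` and all of its members are hypothetical, a plain map stands in for the temp block metadata, and `java.util.logging` is used only to keep the example self-contained. The actual change in the PR may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;

// Hypothetical standalone sketch; a map stands in for the local block store metadata.
class AbortGuardSketch {
  private static final Logger LOG = Logger.getLogger(AbortGuardSketch.class.getName());

  // blockId -> id of the session that owns the temp block
  private final Map<Long, Long> mTempBlockOwners = new HashMap<>();

  void createTempBlock(long sessionId, long blockId) {
    mTempBlockOwners.put(blockId, sessionId);
  }

  Optional<Long> getTempBlockOwner(long blockId) {
    return Optional.ofNullable(mTempBlockOwners.get(blockId));
  }

  /**
   * Aborts the temp block only if it exists and belongs to the given session.
   *
   * @return true if the temp block was aborted, false if the abort was skipped
   */
  boolean abortIfOwned(long sessionId, long blockId) {
    Optional<Long> owner = getTempBlockOwner(blockId);
    if (owner.isPresent() && owner.get() == sessionId) {
      mTempBlockOwners.remove(blockId);
      return true;
    }
    // The condition was not met: surface it instead of skipping silently.
    LOG.warning(String.format(
        "Skipping abort of block %d for session %d: temp block is %s",
        blockId, sessionId,
        owner.isPresent() ? "owned by session " + owner.get() : "absent"));
    return false;
  }

  public static void main(String[] args) {
    AbortGuardSketch store = new AbortGuardSketch();
    store.createTempBlock(1L, 100L);
    System.out.println(store.abortIfOwned(2L, 100L)); // false, logs a warning
    System.out.println(store.abortIfOwned(1L, 100L)); // true
  }
}
```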
8a89d03 to 37a1892
A high-level comment: does the same issue apply to other types of block readers? In this PR, we only addressed DelegatingBlockReader. I just want to make sure we catch them all.
It seems the fix would fail for short-circuit reads and the page store.
      abortBlock(sessionId, blockId);
    }
  } else {
    if (!tempBlockMeta.isPresent() || tempBlockMeta.get().getSessionId() != sessionId) {
nit: we can merge the if conditions
    }
  }

  private void commitUfsBlock(long sessionId, long blockId) throws IOException {
nit: the naming seems a bit weird, since we are committing Alluxio blocks.
Replaced by #18525
What changes are proposed in this pull request?
This fixes a bug that produces corrupted data files when loading from UFS.
Previously, #17497 attempted to solve this issue, but it only covered the case of creating a UFS reader.
Why are the changes needed?
Please clarify why the changes are needed. For instance,
Does this PR introduce any user facing changes?
Please list the user-facing changes introduced by your change, including