Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy checkpoint atomically when rolling generation #35407

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 22 additions & 18 deletions server/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,6 @@ public Translog(
private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws IOException {
boolean success = false;
ArrayList<TranslogReader> foundTranslogs = new ArrayList<>();
final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, TRANSLOG_FILE_SUFFIX); // a temp file to copy checkpoint to - note it must be in on the same FS otherwise atomic move won't work
boolean tempFileRenamed = false;
try (ReleasableLock lock = writeLock.acquire()) {
logger.debug("open uncommitted translog checkpoint {}", checkpoint);

Expand Down Expand Up @@ -253,20 +251,32 @@ private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws
throw new IllegalStateException("Checkpoint file " + commitCheckpoint.getFileName() + " already exists but has corrupted content expected: " + checkpoint + " but got: " + checkpointFromDisk);
}
} else {
// we first copy this into the temp-file and then fsync it followed by an atomic move into the target file
// that way if we hit a disk-full here we are still in an consistent state.
Files.copy(location.resolve(CHECKPOINT_FILE_NAME), tempFile, StandardCopyOption.REPLACE_EXISTING);
IOUtils.fsync(tempFile, false);
Files.move(tempFile, commitCheckpoint, StandardCopyOption.ATOMIC_MOVE);
tempFileRenamed = true;
// we only fsync the directory the tempFile was already fsynced
IOUtils.fsync(commitCheckpoint.getParent(), true);
copyCheckpointTo(commitCheckpoint);
}
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(foundTranslogs);
}
}
return foundTranslogs;
}

private void copyCheckpointTo(Path targetPath) throws IOException {
// a temp file to copy checkpoint to - note it must be in on the same FS otherwise atomic move won't work
final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, CHECKPOINT_SUFFIX);
boolean tempFileRenamed = false;

try {
// we first copy this into the temp-file and then fsync it followed by an atomic move into the target file
// that way if we hit a disk-full here we are still in an consistent state.
Files.copy(location.resolve(CHECKPOINT_FILE_NAME), tempFile, StandardCopyOption.REPLACE_EXISTING);
IOUtils.fsync(tempFile, false);
Files.move(tempFile, targetPath, StandardCopyOption.ATOMIC_MOVE);
tempFileRenamed = true;
// we only fsync the directory the tempFile was already fsynced
IOUtils.fsync(targetPath.getParent(), true);
} finally {
if (tempFileRenamed == false) {
try {
Files.delete(tempFile);
Expand All @@ -275,7 +285,6 @@ private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws
}
}
}
return foundTranslogs;
}

TranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException {
Expand Down Expand Up @@ -1630,13 +1639,8 @@ public void rollGeneration() throws IOException {
try {
final TranslogReader reader = current.closeIntoReader();
readers.add(reader);
final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
assert Checkpoint.read(checkpoint).generation == current.getGeneration();
final Path generationCheckpoint =
location.resolve(getCommitCheckpointFileName(current.getGeneration()));
Files.copy(checkpoint, generationCheckpoint);
IOUtils.fsync(generationCheckpoint, false);
IOUtils.fsync(generationCheckpoint.getParent(), true);
assert Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME)).generation == current.getGeneration();
copyCheckpointTo(location.resolve(getCommitCheckpointFileName(current.getGeneration())));
// create a new translog file; this will sync it and update the checkpoint data;
current = createWriter(current.getGeneration() + 1);
logger.trace("current translog set to [{}]", current.getGeneration());
Expand Down