Skip to content

Commit

Permalink
Copy checkpoint atomically when rolling generation (#35407)
Browse files Browse the repository at this point in the history
Today when rolling a transog generation we copy the checkpoint from
`translog.ckp` to `translog-nnnn.ckp` using a simple `Files.copy()` followed by
appropriate `fsync()` calls. The copy operation is not atomic, so if we crash
at the wrong moment we can leave an incomplete checkpoint file on disk. In
practice the checkpoint is so small that it's either empty or fully written.
However, we do not correctly handle the case where it's empty when the node
restarts.

In contrast, in `recoverFromFiles()` we _do_ copy the checkpoint atomically.
This commit extracts the atomic copy operation from `recoverFromFiles()` and
re-uses it in `rollGeneration()`.
  • Loading branch information
DaveCTurner authored Nov 23, 2018
1 parent be69a77 commit d01436d
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 19 deletions.
41 changes: 22 additions & 19 deletions server/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,6 @@ public Translog(
private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws IOException {
boolean success = false;
ArrayList<TranslogReader> foundTranslogs = new ArrayList<>();
// a temp file to copy checkpoint to - note it must be in on the same FS otherwise atomic move won't work
final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, TRANSLOG_FILE_SUFFIX);
boolean tempFileRenamed = false;
try (ReleasableLock lock = writeLock.acquire()) {
logger.debug("open uncommitted translog checkpoint {}", checkpoint);

Expand Down Expand Up @@ -263,20 +260,32 @@ private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws
" already exists but has corrupted content expected: " + checkpoint + " but got: " + checkpointFromDisk);
}
} else {
// we first copy this into the temp-file and then fsync it followed by an atomic move into the target file
// that way if we hit a disk-full here we are still in an consistent state.
Files.copy(location.resolve(CHECKPOINT_FILE_NAME), tempFile, StandardCopyOption.REPLACE_EXISTING);
IOUtils.fsync(tempFile, false);
Files.move(tempFile, commitCheckpoint, StandardCopyOption.ATOMIC_MOVE);
tempFileRenamed = true;
// we only fsync the directory the tempFile was already fsynced
IOUtils.fsync(commitCheckpoint.getParent(), true);
copyCheckpointTo(commitCheckpoint);
}
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(foundTranslogs);
}
}
return foundTranslogs;
}

private void copyCheckpointTo(Path targetPath) throws IOException {
// a temp file to copy checkpoint to - note it must be in on the same FS otherwise atomic move won't work
final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, CHECKPOINT_SUFFIX);
boolean tempFileRenamed = false;

try {
// we first copy this into the temp-file and then fsync it followed by an atomic move into the target file
// that way if we hit a disk-full here we are still in an consistent state.
Files.copy(location.resolve(CHECKPOINT_FILE_NAME), tempFile, StandardCopyOption.REPLACE_EXISTING);
IOUtils.fsync(tempFile, false);
Files.move(tempFile, targetPath, StandardCopyOption.ATOMIC_MOVE);
tempFileRenamed = true;
// we only fsync the directory the tempFile was already fsynced
IOUtils.fsync(targetPath.getParent(), true);
} finally {
if (tempFileRenamed == false) {
try {
Files.delete(tempFile);
Expand All @@ -285,7 +294,6 @@ private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws
}
}
}
return foundTranslogs;
}

TranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException {
Expand Down Expand Up @@ -1643,13 +1651,8 @@ public void rollGeneration() throws IOException {
try {
final TranslogReader reader = current.closeIntoReader();
readers.add(reader);
final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
assert Checkpoint.read(checkpoint).generation == current.getGeneration();
final Path generationCheckpoint =
location.resolve(getCommitCheckpointFileName(current.getGeneration()));
Files.copy(checkpoint, generationCheckpoint);
IOUtils.fsync(generationCheckpoint, false);
IOUtils.fsync(generationCheckpoint.getParent(), true);
assert Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME)).generation == current.getGeneration();
copyCheckpointTo(location.resolve(getCommitCheckpointFileName(current.getGeneration())));
// create a new translog file; this will sync it and update the checkpoint data;
current = createWriter(current.getGeneration() + 1);
logger.trace("current translog set to [{}]", current.getGeneration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.Term;
import org.apache.lucene.mockfile.FilterFileChannel;
import org.apache.lucene.mockfile.FilterFileSystemProvider;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.store.MockDirectoryWrapper;
Expand Down Expand Up @@ -81,6 +82,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.CopyOption;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
Expand Down Expand Up @@ -3197,4 +3199,36 @@ public void close() throws IOException {
snapshot.close();
}
}

public void testCrashDuringCheckpointCopy() throws IOException {
final Path path = createTempDir();
final AtomicBoolean failOnCopy = new AtomicBoolean();
final String expectedExceptionMessage = "simulated failure during copy";
final FilterFileSystemProvider filterFileSystemProvider
= new FilterFileSystemProvider(path.getFileSystem().provider().getScheme(), path.getFileSystem()) {

@Override
public void copy(Path source, Path target, CopyOption... options) throws IOException {
if (failOnCopy.get() && source.toString().endsWith(Translog.CHECKPOINT_SUFFIX)) {
deleteIfExists(target);
Files.createFile(target);
throw new IOException(expectedExceptionMessage);
} else {
super.copy(source, target, options);
}
}
};

try (Translog brokenTranslog = create(filterFileSystemProvider.getPath(path.toUri()))) {
failOnCopy.set(true);
assertThat(expectThrows(IOException.class, brokenTranslog::rollGeneration).getMessage(), equalTo(expectedExceptionMessage));
assertFalse(brokenTranslog.isOpen());

try (Translog recoveredTranslog = new Translog(getTranslogConfig(path), brokenTranslog.getTranslogUUID(),
brokenTranslog.getDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
recoveredTranslog.rollGeneration();
assertFilePresences(recoveredTranslog);
}
}
}
}

0 comments on commit d01436d

Please sign in to comment.