Skip to content

Commit

Permalink
Concurrent file chunk fetching for CCR restore (#38495)
Browse files Browse the repository at this point in the history
Adds the ability to fetch chunks from different files in parallel, configurable using the new `ccr.indices.recovery.max_concurrent_file_chunks` setting, which defaults to 5 in this PR.

The implementation uses the parallel file writer functionality that is also used by peer recoveries.
  • Loading branch information
ywelsch authored and Tim-Brooks committed Feb 8, 2019
1 parent a1c63a3 commit 9b75a70
Show file tree
Hide file tree
Showing 15 changed files with 472 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long
return 0;
}

/**
* Checks whether the deletion policy is holding on to snapshotted commits
*/
synchronized boolean hasSnapshottedCommits() {
return snapshottedCommits.isEmpty() == false;
}

/**
* Checks if the deletion policy can release some index commits with the latest global checkpoint.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,11 @@ Translog getTranslog() {
return translog;
}

// Package private for testing purposes only
boolean hasSnapshottedCommits() {
return combinedDeletionPolicy.hasSnapshottedCommits();
}

@Override
public boolean isTranslogSyncNeeded() {
return getTranslog().syncNeeded();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentMap;

public class MultiFileWriter implements Releasable {

public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) {
this.store = store;
this.indexState = indexState;
this.tempFilePrefix = tempFilePrefix;
this.logger = logger;
this.ensureOpen = ensureOpen;
}

private final Runnable ensureOpen;
private final Logger logger;
private final Store store;
private final RecoveryState.Index indexState;
private final String tempFilePrefix;

private final ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<String, FileChunkWriter> fileChunkWriters = ConcurrentCollections.newConcurrentMap();


final Map<String, String> tempFileNames = ConcurrentCollections.newConcurrentMap();

public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk)
throws IOException {
final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(fileMetaData.name(), name -> new FileChunkWriter());
writer.writeChunk(new FileChunk(fileMetaData, content, position, lastChunk));
}

/** Get a temporary name for the provided file name. */
String getTempNameForFile(String origFile) {
return tempFilePrefix + origFile;
}

public IndexOutput getOpenIndexOutput(String key) {
ensureOpen.run();
return openIndexOutputs.get(key);
}

/** remove and {@link IndexOutput} for a given file. It is the caller's responsibility to close it */
public IndexOutput removeOpenIndexOutputs(String name) {
ensureOpen.run();
return openIndexOutputs.remove(name);
}

/**
* Creates an {@link IndexOutput} for the given file name. Note that the
* IndexOutput actually point at a temporary file.
* <p>
* Note: You can use {@link #getOpenIndexOutput(String)} with the same filename to retrieve the same IndexOutput
* at a later stage
*/
public IndexOutput openAndPutIndexOutput(String fileName, StoreFileMetaData metaData, Store store) throws IOException {
ensureOpen.run();
String tempFileName = getTempNameForFile(fileName);
if (tempFileNames.containsKey(tempFileName)) {
throw new IllegalStateException("output for file [" + fileName + "] has already been created");
}
// add first, before it's created
tempFileNames.put(tempFileName, fileName);
IndexOutput indexOutput = store.createVerifyingOutput(tempFileName, metaData, IOContext.DEFAULT);
openIndexOutputs.put(fileName, indexOutput);
return indexOutput;
}

private void innerWriteFileChunk(StoreFileMetaData fileMetaData, long position,
BytesReference content, boolean lastChunk) throws IOException {
final String name = fileMetaData.name();
IndexOutput indexOutput;
if (position == 0) {
indexOutput = openAndPutIndexOutput(name, fileMetaData, store);
} else {
indexOutput = getOpenIndexOutput(name);
}
assert indexOutput.getFilePointer() == position : "file-pointer " + indexOutput.getFilePointer() + " != " + position;
BytesRefIterator iterator = content.iterator();
BytesRef scratch;
while((scratch = iterator.next()) != null) { // we iterate over all pages - this is a 0-copy for all core impls
indexOutput.writeBytes(scratch.bytes, scratch.offset, scratch.length);
}
indexState.addRecoveredBytesToFile(name, content.length());
if (indexOutput.getFilePointer() >= fileMetaData.length() || lastChunk) {
try {
Store.verify(indexOutput);
} finally {
// we are done
indexOutput.close();
}
final String temporaryFileName = getTempNameForFile(name);
assert Arrays.asList(store.directory().listAll()).contains(temporaryFileName) :
"expected: [" + temporaryFileName + "] in " + Arrays.toString(store.directory().listAll());
store.directory().sync(Collections.singleton(temporaryFileName));
IndexOutput remove = removeOpenIndexOutputs(name);
assert remove == null || remove == indexOutput; // remove maybe null if we got finished
}
}

@Override
public void close() {
fileChunkWriters.clear();
// clean open index outputs
Iterator<Map.Entry<String, IndexOutput>> iterator = openIndexOutputs.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, IndexOutput> entry = iterator.next();
logger.trace("closing IndexOutput file [{}]", entry.getValue());
try {
entry.getValue().close();
} catch (Exception e) {
logger.debug(() -> new ParameterizedMessage("error while closing recovery output [{}]", entry.getValue()), e);
}
iterator.remove();
}
if (Strings.hasText(tempFilePrefix)) {
// trash temporary files
for (String file : tempFileNames.keySet()) {
logger.trace("cleaning temporary file [{}]", file);
store.deleteQuiet(file);
}
}
}

/** renames all temporary files to their true name, potentially overriding existing files */
public void renameAllTempFiles() throws IOException {
ensureOpen.run();
store.renameTempFilesSafe(tempFileNames);
}

static final class FileChunk {
final StoreFileMetaData md;
final BytesReference content;
final long position;
final boolean lastChunk;
FileChunk(StoreFileMetaData md, BytesReference content, long position, boolean lastChunk) {
this.md = md;
this.content = content;
this.position = position;
this.lastChunk = lastChunk;
}
}

private final class FileChunkWriter {
// chunks can be delivered out of order, we need to buffer chunks if there's a gap between them.
final PriorityQueue<FileChunk> pendingChunks = new PriorityQueue<>(Comparator.comparing(fc -> fc.position));
long lastPosition = 0;

void writeChunk(FileChunk newChunk) throws IOException {
synchronized (this) {
pendingChunks.add(newChunk);
}
while (true) {
final FileChunk chunk;
synchronized (this) {
chunk = pendingChunks.peek();
if (chunk == null || chunk.position != lastPosition) {
return;
}
pendingChunks.remove();
}
innerWriteFileChunk(chunk.md, chunk.position, chunk.content, chunk.lastChunk);
synchronized (this) {
assert lastPosition == chunk.position : "last_position " + lastPosition + " != chunk_position " + chunk.position;
lastPosition += chunk.content.length();
if (chunk.lastChunk) {
assert pendingChunks.isEmpty() == true : "still have pending chunks [" + pendingChunks + "]";
fileChunkWriters.remove(chunk.md.name());
assert fileChunkWriters.containsValue(this) == false : "chunk writer [" + newChunk.md + "] was not removed";
}
}
}
}
}
}
Loading

0 comments on commit 9b75a70

Please sign in to comment.