Skip to content

Commit

Permalink
Allow to prewarm the cache for searchable snapshot shards (#55322)
Browse files Browse the repository at this point in the history
Relates #50999
  • Loading branch information
tlrx authored Apr 24, 2020
1 parent 4afb360 commit bd40d06
Show file tree
Hide file tree
Showing 13 changed files with 596 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ public static boolean isSearchableSnapshotStore(Settings indexSettings) {
return SEARCHABLE_SNAPSHOTS_FEATURE_ENABLED
&& SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings));
}

public static final String SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME = "searchable_snapshots";
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -136,6 +137,9 @@ protected final boolean assertCurrentThreadMayAccessBlobStore() {
|| threadName.contains('[' + ThreadPool.Names.SEARCH + ']')
|| threadName.contains('[' + ThreadPool.Names.SEARCH_THROTTLED + ']')

// Cache prewarming runs on a dedicated thread pool.
|| threadName.contains('[' + SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME + ']')

// Today processExistingRecoveries considers all shards and constructs a shard store snapshot on this thread, this needs
// addressing. TODO NORELEASE
|| threadName.contains('[' + ThreadPool.Names.FETCH_SHARD_STORE + ']')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
*/
package org.elasticsearch.index.store;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.BaseDirectory;
import org.apache.lucene.store.Directory;
Expand All @@ -18,8 +21,12 @@
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.LazyInitializable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
Expand Down Expand Up @@ -47,18 +54,22 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.lucene.store.BufferedIndexInput.bufferSize;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING;
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME;

/**
* Implementation of {@link Directory} that exposes files from a snapshot as a Lucene directory. Because snapshot are immutable this
Expand All @@ -73,15 +84,19 @@
*/
public class SearchableSnapshotDirectory extends BaseDirectory {

private static final Logger logger = LogManager.getLogger(SearchableSnapshotDirectory.class);

private final Supplier<BlobContainer> blobContainerSupplier;
private final Supplier<BlobStoreIndexShardSnapshot> snapshotSupplier;
private final SnapshotId snapshotId;
private final IndexId indexId;
private final ShardId shardId;
private final LongSupplier statsCurrentTimeNanosSupplier;
private final Map<String, IndexInputStats> stats;
private final ThreadPool threadPool;
private final CacheService cacheService;
private final boolean useCache;
private final boolean prewarmCache;
private final Set<String> excludedFileTypes;
private final long uncachedChunkSize; // if negative use BlobContainer#readBlobPreferredLength, see #getUncachedChunkSize()
private final Path cacheDir;
Expand All @@ -101,7 +116,8 @@ public SearchableSnapshotDirectory(
Settings indexSettings,
LongSupplier currentTimeNanosSupplier,
CacheService cacheService,
Path cacheDir
Path cacheDir,
ThreadPool threadPool
) {
super(new SingleInstanceLockFactory());
this.snapshotSupplier = Objects.requireNonNull(snapshot);
Expand All @@ -115,8 +131,10 @@ public SearchableSnapshotDirectory(
this.cacheDir = Objects.requireNonNull(cacheDir);
this.closed = new AtomicBoolean(false);
this.useCache = SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings);
this.prewarmCache = useCache ? SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING.get(indexSettings) : false;
this.excludedFileTypes = new HashSet<>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING.get(indexSettings));
this.uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING.get(indexSettings).getBytes();
this.threadPool = threadPool;
this.loaded = false;
assert invariant();
}
Expand All @@ -142,6 +160,7 @@ protected final boolean assertCurrentThreadMayLoadSnapshot() {
* @return true if the snapshot was loaded by executing this method, false otherwise
*/
public boolean loadSnapshot() {
assert assertCurrentThreadMayLoadSnapshot();
boolean alreadyLoaded = this.loaded;
if (alreadyLoaded == false) {
synchronized (this) {
Expand All @@ -150,10 +169,10 @@ public boolean loadSnapshot() {
this.blobContainer = blobContainerSupplier.get();
this.snapshot = snapshotSupplier.get();
this.loaded = true;
prewarmCache();
}
}
}
assert assertCurrentThreadMayLoadSnapshot();
assert invariant();
return alreadyLoaded == false;
}
Expand Down Expand Up @@ -300,7 +319,7 @@ public IndexInput openInput(final String name, final IOContext context) throws I

final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length()));
if (useCache && isExcludedFromCache(name) == false) {
return new CachedBlobContainerIndexInput(this, fileInfo, context, inputStats);
return new CachedBlobContainerIndexInput(this, fileInfo, context, inputStats, cacheService.getRangeSize());
} else {
return new DirectBlobContainerIndexInput(
blobContainer(),
Expand Down Expand Up @@ -331,12 +350,86 @@ public String toString() {
return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory;
}

private void prewarmCache() {
if (prewarmCache) {
final List<BlobStoreIndexShardSnapshot.FileInfo> cacheFiles = snapshot().indexFiles()
.stream()
.filter(file -> file.metadata().hashEqualsContents() == false)
.filter(file -> isExcludedFromCache(file.physicalName()) == false)
.collect(Collectors.toList());

final Executor executor = threadPool.executor(SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME);
logger.debug("{} warming shard cache for [{}] files", shardId, cacheFiles.size());

for (BlobStoreIndexShardSnapshot.FileInfo cacheFile : cacheFiles) {
final String fileName = cacheFile.physicalName();
try {
final IndexInput input = openInput(fileName, CachedBlobContainerIndexInput.CACHE_WARMING_CONTEXT);
assert input instanceof CachedBlobContainerIndexInput : "expected cached index input but got " + input.getClass();

final long numberOfParts = cacheFile.numberOfParts();
final CountDown countDown = new CountDown(Math.toIntExact(numberOfParts));
for (long p = 0; p < numberOfParts; p++) {
final int part = Math.toIntExact(p);
// TODO use multiple workers to warm each part instead of filling the thread pool
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
ensureOpen();

logger.trace("warming cache for [{}] part [{}/{}]", fileName, part, numberOfParts);
final long startTimeInNanos = statsCurrentTimeNanosSupplier.getAsLong();

final CachedBlobContainerIndexInput cachedIndexInput = (CachedBlobContainerIndexInput) input.clone();
final int bytesRead = cachedIndexInput.prefetchPart(part); // TODO does not include any rate limitation
assert bytesRead == cacheFile.partBytes(part);

logger.trace(
() -> new ParameterizedMessage(
"part [{}/{}] of [{}] warmed in [{}] ms",
part,
numberOfParts,
fileName,
TimeValue.timeValueNanos(statsCurrentTimeNanosSupplier.getAsLong() - startTimeInNanos).millis()
)
);
}

@Override
public void onFailure(Exception e) {
logger.trace(
() -> new ParameterizedMessage(
"failed to warm cache for [{}] part [{}/{}]",
fileName,
part,
numberOfParts
),
e
);
}

@Override
public void onAfter() {
if (countDown.countDown()) {
IOUtils.closeWhileHandlingException(input);
}
}
});
}
} catch (IOException e) {
logger.trace(() -> new ParameterizedMessage("failed to warm cache for [{}]", fileName), e);
}
}
}
}

public static Directory create(
RepositoriesService repositories,
CacheService cache,
IndexSettings indexSettings,
ShardPath shardPath,
LongSupplier currentTimeNanosSupplier
LongSupplier currentTimeNanosSupplier,
ThreadPool threadPool
) throws IOException {

final Repository repository = repositories.repository(SNAPSHOT_REPOSITORY_SETTING.get(indexSettings.getSettings()));
Expand Down Expand Up @@ -371,7 +464,8 @@ public static Directory create(
indexSettings.getSettings(),
currentTimeNanosSupplier,
cache,
cacheDir
cacheDir,
threadPool
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ protected void closeInternal() {
private final ReleasableLock readLock;

private final SparseFileTracker tracker;
private final int rangeSize;
private final String description;
private final Path file;

Expand All @@ -61,12 +60,11 @@ protected void closeInternal() {
@Nullable // if evicted, or there are no listeners
private volatile FileChannel channel;

public CacheFile(String description, long length, Path file, int rangeSize) {
public CacheFile(String description, long length, Path file) {
this.tracker = new SparseFileTracker(file.toString(), length);
this.description = Objects.requireNonNull(description);
this.file = Objects.requireNonNull(file);
this.listeners = new HashSet<>();
this.rangeSize = rangeSize;
this.evicted = false;

final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
Expand Down Expand Up @@ -249,41 +247,35 @@ private void ensureOpen() {
}

CompletableFuture<Integer> fetchRange(
long position,
long start,
long end,
CheckedBiFunction<Long, Long, Integer, IOException> onRangeAvailable,
CheckedBiConsumer<Long, Long, IOException> onRangeMissing
) {
final CompletableFuture<Integer> future = new CompletableFuture<>();
try {
if (position < 0 || position > tracker.getLength()) {
throw new IllegalArgumentException("Wrong read position [" + position + "]");
if (start < 0 || start > tracker.getLength() || start > end || end > tracker.getLength()) {
throw new IllegalArgumentException(
"Invalid range [start=" + start + ", end=" + end + "] for length [" + tracker.getLength() + ']'
);
}

ensureOpen();
final long rangeStart = (position / rangeSize) * rangeSize;
final long rangeEnd = Math.min(rangeStart + rangeSize, tracker.getLength());

final List<SparseFileTracker.Gap> gaps = tracker.waitForRange(
rangeStart,
rangeEnd,
start,
end,
ActionListener.wrap(
rangeReady -> future.complete(onRangeAvailable.apply(rangeStart, rangeEnd)),
rangeReady -> future.complete(onRangeAvailable.apply(start, end)),
rangeFailure -> future.completeExceptionally(rangeFailure)
)
);

if (gaps.size() > 0) {
final SparseFileTracker.Gap range = gaps.get(0);
assert gaps.size() == 1 : "expected 1 range to fetch but got " + gaps.size();
assert range.start == rangeStart : "range/gap start mismatch (" + range.start + ',' + rangeStart + ')';
assert range.end == rangeEnd : "range/gap end mismatch (" + range.end + ',' + rangeEnd + ')';

for (SparseFileTracker.Gap gap : gaps) {
try {
ensureOpen();
onRangeMissing.accept(rangeStart, rangeEnd);
range.onResponse(null);
onRangeMissing.accept(gap.start, gap.end);
gap.onResponse(null);
} catch (Exception e) {
range.onFailure(e);
gap.onFailure(e);
}
}
} catch (Exception e) {
Expand Down
Loading

0 comments on commit bd40d06

Please sign in to comment.