Allow to prewarm the cache for searchable snapshot shards #55322
Conversation
Pinging @elastic/es-distributed (:Distributed/Snapshot/Restore)
@@ -115,8 +135,10 @@ public SearchableSnapshotDirectory(
        this.cacheDir = Objects.requireNonNull(cacheDir);
        this.closed = new AtomicBoolean(false);
        this.useCache = SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings);
        this.loadCacheEagerly = useCache ? SNAPSHOT_CACHE_LOAD_EAGERLY_SETTING.get(indexSettings) : false;
Better name suggestions are welcome
boolean alreadyLoaded = this.loaded;
if (alreadyLoaded == false) {
    synchronized (this) {
        alreadyLoaded = this.loaded;
        if (alreadyLoaded == false) {
            this.blobContainer = blobContainerSupplier.get();
            this.snapshot = snapshotSupplier.get();
            if (loadCacheEagerly) {
                prewarmCache();
This method blocks until the cache is fully prewarmed. It must be called before `loaded` is set to `true`, so that other components of the system are unlikely to trigger caching on this directory's files.
Is that strictly necessary? I would prefer to initiate the prewarming here, but at the same time allow the shard routing to move to the started state as quickly as possible.
This is not strictly necessary, just a misunderstanding on my part. We discussed this and I updated the PR so that cache warming now runs concurrently with the recovery.
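For illustration, here is a minimal sketch of that non-blocking shape, using a plain JDK executor as a stand-in for the dedicated warming thread pool. The class and method names are hypothetical, not the PR's actual code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class DirectoryLoader {
    private final AtomicBoolean loaded = new AtomicBoolean(false);
    // hypothetical executor standing in for the dedicated warming thread pool
    private final ExecutorService prewarmExecutor = Executors.newFixedThreadPool(4);

    void loadSnapshot() {
        if (loaded.compareAndSet(false, true)) {
            // the shard recovery can proceed immediately; warming happens in the background
            prewarmExecutor.submit(this::prewarmCache);
        }
    }

    private void prewarmCache() {
        // download file parts and write them to the local cache (omitted)
    }
}
```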
final BlobStoreIndexShardSnapshot.FileInfo fileInfo = fileInfo(name);
private IndexInput openInput(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final IOContext context) {
Splitting this method into two allows opening an `IndexInput` even if the snapshot is not marked as `loaded` yet.
I think this is no longer necessary? The private `openInput` method is only called in one place.
Indeed - I pushed c8a1c6b to remove the method.
);
final long startTimeInNanos = statsCurrentTimeNanosSupplier.getAsLong();
try {
    final IndexInput input = openInput(file, CachedBlobContainerIndexInput.CACHE_WARMING_CONTEXT);
This method uses an `IndexInput` with a specific `IOContext` to prewarm the cache for the given Lucene file. The `IndexInput` will be cloned for each part to be written to the cache and closed once all parts are processed.
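As a rough sketch of that clone-per-part flow (the `FileWarmer` class and the `prefetchPart` stub are illustrative assumptions; only the clone/close discipline follows the comment above):

```java
import java.io.IOException;
import org.apache.lucene.store.IndexInput;

final class FileWarmer {
    // hypothetical helper: warms every part of one Lucene file
    void warm(IndexInput input, int numberOfParts) throws IOException {
        try {
            for (int part = 0; part < numberOfParts; part++) {
                // clones share the underlying resources but have independent positions,
                // so each part can be read by a separate task
                prefetchPart(input.clone(), part);
            }
        } finally {
            input.close(); // closing the original releases the clones as well
        }
    }

    private void prefetchPart(IndexInput clone, int part) throws IOException {
        // read the part's byte range through the clone and write it to the cache (omitted)
    }
}
```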
@@ -61,12 +61,11 @@ protected void closeInternal() {
    @Nullable // if evicted, or there are no listeners
    private volatile FileChannel channel;

    public CacheFile(String description, long length, Path file, int rangeSize) {
    public CacheFile(String description, long length, Path file) {
We included the rangeSize in the CacheFile to compute the range to fetch given a specific position, but we never asserted that the fetched ranges actually matched that size.
@@ -144,6 +208,7 @@ private void writeCacheFile(FileChannel fc, long start, long end) throws IOException {
    final long length = end - start;
    final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))];
    logger.trace(() -> new ParameterizedMessage("writing range [{}-{}] to cache file [{}]", start, end, cacheFileReference));
    assert assertRangeOfBytesAlignment(start, end);
This asserts the size of the ranges written in cache, depending on the `IOContext`.
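One hypothetical shape such an assertion could take, assuming `rangeSize` and `fileLength` fields that are not shown in the diff (as noted in the reply below, the real assertion was later removed):

```java
final class RangeAlignmentAssertions {
    private final long rangeSize;   // assumed: the cache range size for the current IOContext
    private final long fileLength;  // assumed: total length of the cached file

    RangeAlignmentAssertions(long rangeSize, long fileLength) {
        this.rangeSize = rangeSize;
        this.fileLength = fileLength;
    }

    // a written range must start on a range-size boundary and end either on one
    // or at the end of the file
    boolean assertRangeOfBytesAlignment(long start, long end) {
        assert start % rangeSize == 0 : "start [" + start + "] not aligned on range size [" + rangeSize + "]";
        assert end % rangeSize == 0 || end == fileLength
            : "end [" + end + "] neither aligned on range size [" + rangeSize + "] nor at end of file [" + fileLength + "]";
        return true;
    }
}
```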
We can't really assert the size of ranges now that warming runs concurrently. This has been removed.
@Override
protected void doRun() throws Exception {
    CheckedRunnable<Exception> loader;
    while (isOpen && (loader = queue.poll(0L, TimeUnit.MILLISECONDS)) != null) {
I see that this is taking the same approach as we use for uploading snapshot files (BlobStoreRepository). I would prefer not to hold onto workers for such a long time, as it can block the snapshot thread pool for a long time (cc: @original-brownbear).
In both cases (also the one in BlobStoreRepository), I would prefer for the worker to process one file, then enqueue another task to the thread pool to pick up the next piece of work. This allows other operations to make progress as well, instead of waiting for a long time in the snapshot queue.
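A minimal sketch of that re-enqueueing pattern with plain JDK primitives; the names and pool sizing are illustrative:

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class FairWarmer {
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final ExecutorService threadPool = Executors.newFixedThreadPool(5); // stand-in for the snapshot pool

    // enqueue all parts up front, then start a bounded number of workers
    void warmAll(List<Runnable> parts, int workers) {
        queue.addAll(parts);
        for (int i = 0; i < workers; i++) {
            threadPool.submit(this::processOne);
        }
    }

    private void processOne() {
        final Runnable part = queue.poll();
        if (part != null) {
            part.run();
            // re-submit instead of looping, so tasks from other shards queued on the
            // same pool get a chance to run between two warming executions
            threadPool.submit(this::processOne);
        }
    }
}
```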
I agree, that makes sense, Yannick.
I think for the uploading side the downside of "this allows other operations to make progress as well" is that it causes the index commits to be held on to for a suboptimally long time. That's why the approach of fully monopolizing the pool was consciously chosen for uploads there.
That's something to be better controlled at the `SnapshotShardsService` level then, though. It could limit the number of shards to be snapshotted concurrently by lazily enqueuing there.
Please hold off on reviews - Yannick and I discussed several points on this PR yesterday and I'll address them.
I've updated this PR so that the cache warming is no longer blocking and now runs concurrently with the shard recovery. Allowing concurrent reads of different chunk sizes required removing the assertions on the number and length of gaps to be written in cache (which I think is OK as David planned to improve this). I also introduced a dedicated thread pool for cache warming as suggested by Yannick. This thread pool is sized larger than the default snapshot thread pool. I've quickly run some benchmarks and compared the results to regular full restores. Depending on the snapshot to restore, this change runs from 10% to 50% faster than a regular restore. This makes sense now that contention has been reduced in #55662 and a custom thread pool is used for warming.
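For reference, a hedged sketch of how such a dedicated pool could be declared with the 7.x-era Elasticsearch thread pool API. The `searchable_snapshots` pool name appears elsewhere in this thread; the sizing formula, queue size, and settings prefix are assumptions:

```java
import java.util.List;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;

public class CacheWarmingExecutors {
    // assumed sizing: the default SNAPSHOT pool is capped at min(5, processors / 2),
    // so a pool of all allocated processors is strictly larger
    public static List<ExecutorBuilder<?>> executorBuilders(Settings settings) {
        final int processors = EsExecutors.allocatedProcessors(settings);
        return List.of(
            new FixedExecutorBuilder(settings, "searchable_snapshots", processors, 256, "xpack.searchable_snapshots.thread_pool")
        );
    }
}
```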
ML-related failure. @elasticmachine run elasticsearch-ci/2
@DaveCTurner sorry for the delay. This is ready for review.
Cunning use of an `IOContext`, makes sense to me. I left some small comments but nothing major.
@@ -99,6 +103,11 @@
        true,
        Setting.Property.IndexScope
    );
    public static final Setting<Boolean> SNAPSHOT_CACHE_LOAD_EAGERLY_SETTING = Setting.boolSetting(
        "index.store.snapshot.cache.load.eagerly",
Suggest keeping the terminology consistent around "warming"; how about `index.store.snapshot.cache.prewarm.enabled`?
Much better name, thanks. I pushed 4ee96af.
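The renamed setting plausibly looks like the following sketch, mirroring the `boolSetting` pattern in the diff above. Only the setting key is confirmed by this thread; the default and scope are carried over from the original `load.eagerly` setting:

```java
import org.elasticsearch.common.settings.Setting;

public class SearchableSnapshotsSettings {
    // sketch: default false per the PR description, index-scoped like the original setting
    public static final Setting<Boolean> SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING = Setting.boolSetting(
        "index.store.snapshot.cache.prewarm.enabled",
        false,
        Setting.Property.IndexScope
    );
}
```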
@@ -129,7 +129,7 @@ protected InputStream openSlice(long slice) throws IOException {
        }
    }

    protected final boolean assertCurrentThreadMayAccessBlobStore() {
    protected boolean assertCurrentThreadMayAccessBlobStore() {
I think we can relax the assertion here to permit the `searchable_snapshots` threadpool to access the repo, rather than overriding it only in `CachedBlobContainerIndexInput`.
Ok. I pushed 0469c7b
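A sketch of what the relaxed check could look like; the exact set of thread pool names permitted by the real assertion is an assumption here:

```java
final class BlobStoreThreadAssertions {
    // relaxed thread-name check: the searchable_snapshots pool is now permitted
    // alongside the pools that could already access the blob store
    static boolean assertCurrentThreadMayAccessBlobStore() {
        final String threadName = Thread.currentThread().getName();
        assert threadName.contains("[snapshot]")
            || threadName.contains("[generic]")
            || threadName.contains("[searchable_snapshots]")
            : "current thread [" + threadName + "] may not access the blob store";
        return true;
    }
}
```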
Thanks @DaveCTurner, I've applied your feedback. This is ready for another round.
Two more tiny comments regarding the threadpool, but otherwise LGTM. Great work @tlrx.
Thanks David
Today the cache prewarming introduced in #55322 works by enqueuing all the file parts to warm in the searchable_snapshots thread pool at once. In order to make this fairer among concurrent warmings, this commit starts workers that concurrently poll file parts to warm from a queue, warm the part, and then immediately schedule another warming execution. This should leave more room for concurrent shard warmings to sneak in and be executed. Relates #55322
This pull request adds a way to prewarm the cache for searchable snapshot shard files.

It relies on a new index setting named `index.store.snapshot.cache.load.eagerly` (defaults to `false`) that can be passed when mounting a snapshot as an index. This setting is detected during the pre-recovery step, before the snapshot files are exposed to the other components of the system. The method `prewarmCache()` of the `SearchableSnapshotDirectory` instance is executed, which builds the list of all parts of snapshot files that need to be prefetched in cache (excluding the files that are stored in the metadata hash and the ones explicitly excluded by the `excluded_file_types` setting).

The parts are then prefetched in cache in parallel using the `SNAPSHOT` thread pool. If a snapshot file is composed of multiple parts (or chunks), the parts can potentially be downloaded and written in cache concurrently. The implementation relies on a new `prefetchPart()` method added to the `CachedBlobContainerIndexInput` class. This method allows fetching a complete part of a file (or the whole file if the snapshot file is composed of a single part) in order to write it in cache. This is possible because `CacheFile` has been modified to work with configurable cache range sizes depending on the `IOContext` the `IndexInput` has been opened with.

When the `IndexInput` is opened using the specific `CACHE_WARMING_CONTEXT` context, the file is cached on disk using large ranges of bytes aligned on the beginning and the end of each part (or chunk) of the file. When using a different context, the file is cached on disk using the normal cache range size defined through the `range_size` setting. This implementation allows reusing the existing cache eviction mechanism if something goes wrong when reading or writing a part. It also simplifies the logic if the recovering shard is closed while prewarming the cache.
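To make the two range shapes concrete, here is a small self-contained sketch of the alignment logic described above; `partSize`, `rangeSize`, and `fileLength` are parameters assumed for illustration, not the PR's exact code:

```java
final class WarmingRanges {
    /** Warming context: a read at {@code position} is expanded to the whole part (chunk) containing it. */
    static long[] warmingRange(long position, long partSize, long fileLength) {
        long start = (position / partSize) * partSize;
        long end = Math.min(start + partSize, fileLength);
        return new long[] { start, end };
    }

    /** Normal context: a smaller window of {@code rangeSize} bytes around the read position. */
    static long[] readRange(long position, long rangeSize, long fileLength) {
        long start = (position / rangeSize) * rangeSize;
        long end = Math.min(start + rangeSize, fileLength);
        return new long[] { start, end };
    }

    public static void main(String[] args) {
        // e.g. a 3 GiB file split in 1 GiB parts: warming at position 1.5 GiB fetches part [1 GiB, 2 GiB)
        long gib = 1L << 30;
        long[] r = warmingRange(gib + gib / 2, gib, 3 * gib);
        System.out.println("warming range: [" + r[0] + ", " + r[1] + ")");
    }
}
```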