forked from opensearch-project/OpenSearch
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Writable Warm] Composite Directory implementation and integrating it…
… with FileCache (opensearch-project#12782) * Composite Directory POC Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Refactor TransferManager interface to RemoteStoreFileTrackerAdapter Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Implement block level fetch for Composite Directory Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Removed CACHE state from FileTracker Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Fixes after latest pull Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Add new setting for warm, remove store type setting, FileTracker and RemoteStoreFileTrackerAdapter, CompositeDirectoryFactory and update Composite Directory implementation Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Modify TransferManager - replace BlobContainer with Functional Interface to fetch an InputStream instead Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Reuse OnDemandBlockSnapshotIndexInput instead of OnDemandBlockCompositeIndexInput Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Modify constructors to avoid breaking public api contract and code review fixes Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Add experimental annotations for newly created classes and review comment fixes Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Use ref count as a temporary measure to prevent file from eviction until uploaded to Remote Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Remove method level locks Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Handle tmp file deletion Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Nit fixes Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Handle delete and close in Composite Directory, log current state of FileCache and correct it's clear method and modify unit and integration tests as per review comments Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Refactor usages of WRITEABLE_REMOTE_INDEX_SETTING to TIERED_REMOTE_INDEX_SETTING Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Add tests for FileCachedIndexInput and review comment fixes Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Add additional IT for feature flag disabled Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Move setting for Partial Locality type behind Feature Flag, fix bug for ref count via cloneMap in FullFileCachedIndexInput and other review fixes Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Minor test and nit fixes Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Add javadocs for FullFileCachedIndexInput Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> * Minor precommit fixes Signed-off-by: Shreyansh Ray <rayshrey@amazon.com> --------- Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
- Loading branch information
Showing
35 changed files
with
1,889 additions
and
209 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
160 changes: 160 additions & 0 deletions
160
server/src/internalClusterTest/java/org/opensearch/remotestore/WritableWarmIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.remotestore; | ||
|
||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; | ||
|
||
import org.apache.lucene.store.Directory; | ||
import org.apache.lucene.store.FilterDirectory; | ||
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; | ||
import org.opensearch.action.admin.indices.get.GetIndexRequest; | ||
import org.opensearch.action.admin.indices.get.GetIndexResponse; | ||
import org.opensearch.action.search.SearchResponse; | ||
import org.opensearch.cluster.metadata.IndexMetadata; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.settings.SettingsException; | ||
import org.opensearch.common.util.FeatureFlags; | ||
import org.opensearch.index.IndexModule; | ||
import org.opensearch.index.query.QueryBuilders; | ||
import org.opensearch.index.shard.IndexShard; | ||
import org.opensearch.index.store.CompositeDirectory; | ||
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; | ||
import org.opensearch.index.store.remote.filecache.FileCache; | ||
import org.opensearch.index.store.remote.utils.FileTypeUtils; | ||
import org.opensearch.indices.IndicesService; | ||
import org.opensearch.node.Node; | ||
import org.opensearch.test.InternalTestCluster; | ||
import org.opensearch.test.OpenSearchIntegTestCase; | ||
|
||
import java.util.Arrays; | ||
import java.util.HashSet; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; | ||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; | ||
|
||
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) | ||
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false) | ||
// Uncomment the below line to enable trace level logs for this test for better debugging | ||
// @TestLogging(reason = "Getting trace logs from composite directory package", value = "org.opensearch.index.store:TRACE") | ||
public class WritableWarmIT extends RemoteStoreBaseIntegTestCase { | ||
|
||
protected static final String INDEX_NAME = "test-idx-1"; | ||
protected static final int NUM_DOCS_IN_BULK = 1000; | ||
|
||
/* | ||
Disabling MockFSIndexStore plugin as the MockFSDirectoryFactory wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory) | ||
As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory | ||
*/ | ||
@Override | ||
protected boolean addMockIndexStorePlugin() { | ||
return false; | ||
} | ||
|
||
@Override | ||
protected Settings featureFlagSettings() { | ||
Settings.Builder featureSettings = Settings.builder(); | ||
featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true); | ||
return featureSettings.build(); | ||
} | ||
|
||
public void testWritableWarmFeatureFlagDisabled() { | ||
Settings clusterSettings = Settings.builder().put(super.nodeSettings(0)).put(FeatureFlags.TIERED_REMOTE_INDEX, false).build(); | ||
InternalTestCluster internalTestCluster = internalCluster(); | ||
internalTestCluster.startClusterManagerOnlyNode(clusterSettings); | ||
internalTestCluster.startDataOnlyNode(clusterSettings); | ||
|
||
Settings indexSettings = Settings.builder() | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) | ||
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name()) | ||
.build(); | ||
|
||
try { | ||
prepareCreate(INDEX_NAME).setSettings(indexSettings).get(); | ||
fail("Should have thrown Exception as setting should not be registered if Feature Flag is Disabled"); | ||
} catch (SettingsException ex) { | ||
assertEquals( | ||
"unknown setting [" | ||
+ IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey() | ||
+ "] please check that any required plugins are installed, or check the " | ||
+ "breaking changes documentation for removed settings", | ||
ex.getMessage() | ||
); | ||
} | ||
} | ||
|
||
public void testWritableWarmBasic() throws Exception { | ||
InternalTestCluster internalTestCluster = internalCluster(); | ||
internalTestCluster.startClusterManagerOnlyNode(); | ||
internalTestCluster.startDataOnlyNode(); | ||
Settings settings = Settings.builder() | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) | ||
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name()) | ||
.build(); | ||
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME).setSettings(settings).get()); | ||
|
||
// Verify from the cluster settings if the data locality is partial | ||
GetIndexResponse getIndexResponse = client().admin() | ||
.indices() | ||
.getIndex(new GetIndexRequest().indices(INDEX_NAME).includeDefaults(true)) | ||
.get(); | ||
Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME); | ||
assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey())); | ||
|
||
// Ingesting some docs | ||
indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK); | ||
flushAndRefresh(INDEX_NAME); | ||
|
||
// ensuring cluster is green after performing force-merge | ||
ensureGreen(); | ||
|
||
SearchResponse searchResponse = client().prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get(); | ||
// Asserting that search returns same number of docs as ingested | ||
assertHitCount(searchResponse, NUM_DOCS_IN_BULK); | ||
|
||
// Ingesting docs again before force merge | ||
indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK); | ||
flushAndRefresh(INDEX_NAME); | ||
|
||
FileCache fileCache = internalTestCluster.getDataNodeInstance(Node.class).fileCache(); | ||
IndexShard shard = internalTestCluster.getDataNodeInstance(IndicesService.class) | ||
.indexService(resolveIndex(INDEX_NAME)) | ||
.getShardOrNull(0); | ||
Directory directory = (((FilterDirectory) (((FilterDirectory) (shard.store().directory())).getDelegate())).getDelegate()); | ||
|
||
// Force merging the index | ||
Set<String> filesBeforeMerge = new HashSet<>(Arrays.asList(directory.listAll())); | ||
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).get(); | ||
flushAndRefresh(INDEX_NAME); | ||
Set<String> filesAfterMerge = new HashSet<>(Arrays.asList(directory.listAll())); | ||
|
||
Set<String> filesFromPreviousGenStillPresent = filesBeforeMerge.stream() | ||
.filter(filesAfterMerge::contains) | ||
.filter(file -> !FileTypeUtils.isLockFile(file)) | ||
.filter(file -> !FileTypeUtils.isSegmentsFile(file)) | ||
.collect(Collectors.toUnmodifiableSet()); | ||
|
||
// Asserting that after merge all the files from previous gen are no more part of the directory | ||
assertTrue(filesFromPreviousGenStillPresent.isEmpty()); | ||
|
||
// Asserting that files from previous gen are not present in File Cache as well | ||
filesBeforeMerge.stream() | ||
.filter(file -> !FileTypeUtils.isLockFile(file)) | ||
.filter(file -> !FileTypeUtils.isSegmentsFile(file)) | ||
.forEach(file -> assertNull(fileCache.get(((CompositeDirectory) directory).getFilePath(file)))); | ||
|
||
// Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file | ||
// leaks | ||
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); | ||
fileCache.prune(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.