Handle delete and close in Composite Directory, log current state of FileCache and correct its clear method, and modify unit and integration tests as per review comments

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
rayshrey committed May 29, 2024
1 parent 7491c62 commit 78fee6e
Showing 13 changed files with 500 additions and 332 deletions.
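The headline change, handling delete and close in Composite Directory, lands in files that are not expanded on this page (only 3 of the 13 changed files are shown below). As rough orientation only, a directory that fronts a file cache typically has to evict the cache entry on delete and release cached references on close. A minimal hypothetical sketch of that pattern follows; the class name and its cachedFiles map are illustrative, not the actual OpenSearch implementation:

import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;

// Hypothetical sketch of delete/close handling in a cache-aware FilterDirectory.
final class CacheAwareDirectorySketch extends FilterDirectory {
    private final Map<String, Path> cachedFiles = new ConcurrentHashMap<>();

    CacheAwareDirectorySketch(Directory local) {
        super(local);
    }

    @Override
    public void deleteFile(String name) throws IOException {
        cachedFiles.remove(name); // evict the cache entry so a deleted file can no longer be served
        super.deleteFile(name);
    }

    @Override
    public void close() throws IOException {
        cachedFiles.clear(); // drop cached references before closing the delegate
        super.close();
    }
}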

@@ -0,0 +1,128 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.utils.FileTypeUtils;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.Node;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.junit.annotations.TestLogging;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
// Trace level logs for the composite directory package are enabled below for easier debugging
@TestLogging(reason = "Getting trace logs from composite directory package", value = "org.opensearch.index.store:TRACE")
public class WritableWarmIT extends RemoteStoreBaseIntegTestCase {

    protected static final String INDEX_NAME = "test-idx-1";
    protected static final int NUM_DOCS_IN_BULK = 1000;

    /*
     * Disabling the MockFSIndexStore plugin, as its MockFSDirectoryFactory wraps the FSDirectory in an
     * OpenSearchMockDirectoryWrapper, which extends FilterDirectory (whereas FSDirectory extends BaseDirectory).
     * As a result of this wrapping, the local directory of a Composite Directory does not satisfy the assertion
     * that the local directory must be of type FSDirectory.
     */
    @Override
    protected boolean addMockIndexStorePlugin() {
        return false;
    }

    @Override
    protected Settings featureFlagSettings() {
        Settings.Builder featureSettings = Settings.builder();
        featureSettings.put(FeatureFlags.WRITEABLE_REMOTE_INDEX, true);

        return featureSettings.build();
    }

    public void testWritableWarmBasic() throws Exception {
        Settings settings = Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
            .build();
        assertAcked(client().admin().indices().prepareCreate(INDEX_NAME).setSettings(settings).get());

        // Verify from the index settings that the data locality is partial
        GetIndexResponse getIndexResponse = client().admin()
            .indices()
            .getIndex(new GetIndexRequest().indices(INDEX_NAME).includeDefaults(true))
            .get();
        Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME);
        assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()));

        // Ingesting some docs
        indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK);
        flushAndRefresh(INDEX_NAME);

        // Ensuring cluster is green
        ensureGreen();

        SearchResponse searchResponse = client().prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
        // Asserting that search returns the same number of docs as ingested
        assertHitCount(searchResponse, NUM_DOCS_IN_BULK);

        // Ingesting docs again before force merge
        indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK);
        flushAndRefresh(INDEX_NAME);

        FileCache fileCache = internalCluster().getDataNodeInstance(Node.class).fileCache();
        IndexShard shard = internalCluster().getDataNodeInstance(IndicesService.class)
            .indexService(resolveIndex(INDEX_NAME))
            .getShardOrNull(0);
        // Unwrapping the Store's two FilterDirectory layers to reach the underlying Composite Directory
        Directory directory = (((FilterDirectory) (((FilterDirectory) (shard.store().directory())).getDelegate())).getDelegate());

        // Force merging the index
        Set<String> filesBeforeMerge = new HashSet<>(Arrays.asList(directory.listAll()));
        client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).get();
        flushAndRefresh(INDEX_NAME);
        Set<String> filesAfterMerge = new HashSet<>(Arrays.asList(directory.listAll()));

        CacheUsage usageBeforePrune = fileCache.usage();
        fileCache.prune();
        CacheUsage usageAfterPrune = fileCache.usage();

        Set<String> filesFromPreviousGenStillPresent = filesBeforeMerge.stream()
            .filter(filesAfterMerge::contains)
            .filter(file -> !FileTypeUtils.isLockFile(file))
            .collect(Collectors.toUnmodifiableSet());

        // Asserting that after the merge, none of the files from the previous generation remain in the directory
        assertTrue(filesFromPreviousGenStillPresent.isEmpty());
        // Asserting that after the merge, the refCount of some files in the FileCache dropped to zero,
        // which resulted in their eviction after pruning
        assertTrue(usageAfterPrune.usage() < usageBeforePrune.usage());

        // Clearing the file cache to avoid any file leaks
        fileCache.clear();
    }
}
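The double cast near the end of the test above manually peels two FilterDirectory layers off the shard's Store directory to reach the underlying Composite Directory. Lucene's FilterDirectory ships a static helper that does the same thing generically; a small standalone sketch of the equivalent (the class name is illustrative and not part of this commit):

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;

public final class UnwrapSketch {
    // FilterDirectory.unwrap keeps calling getDelegate() until it reaches the
    // innermost non-filter Directory, which is what the two explicit casts in
    // the test accomplish by hand.
    public static Directory innermost(Directory dir) {
        return FilterDirectory.unwrap(dir);
    }
}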
21 changes: 1 addition & 20 deletions server/src/main/java/org/opensearch/index/IndexService.java
@@ -39,7 +39,6 @@
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.util.Accountable;
 import org.opensearch.client.Client;
 import org.opensearch.cluster.metadata.IndexMetadata;
@@ -94,7 +93,6 @@
 import org.opensearch.index.shard.ShardPath;
 import org.opensearch.index.similarity.SimilarityService;
 import org.opensearch.index.store.CompositeDirectory;
-import org.opensearch.index.store.RemoteSegmentStoreDirectory;
 import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
 import org.opensearch.index.store.Store;
 import org.opensearch.index.store.remote.filecache.FileCache;
@@ -617,25 +615,8 @@ public synchronized IndexShard createShard(
            if (FeatureFlags.isEnabled(FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING) &&
                // TODO : Need to remove this check after support for hot indices is added in Composite Directory
                this.indexSettings.isStoreLocalityPartial()) {
-               /*
-                * Currently Composite Directory only supports a local directory of type FSDirectory.
-                * The reason is that FileCache currently has its key type as Path, and Composite Directory uses
-                * FSDirectory's getDirectory() method to fetch and use the Path for operating on the FileCache.
-                * TODO : Refactor FileCache to have its key in the form of a String instead of a Path. Once that is done we can remove this assertion.
-                */
                Directory localDirectory = directoryFactory.newDirectory(this.indexSettings, path);
-
-               if (localDirectory instanceof FSDirectory == false) throw new IllegalStateException(
-                   "For Composite Directory, local directory must be of type FSDirectory"
-               );
-               else if (fileCache == null) throw new IllegalStateException(
-                   "File Cache not initialized on this Node, cannot create Composite Directory without FileCache"
-               );
-               else if (remoteDirectory == null) throw new IllegalStateException(
-                   "Remote Directory must not be null for Composite Directory"
-               );
-
-               directory = new CompositeDirectory((FSDirectory) localDirectory, (RemoteSegmentStoreDirectory) remoteDirectory, fileCache);
+               directory = new CompositeDirectory(localDirectory, remoteDirectory, fileCache);
            } else {
                directory = directoryFactory.newDirectory(this.indexSettings, path);
            }
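With the FSDirectory assertion removed, the call site above passes the local and remote directories through without casts, which suggests the null checks moved into CompositeDirectory itself. A hedged sketch of the constructor shape this call site implies; CompositeDirectorySketch is hypothetical, with the signature inferred from the call above rather than copied from the commit:

import java.util.Objects;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.index.store.remote.filecache.FileCache;

// Hypothetical: mirrors the signature implied by the call site; the real
// CompositeDirectory lives in org.opensearch.index.store.
final class CompositeDirectorySketch extends FilterDirectory {
    private final Directory remoteDirectory;
    private final FileCache fileCache;

    CompositeDirectorySketch(Directory localDirectory, Directory remoteDirectory, FileCache fileCache) {
        super(Objects.requireNonNull(localDirectory, "local directory must not be null"));
        this.remoteDirectory = Objects.requireNonNull(remoteDirectory, "remote directory must not be null");
        this.fileCache = Objects.requireNonNull(fileCache, "file cache must not be null");
    }
}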
server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -254,7 +254,6 @@ private boolean syncSegments() {
        Collection<String> segmentsToRefresh = localSegmentsPostRefresh.stream()
            .filter(file -> !skipUpload(file))
            .collect(Collectors.toList());
-       Directory directory = ((FilterDirectory) (((FilterDirectory) storeDirectory).getDelegate())).getDelegate();

        CountDownLatch latch = new CountDownLatch(1);
        ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
@@ -277,9 +276,6 @@ public void onResponse(Void unused) {
                // At this point since we have uploaded new segments, segment infos and segment metadata file,
                // along with marking minSeqNoToKeep, upload has succeeded completely.
                successful.set(true);
-               if (directory instanceof CompositeDirectory) {
-                   ((CompositeDirectory) directory).afterSyncToRemote(segmentsToRefresh);
-               }
            } catch (Exception e) {
                // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
                // as part of exponential back-off retry logic. This should not affect durability of the indexed data
@@ -431,13 +427,17 @@ private void uploadNewSegments(
        logger.debug("Effective new segments files to upload {}", filteredFiles);
        ActionListener<Collection<Void>> mappedListener = ActionListener.map(listener, resp -> null);
        GroupedActionListener<Void> batchUploadListener = new GroupedActionListener<>(mappedListener, filteredFiles.size());
+       Directory directory = ((FilterDirectory) (((FilterDirectory) storeDirectory).getDelegate())).getDelegate();

        for (String src : filteredFiles) {
            // Initializing listener here to ensure that the stats increment operations are thread-safe
            UploadListener statsListener = createUploadListener(localSegmentsSizeMap);
            ActionListener<Void> aggregatedListener = ActionListener.wrap(resp -> {
                statsListener.onSuccess(src);
                batchUploadListener.onResponse(resp);
+               if (directory instanceof CompositeDirectory) {
+                   ((CompositeDirectory) directory).afterSyncToRemote(src);
+               }
            }, ex -> {
                logger.warn(() -> new ParameterizedMessage("Exception: [{}] while uploading segment files", ex), ex);
                if (ex instanceof CorruptIndexException) {
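The last hunk moves the afterSyncToRemote hook from the batch-level onResponse, where it fired once with the full list after every upload finished, into the per-file success callback, so each segment file is handed over to the Composite Directory as soon as its own upload succeeds. A minimal self-contained sketch of that per-item completion pattern, using CompletableFuture in place of OpenSearch's ActionListener; all names here are illustrative stand-ins:

import java.util.List;
import java.util.concurrent.CompletableFuture;

public final class PerFileCompletionExample {

    // Stand-in for CompositeDirectory.afterSyncToRemote(String).
    static void afterSyncToRemote(String file) {
        System.out.println("switched to remote-backed: " + file);
    }

    // Stand-in for an asynchronous segment upload.
    static CompletableFuture<Void> upload(String file) {
        return CompletableFuture.runAsync(() -> System.out.println("uploaded: " + file));
    }

    public static void main(String[] args) {
        List<String> files = List.of("_0.cfs", "_0.cfe", "_0.si");

        // Per-file hook: fires as each individual upload succeeds, instead of
        // once after the whole batch, mirroring the change in this commit.
        CompletableFuture<?>[] uploads = files.stream()
            .map(f -> upload(f).thenRun(() -> afterSyncToRemote(f)))
            .toArray(CompletableFuture[]::new);

        // Batch-level completion still exists, but only for overall success.
        CompletableFuture.allOf(uploads).join();
    }
}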
