Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup file cache on deleting index/shard directory #11443

Merged
merged 2 commits into from
Feb 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@
import org.opensearch.cluster.routing.GroupShardsIterator;
import org.opensearch.cluster.routing.ShardIterator;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.opensearch.common.Priority;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.monitor.fs.FsInfo;
Expand Down Expand Up @@ -859,4 +862,75 @@ private void assertCacheDirectoryReplicaAndIndexCount(int numCacheFolderCount, i
// Verifies if all the shards (primary and replica) have been deleted
assertEquals(numCacheFolderCount, searchNodeFileCachePaths.size());
}

public void testRelocateSearchableSnapshotIndex() throws Exception {
final String snapshotName = "test-snap";
final String repoName = "test-repo";
final String indexName = "test-idx-1";
final String restoredIndexName = indexName + "-copy";
final Client client = client();

internalCluster().ensureAtLeastNumDataNodes(1);
createIndexWithDocsAndEnsureGreen(0, 100, indexName);

createRepositoryWithSettings(null, repoName);
takeSnapshot(client, snapshotName, repoName, indexName);
deleteIndicesAndEnsureGreen(client, indexName);

String searchNode1 = internalCluster().startSearchOnlyNodes(1).get(0);
internalCluster().validateClusterFormed();
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertRemoteSnapshotIndexSettings(client, restoredIndexName);

String searchNode2 = internalCluster().startSearchOnlyNodes(1).get(0);
internalCluster().validateClusterFormed();

final Index index = resolveIndex(restoredIndexName);
assertSearchableSnapshotIndexDirectoryExistence(searchNode1, index, true);
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, false);

// relocate the shard from node1 to node2
client.admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(restoredIndexName, 0, searchNode1, searchNode2))
.execute()
.actionGet();
ClusterHealthResponse clusterHealthResponse = client.admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(new TimeValue(5, TimeUnit.MINUTES))
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
assertDocCount(restoredIndexName, 100L);

assertSearchableSnapshotIndexDirectoryExistence(searchNode1, index, false);
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, true);
deleteIndicesAndEnsureGreen(client, restoredIndexName);
assertSearchableSnapshotIndexDirectoryExistence(searchNode2, index, false);
}

private void assertSearchableSnapshotIndexDirectoryExistence(String nodeName, Index index, boolean exists) throws Exception {
final Node node = internalCluster().getInstance(Node.class, nodeName);
final ShardId shardId = new ShardId(index, 0);
final ShardPath shardPath = ShardPath.loadFileCachePath(node.getNodeEnvironment(), shardId);

assertBusy(() -> {
assertTrue(
"shard state path should " + (exists ? "exist" : "not exist"),
Files.exists(shardPath.getShardStatePath()) == exists
);
assertTrue("shard cache path should " + (exists ? "exist" : "not exist"), Files.exists(shardPath.getDataPath()) == exists);
}, 30, TimeUnit.SECONDS);

final Path indexDataPath = node.getNodeEnvironment().fileCacheNodePath().fileCachePath.resolve(index.getUUID());
final Path indexPath = node.getNodeEnvironment().fileCacheNodePath().indicesPath.resolve(index.getUUID());
assertBusy(() -> {
assertTrue("index path should " + (exists ? "exist" : "not exist"), Files.exists(indexDataPath) == exists);
assertTrue("index cache path should " + (exists ? "exist" : "not exist"), Files.exists(indexPath) == exists);
}, 30, TimeUnit.SECONDS);
}
}
43 changes: 41 additions & 2 deletions server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@

private final NodeMetadata nodeMetadata;

private final IndexStoreListener indexStoreListener;

/**
* Maximum number of data nodes that should run in an environment.
*/
Expand Down Expand Up @@ -295,18 +297,23 @@
}
}

public NodeEnvironment(Settings settings, Environment environment) throws IOException {
this(settings, environment, IndexStoreListener.EMPTY);
}

/**
* Setup the environment.
* @param settings settings from opensearch.yml
*/
public NodeEnvironment(Settings settings, Environment environment) throws IOException {
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
public NodeEnvironment(Settings settings, Environment environment, IndexStoreListener indexStoreListener) throws IOException {
if (DiscoveryNode.nodeRequiresLocalStorage(settings) == false) {
nodePaths = null;
fileCacheNodePath = null;
sharedDataPath = null;
locks = null;
nodeLockId = -1;
nodeMetadata = new NodeMetadata(generateNodeId(settings), Version.CURRENT);
this.indexStoreListener = IndexStoreListener.EMPTY;
return;
}
boolean success = false;
Expand Down Expand Up @@ -385,6 +392,7 @@
}

this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths);
this.indexStoreListener = indexStoreListener;
success = true;
} finally {
if (success == false) {
Expand Down Expand Up @@ -577,6 +585,9 @@
public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSettings) throws IOException {
final ShardId shardId = lock.getShardId();
assert isShardLocked(shardId) : "shard " + shardId + " is not locked";

indexStoreListener.beforeShardPathDeleted(shardId, indexSettings, this);

final Path[] paths = availableShardPaths(shardId);
logger.trace("acquiring locks for {}, paths: [{}]", shardId, paths);
acquireFSLockForPaths(indexSettings, paths);
Expand Down Expand Up @@ -653,6 +664,8 @@
* @param indexSettings settings for the index being deleted
*/
public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings) throws IOException {
indexStoreListener.beforeIndexPathDeleted(index, indexSettings, this);

final Path[] indexPaths = indexPaths(index);
logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
IOUtils.rm(indexPaths);
Expand All @@ -663,6 +676,18 @@
}
}

private void deleteIndexFileCacheDirectory(Index index) {
final Path indexCachePath = fileCacheNodePath().fileCachePath.resolve(index.getUUID());
logger.trace("deleting index {} file cache directory, path: [{}]", index, indexCachePath);

Check warning on line 681 in server/src/main/java/org/opensearch/env/NodeEnvironment.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/env/NodeEnvironment.java#L680-L681

Added lines #L680 - L681 were not covered by tests
if (Files.exists(indexCachePath)) {
try {
IOUtils.rm(indexCachePath);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e);
}

Check warning on line 687 in server/src/main/java/org/opensearch/env/NodeEnvironment.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/env/NodeEnvironment.java#L684-L687

Added lines #L684 - L687 were not covered by tests
}
}

Check warning on line 689 in server/src/main/java/org/opensearch/env/NodeEnvironment.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/env/NodeEnvironment.java#L689

Added line #L689 was not covered by tests

/**
* Tries to lock all local shards for the given index. If any of the shard locks can't be acquired
* a {@link ShardLockObtainFailedException} is thrown and all previously acquired locks are released.
Expand Down Expand Up @@ -1387,4 +1412,18 @@
}
}
}

/**
* A listener that is executed on per-index and per-shard store events, like deleting shard path
*
* @opensearch.internal
*/
public interface IndexStoreListener {
default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {}

default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {}

IndexStoreListener EMPTY = new IndexStoreListener() {
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,13 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.inject.Provider;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.indices.cluster.IndicesClusterStateService;

import java.io.IOException;
import java.nio.file.DirectoryStream;
Expand All @@ -30,79 +27,90 @@
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;

/**
* IndexEventListener to clean up file cache when the index is deleted. The cached entries will be eligible
* IndexStoreListener to clean up file cache when the index is deleted. The cached entries will be eligible
* for eviction when the shard is deleted, but this listener deterministically removes entries from memory and
* from disk at the time of shard deletion as opposed to waiting for the cache to need to perform eviction.
*
* @opensearch.internal
*/
public class FileCacheCleaner implements IndexEventListener {
private static final Logger log = LogManager.getLogger(FileCacheCleaner.class);
public class FileCacheCleaner implements NodeEnvironment.IndexStoreListener {
private static final Logger logger = LogManager.getLogger(FileCacheCleaner.class);

private final NodeEnvironment nodeEnvironment;
private final FileCache fileCache;
private final Provider<FileCache> fileCacheProvider;

public FileCacheCleaner(NodeEnvironment nodeEnvironment, FileCache fileCache) {
this.nodeEnvironment = nodeEnvironment;
this.fileCache = fileCache;
public FileCacheCleaner(Provider<FileCache> fileCacheProvider) {
this.fileCacheProvider = fileCacheProvider;
}

/**
* before shard deleted and after shard closed, cleans up the corresponding index file path entries from FC.
* @param shardId The shard id
* @param settings the shards index settings
* before shard path deleted, cleans up the corresponding index file path entries from FC and delete the corresponding shard file
* cache path.
*
* @param shardId the shard id
* @param indexSettings the index settings
* @param nodeEnvironment the node environment
*/
@Override
public void beforeIndexShardDeleted(ShardId shardId, Settings settings) {
public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment nodeEnvironment) {
if (indexSettings.isRemoteSnapshot()) {
final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId);
cleanupShardFileCache(shardPath);
deleteShardFileCacheDirectory(shardPath);
}
}

/**
* Cleans up the corresponding index file path entries from FileCache
*
* @param shardPath the shard path
*/
private void cleanupShardFileCache(ShardPath shardPath) {
try {
if (isRemoteSnapshot(settings)) {
final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId);
final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
try (DirectoryStream<Path> ds = Files.newDirectoryStream(localStorePath)) {
for (Path subPath : ds) {
fileCache.remove(subPath.toRealPath());
}
final FileCache fc = fileCacheProvider.get();
assert fc != null;
final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
try (DirectoryStream<Path> ds = Files.newDirectoryStream(localStorePath)) {
for (Path subPath : ds) {
fc.remove(subPath.toRealPath());
}
}
} catch (IOException ioe) {
log.error(() -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardId), ioe);
logger.error(
() -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardPath.getShardId()),

Check warning on line 79 in server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java#L78-L79

Added lines #L78 - L79 were not covered by tests
ioe
);
}
}

@Override
public void afterIndexShardDeleted(ShardId shardId, Settings settings) {
if (isRemoteSnapshot(settings)) {
final Path path = ShardPath.loadFileCachePath(nodeEnvironment, shardId).getDataPath();
try {
if (Files.exists(path)) {
IOUtils.rm(path);
}
} catch (IOException e) {
log.error(() -> new ParameterizedMessage("Failed to delete cache path for shard {}", shardId), e);
private void deleteShardFileCacheDirectory(ShardPath shardPath) {
final Path path = shardPath.getDataPath();
try {
if (Files.exists(path)) {
IOUtils.rm(path);
}
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for shard {}", shardPath.getShardId()), e);

Check warning on line 92 in server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java#L91-L92

Added lines #L91 - L92 were not covered by tests
}
}

/**
* before index path deleted, delete the corresponding index file cache path.
*
* @param index the index
* @param indexSettings the index settings
* @param nodeEnvironment the node environment
*/
@Override
public void afterIndexRemoved(
Index index,
IndexSettings indexSettings,
IndicesClusterStateService.AllocatedIndices.IndexRemovalReason reason
) {
if (isRemoteSnapshot(indexSettings.getSettings())
&& reason == IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED) {
public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment nodeEnvironment) {
if (indexSettings.isRemoteSnapshot()) {
final Path indexCachePath = nodeEnvironment.fileCacheNodePath().fileCachePath.resolve(index.getUUID());
if (Files.exists(indexCachePath)) {
try {
IOUtils.rm(indexCachePath);
} catch (IOException e) {
log.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e);
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e);

Check warning on line 111 in server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java#L111

Added line #L111 was not covered by tests
}
}
}
}

private static boolean isRemoteSnapshot(Settings settings) {
return IndexModule.Type.REMOTE_SNAPSHOT.match(settings.get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.IndexingStats;
import org.opensearch.index.shard.IndexingStats.Stats.DocStatusStats;
import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory;
import org.opensearch.index.translog.TranslogFactory;
Expand Down Expand Up @@ -362,7 +361,6 @@ public class IndicesService extends AbstractLifecycleComponent
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private volatile TimeValue clusterDefaultRefreshInterval;
private volatile TimeValue clusterRemoteTranslogBufferInterval;
private final FileCacheCleaner fileCacheCleaner;

private final SearchRequestStats searchRequestStats;

Expand Down Expand Up @@ -395,7 +393,6 @@ public IndicesService(
Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
Supplier<RepositoriesService> repositoriesServiceSupplier,
FileCacheCleaner fileCacheCleaner,
SearchRequestStats searchRequestStats,
@Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
RecoverySettings recoverySettings
Expand Down Expand Up @@ -450,7 +447,6 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon

this.directoryFactories = directoryFactories;
this.recoveryStateFactories = recoveryStateFactories;
this.fileCacheCleaner = fileCacheCleaner;
// doClose() is called when shutting down a node, yet there might still be ongoing requests
// that we need to wait for before closing some resources such as the caches. In order to
// avoid closing these resources while ongoing requests are still being processed, we use a
Expand Down Expand Up @@ -766,7 +762,6 @@ public void onStoreClosed(ShardId shardId) {
};
finalListeners.add(onStoreClose);
finalListeners.add(oldShardsStats);
finalListeners.add(fileCacheCleaner);
final IndexService indexService = createIndexService(
CREATE_INDEX,
indexMetadata,
Expand Down
Loading
Loading