Skip to content

Commit

Permalink
add index store listener
Browse files Browse the repository at this point in the history
Signed-off-by: panguixin <panguixin@bytedance.com>
  • Loading branch information
bugmakerrrrrr committed Feb 2, 2024
1 parent bb58e2b commit e1e1472
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -877,12 +877,12 @@ public void testRelocateSearchableSnapshotIndex() throws Exception {
takeSnapshot(client, snapshotName, repoName, indexName);
deleteIndicesAndEnsureGreen(client, indexName);

String searchNode1 = internalCluster().startSearchNode();
String searchNode1 = internalCluster().startSearchOnlyNodes(1).get(0);
internalCluster().validateClusterFormed();
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertRemoteSnapshotIndexSettings(client, restoredIndexName);

String searchNode2 = internalCluster().startSearchNode();
String searchNode2 = internalCluster().startSearchOnlyNodes(1).get(0);
internalCluster().validateClusterFormed();

final Index index = resolveIndex(restoredIndexName);
Expand Down
78 changes: 25 additions & 53 deletions server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.util.SetOnce;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -72,7 +71,6 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
Expand Down Expand Up @@ -107,7 +105,6 @@
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableSet;
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;

/**
* A component that holds all data paths for a single node.
Expand Down Expand Up @@ -202,7 +199,7 @@ public String toString() {

private final NodeMetadata nodeMetadata;

private final SetOnce<FileCache> fileCache = new SetOnce<>();
private final IndexStoreListener indexStoreListener;

/**
* Maximum number of data nodes that should run in an environment.
Expand Down Expand Up @@ -300,18 +297,23 @@ public void close() {
}
}

public NodeEnvironment(Settings settings, Environment environment) throws IOException {
this(settings, environment, IndexStoreListener.EMPTY);
}

/**
* Setup the environment.
* @param settings settings from opensearch.yml
*/
public NodeEnvironment(Settings settings, Environment environment) throws IOException {
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
public NodeEnvironment(Settings settings, Environment environment, IndexStoreListener indexStoreListener) throws IOException {
if (DiscoveryNode.nodeRequiresLocalStorage(settings) == false) {
nodePaths = null;
fileCacheNodePath = null;
sharedDataPath = null;
locks = null;
nodeLockId = -1;
nodeMetadata = new NodeMetadata(generateNodeId(settings), Version.CURRENT);
this.indexStoreListener = IndexStoreListener.EMPTY;
return;
}
boolean success = false;
Expand Down Expand Up @@ -390,6 +392,7 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
}

this.nodeMetadata = loadNodeMetadata(settings, logger, nodePaths);
this.indexStoreListener = indexStoreListener;
success = true;
} finally {
if (success == false) {
Expand All @@ -398,10 +401,6 @@ public NodeEnvironment(Settings settings, Environment environment) throws IOExce
}
}

public void setFileCache(final FileCache fileCache) {
this.fileCache.set(fileCache);
}

/**
* Resolve a specific nodes/{node.id} path for the specified path and node lock id.
*
Expand Down Expand Up @@ -587,12 +586,7 @@ public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSet
final ShardId shardId = lock.getShardId();
assert isShardLocked(shardId) : "shard " + shardId + " is not locked";

if (indexSettings.isRemoteSnapshot()) {
final ShardPath shardPath = ShardPath.loadFileCachePath(this, shardId);
cleanupShardFileCache(shardPath);
deleteShardFileCacheDirectory(shardPath);
logger.trace("deleted shard {} file cache directory, path: [{}]", shardId, shardPath.getDataPath());
}
indexStoreListener.beforeShardPathDeleted(shardId, indexSettings, this);

final Path[] paths = availableShardPaths(shardId);
logger.trace("acquiring locks for {}, paths: [{}]", shardId, paths);
Expand All @@ -609,40 +603,6 @@ public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSet
assert assertPathsDoNotExist(paths);
}

/**
* Cleans up the corresponding index file path entries from FileCache
*
* @param shardPath the shard path
*/
private void cleanupShardFileCache(ShardPath shardPath) {
try {
final FileCache fc = fileCache.get();
assert fc != null;
final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
try (DirectoryStream<Path> ds = Files.newDirectoryStream(localStorePath)) {
for (Path subPath : ds) {
fc.remove(subPath.toRealPath());
}
}
} catch (IOException ioe) {
logger.error(
() -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardPath.getShardId()),
ioe
);
}
}

private void deleteShardFileCacheDirectory(ShardPath shardPath) {
final Path path = shardPath.getDataPath();
try {
if (Files.exists(path)) {
IOUtils.rm(path);
}
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for shard {}", shardPath.getShardId()), e);
}
}

private static boolean assertPathsDoNotExist(final Path[] paths) {
Set<Path> existingPaths = Stream.of(paths).filter(FileSystemUtils::exists).filter(leftOver -> {
// Relaxed assertion for the special case where only the empty state directory exists after deleting
Expand Down Expand Up @@ -704,9 +664,7 @@ public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, IndexSetti
* @param indexSettings settings for the index being deleted
*/
public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettings) throws IOException {
if (indexSettings.isRemoteSnapshot()) {
deleteIndexFileCacheDirectory(index);
}
indexStoreListener.beforeIndexPathDeleted(index, indexSettings, this);

final Path[] indexPaths = indexPaths(index);
logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
Expand Down Expand Up @@ -1454,4 +1412,18 @@ private static void tryWriteTempFile(Path path) throws IOException {
}
}
}

/**
* A listener that is executed on per-index and per-shard store events, like deleting shard path
*
* @opensearch.internal
*/
public interface IndexStoreListener {
default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {}

default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {}

IndexStoreListener EMPTY = new IndexStoreListener() {
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.remote.filecache;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.inject.Provider;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;

/**
* IndexStoreListener to clean up file cache when the index is deleted. The cached entries will be eligible
* for eviction when the shard is deleted, but this listener deterministically removes entries from memory and
* from disk at the time of shard deletion as opposed to waiting for the cache to need to perform eviction.
*
* @opensearch.internal
*/
public class FileCacheCleaner implements NodeEnvironment.IndexStoreListener {
private static final Logger logger = LogManager.getLogger(FileCacheCleaner.class);

private final Provider<FileCache> fileCacheProvider;

public FileCacheCleaner(Provider<FileCache> fileCacheProvider) {
this.fileCacheProvider = fileCacheProvider;
}

/**
* before shard path deleted, cleans up the corresponding index file path entries from FC and delete the corresponding shard file
* cache path.
*
* @param shardId the shard id
* @param indexSettings the index settings
* @param nodeEnvironment the node environment
*/
@Override
public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment nodeEnvironment) {
if (indexSettings.isRemoteSnapshot()) {
final ShardPath shardPath = ShardPath.loadFileCachePath(nodeEnvironment, shardId);
cleanupShardFileCache(shardPath);
deleteShardFileCacheDirectory(shardPath);
}
}

/**
* Cleans up the corresponding index file path entries from FileCache
*
* @param shardPath the shard path
*/
private void cleanupShardFileCache(ShardPath shardPath) {
try {
final FileCache fc = fileCacheProvider.get();
assert fc != null;
final Path localStorePath = shardPath.getDataPath().resolve(LOCAL_STORE_LOCATION);
try (DirectoryStream<Path> ds = Files.newDirectoryStream(localStorePath)) {
for (Path subPath : ds) {
fc.remove(subPath.toRealPath());
}
}
} catch (IOException ioe) {
logger.error(
() -> new ParameterizedMessage("Error removing items from cache during shard deletion {}", shardPath.getShardId()),

Check warning on line 79 in server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java#L78-L79

Added lines #L78 - L79 were not covered by tests
ioe
);
}
}

private void deleteShardFileCacheDirectory(ShardPath shardPath) {
final Path path = shardPath.getDataPath();
try {
if (Files.exists(path)) {
IOUtils.rm(path);
}
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for shard {}", shardPath.getShardId()), e);

Check warning on line 92 in server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java#L91-L92

Added lines #L91 - L92 were not covered by tests
}
}

/**
* before index path deleted, delete the corresponding index file cache path.
*
* @param index the index
* @param indexSettings the index settings
* @param nodeEnvironment the node environment
*/
@Override
public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment nodeEnvironment) {
if (indexSettings.isRemoteSnapshot()) {
final Path indexCachePath = nodeEnvironment.fileCacheNodePath().fileCachePath.resolve(index.getUUID());
if (Files.exists(indexCachePath)) {
try {
IOUtils.rm(indexCachePath);
} catch (IOException e) {
logger.error(() -> new ParameterizedMessage("Failed to delete cache path for index {}", index), e);

Check warning on line 111 in server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java#L111

Added line #L111 was not covered by tests
}
}
}
}
}
8 changes: 6 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
import org.opensearch.indices.IndicesModule;
import org.opensearch.indices.IndicesService;
Expand Down Expand Up @@ -526,7 +527,11 @@ protected Node(
*/
this.environment = new Environment(settings, initialEnvironment.configDir(), Node.NODE_LOCAL_STORAGE_SETTING.get(settings));
Environment.assertEquivalent(initialEnvironment, this.environment);
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
if (DiscoveryNode.isSearchNode(settings) == false) {
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
} else {
nodeEnvironment = new NodeEnvironment(settings, environment, new FileCacheCleaner(this::fileCache));
}
logger.info(
"node name [{}], node ID [{}], cluster name [{}], roles {}",
NODE_NAME_SETTING.get(tmpSettings),
Expand Down Expand Up @@ -677,7 +682,6 @@ protected Node(
);
// File cache will be initialized by the node once circuit breakers are in place.
initializeFileCache(settings, circuitBreakerService.getBreaker(CircuitBreaker.REQUEST));
nodeEnvironment.setFileCache(fileCache);
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, fileCache);

pluginsService.filterPlugins(CircuitBreakerPlugin.class).forEach(plugin -> {
Expand Down
56 changes: 56 additions & 0 deletions server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,57 @@ protected void doRun() throws Exception {
env.close();
}

public void testIndexStoreListener() throws Exception {
final AtomicInteger shardCounter = new AtomicInteger(0);
final AtomicInteger indexCounter = new AtomicInteger(0);
final Index index = new Index("foo", "fooUUID");
final ShardId shardId = new ShardId(index, 0);
final NodeEnvironment.IndexStoreListener listener = new NodeEnvironment.IndexStoreListener() {
@Override
public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(shardId, inShardId);
shardCounter.incrementAndGet();
}

@Override
public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(index, inIndex);
indexCounter.incrementAndGet();
}
};
final NodeEnvironment env = newNodeEnvironment(listener);

for (Path path : env.indexPaths(index)) {
Files.createDirectories(path.resolve("0"));
}

for (Path path : env.indexPaths(index)) {
assertTrue(Files.exists(path.resolve("0")));
}
assertEquals(0, shardCounter.get());

env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings);

for (Path path : env.indexPaths(index)) {
assertFalse(Files.exists(path.resolve("0")));
}
assertEquals(1, shardCounter.get());

for (Path path : env.indexPaths(index)) {
assertTrue(Files.exists(path));
}
assertEquals(0, indexCounter.get());

env.deleteIndexDirectorySafe(index, 5000, idxSettings);

for (Path path : env.indexPaths(index)) {
assertFalse(Files.exists(path));
}
assertEquals(1, indexCounter.get());
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
env.close();
}

public void testStressShardLock() throws IOException, InterruptedException {
class Int {
int value = 0;
Expand Down Expand Up @@ -629,6 +680,11 @@ public NodeEnvironment newNodeEnvironment() throws IOException {
return newNodeEnvironment(Settings.EMPTY);
}

public NodeEnvironment newNodeEnvironment(NodeEnvironment.IndexStoreListener listener) throws IOException {
Settings build = buildEnvSettings(Settings.EMPTY);
return new NodeEnvironment(build, TestEnvironment.newEnvironment(build), listener);
}

@Override
public NodeEnvironment newNodeEnvironment(Settings settings) throws IOException {
Settings build = buildEnvSettings(settings);
Expand Down
Loading

0 comments on commit e1e1472

Please sign in to comment.