Skip to content

Commit

Permalink
[Backport 2.x] Make IndexStoreListener a pluggable interface (#16594)
Browse files Browse the repository at this point in the history
* Make IndexStoreListener a pluggable interface (#16583)

Signed-off-by: Jay Deng <jayd0104@gmail.com>
(cherry picked from commit 9b7681c)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* Resolve breaking changes

Signed-off-by: Jay Deng <jayd0104@gmail.com>

---------

Signed-off-by: Jay Deng <jayd0104@gmail.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Jay Deng <jayd0104@gmail.com>
  • Loading branch information
3 people authored Nov 7, 2024
1 parent 9b9c5f9 commit 3e1e2cf
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 25 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111))
- Ensure support of the transport-nio by security plugin ([#16474](https://github.com/opensearch-project/OpenSearch/pull/16474))
- Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284))
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483/files))
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483))
- Make IndexStoreListener a pluggable interface ([#16583](https://github.com/opensearch-project/OpenSearch/pull/16583))

### Dependencies
- Bump `com.google.apis:google-api-services-compute` from v1-rev20240407-2.0.0 to v1-rev20241021-2.0.0 ([#16502](https://github.com/opensearch-project/OpenSearch/pull/16502), [#16548](https://github.com/opensearch-project/OpenSearch/pull/16548))
Expand Down
28 changes: 18 additions & 10 deletions server/src/main/java/org/opensearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.monitor.fs.FsProbe;
import org.opensearch.monitor.jvm.JvmInfo;
Expand Down Expand Up @@ -199,7 +200,7 @@ public String toString() {

private final NodeMetadata nodeMetadata;

private final IndexStoreListener indexStoreListener;
private final org.opensearch.index.store.IndexStoreListener indexStoreListener;

/**
* Maximum number of data nodes that should run in an environment.
Expand Down Expand Up @@ -298,22 +299,32 @@ public void close() {
}

public NodeEnvironment(Settings settings, Environment environment) throws IOException {
this(settings, environment, IndexStoreListener.EMPTY);
this(settings, environment, org.opensearch.index.store.IndexStoreListener.EMPTY);
}

/**
* Use {@link NodeEnvironment#NodeEnvironment(Settings, Environment, org.opensearch.index.store.IndexStoreListener)} instead.
* This constructor is kept around on 2.x to avoid breaking changes.
*/
@Deprecated(forRemoval = true, since = "2.19.0")
public NodeEnvironment(Settings settings, Environment environment, IndexStoreListener indexStoreListener) throws IOException {
this(settings, environment, (org.opensearch.index.store.IndexStoreListener) indexStoreListener);
}

/**
* Setup the environment.
* @param settings settings from opensearch.yml
*/
public NodeEnvironment(Settings settings, Environment environment, IndexStoreListener indexStoreListener) throws IOException {
public NodeEnvironment(Settings settings, Environment environment, org.opensearch.index.store.IndexStoreListener indexStoreListener)
throws IOException {
if (DiscoveryNode.nodeRequiresLocalStorage(settings) == false) {
nodePaths = null;
fileCacheNodePath = null;
sharedDataPath = null;
locks = null;
nodeLockId = -1;
nodeMetadata = new NodeMetadata(generateNodeId(settings), Version.CURRENT);
this.indexStoreListener = IndexStoreListener.EMPTY;
this.indexStoreListener = org.opensearch.index.store.IndexStoreListener.EMPTY;
return;
}
boolean success = false;
Expand Down Expand Up @@ -1414,15 +1425,12 @@ private static void tryWriteTempFile(Path path) throws IOException {
}

/**
* A listener that is executed on per-index and per-shard store events, like deleting shard path
* Use {@link org.opensearch.index.store.IndexStoreListener} instead. This interface is kept around on 2.x to avoid breaking changes.
*
* @opensearch.internal
*/
public interface IndexStoreListener {
default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {}

default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {}

@Deprecated(forRemoval = true, since = "2.19.0")
public interface IndexStoreListener extends org.opensearch.index.store.IndexStoreListener {
IndexStoreListener EMPTY = new IndexStoreListener() {
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;

import java.util.Collections;
import java.util.List;

/**
* A listener that is executed on per-index and per-shard store events, like deleting shard path
*
* @opensearch.api
*/
@PublicApi(since = "2.19.0")
public interface IndexStoreListener {
default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {}

default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {}

IndexStoreListener EMPTY = new IndexStoreListener() {
};

/**
* A Composite listener that multiplexes calls to each of the listeners methods.
*
* @opensearch.api
*/
@PublicApi(since = "2.19.0")
final class CompositeIndexStoreListener implements IndexStoreListener {
private final List<IndexStoreListener> listeners;
private final static Logger logger = LogManager.getLogger(CompositeIndexStoreListener.class);

public CompositeIndexStoreListener(List<IndexStoreListener> listeners) {
this.listeners = Collections.unmodifiableList(listeners);
}

@Override
public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {
for (IndexStoreListener listener : listeners) {
try {
listener.beforeShardPathDeleted(shardId, indexSettings, env);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("beforeShardPathDeleted listener [{}] failed", listener), e);
}
}
}

@Override
public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {
for (IndexStoreListener listener : listeners) {
try {
listener.beforeIndexPathDeleted(index, indexSettings, env);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("beforeIndexPathDeleted listener [{}] failed", listener), e);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.IndexStoreListener;

import java.io.IOException;
import java.nio.file.DirectoryStream;
Expand All @@ -33,7 +34,7 @@
*
* @opensearch.internal
*/
public class FileCacheCleaner implements NodeEnvironment.IndexStoreListener {
public class FileCacheCleaner implements IndexStoreListener {
private static final Logger logger = LogManager.getLogger(FileCacheCleaner.class);

private final Provider<FileCache> fileCacheProvider;
Expand Down
22 changes: 20 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.index.remote.RemoteIndexPathUploader;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheCleaner;
Expand Down Expand Up @@ -547,10 +548,27 @@ protected Node(
*/
this.environment = new Environment(settings, initialEnvironment.configFile(), Node.NODE_LOCAL_STORAGE_SETTING.get(settings));
Environment.assertEquivalent(initialEnvironment, this.environment);
Stream<IndexStoreListener> indexStoreListenerStream = pluginsService.filterPlugins(IndexStorePlugin.class)
.stream()
.map(IndexStorePlugin::getIndexStoreListener)
.filter(Optional::isPresent)
.map(Optional::get);
// FileCache is only initialized on search nodes, so we only create FileCacheCleaner on search nodes as well
if (DiscoveryNode.isSearchNode(settings) == false) {
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
nodeEnvironment = new NodeEnvironment(
settings,
environment,
new IndexStoreListener.CompositeIndexStoreListener(indexStoreListenerStream.collect(Collectors.toList()))
);
} else {
nodeEnvironment = new NodeEnvironment(settings, environment, new FileCacheCleaner(this::fileCache));
nodeEnvironment = new NodeEnvironment(
settings,
environment,
new IndexStoreListener.CompositeIndexStoreListener(
Stream.concat(indexStoreListenerStream, Stream.of(new FileCacheCleaner(this::fileCache)))
.collect(Collectors.toList())
)
);
}
logger.info(
"node name [{}], node ID [{}], cluster name [{}], roles {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.indices.recovery.RecoveryState;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

/**
* A plugin that provides alternative directory implementations.
Expand Down Expand Up @@ -105,4 +107,11 @@ interface RecoveryStateFactory {
default Map<String, RecoveryStateFactory> getRecoveryStateFactories() {
return Collections.emptyMap();
}

/**
* The {@link IndexStoreListener}s for this plugin which are triggered upon shard/index path deletion
*/
default Optional<IndexStoreListener> getIndexStoreListener() {
return Optional.empty();
}
}
42 changes: 31 additions & 11 deletions server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.gateway.MetadataStateFormat;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.IndexStoreListener;
import org.opensearch.node.Node;
import org.opensearch.test.IndexSettingsModule;
import org.opensearch.test.NodeRoles;
Expand Down Expand Up @@ -360,24 +361,39 @@ protected void doRun() throws Exception {
}

public void testIndexStoreListener() throws Exception {
final AtomicInteger shardCounter = new AtomicInteger(0);
final AtomicInteger indexCounter = new AtomicInteger(0);
final AtomicInteger shardCounter1 = new AtomicInteger(0);
final AtomicInteger shardCounter2 = new AtomicInteger(0);
final AtomicInteger indexCounter1 = new AtomicInteger(0);
final AtomicInteger indexCounter2 = new AtomicInteger(0);
final Index index = new Index("foo", "fooUUID");
final ShardId shardId = new ShardId(index, 0);
final NodeEnvironment.IndexStoreListener listener = new NodeEnvironment.IndexStoreListener() {
final IndexStoreListener listener1 = new IndexStoreListener() {
@Override
public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(shardId, inShardId);
shardCounter.incrementAndGet();
shardCounter1.incrementAndGet();
}

@Override
public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(index, inIndex);
indexCounter.incrementAndGet();
indexCounter1.incrementAndGet();
}
};
final NodeEnvironment env = newNodeEnvironment(listener);
final IndexStoreListener listener2 = new IndexStoreListener() {
@Override
public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(shardId, inShardId);
shardCounter2.incrementAndGet();
}

@Override
public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) {
assertEquals(index, inIndex);
indexCounter2.incrementAndGet();
}
};
final NodeEnvironment env = newNodeEnvironment(new IndexStoreListener.CompositeIndexStoreListener(List.of(listener1, listener2)));

for (Path path : env.indexPaths(index)) {
Files.createDirectories(path.resolve("0"));
Expand All @@ -386,26 +402,30 @@ public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, N
for (Path path : env.indexPaths(index)) {
assertTrue(Files.exists(path.resolve("0")));
}
assertEquals(0, shardCounter.get());
assertEquals(0, shardCounter1.get());
assertEquals(0, shardCounter2.get());

env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings);

for (Path path : env.indexPaths(index)) {
assertFalse(Files.exists(path.resolve("0")));
}
assertEquals(1, shardCounter.get());
assertEquals(1, shardCounter1.get());
assertEquals(1, shardCounter2.get());

for (Path path : env.indexPaths(index)) {
assertTrue(Files.exists(path));
}
assertEquals(0, indexCounter.get());
assertEquals(0, indexCounter1.get());
assertEquals(0, indexCounter2.get());

env.deleteIndexDirectorySafe(index, 5000, idxSettings);

for (Path path : env.indexPaths(index)) {
assertFalse(Files.exists(path));
}
assertEquals(1, indexCounter.get());
assertEquals(1, indexCounter1.get());
assertEquals(1, indexCounter2.get());
assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty());
env.close();
}
Expand Down Expand Up @@ -680,7 +700,7 @@ public NodeEnvironment newNodeEnvironment() throws IOException {
return newNodeEnvironment(Settings.EMPTY);
}

public NodeEnvironment newNodeEnvironment(NodeEnvironment.IndexStoreListener listener) throws IOException {
public NodeEnvironment newNodeEnvironment(IndexStoreListener listener) throws IOException {
Settings build = buildEnvSettings(Settings.EMPTY);
return new NodeEnvironment(build, TestEnvironment.newEnvironment(build), listener);
}
Expand Down

0 comments on commit 3e1e2cf

Please sign in to comment.