diff --git a/CHANGELOG.md b/CHANGELOG.md index 55750d42d5701..cd519ac3b8575 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111)) - Ensure support of the transport-nio by security plugin ([#16474](https://github.com/opensearch-project/OpenSearch/pull/16474)) - Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284)) -- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483/files)) +- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483)) +- Make IndexStoreListener a pluggable interface ([#16583](https://github.com/opensearch-project/OpenSearch/pull/16583)) ### Dependencies - Bump `com.google.apis:google-api-services-compute` from v1-rev20240407-2.0.0 to v1-rev20241021-2.0.0 ([#16502](https://github.com/opensearch-project/OpenSearch/pull/16502), [#16548](https://github.com/opensearch-project/OpenSearch/pull/16548)) diff --git a/server/src/main/java/org/opensearch/env/NodeEnvironment.java b/server/src/main/java/org/opensearch/env/NodeEnvironment.java index 80ab7448d8d09..3915f5adfa9ed 100644 --- a/server/src/main/java/org/opensearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/opensearch/env/NodeEnvironment.java @@ -71,6 +71,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; import org.opensearch.index.store.FsDirectoryFactory; +import org.opensearch.index.store.IndexStoreListener; import org.opensearch.monitor.fs.FsInfo; import org.opensearch.monitor.fs.FsProbe; import org.opensearch.monitor.jvm.JvmInfo; @@ -199,7 +200,7 @@ public String toString() { private final NodeMetadata nodeMetadata; - private final IndexStoreListener indexStoreListener; + private final org.opensearch.index.store.IndexStoreListener indexStoreListener; /** * Maximum number of data nodes that should run in an environment. @@ -298,14 +299,24 @@ public void close() { } public NodeEnvironment(Settings settings, Environment environment) throws IOException { - this(settings, environment, IndexStoreListener.EMPTY); + this(settings, environment, org.opensearch.index.store.IndexStoreListener.EMPTY); + } + + /** + * Use {@link NodeEnvironment#NodeEnvironment(Settings, Environment, org.opensearch.index.store.IndexStoreListener)} instead. + * This constructor is kept around on 2.x to avoid breaking changes. + */ + @Deprecated(forRemoval = true, since = "2.19.0") + public NodeEnvironment(Settings settings, Environment environment, IndexStoreListener indexStoreListener) throws IOException { + this(settings, environment, (org.opensearch.index.store.IndexStoreListener) indexStoreListener); } /** * Setup the environment. * @param settings settings from opensearch.yml */ - public NodeEnvironment(Settings settings, Environment environment, IndexStoreListener indexStoreListener) throws IOException { + public NodeEnvironment(Settings settings, Environment environment, org.opensearch.index.store.IndexStoreListener indexStoreListener) + throws IOException { if (DiscoveryNode.nodeRequiresLocalStorage(settings) == false) { nodePaths = null; fileCacheNodePath = null; @@ -313,7 +324,7 @@ public NodeEnvironment(Settings settings, Environment environment, IndexStoreLis locks = null; nodeLockId = -1; nodeMetadata = new NodeMetadata(generateNodeId(settings), Version.CURRENT); - this.indexStoreListener = IndexStoreListener.EMPTY; + this.indexStoreListener = org.opensearch.index.store.IndexStoreListener.EMPTY; return; } boolean success = false; @@ -1414,15 +1425,12 @@ private static void tryWriteTempFile(Path path) throws IOException { } /** - * A listener that is executed on per-index and per-shard store events, like deleting shard path + * Use {@link org.opensearch.index.store.IndexStoreListener} instead. This interface is kept around on 2.x to avoid breaking changes. * * @opensearch.internal */ - public interface IndexStoreListener { - default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {} - - default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {} - + @Deprecated(forRemoval = true, since = "2.19.0") + public interface IndexStoreListener extends org.opensearch.index.store.IndexStoreListener { IndexStoreListener EMPTY = new IndexStoreListener() { }; } diff --git a/server/src/main/java/org/opensearch/index/store/IndexStoreListener.java b/server/src/main/java/org/opensearch/index/store/IndexStoreListener.java new file mode 100644 index 0000000000000..5a8dd28d43bbc --- /dev/null +++ b/server/src/main/java/org/opensearch/index/store/IndexStoreListener.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.store; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.common.annotation.PublicApi; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.index.IndexSettings; + +import java.util.Collections; +import java.util.List; + +/** + * A listener that is executed on per-index and per-shard store events, like deleting shard path + * + * @opensearch.api + */ +@PublicApi(since = "2.19.0") +public interface IndexStoreListener { + default void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) {} + + default void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) {} + + IndexStoreListener EMPTY = new IndexStoreListener() { + }; + + /** + * A Composite listener that multiplexes calls to each of the listeners methods. + * + * @opensearch.api + */ + @PublicApi(since = "2.19.0") + final class CompositeIndexStoreListener implements IndexStoreListener { + private final List listeners; + private final static Logger logger = LogManager.getLogger(CompositeIndexStoreListener.class); + + public CompositeIndexStoreListener(List listeners) { + this.listeners = Collections.unmodifiableList(listeners); + } + + @Override + public void beforeShardPathDeleted(ShardId shardId, IndexSettings indexSettings, NodeEnvironment env) { + for (IndexStoreListener listener : listeners) { + try { + listener.beforeShardPathDeleted(shardId, indexSettings, env); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("beforeShardPathDeleted listener [{}] failed", listener), e); + } + } + } + + @Override + public void beforeIndexPathDeleted(Index index, IndexSettings indexSettings, NodeEnvironment env) { + for (IndexStoreListener listener : listeners) { + try { + listener.beforeIndexPathDeleted(index, indexSettings, env); + } catch (Exception e) { + logger.warn(() -> new ParameterizedMessage("beforeIndexPathDeleted listener [{}] failed", listener), e); + } + } + } + } +} diff --git a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java index 0261ab24dfa7a..3cdd41b94a5e9 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java +++ b/server/src/main/java/org/opensearch/index/store/remote/filecache/FileCacheCleaner.java @@ -18,6 +18,7 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.IndexStoreListener; import java.io.IOException; import java.nio.file.DirectoryStream; @@ -33,7 +34,7 @@ * * @opensearch.internal */ -public class FileCacheCleaner implements NodeEnvironment.IndexStoreListener { +public class FileCacheCleaner implements IndexStoreListener { private static final Logger logger = LogManager.getLogger(FileCacheCleaner.class); private final Provider fileCacheProvider; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index e0bdd64daf5bb..ee28d416cf5ee 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -156,6 +156,7 @@ import org.opensearch.index.recovery.RemoteStoreRestoreService; import org.opensearch.index.remote.RemoteIndexPathUploader; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; +import org.opensearch.index.store.IndexStoreListener; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.remote.filecache.FileCache; import org.opensearch.index.store.remote.filecache.FileCacheCleaner; @@ -547,10 +548,27 @@ protected Node( */ this.environment = new Environment(settings, initialEnvironment.configFile(), Node.NODE_LOCAL_STORAGE_SETTING.get(settings)); Environment.assertEquivalent(initialEnvironment, this.environment); + Stream indexStoreListenerStream = pluginsService.filterPlugins(IndexStorePlugin.class) + .stream() + .map(IndexStorePlugin::getIndexStoreListener) + .filter(Optional::isPresent) + .map(Optional::get); + // FileCache is only initialized on search nodes, so we only create FileCacheCleaner on search nodes as well if (DiscoveryNode.isSearchNode(settings) == false) { - nodeEnvironment = new NodeEnvironment(tmpSettings, environment); + nodeEnvironment = new NodeEnvironment( + settings, + environment, + new IndexStoreListener.CompositeIndexStoreListener(indexStoreListenerStream.collect(Collectors.toList())) + ); } else { - nodeEnvironment = new NodeEnvironment(settings, environment, new FileCacheCleaner(this::fileCache)); + nodeEnvironment = new NodeEnvironment( + settings, + environment, + new IndexStoreListener.CompositeIndexStoreListener( + Stream.concat(indexStoreListenerStream, Stream.of(new FileCacheCleaner(this::fileCache))) + .collect(Collectors.toList()) + ) + ); } logger.info( "node name [{}], node ID [{}], cluster name [{}], roles {}", diff --git a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java index ebd5717a00319..f0df8a122ed7d 100644 --- a/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java +++ b/server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java @@ -39,11 +39,13 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.IndexStoreListener; import org.opensearch.indices.recovery.RecoveryState; import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.Optional; /** * A plugin that provides alternative directory implementations. @@ -105,4 +107,11 @@ interface RecoveryStateFactory { default Map getRecoveryStateFactories() { return Collections.emptyMap(); } + + /** + * The {@link IndexStoreListener}s for this plugin which are triggered upon shard/index path deletion + */ + default Optional getIndexStoreListener() { + return Optional.empty(); + } } diff --git a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java index 962eb743dca6e..3ee9e859c198f 100644 --- a/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java +++ b/server/src/test/java/org/opensearch/env/NodeEnvironmentTests.java @@ -45,6 +45,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.gateway.MetadataStateFormat; import org.opensearch.index.IndexSettings; +import org.opensearch.index.store.IndexStoreListener; import org.opensearch.node.Node; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.NodeRoles; @@ -360,24 +361,39 @@ protected void doRun() throws Exception { } public void testIndexStoreListener() throws Exception { - final AtomicInteger shardCounter = new AtomicInteger(0); - final AtomicInteger indexCounter = new AtomicInteger(0); + final AtomicInteger shardCounter1 = new AtomicInteger(0); + final AtomicInteger shardCounter2 = new AtomicInteger(0); + final AtomicInteger indexCounter1 = new AtomicInteger(0); + final AtomicInteger indexCounter2 = new AtomicInteger(0); final Index index = new Index("foo", "fooUUID"); final ShardId shardId = new ShardId(index, 0); - final NodeEnvironment.IndexStoreListener listener = new NodeEnvironment.IndexStoreListener() { + final IndexStoreListener listener1 = new IndexStoreListener() { @Override public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) { assertEquals(shardId, inShardId); - shardCounter.incrementAndGet(); + shardCounter1.incrementAndGet(); } @Override public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) { assertEquals(index, inIndex); - indexCounter.incrementAndGet(); + indexCounter1.incrementAndGet(); } }; - final NodeEnvironment env = newNodeEnvironment(listener); + final IndexStoreListener listener2 = new IndexStoreListener() { + @Override + public void beforeShardPathDeleted(ShardId inShardId, IndexSettings indexSettings, NodeEnvironment env) { + assertEquals(shardId, inShardId); + shardCounter2.incrementAndGet(); + } + + @Override + public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, NodeEnvironment env) { + assertEquals(index, inIndex); + indexCounter2.incrementAndGet(); + } + }; + final NodeEnvironment env = newNodeEnvironment(new IndexStoreListener.CompositeIndexStoreListener(List.of(listener1, listener2))); for (Path path : env.indexPaths(index)) { Files.createDirectories(path.resolve("0")); @@ -386,26 +402,30 @@ public void beforeIndexPathDeleted(Index inIndex, IndexSettings indexSettings, N for (Path path : env.indexPaths(index)) { assertTrue(Files.exists(path.resolve("0"))); } - assertEquals(0, shardCounter.get()); + assertEquals(0, shardCounter1.get()); + assertEquals(0, shardCounter2.get()); env.deleteShardDirectorySafe(new ShardId(index, 0), idxSettings); for (Path path : env.indexPaths(index)) { assertFalse(Files.exists(path.resolve("0"))); } - assertEquals(1, shardCounter.get()); + assertEquals(1, shardCounter1.get()); + assertEquals(1, shardCounter2.get()); for (Path path : env.indexPaths(index)) { assertTrue(Files.exists(path)); } - assertEquals(0, indexCounter.get()); + assertEquals(0, indexCounter1.get()); + assertEquals(0, indexCounter2.get()); env.deleteIndexDirectorySafe(index, 5000, idxSettings); for (Path path : env.indexPaths(index)) { assertFalse(Files.exists(path)); } - assertEquals(1, indexCounter.get()); + assertEquals(1, indexCounter1.get()); + assertEquals(1, indexCounter2.get()); assertTrue("LockedShards: " + env.lockedShards(), env.lockedShards().isEmpty()); env.close(); } @@ -680,7 +700,7 @@ public NodeEnvironment newNodeEnvironment() throws IOException { return newNodeEnvironment(Settings.EMPTY); } - public NodeEnvironment newNodeEnvironment(NodeEnvironment.IndexStoreListener listener) throws IOException { + public NodeEnvironment newNodeEnvironment(IndexStoreListener listener) throws IOException { Settings build = buildEnvSettings(Settings.EMPTY); return new NodeEnvironment(build, TestEnvironment.newEnvironment(build), listener); }