From 0726c1e7bb6f1189fb3291f74a7a9255da907caa Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Mon, 16 Sep 2024 10:15:18 +0530 Subject: [PATCH 1/4] Make Remote Publication a dynamic setting Signed-off-by: Shivansh Arora --- .../remote/RemoteStatePublicationIT.java | 279 +++++++++++++++++- .../coordination/CoordinationState.java | 15 +- .../cluster/coordination/Coordinator.java | 7 +- .../remote/RemoteClusterStateService.java | 17 +- .../remote/RemotePersistenceStats.java | 4 +- .../coordination/CoordinationStateTests.java | 2 +- .../coordination/PreVoteCollectorTests.java | 3 +- .../coordination/PublicationTests.java | 8 +- .../CoordinationStateTestCluster.java | 4 +- 9 files changed, 323 insertions(+), 16 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java index 2409b5d0d0e45..0ceba16ddeecc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java @@ -8,7 +8,10 @@ package org.opensearch.gateway.remote; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; @@ -19,6 +22,8 @@ import org.opensearch.cluster.coordination.PublishClusterStateStats; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.discovery.DiscoveryStats; import org.opensearch.gateway.GatewayMetaState; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; @@ -27,19 +32,32 @@ import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.PluginsService; import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.fs.ReloadableFsRepository; +import org.opensearch.tasks.Task; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; import org.opensearch.test.OpenSearchIntegTestCase.Scope; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportInterceptor; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; import org.junit.Before; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Base64; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; @@ -50,6 +68,8 @@ import static org.opensearch.action.admin.cluster.node.info.NodesInfoRequest.Metric.SETTINGS; import static org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest.Metric.DISCOVERY; +import static org.opensearch.cluster.coordination.PublicationTransportHandler.PUBLISH_REMOTE_STATE_ACTION_NAME; +import static org.opensearch.cluster.coordination.PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME; import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; @@ -57,6 +77,8 @@ import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteDownloadStats.CHECKSUM_VALIDATION_FAILED_COUNT; +import static org.opensearch.gateway.remote.RemotePersistenceStats.DIFF_DOWNLOAD_STATS; +import static org.opensearch.gateway.remote.RemotePersistenceStats.FULL_DOWNLOAD_STATS; import static org.opensearch.gateway.remote.model.RemoteClusterBlocks.CLUSTER_BLOCKS; import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA; @@ -88,6 +110,7 @@ public void setup() { isRemotePublicationEnabled = true; hasRemoteStateCharPrefix = randomBoolean(); hasRemoteRoutingCharPrefix = randomBoolean(); + clearInterceptedActions(); } @Override @@ -132,6 +155,13 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } + @Override + protected Collection> nodePlugins() { + List> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(InterceptingTransportService.TestPlugin.class); + return plugins; + } + public void testPublication() throws Exception { // create cluster with multi node (3 master + 2 data) prepareCluster(3, 2, INDEX_NAME, 1, 2); @@ -236,7 +266,7 @@ public void testRemotePublicationDownloadStats() { .addMetric(DISCOVERY.metricName()) .get(); - assertDataNodeDownloadStats(nodesStatsResponseDataNode); + assertDataNodeDownloadStats(nodesStatsResponseDataNode.getNodes().get(0)); } public void testRemotePublicationDisabledByRollingRestart() throws Exception { @@ -399,9 +429,162 @@ public void testVotingConfigAreCommitted() throws ExecutionException, Interrupte }); } - private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) { + public void testRemotePublicationDisabledAfterSettingChange() throws ExecutionException, InterruptedException { + prepareCluster(3, 2, Settings.EMPTY); + ensureStableCluster(5); + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 2)); + ensureGreen(INDEX_NAME); + + assertEquals(0, (int) getRequestCount(PUBLISH_STATE_ACTION_NAME)); + assertTrue(getRequestCount(PUBLISH_REMOTE_STATE_ACTION_NAME) > 0); + + // disable remote publication + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, false).build()) + .get(); + + clearInterceptedActions(); + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + + createIndex(INDEX_NAME + "2", remoteStoreIndexSettings(1, 2)); + ensureGreen(INDEX_NAME + "2"); + + assertTrue(getRequestCount(PUBLISH_STATE_ACTION_NAME) > 0); + assertEquals(0, (int) getRequestCount(PUBLISH_REMOTE_STATE_ACTION_NAME)); + } + + public void testRemotePublicationEnabledAfterSettingChange() { + isRemotePublicationEnabled = false; + prepareCluster(3, 2, Settings.EMPTY); + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 2)); + ensureGreen(INDEX_NAME); + + assertTrue(getRequestCount(PUBLISH_STATE_ACTION_NAME) > 0); + assertEquals(0, (int) getRequestCount(PUBLISH_REMOTE_STATE_ACTION_NAME)); + + // enable remote publication + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, true).build()) + .get(); + + clearInterceptedActions(); + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + + createIndex(INDEX_NAME + "2", remoteStoreIndexSettings(1, 2)); + ensureGreen(INDEX_NAME + "2"); + + assertEquals(0, (int) getRequestCount(PUBLISH_STATE_ACTION_NAME)); + assertTrue(getRequestCount(PUBLISH_REMOTE_STATE_ACTION_NAME) > 0); + } + + public void testRemotePublicationSettingChangePersistedAfterRestart() throws Exception { + isRemotePublicationEnabled = false; + prepareCluster(3, 2, Settings.EMPTY); + ensureStableCluster(5); + // enable remote publication + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, true).build()) + .get(); + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 2)); + ensureGreen(INDEX_NAME); + + assertEquals(0, (int) getRequestCount(PUBLISH_STATE_ACTION_NAME)); + assertTrue(getRequestCount(PUBLISH_REMOTE_STATE_ACTION_NAME) > 0); + + // disable remote publication + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, false).build()) + .get(); + + clearInterceptedActions(); + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + + internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() { + @Override + public void doAfterNodes(int n, Client client) { + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + } + }); + + ensureStableCluster(5); + assertEquals(0, (int) getRequestCount(PUBLISH_REMOTE_STATE_ACTION_NAME)); + assertTrue(getRequestCount(PUBLISH_STATE_ACTION_NAME) > 0); + } + + public void testRemotePublicationSettingChangePersistedAfterFullRestart() throws Exception { + isRemotePublicationEnabled = false; + prepareCluster(3, 2, Settings.EMPTY); + ensureStableCluster(5); + // enable remote publication + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, true).build()) + .get(); + interceptTransportActions(PUBLISH_REMOTE_STATE_ACTION_NAME, PUBLISH_STATE_ACTION_NAME); + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 2)); + ensureGreen(INDEX_NAME); + + assertEquals(0, (int) getRequestCount(PUBLISH_STATE_ACTION_NAME)); + assertTrue(getRequestCount(PUBLISH_REMOTE_STATE_ACTION_NAME) > 0); + + // disable remote publication + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().put(REMOTE_PUBLICATION_SETTING_KEY, false).build()) + .get(); + + internalCluster().fullRestart(); + + ensureStableCluster(5); + NodesStatsResponse response = internalCluster().client() + .admin() + .cluster() + .prepareNodesStats(internalCluster().getNodeNames()) + .clear() + .addMetric(DISCOVERY.metricName()) + .get(); + response.getNodes().forEach(nodeStats -> { + DiscoveryStats discoveryStats = nodeStats.getDiscoveryStats(); + assertNotNull(discoveryStats.getClusterStateStats()); + // ensure none of the nodes received remote publication + discoveryStats.getClusterStateStats() + .getPersistenceStats() + .stream() + .filter( + persistedStateStats -> persistedStateStats.getStatsName().equals(FULL_DOWNLOAD_STATS) + || persistedStateStats.getStatsName().equals(DIFF_DOWNLOAD_STATS) + ) + .forEach(persistedStateStats -> { + assertEquals(0, persistedStateStats.getSuccessCount()); + assertEquals(0, persistedStateStats.getFailedCount()); + assertEquals(0, persistedStateStats.getTotalTimeInMillis()); + }); + }); + } + + private void assertDataNodeDownloadStats(NodeStats nodeStats) { // assert cluster state stats for data node - DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats(); + DiscoveryStats dataNodeDiscoveryStats = nodeStats.getDiscoveryStats(); assertNotNull(dataNodeDiscoveryStats.getClusterStateStats()); assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess()); assertTrue(dataNodeDiscoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 0); @@ -446,4 +629,94 @@ private Map getMetadataFiles(BlobStoreRepository repository, St return fileName.split(DELIMITER)[0]; }).collect(Collectors.toMap(Function.identity(), key -> 1, Integer::sum)); } + + private static void clearInterceptedActions() { + Iterable pluginsServices = internalCluster().getInstances(PluginsService.class); + for (PluginsService pluginsService : pluginsServices) { + pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get().instance + .clearInterceptedActions(); + } + } + + private static void interceptTransportActions(String... actions) { + Iterable pluginsServices = internalCluster().getInstances(PluginsService.class); + for (PluginsService pluginsService : pluginsServices) { + pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get().instance + .interceptTransportActions(actions); + } + } + + private static Integer getRequestCount(String action) { + Iterable pluginsServices = internalCluster().getInstances(PluginsService.class); + return pluginsServices.iterator() + .next() + .filterPlugins(InterceptingTransportService.TestPlugin.class) + .stream() + .findFirst() + .get().instance.getRequestCount(action); + } + + public static class InterceptingTransportService implements TransportInterceptor { + + public static class TestPlugin extends Plugin implements NetworkPlugin { + public final InterceptingTransportService instance = new InterceptingTransportService(); + + @Override + public List getTransportInterceptors( + NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext + ) { + return Collections.singletonList(instance); + } + } + + private final Set actions = new HashSet<>(); + private final Map requests = new HashMap<>(); + + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + return new InterceptingTransportService.InterceptingHandler<>(action, actualHandler); + } + + synchronized Integer getRequestCount(String action) { + return requests.getOrDefault(action, 0); + } + + synchronized void interceptTransportActions(String... actions) { + Collections.addAll(this.actions, actions); + } + + synchronized void clearInterceptedActions() { + actions.clear(); + requests.clear(); + } + + private class InterceptingHandler implements TransportRequestHandler { + private final String action; + private final TransportRequestHandler handler; + private final Logger logger = LogManager.getLogger(InterceptingHandler.class); + + InterceptingHandler(String action, TransportRequestHandler handler) { + this.action = action; + this.handler = handler; + } + + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + synchronized (this) { + if (actions.contains(action)) { + Integer requestCount = requests.getOrDefault(action, 0); + requests.put(action, requestCount + 1); + logger.info("intercepted action: {} count: {}", action, requestCount + 1); + } + } + handler.messageReceived(request, channel, task); + } + } + } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 9cffc7051d756..a0d3e39aea092 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -38,6 +38,7 @@ import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; import org.opensearch.gateway.remote.ClusterMetadataManifest; @@ -87,7 +88,8 @@ public CoordinationState( DiscoveryNode localNode, PersistedStateRegistry persistedStateRegistry, ElectionStrategy electionStrategy, - Settings settings + Settings settings, + ClusterSettings clusterSettings ) { this.localNode = localNode; @@ -105,10 +107,10 @@ public CoordinationState( .getLastAcceptedConfiguration(); this.publishVotes = new VoteCollection(); this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings); - // ToDo: revisit this check while making the setting dynamic this.isRemotePublicationEnabled = isRemoteStateEnabled && REMOTE_PUBLICATION_SETTING.get(settings) && localNode.isRemoteStatePublicationEnabled(); + clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting); } public boolean isRemotePublicationEnabled() { @@ -651,6 +653,15 @@ private boolean shouldCommitRemotePersistedState() { && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null; } + private void setRemotePublicationSetting(boolean remotePublicationSetting) { + if (remotePublicationSetting == false) { + this.isRemotePublicationEnabled = false; + } else { + this.isRemotePublicationEnabled = isRemoteStateEnabled && localNode.isRemoteStatePublicationConfigured(); + } + + } + /** * Pluggable persistence layer for {@link CoordinationState}. * diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 1b3ae89251ac0..a56b8f5bdece3 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -187,9 +187,11 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private Optional currentPublication = Optional.empty(); private final NodeHealthService nodeHealthService; private final PersistedStateRegistry persistedStateRegistry; + private final RemoteClusterStateService remoteClusterStateService; private final RemoteStoreNodeService remoteStoreNodeService; private NodeConnectionsService nodeConnectionsService; private final RemoteClusterStateService remoteClusterStateService; + private final ClusterSettings clusterSettings; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. @@ -314,6 +316,7 @@ public Coordinator( this.localNodeCommissioned = true; this.remoteStoreNodeService = remoteStoreNodeService; this.remoteClusterStateService = remoteClusterStateService; + this.clusterSettings = clusterSettings; } private ClusterFormationState getClusterFormationState() { @@ -869,7 +872,9 @@ boolean publicationInProgress() { @Override protected void doStart() { synchronized (mutex) { - coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings)); + coordinationState.set( + new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings, clusterSettings) + ); peerFinder.setCurrentTerm(getCurrentTerm()); configuredHostsResolver.start(); final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 1d7200792442f..849a2f5b43b09 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -112,6 +112,8 @@ import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** * A Service which provides APIs to upload and download cluster metadata from remote store. @@ -132,7 +134,7 @@ public class RemoteClusterStateService implements Closeable { REMOTE_PUBLICATION_SETTING_KEY, false, Property.NodeScope, - Property.Final + Property.Dynamic ); /** @@ -232,7 +234,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) { private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " + "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]"; - private final boolean isPublicationEnabled; + private boolean isPublicationEnabled; private final String remotePathPrefix; private final RemoteClusterStateCache remoteClusterStateCache; @@ -273,9 +275,10 @@ public RemoteClusterStateService( this.remoteStateStats = new RemotePersistenceStats(); this.namedWriteableRegistry = namedWriteableRegistry; this.indexMetadataUploadListeners = indexMetadataUploadListeners; - this.isPublicationEnabled = REMOTE_PUBLICATION_SETTING.get(settings) + this.isPublicationEnabled = clusterSettings.get(REMOTE_PUBLICATION_SETTING) && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings); + clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting); this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings); this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( repositoriesService, @@ -1115,6 +1118,14 @@ private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteCl this.remoteClusterStateValidationMode = remoteClusterStateValidationMode; } + private void setRemotePublicationSetting(boolean remotePublicationSetting) { + if (remotePublicationSetting == false) { + this.isPublicationEnabled = false; + } else { + this.isPublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings); + } + } + // Package private for unit test RemoteRoutingTableService getRemoteRoutingTableService() { return this.remoteRoutingTableService; diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java b/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java index 11f26ac8b3ed9..1a8e85f30527d 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemotePersistenceStats.java @@ -21,8 +21,8 @@ public class RemotePersistenceStats { RemoteDownloadStats remoteDiffDownloadStats; RemoteDownloadStats remoteFullDownloadStats; - final String FULL_DOWNLOAD_STATS = "remote_full_download"; - final String DIFF_DOWNLOAD_STATS = "remote_diff_download"; + public static final String FULL_DOWNLOAD_STATS = "remote_full_download"; + public static final String DIFF_DOWNLOAD_STATS = "remote_diff_download"; public RemotePersistenceStats() { remoteUploadStats = new RemoteUploadStats(); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index 32cb95e0c04f6..a77a94723d966 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -1283,7 +1283,7 @@ public static CoordinationState createCoordinationState( DiscoveryNode localNode, Settings settings ) { - return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings); + return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings, null); } public static ClusterState clusterState( diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java index 5ddf614db3334..1852be6310f05 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java @@ -302,7 +302,8 @@ public void testPrevotingIndicatesElectionSuccess() { localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, - Settings.EMPTY + Settings.EMPTY, + null ); final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java index 4d18ff95887dd..a2c54dcc88efb 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java @@ -94,7 +94,13 @@ class MockNode { ); PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialState)); - coordinationState = new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, Settings.EMPTY); + coordinationState = new CoordinationState( + localNode, + persistedStateRegistry, + ElectionStrategy.DEFAULT_INSTANCE, + Settings.EMPTY, + null + ); } final DiscoveryNode localNode; diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java index cbe695cbb2136..2215ecafc16c9 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java @@ -150,7 +150,7 @@ static class ClusterNode { persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState); this.electionStrategy = electionStrategy; - state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY); + state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null); } void reboot() { @@ -189,7 +189,7 @@ void reboot() { localNode.getVersion() ); - state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY); + state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null); } void setInitialState(CoordinationMetadata.VotingConfiguration initialConfig, long initialValue) { From e2122554144d6979319b7322110140249df1d0f2 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Thu, 3 Oct 2024 12:41:07 +0530 Subject: [PATCH 2/4] removed isRemotePublicationEnabled flag from CoordinationState Signed-off-by: Shivansh Arora --- .../coordination/CoordinationState.java | 23 +---------- .../cluster/coordination/Coordinator.java | 10 ++--- .../remote/RemoteClusterStateService.java | 40 ++++++++++--------- .../coordination/CoordinationStateTests.java | 13 +----- .../coordination/PreVoteCollectorTests.java | 3 +- .../coordination/PublicationTests.java | 8 +--- .../RemoteClusterStateServiceTests.java | 6 +++ .../CoordinationStateTestCluster.java | 4 +- 8 files changed, 38 insertions(+), 69 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index a0d3e39aea092..01b02db20fb24 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -38,7 +38,6 @@ import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; import org.opensearch.gateway.remote.ClusterMetadataManifest; @@ -54,7 +53,6 @@ import java.util.Set; import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -82,14 +80,12 @@ public class CoordinationState { private VotingConfiguration lastPublishedConfiguration; private VoteCollection publishVotes; private final boolean isRemoteStateEnabled; - private boolean isRemotePublicationEnabled; public CoordinationState( DiscoveryNode localNode, PersistedStateRegistry persistedStateRegistry, ElectionStrategy electionStrategy, - Settings settings, - ClusterSettings clusterSettings + Settings settings ) { this.localNode = localNode; @@ -107,14 +103,6 @@ public CoordinationState( .getLastAcceptedConfiguration(); this.publishVotes = new VoteCollection(); this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings); - this.isRemotePublicationEnabled = isRemoteStateEnabled - && REMOTE_PUBLICATION_SETTING.get(settings) - && localNode.isRemoteStatePublicationEnabled(); - clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting); - } - - public boolean isRemotePublicationEnabled() { - return isRemotePublicationEnabled; } public long getCurrentTerm() { @@ -653,15 +641,6 @@ private boolean shouldCommitRemotePersistedState() { && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null; } - private void setRemotePublicationSetting(boolean remotePublicationSetting) { - if (remotePublicationSetting == false) { - this.isRemotePublicationEnabled = false; - } else { - this.isRemotePublicationEnabled = isRemoteStateEnabled && localNode.isRemoteStatePublicationConfigured(); - } - - } - /** * Pluggable persistence layer for {@link CoordinationState}. * diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index a56b8f5bdece3..2ea251decd056 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -872,9 +872,7 @@ boolean publicationInProgress() { @Override protected void doStart() { synchronized (mutex) { - coordinationState.set( - new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings, clusterSettings) - ); + coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings)); peerFinder.setCurrentTerm(getCurrentTerm()); configuredHostsResolver.start(); final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); @@ -1363,7 +1361,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext( clusterChangedEvent, - coordinationState.get().isRemotePublicationEnabled(), + this.isRemotePublicationEnabled(), persistedStateRegistry ); logger.debug("initialized PublicationContext using class: {}", publicationContext.getClass().toString()); @@ -1892,8 +1890,8 @@ public static boolean isZen1Node(DiscoveryNode discoveryNode) { } public boolean isRemotePublicationEnabled() { - if (coordinationState.get() != null) { - return coordinationState.get().isRemotePublicationEnabled(); + if (remoteClusterStateService != null) { + return remoteClusterStateService.isRemotePublicationEnabled(); } return false; } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 849a2f5b43b09..abb5d826315ed 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -83,6 +83,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongSupplier; @@ -234,7 +235,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) { private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " + "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]"; - private boolean isPublicationEnabled; + private AtomicBoolean isPublicationEnabled; private final String remotePathPrefix; private final RemoteClusterStateCache remoteClusterStateCache; @@ -275,9 +276,11 @@ public RemoteClusterStateService( this.remoteStateStats = new RemotePersistenceStats(); this.namedWriteableRegistry = namedWriteableRegistry; this.indexMetadataUploadListeners = indexMetadataUploadListeners; - this.isPublicationEnabled = clusterSettings.get(REMOTE_PUBLICATION_SETTING) - && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) - && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings); + this.isPublicationEnabled = new AtomicBoolean( + clusterSettings.get(REMOTE_PUBLICATION_SETTING) + && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) + && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings) + ); clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting); this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings); this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( @@ -306,19 +309,20 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat return null; } + boolean publicationEnabled = isPublicationEnabled.get(); UploadedMetadataResults uploadedMetadataResults = writeMetadataInParallel( clusterState, new ArrayList<>(clusterState.metadata().indices().values()), emptyMap(), - RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), isPublicationEnabled), + RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), publicationEnabled), true, true, true, - isPublicationEnabled, - isPublicationEnabled, - isPublicationEnabled, - isPublicationEnabled ? clusterState.customs() : Collections.emptyMap(), - isPublicationEnabled, + publicationEnabled, + publicationEnabled, + publicationEnabled, + publicationEnabled ? clusterState.customs() : Collections.emptyMap(), + publicationEnabled, remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable()), null ); @@ -397,9 +401,9 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles(); final DiffableUtils.MapDiff> customsDiff = remoteGlobalMetadataManager - .getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled); + .getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled.get()); final DiffableUtils.MapDiff> clusterStateCustomsDiff = - remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled, false); + remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled.get(), false); final Map allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap()); final Map allUploadedClusterStateCustomsMap = new HashMap<>( previousManifest.getClusterStateCustomMap() @@ -464,10 +468,10 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata || Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false; - final boolean updateDiscoveryNodes = isPublicationEnabled + final boolean updateDiscoveryNodes = isPublicationEnabled.get() && clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges(); - final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks()); - final boolean updateHashesOfConsistentSettings = isPublicationEnabled + final boolean updateClusterBlocks = isPublicationEnabled.get() && !clusterState.blocks().equals(previousClusterState.blocks()); + final boolean updateHashesOfConsistentSettings = isPublicationEnabled.get() && Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false; uploadedMetadataResults = writeMetadataInParallel( @@ -1120,9 +1124,9 @@ private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteCl private void setRemotePublicationSetting(boolean remotePublicationSetting) { if (remotePublicationSetting == false) { - this.isPublicationEnabled = false; + this.isPublicationEnabled.set(false); } else { - this.isPublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings); + this.isPublicationEnabled.set(isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings)); } } @@ -1841,7 +1845,7 @@ public String getLastKnownUUIDFromRemote(String clusterName) { } public boolean isRemotePublicationEnabled() { - return this.isPublicationEnabled; + return this.isPublicationEnabled.get(); } public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) { diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index a77a94723d966..b5d16e7be849f 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -68,7 +68,6 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; @@ -1268,22 +1267,12 @@ public void testHandleCommitOnFollowerNodeWhenRemotePublicationEnabledWithNullRe verifyNoInteractions(remoteClusterStateService); } - public void testIsRemotePublicationEnabled_WithInconsistentSettings() { - // create settings with remote state disabled but publication enabled - Settings settings = Settings.builder() - .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), false) - .put(REMOTE_PUBLICATION_SETTING_KEY, true) - .build(); - CoordinationState coordinationState = createCoordinationState(psr1, node1, settings); - assertFalse(coordinationState.isRemotePublicationEnabled()); - } - public static CoordinationState createCoordinationState( PersistedStateRegistry persistedStateRegistry, DiscoveryNode localNode, Settings settings ) { - return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings, null); + return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings); } public static ClusterState clusterState( diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java index 1852be6310f05..5ddf614db3334 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PreVoteCollectorTests.java @@ -302,8 +302,7 @@ public void testPrevotingIndicatesElectionSuccess() { localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, - Settings.EMPTY, - null + Settings.EMPTY ); final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java index a2c54dcc88efb..4d18ff95887dd 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/PublicationTests.java @@ -94,13 +94,7 @@ class MockNode { ); PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialState)); - coordinationState = new CoordinationState( - localNode, - persistedStateRegistry, - ElectionStrategy.DEFAULT_INSTANCE, - Settings.EMPTY, - null - ); + coordinationState = new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, Settings.EMPTY); } final DiscoveryNode localNode; diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 35a8ae16cacf7..c21f81af479ad 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -370,6 +370,8 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException // TODO Make the publication flag parameterized publicationEnabled = true; settings = Settings.builder().put(settings).put(REMOTE_PUBLICATION_SETTING_KEY, publicationEnabled).build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); remoteClusterStateService = new RemoteClusterStateService( "test-node-id", repositoriesServiceSupplier, @@ -388,6 +390,7 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException ), writableRegistry() ); + assertTrue(remoteClusterStateService.isRemotePublicationEnabled()); final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()) .customs( Map.of( @@ -747,6 +750,8 @@ public void testWriteIncrementalMetadataSuccess() throws IOException { public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws IOException { publicationEnabled = true; settings = Settings.builder().put(settings).put(REMOTE_PUBLICATION_SETTING_KEY, true).build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); remoteClusterStateService = new RemoteClusterStateService( "test-node-id", repositoriesServiceSupplier, @@ -765,6 +770,7 @@ public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws I ), writableRegistry() ); + assertTrue(remoteClusterStateService.isRemotePublicationEnabled()); final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build(); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java index 2215ecafc16c9..cbe695cbb2136 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/CoordinationStateTestCluster.java @@ -150,7 +150,7 @@ static class ClusterNode { persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState); this.electionStrategy = electionStrategy; - state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null); + state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY); } void reboot() { @@ -189,7 +189,7 @@ void reboot() { localNode.getVersion() ); - state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null); + state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY); } void setInitialState(CoordinationMetadata.VotingConfiguration initialConfig, long initialValue) { From 6044b8996559926f7d52791c7d1969c5b284623c Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Wed, 9 Oct 2024 14:21:41 +0530 Subject: [PATCH 3/4] Rename isRemoteRoutingTableEnabled method Signed-off-by: Sooraj Sinha --- .../org/opensearch/cluster/coordination/Coordinator.java | 1 - .../routing/remote/InternalRemoteRoutingTableService.java | 6 +++--- .../routing/remote/RemoteRoutingTableServiceFactory.java | 4 ++-- .../gateway/remote/RemoteClusterStateService.java | 6 +++--- .../node/remotestore/RemoteStoreNodeAttribute.java | 2 +- .../gateway/remote/RemoteClusterStateServiceTests.java | 4 ++-- 6 files changed, 11 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 2ea251decd056..40e60daf4a86c 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -190,7 +190,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final RemoteClusterStateService remoteClusterStateService; private final RemoteStoreNodeService remoteStoreNodeService; private NodeConnectionsService nodeConnectionsService; - private final RemoteClusterStateService remoteClusterStateService; private final ClusterSettings clusterSettings; /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index 220093b428989..ea8f980c14972 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -49,7 +49,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured; /** * A Service which provides APIs to upload and download routing table from remote store. @@ -76,7 +76,7 @@ public InternalRemoteRoutingTableService( ThreadPool threadpool, String clusterName ) { - assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; + assert isRemoteRoutingTableConfigured(settings) : "Remote routing table is not enabled"; this.repositoriesService = repositoriesService; this.settings = settings; this.threadPool = threadpool; @@ -234,7 +234,7 @@ protected void doClose() throws IOException { @Override protected void doStart() { - assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled"; + assert isRemoteRoutingTableConfigured(settings) == true : "Remote routing table is not enabled"; final String remoteStoreRepo = settings.get( Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY ); diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java index 56dfa03215a64..4fc616f521eab 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java @@ -15,7 +15,7 @@ import java.util.function.Supplier; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured; /** * Factory to provide impl for RemoteRoutingTableService based on settings. @@ -37,7 +37,7 @@ public static RemoteRoutingTableService getService( ThreadPool threadPool, String clusterName ) { - if (isRemoteRoutingTableEnabled(settings)) { + if (isRemoteRoutingTableConfigured(settings)) { return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool, clusterName); } return new NoopRemoteRoutingTableService(); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index abb5d826315ed..513561dfdeaf5 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -113,7 +113,7 @@ import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; import static org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable.INDEX_ROUTING_METADATA_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -279,7 +279,7 @@ public RemoteClusterStateService( this.isPublicationEnabled = new AtomicBoolean( clusterSettings.get(REMOTE_PUBLICATION_SETTING) && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) - && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings) + && RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured(settings) ); clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting); this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings); @@ -1126,7 +1126,7 @@ private void setRemotePublicationSetting(boolean remotePublicationSetting) { if (remotePublicationSetting == false) { this.isPublicationEnabled.set(false); } else { - this.isPublicationEnabled.set(isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings)); + this.isPublicationEnabled.set(isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableConfigured(settings)); } } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index d6a58f8e1d471..d52b37f9a7bd6 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -202,7 +202,7 @@ private static boolean isRemoteRoutingTableAttributePresent(Settings settings) { .isEmpty() == false; } - public static boolean isRemoteRoutingTableEnabled(Settings settings) { + public static boolean isRemoteRoutingTableConfigured(Settings settings) { return isRemoteRoutingTableAttributePresent(settings); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index c21f81af479ad..dffbb9d82545a 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -151,7 +151,7 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; -import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableConfigured; import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -2755,7 +2755,7 @@ public void testRemoteStateUploadStats() throws IOException { } public void testRemoteRoutingTableNotInitializedWhenDisabled() { - if (isRemoteRoutingTableEnabled(settings)) { + if (isRemoteRoutingTableConfigured(settings)) { assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof InternalRemoteRoutingTableService); } else { assertTrue(remoteClusterStateService.getRemoteRoutingTableService() instanceof NoopRemoteRoutingTableService); From 4e9148bb02202468e18d779533fd84577a79a886 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha Date: Mon, 14 Oct 2024 13:47:05 +0530 Subject: [PATCH 4/4] Make isPublicationEnabled volatile Signed-off-by: Sooraj Sinha --- .../opensearch/gateway/remote/RemoteClusterStateService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 513561dfdeaf5..0cd2025b98783 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -235,7 +235,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) { private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged " + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " + "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]"; - private AtomicBoolean isPublicationEnabled; + private volatile AtomicBoolean isPublicationEnabled; private final String remotePathPrefix; private final RemoteClusterStateCache remoteClusterStateCache;