From c03b34353d52f4a59db27c9c64738e5520fff6c9 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Wed, 7 Aug 2024 12:30:53 +0530 Subject: [PATCH 1/7] Upload incremental cluster state on master re-election Signed-off-by: Shivansh Arora --- .../remote/RemoteStatePublicationIT.java | 72 +++- .../remotestore/BaseRemoteStoreRestoreIT.java | 25 -- .../RemoteStoreBaseIntegTestCase.java | 17 +- .../coordination/CoordinationState.java | 76 +++- .../PublicationTransportHandler.java | 13 +- .../cluster/coordination/PublishRequest.java | 33 +- .../opensearch/gateway/GatewayMetaState.java | 54 ++- .../remote/RemoteClusterStateService.java | 44 +- .../remotestore/RemoteStoreNodeAttribute.java | 4 + .../coordination/CoordinationStateTests.java | 391 ++++++++++++++++-- .../GatewayMetaStatePersistedStateTests.java | 123 +++++- .../RemoteClusterStateServiceTests.java | 8 +- 12 files changed, 724 insertions(+), 136 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java index 0e6321867a33b..465f629a78aaa 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java @@ -13,10 +13,12 @@ import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.client.Client; +import org.opensearch.cluster.coordination.CoordinationState; +import org.opensearch.cluster.coordination.PersistedStateRegistry; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.discovery.DiscoveryStats; +import org.opensearch.gateway.GatewayMetaState; import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; @@ -32,9 +34,11 @@ import java.util.Base64; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; +import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.DISCOVERY_NODES; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; @@ -54,21 +58,13 @@ public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase { private static String INDEX_NAME = "test-index"; private boolean isRemoteStateEnabled = true; - private String isRemotePublicationEnabled = "true"; + private boolean isRemotePublicationEnabled = true; @Before public void setup() { asyncUploadMockFsRepo = false; isRemoteStateEnabled = true; - isRemotePublicationEnabled = "true"; - } - - @Override - protected Settings featureFlagSettings() { - return Settings.builder() - .put(super.featureFlagSettings()) - .put(FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL, isRemotePublicationEnabled) - .build(); + isRemotePublicationEnabled = true; } @Override @@ -94,6 +90,7 @@ protected Settings nodeSettings(int nodeOrdinal) { RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(), RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE ) + .put(REMOTE_PUBLICATION_EXPERIMENTAL, isRemotePublicationEnabled) .build(); } @@ -180,6 +177,59 @@ public void testRemotePublicationDownloadStats() { } + public void testMasterReElectionUsesIncrementalUpload() throws IOException { + prepareCluster(3, 2, INDEX_NAME, 1, 1); + PersistedStateRegistry persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class); + GatewayMetaState.RemotePersistedState remotePersistedState = (GatewayMetaState.RemotePersistedState) persistedStateRegistry + .getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE); + ClusterMetadataManifest manifest = remotePersistedState.getLastAcceptedManifest(); + // force elected master to step down + internalCluster().stopCurrentClusterManagerNode(); + ensureStableCluster(4); + + persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class); + CoordinationState.PersistedState persistedStateAfterElection = persistedStateRegistry.getPersistedState( + PersistedStateRegistry.PersistedStateType.REMOTE + ); + ClusterMetadataManifest manifestAfterElection = persistedStateAfterElection.getLastAcceptedManifest(); + + // coordination metadata is updated, it will be unequal + assertNotEquals(manifest.getCoordinationMetadata(), manifestAfterElection.getCoordinationMetadata()); + // all other attributes are not uploaded again and will be pointing to same files in manifest after new master is elected + assertEquals(manifest.getClusterUUID(), manifestAfterElection.getClusterUUID()); + assertEquals(manifest.getIndices(), manifestAfterElection.getIndices()); + assertEquals(manifest.getSettingsMetadata(), manifestAfterElection.getSettingsMetadata()); + assertEquals(manifest.getTemplatesMetadata(), manifestAfterElection.getTemplatesMetadata()); + assertEquals(manifest.getCustomMetadataMap(), manifestAfterElection.getCustomMetadataMap()); + assertEquals(manifest.getRoutingTableVersion(), manifest.getRoutingTableVersion()); + assertEquals(manifest.getIndicesRouting(), manifestAfterElection.getIndicesRouting()); + } + + public void testVotingConfigAreCommitted() throws ExecutionException, InterruptedException { + prepareCluster(3, 2, INDEX_NAME, 1, 2); + ensureStableCluster(5); + ensureGreen(INDEX_NAME); + // add two new nodes to the cluster, to update the voting config + internalCluster().startClusterManagerOnlyNodes(2, Settings.EMPTY); + ensureStableCluster(7); + + internalCluster().getInstances(PersistedStateRegistry.class).forEach(persistedStateRegistry -> { + CoordinationState.PersistedState localState = persistedStateRegistry.getPersistedState( + PersistedStateRegistry.PersistedStateType.LOCAL + ); + CoordinationState.PersistedState remoteState = persistedStateRegistry.getPersistedState( + PersistedStateRegistry.PersistedStateType.REMOTE + ); + if (remoteState != null) { + assertEquals( + localState.getLastAcceptedState().getLastCommittedConfiguration(), + remoteState.getLastAcceptedState().getLastCommittedConfiguration() + ); + assertEquals(5, remoteState.getLastAcceptedState().getLastCommittedConfiguration().getNodeIds().size()); + } + }); + } + private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) { // assert cluster state stats for data node DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats(); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java index 280fd13f0fdcf..e8df2c8686610 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java @@ -76,29 +76,4 @@ protected void verifyRestoredData(Map indexStats, String indexName protected void verifyRestoredData(Map indexStats, String indexName) throws Exception { verifyRestoredData(indexStats, indexName, true); } - - public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { - prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY); - } - - public void prepareCluster( - int numClusterManagerNodes, - int numDataOnlyNodes, - String indices, - int replicaCount, - int shardCount, - Settings settings - ) { - prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings); - for (String index : indices.split(",")) { - createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); - ensureYellowAndNoInitializingShards(index); - ensureGreen(index); - } - } - - public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) { - internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings); - internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings); - } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index ba06bb463e5a8..bcb0d54c0a25c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -351,13 +351,7 @@ protected void restore(boolean restoreAllShards, String... indices) { } protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) { - internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes); - internalCluster().startDataOnlyNodes(numDataOnlyNodes); - for (String index : indices.split(",")) { - createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); - ensureYellowAndNoInitializingShards(index); - ensureGreen(index); - } + prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY); } protected void prepareCluster( @@ -368,11 +362,16 @@ protected void prepareCluster( int shardCount, Settings settings ) { - internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings); - internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings); + prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings); for (String index : indices.split(",")) { createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount)); + ensureYellowAndNoInitializingShards(index); ensureGreen(index); } } + + protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) { + internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings); + internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings); + } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index c7820c2c9a365..fb8d300af72b4 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -41,6 +41,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.io.Closeable; import java.io.IOException; @@ -460,6 +461,10 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) { clusterState.term() ); persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState); + + if (shouldUpdateRemotePersistedState(publishRequest)) { + updateRemotePersistedStateOnPublishRequest(publishRequest); + } assert getLastAcceptedState() == clusterState; return new PublishResponse(clusterState.term(), clusterState.version()); @@ -572,6 +577,9 @@ public void handleCommit(ApplyCommitRequest applyCommit) { ); persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted(); + if (shouldCommitRemotePersistedState()) { + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted(); + } assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration()); } @@ -617,6 +625,37 @@ public void close() throws IOException { IOUtils.close(persistedStateRegistry); } + private boolean shouldUpdateRemotePersistedState(PublishRequest publishRequest) { + return isRemotePublicationEnabled + && localNode.isClusterManagerNode() + && publishRequest.getAcceptedState().getNodes().isLocalNodeElectedClusterManager() == false; + } + + private void updateRemotePersistedStateOnPublishRequest(PublishRequest publishRequest) { + if (publishRequest.hasManifest()) { + assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null; + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(publishRequest.getAcceptedState()); + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) + .setLastAcceptedManifest(publishRequest.getAcceptedManifest().get()); + } else { + // We will end up here if PublishRequest was sent not using Remote Store even with remotePublication enabled on this node + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(null); + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(null); + } + } + + private boolean shouldCommitRemotePersistedState() { + return isRemotePublicationEnabled + && localNode.isClusterManagerNode() + && persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL) + .getLastAcceptedState() + .getNodes() + .isLocalNodeElectedClusterManager() == false + && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null + && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState() != null + && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null; + } + /** * Pluggable persistence layer for {@link CoordinationState}. * @@ -654,6 +693,22 @@ public interface PersistedState extends Closeable { */ PersistedStateStats getStats(); + /** + * Returns the last accepted {@link ClusterMetadataManifest}. + * + * @return The last accepted {@link ClusterMetadataManifest}, or null if no manifest + * has been accepted yet. + */ + default ClusterMetadataManifest getLastAcceptedManifest() { + // return null by default, this method needs to be overridden wherever required + return null; + } + + /** + * Sets the last accepted {@link ClusterMetadataManifest}. + */ + default void setLastAcceptedManifest(ClusterMetadataManifest manifest) {} + /** * Marks the last accepted cluster state as committed. * After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set, @@ -662,14 +717,7 @@ public interface PersistedState extends Closeable { */ default void markLastAcceptedStateAsCommitted() { final ClusterState lastAcceptedState = getLastAcceptedState(); - Metadata.Builder metadataBuilder = null; - if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) { - final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata()) - .lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration()) - .build(); - metadataBuilder = Metadata.builder(lastAcceptedState.metadata()); - metadataBuilder.coordinationMetadata(coordinationMetadata); - } + Metadata.Builder metadataBuilder = commitVotingConfiguration(lastAcceptedState); // if we receive a commit from a Zen1 cluster-manager that has not recovered its state yet, // the cluster uuid might not been known yet. assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false @@ -694,6 +742,18 @@ default void markLastAcceptedStateAsCommitted() { } } + default Metadata.Builder commitVotingConfiguration(ClusterState lastAcceptedState) { + Metadata.Builder metadataBuilder = null; + if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) { + final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata()) + .lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration()) + .build(); + metadataBuilder = Metadata.builder(lastAcceptedState.metadata()); + metadataBuilder.coordinationMetadata(coordinationMetadata); + } + return metadataBuilder; + } + default void close() throws IOException {} } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index ca36011b3a0e9..5f326a94ac6d9 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -199,7 +199,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque } fullClusterStateReceivedCount.incrementAndGet(); logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length()); - final PublishWithJoinResponse response = acceptState(incomingState); + final PublishWithJoinResponse response = acceptState(incomingState, null); lastSeenClusterState.set(incomingState); return response; } else { @@ -230,7 +230,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque incomingState.stateUUID(), request.bytes().length() ); - final PublishWithJoinResponse response = acceptState(incomingState); + final PublishWithJoinResponse response = acceptState(incomingState, null); lastSeenClusterState.compareAndSet(lastSeen, incomingState); return response; } @@ -281,7 +281,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest true ); fullClusterStateReceivedCount.incrementAndGet(); - final PublishWithJoinResponse response = acceptState(clusterState); + final PublishWithJoinResponse response = acceptState(clusterState, manifest); lastSeenClusterState.set(clusterState); return response; } else { @@ -300,7 +300,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest transportService.getLocalNode().getId() ); compatibleClusterStateDiffReceivedCount.incrementAndGet(); - final PublishWithJoinResponse response = acceptState(clusterState); + final PublishWithJoinResponse response = acceptState(clusterState, manifest); lastSeenClusterState.compareAndSet(lastSeen, clusterState); return response; } @@ -314,7 +314,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest } } - private PublishWithJoinResponse acceptState(ClusterState incomingState) { + private PublishWithJoinResponse acceptState(ClusterState incomingState, ClusterMetadataManifest manifest) { // if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation) if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) { final PublishRequest publishRequest = currentPublishRequestToSelf.get(); @@ -324,6 +324,9 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) { return handlePublishRequest.apply(publishRequest); } } + if (manifest != null) { + return handlePublishRequest.apply(new PublishRequest(incomingState, manifest)); + } return handlePublishRequest.apply(new PublishRequest(incomingState)); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java index e7c3e2d2c965b..5be777f7fec2d 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java @@ -32,8 +32,10 @@ package org.opensearch.cluster.coordination; import org.opensearch.cluster.ClusterState; +import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.util.Objects; +import java.util.Optional; /** * Request which is used by the cluster-manager node to publish cluster state changes. @@ -44,15 +46,30 @@ public class PublishRequest { private final ClusterState acceptedState; + private final ClusterMetadataManifest acceptedManifest; public PublishRequest(ClusterState acceptedState) { this.acceptedState = acceptedState; + this.acceptedManifest = null; + } + + public PublishRequest(ClusterState acceptedState, ClusterMetadataManifest acceptedManifest) { + this.acceptedState = acceptedState; + this.acceptedManifest = acceptedManifest; } public ClusterState getAcceptedState() { return acceptedState; } + public boolean hasManifest() { + return acceptedManifest != null; + } + + public Optional getAcceptedManifest() { + return Optional.ofNullable(acceptedManifest); + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -60,16 +77,26 @@ public boolean equals(Object o) { PublishRequest that = (PublishRequest) o; - return acceptedState.term() == that.acceptedState.term() && acceptedState.version() == that.acceptedState.version(); + return acceptedState.term() == that.acceptedState.term() + && acceptedState.version() == that.acceptedState.version() + && Objects.equals(acceptedManifest, that.acceptedManifest); } @Override public int hashCode() { - return Objects.hash(acceptedState.term(), acceptedState.version()); + return Objects.hash(acceptedState.term(), acceptedState.version(), acceptedManifest); } @Override public String toString() { - return "PublishRequest{term=" + acceptedState.term() + ", version=" + acceptedState.version() + ", state=" + acceptedState + '}'; + return "PublishRequest{term=" + + acceptedState.term() + + ", version=" + + acceptedState.version() + + ", state=" + + acceptedState + + ", manifest=" + + acceptedManifest + + '}'; } } diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index bd56c9e1757c6..b3836edcd7d6c 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -697,8 +697,18 @@ public String getLastUploadedManifestFile() { return lastUploadedManifestFile; } + @Override + public ClusterMetadataManifest getLastAcceptedManifest() { + return lastAcceptedManifest; + } + @Override public void setLastAcceptedState(ClusterState clusterState) { + // for non leader node, update the lastAcceptedClusterState + if (clusterState == null || clusterState.getNodes().isLocalNodeElectedClusterManager() == false) { + lastAcceptedState = clusterState; + return; + } try { final RemoteClusterStateManifestInfo manifestDetails; // Decide the codec version @@ -735,7 +745,7 @@ assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == } assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(), clusterState) == true : "Manifest and ClusterState are not in sync"; - lastAcceptedManifest = manifestDetails.getClusterMetadataManifest(); + setLastAcceptedManifest(manifestDetails.getClusterMetadataManifest()); lastAcceptedState = clusterState; lastUploadedManifestFile = manifestDetails.getManifestFileName(); } catch (Exception e) { @@ -744,6 +754,11 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest( } } + @Override + public void setLastAcceptedManifest(ClusterMetadataManifest manifest) { + this.lastAcceptedManifest = manifest; + } + @Override public PersistedStateStats getStats() { return remoteClusterStateService.getUploadStats(); @@ -767,7 +782,7 @@ private boolean shouldWriteFullClusterState(ClusterState clusterState, int codec assert lastAcceptedManifest == null || lastAcceptedManifest.getCodecVersion() <= codecVersion; if (lastAcceptedState == null || lastAcceptedManifest == null - || lastAcceptedState.term() != clusterState.term() + || (remoteClusterStateService.isRemotePublicationEnabled() == false && lastAcceptedState.term() != clusterState.term()) || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT || lastAcceptedManifest.getCodecVersion() != codecVersion) { return true; @@ -781,19 +796,32 @@ public void markLastAcceptedStateAsCommitted() { assert lastAcceptedState != null : "Last accepted state is not present"; assert lastAcceptedManifest != null : "Last accepted manifest is not present"; ClusterState clusterState = lastAcceptedState; - if (lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false - && lastAcceptedState.metadata().clusterUUIDCommitted() == false) { + boolean shouldCommitVotingConfig = shouldCommitVotingConfig(); + boolean isClusterUUIDUnknown = lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID); + boolean isClusterUUIDCommitted = lastAcceptedState.metadata().clusterUUIDCommitted(); + if (shouldCommitVotingConfig || (isClusterUUIDUnknown == false && isClusterUUIDCommitted == false)) { Metadata.Builder metadataBuilder = Metadata.builder(lastAcceptedState.metadata()); - metadataBuilder.clusterUUIDCommitted(true); + if (shouldCommitVotingConfig) { + metadataBuilder = commitVotingConfiguration(lastAcceptedState); + } + if (isClusterUUIDUnknown == false && isClusterUUIDCommitted == false) { + metadataBuilder.clusterUUIDCommitted(true); + } clusterState = ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build(); } - final RemoteClusterStateManifestInfo committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted( - clusterState, - lastAcceptedManifest - ); - lastAcceptedManifest = committedManifestDetails.getClusterMetadataManifest(); + if (clusterState.getNodes().isLocalNodeElectedClusterManager()) { + final RemoteClusterStateManifestInfo committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted( + clusterState, + lastAcceptedManifest, + shouldCommitVotingConfig + ); + assert committedManifestDetails != null; + setLastAcceptedManifest(committedManifestDetails.getClusterMetadataManifest()); + lastUploadedManifestFile = committedManifestDetails.getManifestFileName(); + } else { + setLastAcceptedManifest(ClusterMetadataManifest.builder(lastAcceptedManifest).committed(true).build()); + } lastAcceptedState = clusterState; - lastUploadedManifestFile = committedManifestDetails.getManifestFileName(); } catch (Exception e) { handleExceptionOnWrite(e); } @@ -804,6 +832,10 @@ public void close() throws IOException { remoteClusterStateService.close(); } + private boolean shouldCommitVotingConfig() { + return !lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()); + } + private void handleExceptionOnWrite(Exception e) { throw ExceptionsHelper.convertToRuntime(e); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index a223bfbe736c3..b0c2570d3862d 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -341,12 +341,20 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat * * @return {@link RemoteClusterStateManifestInfo} object containing uploaded manifest detail */ - @Nullable public RemoteClusterStateManifestInfo writeIncrementalMetadata( ClusterState previousClusterState, ClusterState clusterState, ClusterMetadataManifest previousManifest ) throws IOException { + if (previousClusterState == null) { + throw new IllegalArgumentException("previousClusterState cannot be null"); + } + if (clusterState == null) { + throw new IllegalArgumentException("clusterState cannot be null"); + } + if (previousManifest == null) { + throw new IllegalArgumentException("previousManifest cannot be null"); + } logger.trace("WRITING INCREMENTAL STATE"); final long startTimeNanos = relativeTimeNanosSupplier.getAsLong(); @@ -354,7 +362,6 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( logger.error("Local node is not elected cluster manager. Exiting"); return null; } - assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles(); @@ -924,18 +931,41 @@ public RemoteClusterStateCleanupManager getCleanupManager() { } @Nullable - public RemoteClusterStateManifestInfo markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) - throws IOException { + public RemoteClusterStateManifestInfo markLastStateAsCommitted( + ClusterState clusterState, + ClusterMetadataManifest previousManifest, + boolean commitVotingConfig + ) throws IOException { assert clusterState != null : "Last accepted cluster state is not set"; if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { logger.error("Local node is not elected cluster manager. Exiting"); return null; } assert previousManifest != null : "Last cluster metadata manifest is not set"; + UploadedMetadataAttribute uploadedCoordinationMetadata = previousManifest.getCoordinationMetadata(); + if (commitVotingConfig) { + // update the coordination metadata if voting config is committed + uploadedCoordinationMetadata = writeMetadataInParallel( + clusterState, + emptyList(), + emptyMap(), + emptyMap(), + true, + false, + false, + false, + false, + false, + emptyMap(), + false, + emptyList(), + emptyMap() + ).uploadedCoordinationMetadata; + } UploadedMetadataResults uploadedMetadataResults = new UploadedMetadataResults( previousManifest.getIndices(), previousManifest.getCustomMetadataMap(), - previousManifest.getCoordinationMetadata(), + uploadedCoordinationMetadata, previousManifest.getSettingsMetadata(), previousManifest.getTemplatesMetadata(), previousManifest.getTransientSettingsMetadata(), @@ -1737,6 +1767,10 @@ public String getLastKnownUUIDFromRemote(String clusterName) { } } + public boolean isRemotePublicationEnabled() { + return this.isPublicationEnabled; + } + public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) { this.remoteStateReadTimeout = remoteStateReadTimeout; } diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index 55971398634c5..211820d2be093 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -207,6 +207,10 @@ public static boolean isRemoteRoutingTableEnabled(Settings settings) { return isRemoteRoutingTableAttributePresent(settings); } + public static boolean isRemotePublicationEnabled(Settings settings) { + return isRemoteRoutingTableEnabled(settings); + } + public RepositoriesMetadata getRepositoriesMetadata() { return this.repositoriesMetadata; } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index ee9a2951ec541..6a0f952a66bf4 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -51,12 +51,14 @@ import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.EqualsHashCodeTestUtils; +import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; import java.io.IOException; import java.util.Collections; import java.util.Locale; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -67,25 +69,34 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; +import static org.opensearch.common.util.FeatureFlags.initializeFeatureFlags; import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; public class CoordinationStateTests extends OpenSearchTestCase { private DiscoveryNode node1; private DiscoveryNode node2; private DiscoveryNode node3; + private DiscoveryNode nodeWithPub; private ClusterState initialStateNode1; + private ClusterState initialStateNode2; private PersistedState ps1; private PersistedStateRegistry psr1; @@ -99,16 +110,18 @@ public void setupNodes() { node1 = createNode("node1"); node2 = createNode("node2"); node3 = createNode("node3"); + nodeWithPub = createNode( + "nodeWithPub", + Map.of( + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, + "", + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, + "" + ) + ); initialStateNode1 = clusterState(0L, 0L, node1, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 42L); - ClusterState initialStateNode2 = clusterState( - 0L, - 0L, - node2, - VotingConfiguration.EMPTY_CONFIG, - VotingConfiguration.EMPTY_CONFIG, - 42L - ); + initialStateNode2 = clusterState(0L, 0L, node2, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 42L); ClusterState initialStateNode3 = clusterState( 0L, 0L, @@ -128,6 +141,10 @@ public void setupNodes() { } public static DiscoveryNode createNode(String id) { + return createNode(id, Collections.emptyMap()); + } + + public static DiscoveryNode createNode(String id, Map attributes) { final TransportAddress address = buildNewFakeTransportAddress(); return new DiscoveryNode( "", @@ -136,7 +153,7 @@ public static DiscoveryNode createNode(String id) { address.address().getHostString(), address.getAddress(), address, - Collections.emptyMap(), + attributes, DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT ); @@ -913,7 +930,7 @@ public void testSafety() { public void testHandlePrePublishAndCommitWhenRemoteStateDisabled() { final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, ps1); - final PersistedStateRegistry persistedStateRegistrySpy = Mockito.spy(persistedStateRegistry); + final PersistedStateRegistry persistedStateRegistrySpy = spy(persistedStateRegistry); final CoordinationState coordinationState = createCoordinationState(persistedStateRegistrySpy, node1, Settings.EMPTY); final VotingConfiguration initialConfig = VotingConfiguration.of(node1); final ClusterState clusterState = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L); @@ -932,7 +949,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateDisabled() { public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final VotingConfiguration initialConfig = VotingConfiguration.of(node1); - final ClusterState clusterState = clusterState(0L, 0L, node1, initialConfig, initialConfig, 42L); + final ClusterState clusterState = clusterStateWithClusterManager(0L, 0L, node1, node1, initialConfig, initialConfig, 42L); final String previousClusterUUID = "prev-cluster-uuid"; final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder() .clusterTerm(0L) @@ -948,8 +965,9 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep .previousClusterUUID(randomAlphaOfLength(10)) .clusterUUIDCommitted(true) .build(); - Mockito.when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION)) - .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")); + when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION)).thenReturn( + new RemoteClusterStateManifestInfo(manifest, "path/to/manifest") + ); final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, ps1); @@ -958,40 +976,21 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep new RemotePersistedState(remoteClusterStateService, previousClusterUUID) ); - String randomRepoName = "randomRepoName"; - String stateRepoTypeAttributeKey = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, - randomRepoName - ); - String stateRepoSettingsAttributeKeyPrefix = String.format( - Locale.getDefault(), - "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, - randomRepoName - ); - - Settings settings = Settings.builder() - .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, randomRepoName) - .put(stateRepoTypeAttributeKey, FsRepository.TYPE) - .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") - .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) - .build(); - - final CoordinationState coordinationState = createCoordinationState(persistedStateRegistry, node1, settings); + final CoordinationState coordinationState = createCoordinationState(persistedStateRegistry, node1, remoteStateSettings()); coordinationState.handlePrePublish(clusterState); Mockito.verify(remoteClusterStateService, Mockito.times(1)) .writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); assertThat(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState(), equalTo(clusterState)); - Mockito.when(remoteClusterStateService.markLastStateAsCommitted(any(), any())) - .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")); + when(remoteClusterStateService.markLastStateAsCommitted(any(), any(), eq(false))).thenReturn( + new RemoteClusterStateManifestInfo(manifest, "path/to/manifest") + ); coordinationState.handlePreCommit(); ClusterState committedClusterState = ClusterState.builder(clusterState) .metadata(Metadata.builder(clusterState.metadata()).clusterUUIDCommitted(true).build()) .build(); - // Mockito.verify(remoteClusterStateService, Mockito.times(1)).markLastStateAsCommitted(committedClusterState, manifest); ArgumentCaptor clusterStateCaptor = ArgumentCaptor.forClass(ClusterState.class); - verify(remoteClusterStateService, times(1)).markLastStateAsCommitted(clusterStateCaptor.capture(), any()); + verify(remoteClusterStateService, times(1)).markLastStateAsCommitted(clusterStateCaptor.capture(), any(), eq(false)); assertThat(clusterStateCaptor.getValue().metadata().indices(), equalTo(committedClusterState.metadata().indices())); assertThat(clusterStateCaptor.getValue().metadata().clusterUUID(), equalTo(committedClusterState.metadata().clusterUUID())); assertThat(clusterStateCaptor.getValue().stateUUID(), equalTo(committedClusterState.stateUUID())); @@ -1006,6 +1005,271 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep ); } + public void testHandlePublishRequestOnFollowerWhenRemotePublicationEnabled() { + final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); + // cluster manager is node1 and node2 is a follower node + VotingConfiguration initialConfig = VotingConfiguration.of(node1); + ClusterState state1 = clusterState( + 0L, + 0L, + DiscoveryNodes.builder() + .add(node1) + .add(nodeWithPub) + .clusterManagerNodeId(node1.getId()) + .localNodeId(nodeWithPub.getId()) + .build(), + initialConfig, + initialConfig, + 42L + ); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialStateNode2)); + persistedStateRegistry.addPersistedState( + PersistedStateType.REMOTE, + new RemotePersistedState(remoteClusterStateService, state1.metadata().clusterUUID()) + ); + + final CoordinationState coordinationState = createCoordinationState( + persistedStateRegistry, + nodeWithPub, + remotePublicationSettings() + ); + coordinationState.setInitialState(state1); + long newTerm = randomLongBetween(1, 10); + StartJoinRequest startJoinRequest = new StartJoinRequest(nodeWithPub, newTerm); + + coordinationState.handleStartJoin(startJoinRequest); + + ClusterState state2 = setValue( + ClusterState.builder(state1) + .metadata( + Metadata.builder(state1.metadata()) + .coordinationMetadata(CoordinationMetadata.builder(state1.coordinationMetadata()).term(newTerm).build()) + .build() + ) + .version(randomLongBetween(1, 10)) + .build(), + 43L + ); + final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder() + .clusterTerm(1L) + .stateVersion(state2.version()) + .clusterUUID(state2.metadata().clusterUUID()) + .nodeId(node1.getId()) + .stateUUID(randomAlphaOfLength(10)) + .opensearchVersion(Version.CURRENT) + .committed(false) + .codecVersion(1) + .globalMetadataFileName(randomAlphaOfLength(10)) + .indices(Collections.emptyList()) + .previousClusterUUID(randomAlphaOfLength(10)) + .clusterUUIDCommitted(true) + .build(); + + PublishResponse publishResponse = coordinationState.handlePublishRequest(new PublishRequest(state2, manifest)); + assertEquals(state2.term(), publishResponse.getTerm()); + assertEquals(state2.version(), publishResponse.getVersion()); + verifyNoInteractions(remoteClusterStateService); + assertEquals(state2, persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState()); + assertEquals(manifest, persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest()); + } + + public void testHandleCommitOnFollowerNodeWhenRemotePublicationEnabled() { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + VotingConfiguration initialConfig = VotingConfiguration.of(node1, nodeWithPub); + ClusterState state1 = clusterState( + 0L, + 0L, + DiscoveryNodes.builder() + .add(node1) + .add(nodeWithPub) + .clusterManagerNodeId(node1.getId()) + .localNodeId(nodeWithPub.getId()) + .build(), + initialConfig, + initialConfig, + 42L + ); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialStateNode2)); + persistedStateRegistry.addPersistedState( + PersistedStateType.REMOTE, + new RemotePersistedState(remoteClusterStateService, state1.metadata().clusterUUID()) + ); + + final CoordinationState coordinationState = createCoordinationState( + persistedStateRegistry, + nodeWithPub, + remotePublicationSettings() + ); + coordinationState.setInitialState(state1); + long newTerm = randomLongBetween(1, 10); + StartJoinRequest startJoinRequest = new StartJoinRequest(nodeWithPub, newTerm); + + Join v1 = cs1.handleStartJoin(startJoinRequest); + Join v2 = coordinationState.handleStartJoin(startJoinRequest); + assertTrue(coordinationState.handleJoin(v1)); + assertTrue(coordinationState.handleJoin(v2)); + assertTrue(coordinationState.electionWon()); + VotingConfiguration newConfig = VotingConfiguration.of(node1, nodeWithPub, node3); + ClusterState state2 = clusterState( + startJoinRequest.getTerm(), + 2L, + DiscoveryNodes.builder() + .add(node1) + .add(nodeWithPub) + .clusterManagerNodeId(node1.getId()) + .localNodeId(nodeWithPub.getId()) + .build(), + initialConfig, + newConfig, + 7L + ); + final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder() + .clusterTerm(1L) + .stateVersion(state2.version()) + .clusterUUID(state2.metadata().clusterUUID()) + .nodeId(node1.getId()) + .stateUUID(randomAlphaOfLength(10)) + .opensearchVersion(Version.CURRENT) + .committed(false) + .codecVersion(1) + .globalMetadataFileName(randomAlphaOfLength(10)) + .indices(Collections.emptyList()) + .previousClusterUUID(randomAlphaOfLength(10)) + .clusterUUIDCommitted(true) + .build(); + + PublishRequest publishRequest = coordinationState.handleClientValue(state2); + coordinationState.handlePublishRequest(new PublishRequest(publishRequest.getAcceptedState(), manifest)); + ApplyCommitRequest applyCommitRequest = new ApplyCommitRequest(node2, state2.term(), state2.version()); + coordinationState.handleCommit(applyCommitRequest); + verifyNoInteractions(remoteClusterStateService); + assertTrue( + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState().metadata().clusterUUIDCommitted() + ); + assertTrue(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest().isCommitted()); + assertEquals(coordinationState.getLastCommittedConfiguration(), newConfig); + } + + public void testRemotePersistedStateResetsForPublicationEnabledAfterLocalPublication() { + final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); + // cluster manager is node1 and nodeWithPub is a follower node + VotingConfiguration initialConfig = VotingConfiguration.of(node1); + ClusterState state1 = clusterState( + 0L, + 0L, + DiscoveryNodes.builder() + .add(node1) + .add(nodeWithPub) + .clusterManagerNodeId(node1.getId()) + .localNodeId(nodeWithPub.getId()) + .build(), + initialConfig, + initialConfig, + 42L + ); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialStateNode2)); + persistedStateRegistry.addPersistedState( + PersistedStateType.REMOTE, + new RemotePersistedState(remoteClusterStateService, state1.metadata().clusterUUID()) + ); + + final CoordinationState coordinationState = createCoordinationState( + persistedStateRegistry, + nodeWithPub, + remotePublicationSettings() + ); + coordinationState.setInitialState(state1); + long newTerm = randomLongBetween(1, 10); + StartJoinRequest startJoinRequest = new StartJoinRequest(nodeWithPub, newTerm); + + coordinationState.handleStartJoin(startJoinRequest); + + ClusterState state2 = setValue( + ClusterState.builder(state1) + .metadata( + Metadata.builder(state1.metadata()) + .coordinationMetadata(CoordinationMetadata.builder(state1.coordinationMetadata()).term(newTerm).build()) + .build() + ) + .version(randomLongBetween(1, 10)) + .build(), + 43L + ); + + PublishResponse publishResponse = coordinationState.handlePublishRequest(new PublishRequest(state2)); + assertEquals(state2.term(), publishResponse.getTerm()); + assertEquals(state2.version(), publishResponse.getVersion()); + verifyNoInteractions(remoteClusterStateService); + assertNull(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState()); + assertNull(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest()); + } + + public void testHandleCommitOnFollowerNodeWhenRemotePublicationEnabledWithNullRemotePersistedState() { + RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class); + VotingConfiguration initialConfig = VotingConfiguration.of(node1, nodeWithPub); + ClusterState state1 = clusterState( + 0L, + 0L, + DiscoveryNodes.builder() + .add(node1) + .add(nodeWithPub) + .clusterManagerNodeId(node1.getId()) + .localNodeId(nodeWithPub.getId()) + .build(), + initialConfig, + initialConfig, + 42L + ); + final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry(); + persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialStateNode2)); + persistedStateRegistry.addPersistedState( + PersistedStateType.REMOTE, + new RemotePersistedState(remoteClusterStateService, state1.metadata().clusterUUID()) + ); + + final CoordinationState coordinationState = createCoordinationState( + persistedStateRegistry, + nodeWithPub, + remotePublicationSettings() + ); + coordinationState.setInitialState(state1); + long newTerm = randomLongBetween(1, 10); + StartJoinRequest startJoinRequest = new StartJoinRequest(nodeWithPub, newTerm); + + Join v1 = cs1.handleStartJoin(startJoinRequest); + Join v2 = coordinationState.handleStartJoin(startJoinRequest); + assertTrue(coordinationState.handleJoin(v1)); + assertTrue(coordinationState.handleJoin(v2)); + assertTrue(coordinationState.electionWon()); + VotingConfiguration newConfig = VotingConfiguration.of(node1, nodeWithPub, node3); + ClusterState state2 = clusterState( + startJoinRequest.getTerm(), + 2L, + DiscoveryNodes.builder() + .add(node1) + .add(nodeWithPub) + .clusterManagerNodeId(node1.getId()) + .localNodeId(nodeWithPub.getId()) + .build(), + initialConfig, + newConfig, + 7L + ); + + PublishRequest publishRequest = coordinationState.handleClientValue(state2); + coordinationState.handlePublishRequest(new PublishRequest(publishRequest.getAcceptedState())); + assertNull(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState()); + assertNull(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest()); + ApplyCommitRequest applyCommitRequest = new ApplyCommitRequest(node2, state2.term(), state2.version()); + PersistedState spyRPS = spy(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)); + coordinationState.handleCommit(applyCommitRequest); + verifyNoInteractions(spyRPS); + verifyNoInteractions(remoteClusterStateService); + } + public void testIsRemotePublicationEnabled_WithInconsistentSettings() { // create settings with remote state disabled but publication enabled Settings settings = Settings.builder() @@ -1042,6 +1306,30 @@ public static ClusterState clusterState( ); } + public static ClusterState clusterStateWithClusterManager( + long term, + long version, + DiscoveryNode localNode, + DiscoveryNode clusterManagerNode, + VotingConfiguration lastCommittedConfig, + VotingConfiguration lastAcceptedConfig, + long value + ) { + return clusterState( + term, + version, + DiscoveryNodes.builder() + .add(localNode) + .add(clusterManagerNode) + .localNodeId(localNode.getId()) + .clusterManagerNodeId(clusterManagerNode.getId()) + .build(), + lastCommittedConfig, + lastAcceptedConfig, + value + ); + } + public static ClusterState clusterState( long term, long version, @@ -1090,4 +1378,33 @@ private static PersistedStateRegistry createPersistedStateRegistry(ClusterState persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, clusterState)); return persistedStateRegistry; } + + private static Settings remoteStateSettings() { + String randomRepoName = "randomRepoName"; + String stateRepoTypeAttributeKey = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT, + randomRepoName + ); + String stateRepoSettingsAttributeKeyPrefix = String.format( + Locale.getDefault(), + "node.attr." + REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX, + randomRepoName + ); + + Settings settings = Settings.builder() + .put("node.attr." + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, randomRepoName) + .put(stateRepoTypeAttributeKey, FsRepository.TYPE) + .put(stateRepoSettingsAttributeKeyPrefix + "location", "randomRepoPath") + .put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .build(); + return settings; + } + + private static Settings remotePublicationSettings() { + FeatureFlagSetter.set(REMOTE_PUBLICATION_EXPERIMENTAL); + Settings remoteStateSettings = Settings.builder().put(remoteStateSettings()).put(REMOTE_PUBLICATION_EXPERIMENTAL, true).build(); + initializeFeatureFlags(remoteStateSettings); + return remoteStateSettings; + } } diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index 9972bbfff5d66..5ea5241762753 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -115,6 +115,7 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -210,11 +211,15 @@ public void testSetCurrentTerm() throws IOException { } private ClusterState createClusterState(long version, Metadata metadata) { - return ClusterState.builder(clusterName) - .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build()) - .version(version) - .metadata(metadata) - .build(); + return createClusterState(version, metadata, false); + } + + private ClusterState createClusterState(long version, Metadata metadata, boolean isClusterManagerNode) { + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()); + if (isClusterManagerNode) { + nodesBuilder.clusterManagerNodeId(localNode.getId()); + } + return ClusterState.builder(clusterName).nodes(nodesBuilder.build()).version(version).metadata(metadata).build(); } private ClusterState createClusterStateWithNodes(long version, Metadata metadata) { @@ -225,7 +230,12 @@ private ClusterState createClusterStateWithNodes(long version, Metadata metadata Sets.newHashSet(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), Version.V_2_13_0 ); - DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).add(oldNode).build(); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(localNode) + .localNodeId(localNode.getId()) + .clusterManagerNodeId(localNode.getId()) + .add(oldNode) + .build(); return ClusterState.builder(clusterName).nodes(discoveryNodes).version(version).metadata(metadata).build(); } @@ -762,7 +772,8 @@ public void testRemotePersistedState() throws IOException { final long clusterTerm = randomNonNegativeLong(); final ClusterState clusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build(), + true ); remotePersistedState.setLastAcceptedState(clusterState); @@ -773,7 +784,8 @@ public void testRemotePersistedState() throws IOException { final ClusterState secondClusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build(), + true ); remotePersistedState.setLastAcceptedState(secondClusterState); @@ -783,11 +795,11 @@ public void testRemotePersistedState() throws IOException { assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState)); assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); - when(remoteClusterStateService.markLastStateAsCommitted(Mockito.any(), Mockito.any())).thenReturn( + when(remoteClusterStateService.markLastStateAsCommitted(Mockito.any(), Mockito.any(), eq(false))).thenReturn( new RemoteClusterStateManifestInfo(manifest, "path/to/manifest") ); remotePersistedState.markLastAcceptedStateAsCommitted(); - Mockito.verify(remoteClusterStateService, times(1)).markLastStateAsCommitted(Mockito.any(), Mockito.any()); + Mockito.verify(remoteClusterStateService, times(1)).markLastStateAsCommitted(Mockito.any(), Mockito.any(), eq(false)); assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState)); assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); @@ -825,7 +837,8 @@ public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOEx ClusterState clusterState2 = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build() + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build(), + true ); final ClusterMetadataManifest manifest2 = ClusterMetadataManifest.builder() .clusterTerm(1L) @@ -840,7 +853,8 @@ public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOEx ClusterState clusterState3 = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build() + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(1L).build()).build(), + true ); Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) .thenReturn(new RemoteClusterStateManifestInfo(manifest2, "path/to/manifest3")); @@ -849,6 +863,73 @@ public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOEx } + public void testRemotePersistentState_FollowerNode() throws IOException { + final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); + final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder() + .clusterTerm(1L) + .stateVersion(5L) + .committed(false) + .build(); + final String previousClusterUUID = "prev-cluster-uuid"; + RemotePersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); + + assertNull(remotePersistedState.getLastAcceptedState()); + assertNull(remotePersistedState.getLastAcceptedManifest()); + assertEquals(0, remotePersistedState.getCurrentTerm()); + + final long clusterTerm = randomNonNegativeLong(); + final ClusterState clusterState = createClusterState( + randomNonNegativeLong(), + Metadata.builder() + .coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()) + .clusterUUIDCommitted(true) + .build(), + false + ); + + remotePersistedState.setLastAcceptedState(clusterState); + remotePersistedState.setLastAcceptedManifest(manifest); + Mockito.verify(remoteClusterStateService, never()) + .writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); + + assertEquals(clusterState, remotePersistedState.getLastAcceptedState()); + assertEquals(clusterTerm, remotePersistedState.getCurrentTerm()); + assertEquals(manifest, remotePersistedState.getLastAcceptedManifest()); + + final ClusterState secondClusterState = createClusterState( + randomNonNegativeLong(), + Metadata.builder() + .coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()) + .clusterUUIDCommitted(false) + .build(), + false + ); + + remotePersistedState.setLastAcceptedState(secondClusterState); + Mockito.verify(remoteClusterStateService, never()) + .writeFullMetadata(secondClusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); + + assertEquals(secondClusterState, remotePersistedState.getLastAcceptedState()); + assertEquals(clusterTerm, remotePersistedState.getCurrentTerm()); + assertFalse(remotePersistedState.getLastAcceptedManifest().isCommitted()); + + remotePersistedState.markLastAcceptedStateAsCommitted(); + Mockito.verify(remoteClusterStateService, never()).markLastStateAsCommitted(Mockito.any(), Mockito.any(), eq(false)); + + assertEquals(secondClusterState, remotePersistedState.getLastAcceptedState()); + assertEquals(clusterTerm, remotePersistedState.getCurrentTerm()); + assertFalse(remotePersistedState.getLastAcceptedState().metadata().clusterUUIDCommitted()); + assertTrue(remotePersistedState.getLastAcceptedManifest().isCommitted()); + + final ClusterState thirdClusterState = ClusterState.builder(secondClusterState) + .metadata(Metadata.builder(secondClusterState.getMetadata()).clusterUUID(randomAlphaOfLength(10)).build()) + .build(); + remotePersistedState.setLastAcceptedState(thirdClusterState); + remotePersistedState.markLastAcceptedStateAsCommitted(); + assertTrue(remotePersistedState.getLastAcceptedState().metadata().clusterUUIDCommitted()); + assertTrue(remotePersistedState.getLastAcceptedManifest().isCommitted()); + } + public void testRemotePersistedStateNotCommitted() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final String previousClusterUUID = "prev-cluster-uuid"; @@ -875,7 +956,8 @@ public void testRemotePersistedStateNotCommitted() throws IOException { final long clusterTerm = randomNonNegativeLong(); ClusterState clusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build(), + true ); clusterState = ClusterState.builder(clusterState) .metadata(Metadata.builder(clusterState.getMetadata()).clusterUUID(randomAlphaOfLength(10)).clusterUUIDCommitted(false).build()) @@ -901,7 +983,8 @@ public void testRemotePersistedStateExceptionOnFullStateUpload() throws IOExcept final long clusterTerm = randomNonNegativeLong(); final ClusterState clusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build(), + true ); assertThrows(OpenSearchException.class, () -> remotePersistedState.setLastAcceptedState(clusterState)); @@ -924,7 +1007,8 @@ public void testRemotePersistedStateFailureStats() throws IOException { final long clusterTerm = randomNonNegativeLong(); final ClusterState clusterState = createClusterState( randomNonNegativeLong(), - Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build() + Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(clusterTerm).build()).build(), + true ); assertThrows(OpenSearchException.class, () -> remotePersistedState.setLastAcceptedState(clusterState)); @@ -1025,7 +1109,8 @@ public void testGatewayForRemoteStateForNodeReplacement() throws IOException { false ) .clusterUUID(randomAlphaOfLength(10)) - .build() + .build(), + false ); when(remoteClusterStateService.getLastKnownUUIDFromRemote(clusterName.value())).thenReturn( previousState.metadata().clusterUUID() @@ -1071,7 +1156,8 @@ public void testGatewayForRemoteStateForNodeReboot() throws IOException { .coordinationMetadata(CoordinationMetadata.builder().term(randomLong()).build()) .put(indexMetadata, false) .clusterUUID(randomAlphaOfLength(10)) - .build() + .build(), + false ); gateway = newGatewayForRemoteState( remoteClusterStateService, @@ -1117,7 +1203,8 @@ public void testGatewayForRemoteStateForInitialBootstrapBlocksApplied() throws I .put(indexMetadata, false) .clusterUUID(ClusterState.UNKNOWN_UUID) .persistentSettings(Settings.builder().put(Metadata.SETTING_READ_ONLY_SETTING.getKey(), true).build()) - .build() + .build(), + false ) ).nodes(DiscoveryNodes.EMPTY_NODES).build(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 21b88e5bd66b9..024cf713d6cc2 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -611,20 +611,20 @@ public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOExc final RemoteClusterStateManifestInfo manifestDetails = remoteClusterStateService.writeIncrementalMetadata( clusterState, clusterState, - null + ClusterMetadataManifest.builder().build() ); Assert.assertThat(manifestDetails, nullValue()); assertEquals(0, remoteClusterStateService.getUploadStats().getSuccessCount()); } - public void testFailWriteIncrementalMetadataWhenTermChanged() { + public void testFailWriteIncrementalMetadataWhenManifestNull() { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(2L).build(); final ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT) .metadata(Metadata.builder().coordinationMetadata(coordinationMetadata)) .build(); assertThrows( - AssertionError.class, + IllegalArgumentException.class, () -> remoteClusterStateService.writeIncrementalMetadata(previousClusterState, clusterState, null) ); } @@ -2524,7 +2524,7 @@ public void testMarkLastStateAsCommittedSuccess() throws IOException { List indices = List.of(uploadedIndexMetadata); final ClusterMetadataManifest previousManifest = ClusterMetadataManifest.builder().indices(indices).build(); - final ClusterMetadataManifest manifest = remoteClusterStateService.markLastStateAsCommitted(clusterState, previousManifest) + final ClusterMetadataManifest manifest = remoteClusterStateService.markLastStateAsCommitted(clusterState, previousManifest, false) .getClusterMetadataManifest(); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() From bd9c3148e1a02d1c96f923dc1ff7280e892b34d2 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Thu, 5 Sep 2024 00:36:48 +0530 Subject: [PATCH 2/7] Create RemoteStatePublishRequest, simplify if checks in CoordinationState Signed-off-by: Shivansh Arora --- .../coordination/CoordinationState.java | 16 +++--- .../PublicationTransportHandler.java | 6 +-- .../cluster/coordination/PublishRequest.java | 33 ++---------- .../RemoteStatePublishRequest.java | 51 +++++++++++++++++++ .../remote/RemoteClusterStateService.java | 2 +- .../coordination/CoordinationStateTests.java | 4 +- 6 files changed, 66 insertions(+), 46 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/coordination/RemoteStatePublishRequest.java diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index fb8d300af72b4..b0ac4f73b4476 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -106,6 +106,7 @@ public CoordinationState( .getLastAcceptedConfiguration(); this.publishVotes = new VoteCollection(); this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings); + // ToDo: revisit this check while making the setting dynamic this.isRemotePublicationEnabled = isRemoteStateEnabled && FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && localNode.isRemoteStatePublicationEnabled(); @@ -461,7 +462,6 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) { clusterState.term() ); persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState); - if (shouldUpdateRemotePersistedState(publishRequest)) { updateRemotePersistedStateOnPublishRequest(publishRequest); } @@ -626,32 +626,28 @@ public void close() throws IOException { } private boolean shouldUpdateRemotePersistedState(PublishRequest publishRequest) { - return isRemotePublicationEnabled - && localNode.isClusterManagerNode() + return persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null && publishRequest.getAcceptedState().getNodes().isLocalNodeElectedClusterManager() == false; } private void updateRemotePersistedStateOnPublishRequest(PublishRequest publishRequest) { - if (publishRequest.hasManifest()) { - assert persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null; + if (publishRequest instanceof RemoteStatePublishRequest) { persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(publishRequest.getAcceptedState()); persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) - .setLastAcceptedManifest(publishRequest.getAcceptedManifest().get()); + .setLastAcceptedManifest(((RemoteStatePublishRequest) publishRequest).getAcceptedManifest()); } else { - // We will end up here if PublishRequest was sent not using Remote Store even with remotePublication enabled on this node + // We will end up here if PublishRequest was sent not using Remote Store even with remote persisted state on this node persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(null); persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(null); } } private boolean shouldCommitRemotePersistedState() { - return isRemotePublicationEnabled - && localNode.isClusterManagerNode() + return persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null && persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL) .getLastAcceptedState() .getNodes() .isLocalNodeElectedClusterManager() == false - && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState() != null && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null; } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 5f326a94ac6d9..cdf331b7bb577 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -325,7 +325,7 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState, ClusterM } } if (manifest != null) { - return handlePublishRequest.apply(new PublishRequest(incomingState, manifest)); + return handlePublishRequest.apply(new RemoteStatePublishRequest(incomingState, manifest)); } return handlePublishRequest.apply(new PublishRequest(incomingState)); } @@ -542,7 +542,7 @@ public String executor() { } public void sendClusterState(DiscoveryNode destination, ActionListener listener) { - logger.info("sending cluster state over transport to node: {}", destination.getName()); + logger.debug("sending cluster state over transport to node: {}", destination.getName()); if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) { logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination); sendFullClusterState(destination, listener); @@ -642,7 +642,7 @@ public class RemotePublicationContext extends PublicationContext { @Override public void sendClusterState(final DiscoveryNode destination, final ActionListener listener) { try { - logger.info("sending remote cluster state to node: {}", destination.getName()); + logger.debug("sending remote cluster state to node: {}", destination.getName()); final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)) .getLastUploadedManifestFile(); final RemotePublishRequest remotePublishRequest = new RemotePublishRequest( diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java index 5be777f7fec2d..e7c3e2d2c965b 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java @@ -32,10 +32,8 @@ package org.opensearch.cluster.coordination; import org.opensearch.cluster.ClusterState; -import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.util.Objects; -import java.util.Optional; /** * Request which is used by the cluster-manager node to publish cluster state changes. @@ -46,30 +44,15 @@ public class PublishRequest { private final ClusterState acceptedState; - private final ClusterMetadataManifest acceptedManifest; public PublishRequest(ClusterState acceptedState) { this.acceptedState = acceptedState; - this.acceptedManifest = null; - } - - public PublishRequest(ClusterState acceptedState, ClusterMetadataManifest acceptedManifest) { - this.acceptedState = acceptedState; - this.acceptedManifest = acceptedManifest; } public ClusterState getAcceptedState() { return acceptedState; } - public boolean hasManifest() { - return acceptedManifest != null; - } - - public Optional getAcceptedManifest() { - return Optional.ofNullable(acceptedManifest); - } - @Override public boolean equals(Object o) { if (this == o) return true; @@ -77,26 +60,16 @@ public boolean equals(Object o) { PublishRequest that = (PublishRequest) o; - return acceptedState.term() == that.acceptedState.term() - && acceptedState.version() == that.acceptedState.version() - && Objects.equals(acceptedManifest, that.acceptedManifest); + return acceptedState.term() == that.acceptedState.term() && acceptedState.version() == that.acceptedState.version(); } @Override public int hashCode() { - return Objects.hash(acceptedState.term(), acceptedState.version(), acceptedManifest); + return Objects.hash(acceptedState.term(), acceptedState.version()); } @Override public String toString() { - return "PublishRequest{term=" - + acceptedState.term() - + ", version=" - + acceptedState.version() - + ", state=" - + acceptedState - + ", manifest=" - + acceptedManifest - + '}'; + return "PublishRequest{term=" + acceptedState.term() + ", version=" + acceptedState.version() + ", state=" + acceptedState + '}'; } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/RemoteStatePublishRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/RemoteStatePublishRequest.java new file mode 100644 index 0000000000000..5667e6d67d062 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/coordination/RemoteStatePublishRequest.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.coordination; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.gateway.remote.ClusterMetadataManifest; + +import java.util.Objects; + +/** + * PublishRequest created by downloading the accepted {@link ClusterState} from Remote Store, using the published {@link ClusterMetadataManifest} + * + * @opensearch.internal + */ +public class RemoteStatePublishRequest extends PublishRequest { + private final ClusterMetadataManifest manifest; + + public RemoteStatePublishRequest(ClusterState acceptedState, ClusterMetadataManifest acceptedManifest) { + super(acceptedState); + this.manifest = acceptedManifest; + } + + public ClusterMetadataManifest getAcceptedManifest() { + return manifest; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + RemoteStatePublishRequest that = (RemoteStatePublishRequest) o; + return Objects.equals(manifest, that.manifest); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), manifest); + } + + @Override + public String toString() { + return "RemoteStatePublishRequest{" + super.toString() + "manifest=" + manifest + "} "; + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index b0c2570d3862d..a57928fd6cb16 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -959,7 +959,7 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted( emptyMap(), false, emptyList(), - emptyMap() + null ).uploadedCoordinationMetadata; } UploadedMetadataResults uploadedMetadataResults = new UploadedMetadataResults( diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index 6a0f952a66bf4..85c0cec575dc7 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -1066,7 +1066,7 @@ public void testHandlePublishRequestOnFollowerWhenRemotePublicationEnabled() { .clusterUUIDCommitted(true) .build(); - PublishResponse publishResponse = coordinationState.handlePublishRequest(new PublishRequest(state2, manifest)); + PublishResponse publishResponse = coordinationState.handlePublishRequest(new RemoteStatePublishRequest(state2, manifest)); assertEquals(state2.term(), publishResponse.getTerm()); assertEquals(state2.version(), publishResponse.getVersion()); verifyNoInteractions(remoteClusterStateService); @@ -1141,7 +1141,7 @@ public void testHandleCommitOnFollowerNodeWhenRemotePublicationEnabled() { .build(); PublishRequest publishRequest = coordinationState.handleClientValue(state2); - coordinationState.handlePublishRequest(new PublishRequest(publishRequest.getAcceptedState(), manifest)); + coordinationState.handlePublishRequest(new RemoteStatePublishRequest(publishRequest.getAcceptedState(), manifest)); ApplyCommitRequest applyCommitRequest = new ApplyCommitRequest(node2, state2.term(), state2.version()); coordinationState.handleCommit(applyCommitRequest); verifyNoInteractions(remoteClusterStateService); From 48d4159ca0ff0f27e3edee38e8800ff4439baf70 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Thu, 5 Sep 2024 13:02:17 +0530 Subject: [PATCH 3/7] Removed redundant method Signed-off-by: Shivansh Arora --- .../opensearch/node/remotestore/RemoteStoreNodeAttribute.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index 211820d2be093..55971398634c5 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -207,10 +207,6 @@ public static boolean isRemoteRoutingTableEnabled(Settings settings) { return isRemoteRoutingTableAttributePresent(settings); } - public static boolean isRemotePublicationEnabled(Settings settings) { - return isRemoteRoutingTableEnabled(settings); - } - public RepositoriesMetadata getRepositoriesMetadata() { return this.repositoriesMetadata; } From 134aa9c6c0600336aeb72e00e6da4c3e14f20639 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Thu, 5 Sep 2024 13:14:12 +0530 Subject: [PATCH 4/7] Add CHANGELOG Signed-off-by: Shivansh Arora --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d1c0c78d6db02..1ccecd42c5af0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,6 +54,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Reset DiscoveryNodes in all transport node actions request ([#15131](https://github.com/opensearch-project/OpenSearch/pull/15131)) - Relax the join validation for Remote State publication ([#15471](https://github.com/opensearch-project/OpenSearch/pull/15471)) - MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637)) +- [Remote Publication] Upload incremental cluster state on master re-election ([#15145](https://github.com/opensearch-project/OpenSearch/pull/15145)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) From f2cf708cd9d51c411a656c79fee26f02e5241eff Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Thu, 5 Sep 2024 13:29:38 +0530 Subject: [PATCH 5/7] spotless apply Signed-off-by: Shivansh Arora --- .../org/opensearch/gateway/remote/RemoteStatePublicationIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java index e7c97e2eef604..914934c0cceec 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java @@ -18,8 +18,8 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.settings.Settings; import org.opensearch.discovery.DiscoveryStats; -import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.GatewayMetaState; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest; import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore; import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm; From f0df4903cddecac85077c54332905b9afdea260d Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Fri, 6 Sep 2024 10:37:59 +0530 Subject: [PATCH 6/7] Fix Publication setting reference Signed-off-by: Shivansh Arora --- .../org/opensearch/gateway/remote/RemoteStatePublicationIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java index 276f6fff9bf82..faab3645ae894 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java @@ -114,7 +114,7 @@ protected Settings nodeSettings(int nodeOrdinal) { RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(), RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE ) - .put(REMOTE_PUBLICATION_EXPERIMENTAL, isRemotePublicationEnabled) + .put(REMOTE_PUBLICATION_SETTING_KEY, isRemotePublicationEnabled) .put( RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.getKey(), hasRemoteStateCharPrefix ? REMOTE_STATE_PREFIX : "" From 4b4b8b3d08d33c6de7206d2b7fad02a74314d67f Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Fri, 6 Sep 2024 11:23:20 +0530 Subject: [PATCH 7/7] fix build failure Signed-off-by: Shivansh Arora --- .../cluster/coordination/CoordinationStateTests.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index 2d9460c5c6eed..32cb95e0c04f6 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -51,7 +51,6 @@ import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.test.EqualsHashCodeTestUtils; -import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.OpenSearchTestCase; import org.junit.Before; @@ -68,7 +67,6 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; - import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; @@ -1402,9 +1400,6 @@ private static Settings remoteStateSettings() { } private static Settings remotePublicationSettings() { - FeatureFlagSetter.set(REMOTE_PUBLICATION_EXPERIMENTAL); - Settings remoteStateSettings = Settings.builder().put(remoteStateSettings()).put(REMOTE_PUBLICATION_EXPERIMENTAL, true).build(); - initializeFeatureFlags(remoteStateSettings); - return remoteStateSettings; + return Settings.builder().put(remoteStateSettings()).put(REMOTE_PUBLICATION_SETTING_KEY, true).build(); } }