From 936cdb9043fb183cabe208fd7f7fc44a44b8f842 Mon Sep 17 00:00:00 2001 From: "Spencer G. Jones" Date: Thu, 24 Oct 2024 11:07:57 -0700 Subject: [PATCH 1/8] Change to correct version since this has been backported (#16472) Signed-off-by: Spencer G. Jones --- .../snapshots/restore/RestoreSnapshotRequest.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java index 42c64e04268e3..84bc87a5cb1ba 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java @@ -166,11 +166,10 @@ public RestoreSnapshotRequest(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_2_17_0)) { sourceRemoteTranslogRepository = in.readOptionalString(); } - // TODO: change to V_2_18_0 once this is backported into that version - if (in.getVersion().onOrAfter(Version.CURRENT)) { + if (in.getVersion().onOrAfter(Version.V_2_18_0)) { renameAliasPattern = in.readOptionalString(); } - if (in.getVersion().onOrAfter(Version.CURRENT)) { + if (in.getVersion().onOrAfter(Version.V_2_18_0)) { renameAliasReplacement = in.readOptionalString(); } } @@ -200,11 +199,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_17_0)) { out.writeOptionalString(sourceRemoteTranslogRepository); } - // TODO: change to V_2_18_0 once this is backported into that version - if (out.getVersion().onOrAfter(Version.CURRENT)) { + if (out.getVersion().onOrAfter(Version.V_2_18_0)) { out.writeOptionalString(renameAliasPattern); } - if (out.getVersion().onOrAfter(Version.CURRENT)) { + if (out.getVersion().onOrAfter(Version.V_2_18_0)) { out.writeOptionalString(renameAliasReplacement); } } From 4ad1be3825f8a47ff94fdb3cee63db78c2c142a7 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha <81695996+soosinha@users.noreply.github.com> Date: Fri, 25 Oct 2024 11:08:05 +0530 Subject: [PATCH 2/8] Revert uploading of manifest using min codec version (#16403) Signed-off-by: Sooraj Sinha --- CHANGELOG.md | 1 + .../PublicationTransportHandler.java | 1 - .../opensearch/gateway/GatewayMetaState.java | 14 +-- .../remote/RemoteClusterStateService.java | 12 +-- .../gateway/remote/RemoteManifestManager.java | 5 +- .../coordination/CoordinationStateTests.java | 6 +- .../GatewayMetaStatePersistedStateTests.java | 36 +++---- .../RemoteClusterStateServiceTests.java | 97 ++++++------------- 8 files changed, 57 insertions(+), 115 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e682c1b226f4a..f5dc69a9ec290 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -99,6 +99,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix typo super->sb in method toString() of RemoteStoreNodeAttribute ([#15362](https://github.com/opensearch-project/OpenSearch/pull/15362)) - [Workload Management] Fixing Create/Update QueryGroup TransportActions to execute from non-cluster manager nodes ([16422](https://github.com/opensearch-project/OpenSearch/pull/16422)) - Fix flaky test in `testApproximateRangeWithSizeOverDefault` by adjusting totalHits assertion logic ([#16434](https://github.com/opensearch-project/OpenSearch/pull/16434#pullrequestreview-2386999409)) +- Revert changes to upload remote state manifest using minimum codec version([#16403](https://github.com/opensearch-project/OpenSearch/pull/16403)) ### Security diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index d30efde52bffb..c4cb484cda693 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -367,7 +367,6 @@ public PublicationContext newPublicationContext( } private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes discoveryNodes) { - assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0; for (DiscoveryNode node : discoveryNodes.getNodes().values()) { // if a node is non-remote then created local publication context if (node.isRemoteStatePublicationEnabled() == false) { diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index f5da6df2689bd..1e2f5612ca002 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -753,12 +753,8 @@ public void setLastAcceptedState(ClusterState clusterState) { } try { final RemoteClusterStateManifestInfo manifestDetails; - // Decide the codec version - int codecVersion = ClusterMetadataManifest.getCodecForVersion(clusterState.nodes().getMinNodeVersion()); - assert codecVersion >= 0 : codecVersion; - logger.info("codec version is {}", codecVersion); - if (shouldWriteFullClusterState(clusterState, codecVersion)) { + if (shouldWriteFullClusterState(clusterState)) { final Optional latestManifest = remoteClusterStateService.getLatestClusterMetadataManifest( clusterState.getClusterName().value(), clusterState.metadata().clusterUUID() @@ -775,7 +771,7 @@ public void setLastAcceptedState(ClusterState clusterState) { clusterState.metadata().clusterUUID() ); } - manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID, codecVersion); + manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID); } else { assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true : "Previous manifest and previous ClusterState are not in sync"; @@ -820,13 +816,11 @@ private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, return true; } - private boolean shouldWriteFullClusterState(ClusterState clusterState, int codecVersion) { - assert lastAcceptedManifest == null || lastAcceptedManifest.getCodecVersion() <= codecVersion; + private boolean shouldWriteFullClusterState(ClusterState clusterState) { if (lastAcceptedState == null || lastAcceptedManifest == null || (remoteClusterStateService.isRemotePublicationEnabled() == false && lastAcceptedState.term() != clusterState.term()) - || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT - || lastAcceptedManifest.getCodecVersion() != codecVersion) { + || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) { return true; } return false; diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 0cd2025b98783..dc41189afc3cb 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -301,8 +301,7 @@ public RemoteClusterStateService( * @return A manifest object which contains the details of uploaded entity metadata. */ @Nullable - public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterState, String previousClusterUUID, int codecVersion) - throws IOException { + public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterState, String previousClusterUUID) throws IOException { final long startTimeNanos = relativeTimeNanosSupplier.getAsLong(); if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { logger.error("Local node is not elected cluster manager. Exiting"); @@ -342,8 +341,7 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState, threadpool) : null, - false, - codecVersion + false ); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); @@ -551,8 +549,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState, threadpool) : null, - false, - previousManifest.getCodecVersion() + false ); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); @@ -1024,8 +1021,7 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted( !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState, threadpool) : null, - true, - previousManifest.getCodecVersion() + true ); if (!previousManifest.isClusterUUIDCommitted() && committedManifestDetails.getClusterMetadataManifest().isClusterUUIDCommitted()) { remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifestDetails.getClusterMetadataManifest()); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java index b243269fe323e..20e14ff805ca8 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java @@ -100,8 +100,7 @@ RemoteClusterStateManifestInfo uploadManifest( String previousClusterUUID, ClusterStateDiffManifest clusterDiffManifest, ClusterStateChecksum clusterStateChecksum, - boolean committed, - int codecVersion + boolean committed ) { synchronized (this) { ClusterMetadataManifest.Builder manifestBuilder = ClusterMetadataManifest.builder(); @@ -112,7 +111,7 @@ RemoteClusterStateManifestInfo uploadManifest( .opensearchVersion(Version.CURRENT) .nodeId(nodeId) .committed(committed) - .codecVersion(codecVersion) + .codecVersion(ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION) .indices(uploadedMetadataResult.uploadedIndexMetadata) .previousClusterUUID(previousClusterUUID) .clusterUUIDCommitted(clusterState.metadata().clusterUUIDCommitted()) diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java index b5d16e7be849f..f707198efb073 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationStateTests.java @@ -67,7 +67,6 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; @@ -962,7 +961,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep .previousClusterUUID(randomAlphaOfLength(10)) .clusterUUIDCommitted(true) .build(); - when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION)).thenReturn( + when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID)).thenReturn( new RemoteClusterStateManifestInfo(manifest, "path/to/manifest") ); @@ -975,8 +974,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep final CoordinationState coordinationState = createCoordinationState(persistedStateRegistry, node1, remoteStateSettings()); coordinationState.handlePrePublish(clusterState); - Mockito.verify(remoteClusterStateService, Mockito.times(1)) - .writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); + Mockito.verify(remoteClusterStateService, Mockito.times(1)).writeFullMetadata(clusterState, previousClusterUUID); assertThat(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState(), equalTo(clusterState)); when(remoteClusterStateService.markLastStateAsCommitted(any(), any(), eq(false))).thenReturn( diff --git a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java index efdb3076f419c..955ea82e219e8 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -759,7 +759,7 @@ public void testRemotePersistedState() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(1L).stateVersion(5L).build(); final String previousClusterUUID = "prev-cluster-uuid"; - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION))) + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())) .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")); Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) @@ -777,7 +777,7 @@ public void testRemotePersistedState() throws IOException { ); remotePersistedState.setLastAcceptedState(clusterState); - Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); + Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState, previousClusterUUID); assertThat(remotePersistedState.getLastAcceptedState(), equalTo(clusterState)); assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); @@ -789,8 +789,7 @@ public void testRemotePersistedState() throws IOException { ); remotePersistedState.setLastAcceptedState(secondClusterState); - Mockito.verify(remoteClusterStateService, times(1)) - .writeFullMetadata(secondClusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); + Mockito.verify(remoteClusterStateService, times(1)).writeFullMetadata(secondClusterState, previousClusterUUID); assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState)); assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm)); @@ -820,9 +819,9 @@ public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOEx .clusterTerm(1L) .stateVersion(5L) .codecVersion(CODEC_V1) - .opensearchVersion(Version.CURRENT) + .opensearchVersion(Version.V_2_15_0) .build(); - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any(), eq(CODEC_V1))) + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())) .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest2")); CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); @@ -833,7 +832,7 @@ public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOEx ); remotePersistedState.setLastAcceptedState(clusterState1); - Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState1, previousClusterUUID, CODEC_V1); + Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState1, previousClusterUUID); ClusterState clusterState2 = createClusterState( randomNonNegativeLong(), @@ -846,10 +845,10 @@ public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOEx .codecVersion(MANIFEST_CURRENT_CODEC_VERSION) .opensearchVersion(Version.CURRENT) .build(); - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION))) + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())) .thenReturn(new RemoteClusterStateManifestInfo(manifest2, "path/to/manifest")); remotePersistedState.setLastAcceptedState(clusterState2); - Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState2, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); + Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState2, previousClusterUUID); ClusterState clusterState3 = createClusterState( randomNonNegativeLong(), @@ -889,8 +888,7 @@ public void testRemotePersistentState_FollowerNode() throws IOException { remotePersistedState.setLastAcceptedState(clusterState); remotePersistedState.setLastAcceptedManifest(manifest); - Mockito.verify(remoteClusterStateService, never()) - .writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); + Mockito.verify(remoteClusterStateService, never()).writeFullMetadata(clusterState, previousClusterUUID); assertEquals(clusterState, remotePersistedState.getLastAcceptedState()); assertEquals(clusterTerm, remotePersistedState.getCurrentTerm()); @@ -906,8 +904,7 @@ public void testRemotePersistentState_FollowerNode() throws IOException { ); remotePersistedState.setLastAcceptedState(secondClusterState); - Mockito.verify(remoteClusterStateService, never()) - .writeFullMetadata(secondClusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION); + Mockito.verify(remoteClusterStateService, never()).writeFullMetadata(secondClusterState, previousClusterUUID); assertEquals(secondClusterState, remotePersistedState.getLastAcceptedState()); assertEquals(clusterTerm, remotePersistedState.getCurrentTerm()); @@ -940,7 +937,7 @@ public void testRemotePersistedStateNotCommitted() throws IOException { .build(); Mockito.when(remoteClusterStateService.getLatestClusterMetadataManifest(Mockito.any(), Mockito.any())) .thenReturn(Optional.of(manifest)); - Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION))) + Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any())) .thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")); Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any())) @@ -966,17 +963,14 @@ public void testRemotePersistedStateNotCommitted() throws IOException { remotePersistedState.setLastAcceptedState(clusterState); ArgumentCaptor previousClusterUUIDCaptor = ArgumentCaptor.forClass(String.class); ArgumentCaptor clusterStateCaptor = ArgumentCaptor.forClass(ClusterState.class); - Mockito.verify(remoteClusterStateService) - .writeFullMetadata(clusterStateCaptor.capture(), previousClusterUUIDCaptor.capture(), eq(MANIFEST_CURRENT_CODEC_VERSION)); + Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterStateCaptor.capture(), previousClusterUUIDCaptor.capture()); assertEquals(previousClusterUUID, previousClusterUUIDCaptor.getValue()); } public void testRemotePersistedStateExceptionOnFullStateUpload() throws IOException { final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final String previousClusterUUID = "prev-cluster-uuid"; - Mockito.doThrow(IOException.class) - .when(remoteClusterStateService) - .writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION)); + Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any(), Mockito.any()); CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID); @@ -994,9 +988,7 @@ public void testRemotePersistedStateFailureStats() throws IOException { RemoteUploadStats remoteStateStats = new RemoteUploadStats(); final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class); final String previousClusterUUID = "prev-cluster-uuid"; - Mockito.doThrow(IOException.class) - .when(remoteClusterStateService) - .writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION)); + Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any(), Mockito.any()); when(remoteClusterStateService.getUploadStats()).thenReturn(remoteStateStats); doAnswer((i) -> { remoteStateStats.stateFailed(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index dffbb9d82545a..448b9cc9d78ac 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -280,8 +280,7 @@ public void testFailWriteFullMetadataNonClusterManagerNode() throws IOException final ClusterState clusterState = generateClusterStateWithOneIndex().build(); final RemoteClusterStateManifestInfo manifestDetails = remoteClusterStateService.writeFullMetadata( clusterState, - randomAlphaOfLength(10), - MANIFEST_CURRENT_CODEC_VERSION + randomAlphaOfLength(10) ); Assert.assertThat(manifestDetails, nullValue()); } @@ -327,11 +326,8 @@ public void testWriteFullMetadataSuccess() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata( - clusterState, - "prev-cluster-uuid", - MANIFEST_CURRENT_CODEC_VERSION - ).getClusterMetadataManifest(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); List indices = List.of(uploadedIndexMetadata); @@ -401,11 +397,8 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException .build(); mockBlobStoreObjects(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata( - clusterState, - "prev-cluster-uuid", - MANIFEST_CURRENT_CODEC_VERSION - ).getClusterMetadataManifest(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); List indices = List.of(uploadedIndexMetadata); @@ -453,11 +446,8 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { }).when(container).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata( - clusterState, - "prev-cluster-uuid", - MANIFEST_CURRENT_CODEC_VERSION - ).getClusterMetadataManifest(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); List indices = List.of(uploadedIndexMetadata); @@ -534,7 +524,7 @@ public void run() { remoteClusterStateService.start(); assertThrows( RemoteStateTransferException.class, - () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10), MANIFEST_CURRENT_CODEC_VERSION) + () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) ); } @@ -578,7 +568,7 @@ public void testTimeoutWhileWritingManifestFile() throws IOException { ).thenReturn(new RemoteClusterStateUtils.UploadedMetadataResults()); RemoteStateTransferException ex = expectThrows( RemoteStateTransferException.class, - () -> spiedService.writeFullMetadata(clusterState, randomAlphaOfLength(10), MANIFEST_CURRENT_CODEC_VERSION) + () -> spiedService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) ); assertTrue(ex.getMessage().contains("Timed out waiting for transfer")); } @@ -600,7 +590,7 @@ public void testWriteFullMetadataInParallelFailureForIndexMetadata() throws IOEx remoteClusterStateService.start(); assertThrows( RemoteStateTransferException.class, - () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10), MANIFEST_CURRENT_CODEC_VERSION) + () -> remoteClusterStateService.writeFullMetadata(clusterState, randomAlphaOfLength(10)) ); assertEquals(0, remoteClusterStateService.getUploadStats().getSuccessCount()); } @@ -1850,7 +1840,7 @@ private void verifyCodecMigrationManifest(int previousCodec) throws IOException // global metadata is updated assertThat(manifestAfterUpdate.hasMetadataAttributesFiles(), is(true)); // During incremental update, codec version will not change. - assertThat(manifestAfterUpdate.getCodecVersion(), is(previousCodec)); + assertThat(manifestAfterUpdate.getCodecVersion(), is(MANIFEST_CURRENT_CODEC_VERSION)); } public void testWriteIncrementalGlobalMetadataFromCodecV0Success() throws IOException { @@ -1885,7 +1875,7 @@ private void verifyWriteIncrementalGlobalMetadataFromOlderCodecSuccess(ClusterMe ).getClusterMetadataManifest(); final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() - .codecVersion(previousManifest.getCodecVersion()) + .codecVersion(MANIFEST_CURRENT_CODEC_VERSION) .indices(Collections.emptyList()) .clusterTerm(1L) .stateVersion(1L) @@ -2074,11 +2064,8 @@ public void testCustomMetadataDeletedUpdatedAndAdded() throws IOException { // Initial cluster state with index. final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata( - initialClusterState, - "_na_", - MANIFEST_CURRENT_CODEC_VERSION - ).getClusterMetadataManifest(); + final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_") + .getClusterMetadataManifest(); ClusterState clusterState1 = ClusterState.builder(initialClusterState) .metadata( @@ -2156,11 +2143,8 @@ public void testIndexMetadataDeletedUpdatedAndAdded() throws IOException { // Initial cluster state with index. final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata( - initialClusterState, - "_na_", - MANIFEST_CURRENT_CODEC_VERSION - ).getClusterMetadataManifest(); + final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_") + .getClusterMetadataManifest(); String initialIndex = "test-index"; Index index1 = new Index("test-index-1", "index-uuid-1"); Index index2 = new Index("test-index-2", "index-uuid-2"); @@ -2238,11 +2222,8 @@ private void verifyMetadataAttributeOnlyUpdated( // Initial cluster state with index. final ClusterState initialClusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata( - initialClusterState, - "_na_", - MANIFEST_CURRENT_CODEC_VERSION - ).getClusterMetadataManifest(); + final ClusterMetadataManifest initialManifest = remoteClusterStateService.writeFullMetadata(initialClusterState, "_na_") + .getClusterMetadataManifest(); ClusterState newClusterState = clusterStateUpdater.apply(initialClusterState); @@ -2255,11 +2236,8 @@ private void verifyMetadataAttributeOnlyUpdated( initialManifest ).getClusterMetadataManifest(); } else { - manifestAfterMetadataUpdate = remoteClusterStateService.writeFullMetadata( - newClusterState, - initialClusterState.stateUUID(), - MANIFEST_CURRENT_CODEC_VERSION - ).getClusterMetadataManifest(); + manifestAfterMetadataUpdate = remoteClusterStateService.writeFullMetadata(newClusterState, initialClusterState.stateUUID()) + .getClusterMetadataManifest(); } assertions.accept(initialManifest, manifestAfterMetadataUpdate); @@ -2742,11 +2720,8 @@ public void testRemoteStateUploadStats() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); mockBlobStoreObjects(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata( - clusterState, - "prev-cluster-uuid", - MANIFEST_CURRENT_CODEC_VERSION - ).getClusterMetadataManifest(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); assertTrue(remoteClusterStateService.getUploadStats() != null); assertEquals(1, remoteClusterStateService.getUploadStats().getSuccessCount()); @@ -2801,11 +2776,8 @@ public void testWriteFullMetadataSuccessWithRoutingTable() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata( - clusterState, - "prev-cluster-uuid", - MANIFEST_CURRENT_CODEC_VERSION - ).getClusterMetadataManifest(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); final UploadedIndexMetadata uploadedIndiceRoutingMetadata = new UploadedIndexMetadata( "test-index", @@ -2854,11 +2826,8 @@ public void testWriteFullMetadataInParallelSuccessWithRoutingTable() throws IOEx when((blobStoreRepository.basePath())).thenReturn(BlobPath.cleanPath().add("base-path")); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata( - clusterState, - "prev-cluster-uuid", - MANIFEST_CURRENT_CODEC_VERSION - ).getClusterMetadataManifest(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); final UploadedIndexMetadata uploadedIndiceRoutingMetadata = new UploadedIndexMetadata( @@ -3108,11 +3077,8 @@ public void testWriteFullMetadataSuccessWithChecksumValidationEnabled() throws I final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata( - clusterState, - "prev-cluster-uuid", - MANIFEST_CURRENT_CODEC_VERSION - ).getClusterMetadataManifest(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); final UploadedIndexMetadata uploadedIndiceRoutingMetadata = new UploadedIndexMetadata( "test-index", @@ -3152,11 +3118,8 @@ public void testWriteFullMetadataSuccessWithChecksumValidationModeNone() throws final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); - final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata( - clusterState, - "prev-cluster-uuid", - MANIFEST_CURRENT_CODEC_VERSION - ).getClusterMetadataManifest(); + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState, "prev-cluster-uuid") + .getClusterMetadataManifest(); final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); final UploadedIndexMetadata uploadedIndiceRoutingMetadata = new UploadedIndexMetadata( "test-index", From bb45f0343553944627b18a3fd8fa0b79d0991b69 Mon Sep 17 00:00:00 2001 From: Arpit-Bandejiya Date: Fri, 25 Oct 2024 17:46:59 +0530 Subject: [PATCH 3/8] Add Setting to adjust the primary constraint weights (#16471) Add Setting to adjust the primary constraint weights (#16471) Signed-off-by: Arpit Bandejiya --- CHANGELOG.md | 1 + .../allocation/AllocationConstraints.java | 4 +-- .../routing/allocation/Constraint.java | 13 ++++--- .../routing/allocation/ConstraintTypes.java | 10 ++++++ .../allocation/RebalanceConstraints.java | 4 +-- .../allocator/BalancedShardsAllocator.java | 35 ++++++++++++++++--- .../common/settings/ClusterSettings.java | 1 + .../AllocationConstraintsTests.java | 28 ++++++++------- 8 files changed, 71 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f5dc69a9ec290..d6e76d946866b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - URI path filtering support in cluster stats API ([#15938](https://github.com/opensearch-project/OpenSearch/pull/15938)) - [Star Tree - Search] Add support for metric aggregations with/without term query ([15289](https://github.com/opensearch-project/OpenSearch/pull/15289)) - Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111)) +- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471)) ### Dependencies - Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578)) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java index 6702db4b43e91..4b71347e4071b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java @@ -39,8 +39,8 @@ public void updateAllocationConstraint(String constraint, boolean enable) { this.constraints.get(constraint).setEnable(enable); } - public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) { - Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index); + public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryThresholdWeight) { + Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index, primaryThresholdWeight); return params.weight(constraints); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java index e9c3c0afcbe88..ce0bb70d7d0b7 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/Constraint.java @@ -14,7 +14,7 @@ import java.util.Map; import java.util.function.Predicate; -import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CONSTRAINT_WEIGHT; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.predicateKeyToWeightMap; /** * Defines a constraint useful to de-prioritize certain nodes as target of unassigned shards used in {@link AllocationConstraints} or @@ -44,11 +44,13 @@ static class ConstraintParams { private ShardsBalancer balancer; private BalancedShardsAllocator.ModelNode node; private String index; + private long PrimaryConstraintThreshold; - ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) { + ConstraintParams(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryConstraintThreshold) { this.balancer = balancer; this.node = node; this.index = index; + this.PrimaryConstraintThreshold = primaryConstraintThreshold; } public ShardsBalancer getBalancer() { @@ -75,9 +77,12 @@ public String getIndex() { */ public long weight(Map constraints) { long totalConstraintWeight = 0; - for (Constraint constraint : constraints.values()) { + for (Map.Entry entry : constraints.entrySet()) { + String key = entry.getKey(); + Constraint constraint = entry.getValue(); if (constraint.test(this)) { - totalConstraintWeight += CONSTRAINT_WEIGHT; + double weight = predicateKeyToWeightMap(key, PrimaryConstraintThreshold); + totalConstraintWeight += weight; } } return totalConstraintWeight; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java index 28ad199218884..ff40556a4894b 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java @@ -86,4 +86,14 @@ public static Predicate isPrimaryShardsPerNodeBreac return primaryShardCount >= allowedPrimaryShardCount; }; } + + public static long predicateKeyToWeightMap(String key, long primaryConstraintWeight) { + switch (key) { + case CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID: + case CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID: + return primaryConstraintWeight; + default: + return CONSTRAINT_WEIGHT; + } + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java index 2c2138af18abc..803ef04ce1675 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java @@ -42,8 +42,8 @@ public void updateRebalanceConstraint(String constraint, boolean enable) { this.constraints.get(constraint).setEnable(enable); } - public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index) { - Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index); + public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode node, String index, long primaryConstraintThreshold) { + Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index, primaryConstraintThreshold); return params.weight(constraints); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 785636fa7ff2a..cfbb4d34c3a38 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -139,6 +139,14 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + public static final Setting PRIMARY_CONSTRAINT_THRESHOLD_SETTING = Setting.longSetting( + "cluster.routing.allocation.primary_constraint.threshold", + 10, + 0, + Property.Dynamic, + Property.NodeScope + ); + /** * This setting governs whether primary shards balance is desired during allocation. This is used by {@link ConstraintTypes#isPerIndexPrimaryShardsPerNodeBreached()} * and {@link ConstraintTypes#isPrimaryShardsPerNodeBreached} which is used during unassigned shard allocation @@ -201,6 +209,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile float shardBalanceFactor; private volatile WeightFunction weightFunction; private volatile float threshold; + private volatile long primaryConstraintThreshold; private volatile boolean ignoreThrottleInRestore; private volatile TimeValue allocatorTimeout; @@ -219,6 +228,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting setIgnoreThrottleInRestore(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.get(settings)); updateWeightFunction(); setThreshold(THRESHOLD_SETTING.get(settings)); + setPrimaryConstraintThresholdSetting(PRIMARY_CONSTRAINT_THRESHOLD_SETTING.get(settings)); setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); @@ -231,6 +241,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); + clusterSettings.addSettingsUpdateConsumer(PRIMARY_CONSTRAINT_THRESHOLD_SETTING, this::setPrimaryConstraintThresholdSetting); clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore); clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout); } @@ -294,7 +305,12 @@ private void updatePreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalan } private void updateWeightFunction() { - weightFunction = new WeightFunction(this.indexBalanceFactor, this.shardBalanceFactor, this.preferPrimaryShardRebalanceBuffer); + weightFunction = new WeightFunction( + this.indexBalanceFactor, + this.shardBalanceFactor, + this.preferPrimaryShardRebalanceBuffer, + this.primaryConstraintThreshold + ); } /** @@ -317,6 +333,11 @@ private void setThreshold(float threshold) { this.threshold = threshold; } + private void setPrimaryConstraintThresholdSetting(long threshold) { + this.primaryConstraintThreshold = threshold; + this.weightFunction.updatePrimaryConstraintThreshold(threshold); + } + private void setAllocatorTimeout(TimeValue allocatorTimeout) { this.allocatorTimeout = allocatorTimeout; } @@ -489,10 +510,11 @@ static class WeightFunction { private final float shardBalance; private final float theta0; private final float theta1; + private long primaryConstraintThreshold; private AllocationConstraints constraints; private RebalanceConstraints rebalanceConstraints; - WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer) { + WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer, long primaryConstraintThreshold) { float sum = indexBalance + shardBalance; if (sum <= 0.0f) { throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); @@ -501,6 +523,7 @@ static class WeightFunction { theta1 = indexBalance / sum; this.indexBalance = indexBalance; this.shardBalance = shardBalance; + this.primaryConstraintThreshold = primaryConstraintThreshold; RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer); this.constraints = new AllocationConstraints(); this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter); @@ -510,12 +533,12 @@ static class WeightFunction { public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) { float balancerWeight = weight(balancer, node, index); - return balancerWeight + constraints.weight(balancer, node, index); + return balancerWeight + constraints.weight(balancer, node, index, primaryConstraintThreshold); } public float weightWithRebalanceConstraints(ShardsBalancer balancer, ModelNode node, String index) { float balancerWeight = weight(balancer, node, index); - return balancerWeight + rebalanceConstraints.weight(balancer, node, index); + return balancerWeight + rebalanceConstraints.weight(balancer, node, index, primaryConstraintThreshold); } float weight(ShardsBalancer balancer, ModelNode node, String index) { @@ -531,6 +554,10 @@ void updateAllocationConstraint(String constraint, boolean enable) { void updateRebalanceConstraint(String constraint, boolean add) { this.rebalanceConstraints.updateRebalanceConstraint(constraint, add); } + + void updatePrimaryConstraintThreshold(long primaryConstraintThreshold) { + this.primaryConstraintThreshold = primaryConstraintThreshold; + } } /** diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index f769f8729c25b..c1f4e52706465 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -276,6 +276,7 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.THRESHOLD_SETTING, BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE, BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING, + BalancedShardsAllocator.PRIMARY_CONSTRAINT_THRESHOLD_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING, BreakerSettings.CIRCUIT_BREAKER_TYPE, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java index 4c9fcd1650664..23d0b708441f8 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationConstraintsTests.java @@ -25,6 +25,8 @@ public class AllocationConstraintsTests extends OpenSearchAllocationTestCase { + long constraintWeight = 20L; + public void testSettings() { Settings.Builder settings = Settings.builder(); ClusterSettings service = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); @@ -69,7 +71,7 @@ public void testIndexShardsPerNodeConstraint() { when(node.getNodeId()).thenReturn("test-node"); long expectedWeight = (shardCount >= avgShardsPerNode) ? CONSTRAINT_WEIGHT : 0; - assertEquals(expectedWeight, constraints.weight(balancer, node, "index")); + assertEquals(expectedWeight, constraints.weight(balancer, node, "index", constraintWeight)); } @@ -91,14 +93,14 @@ public void testPerIndexPrimaryShardsConstraint() { when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount); when(node.getNodeId()).thenReturn("test-node"); - assertEquals(0, constraints.weight(balancer, node, indexName)); + assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight)); perIndexPrimaryShardCount = 2; when(node.numPrimaryShards(anyString())).thenReturn(perIndexPrimaryShardCount); - assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); + assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName, constraintWeight)); constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); - assertEquals(0, constraints.weight(balancer, node, indexName)); + assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight)); } /** @@ -118,14 +120,14 @@ public void testGlobalPrimaryShardsConstraint() { when(node.numPrimaryShards()).thenReturn(primaryShardCount); when(node.getNodeId()).thenReturn("test-node"); - assertEquals(0, constraints.weight(balancer, node, indexName)); + assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight)); primaryShardCount = 3; when(node.numPrimaryShards()).thenReturn(primaryShardCount); - assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); + assertEquals(constraintWeight, constraints.weight(balancer, node, indexName, constraintWeight)); constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); - assertEquals(0, constraints.weight(balancer, node, indexName)); + assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight)); } /** @@ -150,22 +152,22 @@ public void testPrimaryShardsConstraints() { when(node.numPrimaryShards()).thenReturn(primaryShardCount); when(node.getNodeId()).thenReturn("test-node"); - assertEquals(0, constraints.weight(balancer, node, indexName)); + assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight)); // breaching global primary shard count but not per index primary shard count primaryShardCount = 5; when(node.numPrimaryShards()).thenReturn(primaryShardCount); - assertEquals(CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); + assertEquals(constraintWeight, constraints.weight(balancer, node, indexName, constraintWeight)); // when per index primary shard count constraint is also breached perIndexPrimaryShardCount = 3; when(node.numPrimaryShards(indexName)).thenReturn(perIndexPrimaryShardCount); - assertEquals(2 * CONSTRAINT_WEIGHT, constraints.weight(balancer, node, indexName)); + assertEquals(CONSTRAINT_WEIGHT + constraintWeight, constraints.weight(balancer, node, indexName, constraintWeight)); // disable both constraints constraints.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); constraints.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, false); - assertEquals(0, constraints.weight(balancer, node, indexName)); + assertEquals(0, constraints.weight(balancer, node, indexName, constraintWeight)); } /** @@ -202,8 +204,8 @@ public void testAllConstraints() { long expectedWeight = (shardCount >= (int) Math.ceil(avgPerIndexShardsPerNode)) ? CONSTRAINT_WEIGHT : 0; expectedWeight += perIndexPrimaryShardCount > (int) Math.ceil(avgPerIndexPrimaryShardsPerNode) ? CONSTRAINT_WEIGHT : 0; - expectedWeight += primaryShardsPerNode >= (int) Math.ceil(avgPrimaryShardsPerNode) ? CONSTRAINT_WEIGHT : 0; - assertEquals(expectedWeight, constraints.weight(balancer, node, indexName)); + expectedWeight += primaryShardsPerNode >= (int) Math.ceil(avgPrimaryShardsPerNode) ? constraintWeight : 0; + assertEquals(expectedWeight, constraints.weight(balancer, node, indexName, constraintWeight)); } } From b2d537a01a368e11840582b656264f69cb06ed4d Mon Sep 17 00:00:00 2001 From: gaobinlong Date: Sat, 26 Oct 2024 00:02:58 +0800 Subject: [PATCH 4/8] Update version check in yaml test file for the bug fix for get index settings API (#16480) Signed-off-by: Gao Binlong --- .../test/indices.get_settings/40_number_of_routing_shards.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.get_settings/40_number_of_routing_shards.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.get_settings/40_number_of_routing_shards.yml index 3fb392d6db134..3c86678a690b8 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.get_settings/40_number_of_routing_shards.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.get_settings/40_number_of_routing_shards.yml @@ -22,8 +22,8 @@ setup: --- Test retrieval of number_routing_shards settings: - skip: - version: " - 2.99.99" - reason: "introduced in 3.0.0" # TODO: change it to 2.18.0 after backport to 2.x branch + version: " - 2.18.99" + reason: "introduced in 2.19.0" - do: indices.get_settings: flat_settings: true From 6f1b59e54bec41d40772f8571c7b65d4b523f8b1 Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Sat, 26 Oct 2024 11:34:44 +0530 Subject: [PATCH 5/8] Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. (#16421) Signed-off-by: Sumit Bansal Signed-off-by: shwetathareja Co-authored-by: shwetathareja --- CHANGELOG.md | 1 + .../cluster/service/MasterService.java | 32 ++++++++------ .../cluster/service/TaskBatcher.java | 34 +++++++++----- .../cluster/service/MasterServiceTests.java | 44 +++++++++---------- 4 files changed, 64 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d6e76d946866b..e20df483030d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - URI path filtering support in cluster stats API ([#15938](https://github.com/opensearch-project/OpenSearch/pull/15938)) - [Star Tree - Search] Add support for metric aggregations with/without term query ([15289](https://github.com/opensearch-project/OpenSearch/pull/15289)) - Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111)) +- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795)) - Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471)) ### Dependencies diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index 713de8cdd0fda..455e7301a490d 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -299,33 +299,37 @@ public static boolean assertNotMasterUpdateThread(String reason) { } private void runTasks(TaskInputs taskInputs) { - final String longSummary = logger.isTraceEnabled() ? taskInputs.taskSummaryGenerator.apply(true) : ""; - final String shortSummary = taskInputs.taskSummaryGenerator.apply(false); + final String summary; + if (logger.isTraceEnabled()) { + summary = taskInputs.taskSummaryGenerator.apply(true); + } else { + summary = taskInputs.taskSummaryGenerator.apply(false); + } if (!lifecycle.started()) { - logger.debug("processing [{}]: ignoring, cluster-manager service not started", shortSummary); + logger.debug("processing [{}]: ignoring, cluster-manager service not started", summary); return; } if (logger.isTraceEnabled()) { - logger.trace("executing cluster state update for [{}]", longSummary); + logger.trace("executing cluster state update for [{}]", summary); } else { - logger.debug("executing cluster state update for [{}]", shortSummary); + logger.debug("executing cluster state update for [{}]", summary); } final ClusterState previousClusterState = state(); if (!previousClusterState.nodes().isLocalNodeElectedClusterManager() && taskInputs.runOnlyWhenClusterManager()) { - logger.debug("failing [{}]: local node is no longer cluster-manager", shortSummary); + logger.debug("failing [{}]: local node is no longer cluster-manager", summary); taskInputs.onNoLongerClusterManager(); return; } final long computationStartTime = threadPool.preciseRelativeTimeInNanos(); - final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, shortSummary); + final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, summary); taskOutputs.notifyFailedTasks(); final TimeValue computationTime = getTimeSince(computationStartTime); - logExecutionTime(computationTime, "compute cluster state update", shortSummary); + logExecutionTime(computationTime, "compute cluster state update", summary); clusterManagerMetrics.recordLatency( clusterManagerMetrics.clusterStateComputeHistogram, @@ -337,17 +341,17 @@ private void runTasks(TaskInputs taskInputs) { final long notificationStartTime = threadPool.preciseRelativeTimeInNanos(); taskOutputs.notifySuccessfulTasksOnUnchangedClusterState(); final TimeValue executionTime = getTimeSince(notificationStartTime); - logExecutionTime(executionTime, "notify listeners on unchanged cluster state", shortSummary); + logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary); } else { final ClusterState newClusterState = taskOutputs.newClusterState; if (logger.isTraceEnabled()) { - logger.trace("cluster state updated, source [{}]\n{}", longSummary, newClusterState); + logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState); } else { - logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), shortSummary); + logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary); } final long publicationStartTime = threadPool.preciseRelativeTimeInNanos(); try { - ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(shortSummary, newClusterState, previousClusterState); + ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState); // new cluster state, notify all listeners final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta(); if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { @@ -355,7 +359,7 @@ private void runTasks(TaskInputs taskInputs) { if (nodesDeltaSummary.length() > 0) { logger.info( "{}, term: {}, version: {}, delta: {}", - shortSummary, + summary, newClusterState.term(), newClusterState.version(), nodesDeltaSummary @@ -366,7 +370,7 @@ private void runTasks(TaskInputs taskInputs) { logger.debug("publishing cluster state version [{}]", newClusterState.version()); publish(clusterChangedEvent, taskOutputs, publicationStartTime); } catch (Exception e) { - handleException(shortSummary, publicationStartTime, newClusterState, e); + handleException(summary, publicationStartTime, newClusterState, e); } } } diff --git a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java index 3513bfffb7157..ac54693b8ad1e 100644 --- a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java +++ b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java @@ -195,16 +195,12 @@ void runIfNotProcessed(BatchedTask updateTask) { if (toExecute.isEmpty() == false) { Function taskSummaryGenerator = (longSummaryRequired) -> { if (longSummaryRequired == null || !longSummaryRequired) { - return buildShortSummary(updateTask.batchingKey, toExecute.size()); + final List sampleTasks = toExecute.stream() + .limit(Math.min(1000, toExecute.size())) + .collect(Collectors.toList()); + return buildShortSummary(updateTask.batchingKey, toExecute.size(), getSummary(updateTask, sampleTasks)); } - final Map> processTasksBySource = new HashMap<>(); - for (final BatchedTask task : toExecute) { - processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task); - } - return processTasksBySource.entrySet().stream().map(entry -> { - String tasks = updateTask.describeTasks(entry.getValue()); - return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]"; - }).reduce((s1, s2) -> s1 + ", " + s2).orElse(""); + return getSummary(updateTask, toExecute); }; taskBatcherListener.onBeginProcessing(toExecute); run(updateTask.batchingKey, toExecute, taskSummaryGenerator); @@ -212,8 +208,24 @@ void runIfNotProcessed(BatchedTask updateTask) { } } - private String buildShortSummary(final Object batchingKey, final int taskCount) { - return "Tasks batched with key: " + batchingKey.toString().split("\\$")[0] + " and count: " + taskCount; + private String getSummary(final BatchedTask updateTask, final List toExecute) { + final Map> processTasksBySource = new HashMap<>(); + for (final BatchedTask task : toExecute) { + processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task); + } + return processTasksBySource.entrySet().stream().map(entry -> { + String tasks = updateTask.describeTasks(entry.getValue()); + return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]"; + }).reduce((s1, s2) -> s1 + ", " + s2).orElse(""); + } + + private String buildShortSummary(final Object batchingKey, final int taskCount, final String sampleTasks) { + return "Tasks batched with key: " + + batchingKey.toString().split("\\$")[0] + + ", count:" + + taskCount + + " and sample tasks: " + + sampleTasks; } /** diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java index db9abe0310e40..bb9e34d93431f 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java @@ -391,7 +391,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { "test1 computation", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [1s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [1s] to compute cluster state update for [test1]" ) ); mockAppender.addExpectation( @@ -399,7 +399,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { "test1 notification", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [0s] to notify listeners on unchanged cluster state for [test1]" ) ); @@ -416,7 +416,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { "test2 failure", MasterService.class.getCanonicalName(), Level.TRACE, - "failed to execute cluster state update (on version: [*], uuid: [*]) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]*" + "failed to execute cluster state update (on version: [*], uuid: [*]) for [test2]*" ) ); mockAppender.addExpectation( @@ -424,7 +424,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { "test2 computation", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [2s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [2s] to compute cluster state update for [test2]" ) ); mockAppender.addExpectation( @@ -432,7 +432,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { "test2 notification", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [0s] to notify listeners on unchanged cluster state for [test2]" ) ); @@ -449,7 +449,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { "test3 computation", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [3s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [3s] to compute cluster state update for [test3]" ) ); mockAppender.addExpectation( @@ -457,7 +457,7 @@ public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception { "test3 notification", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [test3]" ) ); @@ -548,7 +548,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test1 start", MasterService.class.getCanonicalName(), Level.DEBUG, - "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test1]" ) ); mockAppender.addExpectation( @@ -556,7 +556,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test1 computation", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [1s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [1s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test1]" ) ); mockAppender.addExpectation( @@ -564,7 +564,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test1 notification", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test1]" ) ); @@ -573,7 +573,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test2 start", MasterService.class.getCanonicalName(), Level.DEBUG, - "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test2]" ) ); mockAppender.addExpectation( @@ -581,7 +581,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test2 failure", MasterService.class.getCanonicalName(), Level.DEBUG, - "failed to execute cluster state update (on version: [*], uuid: [*]) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]*" + "failed to execute cluster state update (on version: [*], uuid: [*]) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test2]*" ) ); mockAppender.addExpectation( @@ -589,7 +589,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test2 computation", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [2s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [2s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test2]" ) ); mockAppender.addExpectation( @@ -597,7 +597,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test2 notification", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test2]" ) ); @@ -606,7 +606,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test3 start", MasterService.class.getCanonicalName(), Level.DEBUG, - "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test3]" ) ); mockAppender.addExpectation( @@ -614,7 +614,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test3 computation", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [3s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [3s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test3]" ) ); mockAppender.addExpectation( @@ -622,7 +622,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test3 notification", MasterService.class.getCanonicalName(), Level.DEBUG, - "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test3]" ) ); @@ -631,7 +631,7 @@ public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception { "test4", MasterService.class.getCanonicalName(), Level.DEBUG, - "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test4]" ) ); @@ -1238,7 +1238,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { "test2", MasterService.class.getCanonicalName(), Level.WARN, - "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test2]" ) ); mockAppender.addExpectation( @@ -1246,7 +1246,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { "test3", MasterService.class.getCanonicalName(), Level.WARN, - "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test3]" ) ); mockAppender.addExpectation( @@ -1254,7 +1254,7 @@ public void testLongClusterStateUpdateLogging() throws Exception { "test4", MasterService.class.getCanonicalName(), Level.WARN, - "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]" + "*took [*], which is over [10s], to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test4]" ) ); mockAppender.addExpectation( @@ -1432,7 +1432,7 @@ public void testLongClusterStateUpdateLoggingForFailedPublication() throws Excep "test1 should log due to slow and failing publication", MasterService.class.getCanonicalName(), Level.WARN, - "took [*] and then failed to publish updated cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests and count: 1]:*" + "took [*] and then failed to publish updated cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests, count:1 and sample tasks: test1]:*" ) ); From 72559bf60c84a59908a0685db01d2ed5b1419111 Mon Sep 17 00:00:00 2001 From: rajiv-kv <157019998+rajiv-kv@users.noreply.github.com> Date: Mon, 28 Oct 2024 17:46:04 +0530 Subject: [PATCH 6/8] create publication repos during join task execution (#16383) * create publication repos during join task Signed-off-by: Rajiv Kumar Vaidyanathan --- .../RemotePublicationConfigurationIT.java | 10 +- .../coordination/JoinTaskExecutor.java | 42 +++- .../remotestore/RemoteStoreNodeService.java | 2 +- .../coordination/JoinTaskExecutorTests.java | 233 +++++++++++++++++- 4 files changed, 272 insertions(+), 15 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java index 98859f517aad4..57bf9eccbf5b4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java @@ -14,6 +14,7 @@ import org.opensearch.remotemigration.MigrationBaseTestCase; import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.fs.FsRepository; import org.opensearch.repositories.fs.ReloadableFsRepository; import org.opensearch.test.InternalSettingsPlugin; import org.opensearch.test.OpenSearchIntegTestCase; @@ -97,23 +98,26 @@ public Settings.Builder remotePublishConfiguredNodeSetting() { .put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, true) .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, ROUTING_TABLE_REPO_NAME) - .put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE) + .put(routingTableRepoTypeAttributeKey, FsRepository.TYPE) .put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath); return builder; } public Settings.Builder remoteWithRoutingTableNodeSetting() { // Remote Cluster with Routing table + return Settings.builder() .put( - buildRemoteStoreNodeAttributes( + remoteStoreClusterSettings( REPOSITORY_NAME, segmentRepoPath, + ReloadableFsRepository.TYPE, REPOSITORY_2_NAME, translogRepoPath, + ReloadableFsRepository.TYPE, REPOSITORY_NAME, segmentRepoPath, - false + ReloadableFsRepository.TYPE ) ) .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 13033b670d44b..d597b51c32ccd 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RerouteService; @@ -57,6 +58,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -185,11 +187,30 @@ public ClusterTasksResult execute(ClusterState currentState, List jo // for every set of node join task which we can optimize to not compute if cluster state already has // repository information. Optional remoteDN = currentNodes.getNodes().values().stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst(); - DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get()); - RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( - dn, - currentState.getMetadata().custom(RepositoriesMetadata.TYPE) - ); + Optional remotePublicationDN = currentNodes.getNodes() + .values() + .stream() + .filter(DiscoveryNode::isRemoteStatePublicationEnabled) + .findFirst(); + RepositoriesMetadata existingRepositoriesMetadata = currentState.getMetadata().custom(RepositoriesMetadata.TYPE); + Map repositories = new LinkedHashMap<>(); + if (existingRepositoriesMetadata != null) { + existingRepositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r)); + } + if (remoteDN.isPresent()) { + RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( + remoteDN.get(), + existingRepositoriesMetadata + ); + repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r)); + } + if (remotePublicationDN.isPresent()) { + RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( + remotePublicationDN.get(), + existingRepositoriesMetadata + ); + repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r)); + } assert nodesBuilder.isLocalNodeElectedClusterManager(); @@ -219,15 +240,16 @@ public ClusterTasksResult execute(ClusterState currentState, List jo ensureNodeCommissioned(node, currentState.metadata()); nodesBuilder.add(node); - if (remoteDN.isEmpty() && node.isRemoteStoreNode()) { + if ((remoteDN.isEmpty() && node.isRemoteStoreNode()) + || (remotePublicationDN.isEmpty() && node.isRemoteStatePublicationEnabled())) { // This is hit only on cases where we encounter first remote node logger.info("Updating system repository now for remote store"); - repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( + RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( node, - currentState.getMetadata().custom(RepositoriesMetadata.TYPE) + existingRepositoriesMetadata ); + repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r)); } - nodesChanged = true; minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion()); maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion()); @@ -241,7 +263,7 @@ public ClusterTasksResult execute(ClusterState currentState, List jo } results.success(joinTask); } - + RepositoriesMetadata repositoriesMetadata = new RepositoriesMetadata(new ArrayList<>(repositories.values())); if (nodesChanged) { rerouteService.reroute( "post-join reroute", diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java index cc5d8b0e62e90..c1c041ce01198 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java @@ -168,7 +168,7 @@ public void createAndVerifyRepositories(DiscoveryNode localNode) { * node repository metadata an exception will be thrown and the node will not be allowed to join the cluster. */ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode, RepositoriesMetadata existingRepositories) { - if (joiningNode.isRemoteStoreNode()) { + if (joiningNode.isRemoteStoreNode() || joiningNode.isRemoteStatePublicationEnabled()) { List updatedRepositoryMetadataList = new ArrayList<>(); List newRepositoryMetadataList = new RemoteStoreNodeAttribute(joiningNode).getRepositoriesMetadata() .repositories(); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index d192a2556c36b..f6fb203bfe1a9 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -959,6 +959,198 @@ public void testUpdatesClusterStateWithMultiNodeClusterAndSameRepository() throw validateRepositoryMetadata(result.resultingState, clusterManagerNode, 2); } + public void testUpdatesRepoRemoteNodeJoinPublicationCluster() throws Exception { + final AllocationService allocationService = mock(AllocationService.class); + when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService( + new SetOnce<>(mock(RepositoriesService.class))::get, + null + ); + + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor( + Settings.EMPTY, + allocationService, + logger, + rerouteService, + remoteStoreNodeService + ); + + final DiscoveryNode clusterManagerNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remotePublicationNodeAttributes(), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes( + DiscoveryNodes.builder() + .add(clusterManagerNode) + .localNodeId(clusterManagerNode.getId()) + .clusterManagerNodeId(clusterManagerNode.getId()) + ) + .build(); + + final ClusterStateTaskExecutor.ClusterTasksResult result = joinTaskExecutor.execute( + clusterState, + List.of(new JoinTaskExecutor.Task(clusterManagerNode, "elect leader")) + ); + assertThat(result.executionResults.entrySet(), hasSize(1)); + final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next(); + assertTrue(taskResult.isSuccess()); + validatePublicationRepositoryMetadata(result.resultingState, clusterManagerNode); + + final Settings settings = Settings.builder() + .put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE) + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + .build(); + final Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + Metadata metadata = Metadata.builder().persistentSettings(settings).build(); + + ClusterState currentState = ClusterState.builder(result.resultingState).metadata(metadata).build(); + + final DiscoveryNode remoteStoreNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterStateTaskExecutor.ClusterTasksResult resultAfterRemoteNodeJoin = joinTaskExecutor.execute( + currentState, + List.of(new JoinTaskExecutor.Task(remoteStoreNode, "test")) + ); + assertThat(resultAfterRemoteNodeJoin.executionResults.entrySet(), hasSize(1)); + final ClusterStateTaskExecutor.TaskResult taskResult1 = resultAfterRemoteNodeJoin.executionResults.values().iterator().next(); + assertTrue(taskResult1.isSuccess()); + validateRepositoriesMetadata(resultAfterRemoteNodeJoin.resultingState, remoteStoreNode, clusterManagerNode); + } + + public void testUpdatesRepoPublicationNodeJoinRemoteCluster() throws Exception { + final AllocationService allocationService = mock(AllocationService.class); + when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService( + new SetOnce<>(mock(RepositoriesService.class))::get, + null + ); + + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor( + Settings.EMPTY, + allocationService, + logger, + rerouteService, + remoteStoreNodeService + ); + + final DiscoveryNode clusterManagerNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes( + DiscoveryNodes.builder() + .add(clusterManagerNode) + .localNodeId(clusterManagerNode.getId()) + .clusterManagerNodeId(clusterManagerNode.getId()) + ) + .build(); + + final ClusterStateTaskExecutor.ClusterTasksResult result = joinTaskExecutor.execute( + clusterState, + List.of(new JoinTaskExecutor.Task(clusterManagerNode, "elect leader")) + ); + final Settings settings = Settings.builder() + .put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE) + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + .build(); + final Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + Metadata metadata = Metadata.builder().persistentSettings(settings).build(); + + ClusterState currentState = ClusterState.builder(result.resultingState).metadata(metadata).build(); + + final DiscoveryNode remotePublicationNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remotePublicationNodeAttributes(), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterStateTaskExecutor.ClusterTasksResult resultAfterRemoteNodeJoin = joinTaskExecutor.execute( + currentState, + List.of(new JoinTaskExecutor.Task(remotePublicationNode, "test")) + ); + assertThat(resultAfterRemoteNodeJoin.executionResults.entrySet(), hasSize(1)); + final ClusterStateTaskExecutor.TaskResult taskResult1 = resultAfterRemoteNodeJoin.executionResults.values().iterator().next(); + assertTrue(taskResult1.isSuccess()); + validateRepositoriesMetadata(resultAfterRemoteNodeJoin.resultingState, clusterManagerNode, remotePublicationNode); + } + + public void testUpdatesClusterStateWithMultiplePublicationNodeJoin() throws Exception { + Map remoteStoreNodeAttributes = remotePublicationNodeAttributes(); + final AllocationService allocationService = mock(AllocationService.class); + when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService( + new SetOnce<>(mock(RepositoriesService.class))::get, + null + ); + + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor( + Settings.EMPTY, + allocationService, + logger, + rerouteService, + remoteStoreNodeService + ); + + final DiscoveryNode clusterManagerNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + List repositoriesMetadata = new ArrayList<>(); + + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes( + DiscoveryNodes.builder() + .add(clusterManagerNode) + .localNodeId(clusterManagerNode.getId()) + .clusterManagerNodeId(clusterManagerNode.getId()) + ) + .metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositoriesMetadata))) + .build(); + + final DiscoveryNode joiningNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterStateTaskExecutor.ClusterTasksResult result = joinTaskExecutor.execute( + clusterState, + List.of(new JoinTaskExecutor.Task(joiningNode, "test")) + ); + assertThat(result.executionResults.entrySet(), hasSize(1)); + final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next(); + assertTrue(taskResult.isSuccess()); + validatePublicationRepositoryMetadata(result.resultingState, clusterManagerNode); + } + public void testNodeJoinInMixedMode() { List versions = allOpenSearchVersions(); assert versions.size() >= 2 : "test requires at least two open search versions"; @@ -1191,7 +1383,9 @@ private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode final RepositoriesMetadata repositoriesMetadata = updatedState.metadata().custom(RepositoriesMetadata.TYPE); assertTrue(repositoriesMetadata.repositories().size() == expectedRepositories); - if (repositoriesMetadata.repositories().size() == 2 || repositoriesMetadata.repositories().size() == 3) { + if (repositoriesMetadata.repositories().size() == 2 + || repositoriesMetadata.repositories().size() == 3 + || repositoriesMetadata.repositories().size() == 4) { final RepositoryMetadata segmentRepositoryMetadata = buildRepositoryMetadata(existingNode, SEGMENT_REPO); final RepositoryMetadata translogRepositoryMetadata = buildRepositoryMetadata(existingNode, TRANSLOG_REPO); for (RepositoryMetadata repositoryMetadata : repositoriesMetadata.repositories()) { @@ -1212,6 +1406,43 @@ private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode } } + private void validatePublicationRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode) throws Exception { + final RepositoriesMetadata repositoriesMetadata = updatedState.metadata().custom(RepositoriesMetadata.TYPE); + assertTrue(repositoriesMetadata.repositories().size() == 2); + final RepositoryMetadata clusterStateRepoMetadata = buildRepositoryMetadata(existingNode, CLUSTER_STATE_REPO); + final RepositoryMetadata routingTableRepoMetadata = buildRepositoryMetadata(existingNode, ROUTING_TABLE_REPO); + for (RepositoryMetadata repositoryMetadata : repositoriesMetadata.repositories()) { + if (repositoryMetadata.name().equals(clusterStateRepoMetadata.name())) { + assertTrue(clusterStateRepoMetadata.equalsIgnoreGenerations(repositoryMetadata)); + } else if (repositoryMetadata.name().equals(routingTableRepoMetadata.name())) { + assertTrue(routingTableRepoMetadata.equalsIgnoreGenerations(repositoryMetadata)); + } + } + } + + private void validateRepositoriesMetadata(ClusterState updatedState, DiscoveryNode remoteNode, DiscoveryNode publicationNode) + throws Exception { + + final RepositoriesMetadata repositoriesMetadata = updatedState.metadata().custom(RepositoriesMetadata.TYPE); + assertEquals(4, repositoriesMetadata.repositories().size()); + final RepositoryMetadata segmentRepositoryMetadata = buildRepositoryMetadata(remoteNode, SEGMENT_REPO); + final RepositoryMetadata translogRepositoryMetadata = buildRepositoryMetadata(remoteNode, TRANSLOG_REPO); + final RepositoryMetadata clusterStateRepositoryMetadata = buildRepositoryMetadata(remoteNode, CLUSTER_STATE_REPO); + final RepositoryMetadata routingTableRepositoryMetadata = buildRepositoryMetadata(publicationNode, ROUTING_TABLE_REPO); + + for (RepositoryMetadata repositoryMetadata : repositoriesMetadata.repositories()) { + if (repositoryMetadata.name().equals(segmentRepositoryMetadata.name())) { + assertTrue(segmentRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)); + } else if (repositoryMetadata.name().equals(translogRepositoryMetadata.name())) { + assertTrue(translogRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)); + } else if (repositoryMetadata.name().equals(clusterStateRepositoryMetadata.name())) { + assertTrue(clusterStateRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)); + } else if (repositoryMetadata.name().equals(routingTableRepositoryMetadata.name())) { + assertTrue(repositoryMetadata.equalsIgnoreGenerations(routingTableRepositoryMetadata)); + } + } + } + private DiscoveryNode newDiscoveryNode(Map attributes) { return new DiscoveryNode( randomAlphaOfLength(10), From 0fcb3ab82e0e2373f79287dd035c3b5e2a5fa306 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 28 Oct 2024 11:15:44 -0400 Subject: [PATCH 7/8] Bump ch.qos.logback:logback-core from 1.5.10 to 1.5.12 in /test/fixtures/hdfs-fixture (#16503) * Bump ch.qos.logback:logback-core in /test/fixtures/hdfs-fixture Bumps [ch.qos.logback:logback-core](https://github.com/qos-ch/logback) from 1.5.10 to 1.5.12. - [Commits](https://github.com/qos-ch/logback/compare/v_1.5.10...v_1.5.12) --- updated-dependencies: - dependency-name: ch.qos.logback:logback-core dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] * Update changelog Signed-off-by: dependabot[bot] --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- CHANGELOG.md | 2 +- test/fixtures/hdfs-fixture/build.gradle | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e20df483030d6..c1e4f459d1422 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,7 +41,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `peter-evans/create-pull-request` from 6 to 7 ([#15863](https://github.com/opensearch-project/OpenSearch/pull/15863)) - Bump `com.nimbusds:oauth2-oidc-sdk` from 11.9.1 to 11.19.1 ([#15862](https://github.com/opensearch-project/OpenSearch/pull/15862)) - Bump `com.microsoft.azure:msal4j` from 1.17.0 to 1.17.2 ([#15945](https://github.com/opensearch-project/OpenSearch/pull/15945), [#16406](https://github.com/opensearch-project/OpenSearch/pull/16406)) -- Bump `ch.qos.logback:logback-core` from 1.5.6 to 1.5.10 ([#15946](https://github.com/opensearch-project/OpenSearch/pull/15946), [#16307](https://github.com/opensearch-project/OpenSearch/pull/16307)) +- Bump `ch.qos.logback:logback-core` from 1.5.6 to 1.5.12 ([#15946](https://github.com/opensearch-project/OpenSearch/pull/15946), [#16307](https://github.com/opensearch-project/OpenSearch/pull/16307), [#16503](https://github.com/opensearch-project/OpenSearch/pull/16503)) - Update protobuf from 3.25.4 to 3.25.5 ([#16011](https://github.com/opensearch-project/OpenSearch/pull/16011)) - Bump `org.roaringbitmap:RoaringBitmap` from 1.2.1 to 1.3.0 ([#16040](https://github.com/opensearch-project/OpenSearch/pull/16040)) - Bump `com.nimbusds:nimbus-jose-jwt` from 9.40 to 9.41.1 ([#16038](https://github.com/opensearch-project/OpenSearch/pull/16038)) diff --git a/test/fixtures/hdfs-fixture/build.gradle b/test/fixtures/hdfs-fixture/build.gradle index 2bd0268ca136b..d27273f357758 100644 --- a/test/fixtures/hdfs-fixture/build.gradle +++ b/test/fixtures/hdfs-fixture/build.gradle @@ -74,7 +74,7 @@ dependencies { api 'org.apache.zookeeper:zookeeper:3.9.2' api "org.apache.commons:commons-text:1.12.0" api "commons-net:commons-net:3.11.1" - api "ch.qos.logback:logback-core:1.5.10" + api "ch.qos.logback:logback-core:1.5.12" api "ch.qos.logback:logback-classic:1.2.13" api "org.jboss.xnio:xnio-nio:3.8.16.Final" api 'org.jline:jline:3.27.1' From 9f7d3b6dcebf23a9ed40fa34a80f187569f40048 Mon Sep 17 00:00:00 2001 From: rajiv-kv <157019998+rajiv-kv@users.noreply.github.com> Date: Mon, 28 Oct 2024 21:35:48 +0530 Subject: [PATCH 8/8] using the routing allocation to cancel existing recoveries (#16468) Signed-off-by: Rajiv Kumar Vaidyanathan --- .../coordination/RareClusterStateIT.java | 179 ++++++++++++++++++ .../gateway/AsyncShardBatchFetch.java | 12 +- .../gateway/ReplicaShardBatchAllocator.java | 31 ++- 3 files changed, 212 insertions(+), 10 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java index b3cb15d028090..cc0264f375103 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/RareClusterStateIT.java @@ -40,6 +40,7 @@ import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; @@ -48,29 +49,39 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.index.Index; +import org.opensearch.core.transport.TransportResponse; import org.opensearch.discovery.Discovery; import org.opensearch.index.IndexService; import org.opensearch.index.mapper.DocumentMapper; import org.opensearch.index.mapper.MapperService; import org.opensearch.indices.IndicesService; +import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.disruption.BlockClusterStateProcessing; +import org.opensearch.test.transport.MockTransportService; +import org.opensearch.transport.TransportService; import org.opensearch.transport.TransportSettings; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.opensearch.action.DocWriteResponse.Result.CREATED; +import static org.opensearch.cluster.action.shard.ShardStateAction.SHARD_STARTED_ACTION_NAME; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; @@ -409,4 +420,172 @@ public void testDelayedMappingPropagationOnReplica() throws Exception { assertThat(dynamicMappingsFut.get(10, TimeUnit.SECONDS).getResult(), equalTo(CREATED)); } + public void testDisassociateNodesWhileShardInit() throws InterruptedException { + final String clusterManagerName = internalCluster().startClusterManagerOnlyNode( + Settings.builder() + .put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s") + .put(ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_BATCH_MODE.getKey(), true) + .build() + ); + internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build()); + internalCluster().startDataOnlyNode(Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build()); + String node2 = internalCluster().startDataOnlyNode( + Settings.builder().put(TransportSettings.CONNECT_TIMEOUT.getKey(), "1s").build() + ); + + final ClusterService clusterService = internalCluster().clusterService(clusterManagerName); + blockShardStartedResponse(clusterManagerName, clusterService); + + final String index = "index"; + + // create index with 3 primary and 1 replica each + prepareCreate(index).setSettings( + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + // .put(INDEX_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "primaries") + ).get(); + ensureGreen(index); + + // close to have some unassigned started shards shards.. + client().admin().indices().prepareClose(index).get(); + + // block so that replicas are always in init and not started + blockReplicaStart.set(true); + final AllocationService allocationService = internalCluster().getInstance(AllocationService.class, clusterManagerName); + clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + ClusterState.Builder builder = ClusterState.builder(currentState); + // open index + final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index)) + .state(IndexMetadata.State.OPEN) + .build(); + + builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true)); + builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index)); + ClusterState updatedState = builder.build(); + RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable()); + routingTable.addAsRecovery(updatedState.metadata().index(index)); + updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build(); + ClusterState state = allocationService.reroute(updatedState, "reroute"); + return state; + } + + @Override + public void onFailure(String source, Exception e) { + logger.error(e.getMessage(), e); + } + }); + + ensureYellow(index); + assertTrue(waitUntil(() -> { + ClusterState state = clusterService.state(); + return state.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3; + + })); + + logger.info("Initializing shards"); + logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)); + + // trigger 2nd reroute after shard in initialized + clusterService.submitStateUpdateTask("test-delete-node-and-reroute", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + return allocationService.reroute(currentState, "reroute"); + } + + @Override + public void onFailure(String source, Exception e) {} + }); + + ensureYellow(index); + assertTrue(waitUntil(() -> clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 3)); + clusterService.submitStateUpdateTask("test-remove-injected-node", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + // remove the primary node of replica shard which is in init + ShardRouting next = currentState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0); + ShardRouting primaryShard = currentState.getRoutingNodes().activePrimary(next.shardId()); + + ClusterState.Builder builder = ClusterState.builder(currentState); + builder.nodes(DiscoveryNodes.builder(currentState.nodes()).remove(primaryShard.currentNodeId())); + currentState = builder.build(); + logger.info("removed the node {}", primaryShard.currentNodeId()); + logger.info("shard {}", next); + ClusterState state = allocationService.disassociateDeadNodes(currentState, true, "reroute"); + return state; + } + + @Override + public void onFailure(String source, Exception e) {} + }); + assertTrue(waitUntil(() -> { + ClusterState state = clusterService.state(); + logger.info("current state {} ", state); + return clusterService.state().nodes().getSize() == 3; + + })); + + logger.info(clusterService.state().getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)); + blockReplicaStart.set(false); + + clusterService.submitStateUpdateTask("test-inject-node-and-reroute", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + ClusterState.Builder builder = ClusterState.builder(currentState); + final IndexMetadata indexMetadata = IndexMetadata.builder(currentState.metadata().index(index)) + .state(IndexMetadata.State.OPEN) + .build(); + builder.metadata(Metadata.builder(currentState.metadata()).put(indexMetadata, true)); + builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index)); + ClusterState updatedState = builder.build(); + RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable()); + routingTable.addAsRecovery(updatedState.metadata().index(index)); + updatedState = ClusterState.builder(updatedState).routingTable(routingTable.build()).build(); + + return allocationService.reroute(updatedState, "reroute"); + } + + @Override + public void onFailure(String source, Exception e) {} + }); + + ensureGreen(index); + } + + AtomicBoolean blockReplicaStart = new AtomicBoolean(false); + + private void blockShardStartedResponse(String master, ClusterService service) { + MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master); + primaryService.addRequestHandlingBehavior(SHARD_STARTED_ACTION_NAME, (handler, request, channel, task) -> { + + if (blockReplicaStart.get()) { + ShardStateAction.StartedShardEntry req = (ShardStateAction.StartedShardEntry) request; + String stringRep = req.toString(); + logger.info("ShardStateAction.StartedShardEntry {}", stringRep); + + String incomingRequest = req.toString(); + Optional matchReplica = service.state() + .routingTable() + .allShardsSatisfyingPredicate(r -> !r.primary()) + .getShardRoutings() + .stream() + .filter(r -> r.allocationId() != null) + .filter(r -> incomingRequest.contains(r.allocationId().getId())) + .findAny(); + + if (matchReplica.isPresent()) { + channel.sendResponse(TransportResponse.Empty.INSTANCE); + } else { + handler.messageReceived(request, channel, task); + } + } else { + handler.messageReceived(request, channel, task); + } + }); + } + + @Override + protected Collection> nodePlugins() { + return List.of(MockTransportService.TestPlugin.class); + } } diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java index d86d41bb1a359..095730bd84f8d 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java @@ -241,11 +241,13 @@ private void fillShardData(Map shardDataFromNode, Map shardData : shardDataFromNode.entrySet()) { if (shardData.getValue() != null) { ShardId shardId = shardData.getKey(); - if (emptyShardResponsePredicate.test(shardData.getValue())) { - this.emptyShardResponse[shardIdKey.get(shardId)] = true; - this.shardData[shardIdKey.get(shardId)] = null; - } else { - this.shardData[shardIdKey.get(shardId)] = shardData.getValue(); + if (shardIdKey.get(shardId) != null) {// the response might be for shard which is no longer present in cache + if (emptyShardResponsePredicate.test(shardData.getValue())) { + this.emptyShardResponse[shardIdKey.get(shardId)] = true; + this.shardData[shardIdKey.get(shardId)] = null; + } else { + this.shardData[shardIdKey.get(shardId)] = shardData.getValue(); + } } } } diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java index 020a543ac5fc5..2b6fdfa8dd411 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java @@ -12,6 +12,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.NodeAllocationResult; @@ -51,6 +52,12 @@ public abstract class ReplicaShardBatchAllocator extends ReplicaShardAllocator { */ public void processExistingRecoveries(RoutingAllocation allocation, List> shardBatches) { List shardCancellationActions = new ArrayList<>(); + Map> initReplicasFromRouting = new HashMap<>(); + allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).stream().filter(r -> !r.primary()).forEach(r -> { + initReplicasFromRouting.putIfAbsent(r.shardId(), new ArrayList<>()); + initReplicasFromRouting.get(r.shardId()).add(r); + }); + // iterate through the batches, each batch needs to be processed together as fetch call should be made for shards from same batch for (List shardBatch : shardBatches) { List eligibleShards = new ArrayList<>(); @@ -58,6 +65,12 @@ public void processExistingRecoveries(RoutingAllocation allocation, List nodeShardStores = convertToNodeStoreFilesMetadataMap(shard, shardState); - - Runnable cancellationAction = cancelExistingRecoveryForBetterMatch(shard, allocation, nodeShardStores); - if (cancellationAction != null) { - shardCancellationActions.add(cancellationAction); + for (ShardRouting initShardsFromAllocation : initReplicasFromRouting.get(shard.shardId())) { + Map nodeShardStores = convertToNodeStoreFilesMetadataMap( + initShardsFromAllocation, + shardState + ); + Runnable cancellationAction = cancelExistingRecoveryForBetterMatch( + initShardsFromAllocation, + allocation, + nodeShardStores + ); + if (cancellationAction != null) { + shardCancellationActions.add(cancellationAction); + } } } }