From 7d0fe34e3098ea79907be80e0edfc742abf4660c Mon Sep 17 00:00:00 2001 From: pranikum <109206473+pranikum@users.noreply.github.com> Date: Thu, 6 Oct 2022 09:49:02 +0530 Subject: [PATCH] Service Layer changes for Recommission API (#4320) * Recommission API service level changes Signed-off-by: pranikum <109206473+pranikum@users.noreply.github.com> --- CHANGELOG.md | 2 +- .../decommission/DecommissionService.java | 50 +++++++++++ .../DecommissionServiceTests.java | 85 +++++++++++++++++++ 3 files changed, 136 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 290fd5d74fe80..6b7b7b29cc904 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,7 +28,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Added precommit support for windows ([#4676](https://github.com/opensearch-project/OpenSearch/pull/4676)) - Added release notes for 1.3.6 ([#4681](https://github.com/opensearch-project/OpenSearch/pull/4681)) - Added precommit support for MacOS ([#4682](https://github.com/opensearch-project/OpenSearch/pull/4682)) - +- Recommission API changes for service layer ([#4320](https://github.com/opensearch-project/OpenSearch/pull/4320)) ### Dependencies - Bumps `log4j-core` from 2.18.0 to 2.19.0 - Bumps `reactor-netty-http` from 1.0.18 to 1.0.23 diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index c31ea53a5fd16..fcab411f073ba 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -14,6 +14,7 @@ import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.ClusterStateUpdateTask; @@ -481,4 +482,53 @@ public void onFailure(Exception e) { } }; } + + public void startRecommissionAction(final ActionListener listener) { + /* + * For abandoned requests, we might not really know if it actually restored the exclusion list. + * And can land up in cases where even after recommission, exclusions are set(which is unexpected). + * And by definition of OpenSearch - Clusters should have no voting configuration exclusions in normal operation. + * Once the excluded nodes have stopped, clear the voting configuration exclusions with DELETE /_cluster/voting_config_exclusions. + * And hence it is safe to remove the exclusion if any. User should make conscious choice before decommissioning awareness attribute. + */ + decommissionController.clearVotingConfigExclusion(new ActionListener() { + @Override + public void onResponse(Void unused) { + logger.info("successfully cleared voting config exclusion for deleting the decommission."); + deleteDecommissionState(listener); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failure in clearing voting config during delete_decommission request.", e); + listener.onFailure(e); + } + }, false); + } + + void deleteDecommissionState(ActionListener listener) { + clusterService.submitStateUpdateTask("delete_decommission_state", new ClusterStateUpdateTask(Priority.URGENT) { + @Override + public ClusterState execute(ClusterState currentState) { + logger.info("Deleting the decommission attribute from the cluster state"); + Metadata metadata = currentState.metadata(); + Metadata.Builder mdBuilder = Metadata.builder(metadata); + mdBuilder.removeCustom(DecommissionAttributeMetadata.TYPE); + return ClusterState.builder(currentState).metadata(mdBuilder).build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error(() -> new ParameterizedMessage("Failed to clear decommission attribute. [{}]", source), e); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + // Cluster state processed for deleting the decommission attribute. + assert newState.metadata().decommissionAttributeMetadata() == null; + listener.onResponse(new AcknowledgedResponse(true)); + } + }); + } } diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index 61a6662ef0cf3..840ce1634a68e 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -11,9 +11,13 @@ import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; +import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.CoordinationMetadata; @@ -30,6 +34,7 @@ import org.opensearch.test.transport.MockTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; import java.util.Collections; @@ -201,6 +206,86 @@ public void onFailure(Exception e) { assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + public void testClearClusterDecommissionState() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( + decommissionAttribute, + DecommissionStatus.SUCCESSFUL + ); + ClusterState state = ClusterState.builder(new ClusterName("test")) + .metadata(Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build()) + .build(); + + ActionListener listener = new ActionListener() { + @Override + public void onResponse(AcknowledgedResponse decommissionResponse) { + DecommissionAttributeMetadata metadata = clusterService.state().metadata().custom(DecommissionAttributeMetadata.TYPE); + assertNull(metadata); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("on failure shouldn't have been called"); + countDownLatch.countDown(); + } + }; + + this.decommissionService.deleteDecommissionState(listener); + + // Decommission Attribute should be removed. + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } + + public void testDeleteDecommissionAttributeClearVotingExclusion() { + TransportService mockTransportService = Mockito.mock(TransportService.class); + Mockito.when(mockTransportService.getLocalNode()).thenReturn(Mockito.mock(DiscoveryNode.class)); + DecommissionService decommissionService = new DecommissionService( + Settings.EMPTY, + clusterSettings, + clusterService, + mockTransportService, + threadPool, + allocationService + ); + decommissionService.startRecommissionAction(Mockito.mock(ActionListener.class)); + + ArgumentCaptor clearVotingConfigExclusionsRequestArgumentCaptor = ArgumentCaptor.forClass( + ClearVotingConfigExclusionsRequest.class + ); + Mockito.verify(mockTransportService) + .sendRequest( + Mockito.any(DiscoveryNode.class), + Mockito.anyString(), + clearVotingConfigExclusionsRequestArgumentCaptor.capture(), + Mockito.any(TransportResponseHandler.class) + ); + + ClearVotingConfigExclusionsRequest request = clearVotingConfigExclusionsRequestArgumentCaptor.getValue(); + assertFalse(request.getWaitForRemoval()); + } + + public void testClusterUpdateTaskForDeletingDecommission() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(AcknowledgedResponse response) { + assertTrue(response.isAcknowledged()); + assertNull(clusterService.state().metadata().decommissionAttributeMetadata()); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("On Failure shouldn't have been called"); + countDownLatch.countDown(); + } + }; + decommissionService.deleteDecommissionState(listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } + private ClusterState addDataNodes(ClusterState clusterState, String zone, String... nodeIds) { DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.add(newDataNode(nodeId, singletonMap("zone", zone))));