From c504b517f27dde35f893b8f8971440344706132e Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sun, 29 Jan 2023 21:08:38 +0530 Subject: [PATCH] Cluster health call to throw decommissioned exception for local decommissioned node (#6008) * Cluster health call to throw decommissioned exception for local decommissioned nodes Signed-off-by: Rishab Nahata --- CHANGELOG.md | 1 + .../rest-api-spec/api/cluster.health.json | 4 +++ .../AwarenessAttributeDecommissionIT.java | 30 +++++++++++++++++++ .../cluster/health/ClusterHealthRequest.java | 24 +++++++++++++++ .../health/ClusterHealthRequestBuilder.java | 8 +++++ .../health/TransportClusterHealthAction.java | 15 ++++++++-- .../cluster/coordination/Coordinator.java | 3 +- .../NodeDecommissionedException.java | 6 ++++ .../cluster/RestClusterHealthAction.java | 3 ++ .../health/ClusterHealthRequestTests.java | 18 +++++++++++ .../health/ClusterStateHealthTests.java | 3 +- .../cluster/RestClusterHealthActionTests.java | 6 ++++ 12 files changed, 116 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e4b256f3ff689..6d49a60564109 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Changed - Use ReplicationFailedException instead of OpensearchException in ReplicationTarget ([#4725](https://github.com/opensearch-project/OpenSearch/pull/4725)) - [Refactor] Use local opensearch.common.SetOnce instead of lucene's utility class ([#5947](https://github.com/opensearch-project/OpenSearch/pull/5947)) +- Cluster health call to throw decommissioned exception for local decommissioned node([#6008](https://github.com/opensearch-project/OpenSearch/pull/6008)) ### Deprecated diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.health.json b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.health.json index 2fc9a2d4fd716..a7c5dac5fe414 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.health.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.health.json @@ -111,6 +111,10 @@ "awareness_attribute":{ "type":"string", "description":"The awareness attribute for which the health is required" + }, + "ensure_node_commissioned":{ + "type":"boolean", + "description": "Checks whether local node is commissioned or not. If set to true on a local call it will throw exception if node is decommissioned (default: false)" } } } diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index e2eb08bd0969c..676cebce9e6af 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -305,6 +305,36 @@ public boolean innerMatch(LogEvent event) { Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, decommissionedNode); assertFalse(coordinator.localNodeCommissioned()); + // Check cluster health API for decommissioned and active node + ClusterHealthResponse activeNodeLocalHealth = client(activeNode).admin() + .cluster() + .prepareHealth() + .setLocal(true) + .setEnsureNodeCommissioned(true) + .execute() + .actionGet(); + assertFalse(activeNodeLocalHealth.isTimedOut()); + + ClusterHealthResponse decommissionedNodeLocalHealth = client(decommissionedNode).admin() + .cluster() + .prepareHealth() + .setLocal(true) + .execute() + .actionGet(); + assertFalse(decommissionedNodeLocalHealth.isTimedOut()); + + NodeDecommissionedException ex = expectThrows( + NodeDecommissionedException.class, + () -> client(decommissionedNode).admin() + .cluster() + .prepareHealth() + .setLocal(true) + .setEnsureNodeCommissioned(true) + .execute() + .actionGet() + ); + assertTrue(ex.getMessage().contains("local node is decommissioned")); + // Recommissioning the zone back to gracefully succeed the test once above tests succeeds DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(activeNode).execute( DeleteDecommissionStateAction.INSTANCE, diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequest.java index a9a9369e052c0..d137d3b1ac9d9 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequest.java @@ -68,6 +68,7 @@ public class ClusterHealthRequest extends ClusterManagerNodeReadRequesttrue if local information is to be returned only when local node is also commissioned + * false to not check local node if commissioned or not for a local request + */ + public final boolean ensureNodeCommissioned() { + return ensureNodeCommissioned; + } + @Override public ActionRequestValidationException validate() { if (level.equals(Level.AWARENESS_ATTRIBUTES) && indices.length > 0) { @@ -328,6 +349,9 @@ public ActionRequestValidationException validate() { } else if (!level.equals(Level.AWARENESS_ATTRIBUTES) && awarenessAttribute != null) { return addValidationError("level=awareness_attributes is required with awareness_attribute parameter", null); } + if (ensureNodeCommissioned && local == false) { + return addValidationError("not a local request to ensure local node commissioned", null); + } return null; } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestBuilder.java index ac9697e06a5d7..98d19b8e32247 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestBuilder.java @@ -161,4 +161,12 @@ public ClusterHealthRequestBuilder setLevel(String level) { request.setLevel(level); return this; } + + /** + * Specifies if the local request should ensure that the local node is commissioned + */ + public final ClusterHealthRequestBuilder setEnsureNodeCommissioned(boolean ensureNodeCommissioned) { + request.ensureNodeCommissioned(ensureNodeCommissioned); + return this; + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java index a0b760d0dce28..a94631aae066f 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -46,6 +46,8 @@ import org.opensearch.cluster.LocalClusterUpdateTask; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.coordination.Coordinator; +import org.opensearch.cluster.decommission.NodeDecommissionedException; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException; @@ -57,6 +59,7 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.CollectionUtils; +import org.opensearch.discovery.Discovery; import org.opensearch.index.IndexNotFoundException; import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; @@ -77,6 +80,7 @@ public class TransportClusterHealthAction extends TransportClusterManagerNodeRea private static final Logger logger = LogManager.getLogger(TransportClusterHealthAction.class); private final AllocationService allocationService; + private final Discovery discovery; @Inject public TransportClusterHealthAction( @@ -85,7 +89,8 @@ public TransportClusterHealthAction( ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - AllocationService allocationService + AllocationService allocationService, + Discovery discovery ) { super( ClusterHealthAction.NAME, @@ -98,6 +103,7 @@ public TransportClusterHealthAction( indexNameExpressionResolver ); this.allocationService = allocationService; + this.discovery = discovery; } @Override @@ -134,7 +140,12 @@ protected void clusterManagerOperation( final ClusterState unusedState, final ActionListener listener ) { - + if (request.ensureNodeCommissioned() + && discovery instanceof Coordinator + && ((Coordinator) discovery).localNodeCommissioned() == false) { + listener.onFailure(new NodeDecommissionedException("local node is decommissioned")); + return; + } final int waitCount = getWaitCount(request); if (request.waitForEvents() != null) { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index dabf06b89e5b1..4240a3bfed44a 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -1438,8 +1438,7 @@ synchronized void onNodeCommissionStatusChange(boolean localNodeCommissioned) { peerFinder.onNodeCommissionStatusChange(localNodeCommissioned); } - // package-visible for testing - boolean localNodeCommissioned() { + public boolean localNodeCommissioned() { return localNodeCommissioned; } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/NodeDecommissionedException.java b/server/src/main/java/org/opensearch/cluster/decommission/NodeDecommissionedException.java index 847d5a527b017..c91509a0db161 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/NodeDecommissionedException.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/NodeDecommissionedException.java @@ -10,6 +10,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.rest.RestStatus; import java.io.IOException; @@ -28,4 +29,9 @@ public NodeDecommissionedException(String msg, Object... args) { public NodeDecommissionedException(StreamInput in) throws IOException { super(in); } + + @Override + public RestStatus status() { + return RestStatus.FAILED_DEPENDENCY; + } } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterHealthAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterHealthAction.java index 83fc093e57727..3202329dd39c4 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterHealthAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterHealthAction.java @@ -89,6 +89,9 @@ public static ClusterHealthRequest fromRequest(final RestRequest request) { final ClusterHealthRequest clusterHealthRequest = clusterHealthRequest(Strings.splitStringByCommaToArray(request.param("index"))); clusterHealthRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterHealthRequest.indicesOptions())); clusterHealthRequest.local(request.paramAsBoolean("local", clusterHealthRequest.local())); + clusterHealthRequest.ensureNodeCommissioned( + request.paramAsBoolean("ensure_node_commissioned", clusterHealthRequest.ensureNodeCommissioned()) + ); clusterHealthRequest.clusterManagerNodeTimeout( request.paramAsTime("cluster_manager_timeout", clusterHealthRequest.clusterManagerNodeTimeout()) ); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestTests.java index c84279a0782c3..8c1a43b7816b1 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/health/ClusterHealthRequestTests.java @@ -33,6 +33,7 @@ package org.opensearch.action.admin.cluster.health; import org.opensearch.LegacyESVersion; +import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.support.IndicesOptions; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.common.Priority; @@ -152,6 +153,23 @@ public void testBwcSerialization() throws Exception { } } + public void testValidation() { + ClusterHealthRequest clusterHealthRequest = randomRequest(); + { + clusterHealthRequest.local(false); + clusterHealthRequest.ensureNodeCommissioned(true); + ActionRequestValidationException e = clusterHealthRequest.validate(); + assertNotNull(e); + assertTrue(e.getMessage().contains("not a local request to ensure local node commissioned")); + } + { + clusterHealthRequest.local(true); + clusterHealthRequest.ensureNodeCommissioned(false); + ActionRequestValidationException e = clusterHealthRequest.validate(); + assertNull(e); + } + } + private ClusterHealthRequest randomRequest() { ClusterHealthRequest request = new ClusterHealthRequest(); request.waitForStatus(randomFrom(ClusterHealthStatus.values())); diff --git a/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java b/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java index 80e76d44b9c41..19458c9f8a4f3 100644 --- a/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java +++ b/server/src/test/java/org/opensearch/cluster/health/ClusterStateHealthTests.java @@ -177,7 +177,8 @@ public void testClusterHealthWaitsForClusterStateApplication() throws Interrupte threadPool, new ActionFilters(new HashSet<>()), indexNameExpressionResolver, - new AllocationService(null, new TestGatewayAllocator(), null, null, null) + new AllocationService(null, new TestGatewayAllocator(), null, null, null), + null ); PlainActionFuture listener = new PlainActionFuture<>(); action.execute(new ClusterHealthRequest().waitForGreenStatus(), listener); diff --git a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterHealthActionTests.java b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterHealthActionTests.java index 975a4d8120965..4c60ea810f591 100644 --- a/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterHealthActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/admin/cluster/RestClusterHealthActionTests.java @@ -52,6 +52,10 @@ public void testFromRequest() { Map params = new HashMap<>(); String index = "index"; boolean local = randomBoolean(); + boolean ensureLocalNodeCommissioned = false; + if (local) { + ensureLocalNodeCommissioned = randomBoolean(); + } String clusterManagerTimeout = randomTimeValue(); String timeout = randomTimeValue(); ClusterHealthStatus waitForStatus = randomFrom(ClusterHealthStatus.values()); @@ -63,6 +67,7 @@ public void testFromRequest() { params.put("index", index); params.put("local", String.valueOf(local)); + params.put("ensure_node_commissioned", String.valueOf(ensureLocalNodeCommissioned)); params.put("cluster_manager_timeout", clusterManagerTimeout); params.put("timeout", timeout); params.put("wait_for_status", waitForStatus.name()); @@ -81,6 +86,7 @@ public void testFromRequest() { assertThat(clusterHealthRequest.indices().length, equalTo(1)); assertThat(clusterHealthRequest.indices()[0], equalTo(index)); assertThat(clusterHealthRequest.local(), equalTo(local)); + assertThat(clusterHealthRequest.ensureNodeCommissioned(), equalTo(ensureLocalNodeCommissioned)); assertThat(clusterHealthRequest.clusterManagerNodeTimeout(), equalTo(TimeValue.parseTimeValue(clusterManagerTimeout, "test"))); assertThat(clusterHealthRequest.timeout(), equalTo(TimeValue.parseTimeValue(timeout, "test"))); assertThat(clusterHealthRequest.waitForStatus(), equalTo(waitForStatus));