Skip to content

Commit

Permalink
Fix decommission status update to non leader nodes (opensearch-projec…
Browse files Browse the repository at this point in the history
…t#4800)

* Fix decommission status update to non leader nodes

Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Oct 28, 2022
1 parent fb0e4d4 commit 91123ba
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Fixed randomly failing test ([4774](https://github.com/opensearch-project/OpenSearch/pull/4774))
- Fix recovery path for searchable snapshots ([4813](https://github.com/opensearch-project/OpenSearch/pull/4813))
- Fix a bug on handling an invalid array value for point type field #4900([#4900](https://github.com/opensearch-project/OpenSearch/pull/4900))
- Fix decommission status update to non leader nodes ([4800](https://github.com/opensearch-project/OpenSearch/pull/4800))

### Security
- CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.After;
import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateAction;
import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse;
import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateAction;
import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateResponse;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.opensearch.test.NodeRoles.onlyRole;
import static org.opensearch.test.OpenSearchIntegTestCase.client;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoTimeout;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class AwarenessAttributeDecommissionIT extends OpenSearchIntegTestCase {
private final Logger logger = LogManager.getLogger(AwarenessAttributeDecommissionIT.class);

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(MockTransportService.TestPlugin.class);
}

@After
public void cleanup() throws Exception {
assertNoTimeout(client().admin().cluster().prepareHealth().get());
}

public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionException, InterruptedException {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'");
List<String> clusterManagerNodes = internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "a")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "b")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "c")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build()
);

logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'");
List<String> dataNodes = internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "a")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "b")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "c")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build()
);

ensureStableCluster(6);

logger.info("--> starting decommissioning nodes in zone {}", 'c');
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c");
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get();
assertTrue(decommissionResponse.isAcknowledged());

// Will wait for all events to complete
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();

// assert that decommission status is successful
GetDecommissionStateResponse response = client().execute(GetDecommissionStateAction.INSTANCE, new GetDecommissionStateRequest())
.get();
assertEquals(response.getDecommissionedAttribute(), decommissionAttribute);
assertEquals(response.getDecommissionStatus(), DecommissionStatus.SUCCESSFUL);

ClusterState clusterState = client(clusterManagerNodes.get(0)).admin().cluster().prepareState().execute().actionGet().getState();
assertEquals(4, clusterState.nodes().getSize());

// assert status on nodes that are part of cluster currently
Iterator<DiscoveryNode> discoveryNodeIterator = clusterState.nodes().getNodes().valuesIt();
while (discoveryNodeIterator.hasNext()) {
// assert no node has decommissioned attribute
DiscoveryNode node = discoveryNodeIterator.next();
assertNotEquals(node.getAttributes().get("zone"), "c");

// assert all the nodes has status as SUCCESSFUL
ClusterService localNodeClusterService = internalCluster().getInstance(ClusterService.class, node.getName());
assertEquals(
localNodeClusterService.state().metadata().decommissionAttributeMetadata().status(),
DecommissionStatus.SUCCESSFUL
);
}

// assert status on decommissioned node
// Here we will verify that until it got kicked out, it received appropriate status updates
// decommissioned nodes hence will have status as IN_PROGRESS as it will be kicked out later after this
// and won't receive status update to SUCCESSFUL
String randomDecommissionedNode = randomFrom(clusterManagerNodes.get(2), dataNodes.get(2));
ClusterService decommissionedNodeClusterService = internalCluster().getInstance(ClusterService.class, randomDecommissionedNode);
assertEquals(
decommissionedNodeClusterService.state().metadata().decommissionAttributeMetadata().status(),
DecommissionStatus.IN_PROGRESS
);

// Will wait for all events to complete
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();

// Recommissioning the zone back to gracefully succeed the test once above tests succeeds
DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(clusterManagerNodes.get(0)).execute(
DeleteDecommissionStateAction.INSTANCE,
new DeleteDecommissionStateRequest()
).get();
assertTrue(deleteDecommissionStateResponse.isAcknowledged());

// will wait for cluster to stabilise with a timeout of 2 min (findPeerInterval for decommissioned nodes)
// as by then all nodes should have joined the cluster
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,35 +79,29 @@ public DecommissionStatus status() {
/**
* Returns instance of the metadata with updated status
* @param newStatus status to be updated with
* @return instance with valid status
*/
// synchronized is strictly speaking not needed (this is called by a single thread), but just to be safe
public synchronized DecommissionAttributeMetadata setUpdatedStatus(DecommissionStatus newStatus) {
// if the current status is the expected status already, we return the same instance
if (newStatus.equals(status)) {
return this;
public synchronized void validateNewStatus(DecommissionStatus newStatus) {
// if the current status is the expected status already or new status is FAILED, we let the check pass
if (newStatus.equals(status) || newStatus.equals(DecommissionStatus.FAILED)) {
return;
}
// We don't expect that INIT will be new status, as it is registered only when starting the decommission action
switch (newStatus) {
case IN_PROGRESS:
validateAndSetStatus(DecommissionStatus.INIT, newStatus);
validateStatus(DecommissionStatus.INIT, newStatus);
break;
case SUCCESSFUL:
validateAndSetStatus(DecommissionStatus.IN_PROGRESS, newStatus);
break;
case FAILED:
// we don't need to validate here and directly update status to FAILED
this.status = newStatus;
validateStatus(DecommissionStatus.IN_PROGRESS, newStatus);
break;
default:
throw new IllegalArgumentException(
"illegal decommission status [" + newStatus.status() + "] requested for updating metadata"
);
}
return this;
}

private void validateAndSetStatus(DecommissionStatus expected, DecommissionStatus next) {
private void validateStatus(DecommissionStatus expected, DecommissionStatus next) {
if (status.equals(expected) == false) {
assert false : "can't move decommission status to ["
+ next
Expand All @@ -120,7 +114,6 @@ private void validateAndSetStatus(DecommissionStatus expected, DecommissionStatu
"can't move decommission status to [" + next + "]. current status: [" + status + "] (expected [" + expected + "])"
);
}
status = next;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,12 @@ public ClusterState execute(ClusterState currentState) {
decommissionAttributeMetadata.status(),
decommissionStatus
);
// setUpdatedStatus can throw IllegalStateException if the sequence of update is not valid
decommissionAttributeMetadata.setUpdatedStatus(decommissionStatus);
// validateNewStatus can throw IllegalStateException if the sequence of update is not valid
decommissionAttributeMetadata.validateNewStatus(decommissionStatus);
decommissionAttributeMetadata = new DecommissionAttributeMetadata(
decommissionAttributeMetadata.decommissionAttribute(),
decommissionStatus
);
return ClusterState.builder(currentState)
.metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata))
.build();
Expand Down

0 comments on commit 91123ba

Please sign in to comment.