Awareness attribute decommission backports (opensearch-project#4970)
* Add DecommissionService and helper to execute awareness attribute decommissioning opensearch-project#4084
* Add APIs (GET/PUT) to decommission awareness attribute opensearch-project#4261
* Controlling discovery for decommissioned nodes opensearch-project#4590
* Fix decommission status update to non leader nodes opensearch-project#4800
* Remove redundant field from GetDecommissionStateResponse opensearch-project#4751
* Service Layer changes for Recommission API opensearch-project#4320
* Recommission api level support opensearch-project#4604
* Fix bug in AwarenessAttributeDecommissionIT opensearch-project#4822

Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
imRishN committed Nov 3, 2022
1 parent 017f76b commit 43561e9
Showing 67 changed files with 4,418 additions and 11 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -25,6 +25,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Added missing no-jdk distributions ([#4722](https://github.com/opensearch-project/OpenSearch/pull/4722))
- Copy `build.sh` over from opensearch-build ([#4887](https://github.com/opensearch-project/OpenSearch/pull/4887))
- Update GeoGrid base class access modifier to support extensibility ([#4921](https://github.com/opensearch-project/OpenSearch/pull/4921))
- Recommission API changes for service layer ([#4320](https://github.com/opensearch-project/OpenSearch/pull/4320))
- Recommissioning of zone. REST layer support. ([#4604](https://github.com/opensearch-project/OpenSearch/pull/4604))
- Build no-jdk distributions as part of release build ([#4902](https://github.com/opensearch-project/OpenSearch/pull/4902))
- Use getParameterCount instead of getParameterTypes ([#4821](https://github.com/opensearch-project/OpenSearch/pull/4821))
- Added in-flight cancellation of SearchShardTask based on resource consumption ([#4565](https://github.com/opensearch-project/OpenSearch/pull/4565))
@@ -64,6 +66,9 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- GET api for weighted shard routing ([#4275](https://github.com/opensearch-project/OpenSearch/pull/4275/))
- Delete api for weighted shard routing ([#4400](https://github.com/opensearch-project/OpenSearch/pull/4400/))
- Fix weighted routing metadata deserialization error on process restart ([#4691](https://github.com/opensearch-project/OpenSearch/pull/4691))
- Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084))
- Add APIs (GET/PUT) to decommission awareness attribute ([#4261](https://github.com/opensearch-project/OpenSearch/pull/4261))
- Controlling discovery for decommissioned nodes ([#4590](https://github.com/opensearch-project/OpenSearch/pull/4590))

### Deprecated
### Removed
@@ -89,9 +94,12 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [Bug]: Fixed invalid location of JDK dependency for arm64 architecture ([#4613](https://github.com/opensearch-project/OpenSearch/pull/4613))
- [Bug]: Alias filter lost after rollover ([#4499](https://github.com/opensearch-project/OpenSearch/pull/4499))
- Fixing Gradle warnings associated with publishPluginZipPublicationToXxx tasks ([#4696](https://github.com/opensearch-project/OpenSearch/pull/4696))
- [BUG]: Remove redundant field from GetDecommissionStateResponse ([#4751](https://github.com/opensearch-project/OpenSearch/pull/4751))
- Fixed randomly failing test ([#4774](https://github.com/opensearch-project/OpenSearch/pull/4774))
- Fix recovery path for searchable snapshots ([#4813](https://github.com/opensearch-project/OpenSearch/pull/4813))
- Fix a bug on handling an invalid array value for point type field ([#4900](https://github.com/opensearch-project/OpenSearch/pull/4900))
- Fix decommission status update to non-leader nodes ([#4800](https://github.com/opensearch-project/OpenSearch/pull/4800))
- Fix bug in AwarenessAttributeDecommissionIT ([#4822](https://github.com/opensearch-project/OpenSearch/pull/4822))
- Fix for failing checkExtraction, checkLicense and checkNotice tasks for windows gradle check ([#4941](https://github.com/opensearch-project/OpenSearch/pull/4941))
### Security
- CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))
@@ -891,7 +891,10 @@ public void testApiNamingConventions() throws Exception {
"remote_store.restore",
"cluster.put_weighted_routing",
"cluster.get_weighted_routing",
"cluster.delete_weighted_routing", };
"cluster.delete_weighted_routing",
"cluster.put_decommission_awareness",
"cluster.get_decommission_awareness",
"cluster.delete_decommission_awareness", };
List<String> booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password");
Set<String> deprecatedMethods = new HashSet<>();
deprecatedMethods.add("indices.force_merge");
@@ -0,0 +1,19 @@
{
"cluster.delete_decommission_awareness": {
"documentation": {
"url": "https://opensearch.org/docs/latest/opensearch/rest-api/decommission/",
"description": "Delete any existing decommission."
},
"stability": "experimental",
"url": {
"paths": [
{
"path": "/_cluster/decommission/awareness/",
"methods": [
"DELETE"
]
}
]
}
}
}
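
For reference only: a minimal sketch of invoking this DELETE endpoint through the transport action wired up elsewhere in this commit, mirroring the integration test further down. The `client` variable (an `org.opensearch.client.Client`), the helper name, and the enclosing class are assumptions, not part of the change.

// REST equivalent: DELETE /_cluster/decommission/awareness/
import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateAction;
import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse;
import org.opensearch.client.Client;

// Clears any existing decommission, i.e. recommissions the previously excluded attribute value.
static boolean recommission(Client client) {
    DeleteDecommissionStateResponse response = client.execute(
        DeleteDecommissionStateAction.INSTANCE,
        new DeleteDecommissionStateRequest()  // the API takes no path parts or body
    ).actionGet();
    return response.isAcknowledged();
}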
@@ -0,0 +1,25 @@
{
"cluster.get_decommission_awareness": {
"documentation": {
"url": "https://opensearch.org/docs/latest/opensearch/rest-api/decommission/",
"description": "Get details and status of decommissioned attribute"
},
"stability": "experimental",
"url": {
"paths": [
{
"path":"/_cluster/decommission/awareness/{awareness_attribute_name}/_status",
"methods":[
"GET"
],
"parts":{
"awareness_attribute_name":{
"type":"string",
"description":"Awareness attribute name"
}
}
}
]
}
}
}
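
As an illustration only, the status check performed by the integration test below can be written as a small helper; `client`, the helper name, and the enclosing class are assumptions, and "zone" stands in for whichever awareness attribute name was decommissioned.

// REST equivalent: GET /_cluster/decommission/awareness/zone/_status
import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateAction;
import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.decommission.DecommissionStatus;

// Returns true once the given attribute value has been fully decommissioned.
static boolean isDecommissioned(Client client, String attributeValue) {
    GetDecommissionStateResponse response = client.execute(
        GetDecommissionStateAction.INSTANCE,
        new GetDecommissionStateRequest("zone")  // awareness_attribute_name path part
    ).actionGet();
    return attributeValue.equals(response.getAttributeValue())
        && response.getDecommissionStatus() == DecommissionStatus.SUCCESSFUL;
}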
@@ -0,0 +1,29 @@
{
"cluster.put_decommission_awareness": {
"documentation": {
"url": "https://opensearch.org/docs/latest/opensearch/rest-api/decommission/",
"description": "Decommissions an awareness attribute"
},
"stability": "experimental",
"url": {
"paths": [
{
"path": "/_cluster/decommission/awareness/{awareness_attribute_name}/{awareness_attribute_value}",
"methods": [
"PUT"
],
"parts": {
"awareness_attribute_name": {
"type": "string",
"description": "Awareness attribute name"
},
"awareness_attribute_value": {
"type": "string",
"description": "Awareness attribute value"
}
}
}
]
}
}
}
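
For illustration, a hedged sketch of triggering a decommission through the corresponding transport action, as the integration test below does for zone "c"; `client`, the helper name, and the enclosing class are assumptions.

// REST equivalent: PUT /_cluster/decommission/awareness/zone/c
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.decommission.DecommissionAttribute;

// Asks the cluster manager to decommission every node carrying node.attr.zone=<zone>;
// those nodes are excluded and eventually removed from the cluster.
static boolean decommissionZone(Client client, String zone) {
    DecommissionRequest request = new DecommissionRequest(new DecommissionAttribute("zone", zone));
    DecommissionResponse response = client.execute(DecommissionAction.INSTANCE, request).actionGet();
    return response.isAcknowledged();  // acknowledged means the decommission request has been accepted
}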
@@ -0,0 +1,165 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.After;
import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateAction;
import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse;
import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateAction;
import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateResponse;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static org.opensearch.test.NodeRoles.onlyRole;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoTimeout;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class AwarenessAttributeDecommissionIT extends OpenSearchIntegTestCase {
private final Logger logger = LogManager.getLogger(AwarenessAttributeDecommissionIT.class);

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(MockTransportService.TestPlugin.class);
}

@After
public void cleanup() throws Exception {
assertNoTimeout(client().admin().cluster().prepareHealth().get());
}

public void testDecommissionStatusUpdatePublishedToAllNodes() throws ExecutionException, InterruptedException {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'");
List<String> clusterManagerNodes = internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "a")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "b")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "c")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build()
);

logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'");
List<String> dataNodes = internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "a")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "b")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "c")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build()
);

ensureStableCluster(6);

logger.info("--> starting decommissioning nodes in zone {}", 'c');
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "c");
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest).get();
assertTrue(decommissionResponse.isAcknowledged());

// Will wait for all events to complete
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();

// assert that decommission status is successful
GetDecommissionStateResponse response = client().execute(
GetDecommissionStateAction.INSTANCE,
new GetDecommissionStateRequest(decommissionAttribute.attributeName())
).get();
assertEquals(response.getAttributeValue(), decommissionAttribute.attributeValue());
assertEquals(response.getDecommissionStatus(), DecommissionStatus.SUCCESSFUL);

ClusterState clusterState = client(clusterManagerNodes.get(0)).admin().cluster().prepareState().execute().actionGet().getState();
assertEquals(4, clusterState.nodes().getSize());

// assert status on nodes that are part of cluster currently
Iterator<DiscoveryNode> discoveryNodeIterator = clusterState.nodes().getNodes().valuesIt();
while (discoveryNodeIterator.hasNext()) {
// assert no node has decommissioned attribute
DiscoveryNode node = discoveryNodeIterator.next();
assertNotEquals(node.getAttributes().get("zone"), "c");

// assert all the nodes have status SUCCESSFUL
ClusterService localNodeClusterService = internalCluster().getInstance(ClusterService.class, node.getName());
assertEquals(
localNodeClusterService.state().metadata().decommissionAttributeMetadata().status(),
DecommissionStatus.SUCCESSFUL
);
}

// assert status on the decommissioned node
// Verify that, until it was kicked out, the node received the appropriate status updates.
// A decommissioned node is removed from the cluster while the status is still IN_PROGRESS,
// so it never receives the final update to SUCCESSFUL.
String randomDecommissionedNode = randomFrom(clusterManagerNodes.get(2), dataNodes.get(2));
ClusterService decommissionedNodeClusterService = internalCluster().getInstance(ClusterService.class, randomDecommissionedNode);
assertEquals(
decommissionedNodeClusterService.state().metadata().decommissionAttributeMetadata().status(),
DecommissionStatus.IN_PROGRESS
);

// Will wait for all events to complete
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();

// Recommission the zone so that the cluster can recover and the test can finish cleanly
DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(clusterManagerNodes.get(0)).execute(
DeleteDecommissionStateAction.INSTANCE,
new DeleteDecommissionStateRequest()
).get();
assertTrue(deleteDecommissionStateResponse.isAcknowledged());

// Wait for the cluster to stabilise with a timeout of 2 minutes (the findPeerInterval for decommissioned nodes),
// by which time all nodes should have rejoined the cluster
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
}
}
13 changes: 13 additions & 0 deletions server/src/main/java/org/opensearch/OpenSearchException.java
@@ -69,6 +69,7 @@
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.Version.V_2_1_0;
import static org.opensearch.Version.V_2_3_0;
import static org.opensearch.Version.V_2_4_0;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureFieldName;
@@ -1608,6 +1609,18 @@ private enum OpenSearchExceptionHandle {
org.opensearch.index.shard.PrimaryShardClosedException::new,
162,
V_2_3_0
),
DECOMMISSIONING_FAILED_EXCEPTION(
org.opensearch.cluster.decommission.DecommissioningFailedException.class,
org.opensearch.cluster.decommission.DecommissioningFailedException::new,
163,
V_2_4_0
),
NODE_DECOMMISSIONED_EXCEPTION(
org.opensearch.cluster.decommission.NodeDecommissionedException.class,
org.opensearch.cluster.decommission.NodeDecommissionedException::new,
164,
V_2_4_0
);

final Class<? extends OpenSearchException> exceptionClass;
16 changes: 16 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
@@ -40,6 +40,12 @@
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.decommission.awareness.get.GetDecommissionStateAction;
import org.opensearch.action.admin.cluster.decommission.awareness.get.TransportGetDecommissionStateAction;
import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateAction;
import org.opensearch.action.admin.cluster.decommission.awareness.delete.TransportDeleteDecommissionStateAction;
import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionAction;
import org.opensearch.action.admin.cluster.decommission.awareness.put.TransportDecommissionAction;
import org.opensearch.action.admin.cluster.health.ClusterHealthAction;
import org.opensearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsAction;
@@ -309,9 +315,11 @@
import org.opensearch.rest.action.admin.cluster.RestClusterStatsAction;
import org.opensearch.rest.action.admin.cluster.RestClusterUpdateSettingsAction;
import org.opensearch.rest.action.admin.cluster.RestCreateSnapshotAction;
import org.opensearch.rest.action.admin.cluster.RestDeleteDecommissionStateAction;
import org.opensearch.rest.action.admin.cluster.RestDeleteRepositoryAction;
import org.opensearch.rest.action.admin.cluster.RestDeleteSnapshotAction;
import org.opensearch.rest.action.admin.cluster.RestDeleteStoredScriptAction;
import org.opensearch.rest.action.admin.cluster.RestGetDecommissionStateAction;
import org.opensearch.rest.action.admin.cluster.RestGetRepositoriesAction;
import org.opensearch.rest.action.admin.cluster.RestGetScriptContextAction;
import org.opensearch.rest.action.admin.cluster.RestGetScriptLanguageAction;
@@ -324,6 +332,7 @@
import org.opensearch.rest.action.admin.cluster.RestNodesStatsAction;
import org.opensearch.rest.action.admin.cluster.RestNodesUsageAction;
import org.opensearch.rest.action.admin.cluster.RestPendingClusterTasksAction;
import org.opensearch.rest.action.admin.cluster.RestDecommissionAction;
import org.opensearch.rest.action.admin.cluster.RestPutRepositoryAction;
import org.opensearch.rest.action.admin.cluster.RestPutStoredScriptAction;
import org.opensearch.rest.action.admin.cluster.RestReloadSecureSettingsAction;
@@ -694,6 +703,10 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class);
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);

// Decommission actions
actions.register(DecommissionAction.INSTANCE, TransportDecommissionAction.class);
actions.register(GetDecommissionStateAction.INSTANCE, TransportGetDecommissionStateAction.class);
actions.register(DeleteDecommissionStateAction.INSTANCE, TransportDeleteDecommissionStateAction.class);
return unmodifiableMap(actions.getRegistry());
}

@@ -875,6 +888,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestDeletePitAction());
registerHandler.accept(new RestGetAllPitsAction(nodesInCluster));
registerHandler.accept(new RestPitSegmentsAction(nodesInCluster));
registerHandler.accept(new RestDeleteDecommissionStateAction());

for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(
@@ -890,6 +904,8 @@
}
}
registerHandler.accept(new RestCatAction(catActions));
registerHandler.accept(new RestDecommissionAction());
registerHandler.accept(new RestGetDecommissionStateAction());

// Remote Store APIs
if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) {