Skip to content

Commit

Permalink
[Backport 2.x] Validate attributes of routing nodes for Routing Weigh…
Browse files Browse the repository at this point in the history
…ts API (#5691)

* Validate attributes of routing nodes for Routing Weights API (#5607)

* Changes following things in Routing Weights API and Decommission Action -
        a. Consider only routing node's attribute for validation instead of all nodes
        b. Ensure no more than 50% node has weight zero
        c. Removal of decommission action's direct dependence on force zone awareness

Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN authored Jan 4, 2023
1 parent a5afdd4 commit 53a4cae
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -781,15 +781,15 @@ public void testDecommissionFailedWhenAttributeNotWeighedAway() throws Exception
});
}

public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception {
public void testDecommissionFailedWithOnlyOneAttributeValueForLeader() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a")
.put("cluster.routing.allocation.awareness.force.zone.values", "b") // force zone values is only set for zones of routing nodes
.build();
// Start 3 cluster manager eligible nodes
// Start 3 cluster manager eligible nodes in zone a
internalCluster().startClusterManagerOnlyNodes(3, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build());
// start 3 data nodes
internalCluster().startDataOnlyNodes(3, Settings.builder().put(commonSettings).put("node.attr.zone", "a").build());
// Start 3 data nodes in zone b
internalCluster().startDataOnlyNodes(3, Settings.builder().put(commonSettings).put("node.attr.zone", "b").build());
ensureStableCluster(6);
ClusterHealthResponse health = client().admin()
.cluster()
Expand All @@ -802,7 +802,7 @@ public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception {
assertFalse(health.isTimedOut());

logger.info("--> setting shard routing weights");
Map<String, Double> weights = Map.of("a", 0.0);
Map<String, Double> weights = Map.of("b", 1.0); // weights are expected to be set only for routing nodes
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin()
Expand Down Expand Up @@ -868,6 +868,92 @@ public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception {
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
}

public void testDecommissionAcknowledgedIfWeightsNotSetForNonRoutingNode() throws ExecutionException, InterruptedException {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> start 3 cluster manager nodes on zones 'd' & 'e' & 'f'");
List<String> clusterManagerNodes = internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "d")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "e")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "f")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build()
);

logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'");
List<String> dataNodes = internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "a")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "b")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "c")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build()
);

ensureStableCluster(6);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

logger.info("--> starting decommissioning nodes in zone {}", 'd');
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "d");
// Set the timeout to 0 to do immediate Decommission
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
decommissionRequest.setNoDelay(true);
DecommissionResponse decommissionResponse = client(dataNodes.get(0)).execute(DecommissionAction.INSTANCE, decommissionRequest)
.get();
assertTrue(decommissionResponse.isAcknowledged());

client(dataNodes.get(0)).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();

ClusterState clusterState = client(dataNodes.get(0)).admin().cluster().prepareState().execute().actionGet().getState();

// assert that number of nodes should be 5 ( 2 cluster manager nodes + 3 data nodes )
assertEquals(clusterState.nodes().getNodes().size(), 5);
assertEquals(clusterState.nodes().getDataNodes().size(), 3);
assertEquals(clusterState.nodes().getClusterManagerNodes().size(), 2);

// Recommissioning the zone back to gracefully succeed the test once above tests succeeds
DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(dataNodes.get(0)).execute(
DeleteDecommissionStateAction.INSTANCE,
new DeleteDecommissionStateRequest()
).get();
assertTrue(deleteDecommissionStateResponse.isAcknowledged());

// will wait for cluster to stabilise with a timeout of 2 min as by then all nodes should have joined the cluster
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
}

private static class WaitForFailedDecommissionState implements ClusterStateObserver.Listener {

final CountDownLatch doneLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import static org.opensearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -126,17 +127,36 @@ public ActionRequestValidationException validate() {
if (weightedRouting.weights() == null || weightedRouting.weights().isEmpty()) {
validationException = addValidationError("Weights are missing", validationException);
}
int countValueWithZeroWeights = 0;
double weight;
try {
for (Object value : weightedRouting.weights().values()) {
if (value == null) {
validationException = addValidationError(("Weight is null"), validationException);
} else {
Double.parseDouble(value.toString());
weight = Double.parseDouble(value.toString());
countValueWithZeroWeights = (weight == 0) ? countValueWithZeroWeights + 1 : countValueWithZeroWeights;
}
}
} catch (NumberFormatException e) {
validationException = addValidationError(("Weight is not a number"), validationException);
}
// Returning validation exception here itself if it is not null, so we can have a descriptive message for the count check
if (validationException != null) {
return validationException;
}
if (countValueWithZeroWeights > weightedRouting.weights().size() / 2) {
validationException = addValidationError(
(String.format(
Locale.ROOT,
"There are too many attribute values [%s] given zero weight [%d]. Maximum expected number of routing weights having zero weight is [%d]",
weightedRouting.weights().toString(),
countValueWithZeroWeights,
weightedRouting.weights().size() / 2
)),
null
);
}
return validationException;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,12 @@ private static void validateAwarenessAttribute(
msg = "invalid awareness attribute requested for decommissioning";
} else if (forcedAwarenessAttributes.containsKey(decommissionAttribute.attributeName()) == false) {
msg = "forced awareness attribute [" + forcedAwarenessAttributes.toString() + "] doesn't have the decommissioning attribute";
} else if (forcedAwarenessAttributes.get(decommissionAttribute.attributeName())
.contains(decommissionAttribute.attributeValue()) == false) {
msg = "invalid awareness attribute value requested for decommissioning. Set forced awareness values before to decommission";
}
}
// we don't need to check for attributes presence in forced awareness attribute because, weights API ensures that weights are set
// for all discovered routing attributes and forced attributes.
// So, if the weight is not present for the attribute it could mean its a non routing node (eg. cluster manager)
// And in that case, we are ok to proceed with the decommission. A routing node's attribute absence in forced awareness attribute is
// a problem elsewhere

if (msg != null) {
throw new DecommissioningFailedException(decommissionAttribute, msg);
Expand All @@ -440,8 +442,11 @@ private static void ensureToBeDecommissionedAttributeWeighedAway(ClusterState st
"no weights are specified to attribute [" + decommissionAttribute.attributeName() + "]"
);
}
// in case the weight is not set for the attribute value, then we know that attribute values was not part of discovered routing node
// attribute or forced awareness attribute and in that case, we are ok if the attribute's value weight is not set. But if it's set,
// its weight has to be zero
Double attributeValueWeight = weightedRouting.weights().get(decommissionAttribute.attributeValue());
if (attributeValueWeight == null || attributeValueWeight.equals(0.0) == false) {
if (attributeValueWeight != null && attributeValueWeight.equals(0.0) == false) {
throw new DecommissioningFailedException(
decommissionAttribute,
"weight for decommissioned attribute is expected to be [0.0] but found [" + attributeValueWeight + "]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.cluster.routing;

import com.carrotsearch.hppc.ObjectIntHashMap;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand Down Expand Up @@ -40,6 +42,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.action.ValidateActions.addValidationError;
import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING;
Expand Down Expand Up @@ -189,26 +192,47 @@ public void verifyAwarenessAttribute(String attributeName) {

private void ensureWeightsSetForAllDiscoveredAndForcedAwarenessValues(ClusterState state, ClusterPutWeightedRoutingRequest request) {
String attributeName = request.getWeightedRouting().attributeName();
// build attr_value -> nodes map
ObjectIntHashMap<String> nodesPerAttribute = state.getRoutingNodes().nodesPerAttributesCounts(attributeName);
Set<String> discoveredAwarenessValues = new HashSet<>();
state.nodes().forEach(node -> {
if (node.getAttributes().containsKey(attributeName)) {
discoveredAwarenessValues.add(node.getAttributes().get(attributeName));
}
});
for (ObjectCursor<String> stringObjectCursor : nodesPerAttribute.keys()) {
if (stringObjectCursor.value != null) discoveredAwarenessValues.add(stringObjectCursor.value);
}
Set<String> allAwarenessValues;
if (forcedAwarenessAttributes.get(attributeName) == null) {
allAwarenessValues = new HashSet<>();
} else {
allAwarenessValues = new HashSet<>(forcedAwarenessAttributes.get(attributeName));
}
allAwarenessValues.addAll(discoveredAwarenessValues);
AtomicInteger countWithZeroWeight = new AtomicInteger();
allAwarenessValues.forEach(awarenessValue -> {
if (request.getWeightedRouting().weights().containsKey(awarenessValue) == false) {
throw new UnsupportedWeightedRoutingStateException(
"weight for [" + awarenessValue + "] is not set and it is part of forced awareness value or a node has this attribute."
"weight for ["
+ awarenessValue
+ "] is not set and it is part of forced awareness value or a routing node has this attribute."
);
}
if (request.getWeightedRouting().weights().get(awarenessValue) == 0) {
countWithZeroWeight.addAndGet(1);
}
});
// We have validations in place to check that not more than half of the values weights are set to 0 in the request object
// Adding this check again here on allAwarenessValues such that in no case we land up in a situation where more than half of
// discovered awareness values has weight zero
if (countWithZeroWeight.get() > allAwarenessValues.size() / 2) {
throw addValidationError(
(String.format(
Locale.ROOT,
"There are too many discovered attribute values [%s] given zero weight [%d]. Maximum expected number of routing weights having zero weight is [%d]",
request.getWeightedRouting().weights().toString(),
countWithZeroWeight.get(),
allAwarenessValues.size() / 2
)),
null
);
}
}

private void ensureDecommissionedAttributeHasZeroWeight(ClusterState state, ClusterPutWeightedRoutingRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,14 @@ public void testValidate_AttributeMissing() {
assertTrue(actionRequestValidationException.getMessage().contains("Attribute name is missing"));
}

public void testValidate_MoreThanHalfWithZeroWeight() {
String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}";
ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone");
request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON);
ActionRequestValidationException actionRequestValidationException = request.validate();
assertNotNull(actionRequestValidationException);
assertTrue(
actionRequestValidationException.getMessage().contains("Maximum expected number of routing weights having zero weight is [1]")
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,33 +145,6 @@ public void onFailure(Exception e) {
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

@SuppressWarnings("unchecked")
public void testDecommissioningNotStartedForInvalidAttributeValue() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "rack-a");
ActionListener<DecommissionResponse> listener = new ActionListener<DecommissionResponse>() {
@Override
public void onResponse(DecommissionResponse decommissionResponse) {
fail("on response shouldn't have been called");
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof DecommissioningFailedException);
assertThat(
e.getMessage(),
Matchers.endsWith(
"invalid awareness attribute value requested for decommissioning. "
+ "Set forced awareness values before to decommission"
)
);
countDownLatch.countDown();
}
};
decommissionService.startDecommissionAction(new DecommissionRequest(decommissionAttribute), listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

public void testDecommissionNotStartedWithoutWeighingAwayAttribute_1() throws InterruptedException {
Map<String, Double> weights = Map.of("zone_1", 1.0, "zone_2", 1.0, "zone_3", 0.0);
setWeightedRoutingWeights(weights);
Expand Down
Loading

0 comments on commit 53a4cae

Please sign in to comment.