Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate attributes of routing nodes for Routing Weights API #5607

Merged
merged 8 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,92 @@ public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception {
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
}

public void testDecommissionAcknowledgedIfWeightsNotSetForNonRoutingNode() throws ExecutionException, InterruptedException {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> start 3 cluster manager nodes on zones 'd' & 'e' & 'f'");
List<String> clusterManagerNodes = internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "d")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "e")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "f")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build()
);

logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'");
List<String> dataNodes = internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "a")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "b")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "c")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build()
);

ensureStableCluster(6);

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

logger.info("--> starting decommissioning nodes in zone {}", 'd');
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "d");
// Set the timeout to 0 to do immediate Decommission
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
decommissionRequest.setNoDelay(true);
DecommissionResponse decommissionResponse = client(dataNodes.get(0)).execute(DecommissionAction.INSTANCE, decommissionRequest)
.get();
assertTrue(decommissionResponse.isAcknowledged());

client(dataNodes.get(0)).admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();

ClusterState clusterState = client(dataNodes.get(0)).admin().cluster().prepareState().execute().actionGet().getState();

// assert that number of nodes should be 5 ( 2 cluster manager nodes + 3 data nodes )
assertEquals(clusterState.nodes().getNodes().size(), 5);
assertEquals(clusterState.nodes().getDataNodes().size(), 3);
assertEquals(clusterState.nodes().getClusterManagerNodes().size(), 2);

// Recommissioning the zone back to gracefully succeed the test once above tests succeeds
DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(dataNodes.get(0)).execute(
DeleteDecommissionStateAction.INSTANCE,
new DeleteDecommissionStateRequest()
).get();
assertTrue(deleteDecommissionStateResponse.isAcknowledged());

// will wait for cluster to stabilise with a timeout of 2 min as by then all nodes should have joined the cluster
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
}

private static class WaitForFailedDecommissionState implements ClusterStateObserver.Listener {

final CountDownLatch doneLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import static org.opensearch.action.ValidateActions.addValidationError;
Expand Down Expand Up @@ -126,17 +127,26 @@ public ActionRequestValidationException validate() {
if (weightedRouting.weights() == null || weightedRouting.weights().isEmpty()) {
validationException = addValidationError("Weights are missing", validationException);
}
int countValueWithZeroWeights = 0;
double weight;
try {
for (Object value : weightedRouting.weights().values()) {
if (value == null) {
validationException = addValidationError(("Weight is null"), validationException);
} else {
Double.parseDouble(value.toString());
weight = Double.parseDouble(value.toString());
countValueWithZeroWeights = (weight == 0) ? countValueWithZeroWeights + 1 : countValueWithZeroWeights;
}
}
} catch (NumberFormatException e) {
validationException = addValidationError(("Weight is not a number"), validationException);
}
if (countValueWithZeroWeights > weightedRouting.weights().size() / 2) {
validationException = addValidationError(
(String.format(Locale.ROOT, "More than half [%d] value has weight set as 0", countValueWithZeroWeights)),
validationException
);
}
imRishN marked this conversation as resolved.
Show resolved Hide resolved
return validationException;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,12 @@ private static void validateAwarenessAttribute(
msg = "invalid awareness attribute requested for decommissioning";
} else if (forcedAwarenessAttributes.containsKey(decommissionAttribute.attributeName()) == false) {
msg = "forced awareness attribute [" + forcedAwarenessAttributes.toString() + "] doesn't have the decommissioning attribute";
} else if (forcedAwarenessAttributes.get(decommissionAttribute.attributeName())
.contains(decommissionAttribute.attributeValue()) == false) {
msg = "invalid awareness attribute value requested for decommissioning. Set forced awareness values before to decommission";
}
}
// we don't need to check for attributes presence in forced awareness attribute because, weights API ensures that weights are set
// for all discovered routing attributes and forced attributes.
// So, if the weight is not present for the attribute it could mean its a non routing node (eg. cluster manager)
// And in that case, we are ok to proceed with the decommission. A routing node's attribute absence in forced awareness attribute is
// a problem elsewhere

if (msg != null) {
throw new DecommissioningFailedException(decommissionAttribute, msg);
Expand All @@ -440,8 +442,11 @@ private static void ensureToBeDecommissionedAttributeWeighedAway(ClusterState st
"no weights are specified to attribute [" + decommissionAttribute.attributeName() + "]"
);
}
// in case the weight is not set for the attribute value, then we know that attribute values was not part of discovered routing node
// attribute or forced awareness attribute and in that case, we are ok if the attribute's value weight is not set. But if it's set,
// its weight has to be zero
Double attributeValueWeight = weightedRouting.weights().get(decommissionAttribute.attributeValue());
if (attributeValueWeight == null || attributeValueWeight.equals(0.0) == false) {
if (attributeValueWeight != null && attributeValueWeight.equals(0.0) == false) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an assert that if weight for an attribute value is not set the it is NOT a routing node. Can we add similar checks at all places where we are modifying weights and not weights corresponding to an attribute is set?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its tough to say which attributes weight must be set. Rather we can assert that all routing nodes weights are set. Now, weights gets updated only at central place in WeightedRoutingService, where I have added the method ensureWeightsSetForAllDiscoveredAndForcedAwarenessValues which does the same

throw new DecommissioningFailedException(
decommissionAttribute,
"weight for decommissioned attribute is expected to be [0.0] but found [" + attributeValueWeight + "]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.cluster.routing;

import com.carrotsearch.hppc.ObjectIntHashMap;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand Down Expand Up @@ -40,6 +42,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.action.ValidateActions.addValidationError;
import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING;
Expand Down Expand Up @@ -189,26 +192,41 @@ public void verifyAwarenessAttribute(String attributeName) {

private void ensureWeightsSetForAllDiscoveredAndForcedAwarenessValues(ClusterState state, ClusterPutWeightedRoutingRequest request) {
imRishN marked this conversation as resolved.
Show resolved Hide resolved
String attributeName = request.getWeightedRouting().attributeName();
// build attr_value -> nodes map
ObjectIntHashMap<String> nodesPerAttribute = state.getRoutingNodes().nodesPerAttributesCounts(attributeName);
Set<String> discoveredAwarenessValues = new HashSet<>();
state.nodes().forEach(node -> {
if (node.getAttributes().containsKey(attributeName)) {
discoveredAwarenessValues.add(node.getAttributes().get(attributeName));
}
});
for (ObjectCursor<String> stringObjectCursor : nodesPerAttribute.keys()) {
if (stringObjectCursor.value != null) discoveredAwarenessValues.add(stringObjectCursor.value);
}
Set<String> allAwarenessValues;
if (forcedAwarenessAttributes.get(attributeName) == null) {
allAwarenessValues = new HashSet<>();
} else {
allAwarenessValues = new HashSet<>(forcedAwarenessAttributes.get(attributeName));
}
allAwarenessValues.addAll(discoveredAwarenessValues);
AtomicInteger countWithZeroWeight = new AtomicInteger();
allAwarenessValues.forEach(awarenessValue -> {
if (request.getWeightedRouting().weights().containsKey(awarenessValue) == false) {
throw new UnsupportedWeightedRoutingStateException(
"weight for [" + awarenessValue + "] is not set and it is part of forced awareness value or a node has this attribute."
"weight for ["
+ awarenessValue
+ "] is not set and it is part of forced awareness value or a routing node has this attribute."
);
}
if (request.getWeightedRouting().weights().get(awarenessValue) == 0) {
countWithZeroWeight.addAndGet(1);
}
});
// We have validations in place to check that not more than half of the values weights are set to 0 in the request object
// Adding this check again here on allAwarenessValues such that in no case we land up in a situation where more than half of
// discovered awareness values has weight zero
if (countWithZeroWeight.get() > allAwarenessValues.size() / 2) {
throw addValidationError(
(String.format(Locale.ROOT, "More than half [%d] value has weight set as 0", countWithZeroWeight.get())),
imRishN marked this conversation as resolved.
Show resolved Hide resolved
null
);
}
}

private void ensureDecommissionedAttributeHasZeroWeight(ClusterState state, ClusterPutWeightedRoutingRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,12 @@ public void testValidate_AttributeMissing() {
assertTrue(actionRequestValidationException.getMessage().contains("Attribute name is missing"));
}

public void testValidate_MoreThanHalfWithZeroWeight() {
String reqString = "{\"us-east-1c\" : \"0\", \"us-east-1b\":\"0\",\"us-east-1a\":\"1\"}";
ClusterPutWeightedRoutingRequest request = new ClusterPutWeightedRoutingRequest("zone");
request.setWeightedRouting(new BytesArray(reqString), XContentType.JSON);
ActionRequestValidationException actionRequestValidationException = request.validate();
assertNotNull(actionRequestValidationException);
assertTrue(actionRequestValidationException.getMessage().contains("More than half [2] value has weight set as 0"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,33 +145,6 @@ public void onFailure(Exception e) {
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

@SuppressWarnings("unchecked")
public void testDecommissioningNotStartedForInvalidAttributeValue() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "rack-a");
ActionListener<DecommissionResponse> listener = new ActionListener<DecommissionResponse>() {
@Override
public void onResponse(DecommissionResponse decommissionResponse) {
fail("on response shouldn't have been called");
}

@Override
public void onFailure(Exception e) {
assertTrue(e instanceof DecommissioningFailedException);
assertThat(
e.getMessage(),
Matchers.endsWith(
"invalid awareness attribute value requested for decommissioning. "
+ "Set forced awareness values before to decommission"
)
);
countDownLatch.countDown();
}
};
decommissionService.startDecommissionAction(new DecommissionRequest(decommissionAttribute), listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
}

public void testDecommissionNotStartedWithoutWeighingAwayAttribute_1() throws InterruptedException {
Map<String, Double> weights = Map.of("zone_1", 1.0, "zone_2", 1.0, "zone_3", 0.0);
setWeightedRoutingWeights(weights);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void testRegisterWeightedRoutingMetadataWithChangedWeights() throws Inter
client,
ClusterAddWeightedRoutingAction.INSTANCE
);
WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", Map.of("zone_A", 1.0, "zone_B", 0.0, "zone_C", 0.0));
WeightedRouting updatedWeightedRouting = new WeightedRouting("zone", Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 0.0));
request.setWeightedRouting(updatedWeightedRouting);
final CountDownLatch countDownLatch = new CountDownLatch(1);
ActionListener<ClusterStateUpdateResponse> listener = new ActionListener<ClusterStateUpdateResponse>() {
Expand Down Expand Up @@ -323,10 +323,39 @@ public void onFailure(Exception e) {
MatcherAssert.assertThat(exceptionReference.get(), instanceOf(UnsupportedWeightedRoutingStateException.class));
MatcherAssert.assertThat(
exceptionReference.get().getMessage(),
containsString("weight for [zone_B] is not set and it is part of forced awareness value or a node has this attribute.")
containsString("weight for [zone_B] is not set and it is part of forced awareness value or a routing node has this attribute.")
);
}

public void testAddWeightedRoutingFailsWhenWeightsForMoreThanHalfIsZero() throws InterruptedException {
ClusterPutWeightedRoutingRequestBuilder request = new ClusterPutWeightedRoutingRequestBuilder(
client,
ClusterAddWeightedRoutingAction.INSTANCE
);
Map<String, Double> weights = Map.of("zone_A", 0.0, "zone_B", 0.0, "zone_C", 1.0, "zone_D", 1.0, "zone_E", 1.0, "zone_F", 1.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);
request.setWeightedRouting(weightedRouting);
final CountDownLatch countDownLatch = new CountDownLatch(1);
final AtomicReference<Exception> exceptionReference = new AtomicReference<>();
ActionListener<ClusterStateUpdateResponse> listener = new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) {
countDownLatch.countDown();
}

@Override
public void onFailure(Exception e) {
exceptionReference.set(e);
countDownLatch.countDown();
}
};
weightedRoutingService.registerWeightedRoutingMetadata(request.request(), listener);
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS));
MatcherAssert.assertThat("Expected onFailure to be called", exceptionReference.get(), notNullValue());
MatcherAssert.assertThat(exceptionReference.get(), instanceOf(ActionRequestValidationException.class));
MatcherAssert.assertThat(exceptionReference.get().getMessage(), containsString("More than half [2] value has weight set as 0"));
}

public void testAddWeightedRoutingFailsWhenDecommissionOngoing() throws InterruptedException {
Map<String, Double> weights = Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 1.0);
DecommissionStatus status = randomFrom(DecommissionStatus.INIT, DecommissionStatus.IN_PROGRESS, DecommissionStatus.SUCCESSFUL);
Expand Down