Skip to content

Commit

Permalink
Gracefully handle concurrent zone decommission action (#5542)
Browse files Browse the repository at this point in the history
* Control concurrency and handle retries during decommissioning of awareness attributes

Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN authored Jan 10, 2023
1 parent b71caae commit ac3351c
Show file tree
Hide file tree
Showing 17 changed files with 338 additions and 105 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Changed http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
- Pre conditions check before updating weighted routing metadata ([#4955](https://github.com/opensearch-project/OpenSearch/pull/4955))
- Gracefully handle concurrent zone decommission action ([#5542](https://github.com/opensearch-project/OpenSearch/pull/5542))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.opensearch.test.MockLogAppender;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;
Expand All @@ -59,6 +61,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

import static org.opensearch.test.NodeRoles.onlyRole;
Expand Down Expand Up @@ -961,6 +964,114 @@ public void testDecommissionAcknowledgedIfWeightsNotSetForNonRoutingNode() throw
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
}

public void testConcurrentDecommissionAction() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

logger.info("--> start 3 cluster manager nodes on zones 'a' & 'b' & 'c'");
internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "a")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "b")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "c")
.put(onlyRole(commonSettings, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE))
.build()
);
logger.info("--> start 3 data nodes on zones 'a' & 'b' & 'c'");
internalCluster().startNodes(
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "a")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "b")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build(),
Settings.builder()
.put(commonSettings)
.put("node.attr.zone", "c")
.put(onlyRole(commonSettings, DiscoveryNodeRole.DATA_ROLE))
.build()
);

ensureStableCluster(6);
ClusterHealthResponse health = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes(Integer.toString(6))
.execute()
.actionGet();
assertFalse(health.isTimedOut());

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 0.0, "b", 1.0, "c", 1.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse weightedRoutingResponse = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertTrue(weightedRoutingResponse.isAcknowledged());

AtomicInteger numRequestAcknowledged = new AtomicInteger();
AtomicInteger numRequestUnAcknowledged = new AtomicInteger();
AtomicInteger numRequestFailed = new AtomicInteger();
int concurrentRuns = randomIntBetween(5, 10);
TestThreadPool testThreadPool = null;
logger.info("--> starting {} concurrent decommission action in zone {}", concurrentRuns, 'a');
try {
testThreadPool = new TestThreadPool(AwarenessAttributeDecommissionIT.class.getName());
List<Runnable> operationThreads = new ArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns);
for (int i = 0; i < concurrentRuns; i++) {
Runnable thread = () -> {
logger.info("Triggering decommission action");
DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "a");
DecommissionRequest decommissionRequest = new DecommissionRequest(decommissionAttribute);
decommissionRequest.setNoDelay(true);
try {
DecommissionResponse decommissionResponse = client().execute(DecommissionAction.INSTANCE, decommissionRequest)
.get();
if (decommissionResponse.isAcknowledged()) {
numRequestAcknowledged.incrementAndGet();
} else {
numRequestUnAcknowledged.incrementAndGet();
}
} catch (Exception e) {
numRequestFailed.incrementAndGet();
}
countDownLatch.countDown();
};
operationThreads.add(thread);
}
TestThreadPool finalTestThreadPool = testThreadPool;
operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable));
countDownLatch.await();
} finally {
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
}
assertEquals(concurrentRuns, numRequestAcknowledged.get() + numRequestUnAcknowledged.get() + numRequestFailed.get());
assertEquals(concurrentRuns - 1, numRequestFailed.get());
assertEquals(1, numRequestAcknowledged.get() + numRequestUnAcknowledged.get());
}

private static class WaitForFailedDecommissionState implements ClusterStateObserver.Listener {

final CountDownLatch doneLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import static org.opensearch.action.ValidateActions.addValidationError;

/**
* Register decommission request.
* <p>
* Registers a decommission request with decommission attribute and timeout
*
* @opensearch.internal
Expand All @@ -32,7 +30,7 @@ public class DecommissionRequest extends ClusterManagerNodeRequest<DecommissionR
public static final TimeValue DEFAULT_NODE_DRAINING_TIMEOUT = TimeValue.timeValueSeconds(120);

private DecommissionAttribute decommissionAttribute;

private String requestID;
private TimeValue delayTimeout = DEFAULT_NODE_DRAINING_TIMEOUT;

// holder for no_delay param. To avoid draining time timeout.
Expand All @@ -49,6 +47,7 @@ public DecommissionRequest(StreamInput in) throws IOException {
decommissionAttribute = new DecommissionAttribute(in);
this.delayTimeout = in.readTimeValue();
this.noDelay = in.readBoolean();
this.requestID = in.readOptionalString();
}

@Override
Expand All @@ -57,6 +56,7 @@ public void writeTo(StreamOutput out) throws IOException {
decommissionAttribute.writeTo(out);
out.writeTimeValue(delayTimeout);
out.writeBoolean(noDelay);
out.writeOptionalString(requestID);
}

/**
Expand All @@ -77,25 +77,45 @@ public DecommissionAttribute getDecommissionAttribute() {
return this.decommissionAttribute;
}

public void setDelayTimeout(TimeValue delayTimeout) {
public DecommissionRequest setDelayTimeout(TimeValue delayTimeout) {
this.delayTimeout = delayTimeout;
return this;
}

public TimeValue getDelayTimeout() {
return this.delayTimeout;
}

public void setNoDelay(boolean noDelay) {
public DecommissionRequest setNoDelay(boolean noDelay) {
if (noDelay) {
this.delayTimeout = TimeValue.ZERO;
}
this.noDelay = noDelay;
return this;
}

public boolean isNoDelay() {
return noDelay;
}

/**
* Sets id for decommission request
*
* @param requestID uuid for request
* @return this request
*/
public DecommissionRequest setRequestID(String requestID) {
this.requestID = requestID;
return this;
}

/**
* @return Returns id of decommission request
*/
public String requestID() {
return requestID;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
Expand All @@ -122,6 +142,13 @@ public ActionRequestValidationException validate() {

@Override
public String toString() {
return "DecommissionRequest{" + "decommissionAttribute=" + decommissionAttribute + '}';
return "DecommissionRequest{"
+ "decommissionAttribute="
+ decommissionAttribute
+ ", delayTimeout="
+ delayTimeout
+ ", noDelay="
+ noDelay
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,15 @@ public DecommissionRequestBuilder setNoDelay(boolean noDelay) {
request.setNoDelay(noDelay);
return this;
}

/**
* Sets request id for decommission request
*
* @param requestID for decommission request
* @return current object
*/
public DecommissionRequestBuilder requestID(String requestID) {
request.setRequestID(requestID);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class DecommissionAttributeMetadata extends AbstractNamedDiffable<Custom>

private final DecommissionAttribute decommissionAttribute;
private DecommissionStatus status;
private String requestID;
public static final String attributeType = "awareness";

/**
Expand All @@ -45,18 +46,19 @@ public class DecommissionAttributeMetadata extends AbstractNamedDiffable<Custom>
* @param decommissionAttribute attribute details
* @param status current status of the attribute decommission
*/
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, DecommissionStatus status) {
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, DecommissionStatus status, String requestId) {
this.decommissionAttribute = decommissionAttribute;
this.status = status;
this.requestID = requestId;
}

/**
* Constructs new decommission attribute metadata with status as {@link DecommissionStatus#INIT}
* Constructs new decommission attribute metadata with status as {@link DecommissionStatus#INIT} and request id
*
* @param decommissionAttribute attribute details
*/
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute) {
this(decommissionAttribute, DecommissionStatus.INIT);
public DecommissionAttributeMetadata(DecommissionAttribute decommissionAttribute, String requestID) {
this(decommissionAttribute, DecommissionStatus.INIT, requestID);
}

/**
Expand All @@ -77,6 +79,15 @@ public DecommissionStatus status() {
return this.status;
}

/**
* Returns the request id of the decommission
*
* @return request id
*/
public String requestID() {
return this.requestID;
}

/**
* Returns instance of the metadata with updated status
* @param newStatus status to be updated with
Expand Down Expand Up @@ -128,12 +139,13 @@ public boolean equals(Object o) {
DecommissionAttributeMetadata that = (DecommissionAttributeMetadata) o;

if (!status.equals(that.status)) return false;
if (!requestID.equals(that.requestID)) return false;
return decommissionAttribute.equals(that.decommissionAttribute);
}

@Override
public int hashCode() {
return Objects.hash(attributeType, decommissionAttribute, status);
return Objects.hash(attributeType, decommissionAttribute, status, requestID);
}

/**
Expand All @@ -152,6 +164,7 @@ public Version getMinimalSupportedVersion() {
public DecommissionAttributeMetadata(StreamInput in) throws IOException {
this.decommissionAttribute = new DecommissionAttribute(in);
this.status = DecommissionStatus.fromString(in.readString());
this.requestID = in.readString();
}

public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException {
Expand All @@ -165,12 +178,14 @@ public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException
public void writeTo(StreamOutput out) throws IOException {
decommissionAttribute.writeTo(out);
out.writeString(status.status());
out.writeString(requestID);
}

public static DecommissionAttributeMetadata fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token;
DecommissionAttribute decommissionAttribute = null;
DecommissionStatus status = null;
String requestID = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
String currentFieldName = parser.currentName();
Expand Down Expand Up @@ -210,6 +225,13 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser)
);
}
status = DecommissionStatus.fromString(parser.text());
} else if ("requestID".equals(currentFieldName)) {
if (parser.nextToken() != XContentParser.Token.VALUE_STRING) {
throw new OpenSearchParseException(
"failed to parse status of decommissioning, expected string but found unknown type"
);
}
requestID = parser.text();
} else {
throw new OpenSearchParseException(
"unknown field found [{}], failed to parse the decommission attribute",
Expand All @@ -218,15 +240,15 @@ public static DecommissionAttributeMetadata fromXContent(XContentParser parser)
}
}
}
return new DecommissionAttributeMetadata(decommissionAttribute, status);
return new DecommissionAttributeMetadata(decommissionAttribute, status, requestID);
}

/**
* {@inheritDoc}
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
toXContent(decommissionAttribute, status, attributeType, builder, params);
toXContent(decommissionAttribute, status, requestID, attributeType, builder, params);
return builder;
}

Expand All @@ -245,6 +267,7 @@ public EnumSet<Metadata.XContentContext> context() {
public static void toXContent(
DecommissionAttribute decommissionAttribute,
DecommissionStatus status,
String requestID,
String attributeType,
XContentBuilder builder,
ToXContent.Params params
Expand All @@ -253,6 +276,7 @@ public static void toXContent(
builder.field(decommissionAttribute.attributeName(), decommissionAttribute.attributeValue());
builder.endObject();
builder.field("status", status.status());
builder.field("requestID", requestID);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ public ClusterState execute(ClusterState currentState) {
decommissionAttributeMetadata.validateNewStatus(decommissionStatus);
decommissionAttributeMetadata = new DecommissionAttributeMetadata(
decommissionAttributeMetadata.decommissionAttribute(),
decommissionStatus
decommissionStatus,
decommissionAttributeMetadata.requestID()
);
ClusterState newState = ClusterState.builder(currentState)
.metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata))
Expand Down
Loading

0 comments on commit ac3351c

Please sign in to comment.