Skip to content

Commit

Permalink
Data node changes for master task throttling
Browse files Browse the repository at this point in the history
Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
  • Loading branch information
dhwanilpatel committed Aug 12, 2022
1 parent 7ac6c8d commit 14acc5c
Show file tree
Hide file tree
Showing 5 changed files with 408 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,21 @@ public abstract class ClusterManagerNodeRequest<Request extends ClusterManagerNo
@Deprecated
protected TimeValue masterNodeTimeout = clusterManagerNodeTimeout;

protected boolean remoteRequest;

protected ClusterManagerNodeRequest() {}

protected ClusterManagerNodeRequest(StreamInput in) throws IOException {
super(in);
clusterManagerNodeTimeout = in.readTimeValue();
remoteRequest = in.readOptionalBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeTimeValue(clusterManagerNodeTimeout);
out.writeOptionalBoolean(remoteRequest);
}

/**
Expand Down Expand Up @@ -110,6 +114,11 @@ public final Request masterNodeTimeout(String timeout) {
return clusterManagerNodeTimeout(timeout);
}

public final Request setRemoteRequest(boolean remoteRequest) {
this.remoteRequest = remoteRequest;
return (Request) this;
}

public final TimeValue clusterManagerNodeTimeout() {
return this.clusterManagerNodeTimeout;
}
Expand All @@ -119,4 +128,8 @@ public final TimeValue clusterManagerNodeTimeout() {
public final TimeValue masterNodeTimeout() {
return clusterManagerNodeTimeout();
}

public final boolean isRemoteRequest() {
return this.remoteRequest;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.clustermanager;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.opensearch.cluster.service.MasterTaskThrottlingException;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.transport.TransportException;

import java.util.Iterator;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* ActionListener for retrying the Throttled master tasks.
* It schedules the retry on the Throttling Exception from master node and
* delegates the response if it receive response from master.
*
* It uses ExponentialEqualJitterBackoff policy for determining delay between retries.
*/
public class MasterThrottlingRetryListener<Request extends ClusterManagerNodeRequest<Request>, Response extends ActionResponse>
implements
ActionListener<Response> {

private static final Logger logger = LogManager.getLogger(MasterThrottlingRetryListener.class);

/**
* Base delay in millis.
*/
private final int BASE_DELAY_MILLIS = 10;

/**
* Maximum delay in millis.
*/
private final int MAX_DELAY_MILLIS = 5000;

private long totalDelay;
private final Iterator<TimeValue> backoffDelay;
private final ActionListener<Response> listener;
private final Request request;
private final Runnable runnable;
private final String actionName;
private final boolean localNodeRequest;

private static ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);

public MasterThrottlingRetryListener(String actionName, Request request, Runnable runnable, ActionListener<Response> actionListener) {
this.actionName = actionName;
this.listener = actionListener;
this.request = request;
this.runnable = runnable;
this.backoffDelay = BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS).iterator();
/**
This is to determine whether request is generated from local node or from remote node.
If it is local node's request we need to perform the retries on this node.
If it is remote node's request, we will not perform retries on this node and let remote node perform the retries.
If request is from remote data node, then data node will set remoteRequest flag in {@link MasterNodeRequest}
and send request to master, using that on master node we can determine if the request was localRequest or remoteRequest.
*/
this.localNodeRequest = !(request.isRemoteRequest());
}

@Override
public void onResponse(Response response) {
listener.onResponse(response);
}

@Override
public void onFailure(Exception e) {

if (localNodeRequest && isThrottlingException(e)) {
logger.info("Retrying [{}] on throttling exception from master. Error: [{}]", actionName, getExceptionMessage(e));
long delay = backoffDelay.next().getMillis();
if (totalDelay + delay >= request.clusterManagerNodeTimeout.getMillis()) {
delay = request.clusterManagerNodeTimeout.getMillis() - totalDelay;
scheduler.schedule(new Runnable() {
@Override
public void run() {
listener.onFailure(new ProcessClusterEventTimeoutException(request.clusterManagerNodeTimeout, actionName));
}
}, delay, TimeUnit.MILLISECONDS);
} else {
scheduler.schedule(runnable, delay, TimeUnit.MILLISECONDS);
}
totalDelay += delay;
} else {
listener.onFailure(e);
}
}

/**
* For Testcase purposes.
* @param retrySceduler scheduler defined in test cases.
*/
public static void setThrottlingRetryScheduler(ScheduledThreadPoolExecutor retrySceduler) {
scheduler = retrySceduler;
}

private boolean isThrottlingException(Exception e) {
if (e instanceof TransportException) {
return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
}
return e instanceof MasterTaskThrottlingException;
}

private String getExceptionMessage(Exception e) {
if (e instanceof TransportException) {
return ((TransportException) e).unwrapCause().getMessage();
} else {
return e.getMessage();
}
}

public static long getRetryingTasksCount() {
return scheduler.getActiveCount() + scheduler.getQueue().size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,10 @@ protected boolean localExecute(Request request) {

@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
if (task != null) {
request.setParentTask(clusterService.localNode().getId(), task.getId());
}
new AsyncSingleAction(task, request, listener).doStart(state);
new AsyncSingleAction(task, request, listener).start();
}

/**
Expand All @@ -158,10 +156,16 @@ class AsyncSingleAction {
AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
this.task = task;
this.request = request;
this.listener = listener;
this.listener = new MasterThrottlingRetryListener(actionName, request, this::start, listener);
this.startTime = threadPool.relativeTimeInMillis();
}

public void start() {
ClusterState state = clusterService.state();
logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
doStart(state);
}

protected void doStart(ClusterState clusterState) {
try {
final DiscoveryNodes nodes = clusterState.nodes();
Expand Down Expand Up @@ -210,6 +214,7 @@ protected void doStart(ClusterState clusterState) {
} else {
DiscoveryNode clusterManagerNode = nodes.getMasterNode();
final String actionName = getClusterManagerActionName(clusterManagerNode);
request.setRemoteRequest(true);
transportService.sendRequest(
clusterManagerNode,
actionName,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.clustermanager;

import org.opensearch.action.ActionListener;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
import org.opensearch.cluster.service.MasterTaskThrottlingException;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.Scheduler;

import java.time.Instant;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.AfterClass;
import org.junit.Before;

/**
* Test class of {@link MasterThrottlingRetryListener}
*/
public class MasterThrottlingRetryListenerTests extends OpenSearchTestCase {
private static ScheduledThreadPoolExecutor throttlingRetryScheduler = Scheduler.initScheduler(Settings.EMPTY);

@Override
@Before
public void setUp() throws Exception {
super.setUp();
MasterThrottlingRetryListener.setThrottlingRetryScheduler(throttlingRetryScheduler);
}

@AfterClass
public static void afterClass() {
Scheduler.terminate(throttlingRetryScheduler, 30, TimeUnit.SECONDS);
}

public void testRetryForLocalRequest() throws BrokenBarrierException, InterruptedException {
TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request();
PlainActionFuture<TransportClusterManagerNodeActionTests.Response> listener = new PlainActionFuture<>();
CyclicBarrier barrier = new CyclicBarrier(2);
AtomicBoolean callBackExecuted = new AtomicBoolean();
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
callBackExecuted.set(true);
barrier.await();
} catch (Exception e) {
new AssertionError();
}
}
};

ActionListener taskRetryListener = new MasterThrottlingRetryListener("test", request, runnable, listener);

taskRetryListener.onFailure(new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test"));
barrier.await();
assertTrue(callBackExecuted.get());
}

public void testRetryForRemoteRequest() throws BrokenBarrierException, InterruptedException {
TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request();
request.setRemoteRequest(true);
PlainActionFuture<TransportClusterManagerNodeActionTests.Response> listener = new PlainActionFuture<>();
AtomicBoolean callBackExecuted = new AtomicBoolean(false);
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
callBackExecuted.set(true);
} catch (Exception e) {
new AssertionError();
}
}
};

ActionListener taskRetryListener = new MasterThrottlingRetryListener("test", request, runnable, listener);

taskRetryListener.onFailure(new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test"));
Thread.sleep(100); // some buffer time so callback can execute.
assertFalse(callBackExecuted.get());
}

public void testTimedOut() throws BrokenBarrierException, InterruptedException {

CyclicBarrier barrier = new CyclicBarrier(2);
AtomicBoolean onFailureExecuted = new AtomicBoolean();
AtomicBoolean retryExecuted = new AtomicBoolean();
AtomicBoolean firstExecute = new AtomicBoolean(true);
int timeOutSec = randomIntBetween(1, 5);
final Instant[] startTime = new Instant[1];
final Instant[] endTime = new Instant[1];

ActionListener listener = new ActionListener() {
@Override
public void onResponse(Object o) {
new AssertionError();
}

@Override
public void onFailure(Exception e) {
endTime[0] = Instant.now();
try {
onFailureExecuted.set(true);
barrier.await();
} catch (Exception exe) {
new AssertionError();
}
assertEquals(ProcessClusterEventTimeoutException.class, e.getClass());
}
};
TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request()
.clusterManagerNodeTimeout(TimeValue.timeValueSeconds(timeOutSec));

class TestRetryClass {
ActionListener listener;

TestRetryClass(ActionListener listener) {
this.listener = new MasterThrottlingRetryListener("test", request, this::execute, listener);
}

public void execute() {
if (firstExecute.getAndSet(false)) {
startTime[0] = Instant.now();
}
listener.onFailure(new MasterTaskThrottlingException("Throttling Exception : Limit exceeded for test"));
}
}

TestRetryClass testRetryClass = new TestRetryClass(listener);
testRetryClass.execute();

barrier.await();
assertEquals(timeOutSec, (endTime[0].toEpochMilli() - startTime[0].toEpochMilli()) / 1000);
assertTrue(onFailureExecuted.get());
assertFalse(retryExecuted.get());
}

public void testRetryForDifferentException() {

TransportClusterManagerNodeActionTests.Request request = new TransportClusterManagerNodeActionTests.Request();
PlainActionFuture<TransportClusterManagerNodeActionTests.Response> listener = new PlainActionFuture<>();
AtomicBoolean callBackExecuted = new AtomicBoolean();
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
callBackExecuted.set(true);
} catch (Exception e) {
new AssertionError();
}
}
};

ActionListener taskRetryListener = new MasterThrottlingRetryListener("test", request, runnable, listener);

taskRetryListener.onFailure(new Exception());
assertFalse(callBackExecuted.get());

taskRetryListener.onFailure(new OpenSearchRejectedExecutionException("Different Exception"));
assertFalse(callBackExecuted.get());
}
}
Loading

0 comments on commit 14acc5c

Please sign in to comment.