Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data node changes for master task throttling #4204

Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ public static BackoffPolicy exponentialEqualJitterBackoff(int baseDelay, int max
return new ExponentialEqualJitterBackoff(baseDelay, maxDelayForRetry);
}

/**
* Creates a backoff policy whose delay bound grows exponentially and whose actual delay is randomized.
* Each delay handed out by the policy's iterator is a random number of milliseconds between 1 and the
* current bound (inclusive). The bound starts at {@code baseDelay} and doubles after every call until
* it is capped at Integer.MAX_VALUE. The iterator never runs out of delays.
*
* NOTE(review): the iterator passes the bound through Math.toIntExact and uses it as the bound of a
* random draw, so baseDelay presumably has to be between 1 and Integer.MAX_VALUE - confirm against callers.
*
* @param baseDelay initial upper bound, in milliseconds, for the randomized delay
* @return A backoff policy with unlimited retries, randomized delays and an exponentially growing delay bound
*/
public static BackoffPolicy exponentialRandomBackoff(long baseDelay) {
return new ExponentialRandomBackoff(baseDelay);
}

/**
* Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy.
*/
Expand Down Expand Up @@ -270,6 +281,48 @@ public TimeValue next() {
}
}

/**
 * Backoff policy that hands out {@link ExponentialRandomBackoffIterator} instances,
 * each one created from the base delay this policy was configured with.
 */
private static class ExponentialRandomBackoff extends BackoffPolicy {
    /** Base delay that every new iterator is created with. */
    private final long baseDelay;

    /**
     * @param baseDelay base delay forwarded to each iterator
     */
    private ExponentialRandomBackoff(long baseDelay) {
        this.baseDelay = baseDelay;
    }

    /**
     * @return a new iterator built from the configured base delay
     */
    @Override
    public Iterator<TimeValue> iterator() {
        final Iterator<TimeValue> delays = new ExponentialRandomBackoffIterator(baseDelay);
        return delays;
    }
}

private static class ExponentialRandomBackoffIterator implements Iterator<TimeValue> {
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
/**
* Current delay in exponential backoff
*/
private long currentDelay;

/**
* Starts the iteration with {@code baseDelay} as the initial value of the current delay.
*
* @param baseDelay initial value for the current delay
*/
private ExponentialRandomBackoffIterator(long baseDelay) {
this.currentDelay = baseDelay;
}

/**
* This backoff has no retry limit, so the iterator can always supply another delay.
*
* @return always {@code true}
*/
@Override
public boolean hasNext() {
return true;
}

@Override
public TimeValue next() {
TimeValue delayToReturn = TimeValue.timeValueMillis(Randomness.get().nextInt(Math.toIntExact(currentDelay)) + 1);
currentDelay = Math.min(2 * currentDelay, Integer.MAX_VALUE);
Copy link
Member

@shwetathareja shwetathareja Sep 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this (the currentDelay calculation) be the first statement in the method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to first calculate the random delay from the current delay, which is what we need to return for the current call, and then double the current delay and store it in currentDelay for the next call.
That's why we first compute the delay to return and only then update currentDelay to double its value.

return delayToReturn;
}
}

/**
* Concrete Constant Back Off Policy
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.common.Randomness;
import org.opensearch.action.bulk.BackoffPolicy;
dhwanilpatel marked this conversation as resolved.
Show resolved Hide resolved
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -64,6 +65,7 @@ public abstract class RetryableAction<Response> {
private final long startMillis;
private final ActionListener<Response> finalListener;
private final String executor;
private final BackoffPolicy backoffPolicy;

private volatile Scheduler.ScheduledCancellable retryTask;

Expand All @@ -74,7 +76,15 @@ public RetryableAction(
TimeValue timeoutValue,
ActionListener<Response> listener
) {
this(logger, threadPool, initialDelay, timeoutValue, listener, ThreadPool.Names.SAME);
this(
logger,
threadPool,
initialDelay,
timeoutValue,
listener,
BackoffPolicy.exponentialRandomBackoff(initialDelay.getMillis()),
ThreadPool.Names.SAME
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which threadpool is this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per my understanding, this is simply the caller's threadpool. We do not create a new threadpool for retries; the retries are performed on the same threadpool as the caller's.

);
}

public RetryableAction(
Expand All @@ -83,6 +93,7 @@ public RetryableAction(
TimeValue initialDelay,
TimeValue timeoutValue,
ActionListener<Response> listener,
BackoffPolicy backoffPolicy,
String executor
) {
this.logger = logger;
Expand All @@ -95,10 +106,11 @@ public RetryableAction(
this.startMillis = threadPool.relativeTimeInMillis();
this.finalListener = listener;
this.executor = executor;
this.backoffPolicy = backoffPolicy;
}

public void run() {
final RetryingListener retryingListener = new RetryingListener(initialDelayMillis, null);
final RetryingListener retryingListener = new RetryingListener(backoffPolicy.iterator(), null);
final Runnable runnable = createRunnable(retryingListener);
threadPool.executor(executor).execute(runnable);
}
Expand Down Expand Up @@ -146,12 +158,12 @@ private class RetryingListener implements ActionListener<Response> {

private static final int MAX_EXCEPTIONS = 4;

private final long delayMillisBound;
private ArrayDeque<Exception> caughtExceptions;
private Iterator<TimeValue> backoffDelayIterator;

private RetryingListener(long delayMillisBound, ArrayDeque<Exception> caughtExceptions) {
this.delayMillisBound = delayMillisBound;
private RetryingListener(Iterator<TimeValue> backoffDelayIterator, ArrayDeque<Exception> caughtExceptions) {
this.caughtExceptions = caughtExceptions;
this.backoffDelayIterator = backoffDelayIterator;
}

@Override
Expand All @@ -175,12 +187,9 @@ public void onFailure(Exception e) {
} else {
addException(e);

final long nextDelayMillisBound = Math.min(delayMillisBound * 2, Integer.MAX_VALUE);
final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions);
final Runnable runnable = createRunnable(retryingListener);
final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + 1;
final TimeValue delay = backoffDelayIterator.next();
final Runnable runnable = createRunnable(this);
if (isDone.get() == false) {
final TimeValue delay = TimeValue.timeValueMillis(delayMillis);
logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e);
try {
retryTask = threadPool.schedule(runnable, delay, executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.RetryableAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.MasterNodeChangePredicate;
Expand All @@ -51,6 +53,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.cluster.service.MasterTaskThrottlingException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -134,34 +137,64 @@ protected boolean localExecute(Request request) {

@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
ClusterState state = clusterService.state();
logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
if (task != null) {
request.setParentTask(clusterService.localNode().getId(), task.getId());
}
new AsyncSingleAction(task, request, listener).doStart(state);
new AsyncSingleAction(task, request, listener).run();
}

/**
* Asynchronous single action
*
* @opensearch.internal
*/
class AsyncSingleAction {
class AsyncSingleAction extends RetryableAction {

private final ActionListener<Response> listener;
private ActionListener<Response> listener;
private final Request request;
private ClusterStateObserver observer;
private final long startTime;
private final Task task;
private static final int BASE_DELAY_MILLIS = 10;
private static final int MAX_DELAY_MILLIS = 5000;

/**
* Sets up the action together with its retry configuration: the initial delay is BASE_DELAY_MILLIS,
* the timeout is the request's clusterManagerNodeTimeout, the backoff policy is an equal-jitter
* exponential backoff built from BASE_DELAY_MILLIS and MAX_DELAY_MILLIS, and the executor name is
* ThreadPool.Names.SAME.
*
* @param task     task stored on this action
* @param request  request stored on this action; also supplies the timeout
* @param listener listener handed to the superclass and stored on this action
*/
AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
super(
logger,
threadPool,
TimeValue.timeValueMillis(BASE_DELAY_MILLIS),
request.clusterManagerNodeTimeout,
listener,
BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS),
ThreadPool.Names.SAME
);
this.task = task;
this.request = request;
this.listener = listener;
// Capture the thread pool's relative time at construction.
this.startTime = threadPool.relativeTimeInMillis();
}

/**
 * Runs one attempt of the action against the cluster state currently held by the cluster service.
 *
 * @param retryListener listener that replaces this action's stored listener for the attempt
 */
@Override
public void tryAction(ActionListener retryListener) {
    final ClusterState currentState = clusterService.state();
    final long stateVersion = currentState.version();
    logger.trace("starting processing request [{}] with cluster state version [{}]", request, stateVersion);
    // Swap in the listener supplied for this attempt before starting it.
    this.listener = retryListener;
    doStart(currentState);
}

/**
 * Decides whether a failed attempt should be retried on this node.
 *
 * @param e failure reported by the attempt
 * @return {@code true} only when the request has no remote address and the failure is a
 *         {@link MasterTaskThrottlingException}, either directly or as the unwrapped cause
 *         of a {@link TransportException}
 */
@Override
public boolean shouldRetry(Exception e) {
    // A non-null remote address means the request was generated on a remote node and received on this
    // master node over the transport layer; the throttling retry should then happen on that remote
    // node, not here.
    if (request.remoteAddress() != null) {
        return false;
    }
    // From here on the request was generated on this node, so throttling failures are retried locally.
    if (e instanceof TransportException == false) {
        return e instanceof MasterTaskThrottlingException;
    }
    return ((TransportException) e).unwrapCause() instanceof MasterTaskThrottlingException;
}

protected void doStart(ClusterState clusterState) {
try {
final DiscoveryNodes nodes = clusterState.nodes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,22 @@ public void testEqualJitterExponentialBackOffPolicy() {
assertTrue(delay.getMillis() <= maxDelay);
}
}

/**
 * Verifies that every delay produced by the exponential random backoff policy lies between
 * 1 ms and the current bound (inclusive), where the bound starts at the base delay and doubles
 * after each retry.
 */
public void testExponentialBackOffPolicy() {
    final long baseDelay = 10;
    final BackoffPolicy policy = BackoffPolicy.exponentialRandomBackoff(baseDelay);
    final Iterator<TimeValue> iterator = policy.iterator();

    // Mirrors the bound the policy draws from: it starts at baseDelay and doubles after each delay.
    long currentDelay = baseDelay;
    final int numberOfRetries = randomInt(20);

    for (int i = 0; i < numberOfRetries; i++) {
        // The policy has no retry limit, so a next delay must always be available.
        assertTrue(iterator.hasNext());
        final TimeValue delay = iterator.next();
        // The policy adds 1 to its random draw, so a delay is never below 1 ms.
        assertTrue(delay.getMillis() >= 1);
        assertTrue(delay.getMillis() <= currentDelay);
        // Same cap as the policy applies to its bound.
        currentDelay = Math.min(currentDelay * 2, Integer.MAX_VALUE);
    }
}
}
Loading