Skip to content

Commit

Permalink
Moved back backoff policy to BackOffPolicy class
Browse files Browse the repository at this point in the history
Signed-off-by: Dhwanil Patel <dhwanip@amazon.com>
  • Loading branch information
dhwanilpatel committed Aug 30, 2022
1 parent a9cd2d1 commit 3ae89ea
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 210 deletions.
126 changes: 126 additions & 0 deletions server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.action.bulk;

import org.opensearch.common.Randomness;
import org.opensearch.common.unit.TimeValue;

import java.util.Iterator;
Expand Down Expand Up @@ -105,6 +106,30 @@ public static BackoffPolicy exponentialBackoff(TimeValue initialDelay, int maxNu
return new ExponentialBackoff((int) checkDelay(initialDelay).millis(), maxNumberOfRetries);
}

/**
* It provides exponential backoff between retries until it reaches maxDelayForRetry.
* It uses equal jitter scheme as it is being used for throttled exceptions.
* It will make random distribution and also guarantees a minimum delay.
*
* @param baseDelay BaseDelay for exponential Backoff
* @param maxDelayForRetry MaxDelay that can be returned from backoff policy
* @return A backoff policy with exponential backoff with equal jitter which can't return delay more than given max delay
*/
public static BackoffPolicy exponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
return new ExponentialEqualJitterBackoff(baseDelay, maxDelayForRetry);
}

/**
* It provides exponential backoff between retries until it reaches Integer.MAX_VALUE.
* It will make random distribution of delay.
*
* @param baseDelay BaseDelay for exponential Backoff
* @return A backoff policy with exponential backoff with equal jitter which can't return delay more than given max delay
*/
public static BackoffPolicy exponentialRandomBackoff(long baseDelay) {
return new ExponentialRandomBackoff(baseDelay);
}

/**
* Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy.
*/
Expand Down Expand Up @@ -197,6 +222,107 @@ public TimeValue next() {
}
}

private static class ExponentialEqualJitterBackoff extends BackoffPolicy {
private final int maxDelayForRetry;
private final int baseDelay;

private ExponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
this.maxDelayForRetry = maxDelayForRetry;
this.baseDelay = baseDelay;
}

@Override
public Iterator<TimeValue> iterator() {
return new ExponentialEqualJitterBackoffIterator(baseDelay, maxDelayForRetry);
}
}

private static class ExponentialEqualJitterBackoffIterator implements Iterator<TimeValue> {
/**
* Retry limit to avoids integer overflow issues.
* Post this limit, max delay will be returned with Equal Jitter.
*
* NOTE: If the value is greater than 30, there can be integer overflow
* issues during delay calculation.
**/
private final int RETRIES_TILL_JITTER_INCREASE = 30;

/**
* Exponential increase in delay will happen till it reaches maxDelayForRetry.
* Once delay has exceeded maxDelayForRetry, it will return maxDelayForRetry only
* and not increase the delay.
*/
private final int maxDelayForRetry;
private final int baseDelay;
private int retriesAttempted;

private ExponentialEqualJitterBackoffIterator(int baseDelay, int maxDelayForRetry) {
this.baseDelay = baseDelay;
this.maxDelayForRetry = maxDelayForRetry;
}

/**
* There is not any limit for this BackOff.
* This Iterator will always return back off delay.
*
* @return true
*/
@Override
public boolean hasNext() {
return true;
}

@Override
public TimeValue next() {
int retries = Math.min(retriesAttempted, RETRIES_TILL_JITTER_INCREASE);
int exponentialDelay = (int) Math.min((1L << retries) * baseDelay, maxDelayForRetry);
retriesAttempted++;
return TimeValue.timeValueMillis((exponentialDelay / 2) + Randomness.get().nextInt(exponentialDelay / 2 + 1));
}
}

private static class ExponentialRandomBackoff extends BackoffPolicy {
private final long baseDelay;

private ExponentialRandomBackoff(long baseDelay) {
this.baseDelay = baseDelay;
}

@Override
public Iterator<TimeValue> iterator() {
return new ExponentialRandomBackoffIterator(baseDelay);
}
}

private static class ExponentialRandomBackoffIterator implements Iterator<TimeValue> {
/**
* Current delay in exponential backoff
*/
private long currentDelay;

private ExponentialRandomBackoffIterator(long baseDelay) {
this.currentDelay = baseDelay;
}

/**
* There is not any limit for this BackOff.
* This Iterator will always return back off delay.
*
* @return true
*/
@Override
public boolean hasNext() {
return true;
}

@Override
public TimeValue next() {
TimeValue delayToReturn = TimeValue.timeValueMillis(Randomness.get().nextInt(Math.toIntExact(currentDelay)) + 1);
currentDelay = Math.min(2 * currentDelay, Integer.MAX_VALUE);
return delayToReturn;
}
}

/**
* Concrete Constant Back Off Policy
*
Expand Down
145 changes: 0 additions & 145 deletions server/src/main/java/org/opensearch/action/support/RetryPolicy.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.threadpool.Scheduler;
Expand Down Expand Up @@ -64,7 +65,7 @@ public abstract class RetryableAction<Response> {
private final long startMillis;
private final ActionListener<Response> finalListener;
private final String executor;
private final RetryPolicy retryPolicy;
private final BackoffPolicy backoffPolicy;

private volatile Scheduler.ScheduledCancellable retryTask;

Expand All @@ -81,7 +82,7 @@ public RetryableAction(
initialDelay,
timeoutValue,
listener,
RetryPolicy.exponentialBackoff(initialDelay.getMillis()),
BackoffPolicy.exponentialRandomBackoff(initialDelay.getMillis()),
ThreadPool.Names.SAME
);
}
Expand All @@ -92,7 +93,7 @@ public RetryableAction(
TimeValue initialDelay,
TimeValue timeoutValue,
ActionListener<Response> listener,
RetryPolicy retryPolicy,
BackoffPolicy backoffPolicy,
String executor
) {
this.logger = logger;
Expand All @@ -105,11 +106,11 @@ public RetryableAction(
this.startMillis = threadPool.relativeTimeInMillis();
this.finalListener = listener;
this.executor = executor;
this.retryPolicy = retryPolicy;
this.backoffPolicy = backoffPolicy;
}

public void run() {
final RetryingListener retryingListener = new RetryingListener(retryPolicy.iterator(), null);
final RetryingListener retryingListener = new RetryingListener(backoffPolicy.iterator(), null);
final Runnable runnable = createRunnable(retryingListener);
threadPool.executor(executor).execute(runnable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.RetryPolicy;
import org.opensearch.action.support.RetryableAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateObserver;
Expand Down Expand Up @@ -166,7 +166,7 @@ class AsyncSingleAction extends RetryableAction {
TimeValue.timeValueMillis(BASE_DELAY_MILLIS),
request.clusterManagerNodeTimeout,
listener,
RetryPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS),
BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS),
ThreadPool.Names.SAME
);
this.task = task;
Expand Down
Loading

0 comments on commit 3ae89ea

Please sign in to comment.