Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add backoff retry capability in rest client #170

Merged
merged 16 commits into from
Nov 22, 2023
Merged
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ lazy val flintCore = (project in file("flint-core"))
"org.opensearch.client" % "opensearch-rest-client" % opensearchVersion,
"org.opensearch.client" % "opensearch-rest-high-level-client" % opensearchVersion
exclude ("org.apache.logging.log4j", "log4j-api"),
"dev.failsafe" % "failsafe" % "3.3.2",
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"org.scalactic" %% "scalactic" % "3.2.15" % "test",
Expand Down
3 changes: 3 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)]
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
- `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
- `spark.datasource.flint.retry.http_status_codes`: retryable HTTP response status code list. default value is "429,502" (429 Too Many Requests and 502 Bad Gateway).
- `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. By default, no retry occurs on any thrown exception.
- `spark.flint.optimizer.enabled`: default is true.
- `spark.flint.index.hybridscan.enabled`: default is false.
- `spark.flint.index.checkpoint.mandatory`: default is true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@

package org.opensearch.flint.core;

import dev.failsafe.RetryPolicy;
import java.io.Serializable;
import java.util.Map;
import org.opensearch.flint.core.http.FlintRetryOptions;

/**
* Flint Options include all the flint related configuration.
Expand All @@ -15,6 +17,11 @@ public class FlintOptions implements Serializable {

// Raw key/value configuration entries backing all getters in this class.
private final Map<String, String> options;

/**
 * Flint options related to HTTP retry policy.
 */
private final FlintRetryOptions retryOptions;

// Option key for the host setting.
public static final String HOST = "host";

// Option key for the port setting.
public static final String PORT = "port";
Expand Down Expand Up @@ -68,6 +75,7 @@ public class FlintOptions implements Serializable {

/**
 * Construct Flint options from the given key/value configuration map.
 * Retry-related options are extracted into a dedicated {@link FlintRetryOptions}.
 *
 * @param options all Flint configuration entries
 */
public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
}

public String getHost() {
Expand All @@ -88,6 +96,10 @@ public int getScrollDuration() {

public String getRefreshPolicy() {return options.getOrDefault(REFRESH_POLICY, DEFAULT_REFRESH_POLICY);}

/**
 * @return HTTP retry options derived from these Flint options
 */
public FlintRetryOptions getRetryOptions() {
return retryOptions;
}

/**
 * @return the configured region, or the default when unset
 */
public String getRegion() {
return options.getOrDefault(REGION, DEFAULT_REGION);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.http;

import static java.time.temporal.ChronoUnit.SECONDS;

import dev.failsafe.RetryPolicy;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;
import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate;
import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate;

/**
 * Flint options related to HTTP request retry.
 */
public class FlintRetryOptions {

  private static final Logger LOG = Logger.getLogger(FlintRetryOptions.class.getName());

  /**
   * All Flint options.
   */
  private final Map<String, String> options;

  /**
   * Maximum retry attempt
   */
  public static final int DEFAULT_MAX_RETRIES = 3;
  public static final String MAX_RETRIES = "retry.max_retries";

  public static final String DEFAULT_RETRYABLE_HTTP_STATUS_CODES = "429,502";
  public static final String RETRYABLE_HTTP_STATUS_CODES = "retry.http_status_codes";

  /**
   * Retryable exception class name
   */
  public static final String RETRYABLE_EXCEPTION_CLASS_NAMES = "retry.exception_class_names";

  public FlintRetryOptions(Map<String, String> options) {
    this.options = options;
  }

  /**
   * Is auto retry capability enabled, i.e. the configured max retries is positive.
   *
   * @return true if enabled, otherwise false.
   */
  public boolean isRetryEnabled() {
    return getMaxRetries() > 0;
  }

  /**
   * Build a Failsafe retry policy from the given Flint options.
   *
   * @param <T> success execution result type
   * @return Failsafe retry policy
   */
  public <T> RetryPolicy<T> getRetryPolicy() {
    return RetryPolicy.<T>builder()
        // Exponential backoff from 1s up to 30s with a small jitter
        // (hard-coded for now; can be made configurable in future)
        .withBackoff(1, 30, SECONDS)
        .withJitter(Duration.ofMillis(100))
        // Failure handling configured from Flint options
        .withMaxRetries(getMaxRetries())
        .handleIf(ExceptionClassNameFailurePredicate.create(getRetryableExceptionClassNames()))
        .handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()))
        // Log each failed attempt and each retry
        .onFailedAttempt(attempt ->
            LOG.severe("Attempt to execute request failed: " + attempt))
        .onRetry(retry ->
            LOG.warning("Retrying failed request at #" + retry.getAttemptCount()))
        .build();
  }

  /**
   * @return maximum retry option value
   */
  public int getMaxRetries() {
    String maxRetries = options.get(MAX_RETRIES);
    return (maxRetries == null) ? DEFAULT_MAX_RETRIES : Integer.parseInt(maxRetries);
  }

  /**
   * @return retryable HTTP status code list (comma-separated string)
   */
  public String getRetryableHttpStatusCodes() {
    return options.getOrDefault(RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_HTTP_STATUS_CODES);
  }

  /**
   * @return retryable exception class name list, empty if not configured
   */
  public Optional<String> getRetryableExceptionClassNames() {
    return Optional.ofNullable(options.get(RETRYABLE_EXCEPTION_CLASS_NAMES));
  }

  @Override
  public String toString() {
    return String.format(
        "FlintRetryOptions{maxRetries=%s, retryableStatusCodes=%s, retryableExceptionClassNames=%s}",
        getMaxRetries(), getRetryableHttpStatusCodes(), getRetryableExceptionClassNames());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.http;

import dev.failsafe.Failsafe;
import dev.failsafe.FailsafeException;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.protocol.HttpContext;
import org.opensearch.flint.core.FlintOptions;

/**
* HTTP client that retries request to tolerant transient fault.
*/
public class RetryableHttpAsyncClient extends CloseableHttpAsyncClient {

private static final Logger LOG = Logger.getLogger(RetryableHttpAsyncClient.class.getName());

/**
* Delegated internal HTTP client that execute the request underlying.
*/
private final CloseableHttpAsyncClient internalClient;

/**
* Flint retry options.
*/
private final FlintRetryOptions options;

public RetryableHttpAsyncClient(CloseableHttpAsyncClient internalClient,
FlintRetryOptions options) {
this.internalClient = internalClient;
this.options = options;
}

@Override
public boolean isRunning() {
return internalClient.isRunning();
}

@Override
public void start() {
internalClient.start();
}

@Override
public void close() throws IOException {
internalClient.close();
}

@Override
public <T> Future<T> execute(HttpAsyncRequestProducer requestProducer,
HttpAsyncResponseConsumer<T> responseConsumer,
HttpContext context,
FutureCallback<T> callback) {
return new Future<>() {
/**
* Delegated future object created per doExecuteAndFutureGetWithRetry() call which creates initial object too.
* In this way, we avoid the duplicate logic of first call and subsequent retry calls.
* Here the assumption is cancel, isCancelled and isDone never called before get().
* (OpenSearch RestClient seems only call get() API)
*/
private Future<T> delegate;

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return delegate.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return delegate.isCancelled();
}

@Override
public boolean isDone() {
return delegate.isDone();
}

@Override
public T get() throws InterruptedException, ExecutionException {
return doExecuteAndFutureGetWithRetry(() -> delegate.get());
}

@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
return doExecuteAndFutureGetWithRetry(() -> delegate.get(timeout, unit));
}

private T doExecuteAndFutureGetWithRetry(Callable<T> futureGet) throws InterruptedException, ExecutionException {
try {
// Retry by creating a new Future object (as new delegate) and get its result again
return Failsafe
.with(options.getRetryPolicy())
.get(() -> {
this.delegate = internalClient.execute(requestProducer, responseConsumer, context, callback);
return futureGet.call();
});
} catch (FailsafeException ex) {
LOG.severe("Request failed permanently. Re-throwing original exception.");

// Failsafe will wrap checked exception, such as ExecutionException
// So here we have to unwrap failsafe exception and rethrow it
Throwable cause = ex.getCause();
if (cause instanceof InterruptedException) {
throw (InterruptedException) cause;
} else if (cause instanceof ExecutionException) {
throw (ExecutionException) cause;
} else {
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
throw ex;
}
}
}
};
}

public static HttpAsyncClientBuilder builder(HttpAsyncClientBuilder delegate, FlintOptions options) {
FlintRetryOptions retryOptions = options.getRetryOptions();
if (!retryOptions.isRetryEnabled()) {
return delegate;
}

// Wrap original builder so created client will be wrapped by retryable client too
return new HttpAsyncClientBuilder() {
@Override
public CloseableHttpAsyncClient build() {
LOG.info("Building retryable http async client with options: " + retryOptions);
return new RetryableHttpAsyncClient(delegate.build(), retryOptions);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.http.handler;

import dev.failsafe.function.CheckedPredicate;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Logger;

/**
 * Failure predicate that walks an exception's cause chain to decide whether
 * the failure is retryable.
 */
public abstract class ErrorStacktraceFailurePredicate implements CheckedPredicate<Throwable> {

  private static final Logger LOG = Logger.getLogger(ErrorStacktraceFailurePredicate.class.getName());

  /**
   * Iterates the cause chain of the given throwable, delegating each link to
   * the subclass via {@link #isRetryable}, and returns true on the first
   * retryable match. A visited set guards against cycles in the cause chain.
   */
  @Override
  public boolean test(Throwable throwable) throws Throwable {
    Set<Throwable> visited = new HashSet<>();
    for (Throwable current = throwable;
         current != null && visited.add(current);
         current = current.getCause()) {
      LOG.info("Checking if exception retryable: " + current);

      if (isRetryable(current)) {
        LOG.info("Exception is retryable: " + current);
        return true;
      }
    }

    LOG.info("No retryable exception found on the stacktrace");
    return false;
  }

  /**
   * Decides whether a single throwable in the cause chain is retryable;
   * implemented by subclasses.
   */
  protected abstract boolean isRetryable(Throwable throwable);
}
Loading
Loading