Skip to content

Commit

Permalink
fix #5095: moving the enforcement to requestTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed May 9, 2023
1 parent b185e93 commit f1185a0
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 6.7-SNAPSHOT

#### Bugs
* Fix #5095: moving the enforcement to requestTimeout
* Fix #5102: wait on scale to 0 was not completing

#### Improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,18 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

public abstract class StandardHttpClient<C extends HttpClient, F extends HttpClient.Factory, T extends StandardHttpClientBuilder<C, F, ?>>
implements HttpClient, RequestTags {

private static final long ADDITIONAL_REQEUST_TIMEOUT = TimeUnit.SECONDS.toMillis(5);

private static final Logger LOG = LoggerFactory.getLogger(StandardHttpClient.class);

protected StandardHttpClientBuilder<C, F, T> builder;
Expand Down Expand Up @@ -136,14 +140,25 @@ private CompletableFuture<HttpResponse<AsyncBody>> consumeBytesOnce(HttpRequest
};
}

public <V> CompletableFuture<V> orTimeout(CompletableFuture<V> future, RequestConfig requestConfig) {
int timeout = Optional.ofNullable(requestConfig).map(RequestConfig::getRequestTimeout).orElse(0);
if (timeout > 0) {
Future<?> scheduled = Utils.schedule(Runnable::run, () -> future.completeExceptionally(new TimeoutException()),
timeout + ADDITIONAL_REQEUST_TIMEOUT, TimeUnit.MILLISECONDS);
future.whenComplete((v, t) -> scheduled.cancel(true));
}
return future;
}

/**
* Will retry the action if needed based upon the retry settings provided by the ExponentialBackoffIntervalCalculator.
*/
protected <V> void retryWithExponentialBackoff(CompletableFuture<V> result,
Supplier<CompletableFuture<V>> action, URI uri, Function<V, Integer> codeExtractor,
java.util.function.Consumer<V> cancel, ExponentialBackoffIntervalCalculator retryIntervalCalculator) {
java.util.function.Consumer<V> cancel, ExponentialBackoffIntervalCalculator retryIntervalCalculator,
RequestConfig requestConfig) {

action.get()
orTimeout(action.get(), requestConfig)
.whenComplete((response, throwable) -> {
if (retryIntervalCalculator.shouldRetry() && !result.isDone()) {
long retryInterval = retryIntervalCalculator.nextReconnectInterval();
Expand All @@ -168,7 +183,8 @@ protected <V> void retryWithExponentialBackoff(CompletableFuture<V> result,
}
if (retry) {
Utils.schedule(Runnable::run,
() -> retryWithExponentialBackoff(result, action, uri, codeExtractor, cancel, retryIntervalCalculator),
() -> retryWithExponentialBackoff(result, action, uri, codeExtractor, cancel, retryIntervalCalculator,
requestConfig),
retryInterval,
TimeUnit.MILLISECONDS);
return;
Expand All @@ -181,8 +197,9 @@ protected <V> void retryWithExponentialBackoff(CompletableFuture<V> result,
protected <V> void retryWithExponentialBackoff(CompletableFuture<V> result,
Supplier<CompletableFuture<V>> action, URI uri, Function<V, Integer> codeExtractor,
java.util.function.Consumer<V> cancel) {
RequestConfig requestConfig = getTag(RequestConfig.class);
retryWithExponentialBackoff(result, action, uri, codeExtractor, cancel,
ExponentialBackoffIntervalCalculator.from(getTag(RequestConfig.class)));
ExponentialBackoffIntervalCalculator.from(requestConfig), requestConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.fabric8.kubernetes.client.RequestConfigBuilder;
import io.fabric8.kubernetes.client.http.WebSocket.Listener;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
Expand Down Expand Up @@ -212,4 +213,18 @@ void testClosePreviousBeforeRetry() throws Exception {
assertEquals(2, client.getRespFutures().size());
}

@Test
void testRequestTimeout() throws Exception {
client = client.newBuilder().tag(new RequestConfigBuilder()
.withRequestTimeout(1).build())
.build();

CompletableFuture<HttpResponse<AsyncBody>> consumeFuture = client.consumeBytes(
client.newHttpRequestBuilder().uri("http://localhost").build(),
(value, asyncBody) -> {
});

Awaitility.await().atMost(10, TimeUnit.SECONDS).until(consumeFuture::isDone);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,9 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OperationSupport {

private static final long ADDITIONAL_REQEUST_TIMEOUT = TimeUnit.SECONDS.toMillis(5);
private static final String FIELD_MANAGER_PARAM = "?fieldManager=";
public static final String JSON = "application/json";
public static final String JSON_PATCH = "application/json-patch+json";
Expand Down Expand Up @@ -501,11 +498,6 @@ protected <T> T handleRawGet(URL resourceUrl, Class<T> type) throws IOException
*/
protected <T> T waitForResult(CompletableFuture<T> future) throws IOException {
try {
// since readTimeout may not be enforced in a timely manner at the httpclient, we'll
// enforce a higher level timeout with a small amount of padding to account for possible queuing
if (getRequestConfig().getRequestTimeout() > 0) {
return future.get(getRequestConfig().getRequestTimeout() + ADDITIONAL_REQEUST_TIMEOUT, TimeUnit.MILLISECONDS);
}
return future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -525,9 +517,6 @@ protected <T> T waitForResult(CompletableFuture<T> future) throws IOException {
throw ((KubernetesClientException) t).copyAsCause();
}
throw new KubernetesClientException(t.getMessage(), t);
} catch (TimeoutException e) {
future.cancel(true);
throw KubernetesClientException.launderThrowable(e);
}
}

Expand Down

0 comments on commit f1185a0

Please sign in to comment.