From f2dc4a71282cd7453e07bc9cf8efbd66df41ae79 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Thu, 3 Nov 2022 10:30:43 -0400 Subject: [PATCH 1/7] Try a larger multipler for timeouts in pipeline. Signed-off-by: Santiago Pericasgeertsen --- microprofile/tests/tck/tck-fault-tolerance/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/microprofile/tests/tck/tck-fault-tolerance/pom.xml b/microprofile/tests/tck/tck-fault-tolerance/pom.xml index bfdafff4e9a..8cdeb621817 100644 --- a/microprofile/tests/tck/tck-fault-tolerance/pom.xml +++ b/microprofile/tests/tck/tck-fault-tolerance/pom.xml @@ -93,6 +93,7 @@ 32 src/test/resources/logging.properties + 2.0 src/test/tck-suite.xml From 55ed29b314d2d656dd1f7014e267861193346f93 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Thu, 3 Nov 2022 13:57:26 -0400 Subject: [PATCH 2/7] Pipeline test: different implementation of Timeout and improved mutual exclusion in bulkheads. Signed-off-by: Santiago Pericasgeertsen --- .../nima/faulttolerance/BulkheadImpl.java | 44 +++++++++++-------- .../nima/faulttolerance/TimeoutImpl.java | 20 ++++++--- 2 files changed, 40 insertions(+), 24 deletions(-) diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java index 3fe89636663..b24621b6ff0 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java @@ -267,7 +267,13 @@ public int capacity() { @Override public void enqueueAndWaitOn(Supplier supplier) throws ExecutionException, InterruptedException { - Barrier barrier = enqueue(supplier); + Barrier barrier; + lock.lock(); + try { + barrier = enqueue(supplier); + } finally { + lock.unlock(); + } if (barrier != null) { barrier.waitOn(); } else { @@ -277,37 +283,37 @@ public void enqueueAndWaitOn(Supplier supplier) throws ExecutionException, In @Override public void dequeueAndRetract() { - Barrier barrier = dequeue(); - if (barrier != null) { - barrier.retract(); - } else { - throw new IllegalStateException("Queue is empty"); + lock.lock(); + try { + Barrier barrier = dequeue(); + if (barrier != null) { + barrier.retract(); + } else { + throw new IllegalStateException("Queue is empty"); + } + } finally { + lock.unlock(); } } @Override public boolean remove(Supplier supplier) { - return queue.remove(supplier); - } - - private Barrier dequeue() { lock.lock(); try { - Supplier supplier = queue.poll(); - return supplier == null ? null : map.remove(supplier); + return queue.remove(supplier); } finally { lock.unlock(); } } + private Barrier dequeue() { + Supplier supplier = queue.poll(); + return supplier == null ? null : map.remove(supplier); + } + private Barrier enqueue(Supplier supplier) { - lock.lock(); - try { - boolean added = queue.offer(supplier); - return added ? map.computeIfAbsent(supplier, s -> new Barrier()) : null; - } finally { - lock.unlock(); - } + boolean added = queue.offer(supplier); + return added ? map.computeIfAbsent(supplier, s -> new Barrier()) : null; } } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java index dfeb1093c11..5a233e2dff0 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java @@ -17,8 +17,9 @@ package io.helidon.nima.faulttolerance; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; @@ -37,6 +38,12 @@ class TimeoutImpl implements Timeout { private final boolean currentThread; private final String name; + private static final ExecutorService VIRTUAL_EXECUTOR = + Executors.newThreadPerTaskExecutor(Thread.ofVirtual() + .allowSetThreadLocals(false) + .inheritInheritableThreadLocals(false) + .factory()); + TimeoutImpl(Builder builder) { this.timeoutMillis = builder.timeout().toMillis(); this.executor = builder.executor(); @@ -65,7 +72,12 @@ public T invoke(Supplier supplier) { AtomicBoolean callReturned = new AtomicBoolean(false); AtomicBoolean interrupted = new AtomicBoolean(false); - ScheduledFuture timeoutFuture = executor.get().schedule(() -> { + VIRTUAL_EXECUTOR.submit(() -> { + try { + Thread.sleep(timeoutMillis); + } catch (InterruptedException e) { + // log event + } interruptLock.lock(); try { if (callReturned.compareAndSet(false, true)) { @@ -75,8 +87,7 @@ public T invoke(Supplier supplier) { } finally { interruptLock.unlock(); } - - }, timeoutMillis, TimeUnit.MILLISECONDS); + }); try { T result = supplier.get(); @@ -90,7 +101,6 @@ public T invoke(Supplier supplier) { interruptLock.lock(); try { callReturned.set(true); - timeoutFuture.cancel(false); // Run invocation in current thread // Clear interrupted flag here -- required for uninterruptible busy loops if (Thread.interrupted()) { From 3caffd23af5327e61a5fa6e987bb02624b51b883 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Fri, 4 Nov 2022 09:23:02 -0400 Subject: [PATCH 3/7] Replace old executors in FaultTolerance class to single one that use VTs. Simulate scheduled tasks with delayed callables/runnables. Remove ability to configure thread pool sizes etc. since it does not apply to VTs anymore. --- .../FaultToleranceExtension.java | 51 -------- .../faulttolerance/ThreadPoolConfigTest.java | 47 ------- .../nima/faulttolerance/BulkheadImpl.java | 8 +- .../nima/faulttolerance/CircuitBreaker.java | 14 +- .../faulttolerance/CircuitBreakerImpl.java | 28 ++-- .../nima/faulttolerance/FaultTolerance.java | 122 +++++++----------- .../helidon/nima/faulttolerance/Timeout.java | 8 +- .../nima/faulttolerance/TimeoutImpl.java | 20 +-- .../nima/faulttolerance/AsyncTest.java | 4 +- .../faulttolerance/CircuitBreakerTest.java | 4 +- 10 files changed, 86 insertions(+), 220 deletions(-) delete mode 100644 microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/ThreadPoolConfigTest.java diff --git a/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/FaultToleranceExtension.java b/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/FaultToleranceExtension.java index efcf0c3dc94..8e569a50a7d 100644 --- a/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/FaultToleranceExtension.java +++ b/microprofile/fault-tolerance/src/main/java/io/helidon/microprofile/faulttolerance/FaultToleranceExtension.java @@ -25,16 +25,10 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.stream.Collectors; -import io.helidon.common.configurable.ScheduledThreadPoolSupplier; -import io.helidon.common.configurable.ThreadPoolSupplier; -import io.helidon.config.mp.MpConfig; -import io.helidon.nima.faulttolerance.FaultTolerance; - import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.Initialized; import jakarta.enterprise.event.Observes; -import jakarta.enterprise.inject.spi.AfterDeploymentValidation; import jakarta.enterprise.inject.spi.AnnotatedConstructor; import jakarta.enterprise.inject.spi.AnnotatedField; import jakarta.enterprise.inject.spi.AnnotatedMethod; @@ -73,10 +67,6 @@ public class FaultToleranceExtension implements Extension { private Set> registeredMethods; - private ThreadPoolSupplier threadPoolSupplier; - - private ScheduledThreadPoolSupplier scheduledThreadPoolSupplier; - /** * Class to mimic a {@link Priority} annotation for the purpose of changing * its value dynamically. @@ -244,29 +234,6 @@ void validateAnnotations(BeanManager bm, } } - /** - * Creates the executors used by FT using config. Must be created during the - * {@code AfterDeploymentValidation} event. - * - * @param event the AfterDeploymentValidation event - */ - void createFaultToleranceExecutors(@Observes AfterDeploymentValidation event) { - // Initialize executors for MP FT - default size of 20 - io.helidon.config.Config config = MpConfig.toHelidonConfig(ConfigProvider.getConfig()); - scheduledThreadPoolSupplier = ScheduledThreadPoolSupplier.builder() - .threadNamePrefix("ft-mp-schedule-") - .corePoolSize(20) - .config(config.get("scheduled-executor")) - .build(); - FaultTolerance.scheduledExecutor(scheduledThreadPoolSupplier); - threadPoolSupplier = ThreadPoolSupplier.builder() - .threadNamePrefix("ft-mp-") - .corePoolSize(20) - .config(config.get("executor")) - .build(); - FaultTolerance.executor(threadPoolSupplier); - } - /** * Lazy initialization of set. * @@ -310,24 +277,6 @@ static boolean isFaultToleranceMethod(AnnotatedMethod annotatedMethod, || MethodAntn.isAnnotationPresent(annotatedMethod, Fallback.class, bm); } - /** - * Access {@code ThreadPoolSupplier} configured by this extension. - * - * @return a thread pool supplier. - */ - public ThreadPoolSupplier threadPoolSupplier() { - return threadPoolSupplier; - } - - /** - * Access {@code ScheduledThreadPoolSupplier} configured by this extension. - * - * @return a scheduled thread pool supplier. - */ - public ScheduledThreadPoolSupplier scheduledThreadPoolSupplier() { - return scheduledThreadPoolSupplier; - } - /** * Wraps an annotated type for the purpose of adding and/or overriding * some annotations. diff --git a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/ThreadPoolConfigTest.java b/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/ThreadPoolConfigTest.java deleted file mode 100644 index 0514b435f24..00000000000 --- a/microprofile/fault-tolerance/src/test/java/io/helidon/microprofile/faulttolerance/ThreadPoolConfigTest.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2020 Oracle and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.helidon.microprofile.faulttolerance; - -import jakarta.inject.Inject; -import org.junit.jupiter.api.Test; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - -/** - * Tests to verify that the default thread pool sizes can be set via config. - * Default size for the pools in MPT FT is 16, but the application.yaml file - * in this test directory sets it to 8. - * - * See {@code test/resources/application.yaml}. - */ -class ThreadPoolConfigTest extends FaultToleranceTest { - - @Inject - private FaultToleranceExtension extension; - - @Test - void testThreadPoolDefaultSize() { - assertThat(extension.threadPoolSupplier().corePoolSize(), is(8)); - - } - - @Test - void testScheduledThreadPool() { - assertThat(extension.scheduledThreadPoolSupplier().corePoolSize(), is(8)); - } -} diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java index b24621b6ff0..8377f429756 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java @@ -218,18 +218,18 @@ public int capacity() { } @Override - public void enqueueAndWaitOn(Supplier supplier) throws InterruptedException { - throw new IllegalStateException("Queue capacity is 0"); + public void enqueueAndWaitOn(Supplier supplier) { + throw new BulkheadException("Queue capacity is 0"); } @Override public void dequeueAndRetract() { - throw new IllegalStateException("Queue capacity is 0"); + throw new BulkheadException("Queue capacity is 0"); } @Override public boolean remove(Supplier supplier) { - throw new IllegalStateException("Queue capacity is 0"); + throw new BulkheadException("Queue capacity is 0"); } } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreaker.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreaker.java index 59771020e8d..be0a2b561b1 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreaker.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreaker.java @@ -20,7 +20,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; import io.helidon.common.LazyValue; @@ -107,7 +107,7 @@ class Builder implements io.helidon.common.Builder { private int successThreshold = 1; // rolling window size to private int volume = 10; - private LazyValue executor = FaultTolerance.scheduledExecutor(); + private LazyValue executor = FaultTolerance.executor(); private String name = "CircuitBreaker-" + System.identityHashCode(this); private Builder() { @@ -227,13 +227,13 @@ public Builder addSkipOn(Class clazz) { /** * Executor service to schedule future tasks. * By default uses an executor configured on - * {@link io.helidon.nima.faulttolerance.FaultTolerance#scheduledExecutor(java.util.function.Supplier)}. + * {@link io.helidon.nima.faulttolerance.FaultTolerance#executor(java.util.function.Supplier)}. * - * @param scheduledExecutor executor to use + * @param executor executor to use * @return updated builder instance */ - public Builder executor(ScheduledExecutorService scheduledExecutor) { - this.executor = LazyValue.create(scheduledExecutor); + public Builder executor(ExecutorService executor) { + this.executor = LazyValue.create(executor); return this; } @@ -248,7 +248,7 @@ public Builder name(String name) { return this; } - LazyValue executor() { + LazyValue executor() { return executor; } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreakerImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreakerImpl.java index 9d1b34b909d..39dfb5eb2d2 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreakerImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/CircuitBreakerImpl.java @@ -16,9 +16,8 @@ package io.helidon.nima.faulttolerance; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -26,6 +25,7 @@ import io.helidon.common.LazyValue; +import static io.helidon.nima.faulttolerance.FaultTolerance.toDelayedCallable; import static io.helidon.nima.faulttolerance.SupplierHelper.toRuntimeException; import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; @@ -33,7 +33,7 @@ class CircuitBreakerImpl implements CircuitBreaker { /* Configuration options */ - private final LazyValue executor; + private final LazyValue executor; // how long to transition from open to half-open private final long delayMillis; // how many successful calls will close a half-open breaker @@ -48,7 +48,7 @@ class CircuitBreakerImpl implements CircuitBreaker { // to close from half-open private final AtomicInteger successCounter = new AtomicInteger(); private final AtomicBoolean halfOpenInProgress = new AtomicBoolean(); - private final AtomicReference> schedule = new AtomicReference<>(); + private final AtomicReference> schedule = new AtomicReference<>(); private final ErrorChecker errorChecker; private final String name; @@ -89,7 +89,7 @@ public void state(State newState) { return; } - ScheduledFuture future = schedule.getAndSet(null); + Future future = schedule.getAndSet(null); if (future != null) { future.cancel(false); } @@ -97,7 +97,7 @@ public void state(State newState) { state.set(State.CLOSED); } else if (newState == State.OPEN) { state.set(State.OPEN); - ScheduledFuture future = schedule.getAndSet(null); + Future future = schedule.getAndSet(null); if (future != null) { future.cancel(false); } @@ -170,15 +170,15 @@ private U halfOpenTask(Supplier supplier) { } private void scheduleHalf() { - schedule.set(executor.get() - .schedule(() -> { - state.compareAndSet(State.OPEN, State.HALF_OPEN); - schedule.set(null); - return true; - }, delayMillis, TimeUnit.MILLISECONDS)); + schedule.set(executor.get().submit( + toDelayedCallable(() -> { + state.compareAndSet(State.OPEN, State.HALF_OPEN); + schedule.set(null); + return true; + }, delayMillis))); } - ScheduledFuture schedule() { + Future schedule() { return schedule.get(); } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/FaultTolerance.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/FaultTolerance.java index 2883626267c..a1a882b7e9b 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/FaultTolerance.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/FaultTolerance.java @@ -18,22 +18,19 @@ import java.util.LinkedList; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import io.helidon.common.LazyValue; -import io.helidon.common.configurable.ScheduledThreadPoolSupplier; -import io.helidon.common.configurable.ThreadPoolSupplier; import io.helidon.config.Config; +import static java.lang.System.Logger.Level.ERROR; + /** - * System wide fault tolerance configuration and access to a customized sequence of fault tolerance handlers. + * System-wide fault tolerance configuration and access to a customized sequence of fault tolerance handlers. *

* Fault tolerance provides the following features: *

    @@ -47,27 +44,17 @@ *
* * @see #config(io.helidon.config.Config) - * @see #scheduledExecutor(java.util.function.Supplier) * @see #executor() * @see #builder() */ public final class FaultTolerance { - private static final AtomicReference> SCHEDULED_EXECUTOR = - new AtomicReference<>(); + private static final System.Logger LOGGER = System.getLogger(FaultTolerance.class.getName()); + private static final AtomicReference> EXECUTOR = new AtomicReference<>(); private static final AtomicReference CONFIG = new AtomicReference<>(Config.empty()); static { - SCHEDULED_EXECUTOR.set(LazyValue.create(ScheduledThreadPoolSupplier.builder() - .threadNamePrefix("ft-schedule-") - .corePoolSize(2) - .config(CONFIG.get().get("scheduled-executor")) - .build())); - - EXECUTOR.set(LazyValue.create(ThreadPoolSupplier.builder() - .threadNamePrefix("ft-") - .config(CONFIG.get().get("executor")) - .build())); + EXECUTOR.set(LazyValue.create(Executors.newVirtualThreadPerTaskExecutor())); } private FaultTolerance() { @@ -80,9 +67,6 @@ private FaultTolerance() { */ public static void config(Config config) { CONFIG.set(config); - - SCHEDULED_EXECUTOR.set(LazyValue.create(ScheduledThreadPoolSupplier.create(CONFIG.get().get("scheduled-executor")))); - EXECUTOR.set(LazyValue.create(ThreadPoolSupplier.create(CONFIG.get().get("executor"), "ft-se-thread-pool"))); } /** @@ -94,15 +78,6 @@ public static void executor(Supplier executor) { EXECUTOR.set(LazyValue.create(executor::get)); } - /** - * Configure Helidon wide scheduled executor service for Fault Tolerance. - * - * @param executor scheduled executor service to use, such as for {@link io.helidon.nima.faulttolerance.Retry} scheduling - */ - public static void scheduledExecutor(Supplier executor) { - SCHEDULED_EXECUTOR.set(LazyValue.create(executor)); - } - /** * A builder to configure a customized sequence of fault tolerance handlers. * @@ -122,52 +97,53 @@ public static TypedBuilder typedBuilder() { return new TypedBuilder<>(); } - static LazyValue executor() { - return EXECUTOR.get(); - } - - static LazyValue scheduledExecutor() { - return SCHEDULED_EXECUTOR.get(); - } - - static Config config() { - return CONFIG.get(); - } - - static Throwable cause(Throwable throwable) { - if (throwable instanceof CompletionException) { - return cause(throwable.getCause()); - } - if (throwable instanceof ExecutionException) { - return cause(throwable.getCause()); - } - return throwable; + /** + * Converts a {@code Runnable} into another that sleeps for {@code millis} before + * executing. Simulates a scheduled executor when using VTs. + * + * @param runnable the runnable + * @param millis the time to sleep + * @return the new runnable + */ + public static Runnable toDelayedRunnable(Runnable runnable, long millis) { + return () -> { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + // should never be interrupted + LOGGER.log(ERROR, "Delayed runnable was unexpectedly interrupted"); + } + runnable.run(); + }; } /** - * Establish a dependency between a source (stage) and a dependent (future). The - * dependent shall complete (normally or exceptionally) based on the source stage. - * The source stage shall be cancelled if the dependent is cancelled. The {@code - * mayInterruptIfRunning} flag is always set to {@code true} during cancellation. + * Converts a {@code Callable} into another that sleeps for {@code millis} before + * executing. Simulates a scheduled executor when using VTs. * - * @param source the source stage - * @param dependent the dependent future - * @param type of result + * @param callable the callable + * @param millis the time to sleep + * @return the new callable + * @param type of value returned */ - static CompletableFuture createDependency(CompletionStage source, CompletableFuture dependent) { - source.whenComplete((o, t) -> { - if (t != null) { - dependent.completeExceptionally(t); - } else { - dependent.complete(o); - } - }); - dependent.whenComplete((o, t) -> { - if (dependent.isCancelled()) { - source.toCompletableFuture().cancel(true); + public static Callable toDelayedCallable(Callable callable, long millis) { + return () -> { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + // should never be interrupted + LOGGER.log(ERROR, "Delayed callable was unexpectedly interrupted"); } - }); - return dependent; + return callable.call(); + }; + } + + static LazyValue executor() { + return EXECUTOR.get(); + } + + static Config config() { + return CONFIG.get(); } abstract static class BaseBuilder> { diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Timeout.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Timeout.java index ce5ff5b27c7..a3d196e1fc3 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Timeout.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Timeout.java @@ -17,7 +17,7 @@ package io.helidon.nima.faulttolerance; import java.time.Duration; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; import io.helidon.common.LazyValue; @@ -49,7 +49,7 @@ static Timeout create(Duration timeout) { */ class Builder implements io.helidon.common.Builder { private Duration timeout = Duration.ofSeconds(10); - private LazyValue executor = FaultTolerance.scheduledExecutor(); + private LazyValue executor = FaultTolerance.executor(); private boolean currentThread = false; private String name = "Timeout-" + System.identityHashCode(this); @@ -90,7 +90,7 @@ public Builder currentThread(boolean currentThread) { * @param executor scheduled executor service to use * @return updated builder instance */ - public Builder executor(ScheduledExecutorService executor) { + public Builder executor(ExecutorService executor) { this.executor = LazyValue.create(executor); return this; } @@ -110,7 +110,7 @@ Duration timeout() { return timeout; } - LazyValue executor() { + LazyValue executor() { return executor; } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java index 5a233e2dff0..f77d85c209b 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/TimeoutImpl.java @@ -18,8 +18,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; @@ -27,6 +25,7 @@ import io.helidon.common.LazyValue; +import static io.helidon.nima.faulttolerance.FaultTolerance.toDelayedRunnable; import static io.helidon.nima.faulttolerance.SupplierHelper.toRuntimeException; import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; @@ -34,16 +33,10 @@ class TimeoutImpl implements Timeout { private static final System.Logger LOGGER = System.getLogger(TimeoutImpl.class.getName()); private final long timeoutMillis; - private final LazyValue executor; + private final LazyValue executor; private final boolean currentThread; private final String name; - private static final ExecutorService VIRTUAL_EXECUTOR = - Executors.newThreadPerTaskExecutor(Thread.ofVirtual() - .allowSetThreadLocals(false) - .inheritInheritableThreadLocals(false) - .factory()); - TimeoutImpl(Builder builder) { this.timeoutMillis = builder.timeout().toMillis(); this.executor = builder.executor(); @@ -72,12 +65,7 @@ public T invoke(Supplier supplier) { AtomicBoolean callReturned = new AtomicBoolean(false); AtomicBoolean interrupted = new AtomicBoolean(false); - VIRTUAL_EXECUTOR.submit(() -> { - try { - Thread.sleep(timeoutMillis); - } catch (InterruptedException e) { - // log event - } + executor.get().submit(toDelayedRunnable(() -> { interruptLock.lock(); try { if (callReturned.compareAndSet(false, true)) { @@ -87,7 +75,7 @@ public T invoke(Supplier supplier) { } finally { interruptLock.unlock(); } - }); + }, timeoutMillis)); try { T result = supplier.get(); diff --git a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/AsyncTest.java b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/AsyncTest.java index 29fc77fb818..7f88820ab6a 100644 --- a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/AsyncTest.java +++ b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/AsyncTest.java @@ -43,10 +43,10 @@ void testDefaultExecutorBuilder(){ @Test void testCustomExecutorBuilder() { Async async = Async.builder() - .executor(FaultTolerance.executor()) // platform thread executor + .executor(FaultTolerance.executor()) .build(); Thread thread = testAsync(async); - assertThat(thread.isVirtual(), is(false)); + assertThat(thread.isVirtual(), is(true)); } private Thread testAsync(Async async) { diff --git a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java index ae160af2caf..eb06d203255 100644 --- a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java +++ b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/CircuitBreakerTest.java @@ -18,7 +18,7 @@ import java.time.Duration; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -57,7 +57,7 @@ void testCircuitBreaker() throws InterruptedException, ExecutionException, Timeo assertThat(breaker.state(), is(CircuitBreaker.State.OPEN)); // need to wait until half open - ScheduledFuture schedule = ((CircuitBreakerImpl) breaker).schedule(); + Future schedule = ((CircuitBreakerImpl) breaker).schedule(); schedule.get(WAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); assertThat(breaker.state(), is(CircuitBreaker.State.HALF_OPEN)); From be26f5e48d5d97a3c2f395691ba0e8f37667a024 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Fri, 4 Nov 2022 09:50:11 -0400 Subject: [PATCH 4/7] Updated Async to use same executor from FaultTolerance class. Signed-off-by: Santiago Pericasgeertsen --- .../java/io/helidon/nima/faulttolerance/Async.java | 2 +- .../io/helidon/nima/faulttolerance/AsyncImpl.java | 12 ++++-------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Async.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Async.java index c60378b8d55..17bc078cd88 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Async.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/Async.java @@ -72,7 +72,7 @@ static Builder builder() { * Fluent API Builder for {@link Async}. */ class Builder implements io.helidon.common.Builder { - private LazyValue executor; + private LazyValue executor = FaultTolerance.executor(); private Builder() { } diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java index 5314686e3b6..4664e80651e 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/AsyncImpl.java @@ -19,7 +19,6 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -29,20 +28,17 @@ import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable; /** - * Implementation of {@code Async}. If no executor specified in builder, then it will - * use {@link Executors#newVirtualThreadPerTaskExecutor}. Note that this default executor - * is not configurable using Helidon's config. + * Implementation of {@code Async}. Default executor accessed from {@link FaultTolerance#executor()}. */ class AsyncImpl implements Async { private final LazyValue executor; AsyncImpl() { - this.executor = LazyValue.create(Executors.newVirtualThreadPerTaskExecutor()); + this(Async.builder()); } AsyncImpl(Builder builder) { - this.executor = builder.executor() != null ? builder.executor() - : LazyValue.create(Executors.newVirtualThreadPerTaskExecutor()); + this.executor = builder.executor(); } @Override @@ -74,7 +70,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { } /** - * Default {@code Async} instance that uses {@link Executors#newVirtualThreadPerTaskExecutor}. + * Default {@code Async} instance. */ static final class DefaultAsyncInstance { private static final Async INSTANCE = new AsyncImpl(); From 88eb6ce47dca3072ce814f55004de68aec787983 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Fri, 4 Nov 2022 09:51:03 -0400 Subject: [PATCH 5/7] Increased timeout value. Signed-off-by: Santiago Pericasgeertsen --- .../test/java/io/helidon/nima/faulttolerance/BulkheadTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java index f3cb93260f0..ec3c7595f66 100644 --- a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java +++ b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java @@ -40,7 +40,7 @@ class BulkheadTest { private static final System.Logger LOGGER = System.getLogger(BulkheadTest.class.getName()); - private static final long WAIT_TIMEOUT_MILLIS = 4000; + private static final long WAIT_TIMEOUT_MILLIS = 10000; private final CountDownLatch enqueuedSubmitted = new CountDownLatch(1); From cffdafc506c519821955d4c5c590864ceaec2f31 Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Fri, 4 Nov 2022 12:51:20 -0400 Subject: [PATCH 6/7] Always use BulkheadException. Signed-off-by: Santiago Pericasgeertsen --- .../java/io/helidon/nima/faulttolerance/BulkheadImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java index 8377f429756..a5ce452f714 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java @@ -277,7 +277,7 @@ public void enqueueAndWaitOn(Supplier supplier) throws ExecutionException, In if (barrier != null) { barrier.waitOn(); } else { - throw new IllegalStateException("Queue is full"); + throw new BulkheadException("Queue is full"); } } @@ -289,7 +289,7 @@ public void dequeueAndRetract() { if (barrier != null) { barrier.retract(); } else { - throw new IllegalStateException("Queue is empty"); + throw new BulkheadException("Queue is empty"); } } finally { lock.unlock(); From c7d5fd43ddc7cf4da931852965f067669e343bcf Mon Sep 17 00:00:00 2001 From: Santiago Pericasgeertsen Date: Fri, 4 Nov 2022 17:53:07 -0400 Subject: [PATCH 7/7] Fixed race condition in test. Some tweaks to BulkheadImpl. --- .../nima/faulttolerance/BulkheadImpl.java | 57 +++++++++++-------- .../nima/faulttolerance/BulkheadTest.java | 16 +++--- 2 files changed, 40 insertions(+), 33 deletions(-) diff --git a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java index a5ce452f714..0508daa3330 100644 --- a/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java +++ b/nima/fault-tolerance/src/main/java/io/helidon/nima/faulttolerance/BulkheadImpl.java @@ -70,7 +70,7 @@ public T invoke(Supplier supplier) { return execute(supplier); } - if (queue.size() == queue.capacity()) { + if (queue.isFull()) { callsRejected.incrementAndGet(); throw new BulkheadException("Bulkhead queue \"" + name + "\" is full"); } @@ -143,10 +143,9 @@ private T execute(Supplier supplier) { throw toRuntimeException(throwable); } finally { concurrentExecutions.decrementAndGet(); - if (queue.size() > 0) { - queue.dequeueAndRetract(); - } else { - inProgress.release(); + boolean dequeued = queue.dequeueAndRetract(); + if (!dequeued) { + inProgress.release(); // nothing dequeued, one more permit } } } @@ -173,25 +172,28 @@ private interface BarrierQueue { int size(); /** - * Maximum number of suppliers in queue. + * Check if queue is full. * - * @return max number of suppliers + * @return outcome of test */ - int capacity(); + boolean isFull(); /** * Enqueue supplier and block thread on barrier. * * @param supplier the supplier + * @return {@code true} if supplier was enqueued or {@code false} otherwise * @throws ExecutionException if exception encountered while blocked * @throws InterruptedException if blocking is interrupted */ - void enqueueAndWaitOn(Supplier supplier) throws ExecutionException, InterruptedException; + boolean enqueueAndWaitOn(Supplier supplier) throws ExecutionException, InterruptedException; /** * Dequeue supplier and retract its barrier. + * + * @return {@code true} if a supplier was dequeued or {@code false} otherwise */ - void dequeueAndRetract(); + boolean dequeueAndRetract(); /** * Remove supplier from queue, if present. @@ -213,23 +215,23 @@ public int size() { } @Override - public int capacity() { - return 0; + public boolean isFull() { + return true; } @Override - public void enqueueAndWaitOn(Supplier supplier) { - throw new BulkheadException("Queue capacity is 0"); + public boolean enqueueAndWaitOn(Supplier supplier) { + return false; } @Override - public void dequeueAndRetract() { - throw new BulkheadException("Queue capacity is 0"); + public boolean dequeueAndRetract() { + return false; } @Override public boolean remove(Supplier supplier) { - throw new BulkheadException("Queue capacity is 0"); + return false; } } @@ -261,12 +263,17 @@ public int size() { } @Override - public int capacity() { - return capacity; + public boolean isFull() { + lock.lock(); + try { + return queue.size() == capacity; + } finally { + lock.unlock(); + } } @Override - public void enqueueAndWaitOn(Supplier supplier) throws ExecutionException, InterruptedException { + public boolean enqueueAndWaitOn(Supplier supplier) throws ExecutionException, InterruptedException { Barrier barrier; lock.lock(); try { @@ -276,21 +283,21 @@ public void enqueueAndWaitOn(Supplier supplier) throws ExecutionException, In } if (barrier != null) { barrier.waitOn(); - } else { - throw new BulkheadException("Queue is full"); + return true; } + return false; } @Override - public void dequeueAndRetract() { + public boolean dequeueAndRetract() { lock.lock(); try { Barrier barrier = dequeue(); if (barrier != null) { barrier.retract(); - } else { - throw new BulkheadException("Queue is empty"); + return true; } + return false; } finally { lock.unlock(); } diff --git a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java index ec3c7595f66..c2895dfedda 100644 --- a/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java +++ b/nima/fault-tolerance/src/test/java/io/helidon/nima/faulttolerance/BulkheadTest.java @@ -97,6 +97,14 @@ public void enqueueing(Supplier supplier) { assertThat(rejected.isStarted(), is(false)); assertThat(rejected.isBlocked(), is(true)); + // Verify rejected task was indeed rejected + ExecutionException executionException = assertThrows(ExecutionException.class, + () -> rejectedResult.get(WAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); + Throwable cause = executionException.getCause(); + assertThat(cause, notNullValue()); + assertThat(cause, instanceOf(BulkheadException.class)); + assertThat(cause.getMessage(), is("Bulkhead queue \"" + name + "\" is full")); + // Unblock inProgress task and get result to free bulkhead inProgress.unblock(); inProgressResult.get(WAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); @@ -113,14 +121,6 @@ public void enqueueing(Supplier supplier) { // Unblock enqueued task and get result enqueued.unblock(); enqueuedResult.get(WAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - - // Verify rejected task was indeed rejected - ExecutionException executionException = assertThrows(ExecutionException.class, - () -> rejectedResult.get(WAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)); - Throwable cause = executionException.getCause(); - assertThat(cause, notNullValue()); - assertThat(cause, instanceOf(BulkheadException.class)); - assertThat(cause.getMessage(), is("Bulkhead queue \"" + name + "\" is full")); } @Test