Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New FT Nima executors and larger multiplier for TCK timeouts in pipeline #5317

Merged
merged 7 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,10 @@
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;

import io.helidon.common.configurable.ScheduledThreadPoolSupplier;
import io.helidon.common.configurable.ThreadPoolSupplier;
import io.helidon.config.mp.MpConfig;
import io.helidon.nima.faulttolerance.FaultTolerance;

import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.Initialized;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.spi.AfterDeploymentValidation;
import jakarta.enterprise.inject.spi.AnnotatedConstructor;
import jakarta.enterprise.inject.spi.AnnotatedField;
import jakarta.enterprise.inject.spi.AnnotatedMethod;
Expand Down Expand Up @@ -73,10 +67,6 @@ public class FaultToleranceExtension implements Extension {

private Set<AnnotatedMethod<?>> registeredMethods;

private ThreadPoolSupplier threadPoolSupplier;

private ScheduledThreadPoolSupplier scheduledThreadPoolSupplier;

/**
* Class to mimic a {@link Priority} annotation for the purpose of changing
* its value dynamically.
Expand Down Expand Up @@ -244,29 +234,6 @@ void validateAnnotations(BeanManager bm,
}
}

/**
* Creates the executors used by FT using config. Must be created during the
* {@code AfterDeploymentValidation} event.
*
* @param event the AfterDeploymentValidation event
*/
void createFaultToleranceExecutors(@Observes AfterDeploymentValidation event) {
// Initialize executors for MP FT - default size of 20
io.helidon.config.Config config = MpConfig.toHelidonConfig(ConfigProvider.getConfig());
scheduledThreadPoolSupplier = ScheduledThreadPoolSupplier.builder()
.threadNamePrefix("ft-mp-schedule-")
.corePoolSize(20)
.config(config.get("scheduled-executor"))
.build();
FaultTolerance.scheduledExecutor(scheduledThreadPoolSupplier);
threadPoolSupplier = ThreadPoolSupplier.builder()
.threadNamePrefix("ft-mp-")
.corePoolSize(20)
.config(config.get("executor"))
.build();
FaultTolerance.executor(threadPoolSupplier);
}

/**
* Lazy initialization of set.
*
Expand Down Expand Up @@ -310,24 +277,6 @@ static boolean isFaultToleranceMethod(AnnotatedMethod<?> annotatedMethod,
|| MethodAntn.isAnnotationPresent(annotatedMethod, Fallback.class, bm);
}

/**
* Access {@code ThreadPoolSupplier} configured by this extension.
*
* @return a thread pool supplier.
*/
public ThreadPoolSupplier threadPoolSupplier() {
return threadPoolSupplier;
}

/**
* Access {@code ScheduledThreadPoolSupplier} configured by this extension.
*
* @return a scheduled thread pool supplier.
*/
public ScheduledThreadPoolSupplier scheduledThreadPoolSupplier() {
return scheduledThreadPoolSupplier;
}

/**
* Wraps an annotated type for the purpose of adding and/or overriding
* some annotations.
Expand Down

This file was deleted.

1 change: 1 addition & 0 deletions microprofile/tests/tck/tck-fault-tolerance/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
<systemPropertyVariables>
<fault-tolerance.commandThreadPoolSize>32</fault-tolerance.commandThreadPoolSize>
<java.util.logging.config.file>src/test/resources/logging.properties</java.util.logging.config.file>
<org.eclipse.microprofile.fault.tolerance.tck.timeout.multiplier>2.0</org.eclipse.microprofile.fault.tolerance.tck.timeout.multiplier>
</systemPropertyVariables>
<suiteXmlFiles>
<suiteXmlFile>src/test/tck-suite.xml</suiteXmlFile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static Builder builder() {
* Fluent API Builder for {@link Async}.
*/
class Builder implements io.helidon.common.Builder<Builder, Async> {
private LazyValue<? extends ExecutorService> executor;
private LazyValue<? extends ExecutorService> executor = FaultTolerance.executor();

private Builder() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
Expand All @@ -29,20 +28,17 @@
import static io.helidon.nima.faulttolerance.SupplierHelper.unwrapThrowable;

/**
* Implementation of {@code Async}. If no executor specified in builder, then it will
* use {@link Executors#newVirtualThreadPerTaskExecutor}. Note that this default executor
* is not configurable using Helidon's config.
* Implementation of {@code Async}. Default executor accessed from {@link FaultTolerance#executor()}.
*/
class AsyncImpl implements Async {
private final LazyValue<? extends ExecutorService> executor;

AsyncImpl() {
this.executor = LazyValue.create(Executors.newVirtualThreadPerTaskExecutor());
this(Async.builder());
}

AsyncImpl(Builder builder) {
this.executor = builder.executor() != null ? builder.executor()
: LazyValue.create(Executors.newVirtualThreadPerTaskExecutor());
this.executor = builder.executor();
}

@Override
Expand Down Expand Up @@ -74,7 +70,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
}

/**
* Default {@code Async} instance that uses {@link Executors#newVirtualThreadPerTaskExecutor}.
* Default {@code Async} instance.
*/
static final class DefaultAsyncInstance {
private static final Async INSTANCE = new AsyncImpl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public <T> T invoke(Supplier<? extends T> supplier) {
return execute(supplier);
}

if (queue.size() == queue.capacity()) {
if (queue.isFull()) {
callsRejected.incrementAndGet();
throw new BulkheadException("Bulkhead queue \"" + name + "\" is full");
}
Expand Down Expand Up @@ -143,10 +143,9 @@ private <T> T execute(Supplier<? extends T> supplier) {
throw toRuntimeException(throwable);
} finally {
concurrentExecutions.decrementAndGet();
if (queue.size() > 0) {
queue.dequeueAndRetract();
} else {
inProgress.release();
boolean dequeued = queue.dequeueAndRetract();
if (!dequeued) {
inProgress.release(); // nothing dequeued, one more permit
}
}
}
Expand All @@ -173,25 +172,28 @@ private interface BarrierQueue {
int size();

/**
* Maximum number of suppliers in queue.
* Check if queue is full.
*
* @return max number of suppliers
* @return outcome of test
*/
int capacity();
boolean isFull();

/**
* Enqueue supplier and block thread on barrier.
*
* @param supplier the supplier
* @return {@code true} if supplier was enqueued or {@code false} otherwise
* @throws ExecutionException if exception encountered while blocked
* @throws InterruptedException if blocking is interrupted
*/
void enqueueAndWaitOn(Supplier<?> supplier) throws ExecutionException, InterruptedException;
boolean enqueueAndWaitOn(Supplier<?> supplier) throws ExecutionException, InterruptedException;

/**
* Dequeue supplier and retract its barrier.
*
* @return {@code true} if a supplier was dequeued or {@code false} otherwise
*/
void dequeueAndRetract();
boolean dequeueAndRetract();

/**
* Remove supplier from queue, if present.
Expand All @@ -213,23 +215,23 @@ public int size() {
}

@Override
public int capacity() {
return 0;
public boolean isFull() {
return true;
}

@Override
public void enqueueAndWaitOn(Supplier<?> supplier) throws InterruptedException {
throw new IllegalStateException("Queue capacity is 0");
public boolean enqueueAndWaitOn(Supplier<?> supplier) {
return false;
}

@Override
public void dequeueAndRetract() {
throw new IllegalStateException("Queue capacity is 0");
public boolean dequeueAndRetract() {
return false;
}

@Override
public boolean remove(Supplier<?> supplier) {
throw new IllegalStateException("Queue capacity is 0");
return false;
}
}

Expand Down Expand Up @@ -261,53 +263,64 @@ public int size() {
}

@Override
public int capacity() {
return capacity;
public boolean isFull() {
lock.lock();
try {
return queue.size() == capacity;
} finally {
lock.unlock();
}
}

@Override
public void enqueueAndWaitOn(Supplier<?> supplier) throws ExecutionException, InterruptedException {
Barrier barrier = enqueue(supplier);
public boolean enqueueAndWaitOn(Supplier<?> supplier) throws ExecutionException, InterruptedException {
Barrier barrier;
lock.lock();
try {
barrier = enqueue(supplier);
} finally {
lock.unlock();
}
if (barrier != null) {
barrier.waitOn();
} else {
throw new IllegalStateException("Queue is full");
return true;
}
return false;
}

@Override
public void dequeueAndRetract() {
Barrier barrier = dequeue();
if (barrier != null) {
barrier.retract();
} else {
throw new IllegalStateException("Queue is empty");
public boolean dequeueAndRetract() {
lock.lock();
try {
Barrier barrier = dequeue();
if (barrier != null) {
barrier.retract();
return true;
}
return false;
} finally {
lock.unlock();
}
}

@Override
public boolean remove(Supplier<?> supplier) {
return queue.remove(supplier);
}

private Barrier dequeue() {
lock.lock();
try {
Supplier<?> supplier = queue.poll();
return supplier == null ? null : map.remove(supplier);
return queue.remove(supplier);
} finally {
lock.unlock();
}
}

private Barrier dequeue() {
Supplier<?> supplier = queue.poll();
return supplier == null ? null : map.remove(supplier);
}

private Barrier enqueue(Supplier<?> supplier) {
lock.lock();
try {
boolean added = queue.offer(supplier);
return added ? map.computeIfAbsent(supplier, s -> new Barrier()) : null;
} finally {
lock.unlock();
}
boolean added = queue.offer(supplier);
return added ? map.computeIfAbsent(supplier, s -> new Barrier()) : null;
}
}

Expand Down
Loading