Skip to content

Commit

Permalink
Loom support moved to ThreadPoolSupplier
Browse files Browse the repository at this point in the history
Signed-off-by: Tomas Langer <tomas.langer@oracle.com>
  • Loading branch information
tomas-langer committed Jun 30, 2021
1 parent 94e95c2 commit 18e4999
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public final class ThreadPoolSupplier implements Supplier<ExecutorService> {
private final int growthRate;
private final ThreadPool.RejectionHandler rejectionHandler;
private final LazyValue<ExecutorService> lazyValue = LazyValue.create(() -> Contexts.wrap(getThreadPool()));
private final boolean useVirtualThreads;

private ThreadPoolSupplier(Builder builder) {
this.corePoolSize = builder.corePoolSize;
Expand All @@ -71,6 +72,7 @@ private ThreadPoolSupplier(Builder builder) {
this.growthThreshold = builder.growthThreshold;
this.growthRate = builder.growthRate;
this.rejectionHandler = builder.rejectionHandler == null ? DEFAULT_REJECTION_POLICY : builder.rejectionHandler;
this.useVirtualThreads = builder.useVirtualThreads || builder.virtualThreadsEnforced;
}

/**
Expand Down Expand Up @@ -102,7 +104,14 @@ public static ThreadPoolSupplier create() {
return builder().build();
}

ThreadPool getThreadPool() {
ExecutorService getThreadPool() {
if (useVirtualThreads) {
if (VirtualExecutorUtil.isVirtualSupported()) {
LOGGER.fine("Using unbounded virtual executor service for pool " + name);
return VirtualExecutorUtil.executorService();
}
}

ThreadPool result = ThreadPool.create(name,
corePoolSize,
maxPoolSize,
Expand Down Expand Up @@ -149,6 +158,8 @@ public static final class Builder implements io.helidon.common.Builder<ThreadPoo
private int growthRate = DEFAULT_GROWTH_RATE;
private ThreadPool.RejectionHandler rejectionHandler = DEFAULT_REJECTION_POLICY;
private String name;
private boolean useVirtualThreads;
private boolean virtualThreadsEnforced;

private Builder() {
}
Expand All @@ -162,6 +173,13 @@ public ThreadPoolSupplier build() {
rejectionHandler = DEFAULT_REJECTION_POLICY;
}

if (virtualThreadsEnforced) {
if (!VirtualExecutorUtil.isVirtualSupported()) {
throw new IllegalStateException("Virtual threads are required, yet not available on this JVM. "
+ "Please use a Loom build.");
}
}

return new ThreadPoolSupplier(this);
}

Expand Down Expand Up @@ -390,13 +408,48 @@ public Builder config(Config config) {
config.get("growth-rate").asInt().ifPresent(value -> {
warnExperimental("growth-rate");
growthRate(value);

});
config.get("virtual-threads").asBoolean().ifPresent(value -> {
warnExperimental("virtual-threads");
virtualIfAvailable(value);
});
config.get("virtual-enforced").asBoolean().ifPresent(value -> {
warnExperimental("virtual-enforced");
virtualEnforced(value);
});
return this;
}

private void warnExperimental(String key) {
LOGGER.warning(String.format("Config key \"executor-service.%s\" is EXPERIMENTAL and subject to change.", key));
}

/**
* When configured to {@code true}, virtual thread executor service must be available, otherwise the built
* executor would fail to start.
*
* @param enforceVirtualThreads whether to enforce virtual threads, defaults to {@code false}
* @return updated builder instance
* @see #virtualIfAvailable(boolean)
*/
public Builder virtualEnforced(boolean enforceVirtualThreads) {
this.virtualThreadsEnforced = enforceVirtualThreads;
return this;
}

/**
* When configured to {@code true}, an unbounded virtual executor service (project Loom) will be used
* if available.
* This is an experimental feature.
* <p>
* If enabled and available, all other configuration options of this executor service are ignored!
*
* @param useVirtualThreads whether to use virtual threads or not, defaults to {@code false}
* @return updated builder instance
*/
public Builder virtualIfAvailable(boolean useVirtualThreads) {
this.useVirtualThreads = useVirtualThreads;
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.common.configurable;

import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

import io.helidon.common.LazyValue;

/**
* A utility class to handle virtual threads (project Loom).
*/
final class VirtualExecutorUtil {
private static final Logger LOGGER = Logger.getLogger(VirtualExecutorUtil.class.getName());
private static final LazyValue<Boolean> SUPPORTED = LazyValue.create(VirtualExecutorUtil::findSupported);
private static final LazyValue<ExecutorService> EXECUTOR_SERVICE = LazyValue.create(VirtualExecutorUtil::findExecutor);

private VirtualExecutorUtil() {
}

static boolean isVirtualSupported() {
return SUPPORTED.get();
}

static ExecutorService executorService() {
ExecutorService result = EXECUTOR_SERVICE.get();
if (result == null) {
throw new IllegalStateException("Virtual executor service is not supported on this JVM");
}
return result;
}

private static boolean findSupported() {
try {
// the method is intentionally NOT CACHED in static context, to support differences between build
// and runtime environments (support for GraalVM native image)
findMethod();
return true;
} catch (final ReflectiveOperationException e) {
LOGGER.log(Level.FINEST, "Loom virtual executor service not available", e);
}

return false;
}

private static ExecutorService findExecutor() {
try {
// the method is intentionally NOT CACHED in static context, to support differences between build
// and runtime environments (support for GraalVM native image)
return (ExecutorService) findMethod().invoke(null);
} catch (final ReflectiveOperationException e) {
LOGGER.log(Level.FINEST, "Loom virtual executor service not available", e);
}

return null;
}

private static Method findMethod() throws ReflectiveOperationException {
return Executors.class.getDeclaredMethod("newVirtualThreadExecutor");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.startsWith;
Expand All @@ -50,19 +51,25 @@ class ThreadPoolSupplierTest {

@BeforeAll
static void initClass() {
defaultInstance = ThreadPoolSupplier.create().getThreadPool();
defaultInstance = ensureOurExecutor(ThreadPoolSupplier.create().getThreadPool());

builtInstance = ThreadPoolSupplier.builder()
builtInstance = ensureOurExecutor(ThreadPoolSupplier.builder()
.threadNamePrefix("thread-pool-unit-test-")
.corePoolSize(2)
.daemon(true)
.prestart(true)
.queueCapacity(10)
.build()
.getThreadPool();
.getThreadPool());

configuredInstance = ThreadPoolSupplier.create(Config.create().get("unit.thread-pool"))
.getThreadPool();
configuredInstance = ensureOurExecutor(ThreadPoolSupplier.create(Config.create().get("unit.thread-pool"))
.getThreadPool());
}

private static ThreadPoolExecutor ensureOurExecutor(ExecutorService threadPool) {
// thread pool should be our implementation, unless Loom virtual threads are used
assertThat(threadPool, instanceOf(ThreadPoolExecutor.class));
return (ThreadPoolExecutor) threadPool;
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
Expand All @@ -56,7 +55,6 @@
import io.helidon.common.configurable.ServerThreadPoolSupplier;
import io.helidon.common.http.Http;
import io.helidon.config.Config;
import io.helidon.config.DeprecatedConfig;
import io.helidon.microprofile.cdi.BuildTimeStart;
import io.helidon.microprofile.cdi.RuntimeStart;
import io.helidon.webserver.KeyPerformanceIndicatorSupport;
Expand Down Expand Up @@ -170,34 +168,12 @@ private void startServer(@Observes @Priority(PLATFORM_AFTER + 100) @Initialized(
// make sure all configuration is in place
if (null == jaxRsExecutorService) {
Config serverConfig = config.get("server");
final java.lang.reflect.Method m;
if (DeprecatedConfig.get(serverConfig, "executor-service.virtual-threads", "virtual-threads")
.asBoolean().orElse(false)) {
java.lang.reflect.Method temp = null;
try {
temp = Executors.class.getDeclaredMethod("newVirtualThreadExecutor");
} catch (final ReflectiveOperationException notLoomEarlyAccess) {
temp = null;
} finally {
m = temp;
}
} else {
m = null;
}
if (m != null) {
jaxRsExecutorService = () -> {
try {
return (ExecutorService) m.invoke(null);
} catch (final ReflectiveOperationException reflectiveOperationException) {
throw new IllegalStateException(reflectiveOperationException.getMessage(), reflectiveOperationException);
}
};
} else {
jaxRsExecutorService = ServerThreadPoolSupplier.builder()
.name("server")
.config(serverConfig.get("executor-service"))
.build();
}

// support for Loom is built into the thread pool supplier
jaxRsExecutorService = ServerThreadPoolSupplier.builder()
.name("server")
.config(serverConfig.get("executor-service"))
.build();
}

// redirect to the first page when root is accessed (if configured)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,6 @@

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

import io.helidon.common.configurable.ThreadPoolSupplier;
Expand All @@ -37,36 +36,15 @@ class AsyncExecutorProvider implements ExecutorServiceProvider {

static ExecutorServiceProvider create(Config config) {
Config asyncExecutorServiceConfig = config.get("async-executor-service");
final java.lang.reflect.Method m;
if (asyncExecutorServiceConfig.get("virtual-threads").asBoolean().orElse(false)) {
java.lang.reflect.Method temp = null;
try {
temp = Executors.class.getDeclaredMethod("newVirtualThreadExecutor");
} catch (final ReflectiveOperationException notLoom) {
temp = null;
} finally {
m = temp;
}
} else {
m = null;
}
if (m != null) {
return new AsyncExecutorProvider(() -> {
try {
return (ExecutorService) m.invoke(null);
} catch (final ReflectiveOperationException reflectiveOperationException) {
throw new IllegalStateException(reflectiveOperationException.getMessage(), reflectiveOperationException);
}
});
} else {
return new AsyncExecutorProvider(ThreadPoolSupplier.builder()

// Loom support is moved to thread pool supplier
return new AsyncExecutorProvider(ThreadPoolSupplier.builder()
.corePoolSize(1)
.maxPoolSize(10)
.prestart(false)
.threadNamePrefix("helidon-jersey-async")
.config(asyncExecutorServiceConfig)
.build());
}
}

static ExecutorServiceProvider create(ExecutorService executor) {
Expand Down

0 comments on commit 18e4999

Please sign in to comment.