Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loom support moved to ThreadPoolSupplier #3164

Merged
merged 1 commit into from
Jul 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public final class ThreadPoolSupplier implements Supplier<ExecutorService> {
private final int growthRate;
private final ThreadPool.RejectionHandler rejectionHandler;
private final LazyValue<ExecutorService> lazyValue = LazyValue.create(() -> Contexts.wrap(getThreadPool()));
private final boolean useVirtualThreads;

private ThreadPoolSupplier(Builder builder) {
this.corePoolSize = builder.corePoolSize;
Expand All @@ -71,6 +72,7 @@ private ThreadPoolSupplier(Builder builder) {
this.growthThreshold = builder.growthThreshold;
this.growthRate = builder.growthRate;
this.rejectionHandler = builder.rejectionHandler == null ? DEFAULT_REJECTION_POLICY : builder.rejectionHandler;
this.useVirtualThreads = builder.useVirtualThreads || builder.virtualThreadsEnforced;
}

/**
Expand Down Expand Up @@ -102,7 +104,14 @@ public static ThreadPoolSupplier create() {
return builder().build();
}

ThreadPool getThreadPool() {
ExecutorService getThreadPool() {
if (useVirtualThreads) {
if (VirtualExecutorUtil.isVirtualSupported()) {
LOGGER.fine("Using unbounded virtual executor service for pool " + name);
return VirtualExecutorUtil.executorService();
}
}

ThreadPool result = ThreadPool.create(name,
corePoolSize,
maxPoolSize,
Expand Down Expand Up @@ -149,6 +158,8 @@ public static final class Builder implements io.helidon.common.Builder<ThreadPoo
private int growthRate = DEFAULT_GROWTH_RATE;
private ThreadPool.RejectionHandler rejectionHandler = DEFAULT_REJECTION_POLICY;
private String name;
private boolean useVirtualThreads;
private boolean virtualThreadsEnforced;

private Builder() {
}
Expand All @@ -162,6 +173,13 @@ public ThreadPoolSupplier build() {
rejectionHandler = DEFAULT_REJECTION_POLICY;
}

if (virtualThreadsEnforced) {
if (!VirtualExecutorUtil.isVirtualSupported()) {
throw new IllegalStateException("Virtual threads are required, yet not available on this JVM. "
+ "Please use a Loom build.");
}
}

return new ThreadPoolSupplier(this);
}

Expand Down Expand Up @@ -390,13 +408,48 @@ public Builder config(Config config) {
config.get("growth-rate").asInt().ifPresent(value -> {
warnExperimental("growth-rate");
growthRate(value);

});
config.get("virtual-threads").asBoolean().ifPresent(value -> {
warnExperimental("virtual-threads");
virtualIfAvailable(value);
});
config.get("virtual-enforced").asBoolean().ifPresent(value -> {
warnExperimental("virtual-enforced");
virtualEnforced(value);
});
return this;
}

private void warnExperimental(String key) {
LOGGER.warning(String.format("Config key \"executor-service.%s\" is EXPERIMENTAL and subject to change.", key));
}

/**
* When configured to {@code true}, virtual thread executor service must be available, otherwise the built
* executor would fail to start.
*
* @param enforceVirtualThreads whether to enforce virtual threads, defaults to {@code false}
* @return updated builder instance
* @see #virtualIfAvailable(boolean)
*/
public Builder virtualEnforced(boolean enforceVirtualThreads) {
this.virtualThreadsEnforced = enforceVirtualThreads;
return this;
}

/**
* When configured to {@code true}, an unbounded virtual executor service (project Loom) will be used
* if available.
* This is an experimental feature.
* <p>
* If enabled and available, all other configuration options of this executor service are ignored!
*
* @param useVirtualThreads whether to use virtual threads or not, defaults to {@code false}
* @return updated builder instance
*/
public Builder virtualIfAvailable(boolean useVirtualThreads) {
this.useVirtualThreads = useVirtualThreads;
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.common.configurable;

import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

import io.helidon.common.LazyValue;

/**
* A utility class to handle virtual threads (project Loom).
*/
final class VirtualExecutorUtil {
private static final Logger LOGGER = Logger.getLogger(VirtualExecutorUtil.class.getName());
private static final LazyValue<Boolean> SUPPORTED = LazyValue.create(VirtualExecutorUtil::findSupported);
private static final LazyValue<ExecutorService> EXECUTOR_SERVICE = LazyValue.create(VirtualExecutorUtil::findExecutor);

private VirtualExecutorUtil() {
}

static boolean isVirtualSupported() {
return SUPPORTED.get();
}

static ExecutorService executorService() {
ExecutorService result = EXECUTOR_SERVICE.get();
if (result == null) {
throw new IllegalStateException("Virtual executor service is not supported on this JVM");
}
return result;
}

private static boolean findSupported() {
try {
// the method is intentionally NOT CACHED in static context, to support differences between build
// and runtime environments (support for GraalVM native image)
findMethod();
return true;
} catch (final ReflectiveOperationException e) {
LOGGER.log(Level.FINEST, "Loom virtual executor service not available", e);
}

return false;
}

private static ExecutorService findExecutor() {
try {
// the method is intentionally NOT CACHED in static context, to support differences between build
// and runtime environments (support for GraalVM native image)
return (ExecutorService) findMethod().invoke(null);
} catch (final ReflectiveOperationException e) {
LOGGER.log(Level.FINEST, "Loom virtual executor service not available", e);
}

return null;
}

private static Method findMethod() throws ReflectiveOperationException {
return Executors.class.getDeclaredMethod("newVirtualThreadExecutor");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.startsWith;
Expand All @@ -50,19 +51,25 @@ class ThreadPoolSupplierTest {

@BeforeAll
static void initClass() {
defaultInstance = ThreadPoolSupplier.create().getThreadPool();
defaultInstance = ensureOurExecutor(ThreadPoolSupplier.create().getThreadPool());

builtInstance = ThreadPoolSupplier.builder()
builtInstance = ensureOurExecutor(ThreadPoolSupplier.builder()
.threadNamePrefix("thread-pool-unit-test-")
.corePoolSize(2)
.daemon(true)
.prestart(true)
.queueCapacity(10)
.build()
.getThreadPool();
.getThreadPool());

configuredInstance = ThreadPoolSupplier.create(Config.create().get("unit.thread-pool"))
.getThreadPool();
configuredInstance = ensureOurExecutor(ThreadPoolSupplier.create(Config.create().get("unit.thread-pool"))
.getThreadPool());
}

private static ThreadPoolExecutor ensureOurExecutor(ExecutorService threadPool) {
// thread pool should be our implementation, unless Loom virtual threads are used
assertThat(threadPool, instanceOf(ThreadPoolExecutor.class));
return (ThreadPoolExecutor) threadPool;
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
Expand All @@ -56,7 +55,6 @@
import io.helidon.common.configurable.ServerThreadPoolSupplier;
import io.helidon.common.http.Http;
import io.helidon.config.Config;
import io.helidon.config.DeprecatedConfig;
import io.helidon.microprofile.cdi.BuildTimeStart;
import io.helidon.microprofile.cdi.RuntimeStart;
import io.helidon.webserver.KeyPerformanceIndicatorSupport;
Expand Down Expand Up @@ -170,34 +168,12 @@ private void startServer(@Observes @Priority(PLATFORM_AFTER + 100) @Initialized(
// make sure all configuration is in place
if (null == jaxRsExecutorService) {
Config serverConfig = config.get("server");
final java.lang.reflect.Method m;
if (DeprecatedConfig.get(serverConfig, "executor-service.virtual-threads", "virtual-threads")
.asBoolean().orElse(false)) {
java.lang.reflect.Method temp = null;
try {
temp = Executors.class.getDeclaredMethod("newVirtualThreadExecutor");
} catch (final ReflectiveOperationException notLoomEarlyAccess) {
temp = null;
} finally {
m = temp;
}
} else {
m = null;
}
if (m != null) {
jaxRsExecutorService = () -> {
try {
return (ExecutorService) m.invoke(null);
} catch (final ReflectiveOperationException reflectiveOperationException) {
throw new IllegalStateException(reflectiveOperationException.getMessage(), reflectiveOperationException);
}
};
} else {
jaxRsExecutorService = ServerThreadPoolSupplier.builder()
.name("server")
.config(serverConfig.get("executor-service"))
.build();
}

// support for Loom is built into the thread pool supplier
jaxRsExecutorService = ServerThreadPoolSupplier.builder()
.name("server")
.config(serverConfig.get("executor-service"))
.build();
}

// redirect to the first page when root is accessed (if configured)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2020, 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,6 @@

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

import io.helidon.common.configurable.ThreadPoolSupplier;
Expand All @@ -37,36 +36,15 @@ class AsyncExecutorProvider implements ExecutorServiceProvider {

static ExecutorServiceProvider create(Config config) {
Config asyncExecutorServiceConfig = config.get("async-executor-service");
final java.lang.reflect.Method m;
if (asyncExecutorServiceConfig.get("virtual-threads").asBoolean().orElse(false)) {
java.lang.reflect.Method temp = null;
try {
temp = Executors.class.getDeclaredMethod("newVirtualThreadExecutor");
} catch (final ReflectiveOperationException notLoom) {
temp = null;
} finally {
m = temp;
}
} else {
m = null;
}
if (m != null) {
return new AsyncExecutorProvider(() -> {
try {
return (ExecutorService) m.invoke(null);
} catch (final ReflectiveOperationException reflectiveOperationException) {
throw new IllegalStateException(reflectiveOperationException.getMessage(), reflectiveOperationException);
}
});
} else {
return new AsyncExecutorProvider(ThreadPoolSupplier.builder()

// Loom support is moved to thread pool supplier
return new AsyncExecutorProvider(ThreadPoolSupplier.builder()
.corePoolSize(1)
.maxPoolSize(10)
.prestart(false)
.threadNamePrefix("helidon-jersey-async")
.config(asyncExecutorServiceConfig)
.build());
}
}

static ExecutorServiceProvider create(ExecutorService executor) {
Expand Down