Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support virtual threads #1272

Merged
merged 20 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,23 @@
import com.alipay.sofa.boot.actuator.health.ReadinessCheckCallbackProcessor;
import com.alipay.sofa.boot.actuator.health.ReadinessCheckListener;
import com.alipay.sofa.boot.actuator.health.ReadinessEndpoint;
import com.alipay.sofa.boot.autoconfigure.condition.OnVirtualThreadStartupAvailableCondition;
import com.alipay.sofa.boot.autoconfigure.condition.OnVirtualThreadStartupDisableCondition;
import com.alipay.sofa.boot.constant.SofaBootConstants;
import com.alipay.sofa.boot.log.SofaBootLoggerFactory;
import com.alipay.sofa.common.thread.NamedThreadFactory;
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
import com.alipay.sofa.common.thread.virtual.SofaVirtualThreadFactory;
import org.slf4j.Logger;
import org.springframework.boot.actuate.autoconfigure.endpoint.condition.ConditionalOnAvailableEndpoint;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -57,7 +62,7 @@ public class ReadinessAutoConfiguration {
public ReadinessCheckListener readinessCheckListener(HealthCheckerProcessor healthCheckerProcessor,
HealthIndicatorProcessor healthIndicatorProcessor,
ReadinessCheckCallbackProcessor afterReadinessCheckCallbackProcessor,
ThreadPoolExecutor readinessHealthCheckExecutor,
ExecutorService readinessHealthCheckExecutor,
HealthProperties healthCheckProperties) {
ReadinessCheckListener readinessCheckListener = new ReadinessCheckListener(
healthCheckerProcessor, healthIndicatorProcessor, afterReadinessCheckCallbackProcessor);
Expand All @@ -76,7 +81,7 @@ public ReadinessCheckListener readinessCheckListener(HealthCheckerProcessor heal
@Bean
@ConditionalOnMissingBean
public HealthCheckerProcessor healthCheckerProcessor(HealthProperties healthCheckProperties,
ThreadPoolExecutor readinessHealthCheckExecutor) {
ExecutorService readinessHealthCheckExecutor) {
HealthCheckerProcessor healthCheckerProcessor = new HealthCheckerProcessor();
healthCheckerProcessor.setHealthCheckExecutor(readinessHealthCheckExecutor);
healthCheckerProcessor.setParallelCheck(healthCheckProperties.isParallelCheck());
Expand All @@ -92,7 +97,7 @@ public HealthCheckerProcessor healthCheckerProcessor(HealthProperties healthChec
@Bean
@ConditionalOnMissingBean
public HealthIndicatorProcessor healthIndicatorProcessor(HealthProperties healthCheckProperties,
ThreadPoolExecutor readinessHealthCheckExecutor) {
ExecutorService readinessHealthCheckExecutor) {
HealthIndicatorProcessor healthIndicatorProcessor = new HealthIndicatorProcessor();
healthIndicatorProcessor.setHealthCheckExecutor(readinessHealthCheckExecutor);
healthIndicatorProcessor.initExcludedIndicators(healthCheckProperties
Expand All @@ -115,7 +120,8 @@ public ReadinessCheckCallbackProcessor afterReadinessCheckCallbackProcessor() {

@Bean(name = ReadinessCheckListener.READINESS_HEALTH_CHECK_EXECUTOR_BEAN_NAME)
@ConditionalOnMissingBean(name = ReadinessCheckListener.READINESS_HEALTH_CHECK_EXECUTOR_BEAN_NAME)
public ThreadPoolExecutor readinessHealthCheckExecutor(HealthProperties properties) {
@Conditional(OnVirtualThreadStartupDisableCondition.class)
public ExecutorService readinessHealthCheckExecutor(HealthProperties properties) {
int threadPoolSize;
if (properties.isParallelCheck()) {
threadPoolSize = SofaBootConstants.CPU_CORE * 5;
Expand All @@ -129,4 +135,12 @@ public ThreadPoolExecutor readinessHealthCheckExecutor(HealthProperties properti
new ThreadPoolExecutor.CallerRunsPolicy(), "health-check",
SofaBootConstants.SOFA_BOOT_SPACE_NAME);
}

@Bean(name = ReadinessCheckListener.READINESS_HEALTH_CHECK_EXECUTOR_BEAN_NAME)
@ConditionalOnMissingBean(name = ReadinessCheckListener.READINESS_HEALTH_CHECK_EXECUTOR_BEAN_NAME)
@Conditional(OnVirtualThreadStartupAvailableCondition.class)
public ExecutorService readinessHealthCheckVirtualExecutor() {
LOGGER.info("Create health-check virtual executor service");
return SofaVirtualThreadFactory.ofExecutorService("health-check");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import com.alipay.sofa.boot.isle.ApplicationRuntimeModel;
import com.alipay.sofa.runtime.spi.component.SofaRuntimeContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnJre;
import org.junit.jupiter.api.condition.JRE;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.FilteredClassLoader;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -177,4 +180,26 @@ void runWhenNotHaveRuntimeConfigurationWithoutBeans() {
.withClassLoader(new FilteredClassLoader(SofaRuntimeContext.class))
.run((context) -> assertThat(context).doesNotHaveBean(ComponentHealthChecker.class));
}

@Test
public void useReadinessHealthCheckExecutor() {
this.contextRunner
.run((context) -> {
ExecutorService threadPoolExecutor = context.getBean(ReadinessCheckListener.READINESS_HEALTH_CHECK_EXECUTOR_BEAN_NAME,
ExecutorService.class);
assertThat(threadPoolExecutor).isInstanceOf(ThreadPoolExecutor.class);
});
}

@Test
@EnabledOnJre(JRE.JAVA_21)
public void useReadinessHealthCheckVirtualExecutor() {
this.contextRunner
.withPropertyValues("sofa.boot.startup.threads.virtual.enabled=true")
.run((context) -> {
ExecutorService threadPoolExecutor = context.getBean(ReadinessCheckListener.READINESS_HEALTH_CHECK_EXECUTOR_BEAN_NAME,
ExecutorService.class);
assertThat(threadPoolExecutor).isNotInstanceOf(ThreadPoolExecutor.class);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -148,7 +148,7 @@ public class ReadinessCheckListener implements ApplicationContextAware, Ordered,

private boolean throwExceptionWhenHealthCheckFailed = false;

private ThreadPoolExecutor healthCheckExecutor;
private ExecutorService executorService;

public ReadinessCheckListener(HealthCheckerProcessor healthCheckerProcessor,
HealthIndicatorProcessor healthIndicatorProcessor,
Expand Down Expand Up @@ -232,7 +232,7 @@ public void onContextRefreshedEvent(ContextRefreshedEvent event) {
if (startupReporter != null) {
startupReporter.addCommonStartupStat(stat);
}
healthCheckExecutor.shutdown();
executorService.shutdown();
}
}

Expand Down Expand Up @@ -454,8 +454,8 @@ public ReadinessState getReadinessState() {
return readinessState;
}

public void setHealthCheckExecutor(ThreadPoolExecutor healthCheckExecutor) {
this.healthCheckExecutor = healthCheckExecutor;
public void setHealthCheckExecutor(ExecutorService executorService) {
this.executorService = executorService;
}

public static class ManualReadinessCallbackResult {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.boot.autoconfigure.condition;

import org.springframework.boot.autoconfigure.condition.AllNestedConditions;
import org.springframework.boot.autoconfigure.condition.ConditionalOnJava;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.system.JavaVersion;

/**
* Condition for startup SOFABoot on virtual thread.
*
* @author huzijie
* @version OnVirtualThreadStartupAvailableCondition.java, v 0.1 2023年12月05日 4:51 PM huzijie Exp $
*/
public class OnVirtualThreadStartupAvailableCondition extends AllNestedConditions {
public OnVirtualThreadStartupAvailableCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}

@ConditionalOnJava(value = JavaVersion.TWENTY_ONE)
static class JdkVersionAvailable {

Check warning on line 36 in sofa-boot-project/sofa-boot-autoconfigure/src/main/java/com/alipay/sofa/boot/autoconfigure/condition/OnVirtualThreadStartupAvailableCondition.java

View check run for this annotation

Codecov / codecov/patch

sofa-boot-project/sofa-boot-autoconfigure/src/main/java/com/alipay/sofa/boot/autoconfigure/condition/OnVirtualThreadStartupAvailableCondition.java#L36

Added line #L36 was not covered by tests

}

@ConditionalOnProperty(value = "sofa.boot.startup.threads.virtual.enabled", havingValue = "true")
static class IslePropertyAvailable {

Check warning on line 41 in sofa-boot-project/sofa-boot-autoconfigure/src/main/java/com/alipay/sofa/boot/autoconfigure/condition/OnVirtualThreadStartupAvailableCondition.java

View check run for this annotation

Codecov / codecov/patch

sofa-boot-project/sofa-boot-autoconfigure/src/main/java/com/alipay/sofa/boot/autoconfigure/condition/OnVirtualThreadStartupAvailableCondition.java#L41

Added line #L41 was not covered by tests

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.boot.autoconfigure.condition;

import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnJava;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.system.JavaVersion;

/**
* Condition for not startup SOFABoot on virtual thread.
*
* @author huzijie
* @version StartupOnVirtualThreadDisableCondition.java, v 0.1 2023年12月05日 4:51 PM huzijie Exp $
*/
public class OnVirtualThreadStartupDisableCondition extends AnyNestedCondition {
public OnVirtualThreadStartupDisableCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}

@ConditionalOnJava(value = JavaVersion.TWENTY_ONE, range = ConditionalOnJava.Range.OLDER_THAN)
static class JdkVersionAvailable {

Check warning on line 36 in sofa-boot-project/sofa-boot-autoconfigure/src/main/java/com/alipay/sofa/boot/autoconfigure/condition/OnVirtualThreadStartupDisableCondition.java

View check run for this annotation

Codecov / codecov/patch

sofa-boot-project/sofa-boot-autoconfigure/src/main/java/com/alipay/sofa/boot/autoconfigure/condition/OnVirtualThreadStartupDisableCondition.java#L36

Added line #L36 was not covered by tests

}

@ConditionalOnProperty(value = "sofa.boot.startup.threads.virtual.enabled", havingValue = "false", matchIfMissing = true)
static class IslePropertyAvailable {

Check warning on line 41 in sofa-boot-project/sofa-boot-autoconfigure/src/main/java/com/alipay/sofa/boot/autoconfigure/condition/OnVirtualThreadStartupDisableCondition.java

View check run for this annotation

Codecov / codecov/patch

sofa-boot-project/sofa-boot-autoconfigure/src/main/java/com/alipay/sofa/boot/autoconfigure/condition/OnVirtualThreadStartupDisableCondition.java#L41

Added line #L41 was not covered by tests

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.alipay.sofa.boot.autoconfigure.isle;

import com.alipay.sofa.boot.autoconfigure.condition.OnVirtualThreadStartupAvailableCondition;
import com.alipay.sofa.boot.autoconfigure.condition.OnVirtualThreadStartupDisableCondition;
import com.alipay.sofa.boot.constant.SofaBootConstants;
import com.alipay.sofa.boot.context.ContextRefreshInterceptor;
import com.alipay.sofa.boot.context.processor.SofaPostProcessorShareFilter;
Expand All @@ -37,6 +39,7 @@
import com.alipay.sofa.boot.log.SofaBootLoggerFactory;
import com.alipay.sofa.common.thread.NamedThreadFactory;
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
import com.alipay.sofa.common.thread.virtual.SofaVirtualThreadFactory;
import org.slf4j.Logger;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
Expand All @@ -51,6 +54,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand Down Expand Up @@ -152,7 +156,8 @@ public SpringContextLoader sofaDynamicSpringContextLoader(SofaModuleProperties s
@Bean(SpringContextInstallStage.SOFA_MODULE_REFRESH_EXECUTOR_BEAN_NAME)
@ConditionalOnMissingBean(name = SpringContextInstallStage.SOFA_MODULE_REFRESH_EXECUTOR_BEAN_NAME)
@ConditionalOnProperty(value = "sofa.boot.isle.moduleStartUpParallel", havingValue = "true", matchIfMissing = true)
public Supplier<ThreadPoolExecutor> sofaModuleRefreshExecutor(SofaModuleProperties sofaModuleProperties) {
@Conditional(OnVirtualThreadStartupDisableCondition.class)
public Supplier<ExecutorService> sofaModuleRefreshExecutor(SofaModuleProperties sofaModuleProperties) {
int coreSize = (int) (SofaBootConstants.CPU_CORE * sofaModuleProperties.getParallelRefreshPoolSizeFactor());
long taskTimeout = sofaModuleProperties.getParallelRefreshTimeout();
long checkPeriod = sofaModuleProperties.getParallelRefreshCheckPeriod();
Expand All @@ -167,6 +172,17 @@ public Supplier<ThreadPoolExecutor> sofaModuleRefreshExecutor(SofaModuleProperti
TimeUnit.SECONDS);
}

@Bean(SpringContextInstallStage.SOFA_MODULE_REFRESH_EXECUTOR_BEAN_NAME)
@ConditionalOnMissingBean(name = SpringContextInstallStage.SOFA_MODULE_REFRESH_EXECUTOR_BEAN_NAME)
@ConditionalOnProperty(value = "sofa.boot.isle.moduleStartUpParallel", havingValue = "true", matchIfMissing = true)
@Conditional(OnVirtualThreadStartupAvailableCondition.class)
public Supplier<ExecutorService> sofaModuleRefreshVirtualExecutor() {
return () -> {
LOGGER.info("Create SOFA module refresh virtual executor service");
return SofaVirtualThreadFactory.ofExecutorService("sofa-module-refresh");
};
}

@Bean
@ConditionalOnMissingBean
public SofaModuleProfileChecker sofaModuleProfileChecker(SofaModuleProperties sofaModuleProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
package com.alipay.sofa.boot.autoconfigure.runtime;

import com.alipay.sofa.boot.autoconfigure.condition.ConditionalOnSwitch;
import com.alipay.sofa.boot.autoconfigure.condition.OnVirtualThreadStartupAvailableCondition;
import com.alipay.sofa.boot.autoconfigure.condition.OnVirtualThreadStartupDisableCondition;
import com.alipay.sofa.boot.constant.SofaBootConstants;
import com.alipay.sofa.boot.log.SofaBootLoggerFactory;
import com.alipay.sofa.common.thread.NamedThreadFactory;
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
import com.alipay.sofa.common.thread.virtual.SofaVirtualThreadFactory;
import com.alipay.sofa.runtime.api.client.ReferenceClient;
import com.alipay.sofa.runtime.api.client.ServiceClient;
import com.alipay.sofa.runtime.async.AsyncInitMethodManager;
Expand Down Expand Up @@ -54,12 +57,14 @@
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.support.SpringFactoriesLoader;
import org.springframework.util.Assert;

import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -167,7 +172,8 @@ public static AsyncInitMethodManager asyncInitMethodManager() {

@Bean(AsyncInitMethodManager.ASYNC_INIT_METHOD_EXECUTOR_BEAN_NAME)
@ConditionalOnMissingBean(name = AsyncInitMethodManager.ASYNC_INIT_METHOD_EXECUTOR_BEAN_NAME)
public Supplier<ThreadPoolExecutor> asyncInitMethodExecutor(SofaRuntimeProperties sofaRuntimeProperties) {
@Conditional(OnVirtualThreadStartupDisableCondition.class)
public Supplier<ExecutorService> asyncInitMethodExecutor(SofaRuntimeProperties sofaRuntimeProperties) {
return ()-> {
int coreSize = sofaRuntimeProperties.getAsyncInitExecutorCoreSize();
int maxSize = sofaRuntimeProperties.getAsyncInitExecutorMaxSize();
Expand All @@ -183,6 +189,16 @@ public Supplier<ThreadPoolExecutor> asyncInitMethodExecutor(SofaRuntimePropertie
};
}

@Bean(AsyncInitMethodManager.ASYNC_INIT_METHOD_EXECUTOR_BEAN_NAME)
@ConditionalOnMissingBean(name = AsyncInitMethodManager.ASYNC_INIT_METHOD_EXECUTOR_BEAN_NAME)
@Conditional(OnVirtualThreadStartupAvailableCondition.class)
public Supplier<ExecutorService> asyncInitMethodVirtualExecutor() {
return ()-> {
LOGGER.info("create async-init-bean virtual executor service");
return SofaVirtualThreadFactory.ofExecutorService("async-init-bean");
};
}

@Bean
@ConditionalOnMissingBean
public static AsyncProxyBeanPostProcessor asyncProxyBeanPostProcessor(AsyncInitMethodManager asyncInitMethodManager) {
Expand Down
Loading
Loading