Skip to content

Commit

Permalink
Support virtual threads (#1272)
Browse files Browse the repository at this point in the history
* update version to 4.2.0-SNAPSHOT

* add ci for jdk 21

* update springboot to 3.2.0

* update springboot 3.2.0

* update jacoco 0.8.11

* update sofa ark 3.0.1

* support virtual thread for asyncInit/moduleRefresh/readinessCheck

* support virtual thread for asyncInit/moduleRefresh/readinessCheck

* update springcloud 2023.0.0

* update asm 9.5

* rename LocalVariableTableParameterNameDiscoverer

* refactor virtual thead enable conditional

* update sprigncloud 2023.0.0

* fix ut

* fix ut

---------

Co-authored-by: 致节 <hzj266771@antgroup.com>
  • Loading branch information
HzjNeverStop and 致节 authored Dec 8, 2023
1 parent 6d44d41 commit b4e30c6
Show file tree
Hide file tree
Showing 25 changed files with 1,016 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,23 @@
import com.alipay.sofa.boot.actuator.health.ReadinessCheckCallbackProcessor;
import com.alipay.sofa.boot.actuator.health.ReadinessCheckListener;
import com.alipay.sofa.boot.actuator.health.ReadinessEndpoint;
import com.alipay.sofa.boot.autoconfigure.condition.OnVirtualThreadStartupAvailableCondition;
import com.alipay.sofa.boot.autoconfigure.condition.OnVirtualThreadStartupDisableCondition;
import com.alipay.sofa.boot.constant.SofaBootConstants;
import com.alipay.sofa.boot.log.SofaBootLoggerFactory;
import com.alipay.sofa.common.thread.NamedThreadFactory;
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
import com.alipay.sofa.common.thread.virtual.SofaVirtualThreadFactory;
import org.slf4j.Logger;
import org.springframework.boot.actuate.autoconfigure.endpoint.condition.ConditionalOnAvailableEndpoint;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -57,7 +62,7 @@ public class ReadinessAutoConfiguration {
public ReadinessCheckListener readinessCheckListener(HealthCheckerProcessor healthCheckerProcessor,
HealthIndicatorProcessor healthIndicatorProcessor,
ReadinessCheckCallbackProcessor afterReadinessCheckCallbackProcessor,
ThreadPoolExecutor readinessHealthCheckExecutor,
ExecutorService readinessHealthCheckExecutor,
HealthProperties healthCheckProperties) {
ReadinessCheckListener readinessCheckListener = new ReadinessCheckListener(
healthCheckerProcessor, healthIndicatorProcessor, afterReadinessCheckCallbackProcessor);
Expand All @@ -76,7 +81,7 @@ public ReadinessCheckListener readinessCheckListener(HealthCheckerProcessor heal
@Bean
@ConditionalOnMissingBean
public HealthCheckerProcessor healthCheckerProcessor(HealthProperties healthCheckProperties,
ThreadPoolExecutor readinessHealthCheckExecutor) {
ExecutorService readinessHealthCheckExecutor) {
HealthCheckerProcessor healthCheckerProcessor = new HealthCheckerProcessor();
healthCheckerProcessor.setHealthCheckExecutor(readinessHealthCheckExecutor);
healthCheckerProcessor.setParallelCheck(healthCheckProperties.isParallelCheck());
Expand All @@ -92,7 +97,7 @@ public HealthCheckerProcessor healthCheckerProcessor(HealthProperties healthChec
@Bean
@ConditionalOnMissingBean
public HealthIndicatorProcessor healthIndicatorProcessor(HealthProperties healthCheckProperties,
ThreadPoolExecutor readinessHealthCheckExecutor) {
ExecutorService readinessHealthCheckExecutor) {
HealthIndicatorProcessor healthIndicatorProcessor = new HealthIndicatorProcessor();
healthIndicatorProcessor.setHealthCheckExecutor(readinessHealthCheckExecutor);
healthIndicatorProcessor.initExcludedIndicators(healthCheckProperties
Expand All @@ -115,7 +120,8 @@ public ReadinessCheckCallbackProcessor afterReadinessCheckCallbackProcessor() {

@Bean(name = ReadinessCheckListener.READINESS_HEALTH_CHECK_EXECUTOR_BEAN_NAME)
@ConditionalOnMissingBean(name = ReadinessCheckListener.READINESS_HEALTH_CHECK_EXECUTOR_BEAN_NAME)
public ThreadPoolExecutor readinessHealthCheckExecutor(HealthProperties properties) {
@Conditional(OnVirtualThreadStartupDisableCondition.class)
public ExecutorService readinessHealthCheckExecutor(HealthProperties properties) {
int threadPoolSize;
if (properties.isParallelCheck()) {
threadPoolSize = SofaBootConstants.CPU_CORE * 5;
Expand All @@ -129,4 +135,12 @@ public ThreadPoolExecutor readinessHealthCheckExecutor(HealthProperties properti
new ThreadPoolExecutor.CallerRunsPolicy(), "health-check",
SofaBootConstants.SOFA_BOOT_SPACE_NAME);
}

@Bean(name = ReadinessCheckListener.READINESS_HEALTH_CHECK_EXECUTOR_BEAN_NAME)
@ConditionalOnMissingBean(name = ReadinessCheckListener.READINESS_HEALTH_CHECK_EXECUTOR_BEAN_NAME)
@Conditional(OnVirtualThreadStartupAvailableCondition.class)
public ExecutorService readinessHealthCheckVirtualExecutor() {
LOGGER.info("Create health-check virtual executor service");
return SofaVirtualThreadFactory.ofExecutorService("health-check");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import com.alipay.sofa.boot.isle.ApplicationRuntimeModel;
import com.alipay.sofa.runtime.spi.component.SofaRuntimeContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnJre;
import org.junit.jupiter.api.condition.JRE;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.FilteredClassLoader;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -177,4 +180,26 @@ void runWhenNotHaveRuntimeConfigurationWithoutBeans() {
.withClassLoader(new FilteredClassLoader(SofaRuntimeContext.class))
.run((context) -> assertThat(context).doesNotHaveBean(ComponentHealthChecker.class));
}

@Test
public void useReadinessHealthCheckExecutor() {
this.contextRunner
.run((context) -> {
ExecutorService threadPoolExecutor = context.getBean(ReadinessCheckListener.READINESS_HEALTH_CHECK_EXECUTOR_BEAN_NAME,
ExecutorService.class);
assertThat(threadPoolExecutor).isInstanceOf(ThreadPoolExecutor.class);
});
}

@Test
@EnabledOnJre(JRE.JAVA_21)
public void useReadinessHealthCheckVirtualExecutor() {
this.contextRunner
.withPropertyValues("sofa.boot.startup.threads.virtual.enabled=true")
.run((context) -> {
ExecutorService threadPoolExecutor = context.getBean(ReadinessCheckListener.READINESS_HEALTH_CHECK_EXECUTOR_BEAN_NAME,
ExecutorService.class);
assertThat(threadPoolExecutor).isNotInstanceOf(ThreadPoolExecutor.class);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -148,7 +148,7 @@ public class ReadinessCheckListener implements ApplicationContextAware, Ordered,

private boolean throwExceptionWhenHealthCheckFailed = false;

private ThreadPoolExecutor healthCheckExecutor;
private ExecutorService executorService;

public ReadinessCheckListener(HealthCheckerProcessor healthCheckerProcessor,
HealthIndicatorProcessor healthIndicatorProcessor,
Expand Down Expand Up @@ -232,7 +232,7 @@ public void onContextRefreshedEvent(ContextRefreshedEvent event) {
if (startupReporter != null) {
startupReporter.addCommonStartupStat(stat);
}
healthCheckExecutor.shutdown();
executorService.shutdown();
}
}

Expand Down Expand Up @@ -454,8 +454,8 @@ public ReadinessState getReadinessState() {
return readinessState;
}

public void setHealthCheckExecutor(ThreadPoolExecutor healthCheckExecutor) {
this.healthCheckExecutor = healthCheckExecutor;
public void setHealthCheckExecutor(ExecutorService executorService) {
this.executorService = executorService;
}

public static class ManualReadinessCallbackResult {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.boot.autoconfigure.condition;

import org.springframework.boot.autoconfigure.condition.AllNestedConditions;
import org.springframework.boot.autoconfigure.condition.ConditionalOnJava;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.system.JavaVersion;

/**
* Condition for startup SOFABoot on virtual thread.
*
* @author huzijie
* @version OnVirtualThreadStartupAvailableCondition.java, v 0.1 2023年12月05日 4:51 PM huzijie Exp $
*/
public class OnVirtualThreadStartupAvailableCondition extends AllNestedConditions {
public OnVirtualThreadStartupAvailableCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}

@ConditionalOnJava(value = JavaVersion.TWENTY_ONE)
static class JdkVersionAvailable {

}

@ConditionalOnProperty(value = "sofa.boot.startup.threads.virtual.enabled", havingValue = "true")
static class IslePropertyAvailable {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.boot.autoconfigure.condition;

import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnJava;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.system.JavaVersion;

/**
* Condition for not startup SOFABoot on virtual thread.
*
* @author huzijie
* @version StartupOnVirtualThreadDisableCondition.java, v 0.1 2023年12月05日 4:51 PM huzijie Exp $
*/
public class OnVirtualThreadStartupDisableCondition extends AnyNestedCondition {
public OnVirtualThreadStartupDisableCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}

@ConditionalOnJava(value = JavaVersion.TWENTY_ONE, range = ConditionalOnJava.Range.OLDER_THAN)
static class JdkVersionAvailable {

}

@ConditionalOnProperty(value = "sofa.boot.startup.threads.virtual.enabled", havingValue = "false", matchIfMissing = true)
static class IslePropertyAvailable {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.alipay.sofa.boot.autoconfigure.isle;

import com.alipay.sofa.boot.autoconfigure.condition.OnVirtualThreadStartupAvailableCondition;
import com.alipay.sofa.boot.autoconfigure.condition.OnVirtualThreadStartupDisableCondition;
import com.alipay.sofa.boot.constant.SofaBootConstants;
import com.alipay.sofa.boot.context.ContextRefreshInterceptor;
import com.alipay.sofa.boot.context.processor.SofaPostProcessorShareFilter;
Expand All @@ -37,6 +39,7 @@
import com.alipay.sofa.boot.log.SofaBootLoggerFactory;
import com.alipay.sofa.common.thread.NamedThreadFactory;
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
import com.alipay.sofa.common.thread.virtual.SofaVirtualThreadFactory;
import org.slf4j.Logger;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
Expand All @@ -51,6 +54,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand Down Expand Up @@ -152,7 +156,8 @@ public SpringContextLoader sofaDynamicSpringContextLoader(SofaModuleProperties s
@Bean(SpringContextInstallStage.SOFA_MODULE_REFRESH_EXECUTOR_BEAN_NAME)
@ConditionalOnMissingBean(name = SpringContextInstallStage.SOFA_MODULE_REFRESH_EXECUTOR_BEAN_NAME)
@ConditionalOnProperty(value = "sofa.boot.isle.moduleStartUpParallel", havingValue = "true", matchIfMissing = true)
public Supplier<ThreadPoolExecutor> sofaModuleRefreshExecutor(SofaModuleProperties sofaModuleProperties) {
@Conditional(OnVirtualThreadStartupDisableCondition.class)
public Supplier<ExecutorService> sofaModuleRefreshExecutor(SofaModuleProperties sofaModuleProperties) {
int coreSize = (int) (SofaBootConstants.CPU_CORE * sofaModuleProperties.getParallelRefreshPoolSizeFactor());
long taskTimeout = sofaModuleProperties.getParallelRefreshTimeout();
long checkPeriod = sofaModuleProperties.getParallelRefreshCheckPeriod();
Expand All @@ -167,6 +172,17 @@ public Supplier<ThreadPoolExecutor> sofaModuleRefreshExecutor(SofaModuleProperti
TimeUnit.SECONDS);
}

@Bean(SpringContextInstallStage.SOFA_MODULE_REFRESH_EXECUTOR_BEAN_NAME)
@ConditionalOnMissingBean(name = SpringContextInstallStage.SOFA_MODULE_REFRESH_EXECUTOR_BEAN_NAME)
@ConditionalOnProperty(value = "sofa.boot.isle.moduleStartUpParallel", havingValue = "true", matchIfMissing = true)
@Conditional(OnVirtualThreadStartupAvailableCondition.class)
public Supplier<ExecutorService> sofaModuleRefreshVirtualExecutor() {
return () -> {
LOGGER.info("Create SOFA module refresh virtual executor service");
return SofaVirtualThreadFactory.ofExecutorService("sofa-module-refresh");
};
}

@Bean
@ConditionalOnMissingBean
public SofaModuleProfileChecker sofaModuleProfileChecker(SofaModuleProperties sofaModuleProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
package com.alipay.sofa.boot.autoconfigure.runtime;

import com.alipay.sofa.boot.autoconfigure.condition.ConditionalOnSwitch;
import com.alipay.sofa.boot.autoconfigure.condition.OnVirtualThreadStartupAvailableCondition;
import com.alipay.sofa.boot.autoconfigure.condition.OnVirtualThreadStartupDisableCondition;
import com.alipay.sofa.boot.constant.SofaBootConstants;
import com.alipay.sofa.boot.log.SofaBootLoggerFactory;
import com.alipay.sofa.common.thread.NamedThreadFactory;
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
import com.alipay.sofa.common.thread.virtual.SofaVirtualThreadFactory;
import com.alipay.sofa.runtime.api.client.ReferenceClient;
import com.alipay.sofa.runtime.api.client.ServiceClient;
import com.alipay.sofa.runtime.async.AsyncInitMethodManager;
Expand Down Expand Up @@ -54,12 +57,14 @@
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.support.SpringFactoriesLoader;
import org.springframework.util.Assert;

import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -167,7 +172,8 @@ public static AsyncInitMethodManager asyncInitMethodManager() {

@Bean(AsyncInitMethodManager.ASYNC_INIT_METHOD_EXECUTOR_BEAN_NAME)
@ConditionalOnMissingBean(name = AsyncInitMethodManager.ASYNC_INIT_METHOD_EXECUTOR_BEAN_NAME)
public Supplier<ThreadPoolExecutor> asyncInitMethodExecutor(SofaRuntimeProperties sofaRuntimeProperties) {
@Conditional(OnVirtualThreadStartupDisableCondition.class)
public Supplier<ExecutorService> asyncInitMethodExecutor(SofaRuntimeProperties sofaRuntimeProperties) {
return ()-> {
int coreSize = sofaRuntimeProperties.getAsyncInitExecutorCoreSize();
int maxSize = sofaRuntimeProperties.getAsyncInitExecutorMaxSize();
Expand All @@ -183,6 +189,16 @@ public Supplier<ThreadPoolExecutor> asyncInitMethodExecutor(SofaRuntimePropertie
};
}

@Bean(AsyncInitMethodManager.ASYNC_INIT_METHOD_EXECUTOR_BEAN_NAME)
@ConditionalOnMissingBean(name = AsyncInitMethodManager.ASYNC_INIT_METHOD_EXECUTOR_BEAN_NAME)
@Conditional(OnVirtualThreadStartupAvailableCondition.class)
public Supplier<ExecutorService> asyncInitMethodVirtualExecutor() {
return ()-> {
LOGGER.info("create async-init-bean virtual executor service");
return SofaVirtualThreadFactory.ofExecutorService("async-init-bean");
};
}

@Bean
@ConditionalOnMissingBean
public static AsyncProxyBeanPostProcessor asyncProxyBeanPostProcessor(AsyncInitMethodManager asyncInitMethodManager) {
Expand Down
Loading

0 comments on commit b4e30c6

Please sign in to comment.