Skip to content

Commit

Permalink
Add virtual thread option for ThreadPoolTaskExecutorBuilder/`Thread…
Browse files Browse the repository at this point in the history
…PoolTaskSchedulerBuilder`

See spring-projects/spring-framework#33807
  • Loading branch information
quaff committed Oct 29, 2024
1 parent 619b24a commit bc3645f
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 38 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mavenVersion=3.9.4
mockitoVersion=5.14.2
nativeBuildToolsVersion=0.10.3
snakeYamlVersion=2.3
springFrameworkVersion=6.2.0-RC3
springFrameworkVersion=6.2.0-SNAPSHOT
springFramework60xVersion=6.0.23
tomcatVersion=10.1.31

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,25 +69,49 @@ ThreadPoolTaskExecutor applicationTaskExecutor(ThreadPoolTaskExecutorBuilder thr
@Configuration(proxyBeanMethods = false)
static class ThreadPoolTaskExecutorBuilderConfiguration {

@Bean
@ConditionalOnMissingBean
ThreadPoolTaskExecutorBuilder threadPoolTaskExecutorBuilder(TaskExecutionProperties properties,
private final TaskExecutionProperties properties;

private final ObjectProvider<ThreadPoolTaskExecutorCustomizer> threadPoolTaskExecutorCustomizers;

private final ObjectProvider<TaskDecorator> taskDecorator;

ThreadPoolTaskExecutorBuilderConfiguration(TaskExecutionProperties properties,
ObjectProvider<ThreadPoolTaskExecutorCustomizer> threadPoolTaskExecutorCustomizers,
ObjectProvider<TaskDecorator> taskDecorator) {
TaskExecutionProperties.Pool pool = properties.getPool();
this.properties = properties;
this.threadPoolTaskExecutorCustomizers = threadPoolTaskExecutorCustomizers;
this.taskDecorator = taskDecorator;
}

@Bean
@ConditionalOnMissingBean
@ConditionalOnThreading(Threading.PLATFORM)
ThreadPoolTaskExecutorBuilder threadPoolTaskExecutorBuilder() {
return builder();
}

@Bean(name = "threadPoolTaskExecutorBuilder")
@ConditionalOnMissingBean
@ConditionalOnThreading(Threading.VIRTUAL)
ThreadPoolTaskExecutorBuilder threadPoolTaskExecutorBuilderVirtualThreads() {
return builder().virtualThreads(true);
}

private ThreadPoolTaskExecutorBuilder builder() {
TaskExecutionProperties.Pool pool = this.properties.getPool();
ThreadPoolTaskExecutorBuilder builder = new ThreadPoolTaskExecutorBuilder();
builder = builder.queueCapacity(pool.getQueueCapacity());
builder = builder.corePoolSize(pool.getCoreSize());
builder = builder.maxPoolSize(pool.getMaxSize());
builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
builder = builder.keepAlive(pool.getKeepAlive());
builder = builder.acceptTasksAfterContextClose(pool.getShutdown().isAcceptTasksAfterContextClose());
TaskExecutionProperties.Shutdown shutdown = properties.getShutdown();
TaskExecutionProperties.Shutdown shutdown = this.properties.getShutdown();
builder = builder.awaitTermination(shutdown.isAwaitTermination());
builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
builder = builder.customizers(threadPoolTaskExecutorCustomizers.orderedStream()::iterator);
builder = builder.taskDecorator(taskDecorator.getIfUnique());
builder = builder.threadNamePrefix(this.properties.getThreadNamePrefix());
builder = builder.customizers(this.threadPoolTaskExecutorCustomizers.orderedStream()::iterator);
builder = builder.taskDecorator(this.taskDecorator.getIfUnique());
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
* {@link TaskSchedulingAutoConfiguration} in a specific order.
*
* @author Moritz Halbritter
* @author Yanming Zhou
*/
class TaskSchedulingConfigurations {

Expand All @@ -64,20 +65,40 @@ ThreadPoolTaskScheduler taskScheduler(ThreadPoolTaskSchedulerBuilder threadPoolT
@Configuration(proxyBeanMethods = false)
static class ThreadPoolTaskSchedulerBuilderConfiguration {

private final TaskSchedulingProperties properties;

private final ObjectProvider<ThreadPoolTaskSchedulerCustomizer> threadPoolTaskSchedulerCustomizers;

ThreadPoolTaskSchedulerBuilderConfiguration(TaskSchedulingProperties properties,
ObjectProvider<ThreadPoolTaskSchedulerCustomizer> threadPoolTaskSchedulerCustomizers) {
this.properties = properties;
this.threadPoolTaskSchedulerCustomizers = threadPoolTaskSchedulerCustomizers;
}

@Bean
@ConditionalOnMissingBean
ThreadPoolTaskSchedulerBuilder threadPoolTaskSchedulerBuilder(TaskSchedulingProperties properties,
ObjectProvider<ThreadPoolTaskSchedulerCustomizer> threadPoolTaskSchedulerCustomizers) {
TaskSchedulingProperties.Shutdown shutdown = properties.getShutdown();
@ConditionalOnThreading(Threading.PLATFORM)
ThreadPoolTaskSchedulerBuilder threadPoolTaskSchedulerBuilder() {
return builder();
}

@Bean(name = "threadPoolTaskSchedulerBuilder")
@ConditionalOnMissingBean
@ConditionalOnThreading(Threading.VIRTUAL)
ThreadPoolTaskSchedulerBuilder threadPoolTaskSchedulerBuilderVirtualThreads() {
return builder().virtualThreads(true);
}

private ThreadPoolTaskSchedulerBuilder builder() {
TaskSchedulingProperties.Shutdown shutdown = this.properties.getShutdown();
ThreadPoolTaskSchedulerBuilder builder = new ThreadPoolTaskSchedulerBuilder();
builder = builder.poolSize(properties.getPool().getSize());
builder = builder.poolSize(this.properties.getPool().getSize());
builder = builder.awaitTermination(shutdown.isAwaitTermination());
builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod());
builder = builder.threadNamePrefix(properties.getThreadNamePrefix());
builder = builder.customizers(threadPoolTaskSchedulerCustomizers);
builder = builder.threadNamePrefix(this.properties.getThreadNamePrefix());
builder = builder.customizers(this.threadPoolTaskSchedulerCustomizers);
return builder;
}

}

@Configuration(proxyBeanMethods = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,23 @@ void threadPoolTaskExecutorBuilderShouldUseTaskDecorator() {
});
}

@Test
void threadPoolTaskExecutorBuilderUsesPlatformThreadsByDefault() {
this.contextRunner.run((context) -> {
ThreadPoolTaskExecutorBuilder builder = context.getBean(ThreadPoolTaskExecutorBuilder.class);
assertThat(builder).hasFieldOrPropertyWithValue("virtualThreads", null);
});
}

@Test
@EnabledForJreRange(min = JRE.JAVA_21)
void threadPoolTaskExecutorBuilderUsesVirtualThreadsWhenEnabled() {
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true").run((context) -> {
ThreadPoolTaskExecutorBuilder builder = context.getBean(ThreadPoolTaskExecutorBuilder.class);
assertThat(builder).hasFieldOrPropertyWithValue("virtualThreads", true);
});
}

@Test
void whenThreadPoolTaskExecutorIsAutoConfiguredThenItIsLazy() {
this.contextRunner.run((context) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
*
* @author Stephane Nicoll
* @author Moritz Halbritter
* @author Yanming Zhou
*/
class TaskSchedulingAutoConfigurationTests {

Expand Down Expand Up @@ -100,6 +101,28 @@ void enableSchedulingWithNoTaskExecutorAutoConfiguresOne() {
});
}

@Test
@EnabledForJreRange(min = JRE.JAVA_21)
void threadPoolTaskSchedulerBuilderShouldUseVirtualThreadsIfEnabled() {
this.contextRunner.withPropertyValues("spring.threads.virtual.enabled=true")
.withUserConfiguration(SchedulingConfiguration.class)
.run((context) -> {
assertThat(context).hasSingleBean(ThreadPoolTaskSchedulerBuilder.class);
ThreadPoolTaskSchedulerBuilder builder = context.getBean(ThreadPoolTaskSchedulerBuilder.class);
assertThat(builder).hasFieldOrPropertyWithValue("virtualThreads", true);
});
}

@Test
@EnabledForJreRange(min = JRE.JAVA_21)
void threadPoolTaskSchedulerBuilderShouldUsePlatformThreadsByDefault() {
this.contextRunner.withUserConfiguration(SchedulingConfiguration.class).run((context) -> {
assertThat(context).hasSingleBean(ThreadPoolTaskSchedulerBuilder.class);
ThreadPoolTaskSchedulerBuilder builder = context.getBean(ThreadPoolTaskSchedulerBuilder.class);
assertThat(builder).hasFieldOrPropertyWithValue("virtualThreads", null);
});
}

@Test
void simpleAsyncTaskSchedulerBuilderShouldReadProperties() {
this.contextRunner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public class ThreadPoolTaskExecutorBuilder {

private final TaskDecorator taskDecorator;

private final Boolean virtualThreads;

private final Set<ThreadPoolTaskExecutorCustomizer> customizers;

public ThreadPoolTaskExecutorBuilder() {
Expand All @@ -78,13 +80,14 @@ public ThreadPoolTaskExecutorBuilder() {
this.awaitTerminationPeriod = null;
this.threadNamePrefix = null;
this.taskDecorator = null;
this.virtualThreads = null;
this.customizers = null;
}

private ThreadPoolTaskExecutorBuilder(Integer queueCapacity, Integer corePoolSize, Integer maxPoolSize,
Boolean allowCoreThreadTimeOut, Duration keepAlive, Boolean acceptTasksAfterContextClose,
Boolean awaitTermination, Duration awaitTerminationPeriod, String threadNamePrefix,
TaskDecorator taskDecorator, Set<ThreadPoolTaskExecutorCustomizer> customizers) {
TaskDecorator taskDecorator, Boolean virtualThreads, Set<ThreadPoolTaskExecutorCustomizer> customizers) {
this.queueCapacity = queueCapacity;
this.corePoolSize = corePoolSize;
this.maxPoolSize = maxPoolSize;
Expand All @@ -95,6 +98,7 @@ private ThreadPoolTaskExecutorBuilder(Integer queueCapacity, Integer corePoolSiz
this.awaitTerminationPeriod = awaitTerminationPeriod;
this.threadNamePrefix = threadNamePrefix;
this.taskDecorator = taskDecorator;
this.virtualThreads = virtualThreads;
this.customizers = customizers;
}

Expand All @@ -107,7 +111,8 @@ private ThreadPoolTaskExecutorBuilder(Integer queueCapacity, Integer corePoolSiz
public ThreadPoolTaskExecutorBuilder queueCapacity(int queueCapacity) {
return new ThreadPoolTaskExecutorBuilder(queueCapacity, this.corePoolSize, this.maxPoolSize,
this.allowCoreThreadTimeOut, this.keepAlive, this.acceptTasksAfterContextClose, this.awaitTermination,
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.customizers);
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.virtualThreads,
this.customizers);
}

/**
Expand All @@ -122,7 +127,8 @@ public ThreadPoolTaskExecutorBuilder queueCapacity(int queueCapacity) {
public ThreadPoolTaskExecutorBuilder corePoolSize(int corePoolSize) {
return new ThreadPoolTaskExecutorBuilder(this.queueCapacity, corePoolSize, this.maxPoolSize,
this.allowCoreThreadTimeOut, this.keepAlive, this.acceptTasksAfterContextClose, this.awaitTermination,
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.customizers);
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.virtualThreads,
this.customizers);
}

/**
Expand All @@ -137,7 +143,8 @@ public ThreadPoolTaskExecutorBuilder corePoolSize(int corePoolSize) {
public ThreadPoolTaskExecutorBuilder maxPoolSize(int maxPoolSize) {
return new ThreadPoolTaskExecutorBuilder(this.queueCapacity, this.corePoolSize, maxPoolSize,
this.allowCoreThreadTimeOut, this.keepAlive, this.acceptTasksAfterContextClose, this.awaitTermination,
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.customizers);
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.virtualThreads,
this.customizers);
}

/**
Expand All @@ -149,7 +156,8 @@ public ThreadPoolTaskExecutorBuilder maxPoolSize(int maxPoolSize) {
public ThreadPoolTaskExecutorBuilder allowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
return new ThreadPoolTaskExecutorBuilder(this.queueCapacity, this.corePoolSize, this.maxPoolSize,
allowCoreThreadTimeOut, this.keepAlive, this.acceptTasksAfterContextClose, this.awaitTermination,
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.customizers);
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.virtualThreads,
this.customizers);
}

/**
Expand All @@ -160,7 +168,8 @@ public ThreadPoolTaskExecutorBuilder allowCoreThreadTimeOut(boolean allowCoreThr
public ThreadPoolTaskExecutorBuilder keepAlive(Duration keepAlive) {
return new ThreadPoolTaskExecutorBuilder(this.queueCapacity, this.corePoolSize, this.maxPoolSize,
this.allowCoreThreadTimeOut, keepAlive, this.acceptTasksAfterContextClose, this.awaitTermination,
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.customizers);
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.virtualThreads,
this.customizers);
}

/**
Expand All @@ -174,7 +183,8 @@ public ThreadPoolTaskExecutorBuilder keepAlive(Duration keepAlive) {
public ThreadPoolTaskExecutorBuilder acceptTasksAfterContextClose(boolean acceptTasksAfterContextClose) {
return new ThreadPoolTaskExecutorBuilder(this.queueCapacity, this.corePoolSize, this.maxPoolSize,
this.allowCoreThreadTimeOut, this.keepAlive, acceptTasksAfterContextClose, this.awaitTermination,
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.customizers);
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.virtualThreads,
this.customizers);
}

/**
Expand All @@ -188,7 +198,8 @@ public ThreadPoolTaskExecutorBuilder acceptTasksAfterContextClose(boolean accept
public ThreadPoolTaskExecutorBuilder awaitTermination(boolean awaitTermination) {
return new ThreadPoolTaskExecutorBuilder(this.queueCapacity, this.corePoolSize, this.maxPoolSize,
this.allowCoreThreadTimeOut, this.keepAlive, this.acceptTasksAfterContextClose, awaitTermination,
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.customizers);
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.virtualThreads,
this.customizers);
}

/**
Expand All @@ -203,7 +214,8 @@ public ThreadPoolTaskExecutorBuilder awaitTermination(boolean awaitTermination)
public ThreadPoolTaskExecutorBuilder awaitTerminationPeriod(Duration awaitTerminationPeriod) {
return new ThreadPoolTaskExecutorBuilder(this.queueCapacity, this.corePoolSize, this.maxPoolSize,
this.allowCoreThreadTimeOut, this.keepAlive, this.acceptTasksAfterContextClose, this.awaitTermination,
awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.customizers);
awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.virtualThreads,
this.customizers);
}

/**
Expand All @@ -214,7 +226,8 @@ public ThreadPoolTaskExecutorBuilder awaitTerminationPeriod(Duration awaitTermin
public ThreadPoolTaskExecutorBuilder threadNamePrefix(String threadNamePrefix) {
return new ThreadPoolTaskExecutorBuilder(this.queueCapacity, this.corePoolSize, this.maxPoolSize,
this.allowCoreThreadTimeOut, this.keepAlive, this.acceptTasksAfterContextClose, this.awaitTermination,
this.awaitTerminationPeriod, threadNamePrefix, this.taskDecorator, this.customizers);
this.awaitTerminationPeriod, threadNamePrefix, this.taskDecorator, this.virtualThreads,
this.customizers);
}

/**
Expand All @@ -225,7 +238,20 @@ public ThreadPoolTaskExecutorBuilder threadNamePrefix(String threadNamePrefix) {
public ThreadPoolTaskExecutorBuilder taskDecorator(TaskDecorator taskDecorator) {
return new ThreadPoolTaskExecutorBuilder(this.queueCapacity, this.corePoolSize, this.maxPoolSize,
this.allowCoreThreadTimeOut, this.keepAlive, this.acceptTasksAfterContextClose, this.awaitTermination,
this.awaitTerminationPeriod, this.threadNamePrefix, taskDecorator, this.customizers);
this.awaitTerminationPeriod, this.threadNamePrefix, taskDecorator, this.virtualThreads,
this.customizers);
}

/**
* Specify whether to use virtual threads instead of platform threads.
* @param virtualThreads whether to use virtual threads instead of platform threads
* @return a new builder instance
*/
public ThreadPoolTaskExecutorBuilder virtualThreads(boolean virtualThreads) {
return new ThreadPoolTaskExecutorBuilder(this.queueCapacity, this.corePoolSize, this.maxPoolSize,
this.allowCoreThreadTimeOut, this.keepAlive, this.acceptTasksAfterContextClose, this.awaitTermination,
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, virtualThreads,
this.customizers);
}

/**
Expand Down Expand Up @@ -255,7 +281,8 @@ public ThreadPoolTaskExecutorBuilder customizers(Iterable<? extends ThreadPoolTa
Assert.notNull(customizers, "Customizers must not be null");
return new ThreadPoolTaskExecutorBuilder(this.queueCapacity, this.corePoolSize, this.maxPoolSize,
this.allowCoreThreadTimeOut, this.keepAlive, this.acceptTasksAfterContextClose, this.awaitTermination,
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, append(null, customizers));
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.virtualThreads,
append(null, customizers));
}

/**
Expand Down Expand Up @@ -284,7 +311,7 @@ public ThreadPoolTaskExecutorBuilder additionalCustomizers(
Assert.notNull(customizers, "Customizers must not be null");
return new ThreadPoolTaskExecutorBuilder(this.queueCapacity, this.corePoolSize, this.maxPoolSize,
this.allowCoreThreadTimeOut, this.keepAlive, this.acceptTasksAfterContextClose, this.awaitTermination,
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator,
this.awaitTerminationPeriod, this.threadNamePrefix, this.taskDecorator, this.virtualThreads,
append(this.customizers, customizers));
}

Expand Down Expand Up @@ -332,6 +359,7 @@ public <T extends ThreadPoolTaskExecutor> T configure(T taskExecutor) {
map.from(this.awaitTerminationPeriod).as(Duration::toMillis).to(taskExecutor::setAwaitTerminationMillis);
map.from(this.threadNamePrefix).whenHasText().to(taskExecutor::setThreadNamePrefix);
map.from(this.taskDecorator).to(taskExecutor::setTaskDecorator);
map.from(this.virtualThreads).to(taskExecutor::setVirtualThreads);
if (!CollectionUtils.isEmpty(this.customizers)) {
this.customizers.forEach((customizer) -> customizer.customize(taskExecutor));
}
Expand Down
Loading

0 comments on commit bc3645f

Please sign in to comment.