Skip to content

Commit

Permalink
fixed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr committed Dec 11, 2024
1 parent ec4e81d commit a3dde2c
Show file tree
Hide file tree
Showing 14 changed files with 39 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"producer": {
"queueName": "Sample",
"queueType": null
}
},
"virtualThreadsCompatible": false
},
"status": {
"executionCount": 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
@AliasFor(annotation = Job.class, member = "isVirtualThreadCompatible")
boolean isVirtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;
@AliasFor(annotation = Job.class, member = "virtualThreadCompatible")
boolean virtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
@AliasFor(annotation = Job.class, member = "isVirtualThreadCompatible")
boolean isVirtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;
@AliasFor(annotation = Job.class, member = "virtualThreadCompatible")
boolean virtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
@AliasFor(annotation = Job.class, member = "isVirtualThreadCompatible")
boolean isVirtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;
@AliasFor(annotation = Job.class, member = "virtualThreadCompatible")
boolean virtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,8 @@
@AliasFor(annotation = Job.class, member = "scheduler")
String scheduler() default WorkerConfiguration.DEFAULT_SCHEDULER;

/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
boolean virtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,6 @@
/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
boolean isVirtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;
boolean virtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ public JobConfiguration mergeWith(JobConfiguration overrides) {

@Override
public String toString() {
return "DefaultJobConfiguration{name='%s', enabled=%s, concurrency=%d, leaderOnly=%s, followerOnly=%s, cron=%s, fixedDelay=%s, initialDelay=%s, fixedRate=%s, scheduler='%s', fork=%d, consumer=%s, producer=%s}"
.formatted(name, enabled, concurrency, leaderOnly, followerOnly, cron, fixedDelay, initialDelay, fixedRate, scheduler, fork, consumer, producer);
return "DefaultJobConfiguration{name='%s', enabled=%s, concurrency=%d, leaderOnly=%s, followerOnly=%s, cron=%s, fixedDelay=%s, initialDelay=%s, fixedRate=%s, scheduler='%s', fork=%d, consumer=%s, producer=%s, virtualThreadsCompatible=%s}"
.formatted(name, enabled, concurrency, leaderOnly, followerOnly, cron, fixedDelay, initialDelay, fixedRate, scheduler, fork, consumer, producer, virtualThreadsCompatible);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
boolean isVirtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;
@AliasFor(annotation = Job.class, member = "virtualThreadCompatible")
boolean virtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@
/**
* @return whether the job contains code that can be executed on virtual threads, e.g. there is no use of <code>synchronized</code> keyword anywhere in the code
*/
boolean isVirtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;
@AliasFor(annotation = Job.class, member = "virtualThreadCompatible")
boolean virtualThreadCompatible() default WorkerConfiguration.DEFAULT_VIRTUAL_THREAD_COMPATIBLE;


}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.agorapulse.worker.Job;
import com.agorapulse.worker.JobConfiguration;
import com.agorapulse.worker.WorkerConfiguration;
import io.micronaut.context.BeanContext;
import io.micronaut.context.Qualifier;
import io.micronaut.inject.qualifiers.Qualifiers;
Expand All @@ -42,11 +41,9 @@ public class DefaultExecutorServiceProvider implements ExecutorServiceProvider,
private final Map<String, ExecutorService> createdExecutors = new ConcurrentHashMap<>();

private final BeanContext beanContext;
private final WorkerConfiguration workerConfiguration;

public DefaultExecutorServiceProvider(BeanContext beanContext, WorkerConfiguration workerConfiguration) {
public DefaultExecutorServiceProvider(BeanContext beanContext) {
this.beanContext = beanContext;
this.workerConfiguration = workerConfiguration;
}

@Override
Expand All @@ -58,7 +55,7 @@ public void close() {

@Override
public ExecutorService getExecutorService(Job job) {
return getExecutor(ExecutorServiceProvider.getSchedulerName(job.getConfiguration()), job.getConfiguration().getFork(), workerConfiguration.isVirtualThreadsCompatible() || job.getConfiguration().isVirtualThreadsCompatible());
return getExecutor(ExecutorServiceProvider.getSchedulerName(job.getConfiguration()), job.getConfiguration().getFork(), job.getConfiguration().isVirtualThreadsCompatible());
}

@Override
Expand All @@ -76,7 +73,7 @@ public TaskScheduler getTaskScheduler(Job job) {
}

return optionalTaskScheduler.orElseGet(() -> {
ExecutorService executor = getExecutor(schedulerName, configuration.getFork(), workerConfiguration.isVirtualThreadsCompatible() || configuration.isVirtualThreadsCompatible());
ExecutorService executor = getExecutor(schedulerName, configuration.getFork(), configuration.isVirtualThreadsCompatible());
ScheduledExecutorTaskScheduler scheduler = new ScheduledExecutorTaskScheduler(executor);
beanContext.registerSingleton(TaskScheduler.class, scheduler, Qualifiers.byName(schedulerName));
return scheduler;
Expand All @@ -96,7 +93,6 @@ private ExecutorService getExecutor(String schedulerName, int fork, boolean virt
.findBean(ExecutorService.class, byName)
.filter(ScheduledExecutorService.class::isInstance)
.orElseGet(() -> {
// TODO: also add configuration to the job
ExecutorService service = Executors.newScheduledThreadPool(
useVirtualThreads ? 0 : fork,
useVirtualThreads ? LoomSupport.newVirtualThreadFactory(schedulerName) : new NamedThreadFactory(schedulerName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class MethodJobProcessor implements ExecutableMethodProcessor<Job> {
private static final String MEMBER_MAX_MESSAGES = "maxMessages";
private static final String MEMBER_WAITING_TIME = "waitingTime";
private static final String MEMBER_SCHEDULER = "scheduler";
private static final String MEMBER_VIRTUAL_THREADS_COMPATIBLE = "scheduler";
private static final String MEMBER_TYPE = "type";
private static final String MEMBER_VALUE = "value";
private static final String MEMBER_NAME = "name";
Expand Down Expand Up @@ -118,6 +119,15 @@ public class MethodJobProcessor implements ExecutableMethodProcessor<Job> {
QueueConsumer.class.getName(), MEMBER_SCHEDULER,
QueueProducer.class.getName(), MEMBER_SCHEDULER
);
private static final Map<String, String> ANNOTATION_TO_VIRTUAL_THREADS_COMPATIBLE_MAP = Map.of(
Job.class.getName(), MEMBER_VIRTUAL_THREADS_COMPATIBLE,
Cron.class.getName(), MEMBER_VIRTUAL_THREADS_COMPATIBLE,
FixedRate.class.getName(), MEMBER_VIRTUAL_THREADS_COMPATIBLE,
InitialDelay.class.getName(), MEMBER_VIRTUAL_THREADS_COMPATIBLE,
FixedDelay.class.getName(), MEMBER_VIRTUAL_THREADS_COMPATIBLE,
QueueConsumer.class.getName(), MEMBER_VIRTUAL_THREADS_COMPATIBLE,
QueueProducer.class.getName(), MEMBER_VIRTUAL_THREADS_COMPATIBLE
);

private static final Map<String, String> ANNOTATION_TO_FORK_MAP = Map.of(
Fork.class.getName(), MEMBER_VALUE,
Expand Down Expand Up @@ -261,6 +271,7 @@ private JobConfiguration getJobConfiguration(BeanDefinition<?> beanDefinition, E
getFirstAnnotationValue(ANNOTATION_TO_FIXED_RATE_MAP, method::stringValue, StringUtils::isNotEmpty).ifPresent(fixedRate -> configuration.setFixedRate(convertDuration(jobName, fixedRate, "fixed rate")));
getFirstAnnotationValue(ANNOTATION_TO_INITIAL_DELAY_MAP, method::stringValue, StringUtils::isNotEmpty).ifPresent(initialDelay -> configuration.setInitialDelay(convertDuration(jobName, initialDelay, "initial delay")));
getFirstAnnotationValue(ANNOTATION_TO_SCHEDULER_MAP, method::stringValue, StringUtils::isNotEmpty).ifPresent(configuration::setScheduler);
getFirstAnnotationValue(ANNOTATION_TO_VIRTUAL_THREADS_COMPATIBLE_MAP, method::booleanValue, Boolean::booleanValue).ifPresent(configuration::setVirtualThreadsCompatible);

boolean consumer = method.getArguments().length == 1;
boolean producer = !method.getReturnType().getType().equals(void.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
"producer": {
"queueName": "Sample",
"queueType": null
}
},
"virtualThreadsCompatible": false
},
"status": {
"executionCount": 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"producer": {
"queueName": "Sample",
"queueType": null
}
},
"virtualThreadsCompatible": false
},
"status": {
"executionCount": 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
"producer": {
"queueName": null,
"queueType": null
}
},
"virtualThreadsCompatible": false
},
"status": {
"executionCount": 0,
Expand Down

0 comments on commit a3dde2c

Please sign in to comment.