Skip to content

Commit

Permalink
Fix race condition in task scheduling (#10892)
Browse files Browse the repository at this point in the history
Under some circumstances, ExecutableMethodProcessor.process may be called during or even after the StartupEvent is fired. In that case, scheduled methods may be ignored or there may be a ConcurrentModificationException.

This patch moves the schedule logic into its own method, called by a class ScheduleTaskRunnable that guards the schedule logic using an AtomicBoolean so that it is run only once. Inside process(), the task is created, registered in a ConcurrentHashMap and, if the StartupEvent has already been fired, invoked. Inside the StartupEvent listener, all previously registered tasks are invoked.

- If process is called before the started flag is set, the task will definitely be run by the StartupEvent listener.
- If process is called after the started flag is set, the task will definitely be called inside process, and potentially also inside the StartupEvent listener if that is still running.

This ensures that the ScheduleTaskRunnable is invoked at least once.
  • Loading branch information
yawkat committed Jun 7, 2024
1 parent f9551ce commit a0ff610
Showing 1 changed file with 116 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,21 @@
import io.micronaut.scheduling.exceptions.SchedulerConfigurationException;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A {@link ExecutableMethodProcessor} for the {@link Scheduled} annotation.
Expand All @@ -76,8 +78,9 @@ public class ScheduledMethodProcessor implements ExecutableMethodProcessor<Sched
private final BeanContext beanContext;
private final ConversionService conversionService;
private final Queue<ScheduledFuture<?>> scheduledTasks = new ConcurrentLinkedDeque<>();
private final List<ScheduledDefinition> scheduledMethods = new ArrayList<>();
private final Map<ScheduledDefinition, Runnable> scheduledMethods = new ConcurrentHashMap<>();
private final TaskExceptionHandler<?, ?> taskExceptionHandler;
private volatile boolean started = false;

/**
* @param beanContext The bean context for DI of beans annotated with @Inject
Expand All @@ -95,8 +98,10 @@ public ScheduledMethodProcessor(BeanContext beanContext, Optional<ConversionServ
public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
if (beanContext instanceof ApplicationContext) {
ScheduledDefinition scheduledDefinition = new ScheduledDefinition(beanDefinition, method);
if (!scheduledMethods.contains(scheduledDefinition)) {
this.scheduledMethods.add(scheduledDefinition);
Runnable runnable = new ScheduleTaskRunnable(scheduledDefinition);
// process may be called during or after scheduleTasks. we need to guard against that.
if (scheduledMethods.putIfAbsent(scheduledDefinition, runnable) == null && started) {
runnable.run();
}
}
}
Expand All @@ -106,104 +111,109 @@ public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> met
* @param startupEvent The startup event.
*/
@EventListener
@SuppressWarnings("unchecked")
void scheduleTasks(@SuppressWarnings("unused") StartupEvent startupEvent) {
for (ScheduledDefinition scheduledDefinition : scheduledMethods) {
ExecutableMethod<?, ?> method = scheduledDefinition.method();
BeanDefinition<?> beanDefinition = scheduledDefinition.definition();
List<AnnotationValue<Scheduled>> scheduledAnnotations = method.getAnnotationValuesByType(Scheduled.class);
for (AnnotationValue<Scheduled> scheduledAnnotation : scheduledAnnotations) {
String fixedRate = scheduledAnnotation.stringValue(MEMBER_FIXED_RATE).orElse(null);

String initialDelayStr = scheduledAnnotation.stringValue(MEMBER_INITIAL_DELAY).orElse(null);
Duration initialDelay = null;
if (StringUtils.hasText(initialDelayStr)) {
initialDelay = conversionService.convert(initialDelayStr, Duration.class).orElseThrow(() ->
new SchedulerConfigurationException(method, "Invalid initial delay definition: " + initialDelayStr)
);
}
started = true;
for (Runnable runnable : scheduledMethods.values()) {
runnable.run();
}
}

String scheduler = scheduledAnnotation.stringValue(MEMBER_SCHEDULER).orElse(TaskExecutors.SCHEDULED);
Optional<TaskScheduler> optionalTaskScheduler = beanContext
.findBean(TaskScheduler.class, Qualifiers.byName(scheduler));
@SuppressWarnings("unchecked")
private void scheduleTask(ScheduledDefinition scheduledDefinition) {
ExecutableMethod<?, ?> method = scheduledDefinition.method();
BeanDefinition<?> beanDefinition = scheduledDefinition.definition();
List<AnnotationValue<Scheduled>> scheduledAnnotations = method.getAnnotationValuesByType(Scheduled.class);
for (AnnotationValue<Scheduled> scheduledAnnotation : scheduledAnnotations) {
String fixedRate = scheduledAnnotation.stringValue(MEMBER_FIXED_RATE).orElse(null);

if (optionalTaskScheduler.isEmpty()) {
optionalTaskScheduler = beanContext.findBean(ExecutorService.class, Qualifiers.byName(scheduler))
.filter(ScheduledExecutorService.class::isInstance)
.map(ScheduledExecutorTaskScheduler::new);
}
String initialDelayStr = scheduledAnnotation.stringValue(MEMBER_INITIAL_DELAY).orElse(null);
Duration initialDelay = null;
if (StringUtils.hasText(initialDelayStr)) {
initialDelay = conversionService.convert(initialDelayStr, Duration.class).orElseThrow(() ->
new SchedulerConfigurationException(method, "Invalid initial delay definition: " + initialDelayStr)
);
}

TaskScheduler taskScheduler = optionalTaskScheduler.orElseThrow(() -> new SchedulerConfigurationException(method, "No scheduler of type TaskScheduler configured for name: " + scheduler));
Runnable task = () -> {
try {
ExecutableBeanContextBinder binder = new DefaultExecutableBeanContextBinder();
BoundExecutable<?, ?> boundExecutable = binder.bind(method, beanContext);
Object bean = beanContext.getBean((Argument<Object>) beanDefinition.asArgument(), (Qualifier<Object>) beanDefinition.getDeclaredQualifier());
AnnotationValue<Scheduled> finalAnnotationValue = scheduledAnnotation;
if (finalAnnotationValue instanceof EvaluatedAnnotationValue<Scheduled> evaluated) {
finalAnnotationValue = evaluated.withArguments(bean, boundExecutable.getBoundArguments());
}
boolean shouldRun = finalAnnotationValue.booleanValue(MEMBER_CONDITION).orElse(true);
if (shouldRun) {
try {
((BoundExecutable<Object, Object>) boundExecutable).invoke(bean);
} catch (Throwable e) {
handleException((Class<Object>) beanDefinition.getBeanType(), bean, e);
}
}
} catch (NoSuchBeanException noSuchBeanException) {
// ignore: a timing issue can occur when the context is being shutdown. If a scheduled job runs and the context
// is shutdown and available beans cleared then the bean is no longer available. The best thing to do here is just ignore the failure.
LOG.debug("Scheduled job skipped for context shutdown: {}.{}", beanDefinition.getBeanType().getSimpleName(), method.getDescription(true));
} catch (Exception e) {
TaskExceptionHandler finalHandler = findHandler(beanDefinition.getBeanType(), e);
finalHandler.handleCreationFailure(beanDefinition, e);
}
};
String scheduler = scheduledAnnotation.stringValue(MEMBER_SCHEDULER).orElse(TaskExecutors.SCHEDULED);
Optional<TaskScheduler> optionalTaskScheduler = beanContext
.findBean(TaskScheduler.class, Qualifiers.byName(scheduler));

String cronExpr = scheduledAnnotation.stringValue(MEMBER_CRON).orElse(null);
String zoneIdStr = scheduledAnnotation.stringValue(MEMBER_ZONE_ID).orElse(null);
String fixedDelay = scheduledAnnotation.stringValue(MEMBER_FIXED_DELAY).orElse(null);
if (optionalTaskScheduler.isEmpty()) {
optionalTaskScheduler = beanContext.findBean(ExecutorService.class, Qualifiers.byName(scheduler))
.filter(ScheduledExecutorService.class::isInstance)
.map(ScheduledExecutorTaskScheduler::new);
}

if (StringUtils.isNotEmpty(cronExpr)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling cron task [{}] for method: {}", cronExpr, method);
TaskScheduler taskScheduler = optionalTaskScheduler.orElseThrow(() -> new SchedulerConfigurationException(method, "No scheduler of type TaskScheduler configured for name: " + scheduler));
Runnable task = () -> {
try {
ExecutableBeanContextBinder binder = new DefaultExecutableBeanContextBinder();
BoundExecutable<?, ?> boundExecutable = binder.bind(method, beanContext);
Object bean = beanContext.getBean((Argument<Object>) beanDefinition.asArgument(), (Qualifier<Object>) beanDefinition.getDeclaredQualifier());
AnnotationValue<Scheduled> finalAnnotationValue = scheduledAnnotation;
if (finalAnnotationValue instanceof EvaluatedAnnotationValue<Scheduled> evaluated) {
finalAnnotationValue = evaluated.withArguments(bean, boundExecutable.getBoundArguments());
}
boolean shouldRun = finalAnnotationValue.booleanValue(MEMBER_CONDITION).orElse(true);
if (shouldRun) {
try {
((BoundExecutable<Object, Object>) boundExecutable).invoke(bean);
} catch (Throwable e) {
handleException((Class<Object>) beanDefinition.getBeanType(), bean, e);
}
}
} catch (NoSuchBeanException noSuchBeanException) {
// ignore: a timing issue can occur when the context is being shutdown. If a scheduled job runs and the context
// is shutdown and available beans cleared then the bean is no longer available. The best thing to do here is just ignore the failure.
LOG.debug("Scheduled job skipped for context shutdown: {}.{}", beanDefinition.getBeanType().getSimpleName(), method.getDescription(true));
} catch (Exception e) {
TaskExceptionHandler finalHandler = findHandler(beanDefinition.getBeanType(), e);
finalHandler.handleCreationFailure(beanDefinition, e);
}
};

ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(cronExpr, zoneIdStr, task);
scheduledTasks.add(scheduledFuture);
} else if (StringUtils.isNotEmpty(fixedRate)) {
Optional<Duration> converted = conversionService.convert(fixedRate, Duration.class);
Duration duration = converted.orElseThrow(() ->
new SchedulerConfigurationException(method, "Invalid fixed rate definition: " + fixedRate)
);
String cronExpr = scheduledAnnotation.stringValue(MEMBER_CRON).orElse(null);
String zoneIdStr = scheduledAnnotation.stringValue(MEMBER_ZONE_ID).orElse(null);
String fixedDelay = scheduledAnnotation.stringValue(MEMBER_FIXED_DELAY).orElse(null);

if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling fixed rate task [{}] for method: {}", duration, method);
}
if (StringUtils.isNotEmpty(cronExpr)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling cron task [{}] for method: {}", cronExpr, method);
}

ScheduledFuture<?> scheduledFuture = taskScheduler.scheduleAtFixedRate(initialDelay, duration, task);
scheduledTasks.add(scheduledFuture);
} else if (StringUtils.isNotEmpty(fixedDelay)) {
Optional<Duration> converted = conversionService.convert(fixedDelay, Duration.class);
Duration duration = converted.orElseThrow(() ->
new SchedulerConfigurationException(method, "Invalid fixed delay definition: " + fixedDelay)
);
ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(cronExpr, zoneIdStr, task);
scheduledTasks.add(scheduledFuture);
} else if (StringUtils.isNotEmpty(fixedRate)) {
Optional<Duration> converted = conversionService.convert(fixedRate, Duration.class);
Duration duration = converted.orElseThrow(() ->
new SchedulerConfigurationException(method, "Invalid fixed rate definition: " + fixedRate)
);

if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling fixed rate task [{}] for method: {}", duration, method);
}

if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling fixed delay task [{}] for method: {}", duration, method);
}
ScheduledFuture<?> scheduledFuture = taskScheduler.scheduleAtFixedRate(initialDelay, duration, task);
scheduledTasks.add(scheduledFuture);
} else if (StringUtils.isNotEmpty(fixedDelay)) {
Optional<Duration> converted = conversionService.convert(fixedDelay, Duration.class);
Duration duration = converted.orElseThrow(() ->
new SchedulerConfigurationException(method, "Invalid fixed delay definition: " + fixedDelay)
);

ScheduledFuture<?> scheduledFuture = taskScheduler.scheduleWithFixedDelay(initialDelay, duration, task);
scheduledTasks.add(scheduledFuture);
} else if (initialDelay != null) {
ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(initialDelay, task);

scheduledTasks.add(scheduledFuture);
} else {
throw new SchedulerConfigurationException(method, "Failed to schedule task. Invalid definition");
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling fixed delay task [{}] for method: {}", duration, method);
}

ScheduledFuture<?> scheduledFuture = taskScheduler.scheduleWithFixedDelay(initialDelay, duration, task);
scheduledTasks.add(scheduledFuture);
} else if (initialDelay != null) {
ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(initialDelay, task);

scheduledTasks.add(scheduledFuture);
} else {
throw new SchedulerConfigurationException(method, "Failed to schedule task. Invalid definition");
}
}
}
Expand Down Expand Up @@ -237,4 +247,23 @@ public void close() {
private record ScheduledDefinition(
BeanDefinition<?> definition,
ExecutableMethod<?, ?> method) { }

/**
* This Runnable calls {@link #scheduleTask(ScheduledDefinition)} exactly once, even if invoked
* multiple times from multiple threads.
*/
private class ScheduleTaskRunnable extends AtomicBoolean implements Runnable {
private final ScheduledDefinition definition;

ScheduleTaskRunnable(ScheduledDefinition definition) {
this.definition = definition;
}

@Override
public void run() {
if (compareAndSet(false, true)) {
scheduleTask(definition);
}
}
}
}

0 comments on commit a0ff610

Please sign in to comment.