From a0ff6109345499698ee8b1aeb1b8d381df9a0974 Mon Sep 17 00:00:00 2001 From: Jonas Konrad Date: Fri, 7 Jun 2024 17:08:14 +0200 Subject: [PATCH] Fix race condition in task scheduling (#10892) Under some circumstances, ExecutableMethodProcessor.process may be called during or even after the StartupEvent is fired. In that case, scheduled methods may be ignored or there may be a ConcurrentModificationException. This patch moves the schedule logic into its own method, called by a class ScheduleTaskRunnable that guards the schedule logic using an AtomicBoolean so that it is run only once. Inside process(), the task is created, registered in a ConcurrentHashMap and, if the StartupEvent has already been fired, invoked. Inside the StartupEvent listener, all previously registered tasks are invoked. - If process is called before the started flag is set, the task will definitely be run by the StartupEvent listener. - If process is called after the started flag is set, the task will definitely be called inside process, and potentially also inside the StartupEvent listener if that is still running. This ensures that the ScheduleTaskRunnable is invoked at least once. --- .../processor/ScheduledMethodProcessor.java | 203 ++++++++++-------- 1 file changed, 116 insertions(+), 87 deletions(-) diff --git a/context/src/main/java/io/micronaut/scheduling/processor/ScheduledMethodProcessor.java b/context/src/main/java/io/micronaut/scheduling/processor/ScheduledMethodProcessor.java index c2c1f88b59f..a17fce1bfbf 100644 --- a/context/src/main/java/io/micronaut/scheduling/processor/ScheduledMethodProcessor.java +++ b/context/src/main/java/io/micronaut/scheduling/processor/ScheduledMethodProcessor.java @@ -41,19 +41,21 @@ import io.micronaut.scheduling.exceptions.SchedulerConfigurationException; import jakarta.annotation.PreDestroy; import jakarta.inject.Singleton; -import java.util.ArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; /** * A {@link ExecutableMethodProcessor} for the {@link Scheduled} annotation. @@ -76,8 +78,9 @@ public class ScheduledMethodProcessor implements ExecutableMethodProcessor> scheduledTasks = new ConcurrentLinkedDeque<>(); - private final List scheduledMethods = new ArrayList<>(); + private final Map scheduledMethods = new ConcurrentHashMap<>(); private final TaskExceptionHandler taskExceptionHandler; + private volatile boolean started = false; /** * @param beanContext The bean context for DI of beans annotated with @Inject @@ -95,8 +98,10 @@ public ScheduledMethodProcessor(BeanContext beanContext, Optional beanDefinition, ExecutableMethod method) { if (beanContext instanceof ApplicationContext) { ScheduledDefinition scheduledDefinition = new ScheduledDefinition(beanDefinition, method); - if (!scheduledMethods.contains(scheduledDefinition)) { - this.scheduledMethods.add(scheduledDefinition); + Runnable runnable = new ScheduleTaskRunnable(scheduledDefinition); + // process may be called during or after scheduleTasks. we need to guard against that. + if (scheduledMethods.putIfAbsent(scheduledDefinition, runnable) == null && started) { + runnable.run(); } } } @@ -106,104 +111,109 @@ public void process(BeanDefinition beanDefinition, ExecutableMethod met * @param startupEvent The startup event. */ @EventListener - @SuppressWarnings("unchecked") void scheduleTasks(@SuppressWarnings("unused") StartupEvent startupEvent) { - for (ScheduledDefinition scheduledDefinition : scheduledMethods) { - ExecutableMethod method = scheduledDefinition.method(); - BeanDefinition beanDefinition = scheduledDefinition.definition(); - List> scheduledAnnotations = method.getAnnotationValuesByType(Scheduled.class); - for (AnnotationValue scheduledAnnotation : scheduledAnnotations) { - String fixedRate = scheduledAnnotation.stringValue(MEMBER_FIXED_RATE).orElse(null); - - String initialDelayStr = scheduledAnnotation.stringValue(MEMBER_INITIAL_DELAY).orElse(null); - Duration initialDelay = null; - if (StringUtils.hasText(initialDelayStr)) { - initialDelay = conversionService.convert(initialDelayStr, Duration.class).orElseThrow(() -> - new SchedulerConfigurationException(method, "Invalid initial delay definition: " + initialDelayStr) - ); - } + started = true; + for (Runnable runnable : scheduledMethods.values()) { + runnable.run(); + } + } - String scheduler = scheduledAnnotation.stringValue(MEMBER_SCHEDULER).orElse(TaskExecutors.SCHEDULED); - Optional optionalTaskScheduler = beanContext - .findBean(TaskScheduler.class, Qualifiers.byName(scheduler)); + @SuppressWarnings("unchecked") + private void scheduleTask(ScheduledDefinition scheduledDefinition) { + ExecutableMethod method = scheduledDefinition.method(); + BeanDefinition beanDefinition = scheduledDefinition.definition(); + List> scheduledAnnotations = method.getAnnotationValuesByType(Scheduled.class); + for (AnnotationValue scheduledAnnotation : scheduledAnnotations) { + String fixedRate = scheduledAnnotation.stringValue(MEMBER_FIXED_RATE).orElse(null); - if (optionalTaskScheduler.isEmpty()) { - optionalTaskScheduler = beanContext.findBean(ExecutorService.class, Qualifiers.byName(scheduler)) - .filter(ScheduledExecutorService.class::isInstance) - .map(ScheduledExecutorTaskScheduler::new); - } + String initialDelayStr = scheduledAnnotation.stringValue(MEMBER_INITIAL_DELAY).orElse(null); + Duration initialDelay = null; + if (StringUtils.hasText(initialDelayStr)) { + initialDelay = conversionService.convert(initialDelayStr, Duration.class).orElseThrow(() -> + new SchedulerConfigurationException(method, "Invalid initial delay definition: " + initialDelayStr) + ); + } - TaskScheduler taskScheduler = optionalTaskScheduler.orElseThrow(() -> new SchedulerConfigurationException(method, "No scheduler of type TaskScheduler configured for name: " + scheduler)); - Runnable task = () -> { - try { - ExecutableBeanContextBinder binder = new DefaultExecutableBeanContextBinder(); - BoundExecutable boundExecutable = binder.bind(method, beanContext); - Object bean = beanContext.getBean((Argument) beanDefinition.asArgument(), (Qualifier) beanDefinition.getDeclaredQualifier()); - AnnotationValue finalAnnotationValue = scheduledAnnotation; - if (finalAnnotationValue instanceof EvaluatedAnnotationValue evaluated) { - finalAnnotationValue = evaluated.withArguments(bean, boundExecutable.getBoundArguments()); - } - boolean shouldRun = finalAnnotationValue.booleanValue(MEMBER_CONDITION).orElse(true); - if (shouldRun) { - try { - ((BoundExecutable) boundExecutable).invoke(bean); - } catch (Throwable e) { - handleException((Class) beanDefinition.getBeanType(), bean, e); - } - } - } catch (NoSuchBeanException noSuchBeanException) { - // ignore: a timing issue can occur when the context is being shutdown. If a scheduled job runs and the context - // is shutdown and available beans cleared then the bean is no longer available. The best thing to do here is just ignore the failure. - LOG.debug("Scheduled job skipped for context shutdown: {}.{}", beanDefinition.getBeanType().getSimpleName(), method.getDescription(true)); - } catch (Exception e) { - TaskExceptionHandler finalHandler = findHandler(beanDefinition.getBeanType(), e); - finalHandler.handleCreationFailure(beanDefinition, e); - } - }; + String scheduler = scheduledAnnotation.stringValue(MEMBER_SCHEDULER).orElse(TaskExecutors.SCHEDULED); + Optional optionalTaskScheduler = beanContext + .findBean(TaskScheduler.class, Qualifiers.byName(scheduler)); - String cronExpr = scheduledAnnotation.stringValue(MEMBER_CRON).orElse(null); - String zoneIdStr = scheduledAnnotation.stringValue(MEMBER_ZONE_ID).orElse(null); - String fixedDelay = scheduledAnnotation.stringValue(MEMBER_FIXED_DELAY).orElse(null); + if (optionalTaskScheduler.isEmpty()) { + optionalTaskScheduler = beanContext.findBean(ExecutorService.class, Qualifiers.byName(scheduler)) + .filter(ScheduledExecutorService.class::isInstance) + .map(ScheduledExecutorTaskScheduler::new); + } - if (StringUtils.isNotEmpty(cronExpr)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Scheduling cron task [{}] for method: {}", cronExpr, method); + TaskScheduler taskScheduler = optionalTaskScheduler.orElseThrow(() -> new SchedulerConfigurationException(method, "No scheduler of type TaskScheduler configured for name: " + scheduler)); + Runnable task = () -> { + try { + ExecutableBeanContextBinder binder = new DefaultExecutableBeanContextBinder(); + BoundExecutable boundExecutable = binder.bind(method, beanContext); + Object bean = beanContext.getBean((Argument) beanDefinition.asArgument(), (Qualifier) beanDefinition.getDeclaredQualifier()); + AnnotationValue finalAnnotationValue = scheduledAnnotation; + if (finalAnnotationValue instanceof EvaluatedAnnotationValue evaluated) { + finalAnnotationValue = evaluated.withArguments(bean, boundExecutable.getBoundArguments()); } + boolean shouldRun = finalAnnotationValue.booleanValue(MEMBER_CONDITION).orElse(true); + if (shouldRun) { + try { + ((BoundExecutable) boundExecutable).invoke(bean); + } catch (Throwable e) { + handleException((Class) beanDefinition.getBeanType(), bean, e); + } + } + } catch (NoSuchBeanException noSuchBeanException) { + // ignore: a timing issue can occur when the context is being shutdown. If a scheduled job runs and the context + // is shutdown and available beans cleared then the bean is no longer available. The best thing to do here is just ignore the failure. + LOG.debug("Scheduled job skipped for context shutdown: {}.{}", beanDefinition.getBeanType().getSimpleName(), method.getDescription(true)); + } catch (Exception e) { + TaskExceptionHandler finalHandler = findHandler(beanDefinition.getBeanType(), e); + finalHandler.handleCreationFailure(beanDefinition, e); + } + }; - ScheduledFuture scheduledFuture = taskScheduler.schedule(cronExpr, zoneIdStr, task); - scheduledTasks.add(scheduledFuture); - } else if (StringUtils.isNotEmpty(fixedRate)) { - Optional converted = conversionService.convert(fixedRate, Duration.class); - Duration duration = converted.orElseThrow(() -> - new SchedulerConfigurationException(method, "Invalid fixed rate definition: " + fixedRate) - ); + String cronExpr = scheduledAnnotation.stringValue(MEMBER_CRON).orElse(null); + String zoneIdStr = scheduledAnnotation.stringValue(MEMBER_ZONE_ID).orElse(null); + String fixedDelay = scheduledAnnotation.stringValue(MEMBER_FIXED_DELAY).orElse(null); - if (LOG.isDebugEnabled()) { - LOG.debug("Scheduling fixed rate task [{}] for method: {}", duration, method); - } + if (StringUtils.isNotEmpty(cronExpr)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Scheduling cron task [{}] for method: {}", cronExpr, method); + } - ScheduledFuture scheduledFuture = taskScheduler.scheduleAtFixedRate(initialDelay, duration, task); - scheduledTasks.add(scheduledFuture); - } else if (StringUtils.isNotEmpty(fixedDelay)) { - Optional converted = conversionService.convert(fixedDelay, Duration.class); - Duration duration = converted.orElseThrow(() -> - new SchedulerConfigurationException(method, "Invalid fixed delay definition: " + fixedDelay) - ); + ScheduledFuture scheduledFuture = taskScheduler.schedule(cronExpr, zoneIdStr, task); + scheduledTasks.add(scheduledFuture); + } else if (StringUtils.isNotEmpty(fixedRate)) { + Optional converted = conversionService.convert(fixedRate, Duration.class); + Duration duration = converted.orElseThrow(() -> + new SchedulerConfigurationException(method, "Invalid fixed rate definition: " + fixedRate) + ); + if (LOG.isDebugEnabled()) { + LOG.debug("Scheduling fixed rate task [{}] for method: {}", duration, method); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Scheduling fixed delay task [{}] for method: {}", duration, method); - } + ScheduledFuture scheduledFuture = taskScheduler.scheduleAtFixedRate(initialDelay, duration, task); + scheduledTasks.add(scheduledFuture); + } else if (StringUtils.isNotEmpty(fixedDelay)) { + Optional converted = conversionService.convert(fixedDelay, Duration.class); + Duration duration = converted.orElseThrow(() -> + new SchedulerConfigurationException(method, "Invalid fixed delay definition: " + fixedDelay) + ); - ScheduledFuture scheduledFuture = taskScheduler.scheduleWithFixedDelay(initialDelay, duration, task); - scheduledTasks.add(scheduledFuture); - } else if (initialDelay != null) { - ScheduledFuture scheduledFuture = taskScheduler.schedule(initialDelay, task); - scheduledTasks.add(scheduledFuture); - } else { - throw new SchedulerConfigurationException(method, "Failed to schedule task. Invalid definition"); + if (LOG.isDebugEnabled()) { + LOG.debug("Scheduling fixed delay task [{}] for method: {}", duration, method); } + + ScheduledFuture scheduledFuture = taskScheduler.scheduleWithFixedDelay(initialDelay, duration, task); + scheduledTasks.add(scheduledFuture); + } else if (initialDelay != null) { + ScheduledFuture scheduledFuture = taskScheduler.schedule(initialDelay, task); + + scheduledTasks.add(scheduledFuture); + } else { + throw new SchedulerConfigurationException(method, "Failed to schedule task. Invalid definition"); } } } @@ -237,4 +247,23 @@ public void close() { private record ScheduledDefinition( BeanDefinition definition, ExecutableMethod method) { } + + /** + * This Runnable calls {@link #scheduleTask(ScheduledDefinition)} exactly once, even if invoked + * multiple times from multiple threads. + */ + private class ScheduleTaskRunnable extends AtomicBoolean implements Runnable { + private final ScheduledDefinition definition; + + ScheduleTaskRunnable(ScheduledDefinition definition) { + this.definition = definition; + } + + @Override + public void run() { + if (compareAndSet(false, true)) { + scheduleTask(definition); + } + } + } }