Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in task scheduling #10892

Merged
merged 1 commit into from
Jun 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,21 @@
import io.micronaut.scheduling.exceptions.SchedulerConfigurationException;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A {@link ExecutableMethodProcessor} for the {@link Scheduled} annotation.
Expand All @@ -76,8 +78,9 @@ public class ScheduledMethodProcessor implements ExecutableMethodProcessor<Sched
private final BeanContext beanContext;
private final ConversionService conversionService;
private final Queue<ScheduledFuture<?>> scheduledTasks = new ConcurrentLinkedDeque<>();
private final List<ScheduledDefinition> scheduledMethods = new ArrayList<>();
private final Map<ScheduledDefinition, Runnable> scheduledMethods = new ConcurrentHashMap<>();
private final TaskExceptionHandler<?, ?> taskExceptionHandler;
private volatile boolean started = false;

/**
* @param beanContext The bean context for DI of beans annotated with @Inject
Expand All @@ -95,8 +98,10 @@ public ScheduledMethodProcessor(BeanContext beanContext, Optional<ConversionServ
public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
if (beanContext instanceof ApplicationContext) {
ScheduledDefinition scheduledDefinition = new ScheduledDefinition(beanDefinition, method);
if (!scheduledMethods.contains(scheduledDefinition)) {
this.scheduledMethods.add(scheduledDefinition);
Runnable runnable = new ScheduleTaskRunnable(scheduledDefinition);
// process may be called during or after scheduleTasks. we need to guard against that.
if (scheduledMethods.putIfAbsent(scheduledDefinition, runnable) == null && started) {
runnable.run();
}
}
}
Expand All @@ -106,104 +111,109 @@ public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> met
* @param startupEvent The startup event.
*/
@EventListener
@SuppressWarnings("unchecked")
void scheduleTasks(@SuppressWarnings("unused") StartupEvent startupEvent) {
for (ScheduledDefinition scheduledDefinition : scheduledMethods) {
ExecutableMethod<?, ?> method = scheduledDefinition.method();
BeanDefinition<?> beanDefinition = scheduledDefinition.definition();
List<AnnotationValue<Scheduled>> scheduledAnnotations = method.getAnnotationValuesByType(Scheduled.class);
for (AnnotationValue<Scheduled> scheduledAnnotation : scheduledAnnotations) {
String fixedRate = scheduledAnnotation.stringValue(MEMBER_FIXED_RATE).orElse(null);

String initialDelayStr = scheduledAnnotation.stringValue(MEMBER_INITIAL_DELAY).orElse(null);
Duration initialDelay = null;
if (StringUtils.hasText(initialDelayStr)) {
initialDelay = conversionService.convert(initialDelayStr, Duration.class).orElseThrow(() ->
new SchedulerConfigurationException(method, "Invalid initial delay definition: " + initialDelayStr)
);
}
started = true;
for (Runnable runnable : scheduledMethods.values()) {
runnable.run();
}
}

String scheduler = scheduledAnnotation.stringValue(MEMBER_SCHEDULER).orElse(TaskExecutors.SCHEDULED);
Optional<TaskScheduler> optionalTaskScheduler = beanContext
.findBean(TaskScheduler.class, Qualifiers.byName(scheduler));
@SuppressWarnings("unchecked")
private void scheduleTask(ScheduledDefinition scheduledDefinition) {
ExecutableMethod<?, ?> method = scheduledDefinition.method();
BeanDefinition<?> beanDefinition = scheduledDefinition.definition();
List<AnnotationValue<Scheduled>> scheduledAnnotations = method.getAnnotationValuesByType(Scheduled.class);
for (AnnotationValue<Scheduled> scheduledAnnotation : scheduledAnnotations) {
String fixedRate = scheduledAnnotation.stringValue(MEMBER_FIXED_RATE).orElse(null);

if (optionalTaskScheduler.isEmpty()) {
optionalTaskScheduler = beanContext.findBean(ExecutorService.class, Qualifiers.byName(scheduler))
.filter(ScheduledExecutorService.class::isInstance)
.map(ScheduledExecutorTaskScheduler::new);
}
String initialDelayStr = scheduledAnnotation.stringValue(MEMBER_INITIAL_DELAY).orElse(null);
Duration initialDelay = null;
if (StringUtils.hasText(initialDelayStr)) {
initialDelay = conversionService.convert(initialDelayStr, Duration.class).orElseThrow(() ->
new SchedulerConfigurationException(method, "Invalid initial delay definition: " + initialDelayStr)
);
}

TaskScheduler taskScheduler = optionalTaskScheduler.orElseThrow(() -> new SchedulerConfigurationException(method, "No scheduler of type TaskScheduler configured for name: " + scheduler));
Runnable task = () -> {
try {
ExecutableBeanContextBinder binder = new DefaultExecutableBeanContextBinder();
BoundExecutable<?, ?> boundExecutable = binder.bind(method, beanContext);
Object bean = beanContext.getBean((Argument<Object>) beanDefinition.asArgument(), (Qualifier<Object>) beanDefinition.getDeclaredQualifier());
AnnotationValue<Scheduled> finalAnnotationValue = scheduledAnnotation;
if (finalAnnotationValue instanceof EvaluatedAnnotationValue<Scheduled> evaluated) {
finalAnnotationValue = evaluated.withArguments(bean, boundExecutable.getBoundArguments());
}
boolean shouldRun = finalAnnotationValue.booleanValue(MEMBER_CONDITION).orElse(true);
if (shouldRun) {
try {
((BoundExecutable<Object, Object>) boundExecutable).invoke(bean);
} catch (Throwable e) {
handleException((Class<Object>) beanDefinition.getBeanType(), bean, e);
}
}
} catch (NoSuchBeanException noSuchBeanException) {
// ignore: a timing issue can occur when the context is being shutdown. If a scheduled job runs and the context
// is shutdown and available beans cleared then the bean is no longer available. The best thing to do here is just ignore the failure.
LOG.debug("Scheduled job skipped for context shutdown: {}.{}", beanDefinition.getBeanType().getSimpleName(), method.getDescription(true));
} catch (Exception e) {
TaskExceptionHandler finalHandler = findHandler(beanDefinition.getBeanType(), e);
finalHandler.handleCreationFailure(beanDefinition, e);
}
};
String scheduler = scheduledAnnotation.stringValue(MEMBER_SCHEDULER).orElse(TaskExecutors.SCHEDULED);
Optional<TaskScheduler> optionalTaskScheduler = beanContext
.findBean(TaskScheduler.class, Qualifiers.byName(scheduler));

String cronExpr = scheduledAnnotation.stringValue(MEMBER_CRON).orElse(null);
String zoneIdStr = scheduledAnnotation.stringValue(MEMBER_ZONE_ID).orElse(null);
String fixedDelay = scheduledAnnotation.stringValue(MEMBER_FIXED_DELAY).orElse(null);
if (optionalTaskScheduler.isEmpty()) {
optionalTaskScheduler = beanContext.findBean(ExecutorService.class, Qualifiers.byName(scheduler))
.filter(ScheduledExecutorService.class::isInstance)
.map(ScheduledExecutorTaskScheduler::new);
}

if (StringUtils.isNotEmpty(cronExpr)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling cron task [{}] for method: {}", cronExpr, method);
TaskScheduler taskScheduler = optionalTaskScheduler.orElseThrow(() -> new SchedulerConfigurationException(method, "No scheduler of type TaskScheduler configured for name: " + scheduler));
Runnable task = () -> {
try {
ExecutableBeanContextBinder binder = new DefaultExecutableBeanContextBinder();
BoundExecutable<?, ?> boundExecutable = binder.bind(method, beanContext);
Object bean = beanContext.getBean((Argument<Object>) beanDefinition.asArgument(), (Qualifier<Object>) beanDefinition.getDeclaredQualifier());
AnnotationValue<Scheduled> finalAnnotationValue = scheduledAnnotation;
if (finalAnnotationValue instanceof EvaluatedAnnotationValue<Scheduled> evaluated) {
finalAnnotationValue = evaluated.withArguments(bean, boundExecutable.getBoundArguments());
}
boolean shouldRun = finalAnnotationValue.booleanValue(MEMBER_CONDITION).orElse(true);
if (shouldRun) {
try {
((BoundExecutable<Object, Object>) boundExecutable).invoke(bean);
} catch (Throwable e) {
handleException((Class<Object>) beanDefinition.getBeanType(), bean, e);
}
}
} catch (NoSuchBeanException noSuchBeanException) {
// ignore: a timing issue can occur when the context is being shutdown. If a scheduled job runs and the context
// is shutdown and available beans cleared then the bean is no longer available. The best thing to do here is just ignore the failure.
LOG.debug("Scheduled job skipped for context shutdown: {}.{}", beanDefinition.getBeanType().getSimpleName(), method.getDescription(true));
} catch (Exception e) {
TaskExceptionHandler finalHandler = findHandler(beanDefinition.getBeanType(), e);
finalHandler.handleCreationFailure(beanDefinition, e);
}
};

ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(cronExpr, zoneIdStr, task);
scheduledTasks.add(scheduledFuture);
} else if (StringUtils.isNotEmpty(fixedRate)) {
Optional<Duration> converted = conversionService.convert(fixedRate, Duration.class);
Duration duration = converted.orElseThrow(() ->
new SchedulerConfigurationException(method, "Invalid fixed rate definition: " + fixedRate)
);
String cronExpr = scheduledAnnotation.stringValue(MEMBER_CRON).orElse(null);
String zoneIdStr = scheduledAnnotation.stringValue(MEMBER_ZONE_ID).orElse(null);
String fixedDelay = scheduledAnnotation.stringValue(MEMBER_FIXED_DELAY).orElse(null);

if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling fixed rate task [{}] for method: {}", duration, method);
}
if (StringUtils.isNotEmpty(cronExpr)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling cron task [{}] for method: {}", cronExpr, method);
}

ScheduledFuture<?> scheduledFuture = taskScheduler.scheduleAtFixedRate(initialDelay, duration, task);
scheduledTasks.add(scheduledFuture);
} else if (StringUtils.isNotEmpty(fixedDelay)) {
Optional<Duration> converted = conversionService.convert(fixedDelay, Duration.class);
Duration duration = converted.orElseThrow(() ->
new SchedulerConfigurationException(method, "Invalid fixed delay definition: " + fixedDelay)
);
ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(cronExpr, zoneIdStr, task);
scheduledTasks.add(scheduledFuture);
} else if (StringUtils.isNotEmpty(fixedRate)) {
Optional<Duration> converted = conversionService.convert(fixedRate, Duration.class);
Duration duration = converted.orElseThrow(() ->
new SchedulerConfigurationException(method, "Invalid fixed rate definition: " + fixedRate)
);

if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling fixed rate task [{}] for method: {}", duration, method);
}

if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling fixed delay task [{}] for method: {}", duration, method);
}
ScheduledFuture<?> scheduledFuture = taskScheduler.scheduleAtFixedRate(initialDelay, duration, task);
scheduledTasks.add(scheduledFuture);
} else if (StringUtils.isNotEmpty(fixedDelay)) {
Optional<Duration> converted = conversionService.convert(fixedDelay, Duration.class);
Duration duration = converted.orElseThrow(() ->
new SchedulerConfigurationException(method, "Invalid fixed delay definition: " + fixedDelay)
);

ScheduledFuture<?> scheduledFuture = taskScheduler.scheduleWithFixedDelay(initialDelay, duration, task);
scheduledTasks.add(scheduledFuture);
} else if (initialDelay != null) {
ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(initialDelay, task);

scheduledTasks.add(scheduledFuture);
} else {
throw new SchedulerConfigurationException(method, "Failed to schedule task. Invalid definition");
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling fixed delay task [{}] for method: {}", duration, method);
}

ScheduledFuture<?> scheduledFuture = taskScheduler.scheduleWithFixedDelay(initialDelay, duration, task);
scheduledTasks.add(scheduledFuture);
} else if (initialDelay != null) {
ScheduledFuture<?> scheduledFuture = taskScheduler.schedule(initialDelay, task);

scheduledTasks.add(scheduledFuture);
} else {
throw new SchedulerConfigurationException(method, "Failed to schedule task. Invalid definition");
}
}
}
Expand Down Expand Up @@ -237,4 +247,23 @@ public void close() {
private record ScheduledDefinition(
BeanDefinition<?> definition,
ExecutableMethod<?, ?> method) { }

/**
* This Runnable calls {@link #scheduleTask(ScheduledDefinition)} exactly once, even if invoked
* multiple times from multiple threads.
*/
private class ScheduleTaskRunnable extends AtomicBoolean implements Runnable {
private final ScheduledDefinition definition;

ScheduleTaskRunnable(ScheduledDefinition definition) {
this.definition = definition;
}

@Override
public void run() {
if (compareAndSet(false, true)) {
scheduleTask(definition);
}
}
}
}
Loading