diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/DueExecutionsBatch.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/DueExecutionsBatch.java index cd649b06..2f044510 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/DueExecutionsBatch.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/DueExecutionsBatch.java @@ -57,19 +57,39 @@ public void oneExecutionDone(Runnable triggerCheckForNewBatch) { triggeredExecuteDue, possiblyMoreExecutionsInDb, executionsLeftInBatch.get()); - // Will not synchronize this method as it is not a big problem if two threads manage to call +// Will not synchronize this method as it is not a big problem if two threads manage to call // triggerCheckForNewBatch.run() at the same time. // There is synchronization further in, when waking the thread that will do the fetching. - if (!stale - && !triggeredExecuteDue - && possiblyMoreExecutionsInDb - && whenToTriggerCheckForNewBatch.test(executionsLeftInBatch.get())) { + if (isBatchValidForTrigger()) { LOG.trace("Triggering check for new batch."); triggerCheckForNewBatch.run(); triggeredExecuteDue = true; } } + private boolean isBatchValidForTrigger() { + return isNotStale() + && isNotAlreadyTriggered() + && hasPossibleExecutions() + && isTriggerConditionMet(); + } + + private boolean isNotStale() { + return !stale; + } + + private boolean isNotAlreadyTriggered() { + return !triggeredExecuteDue; + } + + private boolean hasPossibleExecutions() { + return possiblyMoreExecutionsInDb; + } + + private boolean isTriggerConditionMet() { + return whenToTriggerCheckForNewBatch.test(executionsLeftInBatch.get()); + } + public boolean isOlderGenerationThan(int compareTo) { return generationNumber < compareTo; } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java index 32d1c329..2e52b706 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java @@ -48,7 +48,7 @@ public class Scheduler implements SchedulerClient { public static final double TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO = 0.5; public static final String THREAD_PREFIX = "db-scheduler"; private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class); - private final SchedulerClient delegate; + private final SchedulerClient schedulerClientDelegate; final Clock clock; final TaskRepository schedulerTaskRepository; final TaskResolver taskResolver; @@ -110,7 +110,8 @@ protected Scheduler( this.schedulerListeners = new SchedulerListeners(schedulerListeners); this.dueExecutor = dueExecutor; this.housekeeperExecutor = housekeeperExecutor; - delegate = new StandardSchedulerClient(clientTaskRepository, this.schedulerListeners, clock); + schedulerClientDelegate = + new StandardSchedulerClient(clientTaskRepository, this.schedulerListeners, clock); this.failureLogger = ConfigurableLogger.create(LOG, logLevel, logStackTrace); if (pollingStrategyConfig.type == PollingStrategyConfig.Type.LOCK_AND_FETCH) { @@ -251,59 +252,59 @@ public SchedulerState getSchedulerState() { @Override public void schedule(SchedulableInstance schedulableInstance) { - this.delegate.schedule(schedulableInstance); + this.schedulerClientDelegate.schedule(schedulableInstance); } @Override public boolean scheduleIfNotExists(TaskInstance taskInstance, Instant executionTime) { - return this.delegate.scheduleIfNotExists(taskInstance, executionTime); + return this.schedulerClientDelegate.scheduleIfNotExists(taskInstance, executionTime); } @Override public boolean scheduleIfNotExists(SchedulableInstance schedulableInstance) { - return this.delegate.scheduleIfNotExists(schedulableInstance); + return this.schedulerClientDelegate.scheduleIfNotExists(schedulableInstance); } @Override public void schedule(TaskInstance taskInstance, Instant executionTime) { - this.delegate.schedule(taskInstance, executionTime); + this.schedulerClientDelegate.schedule(taskInstance, executionTime); } @Override public void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime) { - this.delegate.reschedule(taskInstanceId, newExecutionTime); + this.schedulerClientDelegate.reschedule(taskInstanceId, newExecutionTime); } @Override public void reschedule(SchedulableInstance schedulableInstance) { - this.delegate.reschedule(schedulableInstance); + this.schedulerClientDelegate.reschedule(schedulableInstance); } @Override public void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime, T newData) { - this.delegate.reschedule(taskInstanceId, newExecutionTime, newData); + this.schedulerClientDelegate.reschedule(taskInstanceId, newExecutionTime, newData); } @Override public void cancel(TaskInstanceId taskInstanceId) { - this.delegate.cancel(taskInstanceId); + this.schedulerClientDelegate.cancel(taskInstanceId); } @Override public void fetchScheduledExecutions(Consumer> consumer) { - this.delegate.fetchScheduledExecutions(consumer); + this.schedulerClientDelegate.fetchScheduledExecutions(consumer); } @Override public void fetchScheduledExecutions( ScheduledExecutionsFilter filter, Consumer> consumer) { - this.delegate.fetchScheduledExecutions(filter, consumer); + this.schedulerClientDelegate.fetchScheduledExecutions(filter, consumer); } @Override public void fetchScheduledExecutionsForTask( String taskName, Class dataClass, Consumer> consumer) { - this.delegate.fetchScheduledExecutionsForTask(taskName, dataClass, consumer); + this.schedulerClientDelegate.fetchScheduledExecutionsForTask(taskName, dataClass, consumer); } @Override @@ -312,12 +313,13 @@ public void fetchScheduledExecutionsForTask( Class dataClass, ScheduledExecutionsFilter filter, Consumer> consumer) { - this.delegate.fetchScheduledExecutionsForTask(taskName, dataClass, filter, consumer); + this.schedulerClientDelegate.fetchScheduledExecutionsForTask( + taskName, dataClass, filter, consumer); } @Override public Optional> getScheduledExecution(TaskInstanceId taskInstanceId) { - return this.delegate.getScheduledExecution(taskInstanceId); + return this.schedulerClientDelegate.getScheduledExecution(taskInstanceId); } public List getFailingExecutions(Duration failingAtLeastFor) { @@ -396,16 +398,25 @@ protected void detectDeadExecutions() { } void updateHeartbeats() { - final List currentlyProcessing = executor.getCurrentlyExecuting(); + final List currentlyProcessing = getCurrentlyProcessingExecutions(); + if (currentlyProcessing.isEmpty()) return; + + updateAllHeartbeats(currentlyProcessing); + schedulerListeners.onSchedulerEvent(SchedulerEventType.RAN_UPDATE_HEARTBEATS); + } + + private List getCurrentlyProcessingExecutions() { + List currentlyProcessing = executor.getCurrentlyExecuting(); if (currentlyProcessing.isEmpty()) { LOG.trace("No executions to update heartbeats for. Skipping."); - return; } + return currentlyProcessing; + } + private void updateAllHeartbeats(List currentlyProcessing) { LOG.debug("Updating heartbeats for {} executions being processed.", currentlyProcessing.size()); Instant now = clock.now(); currentlyProcessing.forEach(execution -> updateHeartbeatForExecution(now, execution)); - schedulerListeners.onSchedulerEvent(SchedulerEventType.RAN_UPDATE_HEARTBEATS); } protected void updateHeartbeatForExecution(Instant now, CurrentlyExecuting currentlyExecuting) { diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskResolver.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskResolver.java index e294ef6a..a1539b42 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskResolver.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/TaskResolver.java @@ -58,7 +58,7 @@ public Optional resolve(String taskName) { public Optional resolve(String taskName, boolean addUnresolvedToExclusionFilter) { Task task = taskMap.get(taskName); if (task == null && addUnresolvedToExclusionFilter) { - addUnresolved(taskName); + new UnresolvedTask(taskName).addUnresolved(); statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNRESOLVED_TASK); LOG.info( "Found execution with unknown task-name '{}'. Adding it to the list of known unresolved task-names.", @@ -67,9 +67,9 @@ public Optional resolve(String taskName, boolean addUnresolvedToExclusionF return Optional.ofNullable(task); } - private void addUnresolved(String taskName) { - unresolvedTasks.putIfAbsent(taskName, new UnresolvedTask(taskName)); - } + // private void addUnresolved(String taskName) { + // unresolvedTasks.putIfAbsent(taskName, new UnresolvedTask(taskName)); + // } public void addTask(Task task) { taskMap.put(task.getName(), task); @@ -105,5 +105,9 @@ public UnresolvedTask(String taskName) { public String getTaskName() { return taskName; } + + public void addUnresolved() { + TaskResolver.this.unresolvedTasks.putIfAbsent(taskName, this); + } } } diff --git a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskDescriptor.java b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskDescriptor.java index e4530ede..8d1c13d9 100644 --- a/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskDescriptor.java +++ b/db-scheduler/src/main/java/com/github/kagkarlsson/scheduler/task/TaskDescriptor.java @@ -15,8 +15,6 @@ public interface TaskDescriptor extends HasTaskName { - String getTaskName(); - Class getDataClass(); static TaskDescriptor of(String name) { @@ -24,7 +22,7 @@ static TaskDescriptor of(String name) { } static TaskDescriptor of(String name, Class dataClass) { - return new TaskDescriptor.SimpleTaskDescriptor<>(name, dataClass); + return new SimpleTaskDescriptor<>(name, dataClass); } default TaskInstance.Builder instance(String id) { @@ -35,8 +33,7 @@ default TaskInstanceId instanceId(String id) { return TaskInstanceId.of(getTaskName(), id); } - class SimpleTaskDescriptor implements TaskDescriptor { - + public class SimpleTaskDescriptor implements TaskDescriptor { private final String name; private final Class dataClass; @@ -47,7 +44,7 @@ public SimpleTaskDescriptor(String name, Class dataClass) { @Override public String getTaskName() { - return name; + return name; // Consistent with HasTaskName's expectations } @Override @@ -55,4 +52,10 @@ public Class getDataClass() { return dataClass; } } + + public class VoidTaskDescriptor extends SimpleTaskDescriptor { + public VoidTaskDescriptor(String name) { + super(name, Void.class); + } + } }