Skip to content

Commit

Permalink
Merge pull request #48 from bgooren/work-until-no-more-due-tasks
Browse files Browse the repository at this point in the history
Allow check for new due executions after completing picked tasks
  • Loading branch information
kagkarlsson authored Jan 11, 2019
2 parents b943b26 + be7ec81 commit 63c643d
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 12 deletions.
31 changes: 23 additions & 8 deletions src/main/java/com/github/kagkarlsson/scheduler/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class Scheduler implements SchedulerClient {
private final ExecutorService executorService;
private final Waiter executeDueWaiter;
private boolean enableImmediateExecution;
private final boolean pollAfterCompletion;
private boolean lastPollAcquiredAllAvailableExecutors;
protected final List<OnStartup> onStartup;
private final Waiter detectDeadWaiter;
private final Duration heartbeatInterval;
Expand All @@ -59,22 +61,23 @@ public class Scheduler implements SchedulerClient {
final Semaphore executorsSemaphore;

Scheduler(Clock clock, TaskRepository taskRepository, TaskResolver taskResolver, int maxThreads, SchedulerName schedulerName,
Waiter executeDueWaiter, Duration updateHeartbeatWaiter, boolean enableImmediateExecution, StatsRegistry statsRegistry, int pollingLimit, List<OnStartup> onStartup) {
this(clock, taskRepository, taskResolver, maxThreads, defaultExecutorService(maxThreads), schedulerName, executeDueWaiter, updateHeartbeatWaiter, enableImmediateExecution, statsRegistry, pollingLimit, onStartup);
Waiter executeDueWaiter, Duration updateHeartbeatWaiter, boolean enableImmediateExecution, boolean pollAfterCompletion, StatsRegistry statsRegistry, int pollingLimit, List<OnStartup> onStartup) {
this(clock, taskRepository, taskResolver, maxThreads, defaultExecutorService(maxThreads), schedulerName, executeDueWaiter, updateHeartbeatWaiter, enableImmediateExecution, pollAfterCompletion, statsRegistry, pollingLimit, onStartup);
}

private static ExecutorService defaultExecutorService(int maxThreads) {
return Executors.newFixedThreadPool(maxThreads, defaultThreadFactoryWithPrefix(THREAD_PREFIX + "-"));
}

protected Scheduler(Clock clock, TaskRepository taskRepository, TaskResolver taskResolver, int maxThreads, ExecutorService executorService, SchedulerName schedulerName,
Waiter executeDueWaiter, Duration heartbeatInterval, boolean enableImmediateExecution, StatsRegistry statsRegistry, int pollingLimit, List<OnStartup> onStartup) {
Waiter executeDueWaiter, Duration heartbeatInterval, boolean enableImmediateExecution, boolean pollAfterCompletion, StatsRegistry statsRegistry, int pollingLimit, List<OnStartup> onStartup) {
this.clock = clock;
this.taskRepository = taskRepository;
this.taskResolver = taskResolver;
this.executorService = executorService;
this.executeDueWaiter = executeDueWaiter;
this.enableImmediateExecution = enableImmediateExecution;
this.pollAfterCompletion = pollAfterCompletion;
this.onStartup = onStartup;
this.detectDeadWaiter = new Waiter(heartbeatInterval.multipliedBy(2), clock);
this.heartbeatInterval = heartbeatInterval;
Expand Down Expand Up @@ -185,19 +188,21 @@ protected void executeDue() {
Instant now = clock.now();
List<Execution> dueExecutions = taskRepository.getDue(now, pollingLimit);

int count = 0;
int pickedExecutionCount = 0;
lastPollAcquiredAllAvailableExecutors = false;
LOG.trace("Found {} taskinstances due for execution", dueExecutions.size());
for (Execution e : dueExecutions) {
if (schedulerState.isShuttingDown()) {
LOG.info("Scheduler has been shutdown. Skipping {} due executions.", dueExecutions.size() - count);
LOG.info("Scheduler has been shutdown. Skipping {} due executions.", dueExecutions.size() - pickedExecutionCount);
return;
}

final Optional<Execution> pickedExecution;
try {
pickedExecution = aquireExecutorAndPickExecution(e);
} catch (NoAvailableExecutors ex) {
LOG.debug("No available executors. Skipping {} due executions.", dueExecutions.size() - count);
LOG.debug("No available executors. Skipping {} due executions.", dueExecutions.size() - pickedExecutionCount);
lastPollAcquiredAllAvailableExecutors = true;
return;
}

Expand All @@ -208,7 +213,7 @@ protected void executeDue() {
} else {
LOG.debug("Execution picked by another scheduler. Continuing to next due execution.");
}
count++;
pickedExecutionCount++;
}
}

Expand Down Expand Up @@ -239,6 +244,10 @@ private void releaseExecutor(Execution execution) {
LOG.error("Released execution was not found in collection of executions currently being processed. Should never happen.");
statsRegistry.registerUnexpectedError();
}

if (pollAfterCompletion && lastPollAcquiredAllAvailableExecutors) {
triggerCheckForDueExecutions();
}
}

@SuppressWarnings({"rawtypes","unchecked"})
Expand Down Expand Up @@ -410,6 +419,7 @@ public static class Builder {
protected Serializer serializer = Serializer.DEFAULT_JAVA_SERIALIZER;
protected String tableName = JdbcTaskRepository.DEFAULT_TABLE_NAME;
protected boolean enableImmediateExecution = false;
protected boolean pollAfterCompletion = true;

public Builder(DataSource dataSource, List<Task<?>> knownTasks) {
this.dataSource = dataSource;
Expand Down Expand Up @@ -485,11 +495,16 @@ public Builder enableImmediateExecution() {
return this;
}

public Builder pollAfterCompletion(boolean pollAfterCompletion) {
this.pollAfterCompletion = pollAfterCompletion;
return this;
}

public Scheduler build() {
final TaskResolver taskResolver = new TaskResolver(knownTasks);
final JdbcTaskRepository taskRepository = new JdbcTaskRepository(dataSource, tableName, taskResolver, schedulerName, serializer);

return new Scheduler(clock, taskRepository, taskResolver, executorThreads, schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, pollingLimit, startTasks);
return new Scheduler(clock, taskRepository, taskResolver, executorThreads, schedulerName, waiter, heartbeatInterval, enableImmediateExecution, pollAfterCompletion, statsRegistry, pollingLimit, startTasks);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public class ManualScheduler extends Scheduler {
private static final Logger LOG = LoggerFactory.getLogger(ManualScheduler.class);
private final SettableClock clock;

ManualScheduler(SettableClock clock, TaskRepository taskRepository, TaskResolver taskResolver, int maxThreads, ExecutorService executorService, SchedulerName schedulerName, Waiter waiter, Duration heartbeatInterval, boolean executeImmediately, StatsRegistry statsRegistry, int pollingLimit, List<OnStartup> onStartup) {
super(clock, taskRepository, taskResolver, maxThreads, executorService, schedulerName, waiter, heartbeatInterval, executeImmediately, statsRegistry, pollingLimit, onStartup);
ManualScheduler(SettableClock clock, TaskRepository taskRepository, TaskResolver taskResolver, int maxThreads, ExecutorService executorService, SchedulerName schedulerName, Waiter waiter, Duration heartbeatInterval, boolean executeImmediately, boolean pollAfterCompletion, StatsRegistry statsRegistry, int pollingLimit, List<OnStartup> onStartup) {
super(clock, taskRepository, taskResolver, maxThreads, executorService, schedulerName, waiter, heartbeatInterval, executeImmediately, pollAfterCompletion, statsRegistry, pollingLimit, onStartup);
this.clock = clock;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public ManualScheduler build() {
final TaskResolver taskResolver = new TaskResolver(knownTasks);
final JdbcTaskRepository taskRepository = new JdbcTaskRepository(dataSource, tableName, taskResolver, schedulerName, serializer);

return new ManualScheduler(clock, taskRepository, taskResolver, executorThreads, new DirectExecutorService(), schedulerName, waiter, heartbeatInterval, enableImmediateExecution, statsRegistry, pollingLimit, startTasks);
return new ManualScheduler(clock, taskRepository, taskResolver, executorThreads, new DirectExecutorService(), schedulerName, waiter, heartbeatInterval, enableImmediateExecution, pollAfterCompletion, statsRegistry, pollingLimit, startTasks);
}

public ManualScheduler start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void setUp() {
new Waiter(Duration.ZERO),
Duration.ofMinutes(1),
false,
false,
StatsRegistry.NOOP,
POLLING_LIMIT,
new ArrayList<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private Scheduler schedulerFor(Task<?> ... tasks) {
private Scheduler schedulerFor(ExecutorService executor, Task<?> ... tasks) {
TaskResolver taskResolver = new TaskResolver(tasks);
JdbcTaskRepository taskRepository = new JdbcTaskRepository(postgres.getDataSource(), DEFAULT_TABLE_NAME, taskResolver, new SchedulerName.Fixed("scheduler1"));
return new Scheduler(clock, taskRepository, taskResolver, 1, executor, new SchedulerName.Fixed("name"), new Waiter(Duration.ZERO), Duration.ofSeconds(1), false, StatsRegistry.NOOP, 10_000, new ArrayList<>());
return new Scheduler(clock, taskRepository, taskResolver, 1, executor, new SchedulerName.Fixed("name"), new Waiter(Duration.ZERO), Duration.ofSeconds(1), false, false, StatsRegistry.NOOP, 10_000, new ArrayList<>());
}

@Test
Expand Down

0 comments on commit 63c643d

Please sign in to comment.