Skip to content

Commit

Permalink
Completing refactor introducing instance-builder on TaskDescriptor. D…
Browse files Browse the repository at this point in the history
…eprecating TaskWithDataDescriptor and TaskWithoutDataDescriptor. Updated examples.
  • Loading branch information
kagkarlsson committed Oct 11, 2024
1 parent 241fbba commit 9d9c30f
Show file tree
Hide file tree
Showing 28 changed files with 198 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@ static TaskDescriptor<Void> of(String name) {
}

static <T> TaskDescriptor<T> of(String name, Class<T> dataClass) {
return new TaskDescriptor.SimpleTaskDescriptor<T>(name, dataClass);
return new TaskDescriptor.SimpleTaskDescriptor<>(name, dataClass);
}

default SchedulableInstance.Builder<T> instanceWithId(String id) {
return new SchedulableInstance.Builder<>(getTaskName(), id);
default TaskInstance.Builder<T> instance(String id) {
return new TaskInstance.Builder<>(getTaskName(), id);
}

default TaskInstanceId instanceId(String id) {
return TaskInstanceId.of(getTaskName(), id);
}

class SimpleTaskDescriptor<T> implements TaskDescriptor<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
*/
package com.github.kagkarlsson.scheduler.task;

import com.github.kagkarlsson.scheduler.task.helper.ScheduleAndData;
import java.time.Instant;
import java.util.Objects;
import java.util.function.Supplier;

public final class TaskInstance<T> implements TaskInstanceId {
public class TaskInstance<T> implements TaskInstanceId {

private final String taskName;
private final String id;
Expand Down Expand Up @@ -110,5 +112,24 @@ public Builder<T> priority(int priority) {
public TaskInstance<T> build() {
return new TaskInstance<>(taskName, id, dataSupplier, priority);
}

public SchedulableInstance<T> scheduledTo(Instant executionTime) {
TaskInstance<T> taskInstance = new TaskInstance<>(taskName, id, dataSupplier, priority);
return new SchedulableTaskInstance<>(taskInstance, executionTime);
}

public SchedulableInstance<T> scheduledAccordingToData() {
TaskInstance<T> taskInstance = new TaskInstance<>(taskName, id, dataSupplier, priority);
T data = dataSupplier.get();
if (!(data instanceof ScheduleAndData)) {
throw new RuntimeException("To be able to use method 'scheduledAccordingToData()', dataClass must implement ScheduleAndData interface and contain a Schedule");
}

ScheduleAndData scheduleAndData = (ScheduleAndData) data;

return new SchedulableTaskInstance<>(taskInstance, scheduleAndData.getSchedule()::getInitialExecutionTime);
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
*/
package com.github.kagkarlsson.scheduler.task;

/** Experimental */
/**
* @deprecated use {@link TaskDescriptor} directly instead.
*/
@Deprecated
public class TaskWithDataDescriptor<T> implements TaskDescriptor<T> {

private final String taskName;
Expand All @@ -24,10 +27,6 @@ public TaskWithDataDescriptor(String taskName, Class<T> dataClass) { // TODO: no
this.dataClass = dataClass;
}

public TaskInstance<T> instance(String id, T data) {
return new TaskInstance<>(taskName, id, data);
}

@Override
public String getTaskName() {
return taskName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
*/
package com.github.kagkarlsson.scheduler.task;

/** Experimental */
/**
* @deprecated use {@link TaskDescriptor} directly instead.
*/
@Deprecated
public class TaskWithoutDataDescriptor implements TaskDescriptor<Void> {

private final String taskName;
Expand All @@ -22,10 +25,6 @@ public TaskWithoutDataDescriptor(String taskName) {
this.taskName = taskName;
}

public TaskInstance<Void> instance(String id) {
return new TaskInstance<>(taskName, id);
}

@Override
public String getTaskName() {
return taskName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.github.kagkarlsson.scheduler.task.schedule.FixedDelay;
import java.time.Duration;
import java.time.Instant;
import org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.Test;

public class ExecutionTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;

import co.unruly.matchers.TimeMatchers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.github.kagkarlsson.examples.helpers.Example;
import com.github.kagkarlsson.scheduler.Scheduler;
import com.github.kagkarlsson.scheduler.task.TaskDescriptor;
import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask;
import com.github.kagkarlsson.scheduler.task.helper.Tasks;
import java.time.Duration;
Expand All @@ -27,10 +28,13 @@ public static void main(String[] args) {
new CheckForNewBatchDirectlyMain().runWithDatasource();
}

public static final TaskDescriptor<Void> MY_TASK = TaskDescriptor.of("my_task");

@Override
public void run(DataSource dataSource) {

OneTimeTask<Void> onetimeTask =
Tasks.oneTime("my_task")
Tasks.oneTime(MY_TASK)
.execute(
(taskInstance, executionContext) -> {
System.out.println("Executed!");
Expand All @@ -48,7 +52,7 @@ public void run(DataSource dataSource) {
sleep(2);
System.out.println("Scheduling 100 task-instances.");
for (int i = 0; i < 100; i++) {
scheduler.schedule(onetimeTask.instance(String.valueOf(i)), Instant.now());
scheduler.schedule(MY_TASK.instance(String.valueOf(i)).scheduledTo(Instant.now()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.github.kagkarlsson.examples.helpers.Example;
import com.github.kagkarlsson.scheduler.Scheduler;
import com.github.kagkarlsson.scheduler.SchedulerClient;
import com.github.kagkarlsson.scheduler.task.TaskDescriptor;
import com.github.kagkarlsson.scheduler.task.helper.RecurringTask;
import com.github.kagkarlsson.scheduler.task.helper.Tasks;
import com.github.kagkarlsson.scheduler.task.schedule.Schedules;
Expand All @@ -26,29 +27,35 @@

public class DeletingUnresolvedTasksMain extends Example {

public static final TaskDescriptor<Void> UNRESOLVED_TASK_1 = TaskDescriptor.of("unresolved1");
public static final TaskDescriptor<Void> UNRESOLVED_TASK_2 = TaskDescriptor.of("unresolved2");

public static void main(String[] args) {
new DeletingUnresolvedTasksMain().runWithDatasource();
}

@Override
public void run(DataSource dataSource) {
RecurringTask<Void> unresolvedTask =
Tasks.recurring("unresolved1", Schedules.fixedDelay(Duration.ofSeconds(1)))
Tasks.recurring(UNRESOLVED_TASK_1, Schedules.fixedDelay(Duration.ofSeconds(1)))
.execute(
(taskInstance, executionContext) -> {
System.out.println("Ran");
});
RecurringTask<Void> unresolvedTask2 =
Tasks.recurring("unresolved2", Schedules.fixedDelay(Duration.ofSeconds(1)))
Tasks.recurring(UNRESOLVED_TASK_2, Schedules.fixedDelay(Duration.ofSeconds(1)))
.execute(
(taskInstance, executionContext) -> {
System.out.println("Ran");
});

SchedulerClient client = SchedulerClient.Builder.create(dataSource).build();
client.schedule(unresolvedTask.instance(RecurringTask.INSTANCE), Instant.now());
client.schedule(
unresolvedTask2.instance(RecurringTask.INSTANCE), Instant.now().plusSeconds(10));
client.scheduleIfNotExists(
UNRESOLVED_TASK_1.instance(RecurringTask.INSTANCE).scheduledTo(Instant.now()));
client.scheduleIfNotExists(
UNRESOLVED_TASK_2
.instance(RecurringTask.INSTANCE)
.scheduledTo(Instant.now().plusSeconds(10)));

final Scheduler scheduler =
Scheduler.create(dataSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import com.github.kagkarlsson.examples.helpers.Example;
import com.github.kagkarlsson.scheduler.Scheduler;
import com.github.kagkarlsson.scheduler.task.TaskWithoutDataDescriptor;
import com.github.kagkarlsson.scheduler.task.TaskDescriptor;
import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask;
import com.github.kagkarlsson.scheduler.task.helper.Tasks;
import java.time.Duration;
Expand All @@ -28,11 +28,11 @@ public static void main(String[] args) {
new EnableImmediateExecutionMain().runWithDatasource();
}

public static final TaskDescriptor<Void> DESCRIPTOR = TaskDescriptor.of("my_task");

@Override
public void run(DataSource dataSource) {

TaskWithoutDataDescriptor DESCRIPTOR = new TaskWithoutDataDescriptor("my_task");

OneTimeTask<Void> onetimeTask =
Tasks.oneTime(DESCRIPTOR)
.execute(
Expand All @@ -51,7 +51,7 @@ public void run(DataSource dataSource) {

sleep(2000);
System.out.println("Scheduling task to executed immediately.");
scheduler.schedule(DESCRIPTOR.instanceWithId("1").scheduledTo(Instant.now()));
scheduler.schedule(DESCRIPTOR.instance("1").scheduledTo(Instant.now()));

// scheduler.triggerCheckForDueExecutions();
// another option for triggering execution directly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ public static void main(String[] args) {
new ExponentialBackoffMain().runWithDatasource();
}

public static final TaskDescriptor<Void> MY_TASK = TaskDescriptor.of("exponential_backoff_task", Void.class);

@Override
public void run(DataSource dataSource) {
TaskDescriptor<Void> TASK = TaskDescriptor.of("exponential_backoff_task", Void.class);

OneTimeTask<Void> failingTask =
Tasks.oneTime(TASK)
Tasks.oneTime(MY_TASK)
.onFailure(new FailureHandler.ExponentialBackoffFailureHandler<>(ofSeconds(1)))
.execute(
(taskInstance, executionContext) -> {
Expand All @@ -48,7 +49,7 @@ public void run(DataSource dataSource) {
.registerShutdownHook()
.build();

scheduler.schedule(TASK.instanceWithId("1").scheduledTo(Instant.now()));
scheduler.schedule(MY_TASK.instance("1").scheduledTo(Instant.now()));

scheduler.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.github.kagkarlsson.examples.helpers.Example;
import com.github.kagkarlsson.scheduler.Scheduler;
import com.github.kagkarlsson.scheduler.task.FailureHandler;
import com.github.kagkarlsson.scheduler.task.TaskDescriptor;
import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask;
import com.github.kagkarlsson.scheduler.task.helper.Tasks;
import java.time.Instant;
Expand All @@ -29,10 +30,12 @@ public static void main(String[] args) {
new ExponentialBackoffWithMaxRetriesMain().runWithDatasource();
}

public static final TaskDescriptor<Void> EXPONENTIAL_BACKOFF_TASK = TaskDescriptor.of("exponential_backoff_with_max_retries_task");

@Override
public void run(DataSource dataSource) {
OneTimeTask<Void> failingTask =
Tasks.oneTime("exponential_backoff_with_max_retries_task")
Tasks.oneTime(EXPONENTIAL_BACKOFF_TASK)
.onFailure(
new FailureHandler.MaxRetriesFailureHandler<>(
6, new FailureHandler.ExponentialBackoffFailureHandler<>(ofSeconds(1), 2)))
Expand All @@ -47,7 +50,7 @@ public void run(DataSource dataSource) {
.registerShutdownHook()
.build();

scheduler.schedule(failingTask.instance("1"), Instant.now());
scheduler.schedule(EXPONENTIAL_BACKOFF_TASK.instance("1").scheduledTo(Instant.now()));

scheduler.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.github.kagkarlsson.jdbc.JdbcRunner;
import com.github.kagkarlsson.scheduler.HeartbeatState;
import com.github.kagkarlsson.scheduler.Scheduler;
import com.github.kagkarlsson.scheduler.task.TaskDescriptor;
import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask;
import com.github.kagkarlsson.scheduler.task.helper.Tasks;
import java.time.Duration;
Expand All @@ -31,11 +32,13 @@ public static void main(String[] args) {
new HeartbeatMonitoringMain().runWithDatasource();
}

public static final TaskDescriptor<Void> WAIT_FOR_STALE_HEARTBEAT_TASK = TaskDescriptor.of("wait-for-stale-heartbeat-task");

@Override
public void run(DataSource dataSource) {

OneTimeTask<Void> waitForStaleHeartbeatTask =
Tasks.oneTime("wait-for-stale-heartbeat-task", Void.class)
Tasks.oneTime(WAIT_FOR_STALE_HEARTBEAT_TASK)
.execute(
(inst, ctx) -> {
System.out.println("Running!");
Expand All @@ -57,7 +60,7 @@ public void run(DataSource dataSource) {

scheduler.start();

scheduler.schedule(waitForStaleHeartbeatTask.instance("1045"), Instant.now());
scheduler.schedule(WAIT_FOR_STALE_HEARTBEAT_TASK.instance("1045").scheduledTo(Instant.now()));

sleep(4000);
JdbcRunner jdbcRunner = new JdbcRunner(dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@

public class JobChainingUsingSeparateTasksMain extends Example {

public static final TaskDescriptor<JobId> STEP1_TASK = TaskDescriptor.of("job-step-1", JobId.class);
public static final TaskDescriptor<JobId> STEP2_TASK = TaskDescriptor.of("job-step-2", JobId.class);

public static void main(String[] args) {
new JobChainingUsingSeparateTasksMain().runWithDatasource();
}
Expand All @@ -33,15 +36,15 @@ public static void main(String[] args) {
public void run(DataSource dataSource) {

final CustomTask<JobId> jobStep1 =
Tasks.custom("job-step-1", JobId.class)
Tasks.custom(STEP1_TASK)
.execute(
(taskInstance, executionContext) -> {
System.out.println("Step1 ran. Job: " + taskInstance.getData());
return new OnCompleteRemoveAndCreateNextStep("job-step-2");
return new OnCompleteRemoveAndCreateNextStep(STEP2_TASK.getTaskName());
});

final CustomTask<JobId> jobStep2 =
Tasks.custom("job-step-2", JobId.class)
Tasks.custom(STEP2_TASK)
.execute(
(taskInstance, executionContext) -> {
System.out.println(
Expand All @@ -60,13 +63,13 @@ public void run(DataSource dataSource) {
sleep(1_000);

// Schedule a multistep job. Simulate some instance-specific data, id=507
// both steps will run directly
scheduler.schedule(
jobStep1.instance("job-507", new JobId(507)),
Instant.now()); // both steps will run directly
STEP1_TASK.instance("job-507").data(new JobId(507)).scheduledTo(Instant.now()));
}

public static class JobId implements Serializable {
private static long serialVersionUID = 1L;
private static final long serialVersionUID = 1L;
public int id;

public JobId(int id) {
Expand All @@ -79,7 +82,8 @@ public String toString() {
}
}

class OnCompleteRemoveAndCreateNextStep implements CompletionHandler<JobId> {
@SuppressWarnings("rawtypes")
static class OnCompleteRemoveAndCreateNextStep implements CompletionHandler<JobId> {
private final String newTaskName;

public OnCompleteRemoveAndCreateNextStep(String newTaskName) {
Expand Down
Loading

0 comments on commit 9d9c30f

Please sign in to comment.