Skip to content

Commit

Permalink
fix: getScheduledExecutions(...) should be able to return executions …
Browse files Browse the repository at this point in the history
…with unresolved tasks (#418)

If the `SchedulerClient` is instantiated with no _known tasks_, it
currently does not find any executions since the client will not be able
to resolve the task and will therefore filter them out.

This PR **changes the behavior** of `getScheduledExecution()`,
`getScheduledExecutions()` to also return unresolved executions, and
additionally adds options in `ScheduledExecutionsFilter` to enable
controlling whether or not to include them.
  • Loading branch information
kagkarlsson authored Aug 25, 2023
1 parent 597e09e commit c43024e
Show file tree
Hide file tree
Showing 12 changed files with 224 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.github.kagkarlsson.scheduler;

import com.github.kagkarlsson.scheduler.exceptions.DataClassMismatchException;
import com.github.kagkarlsson.scheduler.exceptions.MissingRawDataException;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
import java.time.Instant;
Expand Down Expand Up @@ -47,6 +48,19 @@ public DATA_TYPE getData() {
throw new DataClassMismatchException(dataClass, data.getClass());
}

public boolean hasRawData() {
Object data = this.execution.taskInstance.getData();
return data == null || data.getClass().equals(byte[].class);
}

public byte[] getRawData() {
if (!hasRawData()) {
throw new MissingRawDataException(dataClass);
}

return (byte[]) this.execution.taskInstance.getData();
}

public Instant getLastSuccess() {
return execution.lastSuccess;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,33 @@
public class ScheduledExecutionsFilter {

private Boolean pickedValue;
private boolean includeUnresolved = false;

private ScheduledExecutionsFilter() {}

public static ScheduledExecutionsFilter all() {
return new ScheduledExecutionsFilter();
return new ScheduledExecutionsFilter().withIncludeUnresolved(true);
}

public static ScheduledExecutionsFilter onlyResolved() {
return new ScheduledExecutionsFilter().withIncludeUnresolved(false);
}

public ScheduledExecutionsFilter withPicked(boolean pickedValue) {
this.pickedValue = pickedValue;
return this;
}

public ScheduledExecutionsFilter withIncludeUnresolved(boolean includeUnresolved) {
this.includeUnresolved = includeUnresolved;
return this;
}

public Optional<Boolean> getPickedValue() {
return Optional.ofNullable(pickedValue);
}

public boolean getIncludeUnresolved() {
return includeUnresolved;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ <T> void fetchScheduledExecutionsForTask(
ScheduledExecutionsFilter filter,
Consumer<ScheduledExecution<T>> consumer);

/**
* @see #fetchScheduledExecutionsForTask(String, Class, Consumer)
*/
default <T> List<ScheduledExecution<Object>> getScheduledExecutionsForTask(String taskName) {
List<ScheduledExecution<Object>> executions = new ArrayList<>();
fetchScheduledExecutionsForTask(taskName, Object.class, executions::add);
return executions;
}

/**
* @see #fetchScheduledExecutionsForTask(String, Class, Consumer)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ public TaskResolver(StatsRegistry statsRegistry, Clock clock, List<Task<?>> know
}

public Optional<Task> resolve(String taskName) {
return resolve(taskName, true);
}

public Optional<Task> resolve(String taskName, boolean addUnresolvedToExclusionFilter) {
Task task = taskMap.get(taskName);
if (task == null) {
if (task == null && addUnresolvedToExclusionFilter) {
addUnresolved(taskName);
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNRESOLVED_TASK);
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ public class DataClassMismatchException extends DbSchedulerException {
public DataClassMismatchException(Class expectedClass, Class actualClass) {
super(
String.format(
"Task data mismatch. Expected class : %s, actual : %s", expectedClass, actualClass));
"Task data mismatch. If actual data-class is byte[], it might have been fetched without"
+ " knowledge of task-data types, and is thus not deserialized."
+ " Use getRawData() to get non-deserialized data in that case."
+ " Expected class : %s, actual : %s",
expectedClass, actualClass));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (C) Gustav Karlsson
*
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler.exceptions;

public class MissingRawDataException extends DbSchedulerException {
private static final long serialVersionUID = 1L;

public MissingRawDataException(Class<?> dataClass) {
super(
String.format(
"Scheduled execution has typed data, use getData() to read the deserialized object. Data-class : %s",
dataClass));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@
import com.github.kagkarlsson.jdbc.JdbcRunner;
import com.github.kagkarlsson.jdbc.ResultSetMapper;
import com.github.kagkarlsson.jdbc.SQLRuntimeException;
import com.github.kagkarlsson.scheduler.Clock;
import com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter;
import com.github.kagkarlsson.scheduler.SchedulerName;
import com.github.kagkarlsson.scheduler.TaskRepository;
import com.github.kagkarlsson.scheduler.TaskResolver;
import com.github.kagkarlsson.scheduler.*;
import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask;
import com.github.kagkarlsson.scheduler.exceptions.ExecutionException;
import com.github.kagkarlsson.scheduler.exceptions.TaskInstanceException;
Expand Down Expand Up @@ -244,22 +240,31 @@ public void getScheduledExecutions(
UnresolvedFilter unresolvedFilter = new UnresolvedFilter(taskResolver.getUnresolved());

QueryBuilder q = queryForFilter(filter);
if (unresolvedFilter.isActive()) {
if (unresolvedFilter.isActive() && !filter.getIncludeUnresolved()) {
q.andCondition(unresolvedFilter);
}

jdbcRunner.query(
q.getQuery(), q.getPreparedStatementSetter(), new ExecutionResultSetConsumer(consumer));
q.getQuery(),
q.getPreparedStatementSetter(),
new ExecutionResultSetConsumer(consumer, filter.getIncludeUnresolved(), false));
}

@Override
public void getScheduledExecutions(
ScheduledExecutionsFilter filter, String taskName, Consumer<Execution> consumer) {
UnresolvedFilter unresolvedFilter = new UnresolvedFilter(taskResolver.getUnresolved());

QueryBuilder q = queryForFilter(filter);
if (unresolvedFilter.isActive() && !filter.getIncludeUnresolved()) {
q.andCondition(unresolvedFilter);
}
q.andCondition(new TaskCondition(taskName));

jdbcRunner.query(
q.getQuery(), q.getPreparedStatementSetter(), new ExecutionResultSetConsumer(consumer));
q.getQuery(),
q.getPreparedStatementSetter(),
new ExecutionResultSetConsumer(consumer, filter.getIncludeUnresolved(), false));
}

@Override
Expand All @@ -285,7 +290,7 @@ public List<Execution> getDue(Instant now, int limit) {
p.setMaxRows(limit);
}
},
new ExecutionResultSetMapper());
new ExecutionResultSetMapper(false, true));
}

@Override
Expand Down Expand Up @@ -377,6 +382,7 @@ private boolean rescheduleInternal(
jdbcCustomization.setInstant(ps, index++, nextExecutionTime);
if (newData != null) {
// may cause datbase-specific problems, might have to use setNull instead
// FIXLATER: optionally support bypassing serializer if byte[] already
ps.setObject(index++, serializer.serialize(newData.data));
}
ps.setString(index++, execution.taskInstance.getTaskName());
Expand Down Expand Up @@ -449,7 +455,7 @@ public List<Execution> getDeadExecutions(Instant olderThan) {
jdbcCustomization.setInstant(p, index++, olderThan);
unresolvedFilter.setParameters(p, index);
},
new ExecutionResultSetMapper());
new ExecutionResultSetMapper(false, true));
}

@Override
Expand Down Expand Up @@ -497,7 +503,7 @@ public List<Execution> getExecutionsFailingLongerThan(Duration interval) {
jdbcCustomization.setInstant(p, index++, Instant.now().minus(interval));
unresolvedFilter.setParameters(p, index);
},
new ExecutionResultSetMapper());
new ExecutionResultSetMapper(false, false));
}

public Optional<Execution> getExecution(TaskInstance taskInstance) {
Expand All @@ -512,7 +518,7 @@ public Optional<Execution> getExecution(String taskName, String taskInstanceId)
p.setString(1, taskName);
p.setString(2, taskInstanceId);
},
new ExecutionResultSetMapper());
new ExecutionResultSetMapper(true, false));
if (executions.size() > 1) {
throw new TaskInstanceException(
"Found more than one matching execution for task name/id combination.",
Expand Down Expand Up @@ -544,7 +550,11 @@ public void checkSupportsLockAndFetch() {

private JdbcTaskRepositoryContext getTaskRespositoryContext() {
return new JdbcTaskRepositoryContext(
taskResolver, tableName, schedulerSchedulerName, jdbcRunner, ExecutionResultSetMapper::new);
taskResolver,
tableName,
schedulerSchedulerName,
jdbcRunner,
() -> new ExecutionResultSetMapper(false, true));
}

private QueryBuilder queryForFilter(ScheduledExecutionsFilter filter) {
Expand All @@ -567,9 +577,12 @@ private class ExecutionResultSetMapper implements ResultSetMapper<List<Execution

private final ExecutionResultSetConsumer delegate;

private ExecutionResultSetMapper() {
private ExecutionResultSetMapper(
boolean includeUnresolved, boolean addUnresolvedToExclusionFilter) {
this.executions = new ArrayList<>();
this.delegate = new ExecutionResultSetConsumer(executions::add);
this.delegate =
new ExecutionResultSetConsumer(
executions::add, includeUnresolved, addUnresolvedToExclusionFilter);
}

@Override
Expand All @@ -583,22 +596,37 @@ public List<Execution> map(ResultSet resultSet) throws SQLException {
private class ExecutionResultSetConsumer implements ResultSetMapper<Void> {

private final Consumer<Execution> consumer;
private final boolean includeUnresolved;
private boolean addUnresolvedToExclusionFilter;

private ExecutionResultSetConsumer(Consumer<Execution> consumer) {
this(consumer, false, true);
}

private ExecutionResultSetConsumer(
Consumer<Execution> consumer,
boolean includeUnresolved,
boolean addUnresolvedToExclusionFilter) {
this.consumer = consumer;
this.includeUnresolved = includeUnresolved;
this.addUnresolvedToExclusionFilter = addUnresolvedToExclusionFilter;
}

@Override
public Void map(ResultSet rs) throws SQLException {

while (rs.next()) {
String taskName = rs.getString("task_name");
Optional<Task> task = taskResolver.resolve(taskName);

if (!task.isPresent()) {
LOG.warn(
"Failed to find implementation for task with name '{}'. Execution will be excluded from due. Either delete the execution from the database, or add an implementation for it. The scheduler may be configured to automatically delete unresolved tasks after a certain period of time.",
taskName);
Optional<Task> task = taskResolver.resolve(taskName, addUnresolvedToExclusionFilter);

if (!task.isPresent() && !includeUnresolved) {
if (addUnresolvedToExclusionFilter) {
LOG.warn(
"Failed to find implementation for task with name '{}'. Execution will be excluded from due. "
+ "The scheduler normally delete unresolved tasks after 14d. To handle manually, "
+ "either delete the execution from the database, or add an implementation for it. ",
taskName);
}
continue;
}

Expand All @@ -618,7 +646,15 @@ public Void map(ResultSet rs) throws SQLException {
long version = rs.getLong("version");

Supplier dataSupplier =
memoize(() -> serializer.deserialize(task.get().getDataClass(), data));
memoize(
() -> {
if (!task.isPresent()) {
// return the data raw if the type is not known
// a case for standalone clients, with no "known tasks"
return data;
}
return serializer.deserialize(task.get().getDataClass(), data);
});
this.consumer.accept(
new Execution(
executionTime,
Expand All @@ -638,7 +674,6 @@ public Void map(ResultSet rs) throws SQLException {

private static <T> Supplier<T> memoize(Supplier<T> original) {
return new Supplier<T>() {
Supplier<T> delegate = this::firstTime;
boolean initialized;

public T get() {
Expand All @@ -653,6 +688,8 @@ private synchronized T firstTime() {
}
return delegate.get();
}

Supplier<T> delegate = this::firstTime;
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.kagkarlsson.scheduler;

import static com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter.all;
import static com.github.kagkarlsson.scheduler.ScheduledExecutionsFilter.onlyResolved;
import static com.github.kagkarlsson.scheduler.jdbc.JdbcTaskRepository.DEFAULT_TABLE_NAME;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -402,15 +403,19 @@ public void get_dead_executions_should_not_include_previously_unresolved() {
@Test
public void get_scheduled_executions_should_work_with_unresolved() {
Instant now = TimeHelper.truncatedInstantNow();
String taskName = "unresolved1";
final OneTimeTask<Void> unresolved1 =
TestTasks.oneTime("unresolved1", Void.class, TestTasks.DO_NOTHING);
TestTasks.oneTime(taskName, Void.class, TestTasks.DO_NOTHING);
taskRepository.createIfNotExists(
new SchedulableTaskInstance<>(unresolved1.instance("id"), now));
assertThat(taskRepository.getDue(now, POLLING_LIMIT), hasSize(0));
assertThat(taskResolver.getUnresolved(), hasSize(1));

taskRepository.getScheduledExecutions(ScheduledExecutionsFilter.all(), e -> {});
taskRepository.getScheduledExecutions(ScheduledExecutionsFilter.all(), "sometask", e -> {});
assertThat(getScheduledExecutions(ScheduledExecutionsFilter.onlyResolved()), hasSize(0));
assertThat(
getScheduledExecutions(ScheduledExecutionsFilter.onlyResolved(), taskName), hasSize(0));
assertThat(getScheduledExecutions(all()), hasSize(1));
assertThat(getScheduledExecutions(all(), taskName), hasSize(1));
}

@Test
Expand Down Expand Up @@ -459,7 +464,7 @@ private Execution getSingleDueExecution() {

private Execution getSingleExecution() {
List<Execution> executions = new ArrayList<>();
taskRepository.getScheduledExecutions(all().withPicked(false), executions::add);
taskRepository.getScheduledExecutions(onlyResolved().withPicked(false), executions::add);
return executions.get(0);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.kagkarlsson.scheduler;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -8,6 +9,7 @@
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.helper.OneTimeTask;
import java.time.Instant;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.Test;

public class ScheduledExecutionTest {
Expand Down Expand Up @@ -59,8 +61,7 @@ public void test_data_class_type_not_equals() {
.getData(); // Instantiate with incorrect type
});

assertEquals(
"Task data mismatch. Expected class : class java.lang.String, actual : class java.lang.Integer",
dataClassMismatchException.getMessage());
assertThat(
dataClassMismatchException.getMessage(), CoreMatchers.containsString("Task data mismatch"));
}
}
Loading

0 comments on commit c43024e

Please sign in to comment.