Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring smells #564

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,39 @@ public void oneExecutionDone(Runnable triggerCheckForNewBatch) {
triggeredExecuteDue,
possiblyMoreExecutionsInDb,
executionsLeftInBatch.get());
// Will not synchronize this method as it is not a big problem if two threads manage to call
// Will not synchronize this method as it is not a big problem if two threads manage to call
// triggerCheckForNewBatch.run() at the same time.
// There is synchronization further in, when waking the thread that will do the fetching.
if (!stale
&& !triggeredExecuteDue
&& possiblyMoreExecutionsInDb
&& whenToTriggerCheckForNewBatch.test(executionsLeftInBatch.get())) {
if (isBatchValidForTrigger()) {
LOG.trace("Triggering check for new batch.");
triggerCheckForNewBatch.run();
triggeredExecuteDue = true;
}
}

private boolean isBatchValidForTrigger() {
return isNotStale()
&& isNotAlreadyTriggered()
&& hasPossibleExecutions()
&& isTriggerConditionMet();
}

private boolean isNotStale() {
return !stale;
}

private boolean isNotAlreadyTriggered() {
return !triggeredExecuteDue;
}

private boolean hasPossibleExecutions() {
return possiblyMoreExecutionsInDb;
}

private boolean isTriggerConditionMet() {
return whenToTriggerCheckForNewBatch.test(executionsLeftInBatch.get());
}

public boolean isOlderGenerationThan(int compareTo) {
return generationNumber < compareTo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class Scheduler implements SchedulerClient {
public static final double TRIGGER_NEXT_BATCH_WHEN_AVAILABLE_THREADS_RATIO = 0.5;
public static final String THREAD_PREFIX = "db-scheduler";
private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
private final SchedulerClient delegate;
private final SchedulerClient schedulerClientDelegate;
final Clock clock;
final TaskRepository schedulerTaskRepository;
final TaskResolver taskResolver;
Expand Down Expand Up @@ -110,7 +110,8 @@ protected Scheduler(
this.schedulerListeners = new SchedulerListeners(schedulerListeners);
this.dueExecutor = dueExecutor;
this.housekeeperExecutor = housekeeperExecutor;
delegate = new StandardSchedulerClient(clientTaskRepository, this.schedulerListeners, clock);
schedulerClientDelegate =
new StandardSchedulerClient(clientTaskRepository, this.schedulerListeners, clock);
this.failureLogger = ConfigurableLogger.create(LOG, logLevel, logStackTrace);

if (pollingStrategyConfig.type == PollingStrategyConfig.Type.LOCK_AND_FETCH) {
Expand Down Expand Up @@ -251,59 +252,59 @@ public SchedulerState getSchedulerState() {

@Override
public <T> void schedule(SchedulableInstance<T> schedulableInstance) {
this.delegate.schedule(schedulableInstance);
this.schedulerClientDelegate.schedule(schedulableInstance);
}

@Override
public <T> boolean scheduleIfNotExists(TaskInstance<T> taskInstance, Instant executionTime) {
return this.delegate.scheduleIfNotExists(taskInstance, executionTime);
return this.schedulerClientDelegate.scheduleIfNotExists(taskInstance, executionTime);
}

@Override
public <T> boolean scheduleIfNotExists(SchedulableInstance<T> schedulableInstance) {
return this.delegate.scheduleIfNotExists(schedulableInstance);
return this.schedulerClientDelegate.scheduleIfNotExists(schedulableInstance);
}

@Override
public <T> void schedule(TaskInstance<T> taskInstance, Instant executionTime) {
this.delegate.schedule(taskInstance, executionTime);
this.schedulerClientDelegate.schedule(taskInstance, executionTime);
}

@Override
public void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime) {
this.delegate.reschedule(taskInstanceId, newExecutionTime);
this.schedulerClientDelegate.reschedule(taskInstanceId, newExecutionTime);
}

@Override
public <T> void reschedule(SchedulableInstance<T> schedulableInstance) {
this.delegate.reschedule(schedulableInstance);
this.schedulerClientDelegate.reschedule(schedulableInstance);
}

@Override
public <T> void reschedule(TaskInstanceId taskInstanceId, Instant newExecutionTime, T newData) {
this.delegate.reschedule(taskInstanceId, newExecutionTime, newData);
this.schedulerClientDelegate.reschedule(taskInstanceId, newExecutionTime, newData);
}

@Override
public void cancel(TaskInstanceId taskInstanceId) {
this.delegate.cancel(taskInstanceId);
this.schedulerClientDelegate.cancel(taskInstanceId);
}

@Override
public void fetchScheduledExecutions(Consumer<ScheduledExecution<Object>> consumer) {
this.delegate.fetchScheduledExecutions(consumer);
this.schedulerClientDelegate.fetchScheduledExecutions(consumer);
}

@Override
public void fetchScheduledExecutions(
ScheduledExecutionsFilter filter, Consumer<ScheduledExecution<Object>> consumer) {
this.delegate.fetchScheduledExecutions(filter, consumer);
this.schedulerClientDelegate.fetchScheduledExecutions(filter, consumer);
}

@Override
public <T> void fetchScheduledExecutionsForTask(
String taskName, Class<T> dataClass, Consumer<ScheduledExecution<T>> consumer) {
this.delegate.fetchScheduledExecutionsForTask(taskName, dataClass, consumer);
this.schedulerClientDelegate.fetchScheduledExecutionsForTask(taskName, dataClass, consumer);
}

@Override
Expand All @@ -312,12 +313,13 @@ public <T> void fetchScheduledExecutionsForTask(
Class<T> dataClass,
ScheduledExecutionsFilter filter,
Consumer<ScheduledExecution<T>> consumer) {
this.delegate.fetchScheduledExecutionsForTask(taskName, dataClass, filter, consumer);
this.schedulerClientDelegate.fetchScheduledExecutionsForTask(
taskName, dataClass, filter, consumer);
}

@Override
public Optional<ScheduledExecution<Object>> getScheduledExecution(TaskInstanceId taskInstanceId) {
return this.delegate.getScheduledExecution(taskInstanceId);
return this.schedulerClientDelegate.getScheduledExecution(taskInstanceId);
}

public List<Execution> getFailingExecutions(Duration failingAtLeastFor) {
Expand Down Expand Up @@ -396,16 +398,25 @@ protected void detectDeadExecutions() {
}

void updateHeartbeats() {
final List<CurrentlyExecuting> currentlyProcessing = executor.getCurrentlyExecuting();
final List<CurrentlyExecuting> currentlyProcessing = getCurrentlyProcessingExecutions();
if (currentlyProcessing.isEmpty()) return;

updateAllHeartbeats(currentlyProcessing);
schedulerListeners.onSchedulerEvent(SchedulerEventType.RAN_UPDATE_HEARTBEATS);
}

private List<CurrentlyExecuting> getCurrentlyProcessingExecutions() {
List<CurrentlyExecuting> currentlyProcessing = executor.getCurrentlyExecuting();
if (currentlyProcessing.isEmpty()) {
LOG.trace("No executions to update heartbeats for. Skipping.");
return;
}
return currentlyProcessing;
}

private void updateAllHeartbeats(List<CurrentlyExecuting> currentlyProcessing) {
LOG.debug("Updating heartbeats for {} executions being processed.", currentlyProcessing.size());
Instant now = clock.now();
currentlyProcessing.forEach(execution -> updateHeartbeatForExecution(now, execution));
schedulerListeners.onSchedulerEvent(SchedulerEventType.RAN_UPDATE_HEARTBEATS);
}

protected void updateHeartbeatForExecution(Instant now, CurrentlyExecuting currentlyExecuting) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Optional<Task> resolve(String taskName) {
public Optional<Task> resolve(String taskName, boolean addUnresolvedToExclusionFilter) {
Task task = taskMap.get(taskName);
if (task == null && addUnresolvedToExclusionFilter) {
addUnresolved(taskName);
new UnresolvedTask(taskName).addUnresolved();
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNRESOLVED_TASK);
LOG.info(
"Found execution with unknown task-name '{}'. Adding it to the list of known unresolved task-names.",
Expand All @@ -67,9 +67,9 @@ public Optional<Task> resolve(String taskName, boolean addUnresolvedToExclusionF
return Optional.ofNullable(task);
}

private void addUnresolved(String taskName) {
unresolvedTasks.putIfAbsent(taskName, new UnresolvedTask(taskName));
}
// private void addUnresolved(String taskName) {
// unresolvedTasks.putIfAbsent(taskName, new UnresolvedTask(taskName));
// }

public void addTask(Task task) {
taskMap.put(task.getName(), task);
Expand Down Expand Up @@ -105,5 +105,9 @@ public UnresolvedTask(String taskName) {
public String getTaskName() {
return taskName;
}

public void addUnresolved() {
TaskResolver.this.unresolvedTasks.putIfAbsent(taskName, this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@

public interface TaskDescriptor<T> extends HasTaskName {

String getTaskName();

Class<T> getDataClass();

static TaskDescriptor<Void> of(String name) {
return TaskDescriptor.of(name, Void.class);
}

static <T> TaskDescriptor<T> of(String name, Class<T> dataClass) {
return new TaskDescriptor.SimpleTaskDescriptor<>(name, dataClass);
return new SimpleTaskDescriptor<>(name, dataClass);
}

default TaskInstance.Builder<T> instance(String id) {
Expand All @@ -35,8 +33,7 @@ default TaskInstanceId instanceId(String id) {
return TaskInstanceId.of(getTaskName(), id);
}

class SimpleTaskDescriptor<T> implements TaskDescriptor<T> {

public class SimpleTaskDescriptor<T> implements TaskDescriptor<T> {
private final String name;
private final Class<T> dataClass;

Expand All @@ -47,12 +44,18 @@ public SimpleTaskDescriptor(String name, Class<T> dataClass) {

@Override
public String getTaskName() {
return name;
return name; // Consistent with HasTaskName's expectations
}

@Override
public Class<T> getDataClass() {
return dataClass;
}
}

public class VoidTaskDescriptor extends SimpleTaskDescriptor<Void> {
public VoidTaskDescriptor(String name) {
super(name, Void.class);
}
}
}