Skip to content
This repository has been archived by the owner on Mar 31, 2023. It is now read-only.

Commit

Permalink
Minor improvements (#177)
Browse files Browse the repository at this point in the history
* Add task batch size in order to dynamically increase/reduce scheduling iteration time
* Name the fenzo threads
  • Loading branch information
corindwyer authored May 14, 2018
1 parent e462045 commit 5f49f98
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 2 deletions.
27 changes: 26 additions & 1 deletion fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.netflix.fenzo;

import com.netflix.fenzo.common.ThreadFactoryBuilder;
import com.netflix.fenzo.plugins.NoOpScaleDownOrderEvaluator;
import com.netflix.fenzo.queues.Assignable;
import com.netflix.fenzo.queues.QueuableTask;
Expand All @@ -34,8 +35,10 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -103,6 +106,7 @@ public Boolean call(Double f) {
private boolean singleOfferMode=false;
private final List<SchedulingEventListener> schedulingEventListeners = new ArrayList<>();
private int maxConcurrent = Runtime.getRuntime().availableProcessors();
private Supplier<Long> taskBatchSizeSupplier = () -> Long.MAX_VALUE;

/**
* (Required) Call this method to establish a method that your task scheduler will call to notify you
Expand Down Expand Up @@ -442,6 +446,20 @@ public Builder withMaxConcurrent(int maxConcurrent) {
return this;
}

/**
* Use the given supplier to determine how many successful tasks should be evaluated in the next scheduling iteration. This
* can be used to dynamically change how many successful task evaluations are done in order to increase/reduce the scheduling iteration
* duration. The default supplier implementation will return {@link Long#MAX_VALUE} such that all tasks will be
* evaluated.
*
* @param taskBatchSizeSupplier the supplier that returns the task batch size for the next scheduling iteration.
* @return this same {@code Builder}, suitable for further chaining or to build the {@link TaskSchedulingService}.
*/
public Builder withTaskBatchSizeSupplier(Supplier<Long> taskBatchSizeSupplier) {
this.taskBatchSizeSupplier = taskBatchSizeSupplier;
return this;
}

/**
* Creates a {@link TaskScheduler} based on the various builder methods you have chained.
*
Expand Down Expand Up @@ -497,7 +515,8 @@ private TaskScheduler(Builder builder) {
throw new IllegalArgumentException("Lease reject action must be non-null");
this.builder = builder;
this.maxConcurrent = builder.maxConcurrent;
this.executorService = Executors.newFixedThreadPool(maxConcurrent);
ThreadFactory threadFactory = ThreadFactoryBuilder.newBuilder().withNameFormat("fenzo-worker-%d").build();
this.executorService = Executors.newFixedThreadPool(maxConcurrent, threadFactory);
this.stateMonitor = new StateMonitor();
this.schedulingEventListener = CompositeSchedulingEventListener.of(builder.schedulingEventListeners);
taskTracker = new TaskTracker();
Expand Down Expand Up @@ -795,6 +814,8 @@ private SchedulingResult doSchedule(
Set<TaskRequest> failedTasksForAutoScaler = new HashSet<>();
Map<String, VMAssignmentResult> resultMap = new HashMap<>(avms.size());
final SchedulingResult schedulingResult = new SchedulingResult(resultMap);
long taskBatchSize = builder.taskBatchSizeSupplier.get();
long tasksIterationCount = 0;
if(avms.isEmpty()) {
while (true) {
final Assignable<? extends TaskRequest> taskOrFailure = taskIterator.next();
Expand All @@ -806,6 +827,9 @@ private SchedulingResult doSchedule(
schedulingEventListener.onScheduleStart();
try {
while (true) {
if (tasksIterationCount >= taskBatchSize) {
break;
}
final Assignable<? extends TaskRequest> taskOrFailure = taskIterator.next();
if(logger.isDebugEnabled())
logger.debug("TaskSched: task=" + (taskOrFailure == null? "null" : taskOrFailure.getTask().getId()));
Expand Down Expand Up @@ -904,6 +928,7 @@ public EvalResult call() throws Exception {
logger.debug("Task {}: found successful assignment on host {}", task.getId(),
successfulResult.getHostname());
successfulResult.assignResult();
tasksIterationCount++;
failedTasksForAutoScaler.remove(task);
schedulingEventListener.onAssignment(successfulResult);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.netflix.fenzo;

import com.netflix.fenzo.common.ThreadFactoryBuilder;
import com.netflix.fenzo.functions.Action0;
import com.netflix.fenzo.functions.Action1;
import com.netflix.fenzo.functions.Func1;
Expand All @@ -39,6 +40,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -495,7 +497,8 @@ public final static class Builder {
private boolean optimizingShortfallEvaluator = false;

public Builder() {
executorService = new ScheduledThreadPoolExecutor(1);
ThreadFactory threadFactory = ThreadFactoryBuilder.newBuilder().withNameFormat("fenzo-main").build();
executorService = new ScheduledThreadPoolExecutor(1, threadFactory);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.netflix.fenzo.common;

import java.util.Locale;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

/**
* A ThreadFactory builder based on <a href="https://github.com/google/guava">Guava's</a> ThreadFactoryBuilder.
*/
public final class ThreadFactoryBuilder {
private String nameFormat = null;
private Boolean daemon = null;

private ThreadFactoryBuilder() {
}

/**
* Creates a new {@link ThreadFactoryBuilder} builder.
*/
public static ThreadFactoryBuilder newBuilder() {
return new ThreadFactoryBuilder();
}

/**
* Sets the naming format to use when naming threads ({@link Thread#setName}) which are created
* with this ThreadFactory.
*
* @param nameFormat a {@link String#format(String, Object...)}-compatible format String, to which
* a unique integer (0, 1, etc.) will be supplied as the single parameter. This integer will
* be unique to the built instance of the ThreadFactory and will be assigned sequentially. For
* example, {@code "rpc-pool-%d"} will generate thread names like {@code "rpc-pool-0"},
* {@code "rpc-pool-1"}, {@code "rpc-pool-2"}, etc.
* @return this for the builder pattern
*/
public ThreadFactoryBuilder withNameFormat(String nameFormat) {
this.nameFormat = nameFormat;
return this;
}

/**
* Sets whether or not the created thread will be a daemon thread.
*
* @param daemon whether or not new Threads created with this ThreadFactory will be daemon threads
* @return this for the builder pattern
*/
public ThreadFactoryBuilder withDaemon(boolean daemon) {
this.daemon = daemon;
return this;
}

public ThreadFactory build() {
return build(this);
}

private static ThreadFactory build(ThreadFactoryBuilder builder) {
final String nameFormat = builder.nameFormat;
final Boolean daemon = builder.daemon;
final AtomicLong count = (nameFormat != null) ? new AtomicLong(0) : null;
return runnable -> {
Thread thread = new Thread(runnable);
if (nameFormat != null) {
thread.setName(format(nameFormat, count.getAndIncrement()));
}
if (daemon != null) {
thread.setDaemon(daemon);
}
return thread;
};
}

private static String format(String format, Object... args) {
return String.format(Locale.ROOT, format, args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -552,4 +552,29 @@ public Result evaluate(TaskRequest taskRequest, VirtualMachineCurrentState targe
Assert.assertNotNull(ref.get());
Assert.assertEquals(2, ref.get().size());
}

@Test
public void testTaskBatchSize() {
TaskScheduler taskScheduler = new TaskScheduler.Builder()
.withLeaseOfferExpirySecs(1000000)
.withLeaseRejectAction(virtualMachineLease -> System.out.println("Rejecting offer on host " + virtualMachineLease.hostname()))
.withTaskBatchSizeSupplier(() -> 2L)
.build();
List<VirtualMachineLease> leases = LeaseProvider.getLeases(1, 5, 50, 1, 10);
List<TaskRequest> taskRequests = new ArrayList<>();
taskRequests.add(TaskRequestProvider.getTaskRequest(1, 10, 0));
taskRequests.add(TaskRequestProvider.getTaskRequest(1, 10, 0));
taskRequests.add(TaskRequestProvider.getTaskRequest(1, 10, 0));
taskRequests.add(TaskRequestProvider.getTaskRequest(1, 10, 0));
taskRequests.add(TaskRequestProvider.getTaskRequest(1, 10, 0));
SchedulingResult schedulingResult = taskScheduler.scheduleOnce(taskRequests, leases);
Assert.assertEquals(2, schedulingResult.getResultMap().values().iterator().next().getTasksAssigned().size());
taskRequests = new ArrayList<>();
taskRequests.add(TaskRequestProvider.getTaskRequest(1, 10, 0));
schedulingResult = taskScheduler.scheduleOnce(taskRequests, leases);
Assert.assertEquals(1, schedulingResult.getResultMap().values().iterator().next().getTasksAssigned().size());
taskRequests = new ArrayList<>();
schedulingResult = taskScheduler.scheduleOnce(taskRequests, leases);
Assert.assertEquals(0, schedulingResult.getResultMap().size());
}
}

0 comments on commit 5f49f98

Please sign in to comment.