From 5f49f982a5180d65967fc4a496fb4c3a07ca2c91 Mon Sep 17 00:00:00 2001 From: Corin Dwyer Date: Mon, 14 May 2018 09:21:01 -0700 Subject: [PATCH] Minor improvements (#177) * Add task batch size in order to dynamically increase/reduce scheduling iteration time * Name the fenzo threads --- .../java/com/netflix/fenzo/TaskScheduler.java | 27 ++++++- .../netflix/fenzo/TaskSchedulingService.java | 5 +- .../fenzo/common/ThreadFactoryBuilder.java | 74 +++++++++++++++++++ .../netflix/fenzo/BasicSchedulerTests.java | 25 +++++++ 4 files changed, 129 insertions(+), 2 deletions(-) create mode 100644 fenzo-core/src/main/java/com/netflix/fenzo/common/ThreadFactoryBuilder.java diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java b/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java index 07df96a..3483558 100644 --- a/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java +++ b/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java @@ -16,6 +16,7 @@ package com.netflix.fenzo; +import com.netflix.fenzo.common.ThreadFactoryBuilder; import com.netflix.fenzo.plugins.NoOpScaleDownOrderEvaluator; import com.netflix.fenzo.queues.Assignable; import com.netflix.fenzo.queues.QueuableTask; @@ -34,8 +35,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -103,6 +106,7 @@ public Boolean call(Double f) { private boolean singleOfferMode=false; private final List schedulingEventListeners = new ArrayList<>(); private int maxConcurrent = Runtime.getRuntime().availableProcessors(); + private Supplier taskBatchSizeSupplier = () -> Long.MAX_VALUE; /** * (Required) Call this method to establish a method that your task scheduler will call to notify you @@ -442,6 +446,20 @@ public Builder withMaxConcurrent(int maxConcurrent) { return this; } + /** + * Use the given supplier to determine how many successful tasks should be evaluated in the next scheduling iteration. This + * can be used to dynamically change how many successful task evaluations are done in order to increase/reduce the scheduling iteration + * duration. The default supplier implementation will return {@link Long#MAX_VALUE} such that all tasks will be + * evaluated. + * + * @param taskBatchSizeSupplier the supplier that returns the task batch size for the next scheduling iteration. + * @return this same {@code Builder}, suitable for further chaining or to build the {@link TaskSchedulingService}. + */ + public Builder withTaskBatchSizeSupplier(Supplier taskBatchSizeSupplier) { + this.taskBatchSizeSupplier = taskBatchSizeSupplier; + return this; + } + /** * Creates a {@link TaskScheduler} based on the various builder methods you have chained. * @@ -497,7 +515,8 @@ private TaskScheduler(Builder builder) { throw new IllegalArgumentException("Lease reject action must be non-null"); this.builder = builder; this.maxConcurrent = builder.maxConcurrent; - this.executorService = Executors.newFixedThreadPool(maxConcurrent); + ThreadFactory threadFactory = ThreadFactoryBuilder.newBuilder().withNameFormat("fenzo-worker-%d").build(); + this.executorService = Executors.newFixedThreadPool(maxConcurrent, threadFactory); this.stateMonitor = new StateMonitor(); this.schedulingEventListener = CompositeSchedulingEventListener.of(builder.schedulingEventListeners); taskTracker = new TaskTracker(); @@ -795,6 +814,8 @@ private SchedulingResult doSchedule( Set failedTasksForAutoScaler = new HashSet<>(); Map resultMap = new HashMap<>(avms.size()); final SchedulingResult schedulingResult = new SchedulingResult(resultMap); + long taskBatchSize = builder.taskBatchSizeSupplier.get(); + long tasksIterationCount = 0; if(avms.isEmpty()) { while (true) { final Assignable taskOrFailure = taskIterator.next(); @@ -806,6 +827,9 @@ private SchedulingResult doSchedule( schedulingEventListener.onScheduleStart(); try { while (true) { + if (tasksIterationCount >= taskBatchSize) { + break; + } final Assignable taskOrFailure = taskIterator.next(); if(logger.isDebugEnabled()) logger.debug("TaskSched: task=" + (taskOrFailure == null? "null" : taskOrFailure.getTask().getId())); @@ -904,6 +928,7 @@ public EvalResult call() throws Exception { logger.debug("Task {}: found successful assignment on host {}", task.getId(), successfulResult.getHostname()); successfulResult.assignResult(); + tasksIterationCount++; failedTasksForAutoScaler.remove(task); schedulingEventListener.onAssignment(successfulResult); } diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java b/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java index edadb34..1f9c21f 100644 --- a/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java +++ b/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java @@ -16,6 +16,7 @@ package com.netflix.fenzo; +import com.netflix.fenzo.common.ThreadFactoryBuilder; import com.netflix.fenzo.functions.Action0; import com.netflix.fenzo.functions.Action1; import com.netflix.fenzo.functions.Func1; @@ -39,6 +40,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -495,7 +497,8 @@ public final static class Builder { private boolean optimizingShortfallEvaluator = false; public Builder() { - executorService = new ScheduledThreadPoolExecutor(1); + ThreadFactory threadFactory = ThreadFactoryBuilder.newBuilder().withNameFormat("fenzo-main").build(); + executorService = new ScheduledThreadPoolExecutor(1, threadFactory); } /** diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/common/ThreadFactoryBuilder.java b/fenzo-core/src/main/java/com/netflix/fenzo/common/ThreadFactoryBuilder.java new file mode 100644 index 0000000..7a62e59 --- /dev/null +++ b/fenzo-core/src/main/java/com/netflix/fenzo/common/ThreadFactoryBuilder.java @@ -0,0 +1,74 @@ +package com.netflix.fenzo.common; + +import java.util.Locale; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A ThreadFactory builder based on Guava's ThreadFactoryBuilder. + */ +public final class ThreadFactoryBuilder { + private String nameFormat = null; + private Boolean daemon = null; + + private ThreadFactoryBuilder() { + } + + /** + * Creates a new {@link ThreadFactoryBuilder} builder. + */ + public static ThreadFactoryBuilder newBuilder() { + return new ThreadFactoryBuilder(); + } + + /** + * Sets the naming format to use when naming threads ({@link Thread#setName}) which are created + * with this ThreadFactory. + * + * @param nameFormat a {@link String#format(String, Object...)}-compatible format String, to which + * a unique integer (0, 1, etc.) will be supplied as the single parameter. This integer will + * be unique to the built instance of the ThreadFactory and will be assigned sequentially. For + * example, {@code "rpc-pool-%d"} will generate thread names like {@code "rpc-pool-0"}, + * {@code "rpc-pool-1"}, {@code "rpc-pool-2"}, etc. + * @return this for the builder pattern + */ + public ThreadFactoryBuilder withNameFormat(String nameFormat) { + this.nameFormat = nameFormat; + return this; + } + + /** + * Sets whether or not the created thread will be a daemon thread. + * + * @param daemon whether or not new Threads created with this ThreadFactory will be daemon threads + * @return this for the builder pattern + */ + public ThreadFactoryBuilder withDaemon(boolean daemon) { + this.daemon = daemon; + return this; + } + + public ThreadFactory build() { + return build(this); + } + + private static ThreadFactory build(ThreadFactoryBuilder builder) { + final String nameFormat = builder.nameFormat; + final Boolean daemon = builder.daemon; + final AtomicLong count = (nameFormat != null) ? new AtomicLong(0) : null; + return runnable -> { + Thread thread = new Thread(runnable); + if (nameFormat != null) { + thread.setName(format(nameFormat, count.getAndIncrement())); + } + if (daemon != null) { + thread.setDaemon(daemon); + } + return thread; + }; + } + + private static String format(String format, Object... args) { + return String.format(Locale.ROOT, format, args); + } +} \ No newline at end of file diff --git a/fenzo-core/src/test/java/com/netflix/fenzo/BasicSchedulerTests.java b/fenzo-core/src/test/java/com/netflix/fenzo/BasicSchedulerTests.java index ac26be9..9bac0e0 100644 --- a/fenzo-core/src/test/java/com/netflix/fenzo/BasicSchedulerTests.java +++ b/fenzo-core/src/test/java/com/netflix/fenzo/BasicSchedulerTests.java @@ -552,4 +552,29 @@ public Result evaluate(TaskRequest taskRequest, VirtualMachineCurrentState targe Assert.assertNotNull(ref.get()); Assert.assertEquals(2, ref.get().size()); } + + @Test + public void testTaskBatchSize() { + TaskScheduler taskScheduler = new TaskScheduler.Builder() + .withLeaseOfferExpirySecs(1000000) + .withLeaseRejectAction(virtualMachineLease -> System.out.println("Rejecting offer on host " + virtualMachineLease.hostname())) + .withTaskBatchSizeSupplier(() -> 2L) + .build(); + List leases = LeaseProvider.getLeases(1, 5, 50, 1, 10); + List taskRequests = new ArrayList<>(); + taskRequests.add(TaskRequestProvider.getTaskRequest(1, 10, 0)); + taskRequests.add(TaskRequestProvider.getTaskRequest(1, 10, 0)); + taskRequests.add(TaskRequestProvider.getTaskRequest(1, 10, 0)); + taskRequests.add(TaskRequestProvider.getTaskRequest(1, 10, 0)); + taskRequests.add(TaskRequestProvider.getTaskRequest(1, 10, 0)); + SchedulingResult schedulingResult = taskScheduler.scheduleOnce(taskRequests, leases); + Assert.assertEquals(2, schedulingResult.getResultMap().values().iterator().next().getTasksAssigned().size()); + taskRequests = new ArrayList<>(); + taskRequests.add(TaskRequestProvider.getTaskRequest(1, 10, 0)); + schedulingResult = taskScheduler.scheduleOnce(taskRequests, leases); + Assert.assertEquals(1, schedulingResult.getResultMap().values().iterator().next().getTasksAssigned().size()); + taskRequests = new ArrayList<>(); + schedulingResult = taskScheduler.scheduleOnce(taskRequests, leases); + Assert.assertEquals(0, schedulingResult.getResultMap().size()); + } }