Skip to content

Commit

Permalink
Adding --experimental_skyframe_cpu_heavy_skykeys_thread_pool_size to …
Browse files Browse the repository at this point in the history
…blaze.

This flag allows the loading/analysis phase of Skyframe to use 2 independent thread pools:
- 1 larger one for IO-bound tasks, and
- 1 smaller one (size ~= numCores) for CPU-bound tasks

This is currently not applied to the execution phase, since ActionExecutionFunction is both IO and CPU bound.

PiperOrigin-RevId: 371763857
  • Loading branch information
joeleba authored and copybara-github committed May 3, 2021
1 parent e5a170f commit 16c0408
Show file tree
Hide file tree
Showing 28 changed files with 646 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@
* are subclasses of {@link ActionLookupKey}. This allows callers to easily find the value key,
* while remaining agnostic to what action lookup values actually exist.
*/
public interface ActionLookupKey extends ArtifactOwner, SkyKey, CPUHeavySkyKey {}
public interface ActionLookupKey extends ArtifactOwner, CPUHeavySkyKey {}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.google.devtools.build.lib.analysis;

import com.google.devtools.build.lib.util.CpuResourceConverter;
import com.google.devtools.build.lib.util.RegexFilter;
import com.google.devtools.common.options.Option;
import com.google.devtools.common.options.OptionDocumentationCategory;
Expand Down Expand Up @@ -120,4 +121,21 @@ public class AnalysisOptions extends OptionsBase {
help =
"Check for action prefix file path conflicts, regardless of action-specific overrides.")
public boolean strictConflictChecks;

/**
 * Requested size of the thread pool reserved for CPU-heavy SkyKeys during loading/analysis.
 *
 * <p>Defaults to {@code 0}. As described in the help text, a positive value makes Skyframe use two
 * separate thread pools for loading/analysis: one of this size for CPU-heavy SkyKeys and the
 * "standard" pool sized by {@code --loading_phase_threads} for everything else. The raw flag value
 * is parsed by {@link CpuResourceConverter}, which is why the help text can suggest an expression
 * such as {@code HOST_CPUS*1.5}.
 *
 * <p>NOTE(review): presumably a value of 0 keeps the single-pool behavior; confirm against the
 * code that consumes this field.
 */
@Option(
name = "experimental_skyframe_cpu_heavy_skykeys_thread_pool_size",
defaultValue = "0",
documentationCategory = OptionDocumentationCategory.UNDOCUMENTED,
metadataTags = OptionMetadataTag.EXPERIMENTAL,
effectTags = {
OptionEffectTag.LOADING_AND_ANALYSIS,
OptionEffectTag.BAZEL_INTERNAL_CONFIGURATION
},
help =
"If set to a positive value (e.g. \"HOST_CPUS*1.5\"), Skyframe will run the"
+ " loading/analysis phase with 2 separate thread pools: 1 with <value> threads"
+ " (ideally close to HOST_CPUS) reserved for CPU-heavy SkyKeys, and 1 \"standard\""
+ " thread pool (whose size is controlled by --loading_phase_threads) for the rest.",
converter = CpuResourceConverter.class)
public int cpuHeavySkyKeysThreadPoolSize;
}
1 change: 1 addition & 0 deletions src/main/java/com/google/devtools/build/lib/analysis/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ java_library(
srcs = ["AnalysisOptions.java"],
deps = [
"//src/main/java/com/google/devtools/build/lib/util",
"//src/main/java/com/google/devtools/build/lib/util:cpu_resource_converter",
"//src/main/java/com/google/devtools/common/options",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,8 @@ public AnalysisResult update(
keepGoing,
loadingPhaseThreads,
viewOptions.strictConflictChecks,
checkForActionConflicts);
checkForActionConflicts,
viewOptions.cpuHeavySkyKeysThreadPoolSize);
setArtifactRoots(skyframeAnalysisResult.getPackageRoots());
} finally {
skyframeBuildView.clearInvalidatedActionLookupKeys();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ protected AbstractQueueVisitor(
/*usingPriorityQueue=*/ false);
}

private AbstractQueueVisitor(
protected AbstractQueueVisitor(
ExecutorService executorService,
boolean shutdownOnCompletion,
boolean failFastOnException,
Expand Down Expand Up @@ -278,6 +278,10 @@ public final void awaitQuiescence(boolean interruptWorkers) throws InterruptedEx
/**
 * Schedules a call. Called in a worker thread.
 *
 * <p>This is a thin wrapper: it forwards the runnable to {@code executeWithExecutorService},
 * passing this visitor's own {@code executorService} as the pool to run on.
 *
 * @param runnable the task to schedule
 */
@Override
public final void execute(Runnable runnable) {
executeWithExecutorService(runnable, executorService);
}

protected void executeWithExecutorService(Runnable runnable, ExecutorService executorService) {
if (usingPriorityQueue) {
Preconditions.checkState(runnable instanceof Comparable);
}
Expand All @@ -290,7 +294,7 @@ public final void execute(Runnable runnable) {
Preconditions.checkState(
tasks > 0,
"Incrementing remaining tasks counter resulted in impossible non-positive number.");
executeRunnable(wrappedRunnable);
executeWrappedRunnable(wrappedRunnable, executorService);
} catch (Throwable e) {
if (!wrappedRunnable.ran) {
// Note that keeping track of ranTask is necessary to disambiguate the case where
Expand All @@ -302,7 +306,7 @@ public final void execute(Runnable runnable) {
}
}

protected void executeRunnable(WrappedRunnable runnable) {
/**
 * Hands an already-wrapped task to the given executor service.
 *
 * <p>Declared {@code protected} and non-final, so a subclass can replace how the task is
 * submitted.
 *
 * @param runnable the wrapped task to submit
 * @param executorService the pool to submit to; note that this parameter shadows the field of the
 *     same name, so the argument (not the field) is what gets used here
 */
protected void executeWrappedRunnable(WrappedRunnable runnable, ExecutorService executorService) {
executorService.execute(runnable);
}

Expand Down Expand Up @@ -476,6 +480,10 @@ public final long getTaskCount() {
return remainingTasks.get();
}

/**
 * Returns the {@link ExecutorService} stored in this visitor's {@code executorService} field.
 *
 * <p>Exposed at {@code protected} visibility so that subclasses can read the pool held by this
 * class.
 */
protected ExecutorService getExecutorService() {
return executorService;
}

@Override
public void dependOnFuture(ListenableFuture<?> future) throws InterruptedException {
outstandingFuturesLock.readLock().lock();
Expand Down Expand Up @@ -575,6 +583,11 @@ private void reallyAwaitTermination(boolean interruptWorkers) {
}

if (ownExecutorService) {
shutdownExecutorService(catastrophe);
}
}

protected void shutdownExecutorService(Throwable catastrophe) {
executorService.shutdown();
for (; ; ) {
try {
Expand All @@ -585,7 +598,6 @@ private void reallyAwaitTermination(boolean interruptWorkers) {
setInterrupted();
}
}
}
}

private void interruptInFlightTasks() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2021 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.concurrent;

import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.util.concurrent.ExecutorService;

/**
 * An implementation of MultiThreadPoolsQuiescingExecutor that has 2 ExecutorServices, one with a
 * larger thread pool for IO/Network-bound tasks, and one with a smaller thread pool for CPU-bound
 * tasks.
 *
 * <p>The regular pool is also handed to the {@link AbstractQueueVisitor} superclass as its
 * executor service, so work scheduled through the inherited single-argument {@code execute} runs
 * on the regular pool. Only {@link #execute(Runnable, ThreadPoolType)} can route work to the
 * CPU-heavy pool.
 */
public class DualExecutorQueueVisitor extends AbstractQueueVisitor
    implements MultiThreadPoolsQuiescingExecutor {
  /** Pool for tasks of type {@link ThreadPoolType#REGULAR}; same instance the superclass holds. */
  private final ExecutorService regularPoolExecutorService;

  /** Pool for tasks of type {@link ThreadPoolType#CPU_HEAVY}. */
  private final ExecutorService cpuHeavyPoolExecutorService;

  /**
   * Creates a visitor backed by two executor services.
   *
   * @param regularPoolExecutorService pool for regular tasks; passed to the superclass as its
   *     executor service
   * @param cpuHeavyPoolExecutorService pool for CPU-heavy tasks; must not be null
   * @param shutdownOnCompletion forwarded unchanged to the superclass constructor
   * @param failFastOnException forwarded unchanged to the superclass constructor
   * @param errorClassifier forwarded unchanged to the superclass constructor
   * @param usingPriorityQueue forwarded unchanged to the superclass constructor
   */
  private DualExecutorQueueVisitor(
      ExecutorService regularPoolExecutorService,
      ExecutorService cpuHeavyPoolExecutorService,
      boolean shutdownOnCompletion,
      boolean failFastOnException,
      ErrorClassifier errorClassifier,
      boolean usingPriorityQueue) {
    super(
        regularPoolExecutorService,
        shutdownOnCompletion,
        failFastOnException,
        errorClassifier,
        usingPriorityQueue);
    // Read the pool back from the superclass so both classes refer to the same instance.
    this.regularPoolExecutorService = super.getExecutorService();
    this.cpuHeavyPoolExecutorService = Preconditions.checkNotNull(cpuHeavyPoolExecutorService);
  }

  /**
   * Creates a visitor over the two given pools that shuts them down on completion and does not use
   * a priority queue.
   *
   * @param executorService pool for regular tasks
   * @param cpuExecutorService pool for CPU-heavy tasks; must not be null
   * @param failFastOnException forwarded unchanged to the superclass constructor
   * @param errorClassifier forwarded unchanged to the superclass constructor
   * @return the newly created visitor
   */
  public static AbstractQueueVisitor createWithExecutorServices(
      ExecutorService executorService,
      ExecutorService cpuExecutorService,
      boolean failFastOnException,
      ErrorClassifier errorClassifier) {
    return new DualExecutorQueueVisitor(
        executorService,
        cpuExecutorService,
        /*shutdownOnCompletion=*/ true,
        failFastOnException,
        errorClassifier,
        /*usingPriorityQueue=*/ false);
  }

  /**
   * Schedules {@code runnable} on the pool that corresponds to {@code threadPoolType}.
   *
   * @param runnable the task to schedule
   * @param threadPoolType which of the two pools should run the task
   */
  @Override
  public final void execute(Runnable runnable, ThreadPoolType threadPoolType) {
    super.executeWithExecutorService(runnable, getExecutorServiceByThreadPoolType(threadPoolType));
  }

  /**
   * Maps a {@link ThreadPoolType} to the executor service that backs it.
   *
   * @throws IllegalStateException if the type is not one of the handled constants
   */
  @VisibleForTesting
  ExecutorService getExecutorServiceByThreadPoolType(ThreadPoolType threadPoolType) {
    switch (threadPoolType) {
      case REGULAR:
        return regularPoolExecutorService;
      case CPU_HEAVY:
        return cpuHeavyPoolExecutorService;
    }
    throw new IllegalStateException("Invalid ThreadPoolType: " + threadPoolType);
  }

  /**
   * Shuts down both pools and waits for them to terminate.
   *
   * <p>If {@code catastrophe} is an unchecked throwable it is rethrown instead of waiting. Both
   * pools are told to shut down <em>before</em> the rethrow so that neither is left running when
   * this method exits exceptionally.
   *
   * @param catastrophe the throwable to propagate, or null if there is none
   */
  @Override
  protected void shutdownExecutorService(Throwable catastrophe) {
    if (catastrophe != null) {
      // shutdown() does not block, so it is safe to call on both pools before propagating. Without
      // this, an unchecked catastrophe would skip the shutdown calls below and leak both pools.
      regularPoolExecutorService.shutdown();
      cpuHeavyPoolExecutorService.shutdown();
      Throwables.throwIfUnchecked(catastrophe);
    }
    internalShutdownExecutorService(regularPoolExecutorService);
    internalShutdownExecutorService(cpuHeavyPoolExecutorService);
  }

  /**
   * Initiates shutdown of {@code executorService} and blocks until it has terminated. Calling
   * {@code shutdown()} on a pool that is already shut down has no additional effect.
   */
  private void internalShutdownExecutorService(ExecutorService executorService) {
    executorService.shutdown();
    while (true) {
      try {
        executorService.awaitTermination(Integer.MAX_VALUE, SECONDS);
        break;
      } catch (InterruptedException e) {
        // Record the interrupt on the visitor and keep waiting for termination.
        setInterrupted();
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.google.devtools.build.lib.concurrent;

import com.google.common.base.Preconditions;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -92,12 +93,12 @@ public static Builder newBuilder() {
}

@Override
protected void executeRunnable(WrappedRunnable runnable) {
protected void executeWrappedRunnable(WrappedRunnable runnable, ExecutorService executorService) {
if (ForkJoinTask.inForkJoinPool()) {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError = ForkJoinTask.adapt(runnable).fork();
} else {
super.executeRunnable(runnable);
super.executeWrappedRunnable(runnable, executorService);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.concurrent;

/**
 * An Executor that can execute tasks in multiple independent thread pools.
 *
 * <p>Extends {@link QuiescingExecutor} with an {@code execute} overload that lets the caller name
 * the kind of pool a task should run on.
 */
public interface MultiThreadPoolsQuiescingExecutor extends QuiescingExecutor {
/** The types of thread pools to use. */
enum ThreadPoolType {
// Suitable for CPU-heavy tasks. Ideally the number of threads is close to the machine's number
// of cores.
CPU_HEAVY,
// The pool type for tasks that are not marked CPU-heavy.
// NOTE(review): presumably this is the larger, IO-oriented pool; confirm against implementations.
REGULAR
}

/**
 * Execute the runnable, taking into consideration the preferred thread pool type.
 *
 * @param runnable the task to execute
 * @param threadPoolType the kind of thread pool the caller prefers for this task
 */
void execute(Runnable runnable, ThreadPoolType threadPoolType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -386,14 +386,20 @@ public SkyframeAnalysisResult configureTargets(
boolean keepGoing,
int numThreads,
boolean strictConflictChecks,
boolean checkForActionConflicts)
boolean checkForActionConflicts,
int cpuHeavySkyKeysThreadPoolSize)
throws InterruptedException, ViewCreationFailedException {
enableAnalysis(true);
EvaluationResult<ActionLookupValue> result;
try (SilentCloseable c = Profiler.instance().profile("skyframeExecutor.configureTargets")) {
result =
skyframeExecutor.configureTargets(
eventHandler, ctKeys, aspectKeys, keepGoing, numThreads);
eventHandler,
ctKeys,
aspectKeys,
keepGoing,
numThreads,
cpuHeavySkyKeysThreadPoolSize);
} finally {
enableAnalysis(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2317,7 +2317,8 @@ EvaluationResult<ActionLookupValue> configureTargets(
List<ConfiguredTargetKey> values,
List<AspectValueKey> aspectKeys,
boolean keepGoing,
int numThreads)
int numThreads,
int cpuHeavySkyKeysThreadPoolSize)
throws InterruptedException {
checkActive();

Expand All @@ -2328,6 +2329,7 @@ EvaluationResult<ActionLookupValue> configureTargets(
.setNumThreads(numThreads)
.setExecutorServiceSupplier(
() -> NamedForkJoinPool.newNamedPool("skyframe-evaluator", numThreads))
.setCPUHeavySkyKeysThreadPoolSize(cpuHeavySkyKeysThreadPoolSize)
.setEventHandler(eventHandler)
.build();
EvaluationResult<ActionLookupValue> result =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public CpuResourceConverter() {
ImmutableMap.of(
"HOST_CPUS",
() -> (int) Math.ceil(LocalHostCapacity.getLocalHostCapacity().getCpuUsage())),
1,
/*minValue=*/ 0,
Integer.MAX_VALUE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public abstract class AbstractExceptionalParallelEvaluator<E extends Exception>
GraphInconsistencyReceiver graphInconsistencyReceiver,
Supplier<ExecutorService> executorService,
CycleDetector cycleDetector,
EvaluationVersionBehavior evaluationVersionBehavior) {
EvaluationVersionBehavior evaluationVersionBehavior,
int cpuHeavySkyKeysThreadPoolSize) {
super(
graph,
graphVersion,
Expand All @@ -110,7 +111,8 @@ public abstract class AbstractExceptionalParallelEvaluator<E extends Exception>
graphInconsistencyReceiver,
executorService,
cycleDetector,
evaluationVersionBehavior);
evaluationVersionBehavior,
cpuHeavySkyKeysThreadPoolSize);
}

private void informProgressReceiverThatValueIsDone(SkyKey key, NodeEntry entry)
Expand Down
Loading

0 comments on commit 16c0408

Please sign in to comment.