Skip to content

Commit

Permalink
More properly destroy workers on interrupt.
Browse files Browse the repository at this point in the history
Interrupts did not wake the workers out of waiting for a response. When running under dynamic execution, that's all we can do until cancellation is implemented. For workers without dynamic execution, the only interrupt is from the build itself getting interrupted, and in that case we don't want to wait - it could take minutes. Instead we then use a busy-wait to notice interrupts and destroy the worker if interrupted. This will be useful even when cancellation is implemented, since not all workers may be able to implement a sensible cancellation.

This also renames mustBeSandboxed on WorkerKey to isSpeculative to separate cause and effect.

RELNOTES: None.
PiperOrigin-RevId: 361802760
  • Loading branch information
larsrc-google authored and philwo committed Apr 19, 2021
1 parent 12d5482 commit c4e436a
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ public void putRequest(WorkRequest request) throws IOException {

@Override
public WorkResponse getResponse() throws IOException {
boolean interrupted = Thread.interrupted();
try {
return parseResponse();
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

private WorkResponse parseResponse() throws IOException {
Integer exitCode = null;
String output = null;
Integer requestId = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,14 @@ public void putRequest(WorkRequest request) throws IOException {

@Override
public WorkResponse getResponse() throws IOException {
return WorkResponse.parseDelimitedFrom(workersStdout);
boolean interrupted = Thread.interrupted();
try {
return WorkResponse.parseDelimitedFrom(workersStdout);
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,28 @@ void putRequest(WorkRequest request) throws IOException {
}

@Override
WorkResponse getResponse(int requestId) throws IOException {
WorkResponse getResponse(int requestId) throws IOException, InterruptedException {
recordingInputStream.startRecording(4096);
// Ironically, we don't allow interrupts during dynamic execution, since we can't cancel
// the worker short of destroying it.
if (!workerKey.isSpeculative()) {
while (recordingInputStream.available() == 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// This should only happen when not in dynamic execution, so we can safely kill the
// worker.
destroy();
throw e;
}
if (!process.isAlive()) {
throw new IOException(
String.format(
"Worker process for %s died while waiting for response",
workerKey.getMnemonic()));
}
}
}
return workerProtocol.getResponse();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public Worker create(WorkerKey key) {
workerBaseDir.getRelative(workTypeName + "-" + workerId + "-" + key.getMnemonic() + ".log");

Worker worker;
boolean sandboxed = workerOptions.workerSandboxing || key.mustBeSandboxed();
boolean sandboxed = workerOptions.workerSandboxing || key.isSpeculative();
if (sandboxed) {
Path workDir = getSandboxedWorkerPath(key, workerId);
worker = new SandboxedWorker(key, workerId, workDir, logFile);
Expand Down Expand Up @@ -124,30 +124,18 @@ public boolean validateObject(WorkerKey key, PooledObject<Worker> p) {
Worker worker = p.getObject();
Optional<Integer> exitValue = worker.getExitValue();
if (exitValue.isPresent()) {
if (workerOptions.workerVerbose) {
if (worker.diedUnexpectedly()) {
String msg =
String.format(
"%s %s (id %d) has unexpectedly died with exit code %d.",
key.getMnemonic(),
key.getWorkerTypeName(),
worker.getWorkerId(),
exitValue.get());
ErrorMessage errorMessage =
ErrorMessage.builder()
.message(msg)
.logFile(worker.getLogFile())
.logSizeLimit(4096)
.build();
reporter.handle(Event.warn(errorMessage.toString()));
} else {
// Can't rule this out entirely, but it's not an unexpected death.
String msg =
String.format(
"%s %s (id %d) was destroyed, but is still in the worker pool.",
key.getMnemonic(), key.getWorkerTypeName(), worker.getWorkerId());
reporter.handle(Event.info(msg));
}
if (workerOptions.workerVerbose && worker.diedUnexpectedly()) {
String msg =
String.format(
"%s %s (id %d) has unexpectedly died with exit code %d.",
key.getMnemonic(), key.getWorkerTypeName(), worker.getWorkerId(), exitValue.get());
ErrorMessage errorMessage =
ErrorMessage.builder()
.message(msg)
.logFile(worker.getLogFile())
.logSizeLimit(4096)
.build();
reporter.handle(Event.warn(errorMessage.toString()));
}
return false;
}
Expand Down
33 changes: 15 additions & 18 deletions src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@
* break various things as well as render the workers less useful.
*/
final class WorkerKey {
/** Build options. */
private final ImmutableList<String> args;
/** Environment variables. */
private final ImmutableMap<String, String> env;
/** Execution root of Bazel process. */
private final Path execRoot;
/** Mnemonic of the worker. */
private final String mnemonic;

/**
Expand All @@ -43,16 +47,18 @@ final class WorkerKey {
* methods.
*/
private final HashCode workerFilesCombinedHash;
/** Worker files with the corresponding hash code. */
private final SortedMap<PathFragment, HashCode> workerFilesWithHashes;
private final boolean mustBeSandboxed;
/** Set it to true if this job is running speculatively and thus likely to be interrupted. */
private final boolean isSpeculative;
/** A WorkerProxy will be instantiated if true, instantiate a regular Worker if false. */
private final boolean proxied;
/**
* Cached value for the hash of this key, because the value is expensive to calculate
* (ImmutableMap and ImmutableList do not cache their hashcodes.
*/
private final int hash;

/** The format of the worker protocol sent to and read from the worker. */
private final WorkerProtocolFormat protocolFormat;

WorkerKey(
Expand All @@ -62,26 +68,17 @@ final class WorkerKey {
String mnemonic,
HashCode workerFilesCombinedHash,
SortedMap<PathFragment, HashCode> workerFilesWithHashes,
boolean mustBeSandboxed,
boolean isSpeculative,
boolean proxied,
WorkerProtocolFormat protocolFormat) {
/** Build options. */
this.args = Preconditions.checkNotNull(args);
/** Environment variables. */
this.env = Preconditions.checkNotNull(env);
/** Execution root of Bazel process. */
this.execRoot = Preconditions.checkNotNull(execRoot);
/** Mnemonic of the worker. */
this.mnemonic = Preconditions.checkNotNull(mnemonic);
/** One combined hash code for all files. */
this.workerFilesCombinedHash = Preconditions.checkNotNull(workerFilesCombinedHash);
/** Worker files with the corresponding hash code. */
this.workerFilesWithHashes = Preconditions.checkNotNull(workerFilesWithHashes);
/** Set it to true if this job should be run in sandbox. */
this.mustBeSandboxed = mustBeSandboxed;
/** Set it to true if this job should be run with WorkerProxy. */
this.isSpeculative = isSpeculative;
this.proxied = proxied;
/** The format of the worker protocol sent to and read from the worker. */
this.protocolFormat = protocolFormat;

hash = calculateHashCode();
Expand Down Expand Up @@ -117,9 +114,9 @@ public SortedMap<PathFragment, HashCode> getWorkerFilesWithHashes() {
return workerFilesWithHashes;
}

/** Getter function for variable mustBeSandboxed. */
public boolean mustBeSandboxed() {
return mustBeSandboxed;
/** Returns true if workers are run speculatively. */
public boolean isSpeculative() {
return isSpeculative;
}

/** Getter function for variable proxied. */
Expand All @@ -128,7 +125,7 @@ public boolean getProxied() {
}

public boolean isMultiplex() {
return getProxied() && !mustBeSandboxed;
return getProxied() && !isSpeculative;
}

/** Returns the format of the worker protocol. */
Expand All @@ -147,7 +144,7 @@ public static String makeWorkerTypeName(boolean proxied, boolean mustBeSandboxed

/** Returns a user-friendly name for this worker type. */
public String getWorkerTypeName() {
return makeWorkerTypeName(proxied, mustBeSandboxed);
return makeWorkerTypeName(proxied, isSpeculative);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.eventbus.Subscribe;
import com.google.devtools.build.lib.buildtool.buildevent.BuildCompleteEvent;
import com.google.devtools.build.lib.buildtool.buildevent.BuildInterruptedEvent;
import com.google.devtools.build.lib.buildtool.buildevent.BuildStartingEvent;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.exec.ExecutionOptions;
Expand Down Expand Up @@ -72,7 +71,8 @@ public void cleanStarting(CleanStartingEvent event) {
this.options = event.getOptionsProvider().getOptions(WorkerOptions.class);
workerFactory.setReporter(env.getReporter());
workerFactory.setOptions(options);
shutdownPool("Clean command is running, shutting down worker pool...");
shutdownPool(
"Clean command is running, shutting down worker pool...", /* alwaysLog= */ false);
}
}

Expand Down Expand Up @@ -179,31 +179,10 @@ public void registerSpawnStrategies(
@Subscribe
public void buildComplete(BuildCompleteEvent event) {
if (options != null && options.workerQuitAfterBuild) {
shutdownPool("Build completed, shutting down worker pool...");
}
}

/**
* Stops any workers that are still executing.
*
* <p>This currently kills off some amount of workers, losing the warmed-up state.
* TODO(b/119701157): Cancel running workers instead (requires some way to reach each worker).
*/
@Subscribe
public void buildInterrupted(BuildInterruptedEvent event) {
if (workerPool != null) {
if ((options != null && options.workerVerbose)) {
env.getReporter().handle(Event.info("Build interrupted, stopping active workers..."));
}
workerPool.stopWork();
shutdownPool("Build completed, shutting down worker pool...", /* alwaysLog= */ false);
}
}

/** Shuts down the worker pool and sets {#code workerPool} to null. */
private void shutdownPool(String reason) {
shutdownPool(reason, /* alwaysLog= */ false);
}

/** Shuts down the worker pool and sets {#code workerPool} to null. */
private void shutdownPool(String reason, boolean alwaysLog) {
Preconditions.checkArgument(!reason.isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ private boolean sendRequest() {
*
* <p>This is only called on the readResponses subthread and so cannot be interrupted by dynamic
* execution cancellation, but only by a call to {@link #destroyProcess()}.
*
* @return True if the worker is still in a consistent state.
*/
private boolean readResponse() {
WorkResponse parsedResponse;
Expand Down
24 changes: 4 additions & 20 deletions src/main/java/com/google/devtools/build/lib/worker/WorkerPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,28 +189,12 @@ private void waitForHighPriorityWorkersToFinish() throws InterruptedException {
}
}

/**
* Closes all the worker pools, destroying the workers in the process. This waits for any
* currently-ongoing work to finish.
*/
public void close() {
workerPools.values().forEach(GenericKeyedObjectPool::close);
multiplexPools.values().forEach(GenericKeyedObjectPool::close);
}

/** Stops any ongoing work in the worker pools. This may entail killing the worker processes. */
public void stopWork() {
workerPools
.values()
.forEach(
pool -> {
if (pool.getNumActive() > 0) {
pool.clear();
}
});
multiplexPools
.values()
.forEach(
pool -> {
if (pool.getNumActive() > 0) {
pool.clear();
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,6 @@ WorkResponse execInWorker(
Worker worker = null;
WorkResponse response;
WorkRequest request;

ActionExecutionMetadata owner = spawn.getResourceOwner();
try {
Stopwatch setupInputsStopwatch = Stopwatch.createStarted();
Expand Down
Loading

0 comments on commit c4e436a

Please sign in to comment.