Skip to content

Commit

Permalink
Move sending requests and reading responses for multiplex workers int…
Browse files Browse the repository at this point in the history
…o separate subthreads.

Nice symmetry here, but more importantly prevents dynamic execution from killing the workers. Also makes the code nicer.

RELNOTES: None.
PiperOrigin-RevId: 352389574
  • Loading branch information
larsrc-google authored and copybara-github committed Jan 18, 2021
1 parent bbac0c3 commit 80c03ef
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 105 deletions.
224 changes: 123 additions & 101 deletions src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

package com.google.devtools.build.lib.worker;


import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
import com.google.devtools.build.lib.shell.Subprocess;
Expand All @@ -31,8 +31,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;

Expand All @@ -43,8 +45,14 @@
* into them to send requests. When a worker process returns a {@code WorkResponse}, {@code
* WorkerMultiplexer} wakes up the relevant {@code WorkerProxy} to retrieve the response.
*/
public class WorkerMultiplexer extends Thread {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
public class WorkerMultiplexer {
/**
* A queue of {@link WorkRequest} instances that need to be sent to the worker. {@link
* WorkerProxy} instances add to this queue, while the requestSender subthread remove requests and
* send them to the worker. This prevents dynamic execution interrupts from corrupting the {@code
* stdin} of the worker process.
*/
private final BlockingQueue<WorkRequest> pendingRequests = new LinkedBlockingQueue<>();
/**
* A map of {@code WorkResponse}s received from the worker process. They are stored in this map
* keyed by the request id until the corresponding {@code WorkerProxy} picks them up.
Expand Down Expand Up @@ -80,6 +88,12 @@ public class WorkerMultiplexer extends Thread {
/** For testing only, allow a way to fake subprocesses. */
private SubprocessFactory subprocessFactory;

/** A separate thread that sends requests. */
private Thread requestSender;

/** A separate thread that receives responses. */
private Thread responseReceiver;

/**
* The active Reporter object, non-null if {@code --worker_verbose} is set. This must be cleared
* at the end of a command execution.
Expand All @@ -97,16 +111,15 @@ synchronized void setReporter(@Nullable EventHandler reporter) {
}

/** Reports a string to the user if reporting is enabled. */
private synchronized void report(String s) {
EventHandler r = this.reporter; // Protect against race condition with setReporter().
if (r != null && s != null) {
r.handle(Event.info(s));
private synchronized void report(@Nullable String s) {
if (this.reporter != null && s != null) {
this.reporter.handle(Event.info(s));
}
}

/**
* Creates a worker process corresponding to this {@code WorkerMultiplexer}, if it doesn't already
* exist. Also makes sure this {@code WorkerMultiplexer} runs as a separate thread.
* exist. Also starts up the subthreads handling reading and writing requests and responses.
*/
public synchronized void createProcess(Path workDir) throws IOException {
if (this.process == null) {
Expand All @@ -129,12 +142,25 @@ public synchronized void createProcess(Path workDir) throws IOException {
processBuilder.setStderr(logFile.getPathFile());
processBuilder.setEnv(workerKey.getEnv());
this.process = processBuilder.start();
String id = workerKey.getMnemonic() + "-" + workerKey.hashCode();
// TODO(larsrc): Consider moving sender/receiver threads into separate classes.
this.requestSender =
new Thread(
() -> {
while (process.isAlive() && sendRequest()) {}
});
this.requestSender.setName("multiplexer-request-sender-" + id);
this.requestSender.start();
this.responseReceiver =
new Thread(
() -> {
while (process.isAlive() && readResponse()) {}
});
this.responseReceiver.setName("multiplexer-response-receiver-" + id);
this.responseReceiver.start();
} else if (!this.process.isAlive()) {
throw new IOException("Process is dead");
}
if (!this.isAlive()) {
this.start();
}
}

/**
Expand All @@ -157,7 +183,10 @@ public synchronized void destroyMultiplexer() {
wasDestroyed = true;
}

/** Destroys the worker subprocess. This might block forever if the subprocess refuses to die. */
/**
* Destroys the worker subprocess. This might block forever if the subprocess refuses to die. It
* is safe to call this multiple times.
*/
private synchronized void destroyProcess() {
boolean wasInterrupted = false;
try {
Expand All @@ -171,6 +200,17 @@ private synchronized void destroyProcess() {
}
}
} finally {
// Stop the subthreads only when the process is dead, or their loops will go on.
if (this.requestSender != null) {
this.requestSender.interrupt();
}
if (this.responseReceiver != null) {
this.responseReceiver.interrupt();
}
// Might as well release any waiting workers
for (Semaphore semaphore : responseChecker.values()) {
semaphore.release();
}
// Read this for detailed explanation: http://www.ibm.com/developerworks/library/j-jtp05236/
if (wasInterrupted) {
Thread.currentThread().interrupt(); // preserve interrupted status
Expand All @@ -183,17 +223,12 @@ private synchronized void destroyProcess() {
* WorkerProxy}, and so is subject to interrupts by dynamic execution.
*/
public synchronized void putRequest(WorkRequest request) throws IOException {
responseChecker.put(request.getRequestId(), new Semaphore(0));
try {
request.writeDelimitedTo(process.getOutputStream());
process.getOutputStream().flush();
} catch (IOException e) {
// We can't know how much of the request was sent, so we have to assume the worker's input
// now contains garbage.
// TODO(b/151767359): Avoid causing garbage! Maybe by sending in a separate thread?
responseChecker.remove(request.getRequestId());
throw e;
if (!process.isAlive()) {
throw new IOException(
"Attempting to send request " + request.getRequestId() + " to dead process");
}
responseChecker.put(request.getRequestId(), new Semaphore(0));
pendingRequests.add(request);
}

/**
Expand All @@ -203,58 +238,99 @@ public synchronized void putRequest(WorkRequest request) throws IOException {
*/
public WorkResponse getResponse(Integer requestId) throws InterruptedException {
try {
if (!process.isAlive()) {
// If the process has died, all we can do is return what may already have been returned.
return workerProcessResponse.get(requestId);
}

Semaphore waitForResponse = responseChecker.get(requestId);

if (waitForResponse == null) {
report("Null response semaphore for " + requestId);
// If the multiplexer is interrupted when a {@code WorkerProxy} is trying to send a request,
// the request is not sent, so there is no need to wait for a response.
return null;
// If there is no semaphore for this request, it probably failed to send, so we just return
// what we got, probably nothing.
return workerProcessResponse.get(requestId);
}

// Wait for the multiplexer to get our response and release this semaphore. The semaphore will
// throw {@code InterruptedException} when the multiplexer is terminated.
waitForResponse.acquire();

WorkResponse workResponse = workerProcessResponse.get(requestId);
return workResponse;
return workerProcessResponse.get(requestId);
} finally {
responseChecker.remove(requestId);
workerProcessResponse.remove(requestId);
}
}

/**
* Waits to read a {@code WorkResponse} from worker process, put that {@code WorkResponse} in
* {@code workerProcessResponse} and release the semaphore for the {@code WorkerProxy}.
* Sends a single pending request, if there are any. Blocks until a request is available.
*
* <p>This is only called on the WorkerMultiplexer thread and so cannot be interrupted by dynamic
* execution cancellation.
* <p>This is only called by the {@code requestSender} thread and so cannot be interrupted by
* dynamic execution cancellation, but only by a call to {@link #destroyProcess()}.
*/
private void waitResponse() throws InterruptedException, IOException {
recordingStream = new RecordingInputStream(this.process.getInputStream());
recordingStream.startRecording(4096);
// TODO(larsrc): Turn this into a loop that also sends requests.
// Allow interrupts while waiting for responses, without conflating it with I/O errors.
while (recordingStream.available() == 0) {
if (!this.process.isAlive()) {
throw new IOException(
String.format("Multiplexer process for %s is dead", workerKey.getMnemonic()));
private boolean sendRequest() {
WorkRequest request;
try {
request = pendingRequests.take();
} catch (InterruptedException e) {
return false;
}
try {
request.writeDelimitedTo(process.getOutputStream());
process.getOutputStream().flush();
} catch (IOException e) {
// We can't know how much of the request was sent, so we have to assume the worker's input
// now contains garbage, and this request is lost.
// TODO(b/177637516): Signal that this action failed for presumably transient reasons.
report("Failed to send request " + request.getRequestId());
Semaphore s = responseChecker.remove(request.getRequestId());
if (s != null) {
s.release();
}
Thread.sleep(1);
// TODO(b/177637516): Leave process in a moribound state so pending responses can be returned.
destroyProcess();
return false;
}
WorkResponse parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream);
return true;
}

// A null parsedResponse can only happen if the input stream is closed, in which case we
/**
* Reads a {@code WorkResponse} from worker process, puts that {@code WorkResponse} in {@code
* workerProcessResponse}, and releases the semaphore for the {@code WorkerProxy}.
*
* <p>This is only called on the readResponses subthread and so cannot be interrupted by dynamic
* execution cancellation, but only by a call to {@link #destroyProcess()}.
*/
private boolean readResponse() {
recordingStream = new RecordingInputStream(process.getInputStream());
recordingStream.startRecording(4096);
WorkResponse parsedResponse;
try {
parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream);
} catch (IOException e) {
if (!(e instanceof InterruptedIOException)) {
report(
String.format(
"Error while reading response from multiplexer process for %s: %s",
workerKey.getMnemonic(), e));
}
// We can't know how much of the response was read, so we have to assume the worker's output
// now contains garbage, and we can't reliably read any further responses.
destroyProcess();
return false;
}
// A null parsedResponse can happen if the input stream is closed, in which case we
// drop everything.
if (parsedResponse == null) {
throw new IOException(
report(
String.format(
"Multiplexer process for %s died while reading response", workerKey.getMnemonic()));
"Multiplexer process for %s has closed its output stream", workerKey.getMnemonic()));
destroyProcess();
return false;
}

int requestId = parsedResponse.getRequestId();

workerProcessResponse.put(requestId, parsedResponse);

// TODO(b/151767359): When allowing cancellation, just remove responses that have no matching
Expand All @@ -267,61 +343,7 @@ private void waitResponse() throws InterruptedException, IOException {
report(String.format("Multiplexer for %s found no semaphore", workerKey.getMnemonic()));
workerProcessResponse.remove(requestId);
}
}

/** The multiplexer thread that listens to the WorkResponse from worker process. */
@Override
public void run() {
while (this.process.isAlive()) {
try {
waitResponse();
} catch (IOException e) {
// We got this exception while reading from the worker's stdout. We can't trust the
// output any more at that point.
if (this.process.isAlive()) {
destroyProcess();
}
if (e instanceof InterruptedIOException) {
report(
String.format(
"Multiplexer process for %s was interrupted during I/O, aborting multiplexer",
workerKey.getMnemonic()));
} else {
// TODO(larsrc): Output the recorded message.
report(
String.format(
"Multiplexer for %s got IOException reading a response, aborting multiplexer",
workerKey.getMnemonic()));
logger.atWarning().withCause(e).log(
"Caught IOException while waiting for worker response. "
+ "It could be because the worker returned an unparseable response.");
}
} catch (InterruptedException e) {
// This should only happen when the Blaze build has been aborted (failed or cancelled). In
// that case, there may still be some outstanding requests in the worker process, which we
// will let fall on the floor, but we still want to leave the process running for the next
// build.
// TODO(b/151767359): Cancel all outstanding requests when cancellation is implemented.
for (Semaphore semaphore : responseChecker.values()) {
semaphore.release();
}
}
}
synchronized (this) {
releaseAllSemaphores();
}
}

/**
* Release all the semaphores and clear the related maps. Must only be called when we are shutting
* down the multiplexer.
*/
private void releaseAllSemaphores() {
for (Semaphore semaphore : responseChecker.values()) {
semaphore.release();
}
responseChecker.clear();
workerProcessResponse.clear();
return true;
}

String getRecordingStreamMessage() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public static synchronized void removeInstance(WorkerKey key) throws UserExecExc
}
instanceInfo.decreaseRefCount();
if (instanceInfo.getRefCount() == 0) {
instanceInfo.getWorkerMultiplexer().interrupt();
instanceInfo.getWorkerMultiplexer().destroyMultiplexer();
multiplexerInstance.remove(key);
}
Expand Down
9 changes: 6 additions & 3 deletions src/test/shell/integration/bazel_worker_multiplexer_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,13 @@ EOF
bazel build :hello_world_1 &> $TEST_log \
|| fail "build failed"

bazel build :hello_world_2 &> $TEST_log \
&& fail "expected build to failed" || true
bazel build --worker_verbose :hello_world_2 >> $TEST_log 2>&1 \
&& fail "expected build to fail" || true

error_msgs=$(egrep -o -- 'Worker process (did not return a|returned an unparseable) WorkResponse' "$TEST_log")

expect_log "Worker process quit or closed its stdin stream when we tried to send a WorkRequest"
[ -n "$error_msgs" ] \
|| fail "expected error message not found"
}

function test_worker_restarts_when_worker_binary_changes() {
Expand Down

0 comments on commit 80c03ef

Please sign in to comment.