From 80c03ef14a1842d1e3475b1adf98adeb05df33f9 Mon Sep 17 00:00:00 2001 From: larsrc Date: Mon, 18 Jan 2021 04:38:43 -0800 Subject: [PATCH] Move sending requests and reading responses for multiplex workers into separate subthreads. Nice symmetry here, but more importantly prevents dynamic execution from killing the workers. Also makes the code nicer. RELNOTES: None. PiperOrigin-RevId: 352389574 --- .../build/lib/worker/WorkerMultiplexer.java | 224 ++++++++++-------- .../lib/worker/WorkerMultiplexerManager.java | 1 - .../bazel_worker_multiplexer_test.sh | 9 +- 3 files changed, 129 insertions(+), 105 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java index 1a669477b56bc5..8b7ce76f78b293 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java @@ -14,9 +14,9 @@ package com.google.devtools.build.lib.worker; + import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; -import com.google.common.flogger.GoogleLogger; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.EventHandler; import com.google.devtools.build.lib.shell.Subprocess; @@ -31,8 +31,10 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import javax.annotation.Nullable; @@ -43,8 +45,14 @@ * into them to send requests. When a worker process returns a {@code WorkResponse}, {@code * WorkerMultiplexer} wakes up the relevant {@code WorkerProxy} to retrieve the response. */ -public class WorkerMultiplexer extends Thread { - private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); +public class WorkerMultiplexer { + /** + * A queue of {@link WorkRequest} instances that need to be sent to the worker. {@link + * WorkerProxy} instances add to this queue, while the requestSender subthread remove requests and + * send them to the worker. This prevents dynamic execution interrupts from corrupting the {@code + * stdin} of the worker process. + */ + private final BlockingQueue pendingRequests = new LinkedBlockingQueue<>(); /** * A map of {@code WorkResponse}s received from the worker process. They are stored in this map * keyed by the request id until the corresponding {@code WorkerProxy} picks them up. @@ -80,6 +88,12 @@ public class WorkerMultiplexer extends Thread { /** For testing only, allow a way to fake subprocesses. */ private SubprocessFactory subprocessFactory; + /** A separate thread that sends requests. */ + private Thread requestSender; + + /** A separate thread that receives responses. */ + private Thread responseReceiver; + /** * The active Reporter object, non-null if {@code --worker_verbose} is set. This must be cleared * at the end of a command execution. @@ -97,16 +111,15 @@ synchronized void setReporter(@Nullable EventHandler reporter) { } /** Reports a string to the user if reporting is enabled. */ - private synchronized void report(String s) { - EventHandler r = this.reporter; // Protect against race condition with setReporter(). - if (r != null && s != null) { - r.handle(Event.info(s)); + private synchronized void report(@Nullable String s) { + if (this.reporter != null && s != null) { + this.reporter.handle(Event.info(s)); } } /** * Creates a worker process corresponding to this {@code WorkerMultiplexer}, if it doesn't already - * exist. Also makes sure this {@code WorkerMultiplexer} runs as a separate thread. + * exist. Also starts up the subthreads handling reading and writing requests and responses. */ public synchronized void createProcess(Path workDir) throws IOException { if (this.process == null) { @@ -129,12 +142,25 @@ public synchronized void createProcess(Path workDir) throws IOException { processBuilder.setStderr(logFile.getPathFile()); processBuilder.setEnv(workerKey.getEnv()); this.process = processBuilder.start(); + String id = workerKey.getMnemonic() + "-" + workerKey.hashCode(); + // TODO(larsrc): Consider moving sender/receiver threads into separate classes. + this.requestSender = + new Thread( + () -> { + while (process.isAlive() && sendRequest()) {} + }); + this.requestSender.setName("multiplexer-request-sender-" + id); + this.requestSender.start(); + this.responseReceiver = + new Thread( + () -> { + while (process.isAlive() && readResponse()) {} + }); + this.responseReceiver.setName("multiplexer-response-receiver-" + id); + this.responseReceiver.start(); } else if (!this.process.isAlive()) { throw new IOException("Process is dead"); } - if (!this.isAlive()) { - this.start(); - } } /** @@ -157,7 +183,10 @@ public synchronized void destroyMultiplexer() { wasDestroyed = true; } - /** Destroys the worker subprocess. This might block forever if the subprocess refuses to die. */ + /** + * Destroys the worker subprocess. This might block forever if the subprocess refuses to die. It + * is safe to call this multiple times. + */ private synchronized void destroyProcess() { boolean wasInterrupted = false; try { @@ -171,6 +200,17 @@ private synchronized void destroyProcess() { } } } finally { + // Stop the subthreads only when the process is dead, or their loops will go on. + if (this.requestSender != null) { + this.requestSender.interrupt(); + } + if (this.responseReceiver != null) { + this.responseReceiver.interrupt(); + } + // Might as well release any waiting workers + for (Semaphore semaphore : responseChecker.values()) { + semaphore.release(); + } // Read this for detailed explanation: http://www.ibm.com/developerworks/library/j-jtp05236/ if (wasInterrupted) { Thread.currentThread().interrupt(); // preserve interrupted status @@ -183,17 +223,12 @@ private synchronized void destroyProcess() { * WorkerProxy}, and so is subject to interrupts by dynamic execution. */ public synchronized void putRequest(WorkRequest request) throws IOException { - responseChecker.put(request.getRequestId(), new Semaphore(0)); - try { - request.writeDelimitedTo(process.getOutputStream()); - process.getOutputStream().flush(); - } catch (IOException e) { - // We can't know how much of the request was sent, so we have to assume the worker's input - // now contains garbage. - // TODO(b/151767359): Avoid causing garbage! Maybe by sending in a separate thread? - responseChecker.remove(request.getRequestId()); - throw e; + if (!process.isAlive()) { + throw new IOException( + "Attempting to send request " + request.getRequestId() + " to dead process"); } + responseChecker.put(request.getRequestId(), new Semaphore(0)); + pendingRequests.add(request); } /** @@ -203,21 +238,25 @@ public synchronized void putRequest(WorkRequest request) throws IOException { */ public WorkResponse getResponse(Integer requestId) throws InterruptedException { try { + if (!process.isAlive()) { + // If the process has died, all we can do is return what may already have been returned. + return workerProcessResponse.get(requestId); + } + Semaphore waitForResponse = responseChecker.get(requestId); if (waitForResponse == null) { report("Null response semaphore for " + requestId); - // If the multiplexer is interrupted when a {@code WorkerProxy} is trying to send a request, - // the request is not sent, so there is no need to wait for a response. - return null; + // If there is no semaphore for this request, it probably failed to send, so we just return + // what we got, probably nothing. + return workerProcessResponse.get(requestId); } // Wait for the multiplexer to get our response and release this semaphore. The semaphore will // throw {@code InterruptedException} when the multiplexer is terminated. waitForResponse.acquire(); - WorkResponse workResponse = workerProcessResponse.get(requestId); - return workResponse; + return workerProcessResponse.get(requestId); } finally { responseChecker.remove(requestId); workerProcessResponse.remove(requestId); @@ -225,36 +264,73 @@ public WorkResponse getResponse(Integer requestId) throws InterruptedException { } /** - * Waits to read a {@code WorkResponse} from worker process, put that {@code WorkResponse} in - * {@code workerProcessResponse} and release the semaphore for the {@code WorkerProxy}. + * Sends a single pending request, if there are any. Blocks until a request is available. * - *

This is only called on the WorkerMultiplexer thread and so cannot be interrupted by dynamic - * execution cancellation. + *

This is only called by the {@code requestSender} thread and so cannot be interrupted by + * dynamic execution cancellation, but only by a call to {@link #destroyProcess()}. */ - private void waitResponse() throws InterruptedException, IOException { - recordingStream = new RecordingInputStream(this.process.getInputStream()); - recordingStream.startRecording(4096); - // TODO(larsrc): Turn this into a loop that also sends requests. - // Allow interrupts while waiting for responses, without conflating it with I/O errors. - while (recordingStream.available() == 0) { - if (!this.process.isAlive()) { - throw new IOException( - String.format("Multiplexer process for %s is dead", workerKey.getMnemonic())); + private boolean sendRequest() { + WorkRequest request; + try { + request = pendingRequests.take(); + } catch (InterruptedException e) { + return false; + } + try { + request.writeDelimitedTo(process.getOutputStream()); + process.getOutputStream().flush(); + } catch (IOException e) { + // We can't know how much of the request was sent, so we have to assume the worker's input + // now contains garbage, and this request is lost. + // TODO(b/177637516): Signal that this action failed for presumably transient reasons. + report("Failed to send request " + request.getRequestId()); + Semaphore s = responseChecker.remove(request.getRequestId()); + if (s != null) { + s.release(); } - Thread.sleep(1); + // TODO(b/177637516): Leave process in a moribound state so pending responses can be returned. + destroyProcess(); + return false; } - WorkResponse parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream); + return true; + } - // A null parsedResponse can only happen if the input stream is closed, in which case we + /** + * Reads a {@code WorkResponse} from worker process, puts that {@code WorkResponse} in {@code + * workerProcessResponse}, and releases the semaphore for the {@code WorkerProxy}. + * + *

This is only called on the readResponses subthread and so cannot be interrupted by dynamic + * execution cancellation, but only by a call to {@link #destroyProcess()}. + */ + private boolean readResponse() { + recordingStream = new RecordingInputStream(process.getInputStream()); + recordingStream.startRecording(4096); + WorkResponse parsedResponse; + try { + parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream); + } catch (IOException e) { + if (!(e instanceof InterruptedIOException)) { + report( + String.format( + "Error while reading response from multiplexer process for %s: %s", + workerKey.getMnemonic(), e)); + } + // We can't know how much of the response was read, so we have to assume the worker's output + // now contains garbage, and we can't reliably read any further responses. + destroyProcess(); + return false; + } + // A null parsedResponse can happen if the input stream is closed, in which case we // drop everything. if (parsedResponse == null) { - throw new IOException( + report( String.format( - "Multiplexer process for %s died while reading response", workerKey.getMnemonic())); + "Multiplexer process for %s has closed its output stream", workerKey.getMnemonic())); + destroyProcess(); + return false; } int requestId = parsedResponse.getRequestId(); - workerProcessResponse.put(requestId, parsedResponse); // TODO(b/151767359): When allowing cancellation, just remove responses that have no matching @@ -267,61 +343,7 @@ private void waitResponse() throws InterruptedException, IOException { report(String.format("Multiplexer for %s found no semaphore", workerKey.getMnemonic())); workerProcessResponse.remove(requestId); } - } - - /** The multiplexer thread that listens to the WorkResponse from worker process. */ - @Override - public void run() { - while (this.process.isAlive()) { - try { - waitResponse(); - } catch (IOException e) { - // We got this exception while reading from the worker's stdout. We can't trust the - // output any more at that point. - if (this.process.isAlive()) { - destroyProcess(); - } - if (e instanceof InterruptedIOException) { - report( - String.format( - "Multiplexer process for %s was interrupted during I/O, aborting multiplexer", - workerKey.getMnemonic())); - } else { - // TODO(larsrc): Output the recorded message. - report( - String.format( - "Multiplexer for %s got IOException reading a response, aborting multiplexer", - workerKey.getMnemonic())); - logger.atWarning().withCause(e).log( - "Caught IOException while waiting for worker response. " - + "It could be because the worker returned an unparseable response."); - } - } catch (InterruptedException e) { - // This should only happen when the Blaze build has been aborted (failed or cancelled). In - // that case, there may still be some outstanding requests in the worker process, which we - // will let fall on the floor, but we still want to leave the process running for the next - // build. - // TODO(b/151767359): Cancel all outstanding requests when cancellation is implemented. - for (Semaphore semaphore : responseChecker.values()) { - semaphore.release(); - } - } - } - synchronized (this) { - releaseAllSemaphores(); - } - } - - /** - * Release all the semaphores and clear the related maps. Must only be called when we are shutting - * down the multiplexer. - */ - private void releaseAllSemaphores() { - for (Semaphore semaphore : responseChecker.values()) { - semaphore.release(); - } - responseChecker.clear(); - workerProcessResponse.clear(); + return true; } String getRecordingStreamMessage() { diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java index aefabf0a4306ee..0a58cd539f7f81 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java @@ -80,7 +80,6 @@ public static synchronized void removeInstance(WorkerKey key) throws UserExecExc } instanceInfo.decreaseRefCount(); if (instanceInfo.getRefCount() == 0) { - instanceInfo.getWorkerMultiplexer().interrupt(); instanceInfo.getWorkerMultiplexer().destroyMultiplexer(); multiplexerInstance.remove(key); } diff --git a/src/test/shell/integration/bazel_worker_multiplexer_test.sh b/src/test/shell/integration/bazel_worker_multiplexer_test.sh index e04821619ae3e0..e98c46b33bb7aa 100755 --- a/src/test/shell/integration/bazel_worker_multiplexer_test.sh +++ b/src/test/shell/integration/bazel_worker_multiplexer_test.sh @@ -205,10 +205,13 @@ EOF bazel build :hello_world_1 &> $TEST_log \ || fail "build failed" - bazel build :hello_world_2 &> $TEST_log \ - && fail "expected build to failed" || true + bazel build --worker_verbose :hello_world_2 >> $TEST_log 2>&1 \ + && fail "expected build to fail" || true + + error_msgs=$(egrep -o -- 'Worker process (did not return a|returned an unparseable) WorkResponse' "$TEST_log") - expect_log "Worker process quit or closed its stdin stream when we tried to send a WorkRequest" + [ -n "$error_msgs" ] \ + || fail "expected error message not found" } function test_worker_restarts_when_worker_binary_changes() {