From 35c98d07b21785efae57a7c4230cc1e452f74fd2 Mon Sep 17 00:00:00 2001 From: Lars Clausen Date: Wed, 28 Jul 2021 12:11:15 +0200 Subject: [PATCH] Revert "Let workers finish lost races without delaying dynamic execution." This reverts commit 6080c1e07f4229ea72eacd04faa9302e44955a84. --- .../build/lib/worker/SingleplexWorker.java | 24 +++++++++--- .../build/lib/worker/WorkerSpawnRunner.java | 39 ------------------- .../build/lib/worker/ExampleWorker.java | 20 ---------- .../lib/worker/ExampleWorkerOptions.java | 8 ---- .../devtools/build/lib/worker/WorkerTest.java | 5 +++ 5 files changed, 23 insertions(+), 73 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java index 3edce2c3227b6f..dbe0c494db3f11 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java +++ b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java @@ -120,12 +120,24 @@ void putRequest(WorkRequest request) throws IOException { @Override WorkResponse getResponse(int requestId) throws IOException, InterruptedException { recordingInputStream.startRecording(4096); - while (recordingInputStream.available() == 0) { - Thread.sleep(10); - if (!process.isAlive()) { - throw new IOException( - String.format( - "Worker process for %s died while waiting for response", workerKey.getMnemonic())); + // Ironically, we don't allow interrupts during dynamic execution, since we can't cancel + // the worker short of destroying it. + if (!workerKey.isSpeculative()) { + while (recordingInputStream.available() == 0) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + // This should only happen when not in dynamic execution, so we can safely kill the + // worker. + destroy(); + throw e; + } + if (!process.isAlive()) { + throw new IOException( + String.format( + "Worker process for %s died while waiting for response", + workerKey.getMnemonic())); + } } } return workerProtocol.getResponse(); diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java index d144e476f362dd..86798ec46f86c8 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java @@ -457,10 +457,6 @@ WorkResponse execInWorker( try { response = worker.getResponse(request.getRequestId()); - } catch (InterruptedException e) { - finishWorkAsync(key, worker, request); - worker = null; - throw e; } catch (IOException e) { restoreInterrupt(e); // If protobuf or json reader couldn't parse the response, try to print whatever the @@ -518,41 +514,6 @@ WorkResponse execInWorker( return response; } - /** - * Starts a thread to collect the response from a worker when it's no longer of interest. - * - *

This can happen either when we lost the race in dynamic execution or the build got - * interrupted. This takes ownership of the worker for purposes of returning it to the worker - * pool. - */ - private void finishWorkAsync(WorkerKey key, Worker worker, WorkRequest request) { - Thread reaper = - new Thread( - () -> { - Worker w = worker; - try { - w.getResponse(request.getRequestId()); - } catch (IOException | InterruptedException e1) { - // If this happens, we either can't trust the output of the worker, or we got - // interrupted while handling being interrupted. In the latter case, let's stop - // trying and just destroy the worker. If it's a singleplex worker, there will - // be a dangling response that we don't want to keep trying to read, so we destroy - // the worker. - try { - workers.invalidateObject(key, w); - w = null; - } catch (IOException | InterruptedException e2) { - // The reaper thread can't do anything useful about this. - } - } finally { - if (w != null) { - workers.returnObject(key, w); - } - } - }); - reaper.start(); - } - private static void restoreInterrupt(IOException e) { if (e instanceof InterruptedIOException) { Thread.currentThread().interrupt(); diff --git a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java index 353efa617138cb..471218c060dbda 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java +++ b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java @@ -35,11 +35,8 @@ import java.util.Map; import java.util.Random; import java.util.UUID; -import java.util.concurrent.Semaphore; import java.util.regex.Matcher; import java.util.regex.Pattern; -import sun.misc.Signal; -import sun.misc.SignalHandler; /** An example implementation of a worker process that is used for integration tests. */ public class ExampleWorker { @@ -82,23 +79,6 @@ private static void runPersistentWorker(ExampleWorkerOptions workerOptions) thro PrintStream originalStdOut = System.out; PrintStream originalStdErr = System.err; - if (workerOptions.waitForSignal) { - Semaphore signalSem = new Semaphore(0); - Signal.handle( - new Signal("HUP"), - new SignalHandler() { - @Override - public void handle(Signal sig) { - signalSem.release(); - } - }); - try { - signalSem.acquire(); - } catch (InterruptedException e) { - System.out.println("Interrupted while waiting for signal"); - e.printStackTrace(); - } - } ExampleWorkerProtocol workerProtocol = null; switch (workerOptions.workerProtocol) { case JSON: diff --git a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerOptions.java b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerOptions.java index 440717916a3fd4..40d6faa5811b4a 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerOptions.java +++ b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerOptions.java @@ -135,14 +135,6 @@ public static class ExampleWorkOptions extends OptionsBase { ) public boolean hardPoison; - @Option( - name = "wait_for_signal", - documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, - effectTags = {OptionEffectTag.NO_OP}, - defaultValue = "false", - help = "Don't send a response until receiving a SIGXXXX.") - public boolean waitForSignal; - /** Enum converter for --worker_protocol. */ public static class WorkerProtocolEnumConverter extends EnumConverter { diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java index a7f22586712e97..f7537692c50a18 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java +++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerTest.java @@ -174,6 +174,11 @@ private void verifyGetResponseFailure(String responseString, String expectedErro assertThat(ex).hasMessageThat().contains(expectedError); } + @Test + public void testGetResponse_json_emptyString_throws() throws IOException { + verifyGetResponseFailure("", "Could not parse json work request correctly"); + } + @Test public void testGetResponse_badJson_throws() throws IOException { verifyGetResponseFailure(