Skip to content

Commit

Permalink
Revert "Let workers finish lost races without delaying dynamic execut…
Browse files Browse the repository at this point in the history
…ion."

This reverts commit 6080c1e.
  • Loading branch information
larsrc-google committed Jul 30, 2021
1 parent 988b56f commit 35c98d0
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,24 @@ void putRequest(WorkRequest request) throws IOException {
@Override
WorkResponse getResponse(int requestId) throws IOException, InterruptedException {
recordingInputStream.startRecording(4096);
while (recordingInputStream.available() == 0) {
Thread.sleep(10);
if (!process.isAlive()) {
throw new IOException(
String.format(
"Worker process for %s died while waiting for response", workerKey.getMnemonic()));
// Ironically, we don't allow interrupts during dynamic execution, since we can't cancel
// the worker short of destroying it.
if (!workerKey.isSpeculative()) {
while (recordingInputStream.available() == 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// This should only happen when not in dynamic execution, so we can safely kill the
// worker.
destroy();
throw e;
}
if (!process.isAlive()) {
throw new IOException(
String.format(
"Worker process for %s died while waiting for response",
workerKey.getMnemonic()));
}
}
}
return workerProtocol.getResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,6 @@ WorkResponse execInWorker(

try {
response = worker.getResponse(request.getRequestId());
} catch (InterruptedException e) {
finishWorkAsync(key, worker, request);
worker = null;
throw e;
} catch (IOException e) {
restoreInterrupt(e);
// If protobuf or json reader couldn't parse the response, try to print whatever the
Expand Down Expand Up @@ -518,41 +514,6 @@ WorkResponse execInWorker(
return response;
}

/**
 * Starts a thread to collect the response from a worker when it's no longer of interest.
 *
 * <p>This can happen either when we lost the race in dynamic execution or the build got
 * interrupted. This takes ownership of the worker for purposes of returning it to the worker
 * pool.
 *
 * <p>If the outstanding response is read successfully, the worker is returned to the pool. If
 * reading fails or is interrupted, the worker is invalidated instead and is never returned to the
 * pool, even when the invalidation itself fails.
 *
 * @param key the key under which {@code worker} is returned to, or invalidated in, the pool
 * @param worker the worker whose pending response should be drained
 * @param request the request whose response is still outstanding; only its request id is used
 */
private void finishWorkAsync(WorkerKey key, Worker worker, WorkRequest request) {
  Thread reaper =
      new Thread(
          () -> {
            // Non-null only while the worker is still fit to go back into the pool.
            Worker w = worker;
            try {
              w.getResponse(request.getRequestId());
            } catch (IOException | InterruptedException e1) {
              // If this happens, we either can't trust the output of the worker, or we got
              // interrupted while handling being interrupted. In the latter case, let's stop
              // trying and just destroy the worker. If it's a singleplex worker, there will
              // be a dangling response that we don't want to keep trying to read, so we destroy
              // the worker.
              //
              // Give up ownership *before* invalidating: if invalidation throws, the finally
              // block below must not hand this untrusted worker back to the pool.
              w = null;
              try {
                workers.invalidateObject(key, worker);
              } catch (IOException | InterruptedException e2) {
                // The reaper thread can't do anything useful about this.
              }
            } finally {
              if (w != null) {
                workers.returnObject(key, w);
              }
            }
          });
  reaper.start();
}

private static void restoreInterrupt(IOException e) {
if (e instanceof InterruptedIOException) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,8 @@
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import sun.misc.Signal;
import sun.misc.SignalHandler;

/** An example implementation of a worker process that is used for integration tests. */
public class ExampleWorker {
Expand Down Expand Up @@ -82,23 +79,6 @@ private static void runPersistentWorker(ExampleWorkerOptions workerOptions) thro
PrintStream originalStdOut = System.out;
PrintStream originalStdErr = System.err;

if (workerOptions.waitForSignal) {
Semaphore signalSem = new Semaphore(0);
Signal.handle(
new Signal("HUP"),
new SignalHandler() {
@Override
public void handle(Signal sig) {
signalSem.release();
}
});
try {
signalSem.acquire();
} catch (InterruptedException e) {
System.out.println("Interrupted while waiting for signal");
e.printStackTrace();
}
}
ExampleWorkerProtocol workerProtocol = null;
switch (workerOptions.workerProtocol) {
case JSON:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,6 @@ public static class ExampleWorkOptions extends OptionsBase {
)
public boolean hardPoison;

@Option(
name = "wait_for_signal",
documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
effectTags = {OptionEffectTag.NO_OP},
defaultValue = "false",
help = "Don't send a response until receiving a SIGXXXX.")
public boolean waitForSignal;

/** Enum converter for --worker_protocol. */
public static class WorkerProtocolEnumConverter
extends EnumConverter<ExecutionRequirements.WorkerProtocolFormat> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ private void verifyGetResponseFailure(String responseString, String expectedErro
assertThat(ex).hasMessageThat().contains(expectedError);
}

/**
 * Checks that feeding an empty response string through {@code verifyGetResponseFailure} surfaces
 * the JSON parse error message.
 */
@Test
public void testGetResponse_json_emptyString_throws() throws IOException {
  final String emptyResponse = "";
  final String expectedError = "Could not parse json work request correctly";
  verifyGetResponseFailure(emptyResponse, expectedError);
}

@Test
public void testGetResponse_badJson_throws() throws IOException {
verifyGetResponseFailure(
Expand Down

0 comments on commit 35c98d0

Please sign in to comment.