Attempt to fix cancellation crash in repo fetching w/ worker thread
The stacktrace in #21478 suggests that the second `workerFuture.get()` (line 183 before) is snagging on a `CancellationException`. Closer inspection indicates that the exception handling in this entire block of code is just faulty -- one, `workerFuture.get()` on line 169 is very unlikely to throw an `InterruptedException` because this call happens after we've received a `DONE` from the signal queue, which is at the very end of the worker thread logic (in its own `finally` clause, actually); two, the second call to `workerFuture.get()` on line 183 doesn't actually do anything because `get()`-ing a cancelled future would just throw a `CancellationException` immediately.
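The second point can be checked in isolation. The following is a minimal sketch using only `java.util.concurrent`; the class and method names are hypothetical and none of this is Bazel code. It shows that `get()` on a cancelled future throws `CancellationException` without waiting for the task body to finish.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the Bazel code base.
class CancelledFutureDemo {
  /** Returns true if get() on a cancelled future throws CancellationException. */
  static boolean getThrowsAfterCancel() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch started = new CountDownLatch(1);
    try {
      Future<?> future =
          executor.submit(
              () -> {
                started.countDown();
                try {
                  Thread.sleep(60_000); // stand-in for long-running fetch work
                } catch (InterruptedException e) {
                  // Woken up by cancel(true); cleanup could still be running here.
                }
              });
      started.await(); // make sure the task is actually running before cancelling
      future.cancel(true);
      try {
        future.get(); // throws right away; it does not wait for the task body
        return false;
      } catch (CancellationException e) {
        return true;
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(getThrowsAfterCancel());
  }
}
```

Because `get()` gives no way to wait for a cancelled task to wind down, some other completion signal is needed, which is the role the signal queue plays in this change.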

This CL attempts to fix these two glaring errors. It now handles interrupts where they are likely to happen, which is at the call to `state.signalQueue.take()` -- this is where the Skyframe thread spends the most time blocked, and where a Ctrl-C from the user is most likely to land. We catch an `InterruptedException` here and interrupt the worker thread. To wait for the worker thread to finish, we uninterruptibly take from the signal queue instead of calling `workerFuture.get()`.
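The hand-off described above can be sketched outside of Bazel. This is an illustrative sketch only: the class name, the one-value `Signal` enum, and the hand-rolled `takeUninterruptibly` (a stand-in for Guava's `Uninterruptibles.takeUninterruptibly`, so the example needs only the JDK) are all assumptions. The host thread is interrupted while blocked on the queue, interrupts the worker, and then waits for the worker's final signal before it would propagate the interrupt.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of the Bazel code base.
class InterruptHandoffDemo {
  enum Signal { DONE }

  /** JDK-only stand-in for Guava's Uninterruptibles.takeUninterruptibly. */
  static <T> T takeUninterruptibly(BlockingQueue<T> queue) {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return queue.take();
        } catch (InterruptedException e) {
          interrupted = true; // remember the interrupt and keep waiting
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt(); // restore the flag for the caller
      }
    }
  }

  /** Returns true if the worker had finished by the time the host would rethrow. */
  static boolean workerFinishedBeforePropagation() {
    BlockingQueue<Signal> signalQueue = new LinkedBlockingQueue<>();
    AtomicBoolean workerFinished = new AtomicBoolean(false);
    Thread worker =
        new Thread(
            () -> {
              try {
                Thread.sleep(60_000); // stand-in for the fetch
              } catch (InterruptedException e) {
                // Interrupted by the host thread; fall through to the finally block.
              } finally {
                workerFinished.set(true);
                signalQueue.add(Signal.DONE); // always the last thing the worker does
              }
            });
    worker.start();

    Thread.currentThread().interrupt(); // simulate Ctrl-C landing on the host thread
    try {
      signalQueue.take(); // throws immediately because the interrupt flag is set
      return false; // not expected in this demo
    } catch (InterruptedException e) {
      worker.interrupt(); // plays the role of state.close()
      takeUninterruptibly(signalQueue); // wait for the worker's DONE signal
      Thread.interrupted(); // clear the interrupted status
      // The real code would `throw new InterruptedException()` here.
      return workerFinished.get();
    }
  }

  public static void main(String[] args) {
    System.out.println(workerFinishedBeforePropagation());
  }
}
```

Waiting on the queue rather than on the future works here because the worker posts `DONE` from its own `finally` clause, so receiving it implies the worker has nothing left to do.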

Additionally, we now correctly handle the worker thread being interrupted by someone other than the host Skyframe thread (the memory pressure handler, in all likelihood), by simply retrying the fetch instead of crashing Bazel.
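A rough sketch of the retry-on-cancellation idea, again with hypothetical names and only the JDK: the first attempt's future is cancelled by "someone else" (here simulated inline), and the caller responds to `CancellationException` by starting the work over rather than failing.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the Bazel code base.
class RetryOnCancelDemo {
  /** Runs the task and returns the attempt number on which it succeeded. */
  static int fetchWithRetry(ExecutorService executor, int attempt) {
    Future<String> future =
        executor.submit(
            () -> {
              if (attempt == 1) {
                Thread.sleep(60_000); // first attempt blocks until it is cancelled
              }
              return "fetched";
            });
    if (attempt == 1) {
      // Simulates a third party (e.g. a memory pressure handler) cancelling the work.
      future.cancel(true);
    }
    try {
      future.get();
      return attempt;
    } catch (CancellationException e) {
      // Nothing was produced, so it is safe to simply start over.
      return fetchWithRetry(executor, attempt + 1);
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Convenience wrapper that owns the executor. */
  static int run() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      return fetchWithRetry(executor, 1);
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

The recursion mirrors the shape of the change, which calls `fetch(...)` again from the `catch (CancellationException e)` clause; a loop would work equally well in a standalone setting.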

Fixes #21478 (maybe...?)

PiperOrigin-RevId: 613348046
Change-Id: I692fa750cb8873f1bd403f16764d1845410a29f1
Wyverald authored and copybara-github committed Mar 6, 2024
1 parent 4420fff commit fd769f0
Showing 1 changed file with 24 additions and 13 deletions.
@@ -21,6 +21,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Table;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.devtools.build.lib.analysis.BlazeDirectories;
 import com.google.devtools.build.lib.analysis.RuleDefinition;
 import com.google.devtools.build.lib.bazel.repository.RepositoryResolvedEvent;
@@ -38,6 +39,7 @@
 import com.google.devtools.build.lib.profiler.Profiler;
 import com.google.devtools.build.lib.profiler.ProfilerTask;
 import com.google.devtools.build.lib.profiler.SilentCloseable;
+import com.google.devtools.build.lib.repository.RepositoryFetchProgress;
 import com.google.devtools.build.lib.rules.repository.NeedsSkyframeRestartException;
 import com.google.devtools.build.lib.rules.repository.RepoRecordedInput;
 import com.google.devtools.build.lib.rules.repository.RepositoryDirectoryValue;
@@ -56,6 +58,7 @@
 import com.google.devtools.build.skyframe.SkyKey;
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import javax.annotation.Nullable;
@@ -161,7 +164,18 @@ public RepositoryDirectoryValue.Builder fetch(
     // restart, and need to send over a fresh Environment.
     state.delegateEnvQueue.put(env);
   }
-  switch (state.signalQueue.take()) {
+  Signal signal;
+  try {
+    signal = state.signalQueue.take();
+  } catch (InterruptedException e) {
+    // This means that we caught a Ctrl-C. Make sure to close the state object to interrupt the
+    // worker thread, wait for it to finish, and then propagate the InterruptedException.
+    state.close();
+    signal = Uninterruptibles.takeUninterruptibly(state.signalQueue);
+    Thread.interrupted(); // clear the interrupted status
+    throw new InterruptedException();
+  }
+  switch (signal) {
     case RESTART:
       return null;
     case DONE:
@@ -174,18 +188,15 @@ public RepositoryDirectoryValue.Builder fetch(
         Throwables.throwIfUnchecked(e.getCause());
         throw new IllegalStateException(
             "unexpected exception type: " + e.getClass(), e.getCause());
-      } finally {
-        // Make sure we interrupt the worker thread if work on the Skyframe thread were cut short
-        // for any reason.
-        state.close();
-        try {
-          // Synchronously wait for the worker thread to finish any remaining work.
-          workerFuture.get();
-        } catch (ExecutionException e) {
-          // When this happens, we either already dealt with the exception (see `catch` clause
-          // above), or we're in the middle of propagating an InterruptedException in which case
-          // we don't care about the result of execution anyway.
-        }
+      } catch (CancellationException e) {
+        // This can only happen if the state object was invalidated due to memory pressure, in
+        // which case we can simply reattempt the fetch.
+        env.getListener()
+            .post(
+                RepositoryFetchProgress.ongoing(
+                    RepositoryName.createUnvalidated(rule.getName()),
+                    "fetch interrupted due to memory pressure; restarting."));
+        return fetch(rule, outputDirectory, directories, env, recordedInputValues, key);
       }
   }
   // TODO(wyv): use a switch expression above instead and remove this.