Skip to content

Commit

Permalink
Support for cancellation in WorkRequestHandler.
Browse files Browse the repository at this point in the history
To actually use cancellation, a worker implementation will still have to implement a cancellation callback that actually cancels and add `supports-worker-cancellation = 1` to the execution requirements, and then the build must run with `--experimental_worker_cancellation`.

Cancellation design doc: https://docs.google.com/document/d/1-h4gcBV8Jn6DK9G_e23kZQ159jmX__uckhub1Gv9dzc

RELNOTES: None.
PiperOrigin-RevId: 373749452
  • Loading branch information
larsrc-google committed Jul 30, 2021
1 parent c366d30 commit a698bef
Show file tree
Hide file tree
Showing 5 changed files with 473 additions and 65 deletions.
45 changes: 45 additions & 0 deletions site/docs/creating-workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,51 @@ id, so the request id must be specified if it is nonzero. This is a valid
}
```

A `request_id` of 0 indicates a "singleplex" request, i.e. this request cannot
be processed in parallel with other requests. The server guarantees that a
given worker receives requests with either only `request_id` 0 or only
`request_id` greater than zero. Singleplex requests are sent in serial, i.e. the
server doesn't send another request until it has received a response (except
for cancel requests, see below).

**Notes**

* Each protocol buffer is preceded by its length in `varint` format (see
[`MessageLite.writeDelimitedTo()`](https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/MessageLite.html#writeDelimitedTo-java.io.OutputStream-).
* JSON requests and responses are not preceded by a size indicator.
* JSON requests uphold the same structure as the protobuf, but use standard
JSON.
* Bazel stores requests as protobufs and converts them to JSON using
[protobuf's JSON format](https://cs.opensource.google/protobuf/protobuf/+/master:java/util/src/main/java/com/google/protobuf/util/JsonFormat.java)

### Cancellation

Workers can optionally allow work requests to be cancelled before they finish.
This is particularly useful in connection with dynamic execution, where local
execution can regularly be interrupted by a faster remote execution. To allow
cancellation, add `supports-worker-cancellation: 1` to the
`execution-requirements` field (see below) and set the
`--experimental_worker_cancellation` flag.

A **cancel request** is a `WorkRequest` with the `cancel` field set (and
similarly a **cancel response** is a `WorkResponse` with the `was_cancelled`
field set). The only other field that must be in a cancel request or cancel
response is `request_id`, indicating which
request to cancel. The `request_id` field will be 0 for singleplex workers
or the non-0 `request_id` of a previously sent `WorkRequest` for multiplex
workers. The server may send cancel requests for requests that the worker has
already responded to, in which case the cancel request must be ignored.

Each non-cancel `WorkRequest` message must be answered exactly once, whether
or not it was cancelled. Once the server has sent a cancel request, the worker
may respond with a `WorkResponse` with the `request_id` set
and the `was_cancelled` field set to true. Sending a regular `WorkResponse`
is also accepted, but the `output` and `exit_code` fields will be ignored.

Once a response has been sent for a `WorkRequest`, the worker must not touch
the files in its working directory. The server is free to clean up the files,
including temporary files.

## Making the rule that uses the worker

You'll also need to create a rule that generates actions to be performed by the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.
package com.google.devtools.build.lib.worker;


import com.google.common.annotations.VisibleForTesting;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
Expand All @@ -24,13 +23,12 @@
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

/**
Expand All @@ -56,13 +54,31 @@ public interface WorkerMessageProcessor {

/** Holds information necessary to properly handle a request, especially for cancellation. */
static class RequestInfo {
/** The thread handling the request. */
final Thread thread;
/** If true, we have received a cancel request for this request. */
private boolean cancelled;
/**
* The builder for the response to this request. Since only one response must be sent per
* request, this builder must be accessed through takeBuilder(), which zeroes this field and
* returns the builder.
*/
private WorkResponse.Builder responseBuilder = WorkResponse.newBuilder();

RequestInfo(Thread thread) {
this.thread = thread;
}

/** Sets whether this request has been cancelled. */
void setCancelled() {
cancelled = true;
}

/** Returns true if this request has been cancelled. */
boolean isCancelled() {
return cancelled;
}

/**
* Returns the response builder. If called more than once on the same instance, subsequent calls
* will return {@code null}.
Expand All @@ -72,13 +88,22 @@ synchronized Optional<WorkResponse.Builder> takeBuilder() {
responseBuilder = null;
return Optional.ofNullable(b);
}

/**
* Adds {@code s} as output to when the response eventually gets built. Does nothing if the
* response has already been taken. There is no guarantee that the response hasn't already been
* taken, making this call a no-op. This may be called multiple times. No delimiters are added
* between strings from multiple calls.
*/
synchronized void addOutput(String s) {
if (responseBuilder != null) {
responseBuilder.setOutput(responseBuilder.getOutput() + s);
}
}
}

/** Requests that are currently being processed. Visible for testing. */
final Map<Integer, RequestInfo> activeRequests = new ConcurrentHashMap<>();

/** WorkRequests that have been received but could not be processed yet. */
private final Queue<WorkRequest> availableRequests = new ArrayDeque<>();
final ConcurrentMap<Integer, RequestInfo> activeRequests = new ConcurrentHashMap<>();

/** The function to be called after each {@link WorkRequest} is read. */
private final BiFunction<List<String>, PrintWriter, Integer> callback;
Expand All @@ -88,6 +113,7 @@ synchronized Optional<WorkResponse.Builder> takeBuilder() {

final WorkerMessageProcessor messageProcessor;

private final BiConsumer<Integer, Thread> cancelCallback;

private final CpuTimeBasedGcScheduler gcScheduler;

Expand All @@ -107,7 +133,7 @@ public WorkRequestHandler(
BiFunction<List<String>, PrintWriter, Integer> callback,
PrintStream stderr,
WorkerMessageProcessor messageProcessor) {
this(callback, stderr, messageProcessor, Duration.ZERO);
this(callback, stderr, messageProcessor, Duration.ZERO, null);
}

/**
Expand All @@ -131,10 +157,24 @@ public WorkRequestHandler(
PrintStream stderr,
WorkerMessageProcessor messageProcessor,
Duration cpuUsageBeforeGc) {
this(callback, stderr, messageProcessor, cpuUsageBeforeGc, null);
}

/**
* Creates a {@code WorkRequestHandler} that will call {@code callback} for each WorkRequest
* received. Only used for the Builder.
*/
private WorkRequestHandler(
BiFunction<List<String>, PrintWriter, Integer> callback,
PrintStream stderr,
WorkerMessageProcessor messageProcessor,
Duration cpuUsageBeforeGc,
BiConsumer<Integer, Thread> cancelCallback) {
this.callback = callback;
this.stderr = stderr;
this.messageProcessor = messageProcessor;
this.gcScheduler = new CpuTimeBasedGcScheduler(cpuUsageBeforeGc);
this.cancelCallback = cancelCallback;
}

/** Builder class for WorkRequestHandler. Required parameters are passed to the constructor. */
Expand All @@ -143,6 +183,7 @@ public static class WorkRequestHandlerBuilder {
private final PrintStream stderr;
private final WorkerMessageProcessor messageProcessor;
private Duration cpuUsageBeforeGc = Duration.ZERO;
private BiConsumer<Integer, Thread> cancelCallback;

/**
* Creates a {@code WorkRequestHandlerBuilder}.
Expand Down Expand Up @@ -173,9 +214,19 @@ public WorkRequestHandlerBuilder setCpuUsageBeforeGc(Duration cpuUsageBeforeGc)
return this;
}

/**
* Sets a callback will be called when a cancellation message has been received. The callback
* will be call with the request ID and the thread executing the request.
*/
public WorkRequestHandlerBuilder setCancelCallback(BiConsumer<Integer, Thread> cancelCallback) {
this.cancelCallback = cancelCallback;
return this;
}

/** Returns a WorkRequestHandler instance with the values in this Builder. */
public WorkRequestHandler build() {
return new WorkRequestHandler(callback, stderr, messageProcessor, cpuUsageBeforeGc);
return new WorkRequestHandler(
callback, stderr, messageProcessor, cpuUsageBeforeGc, cancelCallback);
}
}

Expand All @@ -191,56 +242,42 @@ public void processRequests() throws IOException {
if (request == null) {
break;
}
availableRequests.add(request);
startRequestThreads();
}
}

/**
* Starts threads for as many outstanding requests as possible. This is the only method that adds
* to {@code activeRequests}.
*/
private synchronized void startRequestThreads() {
while (!availableRequests.isEmpty()) {
// If there's a singleplex request in process, don't start more processes.
if (activeRequests.containsKey(0)) {
return;
if (request.getCancel()) {
respondToCancelRequest(request);
} else {
startResponseThread(request);
}
WorkRequest request = availableRequests.peek();
// Don't start new singleplex requests if there are other requests running.
if (request.getRequestId() == 0 && !activeRequests.isEmpty()) {
return;
}
availableRequests.remove();
Thread t = createResponseThread(request);
activeRequests.put(request.getRequestId(), new RequestInfo());
t.start();
}
}

/** Creates a new {@link Thread} to process a multiplex request. */
Thread createResponseThread(WorkRequest request) {
/** Starts a thread for the given request. */
void startResponseThread(WorkRequest request) {
Thread currentThread = Thread.currentThread();
String threadName =
request.getRequestId() > 0
? "multiplex-request-" + request.getRequestId()
: "singleplex-request";
return new Thread(
() -> {
RequestInfo requestInfo = activeRequests.get(request.getRequestId());
try {
respondToRequest(request, requestInfo);
} catch (IOException e) {
e.printStackTrace(stderr);
// In case of error, shut down the entire worker.
currentThread.interrupt();
} finally {
activeRequests.remove(request.getRequestId());
// A good time to start more requests, especially if we finished a singleplex request
startRequestThreads();
}
},
threadName);
Thread t =
new Thread(
() -> {
RequestInfo requestInfo = activeRequests.get(request.getRequestId());
if (requestInfo == null) {
// Already cancelled
return;
}
try {
respondToRequest(request, requestInfo);
} catch (IOException e) {
e.printStackTrace(stderr);
// In case of error, shut down the entire worker.
currentThread.interrupt();
} finally {
activeRequests.remove(request.getRequestId());
}
},
threadName);
activeRequests.put(request.getRequestId(), new RequestInfo(t));
t.start();
}

/** Handles and responds to the given {@link WorkRequest}. */
Expand All @@ -260,7 +297,11 @@ void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOExc
if (optBuilder.isPresent()) {
WorkResponse.Builder builder = optBuilder.get();
builder.setRequestId(request.getRequestId());
builder.setOutput(builder.getOutput() + sw.toString()).setExitCode(exitCode);
if (requestInfo.isCancelled()) {
builder.setWasCancelled(true);
} else {
builder.setOutput(builder.getOutput() + sw).setExitCode(exitCode);
}
WorkResponse response = builder.build();
synchronized (this) {
messageProcessor.writeWorkResponse(response);
Expand All @@ -270,6 +311,45 @@ void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOExc
}
}

/**
* Handles cancelling an existing request, including sending a response if that is not done by the
* time {@code cancelCallback.accept} returns.
*/
void respondToCancelRequest(WorkRequest request) throws IOException {
// Theoretically, we could have gotten two singleplex requests, and we can't tell those apart.
// However, that's a violation of the protocol, so we don't try to handle it (not least because
// handling it would be quite error-prone).
RequestInfo ri = activeRequests.remove(request.getRequestId());

if (ri == null) {
return;
}
if (cancelCallback == null) {
ri.setCancelled();
// This is either an error on the server side or a version mismatch between the server setup
// and the binary. It's better to wait for the regular work to finish instead of breaking the
// build, but we should inform the user about the bad setup.
ri.addOutput(
String.format(
"Cancellation request received for worker request %d, but this worker does not"
+ " support cancellation.\n",
request.getRequestId()));
} else {
if (ri.thread.isAlive() && !ri.isCancelled()) {
ri.setCancelled();
cancelCallback.accept(request.getRequestId(), ri.thread);
Optional<WorkResponse.Builder> builder = ri.takeBuilder();
if (builder.isPresent()) {
WorkResponse response =
builder.get().setWasCancelled(true).setRequestId(request.getRequestId()).build();
synchronized (this) {
messageProcessor.writeWorkResponse(response);
}
}
}
}
}

@Override
public void close() throws IOException {
messageProcessor.close();
Expand Down
11 changes: 6 additions & 5 deletions src/main/protobuf/worker_protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ message WorkRequest {

// Each WorkRequest must have either a unique
// request_id or request_id = 0. If request_id is 0, this WorkRequest must be
// processed alone, otherwise the worker may process multiple WorkRequests in
// parallel (multiplexing). As an exception to the above, if the cancel field
// is true, the request_id must be the same as a previously sent WorkRequest.
// The request_id must be attached unchanged to the corresponding
// WorkResponse.
// processed alone (singleplex), otherwise the worker may process multiple
// WorkRequests in parallel (multiplexing). As an exception to the above, if
// the cancel field is true, the request_id must be the same as a previously
// sent WorkRequest. The request_id must be attached unchanged to the
// corresponding WorkResponse. Only one singleplex request may be sent to a
// worker at a time.
int32 request_id = 3;

// EXPERIMENTAL: When true, this is a cancel request, indicating that a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,10 @@ public void processRequests() throws IOException {
if (poisoned && workerOptions.hardPoison) {
throw new IllegalStateException("I'm a very poisoned worker and will just crash.");
}
if (request.getRequestId() != 0) {
Thread t = createResponseThread(request);
t.start();
if (request.getCancel()) {
respondToCancelRequest(request);
} else {
respondToRequest(request, new RequestInfo());
startResponseThread(request);
}
if (workerOptions.exitAfter > 0 && workUnitCounter > workerOptions.exitAfter) {
System.exit(0);
Expand Down
Loading

0 comments on commit a698bef

Please sign in to comment.