Skip to content

Commit

Permalink
Remote: Use parameters instead of thread-local storage to provide tra…
Browse files Browse the repository at this point in the history
…cing metadata. (Part 4)

Change RemoteCacheClient#upload{File,Blob} to use RemoteActionExecutionContext.

PiperOrigin-RevId: 354472775
  • Loading branch information
Googler authored and copybara-github committed Jan 29, 2021
1 parent db69c9f commit 37ee252
Show file tree
Hide file tree
Showing 25 changed files with 375 additions and 273 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.google.devtools.build.lib.remote;

import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
Expand All @@ -29,7 +30,11 @@
import com.google.devtools.build.lib.buildeventstream.PathConverter;
import com.google.devtools.build.lib.collect.ImmutableIterable;
import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
import com.google.devtools.build.lib.remote.common.NetworkTime;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.vfs.Path;
import io.grpc.Context;
import io.netty.util.AbstractReferenceCounted;
Expand All @@ -50,7 +55,8 @@ class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted
implements BuildEventArtifactUploader {

private final ListeningExecutorService uploadExecutor;
private final Context ctx;
private final String buildRequestId;
private final String commandId;
private final ByteStreamUploader uploader;
private final String remoteServerInstanceName;
private final MissingDigestsFinder missingDigestsFinder;
Expand All @@ -61,15 +67,17 @@ class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted
ByteStreamUploader uploader,
MissingDigestsFinder missingDigestsFinder,
String remoteServerName,
Context ctx,
String buildRequestId,
String commandId,
@Nullable String remoteInstanceName,
int maxUploadThreads) {
this.uploader = Preconditions.checkNotNull(uploader);
String remoteServerInstanceName = Preconditions.checkNotNull(remoteServerName);
if (!Strings.isNullOrEmpty(remoteInstanceName)) {
remoteServerInstanceName += "/" + remoteInstanceName;
}
this.ctx = ctx;
this.buildRequestId = buildRequestId;
this.commandId = commandId;
this.remoteServerInstanceName = remoteServerInstanceName;
// Limit the maximum threads number to 1000 (chosen arbitrarily)
this.uploadExecutor =
Expand Down Expand Up @@ -153,6 +161,8 @@ private static List<PathMetadata> processQueryResult(
*/
private ListenableFuture<ImmutableIterable<PathMetadata>> queryRemoteCache(
ImmutableList<ListenableFuture<PathMetadata>> allPaths) throws Exception {
Context ctx = TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, "bes-upload");

List<PathMetadata> knownRemotePaths = new ArrayList<>(allPaths.size());
List<PathMetadata> filesToQuery = new ArrayList<>();
Set<Digest> digestsToQuery = new HashSet<>();
Expand Down Expand Up @@ -185,19 +195,20 @@ private ListenableFuture<ImmutableIterable<PathMetadata>> queryRemoteCache(
*/
private ListenableFuture<List<PathMetadata>> uploadLocalFiles(
ImmutableIterable<PathMetadata> allPaths) {
RequestMetadata metadata =
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "bes-upload");
RemoteActionExecutionContext context =
new RemoteActionExecutionContextImpl(metadata, new NetworkTime());

ImmutableList.Builder<ListenableFuture<PathMetadata>> allPathsUploaded =
ImmutableList.builder();
for (PathMetadata path : allPaths) {
if (!path.isRemote() && !path.isDirectory()) {
Chunker chunker =
Chunker.builder().setInput(path.getDigest().getSizeBytes(), path.getPath()).build();
final ListenableFuture<Void> upload;
Context prevCtx = ctx.attach();
try {
upload = uploader.uploadBlobAsync(path.getDigest(), chunker, /* forceUpload=*/ false);
} finally {
ctx.detach(prevCtx);
}
upload =
uploader.uploadBlobAsync(context, path.getDigest(), chunker, /* forceUpload= */ false);
allPathsUploaded.add(Futures.transform(upload, unused -> path, uploadExecutor));
} else {
allPathsUploaded.add(Futures.immediateFuture(path));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.runtime.BuildEventArtifactUploaderFactory;
import com.google.devtools.build.lib.runtime.CommandEnvironment;
import io.grpc.Context;
import javax.annotation.Nullable;

/**
Expand All @@ -29,20 +28,23 @@ class ByteStreamBuildEventArtifactUploaderFactory implements

private final ByteStreamUploader uploader;
private final String remoteServerName;
private final Context ctx;
private final String buildRequestId;
private final String commandId;
private final MissingDigestsFinder missingDigestsFinder;
@Nullable private final String remoteInstanceName;

ByteStreamBuildEventArtifactUploaderFactory(
ByteStreamUploader uploader,
MissingDigestsFinder missingDigestsFinder,
String remoteServerName,
Context ctx,
String buildRequestId,
String commandId,
@Nullable String remoteInstanceName) {
this.uploader = uploader;
this.missingDigestsFinder = missingDigestsFinder;
this.remoteServerName = remoteServerName;
this.ctx = ctx;
this.buildRequestId = buildRequestId;
this.commandId = commandId;
this.remoteInstanceName = remoteInstanceName;
}

Expand All @@ -52,7 +54,8 @@ public BuildEventArtifactUploader create(CommandEnvironment env) {
uploader.retain(),
missingDigestsFinder,
remoteServerName,
ctx,
buildRequestId,
commandId,
remoteInstanceName,
env.getOptions().getOptions(RemoteOptions.class).buildEventUploadMaxThreads);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.remote.util.Utils;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
Expand Down Expand Up @@ -134,9 +134,10 @@ public ByteStreamUploader(
* uploaded, if {@code true} the blob is uploaded.
* @throws IOException when reading of the {@link Chunker}s input source fails
*/
public void uploadBlob(HashCode hash, Chunker chunker, boolean forceUpload)
public void uploadBlob(
RemoteActionExecutionContext context, HashCode hash, Chunker chunker, boolean forceUpload)
throws IOException, InterruptedException {
uploadBlobs(singletonMap(hash, chunker), forceUpload);
uploadBlobs(context, singletonMap(hash, chunker), forceUpload);
}

/**
Expand All @@ -156,12 +157,14 @@ public void uploadBlob(HashCode hash, Chunker chunker, boolean forceUpload)
* uploaded, if {@code true} the blob is uploaded.
* @throws IOException when reading of the {@link Chunker}s input source or uploading fails
*/
public void uploadBlobs(Map<HashCode, Chunker> chunkers, boolean forceUpload)
public void uploadBlobs(
RemoteActionExecutionContext context, Map<HashCode, Chunker> chunkers, boolean forceUpload)
throws IOException, InterruptedException {
List<ListenableFuture<Void>> uploads = new ArrayList<>();

for (Map.Entry<HashCode, Chunker> chunkerEntry : chunkers.entrySet()) {
uploads.add(uploadBlobAsync(chunkerEntry.getKey(), chunkerEntry.getValue(), forceUpload));
uploads.add(
uploadBlobAsync(context, chunkerEntry.getKey(), chunkerEntry.getValue(), forceUpload));
}

try {
Expand Down Expand Up @@ -200,14 +203,17 @@ void shutdown() {
}
}

/** @deprecated Use {@link #uploadBlobAsync(Digest, Chunker, boolean)} instead. */
/**
* @deprecated Use {@link #uploadBlobAsync(RemoteActionExecutionContext, Digest, Chunker,
* boolean)} instead.
*/
@Deprecated
@VisibleForTesting
public ListenableFuture<Void> uploadBlobAsync(
HashCode hash, Chunker chunker, boolean forceUpload) {
RemoteActionExecutionContext context, HashCode hash, Chunker chunker, boolean forceUpload) {
Digest digest =
Digest.newBuilder().setHash(hash.toString()).setSizeBytes(chunker.getSize()).build();
return uploadBlobAsync(digest, chunker, forceUpload);
return uploadBlobAsync(context, digest, chunker, forceUpload);
}

/**
Expand All @@ -227,7 +233,7 @@ public ListenableFuture<Void> uploadBlobAsync(
* @throws IOException when reading of the {@link Chunker}s input source fails
*/
public ListenableFuture<Void> uploadBlobAsync(
Digest digest, Chunker chunker, boolean forceUpload) {
RemoteActionExecutionContext context, Digest digest, Chunker chunker, boolean forceUpload) {
synchronized (lock) {
checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");

Expand All @@ -242,7 +248,7 @@ public ListenableFuture<Void> uploadBlobAsync(

ListenableFuture<Void> uploadResult =
Futures.transform(
startAsyncUpload(digest, chunker),
startAsyncUpload(context, digest, chunker),
(v) -> {
synchronized (lock) {
uploadedBlobs.add(HashCode.fromString(digest.getHash()));
Expand Down Expand Up @@ -294,7 +300,8 @@ private static String buildUploadResourceName(String instanceName, UUID uuid, Di
}

/** Starts a file upload and returns a future representing the upload. */
private ListenableFuture<Void> startAsyncUpload(Digest digest, Chunker chunker) {
private ListenableFuture<Void> startAsyncUpload(
RemoteActionExecutionContext context, Digest digest, Chunker chunker) {
try {
chunker.reset();
} catch (IOException e) {
Expand All @@ -313,7 +320,13 @@ private ListenableFuture<Void> startAsyncUpload(Digest digest, Chunker chunker)
String resourceName = buildUploadResourceName(instanceName, uploadId, digest);
AsyncUpload newUpload =
new AsyncUpload(
channel, callCredentialsProvider, callTimeoutSecs, retrier, resourceName, chunker);
context,
channel,
callCredentialsProvider,
callTimeoutSecs,
retrier,
resourceName,
chunker);
ListenableFuture<Void> currUpload = newUpload.start();
currUpload.addListener(
() -> {
Expand Down Expand Up @@ -348,6 +361,7 @@ public ReferenceCounted touch(Object o) {

private static class AsyncUpload {

private final RemoteActionExecutionContext context;
private final Channel channel;
private final CallCredentialsProvider callCredentialsProvider;
private final long callTimeoutSecs;
Expand All @@ -358,12 +372,14 @@ private static class AsyncUpload {
private ClientCall<WriteRequest, WriteResponse> call;

AsyncUpload(
RemoteActionExecutionContext context,
Channel channel,
CallCredentialsProvider callCredentialsProvider,
long callTimeoutSecs,
Retrier retrier,
String resourceName,
Chunker chunker) {
this.context = context;
this.channel = channel;
this.callCredentialsProvider = callCredentialsProvider;
this.callTimeoutSecs = callTimeoutSecs;
Expand All @@ -373,7 +389,6 @@ private static class AsyncUpload {
}

ListenableFuture<Void> start() {
Context ctx = Context.current();
ProgressiveBackoff progressiveBackoff = new ProgressiveBackoff(retrier::newBackoff);
AtomicLong committedOffset = new AtomicLong(0);

Expand All @@ -383,8 +398,7 @@ ListenableFuture<Void> start() {
retrier.executeAsync(
() -> {
if (committedOffset.get() < chunker.getSize()) {
return ctx.call(
() -> callAndQueryOnFailure(committedOffset, progressiveBackoff));
return callAndQueryOnFailure(committedOffset, progressiveBackoff);
}
return Futures.immediateFuture(null);
},
Expand All @@ -409,7 +423,8 @@ ListenableFuture<Void> start() {

private ByteStreamFutureStub bsFutureStub() {
return ByteStreamGrpc.newFutureStub(channel)
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withInterceptors(
TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()))
.withCallCredentials(callCredentialsProvider.getCallCredentials())
.withDeadlineAfter(callTimeoutSecs, SECONDS);
}
Expand All @@ -420,7 +435,7 @@ private ListenableFuture<Void> callAndQueryOnFailure(
call(committedOffset),
Exception.class,
(e) -> guardQueryWithSuppression(e, committedOffset, progressiveBackoff),
Context.current().fixedContextExecutor(MoreExecutors.directExecutor()));
MoreExecutors.directExecutor());
}

private ListenableFuture<Void> guardQueryWithSuppression(
Expand Down Expand Up @@ -584,7 +599,9 @@ public void onReady() {
}
}
};
call.start(callListener, TracingMetadataUtils.headersFromCurrentContext());
call.start(
callListener,
TracingMetadataUtils.headersFromRequestMetadata(context.getRequestMetadata()));
call.request(1);
return uploadResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,16 +393,22 @@ public void onCompleted() {
}

@Override
public ListenableFuture<Void> uploadFile(Digest digest, Path path) {
public ListenableFuture<Void> uploadFile(
RemoteActionExecutionContext context, Digest digest, Path path) {
return uploader.uploadBlobAsync(
context,
digest,
Chunker.builder().setInput(digest.getSizeBytes(), path).build(),
/* forceUpload= */ true);
}

@Override
public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
public ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context, Digest digest, ByteString data) {
return uploader.uploadBlobAsync(
digest, Chunker.builder().setInput(data.toByteArray()).build(), /* forceUpload= */ true);
context,
digest,
Chunker.builder().setInput(data.toByteArray()).build(),
/* forceUpload= */ true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public ActionResult upload(
int exitCode)
throws ExecException, IOException, InterruptedException {
ActionResult.Builder resultBuilder = ActionResult.newBuilder();
uploadOutputs(execRoot, actionKey, action, command, outputs, outErr, resultBuilder);
uploadOutputs(context, execRoot, actionKey, action, command, outputs, outErr, resultBuilder);
resultBuilder.setExitCode(exitCode);
ActionResult result = resultBuilder.build();
if (exitCode == 0 && !action.getDoNotCache()) {
Expand All @@ -162,6 +162,7 @@ public ActionResult upload(
}

private void uploadOutputs(
RemoteActionExecutionContext context,
Path execRoot,
ActionKey actionKey,
Action action,
Expand Down Expand Up @@ -192,14 +193,14 @@ private void uploadOutputs(
for (Digest digest : digestsToUpload) {
Path file = digestToFile.get(digest);
if (file != null) {
uploads.add(cacheProtocol.uploadFile(digest, file));
uploads.add(cacheProtocol.uploadFile(context, digest, file));
} else {
ByteString blob = digestToBlobs.get(digest);
if (blob == null) {
String message = "FindMissingBlobs call returned an unknown digest: " + digest;
throw new IOException(message);
}
uploads.add(cacheProtocol.uploadBlob(digest, blob));
uploads.add(cacheProtocol.uploadBlob(context, digest, blob));
}
}

Expand Down
Loading

0 comments on commit 37ee252

Please sign in to comment.