Skip to content

Commit

Permalink
Remote: Use parameters instead of thread-local storage to provide tra…
Browse files Browse the repository at this point in the history
…cing metadata. (Part 3)

Change RemoteCacheClient#downloadBlob to use RemoteActionExecutionContext.

PiperOrigin-RevId: 354239205
  • Loading branch information
Googler authored and copybara-github committed Jan 28, 2021
1 parent 92955e6 commit 75bd1ff
Show file tree
Hide file tree
Showing 27 changed files with 513 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,11 @@ private ContentAddressableStorageFutureStub casFutureStub() {
.withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS);
}

private ByteStreamStub bsAsyncStub() {
private ByteStreamStub bsAsyncStub(RemoteActionExecutionContext context) {
return ByteStreamGrpc.newStub(channel)
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withInterceptors(
TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()),
new NetworkTimeInterceptor(context::getNetworkTime))
.withCallCredentials(callCredentialsProvider.getCallCredentials())
.withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -283,7 +285,8 @@ public void uploadActionResult(
}

@Override
public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
public ListenableFuture<Void> downloadBlob(
RemoteActionExecutionContext context, Digest digest, OutputStream out) {
if (digest.getSizeBytes() == 0) {
return Futures.immediateFuture(null);
}
Expand All @@ -295,23 +298,23 @@ public ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
out = digestOut;
}

return downloadBlob(digest, out, digestSupplier);
return downloadBlob(context, digest, out, digestSupplier);
}

private ListenableFuture<Void> downloadBlob(
Digest digest, OutputStream out, @Nullable Supplier<Digest> digestSupplier) {
Context ctx = Context.current();
RemoteActionExecutionContext context,
Digest digest,
OutputStream out,
@Nullable Supplier<Digest> digestSupplier) {
AtomicLong offset = new AtomicLong(0);
ProgressiveBackoff progressiveBackoff = new ProgressiveBackoff(retrier::newBackoff);
ListenableFuture<Void> downloadFuture =
Utils.refreshIfUnauthenticatedAsync(
() ->
retrier.executeAsync(
() ->
ctx.call(
() ->
requestRead(
offset, progressiveBackoff, digest, out, digestSupplier)),
requestRead(
context, offset, progressiveBackoff, digest, out, digestSupplier),
progressiveBackoff),
callCredentialsProvider);

Expand All @@ -331,14 +334,15 @@ public static String getResourceName(String instanceName, Digest digest) {
}

private ListenableFuture<Void> requestRead(
RemoteActionExecutionContext context,
AtomicLong offset,
ProgressiveBackoff progressiveBackoff,
Digest digest,
OutputStream out,
@Nullable Supplier<Digest> digestSupplier) {
String resourceName = getResourceName(options.remoteInstanceName, digest);
SettableFuture<Void> future = SettableFuture.create();
bsAsyncStub()
bsAsyncStub(context)
.read(
ReadRequest.newBuilder()
.setResourceName(resourceName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.NetworkTime;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.remote.util.Utils;
Expand Down Expand Up @@ -66,15 +69,17 @@ class RemoteActionInputFetcher implements ActionInputPrefetcher {
@GuardedBy("lock")
final Map<Path, ListenableFuture<Void>> downloadsInProgress = new HashMap<>();

private final String buildRequestId;
private final String commandId;
private final RemoteCache remoteCache;
private final Path execRoot;
private final RequestMetadata requestMetadata;

RemoteActionInputFetcher(
RemoteCache remoteCache, Path execRoot, RequestMetadata requestMetadata) {
String buildRequestId, String commandId, RemoteCache remoteCache, Path execRoot) {
this.buildRequestId = Preconditions.checkNotNull(buildRequestId);
this.commandId = Preconditions.checkNotNull(commandId);
this.remoteCache = Preconditions.checkNotNull(remoteCache);
this.execRoot = Preconditions.checkNotNull(execRoot);
this.requestMetadata = Preconditions.checkNotNull(requestMetadata);
}

/**
Expand Down Expand Up @@ -160,13 +165,15 @@ private ListenableFuture<Void> downloadFileAsync(Path path, FileArtifactValue me

ListenableFuture<Void> download = downloadsInProgress.get(path);
if (download == null) {
Context ctx =
TracingMetadataUtils.contextWithMetadata(
requestMetadata.toBuilder().setActionId(metadata.getActionId()).build());
RequestMetadata requestMetadata =
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, metadata.getActionId());
RemoteActionExecutionContext remoteActionExecutionContext =
new RemoteActionExecutionContextImpl(requestMetadata, new NetworkTime());
Context ctx = TracingMetadataUtils.contextWithMetadata(requestMetadata);
Context prevCtx = ctx.attach();
try {
Digest digest = DigestUtil.buildDigest(metadata.getDigest(), metadata.getSize());
download = remoteCache.downloadFile(path, digest);
download = remoteCache.downloadFile(remoteActionExecutionContext, path, digest);
downloadsInProgress.put(path, download);
Futures.addCallback(
download,
Expand Down
40 changes: 25 additions & 15 deletions src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,14 +261,15 @@ public static void waitForBulkTransfer(
* @return a future that completes after the download completes (succeeds / fails). If successful,
* the content is stored in the future's {@code byte[]}.
*/
public ListenableFuture<byte[]> downloadBlob(Digest digest) {
public ListenableFuture<byte[]> downloadBlob(
RemoteActionExecutionContext context, Digest digest) {
if (digest.getSizeBytes() == 0) {
return EMPTY_BYTES;
}
ByteArrayOutputStream bOut = new ByteArrayOutputStream((int) digest.getSizeBytes());
SettableFuture<byte[]> outerF = SettableFuture.create();
Futures.addCallback(
cacheProtocol.downloadBlob(digest, bOut),
cacheProtocol.downloadBlob(context, digest, bOut),
new FutureCallback<Void>() {
@Override
public void onSuccess(Void aVoid) {
Expand Down Expand Up @@ -305,12 +306,13 @@ private static Path toTmpDownloadPath(Path actualPath) {
* @throws ExecException in case clean up after a failed download failed.
*/
public void download(
RemoteActionExecutionContext context,
ActionResult result,
Path execRoot,
FileOutErr origOutErr,
OutputFilesLocker outputFilesLocker)
throws ExecException, IOException, InterruptedException {
ActionResultMetadata metadata = parseActionResultMetadata(result, execRoot);
ActionResultMetadata metadata = parseActionResultMetadata(context, result, execRoot);

List<ListenableFuture<FileMetadata>> downloads =
Stream.concat(
Expand All @@ -321,7 +323,7 @@ public void download(
(file) -> {
try {
ListenableFuture<Void> download =
downloadFile(toTmpDownloadPath(file.path()), file.digest());
downloadFile(context, toTmpDownloadPath(file.path()), file.digest());
return Futures.transform(download, (d) -> file, directExecutor());
} catch (IOException e) {
return Futures.<FileMetadata>immediateFailedFuture(e);
Expand All @@ -337,7 +339,7 @@ public void download(
if (origOutErr != null) {
tmpOutErr = origOutErr.childOutErr();
}
downloads.addAll(downloadOutErr(result, tmpOutErr));
downloads.addAll(downloadOutErr(context, result, tmpOutErr));

try {
waitForBulkTransfer(downloads, /* cancelRemainingOnInterrupt=*/ true);
Expand Down Expand Up @@ -449,7 +451,8 @@ private void createSymlinks(Iterable<SymlinkMetadata> symlinks) throws IOExcepti
}

/** Downloads a file (that is not a directory). The content is fetched from the digest. */
public ListenableFuture<Void> downloadFile(Path path, Digest digest) throws IOException {
public ListenableFuture<Void> downloadFile(
RemoteActionExecutionContext context, Path path, Digest digest) throws IOException {
Preconditions.checkNotNull(path.getParentDirectory()).createDirectoryAndParents();
if (digest.getSizeBytes() == 0) {
// Handle empty file locally.
Expand All @@ -472,7 +475,7 @@ public ListenableFuture<Void> downloadFile(Path path, Digest digest) throws IOEx

OutputStream out = new LazyFileOutputStream(path);
SettableFuture<Void> outerF = SettableFuture.create();
ListenableFuture<Void> f = cacheProtocol.downloadBlob(digest, out);
ListenableFuture<Void> f = cacheProtocol.downloadBlob(context, digest, out);
Futures.addCallback(
f,
new FutureCallback<Void>() {
Expand Down Expand Up @@ -509,7 +512,8 @@ public void onFailure(Throwable t) {
return outerF;
}

private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result, OutErr outErr) {
private List<ListenableFuture<FileMetadata>> downloadOutErr(
RemoteActionExecutionContext context, ActionResult result, OutErr outErr) {
List<ListenableFuture<FileMetadata>> downloads = new ArrayList<>();
if (!result.getStdoutRaw().isEmpty()) {
try {
Expand All @@ -521,7 +525,8 @@ private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result,
} else if (result.hasStdoutDigest()) {
downloads.add(
Futures.transform(
cacheProtocol.downloadBlob(result.getStdoutDigest(), outErr.getOutputStream()),
cacheProtocol.downloadBlob(
context, result.getStdoutDigest(), outErr.getOutputStream()),
(d) -> null,
directExecutor()));
}
Expand All @@ -535,7 +540,8 @@ private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result,
} else if (result.hasStderrDigest()) {
downloads.add(
Futures.transform(
cacheProtocol.downloadBlob(result.getStderrDigest(), outErr.getErrorStream()),
cacheProtocol.downloadBlob(
context, result.getStderrDigest(), outErr.getErrorStream()),
(d) -> null,
directExecutor()));
}
Expand All @@ -549,6 +555,7 @@ private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result,
* <p>This method only downloads output directory metadata, stdout and stderr as well as the
* contents of {@code inMemoryOutputPath} if specified.
*
* @param context the context this action running with
* @param result the action result metadata of a successfully executed action (exit code = 0).
* @param outputs the action's declared output files
* @param inMemoryOutputPath the path of an output file whose contents should be returned in
Expand All @@ -564,6 +571,7 @@ private List<ListenableFuture<FileMetadata>> downloadOutErr(ActionResult result,
*/
@Nullable
public InMemoryOutput downloadMinimal(
RemoteActionExecutionContext context,
String actionId,
ActionResult result,
Collection<? extends ActionInput> outputs,
Expand All @@ -579,7 +587,7 @@ public InMemoryOutput downloadMinimal(

ActionResultMetadata metadata;
try (SilentCloseable c = Profiler.instance().profile("Remote.parseActionResultMetadata")) {
metadata = parseActionResultMetadata(result, execRoot);
metadata = parseActionResultMetadata(context, result, execRoot);
}

if (!metadata.symlinks().isEmpty()) {
Expand Down Expand Up @@ -614,9 +622,10 @@ public InMemoryOutput downloadMinimal(
try (SilentCloseable c = Profiler.instance().profile("Remote.download")) {
ListenableFuture<byte[]> inMemoryOutputDownload = null;
if (inMemoryOutput != null) {
inMemoryOutputDownload = downloadBlob(inMemoryOutputDigest);
inMemoryOutputDownload = downloadBlob(context, inMemoryOutputDigest);
}
waitForBulkTransfer(downloadOutErr(result, outErr), /* cancelRemainingOnInterrupt=*/ true);
waitForBulkTransfer(
downloadOutErr(context, result, outErr), /* cancelRemainingOnInterrupt=*/ true);
if (inMemoryOutputDownload != null) {
waitForBulkTransfer(
ImmutableList.of(inMemoryOutputDownload), /* cancelRemainingOnInterrupt=*/ true);
Expand Down Expand Up @@ -708,7 +717,8 @@ private DirectoryMetadata parseDirectory(
return new DirectoryMetadata(filesBuilder.build(), symlinksBuilder.build());
}

private ActionResultMetadata parseActionResultMetadata(ActionResult actionResult, Path execRoot)
private ActionResultMetadata parseActionResultMetadata(
RemoteActionExecutionContext context, ActionResult actionResult, Path execRoot)
throws IOException, InterruptedException {
Preconditions.checkNotNull(actionResult, "actionResult");
Map<Path, ListenableFuture<Tree>> dirMetadataDownloads =
Expand All @@ -717,7 +727,7 @@ private ActionResultMetadata parseActionResultMetadata(ActionResult actionResult
dirMetadataDownloads.put(
execRoot.getRelative(dir.getPath()),
Futures.transform(
downloadBlob(dir.getTreeDigest()),
downloadBlob(context, dir.getTreeDigest()),
(treeBytes) -> {
try {
return Tree.parseFrom(treeBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package com.google.devtools.build.lib.remote;

import build.bazel.remote.execution.v2.DigestFunction;
import build.bazel.remote.execution.v2.RequestMetadata;
import build.bazel.remote.execution.v2.ServerCapabilities;
import com.google.auth.Credentials;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -512,9 +511,6 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
requestContext,
remoteOptions.remoteInstanceName));

Context repoContext =
TracingMetadataUtils.contextWithMetadata(buildRequestId, invocationId, "repository_rule");

if (enableRemoteExecution) {
RemoteExecutionClient remoteExecutor;
if (remoteOptions.remoteExecutionKeepalive) {
Expand Down Expand Up @@ -550,7 +546,6 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
digestUtil,
buildRequestId,
invocationId,
"repository_rule",
remoteOptions.remoteInstanceName,
remoteOptions.remoteAcceptCached));
} else {
Expand Down Expand Up @@ -579,10 +574,11 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
if (enableRemoteDownloader) {
remoteDownloaderSupplier.set(
new GrpcRemoteDownloader(
buildRequestId,
invocationId,
downloaderChannel.retain(),
Optional.ofNullable(credentials),
retrier,
repoContext,
cacheClient,
remoteOptions));
downloaderChannel.release();
Expand Down Expand Up @@ -855,14 +851,12 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB
env.getOptions().getOptions(RemoteOptions.class), "RemoteOptions");
RemoteOutputsMode remoteOutputsMode = remoteOptions.remoteOutputsMode;
if (!remoteOutputsMode.downloadAllOutputs()) {
RequestMetadata requestMetadata =
RequestMetadata.newBuilder()
.setCorrelatedInvocationsId(env.getBuildRequestId())
.setToolInvocationId(env.getCommandId().toString())
.build();
actionInputFetcher =
new RemoteActionInputFetcher(
actionContextProvider.getRemoteCache(), env.getExecRoot(), requestMetadata);
env.getBuildRequestId(),
env.getCommandId().toString(),
actionContextProvider.getRemoteCache(),
env.getExecRoot());
builder.setActionInputPrefetcher(actionInputFetcher);
remoteOutputService.setActionInputFetcher(actionInputFetcher);
}
Expand Down
Loading

0 comments on commit 75bd1ff

Please sign in to comment.