diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index 85da6b014aadce..ef031d7d83771a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -129,9 +129,11 @@ private ContentAddressableStorageFutureStub casFutureStub() { .withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS); } - private ByteStreamStub bsAsyncStub() { + private ByteStreamStub bsAsyncStub(RemoteActionExecutionContext context) { return ByteStreamGrpc.newStub(channel) - .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor()) + .withInterceptors( + TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()), + new NetworkTimeInterceptor(context::getNetworkTime)) .withCallCredentials(callCredentialsProvider.getCallCredentials()) .withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS); } @@ -283,7 +285,8 @@ public void uploadActionResult( } @Override - public ListenableFuture downloadBlob(Digest digest, OutputStream out) { + public ListenableFuture downloadBlob( + RemoteActionExecutionContext context, Digest digest, OutputStream out) { if (digest.getSizeBytes() == 0) { return Futures.immediateFuture(null); } @@ -295,12 +298,14 @@ public ListenableFuture downloadBlob(Digest digest, OutputStream out) { out = digestOut; } - return downloadBlob(digest, out, digestSupplier); + return downloadBlob(context, digest, out, digestSupplier); } private ListenableFuture downloadBlob( - Digest digest, OutputStream out, @Nullable Supplier digestSupplier) { - Context ctx = Context.current(); + RemoteActionExecutionContext context, + Digest digest, + OutputStream out, + @Nullable Supplier digestSupplier) { AtomicLong offset = new AtomicLong(0); ProgressiveBackoff progressiveBackoff = new ProgressiveBackoff(retrier::newBackoff); ListenableFuture downloadFuture = @@ -308,10 +313,8 @@ private ListenableFuture downloadBlob( () -> retrier.executeAsync( () -> - ctx.call( - () -> - requestRead( - offset, progressiveBackoff, digest, out, digestSupplier)), + requestRead( + context, offset, progressiveBackoff, digest, out, digestSupplier), progressiveBackoff), callCredentialsProvider); @@ -331,6 +334,7 @@ public static String getResourceName(String instanceName, Digest digest) { } private ListenableFuture requestRead( + RemoteActionExecutionContext context, AtomicLong offset, ProgressiveBackoff progressiveBackoff, Digest digest, @@ -338,7 +342,7 @@ private ListenableFuture requestRead( @Nullable Supplier digestSupplier) { String resourceName = getResourceName(options.remoteInstanceName, digest); SettableFuture future = SettableFuture.create(); - bsAsyncStub() + bsAsyncStub(context) .read( ReadRequest.newBuilder() .setResourceName(resourceName) diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java index 6191f65903a1c0..e78c168e82e197 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java @@ -33,6 +33,9 @@ import com.google.devtools.build.lib.profiler.ProfilerTask; import com.google.devtools.build.lib.profiler.SilentCloseable; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; +import com.google.devtools.build.lib.remote.common.NetworkTime; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.remote.util.Utils; @@ -66,15 +69,17 @@ class RemoteActionInputFetcher implements ActionInputPrefetcher { @GuardedBy("lock") final Map> downloadsInProgress = new HashMap<>(); + private final String buildRequestId; + private final String commandId; private final RemoteCache remoteCache; private final Path execRoot; - private final RequestMetadata requestMetadata; RemoteActionInputFetcher( - RemoteCache remoteCache, Path execRoot, RequestMetadata requestMetadata) { + String buildRequestId, String commandId, RemoteCache remoteCache, Path execRoot) { + this.buildRequestId = Preconditions.checkNotNull(buildRequestId); + this.commandId = Preconditions.checkNotNull(commandId); this.remoteCache = Preconditions.checkNotNull(remoteCache); this.execRoot = Preconditions.checkNotNull(execRoot); - this.requestMetadata = Preconditions.checkNotNull(requestMetadata); } /** @@ -160,13 +165,15 @@ private ListenableFuture downloadFileAsync(Path path, FileArtifactValue me ListenableFuture download = downloadsInProgress.get(path); if (download == null) { - Context ctx = - TracingMetadataUtils.contextWithMetadata( - requestMetadata.toBuilder().setActionId(metadata.getActionId()).build()); + RequestMetadata requestMetadata = + TracingMetadataUtils.buildMetadata(buildRequestId, commandId, metadata.getActionId()); + RemoteActionExecutionContext remoteActionExecutionContext = + new RemoteActionExecutionContextImpl(requestMetadata, new NetworkTime()); + Context ctx = TracingMetadataUtils.contextWithMetadata(requestMetadata); Context prevCtx = ctx.attach(); try { Digest digest = DigestUtil.buildDigest(metadata.getDigest(), metadata.getSize()); - download = remoteCache.downloadFile(path, digest); + download = remoteCache.downloadFile(remoteActionExecutionContext, path, digest); downloadsInProgress.put(path, download); Futures.addCallback( download, diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java index 8e68aaba540cb1..3950814d09f253 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java @@ -261,14 +261,15 @@ public static void waitForBulkTransfer( * @return a future that completes after the download completes (succeeds / fails). If successful, * the content is stored in the future's {@code byte[]}. */ - public ListenableFuture downloadBlob(Digest digest) { + public ListenableFuture downloadBlob( + RemoteActionExecutionContext context, Digest digest) { if (digest.getSizeBytes() == 0) { return EMPTY_BYTES; } ByteArrayOutputStream bOut = new ByteArrayOutputStream((int) digest.getSizeBytes()); SettableFuture outerF = SettableFuture.create(); Futures.addCallback( - cacheProtocol.downloadBlob(digest, bOut), + cacheProtocol.downloadBlob(context, digest, bOut), new FutureCallback() { @Override public void onSuccess(Void aVoid) { @@ -305,12 +306,13 @@ private static Path toTmpDownloadPath(Path actualPath) { * @throws ExecException in case clean up after a failed download failed. */ public void download( + RemoteActionExecutionContext context, ActionResult result, Path execRoot, FileOutErr origOutErr, OutputFilesLocker outputFilesLocker) throws ExecException, IOException, InterruptedException { - ActionResultMetadata metadata = parseActionResultMetadata(result, execRoot); + ActionResultMetadata metadata = parseActionResultMetadata(context, result, execRoot); List> downloads = Stream.concat( @@ -321,7 +323,7 @@ public void download( (file) -> { try { ListenableFuture download = - downloadFile(toTmpDownloadPath(file.path()), file.digest()); + downloadFile(context, toTmpDownloadPath(file.path()), file.digest()); return Futures.transform(download, (d) -> file, directExecutor()); } catch (IOException e) { return Futures.immediateFailedFuture(e); @@ -337,7 +339,7 @@ public void download( if (origOutErr != null) { tmpOutErr = origOutErr.childOutErr(); } - downloads.addAll(downloadOutErr(result, tmpOutErr)); + downloads.addAll(downloadOutErr(context, result, tmpOutErr)); try { waitForBulkTransfer(downloads, /* cancelRemainingOnInterrupt=*/ true); @@ -449,7 +451,8 @@ private void createSymlinks(Iterable symlinks) throws IOExcepti } /** Downloads a file (that is not a directory). The content is fetched from the digest. */ - public ListenableFuture downloadFile(Path path, Digest digest) throws IOException { + public ListenableFuture downloadFile( + RemoteActionExecutionContext context, Path path, Digest digest) throws IOException { Preconditions.checkNotNull(path.getParentDirectory()).createDirectoryAndParents(); if (digest.getSizeBytes() == 0) { // Handle empty file locally. @@ -472,7 +475,7 @@ public ListenableFuture downloadFile(Path path, Digest digest) throws IOEx OutputStream out = new LazyFileOutputStream(path); SettableFuture outerF = SettableFuture.create(); - ListenableFuture f = cacheProtocol.downloadBlob(digest, out); + ListenableFuture f = cacheProtocol.downloadBlob(context, digest, out); Futures.addCallback( f, new FutureCallback() { @@ -509,7 +512,8 @@ public void onFailure(Throwable t) { return outerF; } - private List> downloadOutErr(ActionResult result, OutErr outErr) { + private List> downloadOutErr( + RemoteActionExecutionContext context, ActionResult result, OutErr outErr) { List> downloads = new ArrayList<>(); if (!result.getStdoutRaw().isEmpty()) { try { @@ -521,7 +525,8 @@ private List> downloadOutErr(ActionResult result, } else if (result.hasStdoutDigest()) { downloads.add( Futures.transform( - cacheProtocol.downloadBlob(result.getStdoutDigest(), outErr.getOutputStream()), + cacheProtocol.downloadBlob( + context, result.getStdoutDigest(), outErr.getOutputStream()), (d) -> null, directExecutor())); } @@ -535,7 +540,8 @@ private List> downloadOutErr(ActionResult result, } else if (result.hasStderrDigest()) { downloads.add( Futures.transform( - cacheProtocol.downloadBlob(result.getStderrDigest(), outErr.getErrorStream()), + cacheProtocol.downloadBlob( + context, result.getStderrDigest(), outErr.getErrorStream()), (d) -> null, directExecutor())); } @@ -549,6 +555,7 @@ private List> downloadOutErr(ActionResult result, *

This method only downloads output directory metadata, stdout and stderr as well as the * contents of {@code inMemoryOutputPath} if specified. * + * @param context the context this action running with * @param result the action result metadata of a successfully executed action (exit code = 0). * @param outputs the action's declared output files * @param inMemoryOutputPath the path of an output file whose contents should be returned in @@ -564,6 +571,7 @@ private List> downloadOutErr(ActionResult result, */ @Nullable public InMemoryOutput downloadMinimal( + RemoteActionExecutionContext context, String actionId, ActionResult result, Collection outputs, @@ -579,7 +587,7 @@ public InMemoryOutput downloadMinimal( ActionResultMetadata metadata; try (SilentCloseable c = Profiler.instance().profile("Remote.parseActionResultMetadata")) { - metadata = parseActionResultMetadata(result, execRoot); + metadata = parseActionResultMetadata(context, result, execRoot); } if (!metadata.symlinks().isEmpty()) { @@ -614,9 +622,10 @@ public InMemoryOutput downloadMinimal( try (SilentCloseable c = Profiler.instance().profile("Remote.download")) { ListenableFuture inMemoryOutputDownload = null; if (inMemoryOutput != null) { - inMemoryOutputDownload = downloadBlob(inMemoryOutputDigest); + inMemoryOutputDownload = downloadBlob(context, inMemoryOutputDigest); } - waitForBulkTransfer(downloadOutErr(result, outErr), /* cancelRemainingOnInterrupt=*/ true); + waitForBulkTransfer( + downloadOutErr(context, result, outErr), /* cancelRemainingOnInterrupt=*/ true); if (inMemoryOutputDownload != null) { waitForBulkTransfer( ImmutableList.of(inMemoryOutputDownload), /* cancelRemainingOnInterrupt=*/ true); @@ -708,7 +717,8 @@ private DirectoryMetadata parseDirectory( return new DirectoryMetadata(filesBuilder.build(), symlinksBuilder.build()); } - private ActionResultMetadata parseActionResultMetadata(ActionResult actionResult, Path execRoot) + private ActionResultMetadata parseActionResultMetadata( + RemoteActionExecutionContext context, ActionResult actionResult, Path execRoot) throws IOException, InterruptedException { Preconditions.checkNotNull(actionResult, "actionResult"); Map> dirMetadataDownloads = @@ -717,7 +727,7 @@ private ActionResultMetadata parseActionResultMetadata(ActionResult actionResult dirMetadataDownloads.put( execRoot.getRelative(dir.getPath()), Futures.transform( - downloadBlob(dir.getTreeDigest()), + downloadBlob(context, dir.getTreeDigest()), (treeBytes) -> { try { return Tree.parseFrom(treeBytes); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index ecb175e3992a8b..dfa368bb7768d8 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -15,7 +15,6 @@ package com.google.devtools.build.lib.remote; import build.bazel.remote.execution.v2.DigestFunction; -import build.bazel.remote.execution.v2.RequestMetadata; import build.bazel.remote.execution.v2.ServerCapabilities; import com.google.auth.Credentials; import com.google.common.annotations.VisibleForTesting; @@ -512,9 +511,6 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { requestContext, remoteOptions.remoteInstanceName)); - Context repoContext = - TracingMetadataUtils.contextWithMetadata(buildRequestId, invocationId, "repository_rule"); - if (enableRemoteExecution) { RemoteExecutionClient remoteExecutor; if (remoteOptions.remoteExecutionKeepalive) { @@ -550,7 +546,6 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { digestUtil, buildRequestId, invocationId, - "repository_rule", remoteOptions.remoteInstanceName, remoteOptions.remoteAcceptCached)); } else { @@ -579,10 +574,11 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { if (enableRemoteDownloader) { remoteDownloaderSupplier.set( new GrpcRemoteDownloader( + buildRequestId, + invocationId, downloaderChannel.retain(), Optional.ofNullable(credentials), retrier, - repoContext, cacheClient, remoteOptions)); downloaderChannel.release(); @@ -855,14 +851,12 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB env.getOptions().getOptions(RemoteOptions.class), "RemoteOptions"); RemoteOutputsMode remoteOutputsMode = remoteOptions.remoteOutputsMode; if (!remoteOutputsMode.downloadAllOutputs()) { - RequestMetadata requestMetadata = - RequestMetadata.newBuilder() - .setCorrelatedInvocationsId(env.getBuildRequestId()) - .setToolInvocationId(env.getCommandId().toString()) - .build(); actionInputFetcher = new RemoteActionInputFetcher( - actionContextProvider.getRemoteCache(), env.getExecRoot(), requestMetadata); + env.getBuildRequestId(), + env.getCommandId().toString(), + actionContextProvider.getRemoteCache(), + env.getExecRoot()); builder.setActionInputPrefetcher(actionInputFetcher); remoteOutputService.setActionInputFetcher(actionInputFetcher); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java index e66897b599ad48..5d89161722f913 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutor.java @@ -20,6 +20,7 @@ import build.bazel.remote.execution.v2.ExecuteRequest; import build.bazel.remote.execution.v2.ExecuteResponse; import build.bazel.remote.execution.v2.Platform; +import build.bazel.remote.execution.v2.RequestMetadata; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; @@ -55,7 +56,6 @@ public class RemoteRepositoryRemoteExecutor implements RepositoryRemoteExecutor private final DigestUtil digestUtil; private final String buildRequestId; private final String commandId; - private final String actionId; private final String remoteInstanceName; private final boolean acceptCached; @@ -66,7 +66,6 @@ public RemoteRepositoryRemoteExecutor( DigestUtil digestUtil, String buildRequestId, String commandId, - String actionId, String remoteInstanceName, boolean acceptCached) { this.remoteCache = remoteCache; @@ -74,12 +73,11 @@ public RemoteRepositoryRemoteExecutor( this.digestUtil = digestUtil; this.buildRequestId = buildRequestId; this.commandId = commandId; - this.actionId = actionId; this.remoteInstanceName = remoteInstanceName; this.acceptCached = acceptCached; } - private ExecutionResult downloadOutErr(ActionResult result) + private ExecutionResult downloadOutErr(RemoteActionExecutionContext context, ActionResult result) throws IOException, InterruptedException { try (SilentCloseable c = Profiler.instance().profile(ProfilerTask.REMOTE_DOWNLOAD, "download stdout/stderr")) { @@ -87,14 +85,14 @@ private ExecutionResult downloadOutErr(ActionResult result) if (!result.getStdoutRaw().isEmpty()) { stdout = result.getStdoutRaw().toByteArray(); } else if (result.hasStdoutDigest()) { - stdout = Utils.getFromFuture(remoteCache.downloadBlob(result.getStdoutDigest())); + stdout = Utils.getFromFuture(remoteCache.downloadBlob(context, result.getStdoutDigest())); } byte[] stderr = new byte[0]; if (!result.getStderrRaw().isEmpty()) { stderr = result.getStderrRaw().toByteArray(); } else if (result.hasStderrDigest()) { - stderr = Utils.getFromFuture(remoteCache.downloadBlob(result.getStderrDigest())); + stderr = Utils.getFromFuture(remoteCache.downloadBlob(context, result.getStderrDigest())); } return new ExecutionResult(result.getExitCode(), stdout, stderr); @@ -110,8 +108,9 @@ public ExecutionResult execute( String workingDirectory, Duration timeout) throws IOException, InterruptedException { - Context requestCtx = - TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, actionId); + RequestMetadata metadata = + TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "repository_rule"); + Context requestCtx = TracingMetadataUtils.contextWithMetadata(metadata); Context prev = requestCtx.attach(); try { Platform platform = PlatformUtils.buildPlatformProto(executionProperties); @@ -130,9 +129,7 @@ public ExecutionResult execute( Digest actionDigest = digestUtil.compute(action); ActionKey actionKey = new ActionKey(actionDigest); RemoteActionExecutionContext remoteActionExecutionContext = - new RemoteActionExecutionContextImpl( - TracingMetadataUtils.buildMetadata(buildRequestId, commandId, actionId), - new NetworkTime()); + new RemoteActionExecutionContextImpl(metadata, new NetworkTime()); ActionResult actionResult; try (SilentCloseable c = Profiler.instance().profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) { @@ -164,7 +161,7 @@ public ExecutionResult execute( actionResult = response.getResult(); } } - return downloadOutErr(actionResult); + return downloadOutErr(remoteActionExecutionContext, actionResult); } finally { requestCtx.detach(prev); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java index 70ee4d903ca690..a67cf859bef185 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorFactory.java @@ -26,7 +26,6 @@ class RemoteRepositoryRemoteExecutorFactory implements RepositoryRemoteExecutorF private final DigestUtil digestUtil; private final String buildRequestId; private final String commandId; - private final String actionId; private final String remoteInstanceName; private final boolean acceptCached; @@ -37,7 +36,6 @@ class RemoteRepositoryRemoteExecutorFactory implements RepositoryRemoteExecutorF DigestUtil digestUtil, String buildRequestId, String commandId, - String actionId, String remoteInstanceName, boolean acceptCached) { this.remoteExecutionCache = remoteExecutionCache; @@ -45,7 +43,6 @@ class RemoteRepositoryRemoteExecutorFactory implements RepositoryRemoteExecutorF this.digestUtil = digestUtil; this.buildRequestId = buildRequestId; this.commandId = commandId; - this.actionId = actionId; this.remoteInstanceName = remoteInstanceName; this.acceptCached = acceptCached; } @@ -58,7 +55,6 @@ public RepositoryRemoteExecutor create() { digestUtil, buildRequestId, commandId, - actionId, remoteInstanceName, acceptCached); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java index a7bc727238bd35..d213beea64c1cf 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java @@ -191,7 +191,11 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context) try (SilentCloseable c = prof.profile(ProfilerTask.REMOTE_DOWNLOAD, "download outputs")) { remoteCache.download( - result, execRoot, context.getFileOutErr(), context::lockOutputFiles); + remoteActionExecutionContext, + result, + execRoot, + context.getFileOutErr(), + context::lockOutputFiles); } } else { PathFragment inMemoryOutputPath = getInMemoryOutputPath(spawn); @@ -200,6 +204,7 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context) prof.profile(ProfilerTask.REMOTE_DOWNLOAD, "download outputs minimal")) { inMemoryOutput = remoteCache.downloadMinimal( + remoteActionExecutionContext, actionKey.getDigest().getHash(), result, spawn.getOutputFiles(), diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java index fdce7c5dfb90e1..fa99c638b09359 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java @@ -277,6 +277,7 @@ public SpawnResult exec(Spawn spawn, SpawnExecutionContext context) } else { try { return downloadAndFinalizeSpawnResult( + remoteActionExecutionContext, actionKey.getDigest().getHash(), cachedResult, /* cacheHit= */ true, @@ -366,11 +367,12 @@ public SpawnResult exec(Spawn spawn, SpawnExecutionContext context) spawnMetricsAccounting(spawnMetrics, actionResult.getExecutionMetadata()); try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "download server logs")) { - maybeDownloadServerLogs(reply, actionKey); + maybeDownloadServerLogs(remoteActionExecutionContext, reply, actionKey); } try { return downloadAndFinalizeSpawnResult( + remoteActionExecutionContext, actionKey.getDigest().getHash(), actionResult, reply.getCachedResult(), @@ -452,6 +454,7 @@ static void spawnMetricsAccounting( } private SpawnResult downloadAndFinalizeSpawnResult( + RemoteActionExecutionContext remoteActionExecutionContext, String actionId, ActionResult actionResult, boolean cacheHit, @@ -473,7 +476,11 @@ private SpawnResult downloadAndFinalizeSpawnResult( if (downloadOutputs) { try (SilentCloseable c = Profiler.instance().profile(REMOTE_DOWNLOAD, "download outputs")) { remoteCache.download( - actionResult, execRoot, context.getFileOutErr(), context::lockOutputFiles); + remoteActionExecutionContext, + actionResult, + execRoot, + context.getFileOutErr(), + context::lockOutputFiles); } } else { PathFragment inMemoryOutputPath = getInMemoryOutputPath(spawn); @@ -481,6 +488,7 @@ private SpawnResult downloadAndFinalizeSpawnResult( Profiler.instance().profile(REMOTE_DOWNLOAD, "download outputs minimal")) { inMemoryOutput = remoteCache.downloadMinimal( + remoteActionExecutionContext, actionId, actionResult, spawn.getOutputFiles(), @@ -532,7 +540,8 @@ private void maybeWriteParamFilesLocally(Spawn spawn) throws IOException { } } - private void maybeDownloadServerLogs(ExecuteResponse resp, ActionKey actionKey) + private void maybeDownloadServerLogs( + RemoteActionExecutionContext context, ExecuteResponse resp, ActionKey actionKey) throws InterruptedException { ActionResult result = resp.getResult(); if (resp.getServerLogsCount() > 0 @@ -545,7 +554,7 @@ private void maybeDownloadServerLogs(ExecuteResponse resp, ActionKey actionKey) logPath = parent.getRelative(e.getKey()); logCount++; try { - getFromFuture(remoteCache.downloadFile(logPath, e.getValue().getDigest())); + getFromFuture(remoteCache.downloadFile(context, logPath, e.getValue().getDigest())); } catch (IOException ex) { reportOnce(Event.warn("Failed downloading server logs from the remote cache.")); } @@ -598,11 +607,16 @@ private SpawnResult execLocallyAndUploadOrFail( command, uploadLocalResults); } - return handleError(cause, context.getFileOutErr(), actionKey, context); + return handleError( + remoteActionExecutionContext, cause, context.getFileOutErr(), actionKey, context); } private SpawnResult handleError( - IOException exception, FileOutErr outErr, ActionKey actionKey, SpawnExecutionContext context) + RemoteActionExecutionContext remoteActionExecutionContext, + IOException exception, + FileOutErr outErr, + ActionKey actionKey, + SpawnExecutionContext context) throws ExecException, InterruptedException, IOException { boolean remoteCacheFailed = BulkTransferException.isOnlyCausedByCacheNotFoundException(exception); @@ -610,11 +624,16 @@ private SpawnResult handleError( ExecutionStatusException e = (ExecutionStatusException) exception.getCause(); if (e.getResponse() != null) { ExecuteResponse resp = e.getResponse(); - maybeDownloadServerLogs(resp, actionKey); + maybeDownloadServerLogs(remoteActionExecutionContext, resp, actionKey); if (resp.hasResult()) { try { // We try to download all (partial) results even on server error, for debuggability. - remoteCache.download(resp.getResult(), execRoot, outErr, context::lockOutputFiles); + remoteCache.download( + remoteActionExecutionContext, + resp.getResult(), + execRoot, + outErr, + context::lockOutputFiles); } catch (BulkTransferException bulkTransferEx) { exception.addSuppressed(bulkTransferEx); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java index 2f3c74b0fa04cd..caf497f9df693c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java @@ -66,6 +66,7 @@ public int hashCode() { /** * Downloads an action result for the {@code actionKey}. * + * @param context the context for the action. * @param actionKey The digest of the {@link Action} that generated the action result. * @param inlineOutErr A hint to the server to inline the stdout and stderr in the {@code * ActionResult} message. @@ -78,6 +79,7 @@ ListenableFuture downloadActionResult( /** * Uploads an action result for the {@code actionKey}. * + * @param context the context for the action. * @param actionKey The digest of the {@link Action} that generated the action result. * @param actionResult The action result to associate with the {@code actionKey}. * @throws IOException If there is an error uploading the action result. @@ -92,10 +94,12 @@ void uploadActionResult( * *

It's the callers responsibility to close {@code out}. * + * @param context the context for the action. * @return A Future representing pending completion of the download. If a BLOB for {@code digest} * does not exist in the cache the Future fails with a {@link CacheNotFoundException}. */ - ListenableFuture downloadBlob(Digest digest, OutputStream out); + ListenableFuture downloadBlob( + RemoteActionExecutionContext context, Digest digest, OutputStream out); /** * Uploads a {@code file} to the CAS. diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java index 74c19bdcf15d5b..040a7d447836c2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskAndRemoteCacheClient.java @@ -129,9 +129,10 @@ private static ListenableFuture closeStreamOnError( } @Override - public ListenableFuture downloadBlob(Digest digest, OutputStream out) { + public ListenableFuture downloadBlob( + RemoteActionExecutionContext context, Digest digest, OutputStream out) { if (diskCache.contains(digest)) { - return diskCache.downloadBlob(digest, out); + return diskCache.downloadBlob(context, digest, out); } Path tempPath = newTempPath(); @@ -144,21 +145,19 @@ public ListenableFuture downloadBlob(Digest digest, OutputStream out) { if (!options.incompatibleRemoteResultsIgnoreDisk || options.remoteAcceptCached) { ListenableFuture download = - closeStreamOnError(remoteCache.downloadBlob(digest, tempOut), tempOut); - ListenableFuture saveToDiskAndTarget = - Futures.transformAsync( - download, - (unused) -> { - try { - tempOut.close(); - diskCache.captureFile(tempPath, digest, /* isActionCache= */ false); - } catch (IOException e) { - return Futures.immediateFailedFuture(e); - } - return diskCache.downloadBlob(digest, out); - }, - MoreExecutors.directExecutor()); - return saveToDiskAndTarget; + closeStreamOnError(remoteCache.downloadBlob(context, digest, tempOut), tempOut); + return Futures.transformAsync( + download, + (unused) -> { + try { + tempOut.close(); + diskCache.captureFile(tempPath, digest, /* isActionCache= */ false); + } catch (IOException e) { + return Futures.immediateFailedFuture(e); + } + return diskCache.downloadBlob(context, digest, out); + }, + MoreExecutors.directExecutor()); } else { return Futures.immediateFuture(null); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java index 9cef499ab4942b..027891353cfe4c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/disk/DiskCacheClient.java @@ -81,7 +81,8 @@ private ListenableFuture download(Digest digest, OutputStream out, boolean } @Override - public ListenableFuture downloadBlob(Digest digest, OutputStream out) { + public ListenableFuture downloadBlob( + RemoteActionExecutionContext context, Digest digest, OutputStream out) { @Nullable DigestOutputStream digestOut = verifyDownloads ? digestUtil.newDigestOutputStream(out) : null; return Futures.transformAsync( diff --git a/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java b/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java index 37b639977d3090..672207bb68e71a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloader.java @@ -20,6 +20,7 @@ import build.bazel.remote.asset.v1.FetchGrpc.FetchBlockingStub; import build.bazel.remote.asset.v1.Qualifier; import build.bazel.remote.execution.v2.Digest; +import build.bazel.remote.execution.v2.RequestMetadata; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.devtools.build.lib.bazel.repository.downloader.Checksum; @@ -28,6 +29,9 @@ import com.google.devtools.build.lib.events.ExtendedEventHandler; import com.google.devtools.build.lib.remote.ReferenceCountedChannel; import com.google.devtools.build.lib.remote.RemoteRetrier; +import com.google.devtools.build.lib.remote.common.NetworkTime; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; @@ -36,7 +40,6 @@ import com.google.gson.Gson; import com.google.gson.JsonObject; import io.grpc.CallCredentials; -import io.grpc.Context; import io.grpc.StatusRuntimeException; import java.io.IOException; import java.io.OutputStream; @@ -58,10 +61,11 @@ */ public class GrpcRemoteDownloader implements AutoCloseable, Downloader { + private final String buildRequestId; + private final String commandId; private final ReferenceCountedChannel channel; private final Optional credentials; private final RemoteRetrier retrier; - private final Context requestCtx; private final RemoteCacheClient cacheClient; private final RemoteOptions options; @@ -75,17 +79,19 @@ public class GrpcRemoteDownloader implements AutoCloseable, Downloader { private static final String QUALIFIER_AUTH_HEADERS = "bazel.auth_headers"; public GrpcRemoteDownloader( + String buildRequestId, + String commandId, ReferenceCountedChannel channel, Optional credentials, RemoteRetrier retrier, - Context requestCtx, RemoteCacheClient cacheClient, RemoteOptions options) { + this.buildRequestId = buildRequestId; + this.commandId = commandId; this.channel = channel; this.credentials = credentials; this.retrier = retrier; this.cacheClient = cacheClient; - this.requestCtx = requestCtx; this.options = options; } @@ -109,22 +115,26 @@ public void download( Map clientEnv, com.google.common.base.Optional type) throws IOException, InterruptedException { + RequestMetadata metadata = + TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "remote_downloader"); + RemoteActionExecutionContext remoteActionExecutionContext = + new RemoteActionExecutionContextImpl(metadata, new NetworkTime()); + final FetchBlobRequest request = newFetchBlobRequest(options.remoteInstanceName, urls, authHeaders, checksum, canonicalId); try { FetchBlobResponse response = - retrier.execute(() -> requestCtx.call(() -> fetchBlockingStub().fetchBlob(request))); + retrier.execute(() -> fetchBlockingStub(remoteActionExecutionContext).fetchBlob(request)); final Digest blobDigest = response.getBlobDigest(); retrier.execute( - () -> - requestCtx.call( - () -> { - try (OutputStream out = newOutputStream(destination, checksum)) { - Utils.getFromFuture(cacheClient.downloadBlob(blobDigest, out)); - } - return null; - })); + () -> { + try (OutputStream out = newOutputStream(destination, checksum)) { + Utils.getFromFuture( + cacheClient.downloadBlob(remoteActionExecutionContext, blobDigest, out)); + } + return null; + }); } catch (StatusRuntimeException e) { throw new IOException(e); } @@ -164,9 +174,10 @@ static FetchBlobRequest newFetchBlobRequest( return requestBuilder.build(); } - private FetchBlockingStub fetchBlockingStub() { + private FetchBlockingStub fetchBlockingStub(RemoteActionExecutionContext context) { return FetchGrpc.newBlockingStub(channel) - .withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor()) + .withInterceptors( + TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata())) .withInterceptors(TracingMetadataUtils.newDownloaderHeadersInterceptor(options)) .withCallCredentials(credentials.orElse(null)) .withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS); diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java index 9a55733bdbccea..7a67bb5bdb696e 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java @@ -438,7 +438,8 @@ private boolean isChannelPipelineEmpty(ChannelPipeline pipeline) { } @Override - public ListenableFuture downloadBlob(Digest digest, OutputStream out) { + public ListenableFuture downloadBlob( + RemoteActionExecutionContext context, Digest digest, OutputStream out) { final DigestOutputStream digestOut = verifyDownloads ? digestUtil.newDigestOutputStream(out) : null; return Futures.transformAsync( diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java index 878920b0bcb4aa..d22e1ed9a5fa73 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java @@ -248,10 +248,11 @@ private GrpcCacheClient newClient(RemoteOptions remoteOptions, Supplier channel.retain(), callCredentialsProvider, remoteOptions, retrier, DIGEST_UTIL, uploader); } - private static byte[] downloadBlob(GrpcCacheClient cacheClient, Digest digest) + private static byte[] downloadBlob( + RemoteActionExecutionContext context, GrpcCacheClient cacheClient, Digest digest) throws IOException, InterruptedException { try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - getFromFuture(cacheClient.downloadBlob(digest, out)); + getFromFuture(cacheClient.downloadBlob(context, digest, out)); return out.toByteArray(); } } @@ -324,7 +325,7 @@ public void testDownloadEmptyBlob() throws Exception { GrpcCacheClient client = newClient(); Digest emptyDigest = DIGEST_UTIL.compute(new byte[0]); // Will not call the mock Bytestream interface at all. - assertThat(downloadBlob(client, emptyDigest)).isEmpty(); + assertThat(downloadBlob(remoteActionExecutionContext, client, emptyDigest)).isEmpty(); } @Test @@ -341,7 +342,8 @@ public void read(ReadRequest request, StreamObserver responseObser responseObserver.onCompleted(); } }); - assertThat(new String(downloadBlob(client, digest), UTF_8)).isEqualTo("abcdefg"); + assertThat(new String(downloadBlob(remoteActionExecutionContext, client, digest), UTF_8)) + .isEqualTo("abcdefg"); } @Test @@ -362,7 +364,8 @@ public void read(ReadRequest request, StreamObserver responseObser responseObserver.onCompleted(); } }); - assertThat(new String(downloadBlob(client, digest), UTF_8)).isEqualTo("abcdefg"); + assertThat(new String(downloadBlob(remoteActionExecutionContext, client, digest), UTF_8)) + .isEqualTo("abcdefg"); } @Test @@ -381,7 +384,12 @@ public void testDownloadAllResults() throws Exception { result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest); result.addOutputFilesBuilder().setPath("b/empty").setDigest(emptyDigest); result.addOutputFilesBuilder().setPath("a/bar").setDigest(barDigest).setIsExecutable(true); - remoteCache.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {}); + remoteCache.download( + remoteActionExecutionContext, + result.build(), + execRoot, + null, + /* outputFilesLocker= */ () -> {}); assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest); assertThat(DIGEST_UTIL.compute(execRoot.getRelative("b/empty"))).isEqualTo(emptyDigest); assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/bar"))).isEqualTo(barDigest); @@ -417,7 +425,12 @@ public void testDownloadDirectory() throws Exception { ActionResult.Builder result = ActionResult.newBuilder(); result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest); result.addOutputDirectoriesBuilder().setPath("a/bar").setTreeDigest(barTreeDigest); - remoteCache.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {}); + remoteCache.download( + remoteActionExecutionContext, + result.build(), + execRoot, + null, + /* outputFilesLocker= */ () -> {}); assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest); assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/bar/qux"))).isEqualTo(quxDigest); @@ -438,7 +451,12 @@ public void testDownloadDirectoryEmpty() throws Exception { ActionResult.Builder result = ActionResult.newBuilder(); result.addOutputDirectoriesBuilder().setPath("a/bar").setTreeDigest(barTreeDigest); - remoteCache.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {}); + remoteCache.download( + remoteActionExecutionContext, + result.build(), + execRoot, + null, + /* outputFilesLocker= */ () -> {}); assertThat(execRoot.getRelative("a/bar").isDirectory()).isTrue(); } @@ -480,7 +498,12 @@ public void testDownloadDirectoryNested() throws Exception { ActionResult.Builder result = ActionResult.newBuilder(); result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest); result.addOutputDirectoriesBuilder().setPath("a/bar").setTreeDigest(barTreeDigest); - remoteCache.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {}); + remoteCache.download( + remoteActionExecutionContext, + result.build(), + execRoot, + null, + /* outputFilesLocker= */ () -> {}); assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest); assertThat(DIGEST_UTIL.compute(execRoot.getRelative("a/bar/wobble/qux"))).isEqualTo(quxDigest); @@ -1099,7 +1122,8 @@ public void read(ReadRequest request, StreamObserver responseObser } } }); - assertThat(new String(downloadBlob(client, digest), UTF_8)).isEqualTo("abcdefg"); + assertThat(new String(downloadBlob(remoteActionExecutionContext, client, digest), UTF_8)) + .isEqualTo("abcdefg"); Mockito.verify(mockBackoff, Mockito.never()).nextDelayMillis(any(Exception.class)); } @@ -1123,7 +1147,9 @@ public void read(ReadRequest request, StreamObserver responseObser responseObserver.onError(Status.DEADLINE_EXCEEDED.asException()); } }); - IOException e = assertThrows(IOException.class, () -> downloadBlob(client, digest)); + IOException e = + assertThrows( + IOException.class, () -> downloadBlob(remoteActionExecutionContext, client, digest)); Status st = Status.fromThrowable(e); assertThat(st.getCode()).isEqualTo(Status.Code.DEADLINE_EXCEEDED); Mockito.verify(mockBackoff, Mockito.times(1)).nextDelayMillis(any(Exception.class)); @@ -1144,7 +1170,9 @@ public void read(ReadRequest request, StreamObserver responseObser responseObserver.onCompleted(); } }); - IOException e = assertThrows(IOException.class, () -> downloadBlob(client, digest)); + IOException e = + assertThrows( + IOException.class, () -> downloadBlob(remoteActionExecutionContext, client, digest)); assertThat(e).hasMessageThat().contains(digest.getHash()); assertThat(e).hasMessageThat().contains(DIGEST_UTIL.computeAsUtf8("bar").getHash()); } @@ -1168,7 +1196,8 @@ public void read(ReadRequest request, StreamObserver responseObser } }); - assertThat(downloadBlob(client, digest)).isEqualTo(downloadContents.toByteArray()); + assertThat(downloadBlob(remoteActionExecutionContext, client, digest)) + .isEqualTo(downloadContents.toByteArray()); } @Test diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java index 8810319a3fe280..c4dd363c40f4ae 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java @@ -20,7 +20,6 @@ import static org.mockito.Mockito.when; import build.bazel.remote.execution.v2.Digest; -import build.bazel.remote.execution.v2.RequestMetadata; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -94,7 +93,7 @@ public void testFetching() throws Exception { MetadataProvider metadataProvider = new StaticMetadataProvider(metadata); RemoteCache remoteCache = newCache(options, digestUtil, cacheEntries); RemoteActionInputFetcher actionInputFetcher = - new RemoteActionInputFetcher(remoteCache, execRoot, RequestMetadata.getDefaultInstance()); + new RemoteActionInputFetcher("none", "none", remoteCache, execRoot); // act actionInputFetcher.prefetchFiles(metadata.keySet(), metadataProvider); @@ -117,7 +116,7 @@ public void testStagingVirtualActionInput() throws Exception { MetadataProvider metadataProvider = new StaticMetadataProvider(new HashMap<>()); RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>()); RemoteActionInputFetcher actionInputFetcher = - new RemoteActionInputFetcher(remoteCache, execRoot, RequestMetadata.getDefaultInstance()); + new RemoteActionInputFetcher("none", "none", remoteCache, execRoot); VirtualActionInput a = ActionsTestUtil.createVirtualActionInput("file1", "hello world"); // act @@ -137,7 +136,7 @@ public void testStagingEmptyVirtualActionInput() throws Exception { MetadataProvider metadataProvider = new StaticMetadataProvider(new HashMap<>()); RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>()); RemoteActionInputFetcher actionInputFetcher = - new RemoteActionInputFetcher(remoteCache, execRoot, RequestMetadata.getDefaultInstance()); + new RemoteActionInputFetcher("none", "none", remoteCache, execRoot); // act actionInputFetcher.prefetchFiles( @@ -159,7 +158,7 @@ public void testFileNotFound() throws Exception { MetadataProvider metadataProvider = new StaticMetadataProvider(metadata); RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>()); RemoteActionInputFetcher actionInputFetcher = - new RemoteActionInputFetcher(remoteCache, execRoot, RequestMetadata.getDefaultInstance()); + new RemoteActionInputFetcher("none", "none", remoteCache, execRoot); // act assertThrows( @@ -183,7 +182,7 @@ public void testIgnoreNoneRemoteFiles() throws Exception { MetadataProvider metadataProvider = new StaticMetadataProvider(ImmutableMap.of(a, f)); RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>()); RemoteActionInputFetcher actionInputFetcher = - new RemoteActionInputFetcher(remoteCache, execRoot, RequestMetadata.getDefaultInstance()); + new RemoteActionInputFetcher("none", "none", remoteCache, execRoot); // act actionInputFetcher.prefetchFiles(ImmutableList.of(a), metadataProvider); @@ -201,7 +200,7 @@ public void testDownloadFile() throws Exception { Artifact a1 = createRemoteArtifact("file1", "hello world", metadata, cacheEntries); RemoteCache remoteCache = newCache(options, digestUtil, cacheEntries); RemoteActionInputFetcher actionInputFetcher = - new RemoteActionInputFetcher(remoteCache, execRoot, RequestMetadata.getDefaultInstance()); + new RemoteActionInputFetcher("none", "none", remoteCache, execRoot); // act actionInputFetcher.downloadFile(a1.getPath(), metadata.get(a1)); @@ -222,11 +221,11 @@ public void testDownloadFile_onInterrupt_deletePartialDownloadedFile() throws Ex Map cacheEntries = new HashMap<>(); Artifact a1 = createRemoteArtifact("file1", "hello world", metadata, cacheEntries); RemoteCache remoteCache = mock(RemoteCache.class); - when(remoteCache.downloadFile(any(), any())) + when(remoteCache.downloadFile(any(), any(), any())) .thenAnswer( invocation -> { - Path path = invocation.getArgument(0); - Digest digest = invocation.getArgument(1); + Path path = invocation.getArgument(1); + Digest digest = invocation.getArgument(2); ByteString content = cacheEntries.get(digest); if (content == null) { return Futures.immediateFailedFuture(new IOException("Not found")); @@ -238,7 +237,7 @@ public void testDownloadFile_onInterrupt_deletePartialDownloadedFile() throws Ex .create(); // A future that never complete so we can interrupt later }); RemoteActionInputFetcher actionInputFetcher = - new RemoteActionInputFetcher(remoteCache, execRoot, RequestMetadata.getDefaultInstance()); + new RemoteActionInputFetcher("none", "none", remoteCache, execRoot); AtomicBoolean interrupted = new AtomicBoolean(false); Thread t = diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java index abe594dc914cb4..92e4ca217edcd3 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTests.java @@ -548,7 +548,7 @@ public void downloadRelativeFileSymlink() throws Exception { ActionResult.Builder result = ActionResult.newBuilder(); result.addOutputFileSymlinksBuilder().setPath("a/b/link").setTarget("../../foo"); // Doesn't check for dangling links, hence download succeeds. - cache.download(result.build(), execRoot, null, outputFilesLocker); + cache.download(remoteActionExecutionContext, result.build(), execRoot, null, outputFilesLocker); Path path = execRoot.getRelative("a/b/link"); assertThat(path.isSymbolicLink()).isTrue(); assertThat(path.readSymbolicLink()).isEqualTo(PathFragment.create("../../foo")); @@ -561,7 +561,7 @@ public void downloadRelativeDirectorySymlink() throws Exception { ActionResult.Builder result = ActionResult.newBuilder(); result.addOutputDirectorySymlinksBuilder().setPath("a/b/link").setTarget("foo"); // Doesn't check for dangling links, hence download succeeds. - cache.download(result.build(), execRoot, null, outputFilesLocker); + cache.download(remoteActionExecutionContext, result.build(), execRoot, null, outputFilesLocker); Path path = execRoot.getRelative("a/b/link"); assertThat(path.isSymbolicLink()).isTrue(); assertThat(path.readSymbolicLink()).isEqualTo(PathFragment.create("foo")); @@ -581,7 +581,7 @@ public void downloadRelativeSymlinkInDirectory() throws Exception { ActionResult.Builder result = ActionResult.newBuilder(); result.addOutputDirectoriesBuilder().setPath("dir").setTreeDigest(treeDigest); // Doesn't check for dangling links, hence download succeeds. - cache.download(result.build(), execRoot, null, outputFilesLocker); + cache.download(remoteActionExecutionContext, result.build(), execRoot, null, outputFilesLocker); Path path = execRoot.getRelative("dir/link"); assertThat(path.isSymbolicLink()).isTrue(); assertThat(path.readSymbolicLink()).isEqualTo(PathFragment.create("../foo")); @@ -596,7 +596,13 @@ public void downloadAbsoluteDirectorySymlinkError() throws Exception { IOException expected = assertThrows( IOException.class, - () -> cache.download(result.build(), execRoot, null, outputFilesLocker)); + () -> + cache.download( + remoteActionExecutionContext, + result.build(), + execRoot, + null, + outputFilesLocker)); assertThat(expected).hasMessageThat().contains("/abs/link"); assertThat(expected).hasMessageThat().contains("absolute path"); verify(outputFilesLocker).lock(); @@ -610,7 +616,13 @@ public void downloadAbsoluteFileSymlinkError() throws Exception { IOException expected = assertThrows( IOException.class, - () -> cache.download(result.build(), execRoot, null, outputFilesLocker)); + () -> + cache.download( + remoteActionExecutionContext, + result.build(), + execRoot, + null, + outputFilesLocker)); assertThat(expected).hasMessageThat().contains("/abs/link"); assertThat(expected).hasMessageThat().contains("absolute path"); verify(outputFilesLocker).lock(); @@ -631,7 +643,13 @@ public void downloadAbsoluteSymlinkInDirectoryError() throws Exception { IOException expected = assertThrows( IOException.class, - () -> cache.download(result.build(), execRoot, null, outputFilesLocker)); + () -> + cache.download( + remoteActionExecutionContext, + result.build(), + execRoot, + null, + outputFilesLocker)); assertThat(expected.getSuppressed()).isEmpty(); assertThat(expected).hasMessageThat().contains("dir/link"); assertThat(expected).hasMessageThat().contains("/foo"); @@ -655,7 +673,9 @@ public void downloadFailureMaintainsDirectories() throws Exception { result.addOutputFiles(OutputFile.newBuilder().setPath("otherfile").setDigest(otherFileDigest)); assertThrows( BulkTransferException.class, - () -> cache.download(result.build(), execRoot, null, outputFilesLocker)); + () -> + cache.download( + remoteActionExecutionContext, result.build(), execRoot, null, outputFilesLocker)); assertThat(cache.getNumFailedDownloads()).isEqualTo(1); assertThat(execRoot.getRelative("outputdir").exists()).isTrue(); assertThat(execRoot.getRelative("outputdir/outputfile").exists()).isFalse(); @@ -689,7 +709,11 @@ public void onErrorWaitForRemainingDownloadsToComplete() throws Exception { BulkTransferException.class, () -> cache.download( - result, execRoot, new FileOutErr(stdout, stderr), outputFilesLocker)); + remoteActionExecutionContext, + result, + execRoot, + new FileOutErr(stdout, stderr), + outputFilesLocker)); assertThat(downloadException.getSuppressed()).hasLength(1); assertThat(cache.getNumSuccessfulDownloads()).isEqualTo(2); assertThat(cache.getNumFailedDownloads()).isEqualTo(1); @@ -721,7 +745,11 @@ public void downloadWithMultipleErrorsAddsThemAsSuppressed() throws Exception { BulkTransferException.class, () -> cache.download( - result, execRoot, new FileOutErr(stdout, stderr), outputFilesLocker)); + remoteActionExecutionContext, + result, + execRoot, + new FileOutErr(stdout, stderr), + outputFilesLocker)); assertThat(e.getSuppressed()).hasLength(2); assertThat(e.getSuppressed()[0]).isInstanceOf(IOException.class); @@ -753,7 +781,11 @@ public void downloadWithDuplicateIOErrorsDoesNotSuppress() throws Exception { BulkTransferException.class, () -> cache.download( - result, execRoot, new FileOutErr(stdout, stderr), outputFilesLocker)); + remoteActionExecutionContext, + result, + execRoot, + new FileOutErr(stdout, stderr), + outputFilesLocker)); for (Throwable t : downloadException.getSuppressed()) { assertThat(t).isInstanceOf(IOException.class); @@ -785,7 +817,11 @@ public void downloadWithDuplicateInterruptionsDoesNotSuppress() throws Exception InterruptedException.class, () -> cache.download( - result, execRoot, new FileOutErr(stdout, stderr), outputFilesLocker)); + remoteActionExecutionContext, + result, + execRoot, + new FileOutErr(stdout, stderr), + outputFilesLocker)); assertThat(e.getSuppressed()).isEmpty(); assertThat(Throwables.getRootCause(e)).hasMessageThat().isEqualTo("reused interruption"); @@ -814,7 +850,7 @@ public void testDownloadWithStdoutStderrOnSuccess() throws Exception { .setStderrDigest(digestStderr) .build(); - cache.download(result, execRoot, spyOutErr, outputFilesLocker); + cache.download(remoteActionExecutionContext, result, execRoot, spyOutErr, outputFilesLocker); verify(spyOutErr, Mockito.times(2)).childOutErr(); verify(spyChildOutErr).clearOut(); @@ -857,7 +893,9 @@ public void testDownloadWithStdoutStderrOnFailure() throws Exception { .build(); assertThrows( BulkTransferException.class, - () -> cache.download(result, execRoot, spyOutErr, outputFilesLocker)); + () -> + cache.download( + remoteActionExecutionContext, result, execRoot, spyOutErr, outputFilesLocker)); verify(spyOutErr, Mockito.times(2)).childOutErr(); verify(spyChildOutErr).clearOut(); verify(spyChildOutErr).clearErr(); @@ -894,7 +932,8 @@ public void testDownloadClashes() throws Exception { // act - remoteCache.download(r, execRoot, new FileOutErr(), outputFilesLocker); + remoteCache.download( + remoteActionExecutionContext, r, execRoot, new FileOutErr(), outputFilesLocker); // assert @@ -928,6 +967,7 @@ public void testDownloadMinimalFiles() throws Exception { // act InMemoryOutput inMemoryOutput = remoteCache.downloadMinimal( + remoteActionExecutionContext, "action-id", r, ImmutableList.of(a1, a2), @@ -990,6 +1030,7 @@ public void testDownloadMinimalDirectory() throws Exception { // act InMemoryOutput inMemoryOutput = remoteCache.downloadMinimal( + remoteActionExecutionContext, "action-id", r, ImmutableList.of(dir), @@ -1065,6 +1106,7 @@ public void testDownloadMinimalDirectoryFails() throws Exception { BulkTransferException.class, () -> remoteCache.downloadMinimal( + remoteActionExecutionContext, "action-id", r, ImmutableList.of(dir), @@ -1099,6 +1141,7 @@ public void testDownloadMinimalWithStdoutStderr() throws Exception { // act InMemoryOutput inMemoryOutput = remoteCache.downloadMinimal( + remoteActionExecutionContext, "action-id", r, ImmutableList.of(), @@ -1142,6 +1185,7 @@ public void testDownloadMinimalWithInMemoryOutput() throws Exception { // act InMemoryOutput inMemoryOutput = remoteCache.downloadMinimal( + remoteActionExecutionContext, "action-id", r, ImmutableList.of(a1, a2), @@ -1182,6 +1226,7 @@ public void testDownloadMinimalWithMissingInMemoryOutput() throws Exception { // act InMemoryOutput inMemoryOutput = remoteCache.downloadMinimal( + remoteActionExecutionContext, "action-id", r, ImmutableList.of(a1), @@ -1207,10 +1252,14 @@ public void testDownloadEmptyBlobAndFile() throws Exception { Digest emptyDigest = digestUtil.compute(new byte[0]); // act and assert - assertThat(Utils.getFromFuture(remoteCache.downloadBlob(emptyDigest))).isEmpty(); + assertThat( + Utils.getFromFuture( + remoteCache.downloadBlob(remoteActionExecutionContext, emptyDigest))) + .isEmpty(); try (OutputStream out = file.getOutputStream()) { - Utils.getFromFuture(remoteCache.downloadFile(file, emptyDigest)); + Utils.getFromFuture( + remoteCache.downloadFile(remoteActionExecutionContext, file, emptyDigest)); } assertThat(file.exists()).isTrue(); assertThat(file.getFileSize()).isEqualTo(0); @@ -1235,7 +1284,7 @@ public void testDownloadFileWithSymlinkTemplate() throws Exception { RemoteCache remoteCache = new InMemoryRemoteCache(cas, options, digestUtil); // act - Utils.getFromFuture(remoteCache.downloadFile(file, helloDigest)); + Utils.getFromFuture(remoteCache.downloadFile(remoteActionExecutionContext, file, helloDigest)); // assert assertThat(file.isSymbolicLink()).isTrue(); @@ -1276,7 +1325,12 @@ public void testDownloadDirectory() throws Exception { // act RemoteCache remoteCache = newRemoteCache(cas); - remoteCache.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {}); + remoteCache.download( + remoteActionExecutionContext, + result.build(), + execRoot, + null, + /* outputFilesLocker= */ () -> {}); // assert assertThat(digestUtil.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest); @@ -1300,7 +1354,12 @@ public void testDownloadEmptyDirectory() throws Exception { // act RemoteCache remoteCache = newRemoteCache(map); - remoteCache.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {}); + remoteCache.download( + remoteActionExecutionContext, + result.build(), + execRoot, + null, + /* outputFilesLocker= */ () -> {}); // assert assertThat(execRoot.getRelative("a/bar").isDirectory()).isTrue(); @@ -1344,7 +1403,12 @@ public void testDownloadNestedDirectory() throws Exception { // act RemoteCache remoteCache = newRemoteCache(map); - remoteCache.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {}); + remoteCache.download( + remoteActionExecutionContext, + result.build(), + execRoot, + null, + /* outputFilesLocker= */ () -> {}); // assert assertThat(digestUtil.compute(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest); @@ -1394,7 +1458,12 @@ public void testDownloadDirectoryWithSameHash() throws Exception { // act RemoteCache remoteCache = newRemoteCache(map); - remoteCache.download(result.build(), execRoot, null, /* outputFilesLocker= */ () -> {}); + remoteCache.download( + remoteActionExecutionContext, + result.build(), + execRoot, + null, + /* outputFilesLocker= */ () -> {}); // assert assertThat(digestUtil.compute(execRoot.getRelative("a/bar/foo/file"))).isEqualTo(fileDigest); diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java index 96c87a6a163581..7fde5bcd734049 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteRepositoryRemoteExecutorTest.java @@ -63,7 +63,6 @@ public void setup() { DIGEST_UTIL, "none", "none", - "repo", /* remoteInstanceName= */ "foo", /* acceptCached= */ true); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java index 47e7dd132ab849..d4104756e21d6d 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnCacheTest.java @@ -261,13 +261,13 @@ public Void answer(InvocationOnMock invocation) { } }) .when(remoteCache) - .download(eq(actionResult), eq(execRoot), eq(outErr), any()); + .download(any(), eq(actionResult), eq(execRoot), eq(outErr), any()); CacheHandle entry = cache.lookup(simpleSpawn, simplePolicy); assertThat(entry.hasResult()).isTrue(); SpawnResult result = entry.getResult(); // All other methods on RemoteActionCache have side effects, so we verify all of them. - verify(remoteCache).download(eq(actionResult), eq(execRoot), eq(outErr), any()); + verify(remoteCache).download(any(), eq(actionResult), eq(execRoot), eq(outErr), any()); verify(remoteCache, never()) .upload( any(RemoteActionExecutionContext.class), @@ -621,7 +621,7 @@ public ActionResult answer(InvocationOnMock invocation) { }); doThrow(new CacheNotFoundException(digest)) .when(remoteCache) - .download(eq(actionResult), eq(execRoot), eq(outErr), any()); + .download(any(), eq(actionResult), eq(execRoot), eq(outErr), any()); CacheHandle entry = cache.lookup(simpleSpawn, simplePolicy); assertThat(entry.hasResult()).isFalse(); @@ -700,7 +700,7 @@ public void testDownloadMinimal() throws Exception { assertThat(cacheHandle.hasResult()).isTrue(); assertThat(cacheHandle.getResult().exitCode()).isEqualTo(0); verify(remoteCache) - .downloadMinimal(any(), any(), anyCollection(), any(), any(), any(), any(), any()); + .downloadMinimal(any(), any(), any(), anyCollection(), any(), any(), any(), any(), any()); } @Test @@ -717,7 +717,7 @@ public void testDownloadMinimalIoError() throws Exception { any(RemoteActionExecutionContext.class), any(), /* inlineOutErr= */ eq(false))) .thenReturn(success); when(remoteCache.downloadMinimal( - any(), any(), anyCollection(), any(), any(), any(), any(), any())) + any(), any(), any(), anyCollection(), any(), any(), any(), any(), any())) .thenThrow(downloadFailure); // act @@ -726,7 +726,7 @@ public void testDownloadMinimalIoError() throws Exception { // assert assertThat(cacheHandle.hasResult()).isFalse(); verify(remoteCache) - .downloadMinimal(any(), any(), anyCollection(), any(), any(), any(), any(), any()); + .downloadMinimal(any(), any(), any(), anyCollection(), any(), any(), any(), any(), any()); assertThat(eventHandler.getEvents().size()).isEqualTo(1); Event evt = eventHandler.getEvents().get(0); assertThat(evt.getKind()).isEqualTo(EventKind.WARNING); diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java index 3ca3fd7c68d321..7cb8328e74fb56 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerTest.java @@ -375,7 +375,13 @@ public void treatFailedCachedActionAsCacheMiss_local() throws Exception { any(), /* uploadLocalResults= */ eq(true)); verify(cache).upload(any(), any(), any(), any(), any(), any(), any()); - verify(cache, never()).download(any(ActionResult.class), any(Path.class), eq(outErr), any()); + verify(cache, never()) + .download( + any(RemoteActionExecutionContext.class), + any(ActionResult.class), + any(Path.class), + eq(outErr), + any()); } @Test @@ -558,7 +564,8 @@ public void testHumanReadableServerLogsSavedForFailingAction() throws Exception .build()); SettableFuture completed = SettableFuture.create(); completed.set(null); - when(cache.downloadFile(eq(logPath), eq(logDigest))).thenReturn(completed); + when(cache.downloadFile(any(RemoteActionExecutionContext.class), eq(logPath), eq(logDigest))) + .thenReturn(completed); Spawn spawn = newSimpleSpawn(); SpawnExecutionContext policy = getSpawnContext(spawn); @@ -567,7 +574,7 @@ public void testHumanReadableServerLogsSavedForFailingAction() throws Exception assertThat(res.status()).isEqualTo(Status.NON_ZERO_EXIT); verify(executor).executeRemotely(any(ExecuteRequest.class), any(OperationObserver.class)); - verify(cache).downloadFile(eq(logPath), eq(logDigest)); + verify(cache).downloadFile(any(RemoteActionExecutionContext.class), eq(logPath), eq(logDigest)); } @Test @@ -587,7 +594,8 @@ public void testHumanReadableServerLogsSavedForFailingActionWithStatus() throws .thenThrow(new IOException(new ExecutionStatusException(resp.getStatus(), resp))); SettableFuture completed = SettableFuture.create(); completed.set(null); - when(cache.downloadFile(eq(logPath), eq(logDigest))).thenReturn(completed); + when(cache.downloadFile(any(RemoteActionExecutionContext.class), eq(logPath), eq(logDigest))) + .thenReturn(completed); Spawn spawn = newSimpleSpawn(); SpawnExecutionContext policy = getSpawnContext(spawn); @@ -596,7 +604,7 @@ public void testHumanReadableServerLogsSavedForFailingActionWithStatus() throws assertThat(res.status()).isEqualTo(Status.TIMEOUT); verify(executor).executeRemotely(any(ExecuteRequest.class), any(OperationObserver.class)); - verify(cache).downloadFile(eq(logPath), eq(logDigest)); + verify(cache).downloadFile(any(RemoteActionExecutionContext.class), eq(logPath), eq(logDigest)); } @Test @@ -618,8 +626,15 @@ public void testNonHumanReadableServerLogsNotSaved() throws Exception { assertThat(res.status()).isEqualTo(Status.NON_ZERO_EXIT); verify(executor).executeRemotely(any(ExecuteRequest.class), any(OperationObserver.class)); - verify(cache).download(eq(result), eq(execRoot), any(FileOutErr.class), any()); - verify(cache, never()).downloadFile(any(Path.class), any(Digest.class)); + verify(cache) + .download( + any(RemoteActionExecutionContext.class), + eq(result), + eq(execRoot), + any(FileOutErr.class), + any()); + verify(cache, never()) + .downloadFile(any(RemoteActionExecutionContext.class), any(Path.class), any(Digest.class)); } @Test @@ -644,8 +659,15 @@ public void testServerLogsNotSavedForSuccessfulAction() throws Exception { assertThat(res.status()).isEqualTo(Status.SUCCESS); verify(executor).executeRemotely(any(ExecuteRequest.class), any(OperationObserver.class)); - verify(cache).download(eq(result), eq(execRoot), any(FileOutErr.class), any()); - verify(cache, never()).downloadFile(any(Path.class), any(Digest.class)); + verify(cache) + .download( + any(RemoteActionExecutionContext.class), + eq(result), + eq(execRoot), + any(FileOutErr.class), + any()); + verify(cache, never()) + .downloadFile(any(RemoteActionExecutionContext.class), any(Path.class), any(Digest.class)); } @Test @@ -664,12 +686,24 @@ public void cacheDownloadFailureTriggersRemoteExecution() throws Exception { new BulkTransferException(new CacheNotFoundException(Digest.getDefaultInstance())); doThrow(downloadFailure) .when(cache) - .download(eq(cachedResult), any(Path.class), any(FileOutErr.class), any()); + .download( + any(RemoteActionExecutionContext.class), + eq(cachedResult), + any(Path.class), + any(FileOutErr.class), + any()); ActionResult execResult = ActionResult.newBuilder().setExitCode(31).build(); ExecuteResponse succeeded = ExecuteResponse.newBuilder().setResult(execResult).build(); when(executor.executeRemotely(any(ExecuteRequest.class), any(OperationObserver.class))) .thenReturn(succeeded); - doNothing().when(cache).download(eq(execResult), any(Path.class), any(FileOutErr.class), any()); + doNothing() + .when(cache) + .download( + any(RemoteActionExecutionContext.class), + eq(execResult), + any(Path.class), + any(FileOutErr.class), + any()); Spawn spawn = newSimpleSpawn(); @@ -706,8 +740,20 @@ public void resultsDownloadFailureTriggersRemoteExecutionWithSkipCacheLookup() t new BulkTransferException(new CacheNotFoundException(Digest.getDefaultInstance())); doThrow(downloadFailure) .when(cache) - .download(eq(cachedResult), any(Path.class), any(FileOutErr.class), any()); - doNothing().when(cache).download(eq(execResult), any(Path.class), any(FileOutErr.class), any()); + .download( + any(RemoteActionExecutionContext.class), + eq(cachedResult), + any(Path.class), + any(FileOutErr.class), + any()); + doNothing() + .when(cache) + .download( + any(RemoteActionExecutionContext.class), + eq(execResult), + any(Path.class), + any(FileOutErr.class), + any()); Spawn spawn = newSimpleSpawn(); @@ -760,7 +806,13 @@ public void testRemoteExecutionTimeout() throws Exception { assertThat(res.status()).isEqualTo(Status.TIMEOUT); verify(executor).executeRemotely(any(ExecuteRequest.class), any(OperationObserver.class)); - verify(cache).download(eq(cachedResult), eq(execRoot), any(FileOutErr.class), any()); + verify(cache) + .download( + any(RemoteActionExecutionContext.class), + eq(cachedResult), + eq(execRoot), + any(FileOutErr.class), + any()); } @Test @@ -797,7 +849,13 @@ public void testRemoteExecutionTimeoutDoesNotTriggerFallback() throws Exception assertThat(res.status()).isEqualTo(Status.TIMEOUT); verify(executor).executeRemotely(any(ExecuteRequest.class), any(OperationObserver.class)); - verify(cache).download(eq(cachedResult), eq(execRoot), any(FileOutErr.class), any()); + verify(cache) + .download( + any(RemoteActionExecutionContext.class), + eq(cachedResult), + eq(execRoot), + any(FileOutErr.class), + any()); verify(localRunner, never()).exec(eq(spawn), eq(policy)); } @@ -829,7 +887,13 @@ public void testRemoteExecutionCommandFailureDoesNotTriggerFallback() throws Exc assertThat(res.exitCode()).isEqualTo(33); verify(executor).executeRemotely(any(ExecuteRequest.class), any(OperationObserver.class)); - verify(cache, never()).download(eq(cachedResult), eq(execRoot), any(FileOutErr.class), any()); + verify(cache, never()) + .download( + any(RemoteActionExecutionContext.class), + eq(cachedResult), + eq(execRoot), + any(FileOutErr.class), + any()); verify(localRunner, never()).exec(eq(spawn), eq(policy)); } @@ -968,8 +1032,22 @@ public void testDownloadMinimalOnCacheHit() throws Exception { // assert verify(cache) .downloadMinimal( - any(), eq(succeededAction), anyCollection(), any(), any(), any(), any(), any()); - verify(cache, never()).download(any(ActionResult.class), any(Path.class), eq(outErr), any()); + any(RemoteActionExecutionContext.class), + any(), + eq(succeededAction), + anyCollection(), + any(), + any(), + any(), + any(), + any()); + verify(cache, never()) + .download( + any(RemoteActionExecutionContext.class), + any(ActionResult.class), + any(Path.class), + eq(outErr), + any()); } @Test @@ -996,8 +1074,22 @@ public void testDownloadMinimalOnCacheMiss() throws Exception { verify(executor).executeRemotely(any(), any(OperationObserver.class)); verify(cache) .downloadMinimal( - any(), eq(succeededAction), anyCollection(), any(), any(), any(), any(), any()); - verify(cache, never()).download(any(ActionResult.class), any(Path.class), eq(outErr), any()); + any(RemoteActionExecutionContext.class), + any(), + eq(succeededAction), + anyCollection(), + any(), + any(), + any(), + any(), + any()); + verify(cache, never()) + .download( + any(RemoteActionExecutionContext.class), + any(ActionResult.class), + any(Path.class), + eq(outErr), + any()); } @Test @@ -1012,7 +1104,16 @@ public void testDownloadMinimalIoError() throws Exception { /* inlineOutErr= */ eq(false))) .thenReturn(succeededAction); IOException downloadFailure = new IOException("downloadMinimal failed"); - when(cache.downloadMinimal(any(), any(), anyCollection(), any(), any(), any(), any(), any())) + when(cache.downloadMinimal( + any(RemoteActionExecutionContext.class), + any(), + any(), + anyCollection(), + any(), + any(), + any(), + any(), + any())) .thenThrow(downloadFailure); RemoteSpawnRunner runner = newSpawnRunner(); @@ -1027,8 +1128,22 @@ public void testDownloadMinimalIoError() throws Exception { // assert verify(cache) .downloadMinimal( - any(), eq(succeededAction), anyCollection(), any(), any(), any(), any(), any()); - verify(cache, never()).download(any(ActionResult.class), any(Path.class), eq(outErr), any()); + any(RemoteActionExecutionContext.class), + any(), + eq(succeededAction), + anyCollection(), + any(), + any(), + any(), + any(), + any()); + verify(cache, never()) + .download( + any(RemoteActionExecutionContext.class), + any(ActionResult.class), + any(Path.class), + eq(outErr), + any()); } @Test @@ -1059,10 +1174,10 @@ public void testDownloadTopLevel() throws Exception { assertThat(result.status()).isEqualTo(Status.SUCCESS); // assert - verify(cache).download(eq(succeededAction), any(Path.class), eq(outErr), any()); + verify(cache).download(any(), eq(succeededAction), any(Path.class), eq(outErr), any()); verify(cache, never()) .downloadMinimal( - any(), eq(succeededAction), anyCollection(), any(), any(), any(), any(), any()); + any(), any(), eq(succeededAction), anyCollection(), any(), any(), any(), any(), any()); } @Test diff --git a/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java index c9dfcc04dab9e5..41fdd7f7fa54fd 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java @@ -124,10 +124,11 @@ private GrpcRemoteDownloader newDownloader(RemoteCacheClient cacheClient) throws new ReferenceCountedChannel( InProcessChannelBuilder.forName(fakeServerName).directExecutor().build()); return new GrpcRemoteDownloader( + "none", + "none", channel.retain(), Optional.empty(), retrier, - withEmptyMetadata, cacheClient, remoteOptions); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/BUILD b/src/test/java/com/google/devtools/build/lib/remote/http/BUILD index f1ac992cebb0a1..8c7a9a993299c9 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/http/BUILD @@ -20,6 +20,7 @@ java_test( ], test_class = "com.google.devtools.build.lib.AllTests", deps = [ + "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/http", "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/java/com/google/devtools/build/lib/vfs", diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java index f9365bff3b419e..7515f32834fe12 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java @@ -32,7 +32,11 @@ import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.devtools.build.lib.remote.common.NetworkTime; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl; import com.google.devtools.build.lib.remote.util.DigestUtil; +import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.vfs.DigestHashFunction; import com.google.devtools.build.remote.worker.http.HttpCacheServerHandler; import com.google.protobuf.ByteString; @@ -87,6 +91,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.IntFunction; import javax.annotation.Nullable; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -101,6 +106,8 @@ public class HttpCacheClientTest { private static final DigestUtil DIGEST_UTIL = new DigestUtil(DigestHashFunction.SHA256); private static final Digest DIGEST = DIGEST_UTIL.computeAsUtf8("File Contents"); + private RemoteActionExecutionContext remoteActionExecutionContext; + private static ServerChannel createServer( Class serverChannelClass, IntFunction newEventLoopGroup, @@ -291,6 +298,15 @@ private HttpCacheClient createHttpBlobStore( serverChannel, timeoutSeconds, /* remoteVerifyDownloads= */ true, creds); } + @Before + public void setUp() throws Exception { + remoteActionExecutionContext = + new RemoteActionExecutionContextImpl( + TracingMetadataUtils.buildMetadata( + "none", "none", Digest.getDefaultInstance().getHash()), + new NetworkTime()); + } + @Test public void testUploadAtMostOnce() throws Exception { ServerChannel server = null; @@ -330,7 +346,8 @@ public void connectTimeout() throws Exception { Credentials credentials = newCredentials(); HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials); - getFromFuture(blobStore.downloadBlob(DIGEST, new ByteArrayOutputStream())); + getFromFuture( + blobStore.downloadBlob(remoteActionExecutionContext, DIGEST, new ByteArrayOutputStream())); fail("Exception expected"); } @@ -375,7 +392,9 @@ protected void channelRead0( Credentials credentials = newCredentials(); HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials); - getFromFuture(blobStore.downloadBlob(DIGEST, new ByteArrayOutputStream())); + getFromFuture( + blobStore.downloadBlob( + remoteActionExecutionContext, DIGEST, new ByteArrayOutputStream())); fail("Exception expected"); } finally { testServer.stop(server); @@ -450,7 +469,10 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) try (OutputStream out = new ByteArrayOutputStream()) { IOException e = assertThrows( - IOException.class, () -> getFromFuture(blobStore.downloadBlob(fooDigest, out))); + IOException.class, + () -> + getFromFuture( + blobStore.downloadBlob(remoteActionExecutionContext, fooDigest, out))); assertThat(e).hasMessageThat().contains(fooDigest.getHash()); assertThat(e).hasMessageThat().contains(DIGEST_UTIL.computeAsUtf8("bar").getHash()); } @@ -487,7 +509,7 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) server, /* timeoutSeconds= */ 1, /* remoteVerifyDownloads= */ false, credentials); Digest fooDigest = DIGEST_UTIL.compute("foo".getBytes(Charsets.UTF_8)); try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - getFromFuture(blobStore.downloadBlob(fooDigest, out)); + getFromFuture(blobStore.downloadBlob(remoteActionExecutionContext, fooDigest, out)); assertThat(out.toByteArray()).isEqualTo("bar".getBytes(Charsets.UTF_8)); } } finally { @@ -512,7 +534,7 @@ private void expiredAuthTokensShouldBeRetried_get( Credentials credentials = newCredentials(); HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials); ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream()); - getFromFuture(blobStore.downloadBlob(DIGEST, out)); + getFromFuture(blobStore.downloadBlob(remoteActionExecutionContext, DIGEST, out)); assertThat(out.toString(Charsets.US_ASCII.name())).isEqualTo("File Contents"); verify(credentials, times(1)).refresh(); verify(credentials, times(2)).getRequestMetadata(any(URI.class)); @@ -568,7 +590,9 @@ private void errorCodeThatShouldNotBeRetried_get( Credentials credentials = newCredentials(); HttpCacheClient blobStore = createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials); - getFromFuture(blobStore.downloadBlob(DIGEST, new ByteArrayOutputStream())); + getFromFuture( + blobStore.downloadBlob( + remoteActionExecutionContext, DIGEST, new ByteArrayOutputStream())); fail("Exception expected."); } catch (Exception e) { assertThat(e).isInstanceOf(HttpException.class); diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java index d932ddf56e5bb8..9c78ef0ffde548 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java +++ b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java @@ -32,8 +32,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; -/** A {@link RemoteCache} that stores its contents in memory. */ -public class InMemoryCacheClient implements RemoteCacheClient { +/** A {@link RemoteCacheClient} that stores its contents in memory. */ +public final class InMemoryCacheClient implements RemoteCacheClient { private final ConcurrentMap downloadFailures = new ConcurrentHashMap<>(); private final ConcurrentMap ac = new ConcurrentHashMap<>(); @@ -66,7 +66,8 @@ public int getNumFailedDownloads() { } @Override - public ListenableFuture downloadBlob(Digest digest, OutputStream out) { + public ListenableFuture downloadBlob( + RemoteActionExecutionContext context, Digest digest, OutputStream out) { Exception failure = downloadFailures.get(digest); if (failure != null) { numFailures.incrementAndGet(); diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java index 39b4fd272121f1..8bf39e88be4a2f 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java @@ -17,6 +17,7 @@ import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; import build.bazel.remote.execution.v2.Digest; +import build.bazel.remote.execution.v2.RequestMetadata; import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase; import com.google.bytestream.ByteStreamProto.ReadRequest; import com.google.bytestream.ByteStreamProto.ReadResponse; @@ -25,7 +26,11 @@ import com.google.common.flogger.GoogleLogger; import com.google.devtools.build.lib.remote.Chunker; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; +import com.google.devtools.build.lib.remote.common.NetworkTime; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl; import com.google.devtools.build.lib.remote.util.DigestUtil; +import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; import io.grpc.Status; @@ -66,6 +71,9 @@ public ByteStreamServer(OnDiskBlobStoreCache cache, Path workPath, DigestUtil di @Override public void read(ReadRequest request, StreamObserver responseObserver) { + RequestMetadata meta = TracingMetadataUtils.fromCurrentContext(); + RemoteActionExecutionContext context = + new RemoteActionExecutionContextImpl(meta, new NetworkTime()); Digest digest = parseDigestFromResourceName(request.getResourceName()); if (digest == null) { @@ -78,7 +86,8 @@ public void read(ReadRequest request, StreamObserver responseObser try { // This still relies on the blob size to be small enough to fit in memory. // TODO(olaola): refactor to fix this if the need arises. - Chunker c = Chunker.builder().setInput(getFromFuture(cache.downloadBlob(digest))).build(); + Chunker c = + Chunker.builder().setInput(getFromFuture(cache.downloadBlob(context, digest))).build(); while (c.hasNext()) { responseObserver.onNext( ReadResponse.newBuilder().setData(c.next().getData()).build()); diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java index e05e8434021f7e..9b3bba571f5461 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/CasServer.java @@ -26,8 +26,13 @@ import build.bazel.remote.execution.v2.FindMissingBlobsResponse; import build.bazel.remote.execution.v2.GetTreeRequest; import build.bazel.remote.execution.v2.GetTreeResponse; +import build.bazel.remote.execution.v2.RequestMetadata; import com.google.common.flogger.GoogleLogger; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; +import com.google.devtools.build.lib.remote.common.NetworkTime; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl; +import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.protobuf.InvalidProtocolBufferException; import com.google.rpc.Code; import io.grpc.stub.StreamObserver; @@ -86,6 +91,9 @@ public void batchUpdateBlobs( @Override public void getTree(GetTreeRequest request, StreamObserver responseObserver) { + RequestMetadata meta = TracingMetadataUtils.fromCurrentContext(); + RemoteActionExecutionContext context = + new RemoteActionExecutionContextImpl(meta, new NetworkTime()); // Directories are returned in depth-first order. We store all previously-traversed digests so // identical subtrees having the same digest will only be traversed and returned once. Set seenDigests = new HashSet<>(); @@ -97,7 +105,7 @@ public void getTree(GetTreeRequest request, StreamObserver resp Digest digest = pendingDigests.pop(); byte[] directoryBytes; try { - directoryBytes = getFromFuture(cache.downloadBlob(digest)); + directoryBytes = getFromFuture(cache.downloadBlob(context, digest)); } catch (CacheNotFoundException e) { responseObserver.onError(StatusUtils.notFoundError(digest)); return; diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java index 0257ee281bb236..e693ccc3051b97 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java @@ -52,6 +52,7 @@ import com.google.devtools.build.lib.vfs.Path; import com.google.longrunning.Operation; import com.google.protobuf.Any; +import com.google.protobuf.ExtensionRegistry; import com.google.protobuf.util.Durations; import com.google.rpc.Code; import com.google.rpc.Status; @@ -252,9 +253,15 @@ private ActionResult execute( Action action; ActionKey actionKey = digestUtil.asActionKey(actionDigest); try { - action = Action.parseFrom(getFromFuture(cache.downloadBlob(actionDigest))); - command = Command.parseFrom(getFromFuture(cache.downloadBlob(action.getCommandDigest()))); - cache.downloadTree(action.getInputRootDigest(), execRoot); + action = + Action.parseFrom( + getFromFuture(cache.downloadBlob(context, actionDigest)), + ExtensionRegistry.getEmptyRegistry()); + command = + Command.parseFrom( + getFromFuture(cache.downloadBlob(context, action.getCommandDigest())), + ExtensionRegistry.getEmptyRegistry()); + cache.downloadTree(context, action.getInputRootDigest(), execRoot); } catch (CacheNotFoundException e) { throw StatusUtils.notFoundError(e.getMissingDigest()); } diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java index 829d863d3297a1..804f739089a503 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java @@ -45,17 +45,19 @@ public boolean containsKey(Digest digest) { } @SuppressWarnings("ProtoParseWithRegistry") - public void downloadTree(Digest rootDigest, Path rootLocation) + public void downloadTree( + RemoteActionExecutionContext context, Digest rootDigest, Path rootLocation) throws IOException, InterruptedException { rootLocation.createDirectoryAndParents(); - Directory directory = Directory.parseFrom(Utils.getFromFuture(downloadBlob(rootDigest))); + Directory directory = + Directory.parseFrom(Utils.getFromFuture(downloadBlob(context, rootDigest))); for (FileNode file : directory.getFilesList()) { Path dst = rootLocation.getRelative(file.getName()); - Utils.getFromFuture(downloadFile(dst, file.getDigest())); + Utils.getFromFuture(downloadFile(context, dst, file.getDigest())); dst.setExecutable(file.getIsExecutable()); } for (DirectoryNode child : directory.getDirectoriesList()) { - downloadTree(child.getDigest(), rootLocation.getRelative(child.getName())); + downloadTree(context, child.getDigest(), rootLocation.getRelative(child.getName())); } }