Skip to content

Commit

Permalink
Remote: Use parameters instead of thread-local storage to provide tra…
Browse files Browse the repository at this point in the history
…cing metadata.

Remote Execution API defines RequestMetadata which is an optional message attached to any RPC request to tell the server about an external context of the request. Existing code uses Context object from gRPC which is essentially a thread-local storage to provide metadata:
    1. We always attach the Context even the underlying implementation isn't based on gRPC.
    2. It is error prone in a multi-threaded context.

This is the first step of a series changes where we introduce RemoteActionExecutionContext and update RemoteCacheClient#downloadActionResult to use it.

These is a regression that networkTime of SpawnMetrics will not be correct since the NetworkTimeInterceptor is no longer installed at RemoteModule and other methods haven't get updated to use the RemoteActionExecutionContext. The networkTime will back to normal once we finish the cleanup.

PiperOrigin-RevId: 353819916
  • Loading branch information
Googler authored and copybara-github committed Jan 26, 2021
1 parent dafcebf commit bc54c64
Show file tree
Hide file tree
Showing 23 changed files with 408 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestOutputStream;
Expand Down Expand Up @@ -142,9 +143,11 @@ private ActionCacheBlockingStub acBlockingStub() {
.withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS);
}

private ActionCacheFutureStub acFutureStub() {
private ActionCacheFutureStub acFutureStub(RemoteActionExecutionContext context) {
return ActionCacheGrpc.newFutureStub(channel)
.withInterceptors(TracingMetadataUtils.attachMetadataFromContextInterceptor())
.withInterceptors(
TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()),
new NetworkTimeInterceptor(context::getNetworkTime))
.withCallCredentials(callCredentialsProvider.getCallCredentials())
.withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -240,19 +243,18 @@ private ListenableFuture<ActionResult> handleStatus(ListenableFuture<ActionResul

@Override
public ListenableFuture<ActionResult> downloadActionResult(
ActionKey actionKey, boolean inlineOutErr) {
RemoteActionExecutionContext context, ActionKey actionKey, boolean inlineOutErr) {
GetActionResultRequest request =
GetActionResultRequest.newBuilder()
.setInstanceName(options.remoteInstanceName)
.setActionDigest(actionKey.getDigest())
.setInlineStderr(inlineOutErr)
.setInlineStdout(inlineOutErr)
.build();
Context ctx = Context.current();
return Utils.refreshIfUnauthenticatedAsync(
() ->
retrier.executeAsync(
() -> ctx.call(() -> handleStatus(acFutureStub().getActionResult(request)))),
() -> handleStatus(acFutureStub(context).getActionResult(request))),
callCredentialsProvider);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The Bazel Authors. All rights reserved.
// Copyright 2020 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -11,12 +11,10 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.util;
package com.google.devtools.build.lib.remote;

import build.bazel.remote.execution.v2.ExecutionGrpc;
import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;
import com.google.devtools.build.lib.concurrent.ThreadSafety;
import com.google.devtools.build.lib.remote.common.NetworkTime;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
Expand All @@ -27,59 +25,31 @@
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.time.Duration;
import java.util.function.Supplier;

/** Reentrant wall clock stopwatch and grpc interceptor for network waits. */
@ThreadSafety.ThreadSafe
public class NetworkTime {
/** The ClientInterceptor used to track network time. */
public class NetworkTimeInterceptor implements ClientInterceptor {

public static final Context.Key<NetworkTime> CONTEXT_KEY = Context.key("remote-network-time");
private final Supplier<NetworkTime> networkTimeSupplier;

private final Stopwatch wallTime = Stopwatch.createUnstarted();
private int outstanding = 0;

private synchronized void start() {
if (!wallTime.isRunning()) {
wallTime.start();
}
outstanding++;
}

private synchronized void stop() {
if (--outstanding == 0) {
wallTime.stop();
}
}

public Duration getDuration() {
return wallTime.elapsed();
public NetworkTimeInterceptor(Supplier<NetworkTime> networkTimeSupplier) {
this.networkTimeSupplier = networkTimeSupplier;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("outstanding", outstanding)
.add("wallTime", wallTime)
.add("wallTime.isRunning", wallTime.isRunning())
.toString();
}

/** The ClientInterceptor used to track network time. */
public static class Interceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
// prevent accounting for execution wait time
if (method != ExecutionGrpc.getExecuteMethod()
&& method != ExecutionGrpc.getWaitExecutionMethod()) {
NetworkTime networkTime = CONTEXT_KEY.get();
if (networkTime != null) {
call = new NetworkTimeCall<>(call, networkTime);
}
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
ClientCall<ReqT, RespT> call = next.newCall(method, callOptions);
// prevent accounting for execution wait time
if (method != ExecutionGrpc.getExecuteMethod()
&& method != ExecutionGrpc.getWaitExecutionMethod()) {
NetworkTime networkTime = networkTimeSupplier.get();
if (networkTime != null) {
call = new NetworkTimeCall<>(call, networkTime);
}
return call;
}
return call;
}

private static class NetworkTimeCall<ReqT, RespT>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import com.google.devtools.build.lib.remote.RemoteCache.ActionResultMetadata.DirectoryMetadata;
import com.google.devtools.build.lib.remote.RemoteCache.ActionResultMetadata.FileMetadata;
import com.google.devtools.build.lib.remote.RemoteCache.ActionResultMetadata.SymlinkMetadata;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteActionFileArtifactValue;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
Expand Down Expand Up @@ -115,9 +116,10 @@ public RemoteCache(
this.digestUtil = digestUtil;
}

public ActionResult downloadActionResult(ActionKey actionKey, boolean inlineOutErr)
public ActionResult downloadActionResult(
RemoteActionExecutionContext context, ActionKey actionKey, boolean inlineOutErr)
throws IOException, InterruptedException {
return getFromFuture(cacheProtocol.downloadActionResult(actionKey, inlineOutErr));
return getFromFuture(cacheProtocol.downloadActionResult(context, actionKey, inlineOutErr));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.NetworkTime;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.runtime.BlazeModule;
Expand Down Expand Up @@ -330,7 +329,6 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
if (loggingInterceptor != null) {
interceptors.add(loggingInterceptor);
}
interceptors.add(new NetworkTime.Interceptor());
try {
execChannel =
RemoteCacheClientFactory.createGrpcChannelPool(
Expand All @@ -357,7 +355,6 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
if (loggingInterceptor != null) {
interceptors.add(loggingInterceptor);
}
interceptors.add(new NetworkTime.Interceptor());
try {
cacheChannel =
RemoteCacheClientFactory.createGrpcChannelPool(
Expand Down Expand Up @@ -551,7 +548,9 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteCache,
remoteExecutor,
digestUtil,
repoContext,
buildRequestId,
invocationId,
"repository_rule",
remoteOptions.remoteInstanceName,
remoteOptions.remoteAcceptCached));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.NetworkTime;
import com.google.devtools.build.lib.remote.common.OperationObserver;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.runtime.RepositoryRemoteExecutor;
import com.google.devtools.build.lib.vfs.Path;
Expand All @@ -49,7 +53,9 @@ public class RemoteRepositoryRemoteExecutor implements RepositoryRemoteExecutor
private final RemoteExecutionCache remoteCache;
private final RemoteExecutionClient remoteExecutor;
private final DigestUtil digestUtil;
private final Context requestCtx;
private final String buildRequestId;
private final String commandId;
private final String actionId;

private final String remoteInstanceName;
private final boolean acceptCached;
Expand All @@ -58,13 +64,17 @@ public RemoteRepositoryRemoteExecutor(
RemoteExecutionCache remoteCache,
RemoteExecutionClient remoteExecutor,
DigestUtil digestUtil,
Context requestCtx,
String buildRequestId,
String commandId,
String actionId,
String remoteInstanceName,
boolean acceptCached) {
this.remoteCache = remoteCache;
this.remoteExecutor = remoteExecutor;
this.digestUtil = digestUtil;
this.requestCtx = requestCtx;
this.buildRequestId = buildRequestId;
this.commandId = commandId;
this.actionId = actionId;
this.remoteInstanceName = remoteInstanceName;
this.acceptCached = acceptCached;
}
Expand Down Expand Up @@ -100,6 +110,8 @@ public ExecutionResult execute(
String workingDirectory,
Duration timeout)
throws IOException, InterruptedException {
Context requestCtx =
TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, actionId);
Context prev = requestCtx.attach();
try {
Platform platform = PlatformUtils.buildPlatformProto(executionProperties);
Expand All @@ -117,10 +129,16 @@ public ExecutionResult execute(
commandHash, merkleTree.getRootDigest(), timeout, acceptCached);
Digest actionDigest = digestUtil.compute(action);
ActionKey actionKey = new ActionKey(actionDigest);
RemoteActionExecutionContext remoteActionExecutionContext =
new RemoteActionExecutionContextImpl(
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, actionId),
new NetworkTime());
ActionResult actionResult;
try (SilentCloseable c =
Profiler.instance().profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) {
actionResult = remoteCache.downloadActionResult(actionKey, /* inlineOutErr= */ true);
actionResult =
remoteCache.downloadActionResult(
remoteActionExecutionContext, actionKey, /* inlineOutErr= */ true);
}
if (actionResult == null || actionResult.getExitCode() != 0) {
try (SilentCloseable c =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.runtime.RepositoryRemoteExecutor;
import com.google.devtools.build.lib.runtime.RepositoryRemoteExecutorFactory;
import io.grpc.Context;

/** Factory for {@link RemoteRepositoryRemoteExecutor}. */
class RemoteRepositoryRemoteExecutorFactory implements RepositoryRemoteExecutorFactory {

private final RemoteExecutionCache remoteExecutionCache;
private final RemoteExecutionClient remoteExecutor;
private final DigestUtil digestUtil;
private final Context requestCtx;
private final String buildRequestId;
private final String commandId;
private final String actionId;

private final String remoteInstanceName;
private final boolean acceptCached;
Expand All @@ -34,13 +35,17 @@ class RemoteRepositoryRemoteExecutorFactory implements RepositoryRemoteExecutorF
RemoteExecutionCache remoteExecutionCache,
RemoteExecutionClient remoteExecutor,
DigestUtil digestUtil,
Context requestCtx,
String buildRequestId,
String commandId,
String actionId,
String remoteInstanceName,
boolean acceptCached) {
this.remoteExecutionCache = remoteExecutionCache;
this.remoteExecutor = remoteExecutor;
this.digestUtil = digestUtil;
this.requestCtx = requestCtx;
this.buildRequestId = buildRequestId;
this.commandId = commandId;
this.actionId = actionId;
this.remoteInstanceName = remoteInstanceName;
this.acceptCached = acceptCached;
}
Expand All @@ -51,7 +56,9 @@ public RepositoryRemoteExecutor create() {
remoteExecutionCache,
remoteExecutor,
digestUtil,
requestCtx,
buildRequestId,
commandId,
actionId,
remoteInstanceName,
acceptCached);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.NetworkTime;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContextImpl;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.NetworkTime;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.remote.util.Utils;
import com.google.devtools.build.lib.remote.util.Utils.InMemoryOutput;
Expand Down Expand Up @@ -151,9 +153,15 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
digestUtil.compute(command), merkleTreeRoot, context.getTimeout(), true);
// Look up action cache, and reuse the action output if it is found.
ActionKey actionKey = digestUtil.computeActionKey(action);

RemoteActionExecutionContext remoteActionExecutionContext =
new RemoteActionExecutionContextImpl(
TracingMetadataUtils.buildMetadata(
buildRequestId, commandId, actionKey.getDigest().getHash()),
networkTime);
Context withMetadata =
TracingMetadataUtils.contextWithMetadata(buildRequestId, commandId, actionKey)
.withValue(NetworkTime.CONTEXT_KEY, networkTime);
.withValue(NetworkTimeInterceptor.CONTEXT_KEY, networkTime);

Profiler prof = Profiler.instance();
if (options.remoteAcceptCached
Expand All @@ -165,7 +173,9 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
try {
ActionResult result;
try (SilentCloseable c = prof.profile(ProfilerTask.REMOTE_CACHE_CHECK, "check cache hit")) {
result = remoteCache.downloadActionResult(actionKey, /* inlineOutErr= */ false);
result =
remoteCache.downloadActionResult(
remoteActionExecutionContext, actionKey, /* inlineOutErr= */ false);
}
// In case the remote cache returned a failed action (exit code != 0) we treat it as a
// cache miss
Expand Down
Loading

0 comments on commit bc54c64

Please sign in to comment.