Skip to content

Commit

Permalink
Merge branch 'release-6.1.0' into ks/cherry-pick16995
Browse files Browse the repository at this point in the history
  • Loading branch information
ShreeM01 authored Feb 7, 2023
2 parents f4e7a4e + 82168d4 commit 3e9dcf8
Show file tree
Hide file tree
Showing 21 changed files with 234 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public ImmutableList<SpawnResult> exec(
spawnLogContext.logSpawn(
spawn,
actionExecutionContext.getMetadataProvider(),
context.getInputMapping(PathFragment.EMPTY_FRAGMENT),
context.getInputMapping(PathFragment.EMPTY_FRAGMENT, /* willAccessRepeatedly= */ false),
context.getTimeout(),
spawnResult);
} catch (IOException | ForbiddenActionInputException e) {
Expand Down Expand Up @@ -246,7 +246,9 @@ public ListenableFuture<Void> prefetchInputs()
return actionExecutionContext
.getActionInputPrefetcher()
.prefetchFiles(
getInputMapping(PathFragment.EMPTY_FRAGMENT).values(), getMetadataProvider());
getInputMapping(PathFragment.EMPTY_FRAGMENT, /* willAccessRepeatedly= */ true)
.values(),
getMetadataProvider());
}

return immediateVoidFuture();
Expand Down Expand Up @@ -306,22 +308,33 @@ public FileOutErr getFileOutErr() {
}

@Override
public SortedMap<PathFragment, ActionInput> getInputMapping(PathFragment baseDirectory)
public SortedMap<PathFragment, ActionInput> getInputMapping(
PathFragment baseDirectory, boolean willAccessRepeatedly)
throws IOException, ForbiddenActionInputException {
if (lazyInputMapping == null || !inputMappingBaseDirectory.equals(baseDirectory)) {
try (SilentCloseable c =
Profiler.instance().profile("AbstractSpawnStrategy.getInputMapping")) {
inputMappingBaseDirectory = baseDirectory;
lazyInputMapping =
spawnInputExpander.getInputMapping(
spawn,
actionExecutionContext.getArtifactExpander(),
baseDirectory,
actionExecutionContext.getMetadataProvider());
}
// Return previously computed copy if present.
if (lazyInputMapping != null && inputMappingBaseDirectory.equals(baseDirectory)) {
return lazyInputMapping;
}

SortedMap<PathFragment, ActionInput> inputMapping;
try (SilentCloseable c =
Profiler.instance().profile("AbstractSpawnStrategy.getInputMapping")) {
inputMapping =
spawnInputExpander.getInputMapping(
spawn,
actionExecutionContext.getArtifactExpander(),
baseDirectory,
actionExecutionContext.getMetadataProvider());
}

return lazyInputMapping;
// Don't cache the input mapping if it is unlikely that it is used again.
// This reduces memory usage in the case where remote caching/execution is
// used, and the expected cache hit rate is high.
if (willAccessRepeatedly) {
inputMappingBaseDirectory = baseDirectory;
lazyInputMapping = inputMapping;
}
return inputMapping;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ void lockOutputFiles(int exitCode, String errorMessage, FileOutErr outErr)
* mapping is used in a context where the directory relative to which the keys are interpreted
* is not the same as the execroot.
*/
SortedMap<PathFragment, ActionInput> getInputMapping(PathFragment baseDirectory)
SortedMap<PathFragment, ActionInput> getInputMapping(
PathFragment baseDirectory, boolean willAccessRepeatedly)
throws IOException, ForbiddenActionInputException;

/** Reports a progress update to the Spawn strategy. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.devtools.build.lib.vfs.PathFragment;
import java.io.IOException;
import java.util.SortedMap;
import javax.annotation.Nullable;

/** A value class representing an action which can be executed remotely. */
public class RemoteAction {
Expand All @@ -36,7 +37,9 @@ public class RemoteAction {
private final SpawnExecutionContext spawnExecutionContext;
private final RemoteActionExecutionContext remoteActionExecutionContext;
private final RemotePathResolver remotePathResolver;
private final MerkleTree merkleTree;
@Nullable private final MerkleTree merkleTree;
private final long inputBytes;
private final long inputFiles;
private final Digest commandHash;
private final Command command;
private final Action action;
Expand All @@ -51,12 +54,15 @@ public class RemoteAction {
Digest commandHash,
Command command,
Action action,
ActionKey actionKey) {
ActionKey actionKey,
boolean remoteDiscardMerkleTrees) {
this.spawn = spawn;
this.spawnExecutionContext = spawnExecutionContext;
this.remoteActionExecutionContext = remoteActionExecutionContext;
this.remotePathResolver = remotePathResolver;
this.merkleTree = merkleTree;
this.merkleTree = remoteDiscardMerkleTrees ? null : merkleTree;
this.inputBytes = merkleTree.getInputBytes();
this.inputFiles = merkleTree.getInputFiles();
this.commandHash = commandHash;
this.command = command;
this.action = action;
Expand All @@ -80,12 +86,12 @@ public Spawn getSpawn() {
* Returns the sum of file sizes plus protobuf sizes used to represent the inputs of this action.
*/
public long getInputBytes() {
return merkleTree.getInputBytes();
return inputBytes;
}

/** Returns the number of input files of this action. */
public long getInputFiles() {
return merkleTree.getInputFiles();
return inputFiles;
}

/** Returns the id this is action. */
Expand All @@ -111,6 +117,7 @@ public Command getCommand() {
return command;
}

@Nullable
public MerkleTree getMerkleTree() {
return merkleTree;
}
Expand All @@ -119,9 +126,9 @@ public MerkleTree getMerkleTree() {
* Returns a {@link SortedMap} which maps from input paths for remote action to {@link
* ActionInput}.
*/
public SortedMap<PathFragment, ActionInput> getInputMap()
public SortedMap<PathFragment, ActionInput> getInputMap(boolean willAccessRepeatedly)
throws IOException, ForbiddenActionInputException {
return remotePathResolver.getInputMapping(spawnExecutionContext);
return remotePathResolver.getInputMapping(spawnExecutionContext, willAccessRepeatedly);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -377,7 +378,9 @@ private MerkleTree buildInputMerkleTree(
}
return MerkleTree.merge(subMerkleTrees, digestUtil);
} else {
SortedMap<PathFragment, ActionInput> inputMap = remotePathResolver.getInputMapping(context);
SortedMap<PathFragment, ActionInput> inputMap =
remotePathResolver.getInputMapping(
context, /* willAccessRepeatedly= */ !remoteOptions.remoteDiscardMerkleTrees);
if (!outputDirMap.isEmpty()) {
// The map returned by getInputMapping is mutable, but must not be mutated here as it is
// shared with all other strategies.
Expand Down Expand Up @@ -436,63 +439,90 @@ private static ByteString buildSalt(Spawn spawn) {
return null;
}

/**
* Semaphore for limiting the concurrent number of Merkle tree input roots we compute and keep in
* memory.
*
* <p>When --jobs is set to a high value to let the remote execution service runs many actions in
* parallel, there is no point in letting the local system compute Merkle trees of input roots
* with the same amount of parallelism. Not only does this make Bazel feel sluggish and slow to
* respond to being interrupted, it causes it to exhaust memory.
*
* <p>As there is no point in letting Merkle tree input root computation use a higher concurrency
* than the number of CPUs in the system, use a semaphore to limit the concurrency of
* buildRemoteAction().
*/
private final Semaphore remoteActionBuildingSemaphore =
new Semaphore(Runtime.getRuntime().availableProcessors(), true);

@Nullable
private ToolSignature getToolSignature(Spawn spawn, SpawnExecutionContext context)
throws IOException, ExecException, InterruptedException {
return remoteOptions.markToolInputs
&& Spawns.supportsWorkers(spawn)
&& !spawn.getToolFiles().isEmpty()
? computePersistentWorkerSignature(spawn, context)
: null;
}

/** Creates a new {@link RemoteAction} instance from spawn. */
public RemoteAction buildRemoteAction(Spawn spawn, SpawnExecutionContext context)
throws IOException, ExecException, ForbiddenActionInputException, InterruptedException {
ToolSignature toolSignature =
remoteOptions.markToolInputs
&& Spawns.supportsWorkers(spawn)
&& !spawn.getToolFiles().isEmpty()
? computePersistentWorkerSignature(spawn, context)
: null;
final MerkleTree merkleTree = buildInputMerkleTree(spawn, context, toolSignature);

// Get the remote platform properties.
Platform platform = PlatformUtils.getPlatformProto(spawn, remoteOptions);
if (toolSignature != null) {
platform =
PlatformUtils.getPlatformProto(
spawn, remoteOptions, ImmutableMap.of("persistentWorkerKey", toolSignature.key));
} else {
platform = PlatformUtils.getPlatformProto(spawn, remoteOptions);
}

Command command =
buildCommand(
spawn.getOutputFiles(),
spawn.getArguments(),
spawn.getEnvironment(),
platform,
remotePathResolver);
Digest commandHash = digestUtil.compute(command);
Action action =
Utils.buildAction(
commandHash,
merkleTree.getRootDigest(),
platform,
context.getTimeout(),
Spawns.mayBeCachedRemotely(spawn),
buildSalt(spawn));

ActionKey actionKey = digestUtil.computeActionKey(action);

RequestMetadata metadata =
TracingMetadataUtils.buildMetadata(
buildRequestId, commandId, actionKey.getDigest().getHash(), spawn.getResourceOwner());
RemoteActionExecutionContext remoteActionExecutionContext =
RemoteActionExecutionContext.create(
spawn, metadata, getWriteCachePolicy(spawn), getReadCachePolicy(spawn));

return new RemoteAction(
spawn,
context,
remoteActionExecutionContext,
remotePathResolver,
merkleTree,
commandHash,
command,
action,
actionKey);
remoteActionBuildingSemaphore.acquire();
try {
ToolSignature toolSignature = getToolSignature(spawn, context);
final MerkleTree merkleTree = buildInputMerkleTree(spawn, context, toolSignature);

// Get the remote platform properties.
Platform platform = PlatformUtils.getPlatformProto(spawn, remoteOptions);
if (toolSignature != null) {
platform =
PlatformUtils.getPlatformProto(
spawn, remoteOptions, ImmutableMap.of("persistentWorkerKey", toolSignature.key));
} else {
platform = PlatformUtils.getPlatformProto(spawn, remoteOptions);
}

Command command =
buildCommand(
spawn.getOutputFiles(),
spawn.getArguments(),
spawn.getEnvironment(),
platform,
remotePathResolver);
Digest commandHash = digestUtil.compute(command);
Action action =
Utils.buildAction(
commandHash,
merkleTree.getRootDigest(),
platform,
context.getTimeout(),
Spawns.mayBeCachedRemotely(spawn),
buildSalt(spawn));

ActionKey actionKey = digestUtil.computeActionKey(action);

RequestMetadata metadata =
TracingMetadataUtils.buildMetadata(
buildRequestId, commandId, actionKey.getDigest().getHash(), spawn.getResourceOwner());
RemoteActionExecutionContext remoteActionExecutionContext =
RemoteActionExecutionContext.create(
spawn, metadata, getWriteCachePolicy(spawn), getReadCachePolicy(spawn));

return new RemoteAction(
spawn,
context,
remoteActionExecutionContext,
remotePathResolver,
merkleTree,
commandHash,
command,
action,
actionKey,
remoteOptions.remoteDiscardMerkleTrees);
} finally {
remoteActionBuildingSemaphore.release();
}
}

@Nullable
Expand Down Expand Up @@ -1338,7 +1368,7 @@ private void reportUploadError(Throwable error) {
* <p>Must be called before calling {@link #executeRemotely}.
*/
public void uploadInputsIfNotPresent(RemoteAction action, boolean force)
throws IOException, InterruptedException {
throws IOException, ExecException, ForbiddenActionInputException, InterruptedException {
checkState(!shutdown.get(), "shutdown");
checkState(mayBeExecutedRemotely(action.getSpawn()), "spawn can't be executed remotely");

Expand All @@ -1347,13 +1377,33 @@ public void uploadInputsIfNotPresent(RemoteAction action, boolean force)
Map<Digest, Message> additionalInputs = Maps.newHashMapWithExpectedSize(2);
additionalInputs.put(action.getActionKey().getDigest(), action.getAction());
additionalInputs.put(action.getCommandHash(), action.getCommand());
remoteExecutionCache.ensureInputsPresent(
action
.getRemoteActionExecutionContext()
.withWriteCachePolicy(CachePolicy.REMOTE_CACHE_ONLY), // Only upload to remote cache
action.getMerkleTree(),
additionalInputs,
force);

// As uploading depends on having the full input root in memory, limit
// concurrency. This prevents memory exhaustion. We assume that
// ensureInputsPresent() provides enough parallelism to saturate the
// network connection.
remoteActionBuildingSemaphore.acquire();
try {
MerkleTree merkleTree = action.getMerkleTree();
if (merkleTree == null) {
// --experimental_remote_discard_merkle_trees was provided.
// Recompute the input root.
Spawn spawn = action.getSpawn();
SpawnExecutionContext context = action.getSpawnExecutionContext();
ToolSignature toolSignature = getToolSignature(spawn, context);
merkleTree = buildInputMerkleTree(spawn, context, toolSignature);
}

remoteExecutionCache.ensureInputsPresent(
action
.getRemoteActionExecutionContext()
.withWriteCachePolicy(CachePolicy.REMOTE_CACHE_ONLY), // Only upload to remote cache
merkleTree,
additionalInputs,
force);
} finally {
remoteActionBuildingSemaphore.release();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void close() {}

private void checkForConcurrentModifications()
throws IOException, ForbiddenActionInputException {
for (ActionInput input : action.getInputMap().values()) {
for (ActionInput input : action.getInputMap(true).values()) {
if (input instanceof VirtualActionInput) {
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,9 +621,9 @@ private Map<Path, Long> getInputCtimes(SortedMap<PathFragment, ActionInput> inpu
SpawnResult execLocallyAndUpload(
RemoteAction action, Spawn spawn, SpawnExecutionContext context, boolean uploadLocalResults)
throws ExecException, IOException, ForbiddenActionInputException, InterruptedException {
Map<Path, Long> ctimesBefore = getInputCtimes(action.getInputMap());
Map<Path, Long> ctimesBefore = getInputCtimes(action.getInputMap(true));
SpawnResult result = execLocally(spawn, context);
Map<Path, Long> ctimesAfter = getInputCtimes(action.getInputMap());
Map<Path, Long> ctimesAfter = getInputCtimes(action.getInputMap(true));
uploadLocalResults =
uploadLocalResults && Status.SUCCESS.equals(result.status()) && result.exitCode() == 0;
if (!uploadLocalResults) {
Expand Down
Loading

0 comments on commit 3e9dcf8

Please sign in to comment.