Skip to content

Commit

Permalink
Add mnemonic and label to remote metadata
Browse files Browse the repository at this point in the history
    This reflects bazelbuild/remote-apis#186

    Closes #13109.

    PiperOrigin-RevId: 368763391
  • Loading branch information
Luca Di Grazia committed Sep 4, 2022
1 parent c6e08cd commit a1fbe0f
Show file tree
Hide file tree
Showing 13 changed files with 2,499 additions and 1,012 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,106 +13,283 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
import com.google.devtools.build.lib.buildeventstream.PathConverter;
import com.google.devtools.build.lib.collect.ImmutableIterable;
import com.google.devtools.build.lib.remote.common.MissingDigestsFinder;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.remoteexecution.v1test.Digest;
import io.grpc.Context;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

/**
* A {@link BuildEventArtifactUploader} backed by {@link ByteStreamUploader}.
*/
class ByteStreamBuildEventArtifactUploader implements BuildEventArtifactUploader {
/** A {@link BuildEventArtifactUploader} backed by {@link ByteStreamUploader}. */
class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted
implements BuildEventArtifactUploader {

/** Executor used to read path metadata and to post-process finished uploads. */
private final ListeningExecutorService uploadExecutor;

/** Build request id passed into the request metadata of every remote call. */
private final String buildRequestId;

/** Command id passed into the request metadata of every remote call. */
private final String commandId;

/** Performs the blob uploads; released in {@link #deallocate()}. */
private final ByteStreamUploader uploader;

/** Handed to {@link PathConverterImpl}, which uses it when converting paths. */
private final String remoteServerInstanceName;

/** Used to ask which digests still need to be uploaded. */
private final MissingDigestsFinder missingDigestsFinder;

/** Set on the first {@link #deallocate()} call so that resources are released only once. */
private final AtomicBoolean shutdown = new AtomicBoolean();

/**
 * Creates an uploader.
 *
 * @param uploader performs the blob uploads; must not be {@code null}
 * @param missingDigestsFinder used to find out which digests still need uploading
 * @param remoteServerInstanceName handed to the resulting {@link PathConverter}
 * @param buildRequestId build request id attached to the request metadata
 * @param commandId command id attached to the request metadata
 * @param maxUploadThreads requested size of the upload thread pool; capped at 1000
 */
ByteStreamBuildEventArtifactUploader(
    ByteStreamUploader uploader,
    MissingDigestsFinder missingDigestsFinder,
    String remoteServerInstanceName,
    String buildRequestId,
    String commandId,
    int maxUploadThreads) {
  this.uploader = Preconditions.checkNotNull(uploader);
  this.buildRequestId = buildRequestId;
  this.commandId = commandId;
  this.remoteServerInstanceName = remoteServerInstanceName;
  // Limit the maximum threads number to 1000 (chosen arbitrarily)
  this.uploadExecutor =
      MoreExecutors.listeningDecorator(
          Executors.newFixedThreadPool(
              Math.min(maxUploadThreads, 1000),
              new ThreadFactoryBuilder().setNameFormat("bes-artifact-uploader-%d").build()));
  this.missingDigestsFinder = missingDigestsFinder;
}

/**
 * Tells whether {@code file} sits on a {@link RemoteActionFileSystem} that reports it as remote.
 * Files on any other kind of file system are never considered remote.
 */
private static boolean isRemoteFile(Path file) {
  if (!(file.getFileSystem() instanceof RemoteActionFileSystem)) {
    return false;
  }
  RemoteActionFileSystem remoteFileSystem = (RemoteActionFileSystem) file.getFileSystem();
  return remoteFileSystem.isRemote(file);
}

/**
 * Immutable record of what is known about a single path: its digest, whether it is a directory
 * and whether it is already available remotely. The digest is {@code null} for entries created
 * without one (directories are created that way by {@code readPathMetadata}).
 */
private static final class PathMetadata {

  private final Path path;
  private final Digest digest;
  private final boolean directory;
  private final boolean remote;

  PathMetadata(Path path, Digest digest, boolean directory, boolean remote) {
    this.remote = remote;
    this.directory = directory;
    this.digest = digest;
    this.path = path;
  }

  /** The path this metadata describes. */
  public Path getPath() {
    return this.path;
  }

  /** The digest recorded for the path; may be {@code null}. */
  public Digest getDigest() {
    return this.digest;
  }

  /** Whether the path was recorded as a directory. */
  public boolean isDirectory() {
    return this.directory;
  }

  /** Whether the path was recorded as already stored remotely. */
  public boolean isRemote() {
    return this.remote;
  }
}

/**
 * Builds the {@link PathMetadata} for {@code file}. Directories get no digest and are never
 * marked remote; for everything else the digest is computed with the digest function of the
 * file's own file system, which may involve I/O.
 *
 * @throws IOException if inspecting the file or computing its digest fails
 */
private static PathMetadata readPathMetadata(Path file) throws IOException {
  if (!file.isDirectory()) {
    Digest fileDigest = new DigestUtil(file.getFileSystem().getDigestFunction()).compute(file);
    boolean remote = isRemoteFile(file);
    return new PathMetadata(file, fileDigest, /* directory= */ false, remote);
  }
  return new PathMetadata(file, /* digest= */ null, /* directory= */ true, /* remote= */ false);
}

/**
 * Applies the result of a missing-digest query to {@code filesToQuery}. Entries whose digest is
 * in {@code missingDigests} are passed through unchanged; every other entry is replaced by a
 * copy flagged as remote. The order of {@code filesToQuery} is preserved.
 */
private static List<PathMetadata> processQueryResult(
    ImmutableSet<Digest> missingDigests, List<PathMetadata> filesToQuery) {
  List<PathMetadata> updated = new ArrayList<>(filesToQuery.size());
  for (PathMetadata candidate : filesToQuery) {
    boolean missingRemotely = missingDigests.contains(candidate.getDigest());
    updated.add(
        missingRemotely
            ? candidate
            : new PathMetadata(
                candidate.getPath(),
                candidate.getDigest(),
                candidate.isDirectory(),
                /* remote= */ true));
  }
  return updated;
}

/**
 * Splits the completed metadata futures into entries that need no lookup (already remote, or
 * directories) and local files, then asks {@code missingDigestsFinder} which of the local files'
 * digests are missing. Local files whose digest is not missing come back flagged as remote.
 * When there is nothing to look up, the result is available immediately and no query is made.
 */
private ListenableFuture<ImmutableIterable<PathMetadata>> queryRemoteCache(
    ImmutableList<ListenableFuture<PathMetadata>> allPaths) throws Exception {
  RemoteActionExecutionContext context =
      RemoteActionExecutionContext.create(
          TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "bes-upload", null));

  List<PathMetadata> resolved = new ArrayList<>(allPaths.size());
  List<PathMetadata> candidates = new ArrayList<>();
  Set<Digest> candidateDigests = new HashSet<>();
  for (ListenableFuture<PathMetadata> completed : allPaths) {
    // get() does not block here: this method only runs once every future in allPaths is done.
    PathMetadata entry = completed.get();
    boolean needsLookup = !entry.isRemote() && !entry.isDirectory();
    if (needsLookup) {
      candidates.add(entry);
      candidateDigests.add(entry.getDigest());
    } else {
      resolved.add(entry);
    }
  }

  if (candidateDigests.isEmpty()) {
    return Futures.immediateFuture(ImmutableIterable.from(resolved));
  }

  ListenableFuture<ImmutableSet<Digest>> missing =
      missingDigestsFinder.findMissingDigests(context, candidateDigests);
  return Futures.transform(
      missing,
      (missingDigests) -> {
        List<PathMetadata> refreshed = processQueryResult(missingDigests, candidates);
        return ImmutableIterable.from(Iterables.concat(resolved, refreshed));
      },
      MoreExecutors.directExecutor());
}

/**
 * Starts an upload for every entry of {@code allPaths} that is neither remote nor a directory
 * and returns a future that completes with all entries once every started upload has finished.
 * Remote entries and directories are passed through without any upload.
 */
private ListenableFuture<List<PathMetadata>> uploadLocalFiles(
    ImmutableIterable<PathMetadata> allPaths) {
  RemoteActionExecutionContext context =
      RemoteActionExecutionContext.create(
          TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "bes-upload", null));

  ImmutableList.Builder<ListenableFuture<PathMetadata>> results = ImmutableList.builder();
  for (PathMetadata path : allPaths) {
    if (path.isRemote() || path.isDirectory()) {
      // Nothing to transfer for this entry.
      results.add(Futures.immediateFuture(path));
      continue;
    }
    Chunker chunker =
        Chunker.builder().setInput(path.getDigest().getSizeBytes(), path.getPath()).build();
    ListenableFuture<Void> upload =
        uploader.uploadBlobAsync(context, path.getDigest(), chunker, /* forceUpload= */ false);
    results.add(Futures.transform(upload, unused -> path, uploadExecutor));
  }
  return Futures.allAsList(results.build());
}

@Override
public ListenableFuture<PathConverter> upload(Map<Path, LocalFile> files) {
if (files.isEmpty()) {
return Futures.immediateFuture(PathConverter.NO_CONVERSION);
}
List<ListenableFuture<PathDigestPair>> uploads = new ArrayList<>(files.size());

Context prevCtx = ctx.attach();
try {
for (Path file : files.keySet()) {
Chunker chunker = new Chunker(file);
Digest digest = chunker.digest();
ListenableFuture<PathDigestPair> upload =
Futures.transform(
uploader.uploadBlobAsync(chunker, /*forceUpload=*/false),
unused -> new PathDigestPair(file, digest),
MoreExecutors.directExecutor());
uploads.add(upload);
}

return Futures.transform(Futures.allAsList(uploads),
(uploadsDone) -> new PathConverterImpl(remoteServerInstanceName, uploadsDone),
MoreExecutors.directExecutor());
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
} finally {
ctx.detach(prevCtx);
// Collect metadata about each path
ImmutableList.Builder<ListenableFuture<PathMetadata>> allPathMetadata = ImmutableList.builder();
for (Path file : files.keySet()) {
ListenableFuture<PathMetadata> pathMetadata =
uploadExecutor.submit(() -> readPathMetadata(file));
allPathMetadata.add(pathMetadata);
}

// Query the remote cache to check which files need to be uploaded
ImmutableList<ListenableFuture<PathMetadata>> allPaths = allPathMetadata.build();
ListenableFuture<ImmutableIterable<PathMetadata>> allPathsUpdatedMetadata =
Futures.whenAllSucceed(allPaths)
.callAsync(() -> queryRemoteCache(allPaths), MoreExecutors.directExecutor());

// Upload local files (if any)
ListenableFuture<List<PathMetadata>> allPathsMetadata =
Futures.transformAsync(
allPathsUpdatedMetadata,
(paths) -> uploadLocalFiles(paths),
MoreExecutors.directExecutor());

return Futures.transform(
allPathsMetadata,
(metadata) -> new PathConverterImpl(remoteServerInstanceName, metadata),
MoreExecutors.directExecutor());
}

@Override
public void shutdown() {
public boolean mayBeSlow() {
return true;
}

/**
 * Releases the uploader and shuts down the upload executor. Only the first invocation has any
 * effect; the {@code shutdown} flag turns later calls into no-ops.
 */
@Override
protected void deallocate() {
  boolean alreadyShutDown = shutdown.getAndSet(true);
  if (!alreadyShutDown) {
    uploader.release();
    uploadExecutor.shutdown();
  }
}

/**
 * Returns this uploader unchanged; the argument {@code o} is not used.
 */
@Override
public ReferenceCounted touch(Object o) {
return this;
}

private static class PathConverterImpl implements PathConverter {

private final String remoteServerInstanceName;
private final Map<Path, Digest> pathToDigest;
private final Set<Path> skippedPaths;

/**
 * Indexes {@code uploads} by path. Entries that carry a digest go into the path-to-digest map;
 * entries without one are remembered as skipped paths.
 *
 * @param remoteServerInstanceName stored for use when converting paths
 * @param uploads metadata of every processed path; must not be {@code null}
 */
PathConverterImpl(String remoteServerInstanceName, List<PathMetadata> uploads) {
  Preconditions.checkNotNull(uploads);
  this.remoteServerInstanceName = remoteServerInstanceName;
  pathToDigest = new HashMap<>(uploads.size());
  ImmutableSet.Builder<Path> skippedPaths = ImmutableSet.builder();
  for (PathMetadata pair : uploads) {
    Path path = pair.getPath();
    Digest digest = pair.getDigest();
    if (digest != null) {
      pathToDigest.put(path, digest);
    } else {
      skippedPaths.add(path);
    }
  }
  this.skippedPaths = skippedPaths.build();
}

@Override
public String apply(Path path) {
Preconditions.checkNotNull(path);
Digest digest = pathToDigest.get(path);
if (digest == null) {
if (skippedPaths.contains(path)) {
return null;
}
// It's a programming error to reference a file that has not been uploaded.
throw new IllegalStateException(
String.format("Illegal file reference: '%s'", path.getPathString()));
Expand All @@ -122,23 +299,4 @@ public String apply(Path path) {
remoteServerInstanceName, digest.getHash(), digest.getSizeBytes());
}
}

/**
 * Plain holder that pairs a path with its digest.
 *
 * <p>NOTE(review): {@code PathMetadata} in this file carries the same two values plus extra
 * flags; check whether this class still has any callers.
 */
private static class PathDigestPair {

  private final Path path;
  private final Digest digest;

  PathDigestPair(Path path, Digest digest) {
    this.digest = digest;
    this.path = path;
  }

  /** The digest stored for the path. */
  public Digest getDigest() {
    return this.digest;
  }

  /** The path this pair describes. */
  public Path getPath() {
    return this.path;
  }
}
}
Loading

0 comments on commit a1fbe0f

Please sign in to comment.