Skip to content

Commit

Permalink
Report digest of failed uploads
Browse files Browse the repository at this point in the history
This will make the error message more useful because otherwise, there
was no way of telling which upload failed or timed out (other than
running with `-j 1`)

Closes bazelbuild#12507.

PiperOrigin-RevId: 343977441
  • Loading branch information
Yannic authored and copybara-github committed Nov 24, 2020
1 parent 4f044e0 commit 913a985
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.hash.HashCode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
Expand Down Expand Up @@ -194,11 +193,7 @@ private ListenableFuture<List<PathMetadata>> uploadLocalFiles(
final ListenableFuture<Void> upload;
Context prevCtx = ctx.attach();
try {
upload =
uploader.uploadBlobAsync(
HashCode.fromString(path.getDigest().getHash()),
chunker,
/* forceUpload=*/ false);
upload = uploader.uploadBlobAsync(path.getDigest(), chunker, /* forceUpload=*/ false);
} finally {
ctx.detach(prevCtx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.SECONDS;

import build.bazel.remote.execution.v2.Digest;
import com.google.bytestream.ByteStreamGrpc;
import com.google.bytestream.ByteStreamGrpc.ByteStreamFutureStub;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
Expand Down Expand Up @@ -64,10 +65,10 @@
/**
* A client implementing the {@code Write} method of the {@code ByteStream} gRPC service.
*
* <p>The uploader supports reference counting to easily be shared between components with
* different lifecyles. After instantiation the reference count is {@code 1}.
* <p>The uploader supports reference counting to easily be shared between components with different
* lifecyles. After instantiation the reference count is {@code 1}.
*
* See {@link ReferenceCounted} for more information on reference counting.
* <p>See {@link ReferenceCounted} for more information on reference counting.
*/
class ByteStreamUploader extends AbstractReferenceCounted {

Expand All @@ -81,12 +82,12 @@ class ByteStreamUploader extends AbstractReferenceCounted {

private final Object lock = new Object();

/** Contains the hash codes of already uploaded blobs. **/
/** Contains the hash codes of already uploaded blobs. * */
@GuardedBy("lock")
private final Set<HashCode> uploadedBlobs = new HashSet<>();

@GuardedBy("lock")
private final Map<HashCode, ListenableFuture<Void>> uploadsInProgress = new HashMap<>();
private final Map<Digest, ListenableFuture<Void>> uploadsInProgress = new HashMap<>();

@GuardedBy("lock")
private boolean isShutdown;
Expand Down Expand Up @@ -179,8 +180,8 @@ public void uploadBlobs(Map<HashCode, Chunker> chunkers, boolean forceUpload)
* Cancels all running uploads. The method returns immediately and does NOT wait for the uploads
* to be cancelled.
*
* <p>This method should not be called directly, but will be called implicitly when the
* reference count reaches {@code 0}.
* <p>This method should not be called directly, but will be called implicitly when the reference
* count reaches {@code 0}.
*/
@VisibleForTesting
void shutdown() {
Expand All @@ -199,6 +200,16 @@ void shutdown() {
}
}

/** @deprecated Use {@link #uploadBlobAsync(Digest, Chunker, boolean)} instead. */
@Deprecated
@VisibleForTesting
public ListenableFuture<Void> uploadBlobAsync(
HashCode hash, Chunker chunker, boolean forceUpload) {
Digest digest =
Digest.newBuilder().setHash(hash.toString()).setSizeBytes(chunker.getSize()).build();
return uploadBlobAsync(digest, chunker, forceUpload);
}

/**
* Uploads a BLOB asynchronously to the remote {@code ByteStream} service. The call returns
* immediately and one can listen to the returned future for the success/failure of the upload.
Expand All @@ -209,32 +220,32 @@ void shutdown() {
* <p>Trying to upload the same BLOB multiple times concurrently, results in only one upload being
* performed. This is transparent to the user of this API.
*
* @param hash the hash of the data to upload.
* @param digest the {@link Digest} of the data to upload.
* @param chunker the data to upload.
* @param forceUpload if {@code false} the blob is not uploaded if it has previously been
* uploaded, if {@code true} the blob is uploaded.
* @throws IOException when reading of the {@link Chunker}s input source fails
*/
public ListenableFuture<Void> uploadBlobAsync(
HashCode hash, Chunker chunker, boolean forceUpload) {
Digest digest, Chunker chunker, boolean forceUpload) {
synchronized (lock) {
checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");

if (!forceUpload && uploadedBlobs.contains(hash)) {
if (!forceUpload && uploadedBlobs.contains(HashCode.fromString(digest.getHash()))) {
return Futures.immediateFuture(null);
}

ListenableFuture<Void> inProgress = uploadsInProgress.get(hash);
ListenableFuture<Void> inProgress = uploadsInProgress.get(digest);
if (inProgress != null) {
return inProgress;
}

ListenableFuture<Void> uploadResult =
Futures.transform(
startAsyncUpload(hash, chunker),
startAsyncUpload(digest, chunker),
(v) -> {
synchronized (lock) {
uploadedBlobs.add(hash);
uploadedBlobs.add(HashCode.fromString(digest.getHash()));
}
return null;
},
Expand All @@ -244,14 +255,20 @@ public ListenableFuture<Void> uploadBlobAsync(
Futures.catchingAsync(
uploadResult,
StatusRuntimeException.class,
(sre) -> Futures.immediateFailedFuture(new IOException(sre)),
(sre) ->
Futures.immediateFailedFuture(
new IOException(
String.format(
"Error while uploading artifact with digest '%s/%s'",
digest.getHash(), digest.getSizeBytes()),
sre)),
MoreExecutors.directExecutor());

uploadsInProgress.put(hash, uploadResult);
uploadsInProgress.put(digest, uploadResult);
uploadResult.addListener(
() -> {
synchronized (lock) {
uploadsInProgress.remove(hash);
uploadsInProgress.remove(digest);
}
},
MoreExecutors.directExecutor());
Expand All @@ -267,25 +284,33 @@ boolean uploadsInProgress() {
}
}

private static String buildUploadResourceName(
String instanceName, UUID uuid, HashCode hash, long size) {
String resourceName = format("uploads/%s/blobs/%s/%d", uuid, hash, size);
private static String buildUploadResourceName(String instanceName, UUID uuid, Digest digest) {
String resourceName =
format("uploads/%s/blobs/%s/%d", uuid, digest.getHash(), digest.getSizeBytes());
if (!Strings.isNullOrEmpty(instanceName)) {
resourceName = instanceName + "/" + resourceName;
}
return resourceName;
}

/** Starts a file upload and returns a future representing the upload. */
private ListenableFuture<Void> startAsyncUpload(HashCode hash, Chunker chunker) {
private ListenableFuture<Void> startAsyncUpload(Digest digest, Chunker chunker) {
try {
chunker.reset();
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}

if (chunker.getSize() != digest.getSizeBytes()) {
return Futures.immediateFailedFuture(
new IllegalStateException(
String.format(
"Expected chunker size of %d, got %d",
digest.getSizeBytes(), chunker.getSize())));
}

UUID uploadId = UUID.randomUUID();
String resourceName = buildUploadResourceName(instanceName, uploadId, hash, chunker.getSize());
String resourceName = buildUploadResourceName(instanceName, uploadId, digest);
AsyncUpload newUpload =
new AsyncUpload(
channel, callCredentialsProvider, callTimeoutSecs, retrier, resourceName, chunker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.flogger.GoogleLogger;
import com.google.common.hash.HashCode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -387,16 +386,14 @@ public void onCompleted() {
@Override
public ListenableFuture<Void> uploadFile(Digest digest, Path path) {
return uploader.uploadBlobAsync(
HashCode.fromString(digest.getHash()),
digest,
Chunker.builder().setInput(digest.getSizeBytes(), path).build(),
/* forceUpload= */ true);
}

@Override
public ListenableFuture<Void> uploadBlob(Digest digest, ByteString data) {
return uploader.uploadBlobAsync(
HashCode.fromString(digest.getHash()),
Chunker.builder().setInput(data.toByteArray()).build(),
/* forceUpload= */ true);
digest, Chunker.builder().setInput(data.toByteArray()).build(), /* forceUpload= */ true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ public void remoteFileShouldNotBeUploaded_findMissingDigests() throws Exception
StaticMissingDigestsFinder digestQuerier =
Mockito.spy(new StaticMissingDigestsFinder(ImmutableSet.of(remoteDigest)));
ByteStreamUploader uploader = Mockito.mock(ByteStreamUploader.class);
when(uploader.uploadBlobAsync(any(), any(), anyBoolean()))
when(uploader.uploadBlobAsync(any(Digest.class), any(), anyBoolean()))
.thenReturn(Futures.immediateFuture(null));
ByteStreamBuildEventArtifactUploader artifactUploader =
newArtifactUploader(uploader, digestQuerier);
Expand All @@ -349,8 +349,7 @@ public void remoteFileShouldNotBeUploaded_findMissingDigests() throws Exception

// assert
verify(digestQuerier).findMissingDigests(any());
verify(uploader)
.uploadBlobAsync(eq(HashCode.fromString(localDigest.getHash())), any(), anyBoolean());
verify(uploader).uploadBlobAsync(eq(localDigest), any(), anyBoolean());
assertThat(pathConverter.apply(remoteFile)).contains(remoteDigest.getHash());
assertThat(pathConverter.apply(localFile)).contains(localDigest.getHash());
}
Expand Down

0 comments on commit 913a985

Please sign in to comment.