From 913a985a5c2fc3842b12c6e5f29af0fa1bccfd6a Mon Sep 17 00:00:00 2001 From: Yannic Bonenberger Date: Mon, 23 Nov 2020 20:00:26 -0800 Subject: [PATCH] Report digest of failed uploads This will make the error message more useful because otherwise, there was no way of telling which upload failed or timed out (other than running with `-j 1`) Closes #12507. PiperOrigin-RevId: 343977441 --- .../ByteStreamBuildEventArtifactUploader.java | 7 +- .../build/lib/remote/ByteStreamUploader.java | 67 +++++++++++++------ .../build/lib/remote/GrpcCacheClient.java | 7 +- ...eStreamBuildEventArtifactUploaderTest.java | 5 +- 4 files changed, 51 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java index 3a03343e3e8bc4..89fa1f91e89d53 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java @@ -19,7 +19,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.hash.HashCode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -194,11 +193,7 @@ private ListenableFuture> uploadLocalFiles( final ListenableFuture upload; Context prevCtx = ctx.attach(); try { - upload = - uploader.uploadBlobAsync( - HashCode.fromString(path.getDigest().getHash()), - chunker, - /* forceUpload=*/ false); + upload = uploader.uploadBlobAsync(path.getDigest(), chunker, /* forceUpload=*/ false); } finally { ctx.detach(prevCtx); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index a69a4df5486dab..c1133796129b4b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -19,6 +19,7 @@ import static java.util.Collections.singletonMap; import static java.util.concurrent.TimeUnit.SECONDS; +import build.bazel.remote.execution.v2.Digest; import com.google.bytestream.ByteStreamGrpc; import com.google.bytestream.ByteStreamGrpc.ByteStreamFutureStub; import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest; @@ -64,10 +65,10 @@ /** * A client implementing the {@code Write} method of the {@code ByteStream} gRPC service. * - *

The uploader supports reference counting to easily be shared between components with - * different lifecyles. After instantiation the reference count is {@code 1}. + *

The uploader supports reference counting to easily be shared between components with different + * lifecyles. After instantiation the reference count is {@code 1}. * - * See {@link ReferenceCounted} for more information on reference counting. + *

See {@link ReferenceCounted} for more information on reference counting. */ class ByteStreamUploader extends AbstractReferenceCounted { @@ -81,12 +82,12 @@ class ByteStreamUploader extends AbstractReferenceCounted { private final Object lock = new Object(); - /** Contains the hash codes of already uploaded blobs. **/ + /** Contains the hash codes of already uploaded blobs. * */ @GuardedBy("lock") private final Set uploadedBlobs = new HashSet<>(); @GuardedBy("lock") - private final Map> uploadsInProgress = new HashMap<>(); + private final Map> uploadsInProgress = new HashMap<>(); @GuardedBy("lock") private boolean isShutdown; @@ -179,8 +180,8 @@ public void uploadBlobs(Map chunkers, boolean forceUpload) * Cancels all running uploads. The method returns immediately and does NOT wait for the uploads * to be cancelled. * - *

This method should not be called directly, but will be called implicitly when the - * reference count reaches {@code 0}. + *

This method should not be called directly, but will be called implicitly when the reference + * count reaches {@code 0}. */ @VisibleForTesting void shutdown() { @@ -199,6 +200,16 @@ void shutdown() { } } + /** @deprecated Use {@link #uploadBlobAsync(Digest, Chunker, boolean)} instead. */ + @Deprecated + @VisibleForTesting + public ListenableFuture uploadBlobAsync( + HashCode hash, Chunker chunker, boolean forceUpload) { + Digest digest = + Digest.newBuilder().setHash(hash.toString()).setSizeBytes(chunker.getSize()).build(); + return uploadBlobAsync(digest, chunker, forceUpload); + } + /** * Uploads a BLOB asynchronously to the remote {@code ByteStream} service. The call returns * immediately and one can listen to the returned future for the success/failure of the upload. @@ -209,32 +220,32 @@ void shutdown() { *

Trying to upload the same BLOB multiple times concurrently, results in only one upload being * performed. This is transparent to the user of this API. * - * @param hash the hash of the data to upload. + * @param digest the {@link Digest} of the data to upload. * @param chunker the data to upload. * @param forceUpload if {@code false} the blob is not uploaded if it has previously been * uploaded, if {@code true} the blob is uploaded. * @throws IOException when reading of the {@link Chunker}s input source fails */ public ListenableFuture uploadBlobAsync( - HashCode hash, Chunker chunker, boolean forceUpload) { + Digest digest, Chunker chunker, boolean forceUpload) { synchronized (lock) { checkState(!isShutdown, "Must not call uploadBlobs after shutdown."); - if (!forceUpload && uploadedBlobs.contains(hash)) { + if (!forceUpload && uploadedBlobs.contains(HashCode.fromString(digest.getHash()))) { return Futures.immediateFuture(null); } - ListenableFuture inProgress = uploadsInProgress.get(hash); + ListenableFuture inProgress = uploadsInProgress.get(digest); if (inProgress != null) { return inProgress; } ListenableFuture uploadResult = Futures.transform( - startAsyncUpload(hash, chunker), + startAsyncUpload(digest, chunker), (v) -> { synchronized (lock) { - uploadedBlobs.add(hash); + uploadedBlobs.add(HashCode.fromString(digest.getHash())); } return null; }, @@ -244,14 +255,20 @@ public ListenableFuture uploadBlobAsync( Futures.catchingAsync( uploadResult, StatusRuntimeException.class, - (sre) -> Futures.immediateFailedFuture(new IOException(sre)), + (sre) -> + Futures.immediateFailedFuture( + new IOException( + String.format( + "Error while uploading artifact with digest '%s/%s'", + digest.getHash(), digest.getSizeBytes()), + sre)), MoreExecutors.directExecutor()); - uploadsInProgress.put(hash, uploadResult); + uploadsInProgress.put(digest, uploadResult); uploadResult.addListener( () -> { synchronized (lock) { - uploadsInProgress.remove(hash); + uploadsInProgress.remove(digest); } }, MoreExecutors.directExecutor()); @@ -267,9 +284,9 @@ boolean uploadsInProgress() { } } - private static String buildUploadResourceName( - String instanceName, UUID uuid, HashCode hash, long size) { - String resourceName = format("uploads/%s/blobs/%s/%d", uuid, hash, size); + private static String buildUploadResourceName(String instanceName, UUID uuid, Digest digest) { + String resourceName = + format("uploads/%s/blobs/%s/%d", uuid, digest.getHash(), digest.getSizeBytes()); if (!Strings.isNullOrEmpty(instanceName)) { resourceName = instanceName + "/" + resourceName; } @@ -277,15 +294,23 @@ private static String buildUploadResourceName( } /** Starts a file upload and returns a future representing the upload. */ - private ListenableFuture startAsyncUpload(HashCode hash, Chunker chunker) { + private ListenableFuture startAsyncUpload(Digest digest, Chunker chunker) { try { chunker.reset(); } catch (IOException e) { return Futures.immediateFailedFuture(e); } + if (chunker.getSize() != digest.getSizeBytes()) { + return Futures.immediateFailedFuture( + new IllegalStateException( + String.format( + "Expected chunker size of %d, got %d", + digest.getSizeBytes(), chunker.getSize()))); + } + UUID uploadId = UUID.randomUUID(); - String resourceName = buildUploadResourceName(instanceName, uploadId, hash, chunker.getSize()); + String resourceName = buildUploadResourceName(instanceName, uploadId, digest); AsyncUpload newUpload = new AsyncUpload( channel, callCredentialsProvider, callTimeoutSecs, retrier, resourceName, chunker); diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index 2e783cae0c70af..93a5c1c7309488 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -38,7 +38,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.flogger.GoogleLogger; -import com.google.common.hash.HashCode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -387,7 +386,7 @@ public void onCompleted() { @Override public ListenableFuture uploadFile(Digest digest, Path path) { return uploader.uploadBlobAsync( - HashCode.fromString(digest.getHash()), + digest, Chunker.builder().setInput(digest.getSizeBytes(), path).build(), /* forceUpload= */ true); } @@ -395,8 +394,6 @@ public ListenableFuture uploadFile(Digest digest, Path path) { @Override public ListenableFuture uploadBlob(Digest digest, ByteString data) { return uploader.uploadBlobAsync( - HashCode.fromString(digest.getHash()), - Chunker.builder().setInput(data.toByteArray()).build(), - /* forceUpload= */ true); + digest, Chunker.builder().setInput(data.toByteArray()).build(), /* forceUpload= */ true); } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java index ce3072e5cc3895..5c44697da14a97 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java @@ -333,7 +333,7 @@ public void remoteFileShouldNotBeUploaded_findMissingDigests() throws Exception StaticMissingDigestsFinder digestQuerier = Mockito.spy(new StaticMissingDigestsFinder(ImmutableSet.of(remoteDigest))); ByteStreamUploader uploader = Mockito.mock(ByteStreamUploader.class); - when(uploader.uploadBlobAsync(any(), any(), anyBoolean())) + when(uploader.uploadBlobAsync(any(Digest.class), any(), anyBoolean())) .thenReturn(Futures.immediateFuture(null)); ByteStreamBuildEventArtifactUploader artifactUploader = newArtifactUploader(uploader, digestQuerier); @@ -349,8 +349,7 @@ public void remoteFileShouldNotBeUploaded_findMissingDigests() throws Exception // assert verify(digestQuerier).findMissingDigests(any()); - verify(uploader) - .uploadBlobAsync(eq(HashCode.fromString(localDigest.getHash())), any(), anyBoolean()); + verify(uploader).uploadBlobAsync(eq(localDigest), any(), anyBoolean()); assertThat(pathConverter.apply(remoteFile)).contains(remoteDigest.getHash()); assertThat(pathConverter.apply(localFile)).contains(localDigest.getHash()); }