diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 7aa6a23af4183e..d0e5d330af228f 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -25,12 +25,10 @@
 import com.google.bytestream.ByteStreamGrpc.ByteStreamFutureStub;
 import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
 import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
-import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse;
 import com.google.bytestream.ByteStreamProto.WriteRequest;
 import com.google.bytestream.ByteStreamProto.WriteResponse;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Ascii;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.util.concurrent.AsyncCallable;
 import com.google.common.util.concurrent.Futures;
@@ -238,6 +236,16 @@ private ListenableFuture<Void> startAsyncUpload(
     return currUpload;
   }
 
+  /**
+   * Signal that the blob already exists on the server, so the upload should complete early but
+   * successfully.
+   */
+  private static final class AlreadyExists extends Exception {
+    private AlreadyExists() {
+      super();
+    }
+  }
+
   private static final class AsyncUpload implements AsyncCallable<Long> {
     private final RemoteActionExecutionContext context;
     private final ReferenceCountedChannel channel;
@@ -269,28 +277,26 @@ private static final class AsyncUpload implements AsyncCallable<Long> {
     }
 
     ListenableFuture<Void> start() {
-      return Futures.transformAsync(
-          Utils.refreshIfUnauthenticatedAsync(
-              () -> retrier.executeAsync(this, progressiveBackoff), callCredentialsProvider),
-          committedSize -> {
-            try {
-              checkCommittedSize(committedSize);
-            } catch (IOException e) {
-              return Futures.immediateFailedFuture(e);
-            }
-            return immediateVoidFuture();
-          },
+      return Futures.catching(
+          Futures.transformAsync(
+              Utils.refreshIfUnauthenticatedAsync(
+                  () -> retrier.executeAsync(this, progressiveBackoff), callCredentialsProvider),
+              committedSize -> {
+                try {
+                  checkCommittedSize(committedSize);
+                } catch (IOException e) {
+                  return Futures.immediateFailedFuture(e);
+                }
+                return immediateVoidFuture();
+              },
+              MoreExecutors.directExecutor()),
+          AlreadyExists.class,
+          ae -> null,
           MoreExecutors.directExecutor());
     }
 
+    /** Check the committed_size the server returned makes sense after a successful full upload. */
     private void checkCommittedSize(long committedSize) throws IOException {
-      // Only check for matching committed size if we have completed the upload. If another client
-      // did, they might have used a different compression level/algorithm, so we cannot know the
-      // expected committed offset
-      if (chunker.hasNext()) {
-        return;
-      }
-
       long expected = chunker.getOffset();
 
       if (committedSize == expected) {
@@ -329,9 +335,6 @@ public ListenableFuture<Long> call() {
           firstAttempt ? Futures.immediateFuture(0L) : query(),
           committedSize -> {
             if (!firstAttempt) {
-              if (chunker.getSize() == committedSize) {
-                return Futures.immediateFuture(committedSize);
-              }
               if (committedSize > lastCommittedOffset) {
                 // We have made progress on this upload in the last request. Reset the backoff so
                 // that this request has a full deck of retries
@@ -362,7 +365,7 @@ private ByteStreamStub bsAsyncStub(Channel channel) {
 
     private ListenableFuture<Long> query() {
       ListenableFuture<Long> committedSizeFuture =
-          Futures.transform(
+          Futures.transformAsync(
              channel.withChannelFuture(
                  channel ->
                      bsFutureStub(channel)
@@ -370,7 +373,10 @@
                              QueryWriteStatusRequest.newBuilder()
                                  .setResourceName(resourceName)
                                  .build())),
-              QueryWriteStatusResponse::getCommittedSize,
+              r ->
+                  r.getComplete()
+                      ? Futures.immediateFailedFuture(new AlreadyExists())
+                      : Futures.immediateFuture(r.getCommittedSize()),
              MoreExecutors.directExecutor());
       return Futures.catchingAsync(
           committedSizeFuture,
@@ -392,24 +398,7 @@ private ListenableFuture<Long> upload(long pos) {
           channel -> {
             SettableFuture<Long> uploadResult = SettableFuture.create();
             bsAsyncStub(channel).write(new Writer(resourceName, chunker, pos, uploadResult));
-            return Futures.catchingAsync(
-                uploadResult,
-                Throwable.class,
-                throwable -> {
-                  Preconditions.checkNotNull(throwable);
-
-                  Status status = Status.fromThrowable(throwable);
-                  switch (status.getCode()) {
-                    case ALREADY_EXISTS:
-                      // Server indicated the blob already exists, so we translate the error to a
-                      // successful upload.
-                      return Futures.immediateFuture(chunker.getSize());
-
-                    default:
-                      return Futures.immediateFailedFuture(throwable);
-                  }
-                },
-                MoreExecutors.directExecutor());
+            return uploadResult;
           });
     }
   }
@@ -423,6 +412,7 @@ private static final class Writer
     private long committedSize = -1;
     private ClientCallStreamObserver<WriteRequest> requestObserver;
     private boolean first = true;
+    private boolean finishedWriting;
 
     private Writer(
         String resourceName, Chunker chunker, long pos, SettableFuture<Long> uploadResult) {
@@ -447,10 +437,6 @@ public void beforeStart(ClientCallStreamObserver<WriteRequest> requestObserver)
 
     @Override
     public void run() {
-      if (committedSize != -1) {
-        requestObserver.cancel("server has returned early", null);
-        return;
-      }
       while (requestObserver.isReady()) {
         WriteRequest.Builder request = WriteRequest.newBuilder();
         if (first) {
@@ -477,6 +463,7 @@ public void run() {
                 .build());
         if (isLastChunk) {
           requestObserver.onCompleted();
+          finishedWriting = true;
           return;
         }
       }
@@ -515,12 +502,22 @@ public void onNext(WriteResponse response) {
 
     @Override
     public void onCompleted() {
-      uploadResult.set(committedSize);
+      if (finishedWriting) {
+        uploadResult.set(committedSize);
+      } else {
+        // Server completed successfully before we finished writing all the data, meaning the blob
+        // already exists. The server is supposed to set committed_size to the size of the blob (for
+        // uncompressed uploads) or -1 (for compressed uploads), but we do not verify this.
+        requestObserver.cancel("server has returned early", null);
+        uploadResult.setException(new AlreadyExists());
+      }
     }
 
     @Override
     public void onError(Throwable t) {
-      uploadResult.setException(t);
+      requestObserver.cancel("failed", t);
+      uploadResult.setException(
+          (Status.fromThrowable(t).getCode() == Code.ALREADY_EXISTS) ? new AlreadyExists() : t);
     }
   }
 
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index 75e604d38835a4..d1dce5d0e482c0 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -477,6 +477,65 @@ public void queryWriteStatus(
     Mockito.verify(mockBackoff, Mockito.times(1)).getRetryAttempts();
   }
 
+  @Test
+  public void progressiveCompressedUploadSeesAlreadyExistsAtTheEnd() throws Exception {
+    RemoteRetrier retrier =
+        TestUtils.newRemoteRetrier(
+            () -> new FixedBackoff(1, 0),
+            e -> Status.fromThrowable(e).getCode() == Code.INTERNAL,
+            retryService);
+    ByteStreamUploader uploader =
+        new ByteStreamUploader(
+            INSTANCE_NAME,
+            referenceCountedChannel,
+            CallCredentialsProvider.NO_CREDENTIALS,
+            300,
+            retrier,
+            /* maximumOpenFiles= */ -1);
+
+    int chunkSize = 1024;
+    byte[] blob = new byte[chunkSize * 2 + 1];
+    new Random().nextBytes(blob);
+
+    Chunker chunker =
+        Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(chunkSize).build();
+    Digest digest = DIGEST_UTIL.compute(blob);
+
+    serviceRegistry.addService(
+        new ByteStreamImplBase() {
+          @Override
+          public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
+            return new StreamObserver<WriteRequest>() {
+              @Override
+              public void onNext(WriteRequest writeRequest) {}
+
+              @Override
+              public void onError(Throwable throwable) {
+                fail("onError should never be called.");
+              }
+
+              @Override
+              public void onCompleted() {
+                streamObserver.onError(Status.INTERNAL.asException());
+              }
+            };
+          }
+
+          @Override
+          public void queryWriteStatus(
+              QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
+            response.onNext(
+                QueryWriteStatusResponse.newBuilder()
+                    .setCommittedSize(blob.length)
+                    .setComplete(true)
+                    .build());
+            response.onCompleted();
+          }
+        });
+
+    uploader.uploadBlob(context, digest, chunker);
+  }
+
   @Test
   public void concurrentlyCompletedUploadIsNotRetried() throws Exception {
     // Test that after an upload has failed and the QueryWriteStatus call returns
@@ -609,8 +668,7 @@ public void queryWriteStatus(
 
   @Test
   public void earlyWriteResponseShouldCompleteUpload() throws Exception {
-    RemoteRetrier retrier =
-        TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
+    RemoteRetrier retrier = TestUtils.newRemoteRetrier(() -> mockBackoff, e -> false, retryService);
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,
@@ -700,8 +758,7 @@ public void onCompleted() {
 
   @Test
   public void incorrectCommittedSizeDoesNotFailIncompleteUpload() throws Exception {
-    RemoteRetrier retrier =
-        TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
+    RemoteRetrier retrier = TestUtils.newRemoteRetrier(() -> mockBackoff, e -> false, retryService);
     ByteStreamUploader uploader =
         new ByteStreamUploader(
             INSTANCE_NAME,