From 90965b072eb4a6dec8ff5b8abde3726732d37bdc Mon Sep 17 00:00:00 2001 From: Benjamin Peterson Date: Wed, 22 Dec 2021 11:40:01 -0600 Subject: [PATCH] Stop remote blob upload if upload is complete. (#14467) If a ByteStream/Write RPC fails, but ByteStream/QueryWriteStatus reveals the upload is in fact complete, avoid a NullPointerException. This CL is the dual fix of 78b89a0136a83d303d4d88373d6e510f85a81fbb for uploads. On bazel-6.0.0-pre.20211117.1, I observed: ``` java.lang.NullPointerException at com.google.devtools.build.lib.remote.Chunker.seek(Chunker.java:156) at com.google.devtools.build.lib.remote.ByteStreamUploader$AsyncUpload.lambda$start$0(ByteStreamUploader.java:416) at com.google.devtools.build.lib.remote.Retrier.executeAsync(Retrier.java:277) at com.google.devtools.build.lib.remote.Retrier.lambda$onExecuteAsyncFailure$1(Retrier.java:293) at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleAsyncTask.runInterruptibly(TrustedListenableFutureTask.java:160) at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleAsyncTask.runInterruptibly(TrustedListenableFutureTask.java:143) at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69) at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125) at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69) at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ``` Closes #14464. PiperOrigin-RevId: 417795715 --- .../build/lib/remote/ByteStreamUploader.java | 10 +-- .../lib/remote/ByteStreamUploaderTest.java | 62 +++++++++++++++++++ 2 files changed, 67 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index cc31b5bf070705..d4aa19061c3c34 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -248,7 +248,7 @@ public ListenableFuture uploadBlobAsync( checkState(!isShutdown, "Must not call uploadBlobs after shutdown."); if (!forceUpload && uploadedBlobs.contains(HashCode.fromString(digest.getHash()))) { - return Futures.immediateFuture(null); + return immediateVoidFuture(); } ListenableFuture inProgress = uploadsInProgress.get(digest); @@ -410,7 +410,7 @@ ListenableFuture start() { () -> retrier.executeAsync( () -> { - if (chunker.getSize() == 0) { + if (chunker.getSize() == committedOffset.get()) { return immediateVoidFuture(); } try { @@ -426,7 +426,7 @@ ListenableFuture start() { if (chunker.hasNext()) { return callAndQueryOnFailure(committedOffset, progressiveBackoff); } - return Futures.immediateFuture(null); + return immediateVoidFuture(); }, progressiveBackoff), callCredentialsProvider); @@ -448,7 +448,7 @@ ListenableFuture start() { return Futures.immediateFailedFuture(new IOException(message)); } } - return Futures.immediateFuture(null); + return immediateVoidFuture(); }, MoreExecutors.directExecutor()); } @@ -536,7 +536,7 @@ private ListenableFuture query( progressiveBackoff.reset(); } committedOffset.set(committedSize); - return Futures.immediateFuture(null); + return immediateVoidFuture(); }, MoreExecutors.directExecutor()); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index 15cc335a0ace40..c4165c2de1cf97 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -1517,6 +1517,68 @@ public void onCompleted() { blockUntilInternalStateConsistent(uploader); } + @Test + public void failureAfterUploadCompletes() throws Exception { + AtomicInteger numUploads = new AtomicInteger(); + RemoteRetrier retrier = + TestUtils.newRemoteRetrier( + () -> mockBackoff, e -> e instanceof StatusRuntimeException, retryService); + ByteStreamUploader uploader = + new ByteStreamUploader( + INSTANCE_NAME, + new ReferenceCountedChannel(channelConnectionFactory), + CallCredentialsProvider.NO_CREDENTIALS, + /* callTimeoutSecs= */ 60, + retrier); + + byte[] blob = new byte[CHUNK_SIZE - 1]; + new Random().nextBytes(blob); + + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver write(StreamObserver streamObserver) { + numUploads.incrementAndGet(); + return new StreamObserver() { + @Override + public void onNext(WriteRequest writeRequest) {} + + @Override + public void onError(Throwable throwable) { + fail("onError should never be called."); + } + + @Override + public void onCompleted() { + streamObserver.onNext( + WriteResponse.newBuilder().setCommittedSize(blob.length).build()); + streamObserver.onError(Status.UNAVAILABLE.asException()); + } + }; + } + + @Override + public void queryWriteStatus( + QueryWriteStatusRequest request, StreamObserver response) { + response.onNext( + QueryWriteStatusResponse.newBuilder() + .setCommittedSize(blob.length) + .setComplete(true) + .build()); + response.onCompleted(); + } + }); + + Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); + HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash()); + + uploader.uploadBlob(context, hash, chunker, true); + + blockUntilInternalStateConsistent(uploader); + + assertThat(numUploads.get()).isEqualTo(1); + } + @Test public void testCompressedUploads() throws Exception { RemoteRetrier retrier =