From 9650db3accb3eaf943e571340c2f3e02ae47defb Mon Sep 17 00:00:00 2001 From: Alessandro Patti Date: Sun, 26 Sep 2021 14:24:06 +0200 Subject: [PATCH] Allow calling getActualSize when upload are not finished --- .../build/lib/remote/ByteStreamUploader.java | 5 +-- .../devtools/build/lib/remote/Chunker.java | 35 +++++++++++++------ .../build/lib/remote/ChunkerTest.java | 17 --------- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index c010f9c75ae75c..40ec95b7cddcce 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -408,8 +408,9 @@ ListenableFuture start() { () -> retrier.executeAsync( () -> { - if (chunker.bytesLeft() != 0 - || committedOffset.get() < chunker.getActualSize()) { + long offset = committedOffset.get(); + if ((offset == 0 && chunker.bytesLeft() != 0) + || (offset < chunker.getActualSize())) { return callAndQueryOnFailure(committedOffset, progressiveBackoff); } return Futures.immediateFuture(null); diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index 38a96f44e51031..c778346f766182 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -118,7 +118,8 @@ public int hashCode() { // lazily on the first call to next(), as opposed to opening it in the constructor or on reset(). private boolean initialized; - private AtomicLong actualSize = new AtomicLong(0); + private AtomicLong processedBytes = new AtomicLong(0); + private long actualSize = -1; Chunker(Supplier dataSupplier, long size, int chunkSize, boolean compressed) { this.dataSupplier = checkNotNull(dataSupplier); @@ -136,11 +137,25 @@ public long getSize() { return size; } - public long getActualSize() { - long actualSize = this.actualSize.get(); - checkState(bytesLeft() == 0); - checkState(compressed || actualSize == size); - return actualSize; + public long getActualSize() throws IOException { + if (compressed) { + if (actualSize == -1) { + if (bytesLeft() != 0) { + // If there are bytes left, compute the remaining size + long currentOffset = offset; + while (hasNext()) { + next(); + } + actualSize = processedBytes.get(); + seek(currentOffset); + } else { + actualSize = processedBytes.get(); + } + } + return actualSize; + } else { + return size; + } } /** @@ -198,12 +213,12 @@ public void seek(long toOffset) throws IOException { if (remaining == 0 && toOffset == size) { zos.close(); } - actualSize.addAndGet(baos.toByteArray().length); + processedBytes.addAndGet(baos.toByteArray().length); baos.reset(); } } else { ByteStreams.skipFully(data, toOffset - offset); - actualSize.addAndGet(toOffset); + processedBytes.addAndGet(toOffset); } offset = toOffset; } @@ -273,7 +288,7 @@ public Chunk next() throws IOException { } else { blob = ByteString.copyFrom(chunkCache, 0, bytesToRead); } - actualSize.addAndGet(blob.size()); + processedBytes.addAndGet(blob.size()); // This has to happen after actualSize has been updated // or the guard in getActualSize won't work. @@ -310,7 +325,7 @@ private void maybeInitialize() throws IOException { baos = new ByteArrayOutputStream(); zos = new ZstdOutputStream(baos); } - actualSize = new AtomicLong(0); + processedBytes = new AtomicLong(0); initialized = true; } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java index ce1575948b57b8..ab9ada190cae67 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java @@ -201,23 +201,6 @@ public void testMultiChunkCompressed() throws IOException { assertThat(Zstd.decompress(baos.toByteArray(), data.length)).isEqualTo(data); } - @Test - public void testActualSizeThrowsIfCalledEarly() throws IOException { - byte[] data = {72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33}; - Chunker chunker = Chunker.builder().setInput(data).setChunkSize(data.length / 2).build(); - chunker.next(); - assertThrows(IllegalStateException.class, chunker::getActualSize); - } - - @Test - public void testActualSizeIsSizeForUncompressedBlob() throws IOException { - byte[] data = {72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33}; - Chunker chunker = Chunker.builder().setInput(data).setChunkSize(data.length * 2).build(); - chunker.next(); - assertThat(chunker.hasNext()).isFalse(); - assertThat(chunker.getActualSize()).isEqualTo(chunker.getSize()); - } - @Test public void testActualSizeIsCorrectAfterSeek() throws IOException { byte[] data = {72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33};