diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index d1024a3d3143cf..64399b9b91f02f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -142,17 +142,23 @@ public void reset() throws IOException { } /** - * Seek to an offset, if necessary resetting or initializing + * Seek to an offset in the source stream. * - *

May close open resources in order to seek to an earlier offset. + *

May close and reopen resources in order to seek to an earlier offset. + * + * @param toOffset the offset from beginning of the source stream. If the source stream is + * compressed, it refers to the offset in the uncompressed form to align with `write_offset` + * in REAPI. */ public void seek(long toOffset) throws IOException { - if (toOffset < offset) { + // For compressed stream, we need to reinitialize the stream since the offset refers to the + // uncompressed form. + if (initialized && toOffset >= offset && !compressed) { + ByteStreams.skipFully(data, toOffset - offset); + } else { reset(); + initialize(toOffset); } - maybeInitialize(); - ByteStreams.skipFully(data, toOffset - offset); - offset = toOffset; if (data.finished()) { close(); } @@ -245,18 +251,26 @@ private void maybeInitialize() throws IOException { if (initialized) { return; } + initialize(0); + } + + private void initialize(long srcPos) throws IOException { + checkState(!initialized); checkState(data == null); checkState(offset == 0); checkState(chunkCache == null); try { + InputStream src = dataSupplier.get(); + ByteStreams.skipFully(src, srcPos); data = compressed - ? new ChunkerInputStream(new ZstdCompressingInputStream(dataSupplier.get())) - : new ChunkerInputStream(dataSupplier.get()); + ? new ChunkerInputStream(new ZstdCompressingInputStream(src)) + : new ChunkerInputStream(src); } catch (RuntimeException e) { Throwables.propagateIfPossible(e.getCause(), IOException.class); throw e; } + offset = srcPos; initialized = true; } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index c4165c2de1cf97..1b88a3267d442e 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -69,6 +69,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -357,23 +358,18 @@ public void progressiveCompressedUploadShouldWork() throws Exception { 300, retrier); - byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; + int chunkSize = 1024; + int skipSize = chunkSize + 1; + byte[] blob = new byte[chunkSize * 2 + 1]; new Random().nextBytes(blob); Chunker chunker = - Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build(); + Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(chunkSize).build(); HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash()); - while (chunker.hasNext()) { - chunker.next(); - } - long expectedSize = chunker.getOffset(); - chunker.reset(); - + ByteArrayOutputStream output = new ByteArrayOutputStream(); serviceRegistry.addService( new ByteStreamImplBase() { - - byte[] receivedData = new byte[(int) expectedSize]; String receivedResourceName = null; boolean receivedComplete = false; long nextOffset = 0; @@ -398,21 +394,21 @@ public void onNext(WriteRequest writeRequest) { assertThat(resourceName).isEmpty(); } - assertThat(writeRequest.getWriteOffset()).isEqualTo(nextOffset); - - ByteString data = writeRequest.getData(); - - System.arraycopy( - data.toByteArray(), 0, receivedData, (int) nextOffset, data.size()); - - nextOffset += data.size(); - receivedComplete = expectedSize == nextOffset; - assertThat(writeRequest.getFinishWrite()).isEqualTo(receivedComplete); - if (initialOffset == 0) { streamObserver.onError(Status.DEADLINE_EXCEEDED.asException()); mustQueryWriteStatus = true; - initialOffset = nextOffset; + initialOffset = skipSize; + nextOffset = initialOffset; + } else { + ByteString data = writeRequest.getData(); + try { + data.writeTo(output); + } catch (IOException e) { + streamObserver.onError(e); + return; + } + nextOffset += data.size(); + receivedComplete = writeRequest.getFinishWrite(); } } @@ -423,10 +419,6 @@ public void onError(Throwable throwable) { @Override public void onCompleted() { - assertThat(nextOffset).isEqualTo(expectedSize); - byte[] decompressed = Zstd.decompress(receivedData, blob.length); - assertThat(decompressed).isEqualTo(blob); - WriteResponse response = WriteResponse.newBuilder().setCommittedSize(nextOffset).build(); streamObserver.onNext(response); @@ -444,7 +436,7 @@ public void queryWriteStatus( if (receivedResourceName != null && receivedResourceName.equals(resourceName)) { assertThat(mustQueryWriteStatus).isTrue(); mustQueryWriteStatus = false; - committedSize = nextOffset; + committedSize = receivedComplete ? blob.length : skipSize; complete = receivedComplete; } else { committedSize = 0; @@ -460,6 +452,9 @@ public void queryWriteStatus( }); uploader.uploadBlob(context, hash, chunker, true); + byte[] decompressed = Zstd.decompress(output.toByteArray(), blob.length - skipSize); + assertThat(Arrays.equals(decompressed, 0, decompressed.length, blob, skipSize, blob.length)) + .isTrue(); // This test should not have triggered any retries. Mockito.verify(mockBackoff, Mockito.never()).nextDelayMillis(any(Exception.class));