diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java index 35ad97ffea..8201fbc67d 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java @@ -36,9 +36,9 @@ final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByt private final SettableApiFuture result; private final LongConsumer committedBytesCallback; - private boolean open = true; + private boolean open; private long cumulativeByteCount; - private boolean finished = false; + private boolean finished; ApiaryUnbufferedWritableByteChannel( HttpClientContext httpClientContext, @@ -50,6 +50,9 @@ final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByt this.session = ResumableSession.json(httpClientContext, deps, alg, resumableWrite); this.result = result; this.committedBytesCallback = committedBytesCallback; + this.open = true; + this.cumulativeByteCount = resumableWrite.getBeginOffset(); + this.finished = false; } @Override diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java index 28c4b8d1e6..c74c94f4cd 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java @@ -82,7 +82,7 @@ public WriteChannel restore() { entity != null ? Conversions.apiary().blobInfo().encode(entity) : null; return new BlobWriteChannelV2.BlobWriteChannelV2State( (HttpStorageOptions) serviceOptions, - JsonResumableWrite.of(encode, ImmutableMap.of(), uploadId), + JsonResumableWrite.of(encode, ImmutableMap.of(), uploadId, position), position, isOpen, chunkSize, diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java index 1bb0742513..48bad4de6c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java @@ -107,6 +107,10 @@ static final class BlobWriteChannelV2State @Override public WriteChannel restore() { + JsonResumableWrite resumableWrite = this.resumableWrite; + if (position != null) { + resumableWrite = resumableWrite.withBeginOffset(position); + } BlobWriteChannelV2 channel = new BlobWriteChannelV2(BlobReadChannelContext.from(options), resumableWrite); if (chunkSize != null) { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java index 5abb43c16b..5a4864996f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java @@ -153,7 +153,7 @@ public void rewindTo(long offset) { success = true; //noinspection DataFlowIssue compareTo result will filter out actualSize == null return ResumableOperationResult.complete(storageObject, actualSize.longValue()); - } else if (compare < 0) { + } else if (compare > 0) { StorageException se = JsonResumableSessionFailureScenario.SCENARIO_4_1.toStorageException( uploadId, response, null, toString(storageObject)); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableWrite.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableWrite.java index 4ec1fc8919..336ce0e477 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableWrite.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableWrite.java @@ -16,6 +16,8 @@ package com.google.cloud.storage; +import static com.google.common.base.Preconditions.checkArgument; + import com.google.api.services.storage.model.StorageObject; import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.base.MoreObjects; @@ -41,6 +43,7 @@ final class JsonResumableWrite implements Serializable { @MonotonicNonNull private final String signedUrl; @NonNull private final String uploadId; + private final long beginOffset; private volatile String objectJson; @@ -48,17 +51,32 @@ private JsonResumableWrite( StorageObject object, Map options, String signedUrl, - @NonNull String uploadId) { + @NonNull String uploadId, + long beginOffset) { this.object = object; this.options = options; this.signedUrl = signedUrl; this.uploadId = uploadId; + this.beginOffset = beginOffset; } public @NonNull String getUploadId() { return uploadId; } + public long getBeginOffset() { + return beginOffset; + } + + public JsonResumableWrite withBeginOffset(long newBeginOffset) { + checkArgument( + newBeginOffset >= beginOffset, + "New beginOffset must be >= existing beginOffset (%s >= %s)", + newBeginOffset, + beginOffset); + return new JsonResumableWrite(object, options, signedUrl, uploadId, newBeginOffset); + } + @Override public boolean equals(Object o) { if (this == o) { @@ -68,15 +86,16 @@ public boolean equals(Object o) { return false; } JsonResumableWrite that = (JsonResumableWrite) o; - return Objects.equals(object, that.object) + return beginOffset == that.beginOffset + && Objects.equals(object, that.object) && Objects.equals(options, that.options) && Objects.equals(signedUrl, that.signedUrl) - && uploadId.equals(that.uploadId); + && Objects.equals(uploadId, that.uploadId); } @Override public int hashCode() { - return Objects.hash(object, options, signedUrl, uploadId); + return Objects.hash(object, options, signedUrl, uploadId, beginOffset); } @Override @@ -86,6 +105,7 @@ public String toString() { .add("options", options) .add("signedUrl", signedUrl) .add("uploadId", uploadId) + .add("beginOffset", beginOffset) .toString(); } @@ -112,11 +132,11 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE } static JsonResumableWrite of( - StorageObject req, Map options, String uploadId) { - return new JsonResumableWrite(req, options, null, uploadId); + StorageObject req, Map options, String uploadId, long beginOffset) { + return new JsonResumableWrite(req, options, null, uploadId, beginOffset); } - static JsonResumableWrite of(String signedUrl, String uploadId) { - return new JsonResumableWrite(null, null, signedUrl, uploadId); + static JsonResumableWrite of(String signedUrl, String uploadId, long beginOffset) { + return new JsonResumableWrite(null, null, signedUrl, uploadId, beginOffset); } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index fd387717b1..d4ad26015f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -243,7 +243,7 @@ public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOp optionsMap, retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)); JsonResumableWrite jsonResumableWrite = - JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get()); + JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); JsonResumableSession session = ResumableSession.json( @@ -671,7 +671,7 @@ public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) optionsMap, retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)); JsonResumableWrite jsonResumableWrite = - JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get()); + JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); } @@ -688,7 +688,7 @@ public StorageWriteChannel writer(URL signedURL) { ResumableMedia.startUploadForSignedUrl( getOptions(), signedURL, forResumableUploadSessionCreate); JsonResumableWrite jsonResumableWrite = - JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get()); + JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0); return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionTest.java index 108e6d6de4..7336749dbb 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionTest.java @@ -113,7 +113,8 @@ public void rewindWillQueryStatusOnlyWhenDirty() throws Exception { URI endpoint = fakeHttpServer.getEndpoint(); String uploadUrl = String.format("%s/upload/%s", endpoint.toString(), UUID.randomUUID()); - JsonResumableWrite resumableWrite = JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl); + JsonResumableWrite resumableWrite = + JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl, 0); JsonResumableSession session = new JsonResumableSession( httpClientContext, RETRYING_DEPENDENCIES, RETRY_ALGORITHM, resumableWrite); @@ -167,7 +168,8 @@ public void retryAttemptWillReturnQueryResultIfPersistedSizeMatchesSpecifiedEndO URI endpoint = fakeHttpServer.getEndpoint(); String uploadUrl = String.format("%s/upload/%s", endpoint.toString(), UUID.randomUUID()); - JsonResumableWrite resumableWrite = JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl); + JsonResumableWrite resumableWrite = + JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl, 0); JsonResumableSession session = new JsonResumableSession( httpClientContext, RETRYING_DEPENDENCIES, RETRY_ALGORITHM, resumableWrite); @@ -234,7 +236,8 @@ public void rewindOfContentIsRelativeToItsBeginOffsetOfTheOverallObject() throws URI endpoint = fakeHttpServer.getEndpoint(); String uploadUrl = String.format("%s/upload/%s", endpoint.toString(), UUID.randomUUID()); - JsonResumableWrite resumableWrite = JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl); + JsonResumableWrite resumableWrite = + JsonResumableWrite.of(null, ImmutableMap.of(), uploadUrl, 0); JsonResumableSession session = new JsonResumableSession( httpClientContext, RETRYING_DEPENDENCIES, RETRY_ALGORITHM, resumableWrite); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java index 1e5eda2d44..87b88a78b8 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java @@ -216,7 +216,8 @@ protected Restorable[] restorableObjects() { JsonResumableWrite.of( Conversions.apiary().blobInfo().encode(BlobInfo.newBuilder("b", "n").build()), ImmutableMap.of(), - "upload-id")); + "upload-id", + 0)); return new Restorable[] {readerV2, writer}; } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteChannelTest.java index 9b4ab36689..338f2d9be7 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteChannelTest.java @@ -16,6 +16,7 @@ package com.google.cloud.storage.it; +import static com.google.cloud.storage.TestUtils.xxd; import static com.google.common.truth.Truth.assertThat; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertArrayEquals; @@ -27,6 +28,7 @@ import com.google.api.client.json.JsonParser; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.cloud.NoCredentials; +import com.google.cloud.RestorableState; import com.google.cloud.WriteChannel; import com.google.cloud.conformance.storage.v1.InstructionList; import com.google.cloud.conformance.storage.v1.Method; @@ -53,6 +55,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Optional; import java.util.logging.Logger; import org.junit.Test; @@ -153,6 +156,39 @@ public void changeChunkSizeAfterWrite() throws IOException { } } + @Test + public void restoreProperlyPlumbsBeginOffset() throws IOException { + BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build(); + int _256KiB = 256 * 1024; + + byte[] bytes1 = DataGenerator.base64Characters().genBytes(_256KiB); + byte[] bytes2 = DataGenerator.base64Characters().genBytes(73); + + int allLength = bytes1.length + bytes2.length; + byte[] expected = Arrays.copyOf(bytes1, allLength); + System.arraycopy(bytes2, 0, expected, bytes1.length, bytes2.length); + String xxdExpected = xxd(expected); + + RestorableState capture; + { + WriteChannel writer = storage.writer(info, BlobWriteOption.doesNotExist()); + writer.setChunkSize(_256KiB); + writer.write(ByteBuffer.wrap(bytes1)); + // explicitly do not close writer, it will finalize the session + capture = writer.capture(); + } + + assertThat(capture).isNotNull(); + WriteChannel restored = capture.restore(); + restored.write(ByteBuffer.wrap(bytes2)); + restored.close(); + + byte[] readAllBytes = storage.readAllBytes(info.getBlobId()); + assertThat(readAllBytes).hasLength(expected.length); + String xxdActual = xxd(readAllBytes); + assertThat(xxdActual).isEqualTo(xxdExpected); + } + private void doJsonUnexpectedEOFTest(int contentSize, int cappedByteCount) throws IOException { String blobPath = String.format("%s/%s/blob", generator.randomObjectName(), NOW_STRING);