diff --git a/README.md b/README.md
index 269e3c0862..038279f8d0 100644
--- a/README.md
+++ b/README.md
@@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-storage'
If you are using Gradle without BOM, add this to your dependencies:
```Groovy
-implementation 'com.google.cloud:google-cloud-storage:2.33.0'
+implementation 'com.google.cloud:google-cloud-storage:2.34.0'
```
If you are using SBT, add this to your dependencies:
```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-storage" % "2.33.0"
+libraryDependencies += "com.google.cloud" % "google-cloud-storage" % "2.34.0"
```
@@ -428,7 +428,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-storage/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-storage.svg
-[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-storage/2.33.0
+[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-storage/2.34.0
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java
new file mode 100644
index 0000000000..06dd7e7a02
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.BetaApi;
+import com.google.api.core.InternalApi;
+import com.google.api.gax.grpc.GrpcCallContext;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.storage.v2.BidiWriteObjectRequest;
+import com.google.storage.v2.BidiWriteObjectResponse;
+import java.io.IOException;
+import java.nio.channels.WritableByteChannel;
+import java.time.Clock;
+
+public class BidiBlobWriteSessionConfig extends BlobWriteSessionConfig
+ implements BlobWriteSessionConfig.GrpcCompatible {
+ private static final long serialVersionUID = -903533790705476197L;
+
+ private final int bufferSize;
+
+ @InternalApi
+ BidiBlobWriteSessionConfig(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ /**
+ * The number of bytes to hold in the buffer before each flush
+ *
+ *
Default: {@code 16777216 (16 MiB)}
+ *
+ * @see #withBufferSize(int)
+ * @since 2.34.0 This new api is in preview and is subject to breaking changes.
+ */
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ @Override
+ WriterFactory createFactory(Clock clock) throws IOException {
+ return new Factory(ByteSizeConstants._16MiB);
+ }
+
+ @InternalApi
+ private static final class Factory implements WriterFactory {
+ private static final Conversions.Decoder
+ WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER =
+ Conversions.grpc().blobInfo().compose(BidiWriteObjectResponse::getResource);
+
+ private final int bufferSize;
+
+ private Factory(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ @InternalApi
+ @Override
+ public WritableByteChannelSession, BlobInfo> writeSession(
+ StorageInternal s, BlobInfo info, UnifiedOpts.Opts opts) {
+ if (s instanceof GrpcStorageImpl) {
+ return new DecoratedWritableByteChannelSession<>(
+ new LazySession<>(
+ new LazyWriteChannel<>(
+ () -> {
+ GrpcStorageImpl grpc = (GrpcStorageImpl) s;
+ GrpcCallContext grpcCallContext =
+ opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
+ BidiWriteObjectRequest req = grpc.getBidiWriteObjectRequest(info, opts);
+
+ ApiFuture startResumableWrite =
+ grpc.startResumableWrite(grpcCallContext, req);
+ return ResumableMedia.gapic()
+ .write()
+ .bidiByteChannel(grpc.storageClient.bidiWriteObjectCallable())
+ .setHasher(Hasher.noop())
+ .setByteStringStrategy(ByteStringStrategy.copy())
+ .resumable()
+ .withRetryConfig(
+ grpc.getOptions(), grpc.retryAlgorithmManager.idempotent())
+ .buffered(BufferHandle.allocate(bufferSize))
+ .setStartAsync(startResumableWrite)
+ .build();
+ })),
+ WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER);
+ } else {
+ throw new IllegalStateException(
+ "Unknown Storage implementation: " + s.getClass().getName());
+ }
+ }
+ }
+
+ /**
+ * Create a new instance with the {@code bufferSize} set to the specified value.
+ *
+ *
Default: {@code 16777216 (16 MiB)}
+ *
+ * @param bufferSize The number of bytes to hold in the buffer before each flush. Must be >=
+ * {@code 262144 (256 KiB)}
+ * @return The new instance
+ * @see #getBufferSize()
+ * @since 2.34.0 This new api is in preview and is subject to breaking changes.
+ */
+ @BetaApi
+ public BidiBlobWriteSessionConfig withBufferSize(int bufferSize) {
+ Preconditions.checkArgument(
+ bufferSize >= ByteSizeConstants._256KiB,
+ "bufferSize must be >= %d",
+ ByteSizeConstants._256KiB);
+ return new BidiBlobWriteSessionConfig(bufferSize);
+ }
+
+ private static final class DecoratedWritableByteChannelSession
+ implements WritableByteChannelSession {
+
+ private final WritableByteChannelSession delegate;
+ private final Conversions.Decoder decoder;
+
+ private DecoratedWritableByteChannelSession(
+ WritableByteChannelSession delegate, Conversions.Decoder decoder) {
+ this.delegate = delegate;
+ this.decoder = decoder;
+ }
+
+ @Override
+ public WBC open() {
+ try {
+ return WritableByteChannelSession.super.open();
+ } catch (Exception e) {
+ throw StorageException.coalesce(e);
+ }
+ }
+
+ @Override
+ public ApiFuture openAsync() {
+ return delegate.openAsync();
+ }
+
+ @Override
+ public ApiFuture getResult() {
+ return ApiFutures.transform(
+ delegate.getResult(), decoder::decode, MoreExecutors.directExecutor());
+ }
+ }
+
+ private static final class LazySession
+ implements WritableByteChannelSession<
+ BufferedWritableByteChannelSession.BufferedWritableByteChannel, R> {
+ private final LazyWriteChannel lazy;
+
+ private LazySession(LazyWriteChannel lazy) {
+ this.lazy = lazy;
+ }
+
+ @Override
+ public ApiFuture openAsync() {
+ return lazy.getSession().openAsync();
+ }
+
+ @Override
+ public ApiFuture getResult() {
+ return lazy.getSession().getResult();
+ }
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java
new file mode 100644
index 0000000000..dab7b2474c
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import static com.google.cloud.storage.StorageV2ProtoUtils.fmtProto;
+
+import com.google.cloud.storage.BidiWriteCtx.BidiWriteObjectRequestBuilderFactory;
+import com.google.storage.v2.BidiWriteObjectRequest;
+import com.google.storage.v2.StartResumableWriteRequest;
+import com.google.storage.v2.StartResumableWriteResponse;
+import java.util.Objects;
+import java.util.function.Function;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+final class BidiResumableWrite implements BidiWriteObjectRequestBuilderFactory {
+
+ private final StartResumableWriteRequest req;
+ private final StartResumableWriteResponse res;
+
+ private final BidiWriteObjectRequest writeRequest;
+
+ public BidiResumableWrite(
+ StartResumableWriteRequest req,
+ StartResumableWriteResponse res,
+ Function f) {
+ this.req = req;
+ this.res = res;
+ this.writeRequest = f.apply(res.getUploadId());
+ }
+
+ public StartResumableWriteRequest getReq() {
+ return req;
+ }
+
+ public StartResumableWriteResponse getRes() {
+ return res;
+ }
+
+ @Override
+ public BidiWriteObjectRequest.Builder newBuilder() {
+ return writeRequest.toBuilder();
+ }
+
+ @Override
+ public @Nullable String bucketName() {
+ if (req.hasWriteObjectSpec() && req.getWriteObjectSpec().hasResource()) {
+ return req.getWriteObjectSpec().getResource().getBucket();
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "BidiResumableWrite{" + "req=" + fmtProto(req) + ", res=" + fmtProto(res) + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ResumableWrite)) {
+ return false;
+ }
+ ResumableWrite resumableWrite = (ResumableWrite) o;
+ return Objects.equals(req, resumableWrite.getReq())
+ && Objects.equals(res, resumableWrite.getRes());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(req, res);
+ }
+
+ /**
+ * Helper function which is more specific than {@link Function#identity()}. Constraining the input
+ * and output to be exactly {@link BidiResumableWrite}.
+ */
+ static BidiResumableWrite identity(BidiResumableWrite w) {
+ return w;
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java
new file mode 100644
index 0000000000..09e4177c27
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import static com.google.cloud.storage.StorageV2ProtoUtils.fmtProto;
+
+import com.google.cloud.storage.BidiWriteCtx.BidiWriteObjectRequestBuilderFactory;
+import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
+import com.google.storage.v2.BidiWriteObjectRequest;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+final class BidiWriteCtx {
+
+ private final RequestFactoryT requestFactory;
+
+ private final AtomicLong totalSentBytes;
+ private final AtomicLong confirmedBytes;
+ private final AtomicReference cumulativeCrc32c;
+
+ BidiWriteCtx(RequestFactoryT requestFactory) {
+ this.requestFactory = requestFactory;
+ this.totalSentBytes = new AtomicLong(0);
+ this.confirmedBytes = new AtomicLong(0);
+ this.cumulativeCrc32c = new AtomicReference<>();
+ }
+
+ public RequestFactoryT getRequestFactory() {
+ return requestFactory;
+ }
+
+ public BidiWriteObjectRequest.Builder newRequestBuilder() {
+ return requestFactory.newBuilder();
+ }
+
+ public AtomicLong getTotalSentBytes() {
+ return totalSentBytes;
+ }
+
+ public AtomicLong getConfirmedBytes() {
+ return confirmedBytes;
+ }
+
+ public AtomicReference getCumulativeCrc32c() {
+ return cumulativeCrc32c;
+ }
+
+ // TODO: flush this out more
+ boolean isDirty() {
+ return confirmedBytes.get() == totalSentBytes.get();
+ }
+
+ @Override
+ public String toString() {
+ return "ServerState{"
+ + "requestFactory="
+ + requestFactory
+ + ", totalSentBytes="
+ + totalSentBytes
+ + ", confirmedBytes="
+ + confirmedBytes
+ + ", totalSentCrc32c="
+ + cumulativeCrc32c
+ + '}';
+ }
+
+ interface BidiWriteObjectRequestBuilderFactory {
+ BidiWriteObjectRequest.Builder newBuilder();
+
+ @Nullable
+ String bucketName();
+
+ static BidiSimpleWriteObjectRequestBuilderFactory simple(BidiWriteObjectRequest req) {
+ return new BidiSimpleWriteObjectRequestBuilderFactory(req);
+ }
+ }
+
+ static final class BidiSimpleWriteObjectRequestBuilderFactory
+ implements BidiWriteObjectRequestBuilderFactory {
+ private final BidiWriteObjectRequest req;
+
+ private BidiSimpleWriteObjectRequestBuilderFactory(BidiWriteObjectRequest req) {
+ this.req = req;
+ }
+
+ @Override
+ public BidiWriteObjectRequest.Builder newBuilder() {
+ return req.toBuilder();
+ }
+
+ @Override
+ public @Nullable String bucketName() {
+ if (req.hasWriteObjectSpec() && req.getWriteObjectSpec().hasResource()) {
+ return req.getWriteObjectSpec().getResource().getBucket();
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "SimpleBidiWriteObjectRequestBuilderFactory{" + "req=" + fmtProto(req) + '}';
+ }
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java
index 0f11cb19a4..43b68d8a0e 100644
--- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java
@@ -246,6 +246,21 @@ public static DefaultBlobWriteSessionConfig getDefault() {
return new DefaultBlobWriteSessionConfig(ByteSizeConstants._16MiB);
}
+ /**
+ * Factory to produce a resumable upload using a bi-directional stream. This should provide a
+ * small performance increase compared to a regular resumable upload.
+ *
+ *
Configuration of the buffer size can be performed via {@link
+ * BidiBlobWriteSessionConfig#withBufferSize(int)}.
+ *
+ * @since 2.34.0 This new api is in preview and is subject to breaking changes.
+ */
+ @BetaApi
+ @TransportCompatibility({Transport.GRPC})
+ public static BidiBlobWriteSessionConfig bidiWrite() {
+ return new BidiBlobWriteSessionConfig(ByteSizeConstants._16MiB);
+ }
+
/**
* Create a new {@link BlobWriteSessionConfig} which will first buffer the content of the object
* to a temporary file under {@code java.io.tmpdir}.
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java
new file mode 100644
index 0000000000..d19a880264
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import com.google.api.core.SettableApiFuture;
+import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
+import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
+import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import com.google.storage.v2.BidiWriteObjectRequest;
+import com.google.storage.v2.BidiWriteObjectResponse;
+import com.google.storage.v2.ChecksummedData;
+import com.google.storage.v2.ObjectChecksums;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.List;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+final class GapicBidiUnbufferedWritableByteChannel<
+ RequestFactoryT extends BidiWriteCtx.BidiWriteObjectRequestBuilderFactory>
+ implements UnbufferedWritableByteChannel {
+
+ private final SettableApiFuture resultFuture;
+ private final ChunkSegmenter chunkSegmenter;
+
+ private final BidiWriteCtx writeCtx;
+ private final WriteFlushStrategy.BidiFlusher flusher;
+
+ private boolean open = true;
+ private boolean finished = false;
+
+ GapicBidiUnbufferedWritableByteChannel(
+ SettableApiFuture resultFuture,
+ ChunkSegmenter chunkSegmenter,
+ RequestFactoryT requestFactory,
+ WriteFlushStrategy.BidiFlusherFactory flusherFactory) {
+ this.resultFuture = resultFuture;
+ this.chunkSegmenter = chunkSegmenter;
+
+ this.writeCtx = new BidiWriteCtx<>(requestFactory);
+ this.flusher =
+ flusherFactory.newFlusher(
+ requestFactory.bucketName(), writeCtx.getConfirmedBytes()::set, resultFuture::set);
+ }
+
+ @Override
+ public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException {
+ return internalWrite(srcs, srcsOffset, srcsLength, false);
+ }
+
+ @Override
+ public boolean isOpen() {
+ return open;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!finished) {
+ BidiWriteObjectRequest message = finishMessage();
+ try {
+ flusher.close(message);
+ finished = true;
+ } catch (RuntimeException e) {
+ resultFuture.setException(e);
+ throw e;
+ }
+ } else {
+ flusher.close(null);
+ }
+ open = false;
+ }
+
+ @VisibleForTesting
+ BidiWriteCtx getWriteCtx() {
+ return writeCtx;
+ }
+
+ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, boolean finalize)
+ throws ClosedChannelException {
+ if (!open) {
+ throw new ClosedChannelException();
+ }
+
+ ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength);
+
+ List messages = new ArrayList<>();
+
+ int bytesConsumed = 0;
+ for (ChunkSegment datum : data) {
+ Crc32cLengthKnown crc32c = datum.getCrc32c();
+ ByteString b = datum.getB();
+ int contentSize = b.size();
+ long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize);
+ Crc32cLengthKnown cumulative =
+ writeCtx
+ .getCumulativeCrc32c()
+ .accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat);
+ ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b);
+ if (crc32c != null) {
+ checksummedData.setCrc32C(crc32c.getValue());
+ }
+ BidiWriteObjectRequest.Builder builder =
+ writeCtx
+ .newRequestBuilder()
+ .setWriteOffset(offset)
+ .setChecksummedData(checksummedData.build());
+ if (!datum.isOnlyFullBlocks()) {
+ builder.setFinishWrite(true);
+ if (cumulative != null) {
+ builder.setObjectChecksums(
+ ObjectChecksums.newBuilder().setCrc32C(cumulative.getValue()).build());
+ }
+ finished = true;
+ }
+
+ BidiWriteObjectRequest build = builder.build();
+ messages.add(build);
+ bytesConsumed += contentSize;
+ }
+ if (finalize && !finished) {
+ messages.add(finishMessage());
+ finished = true;
+ }
+
+ try {
+ flusher.flush(messages);
+ } catch (RuntimeException e) {
+ resultFuture.setException(e);
+ throw e;
+ }
+
+ return bytesConsumed;
+ }
+
+ @NonNull
+ private BidiWriteObjectRequest finishMessage() {
+ long offset = writeCtx.getTotalSentBytes().get();
+ Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get();
+
+ BidiWriteObjectRequest.Builder b =
+ writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset);
+ if (crc32cValue != null) {
+ b.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build());
+ }
+ BidiWriteObjectRequest message = b.build();
+ return message;
+ }
+}
diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java
new file mode 100644
index 0000000000..e26d33bbb0
--- /dev/null
+++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.retrying.ResultRetryAlgorithm;
+import com.google.api.gax.rpc.BidiStreamingCallable;
+import com.google.cloud.storage.ChannelSession.BufferedWriteSession;
+import com.google.cloud.storage.Retrying.RetryingDependencies;
+import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
+import com.google.storage.v2.BidiWriteObjectRequest;
+import com.google.storage.v2.BidiWriteObjectResponse;
+import com.google.storage.v2.ServiceConstants.Values;
+import java.nio.ByteBuffer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+final class GapicBidiWritableByteChannelSessionBuilder {
+
+ private final BidiStreamingCallable write;
+ private Hasher hasher;
+ private ByteStringStrategy byteStringStrategy;
+
+ GapicBidiWritableByteChannelSessionBuilder(
+ BidiStreamingCallable write) {
+ this.write = write;
+ this.hasher = Hasher.noop();
+ this.byteStringStrategy = ByteStringStrategy.copy();
+ }
+
+ /**
+ * Set the {@link Hasher} to apply to the bytes passing through the built session's channel.
+ *
+ *
Default: {@link Hasher#noop()}
+ *
+ * @see Hasher#enabled()
+ * @see Hasher#noop()
+ */
+ GapicBidiWritableByteChannelSessionBuilder setHasher(Hasher hasher) {
+ this.hasher = requireNonNull(hasher, "hasher must be non null");
+ return this;
+ }
+
+ /**
+ * Set the {@link ByteStringStrategy} to be used when constructing {@link
+ * com.google.protobuf.ByteString ByteString}s from {@link ByteBuffer}s.
+ *
+ *
Note: usage of {@link ByteStringStrategy#noCopy()} requires that any {@link ByteBuffer}
+ * passed to the session's channel not be modified while {@link
+ * java.nio.channels.WritableByteChannel#write(ByteBuffer)} is processing.
+ *
+ * @see ByteStringStrategy#copy()
+ * @see ByteStringStrategy#noCopy()
+ */
+ GapicBidiWritableByteChannelSessionBuilder setByteStringStrategy(
+ ByteStringStrategy byteStringStrategy) {
+ this.byteStringStrategy =
+ requireNonNull(byteStringStrategy, "byteStringStrategy must be non null");
+ return this;
+ }
+
+ /**
+ * When constructing a bidi channel session, there is always a {@link
+ * GapicBidiUnbufferedWritableByteChannel} at the bottom of it. This method creates a BiFunction
+ * which will instantiate the {@link GapicBidiUnbufferedWritableByteChannel} when provided with a
+ * {@code StartT} value and a {@code SettableApiFuture}.
+ *
+ *
As part of providing the function, the provided parameters {@code BidiFlusherFactory} and
+ * {@code f} are "bound" into the returned function. In conjunction with the configured fields of
+ * this class a new instance of {@link GapicBidiUnbufferedWritableByteChannel} can be constructed.
+ */
+ private
+ BiFunction, UnbufferedWritableByteChannel>
+ bindFunction(
+ WriteFlushStrategy.BidiFlusherFactory flusherFactory,
+ Function f) {
+ // it is theoretically possible that the setter methods for the following variables could
+ // be called again between when this method is invoked and the resulting function is invoked.
+ // To ensure we are using the specified values at the point in time they are bound to the
+ // function read them into local variables which will be closed over rather than the class
+ // fields.
+ ByteStringStrategy boundStrategy = byteStringStrategy;
+ Hasher boundHasher = hasher;
+ return (start, resultFuture) ->
+ new GapicBidiUnbufferedWritableByteChannel<>(
+ resultFuture,
+ new ChunkSegmenter(boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE),
+ f.apply(start),
+ flusherFactory);
+ }
+
+ GapicBidiWritableByteChannelSessionBuilder.ResumableUploadBuilder resumable() {
+ return new GapicBidiWritableByteChannelSessionBuilder.ResumableUploadBuilder();
+ }
+
+ final class ResumableUploadBuilder {
+
+ private RetryingDependencies deps;
+ private ResultRetryAlgorithm> alg;
+
+ ResumableUploadBuilder() {
+ this.deps = RetryingDependencies.attemptOnce();
+ this.alg = Retrying.neverRetry();
+ }
+
+ ResumableUploadBuilder withRetryConfig(RetryingDependencies deps, ResultRetryAlgorithm> alg) {
+ this.deps = requireNonNull(deps, "deps must be non null");
+ this.alg = requireNonNull(alg, "alg must be non null");
+ return this;
+ }
+
+ /**
+ * Buffer using {@code byteBuffer} worth of space before attempting to flush.
+ *
+ *