From 47fde853c17d16689a732d5d8eadc70a45efea49 Mon Sep 17 00:00:00 2001 From: JesseLovelace <43148100+JesseLovelace@users.noreply.github.com> Date: Wed, 21 Feb 2024 10:28:23 -0800 Subject: [PATCH] feat: Add Bidi write feature (#2343) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Add Bidi write feature * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * add in bidi ssb workload * WIP * Add in bidi fields * Add in DefaultBlobWriteSession test * fix copyright * remove bucket creation line * cleanup object created by ssb * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix up comments --------- Co-authored-by: Owl Bot Co-authored-by: Sydney Munro --- README.md | 6 +- .../storage/BidiBlobWriteSessionConfig.java | 180 ++++++++++++++++ .../cloud/storage/BidiResumableWrite.java | 96 +++++++++ .../google/cloud/storage/BidiWriteCtx.java | 119 ++++++++++ .../storage/BlobWriteSessionConfigs.java | 15 ++ ...apicBidiUnbufferedWritableByteChannel.java | 165 ++++++++++++++ ...BidiWritableByteChannelSessionBuilder.java | 178 +++++++++++++++ .../storage/GapicUploadSessionBuilder.java | 31 +++ .../storage/GrpcRetryAlgorithmManager.java | 7 + .../google/cloud/storage/GrpcStorageImpl.java | 36 ++++ .../com/google/cloud/storage/UnifiedOpts.java | 9 + .../cloud/storage/WriteFlushStrategy.java | 203 ++++++++++++++++++ .../storage/it/ITBlobWriteSessionTest.java | 17 ++ .../cloud/storage/benchmarking/Bidi.java | 88 ++++++++ .../StorageSharedBenchmarkingCli.java | 50 +++++ .../StorageSharedBenchmarkingUtils.java | 25 ++- .../cloud/storage/benchmarking/W1R3.java | 29 +-- 17 files changed, 1226 insertions(+), 28 deletions(-) create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java create mode 100644 storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/Bidi.java diff --git a/README.md b/README.md index 269e3c0862..038279f8d0 100644 --- a/README.md +++ b/README.md @@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-storage' If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-storage:2.33.0' +implementation 'com.google.cloud:google-cloud-storage:2.34.0' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-storage" % "2.33.0" +libraryDependencies += "com.google.cloud" % "google-cloud-storage" % "2.34.0" ``` @@ -428,7 +428,7 @@ Java is a registered trademark of Oracle and/or its affiliates. [kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-storage/java11.html [stability-image]: https://img.shields.io/badge/stability-stable-green [maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-storage.svg -[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-storage/2.33.0 +[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-storage/2.34.0 [authentication]: https://github.com/googleapis/google-cloud-java#authentication [auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes [predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java new file mode 100644 index 0000000000..06dd7e7a02 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java @@ -0,0 +1,180 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.core.BetaApi; +import com.google.api.core.InternalApi; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.storage.v2.BidiWriteObjectRequest; +import com.google.storage.v2.BidiWriteObjectResponse; +import java.io.IOException; +import java.nio.channels.WritableByteChannel; +import java.time.Clock; + +public class BidiBlobWriteSessionConfig extends BlobWriteSessionConfig + implements BlobWriteSessionConfig.GrpcCompatible { + private static final long serialVersionUID = -903533790705476197L; + + private final int bufferSize; + + @InternalApi + BidiBlobWriteSessionConfig(int bufferSize) { + this.bufferSize = bufferSize; + } + + /** + * The number of bytes to hold in the buffer before each flush + * + *

Default: {@code 16777216 (16 MiB)} + * + * @see #withBufferSize(int) + * @since 2.34.0 This new api is in preview and is subject to breaking changes. + */ + public int getBufferSize() { + return bufferSize; + } + + @Override + WriterFactory createFactory(Clock clock) throws IOException { + return new Factory(ByteSizeConstants._16MiB); + } + + @InternalApi + private static final class Factory implements WriterFactory { + private static final Conversions.Decoder + WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER = + Conversions.grpc().blobInfo().compose(BidiWriteObjectResponse::getResource); + + private final int bufferSize; + + private Factory(int bufferSize) { + this.bufferSize = bufferSize; + } + + @InternalApi + @Override + public WritableByteChannelSession writeSession( + StorageInternal s, BlobInfo info, UnifiedOpts.Opts opts) { + if (s instanceof GrpcStorageImpl) { + return new DecoratedWritableByteChannelSession<>( + new LazySession<>( + new LazyWriteChannel<>( + () -> { + GrpcStorageImpl grpc = (GrpcStorageImpl) s; + GrpcCallContext grpcCallContext = + opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); + BidiWriteObjectRequest req = grpc.getBidiWriteObjectRequest(info, opts); + + ApiFuture startResumableWrite = + grpc.startResumableWrite(grpcCallContext, req); + return ResumableMedia.gapic() + .write() + .bidiByteChannel(grpc.storageClient.bidiWriteObjectCallable()) + .setHasher(Hasher.noop()) + .setByteStringStrategy(ByteStringStrategy.copy()) + .resumable() + .withRetryConfig( + grpc.getOptions(), grpc.retryAlgorithmManager.idempotent()) + .buffered(BufferHandle.allocate(bufferSize)) + .setStartAsync(startResumableWrite) + .build(); + })), + WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER); + } else { + throw new IllegalStateException( + "Unknown Storage implementation: " + s.getClass().getName()); + } + } + } + + /** + * Create a new instance with the {@code bufferSize} set to the specified value. + * + *

Default: {@code 16777216 (16 MiB)} + * + * @param bufferSize The number of bytes to hold in the buffer before each flush. Must be >= + * {@code 262144 (256 KiB)} + * @return The new instance + * @see #getBufferSize() + * @since 2.34.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + public BidiBlobWriteSessionConfig withBufferSize(int bufferSize) { + Preconditions.checkArgument( + bufferSize >= ByteSizeConstants._256KiB, + "bufferSize must be >= %d", + ByteSizeConstants._256KiB); + return new BidiBlobWriteSessionConfig(bufferSize); + } + + private static final class DecoratedWritableByteChannelSession + implements WritableByteChannelSession { + + private final WritableByteChannelSession delegate; + private final Conversions.Decoder decoder; + + private DecoratedWritableByteChannelSession( + WritableByteChannelSession delegate, Conversions.Decoder decoder) { + this.delegate = delegate; + this.decoder = decoder; + } + + @Override + public WBC open() { + try { + return WritableByteChannelSession.super.open(); + } catch (Exception e) { + throw StorageException.coalesce(e); + } + } + + @Override + public ApiFuture openAsync() { + return delegate.openAsync(); + } + + @Override + public ApiFuture getResult() { + return ApiFutures.transform( + delegate.getResult(), decoder::decode, MoreExecutors.directExecutor()); + } + } + + private static final class LazySession + implements WritableByteChannelSession< + BufferedWritableByteChannelSession.BufferedWritableByteChannel, R> { + private final LazyWriteChannel lazy; + + private LazySession(LazyWriteChannel lazy) { + this.lazy = lazy; + } + + @Override + public ApiFuture openAsync() { + return lazy.getSession().openAsync(); + } + + @Override + public ApiFuture getResult() { + return lazy.getSession().getResult(); + } + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java new file mode 100644 index 0000000000..dab7b2474c --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java @@ -0,0 +1,96 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.StorageV2ProtoUtils.fmtProto; + +import com.google.cloud.storage.BidiWriteCtx.BidiWriteObjectRequestBuilderFactory; +import com.google.storage.v2.BidiWriteObjectRequest; +import com.google.storage.v2.StartResumableWriteRequest; +import com.google.storage.v2.StartResumableWriteResponse; +import java.util.Objects; +import java.util.function.Function; +import org.checkerframework.checker.nullness.qual.Nullable; + +final class BidiResumableWrite implements BidiWriteObjectRequestBuilderFactory { + + private final StartResumableWriteRequest req; + private final StartResumableWriteResponse res; + + private final BidiWriteObjectRequest writeRequest; + + public BidiResumableWrite( + StartResumableWriteRequest req, + StartResumableWriteResponse res, + Function f) { + this.req = req; + this.res = res; + this.writeRequest = f.apply(res.getUploadId()); + } + + public StartResumableWriteRequest getReq() { + return req; + } + + public StartResumableWriteResponse getRes() { + return res; + } + + @Override + public BidiWriteObjectRequest.Builder newBuilder() { + return writeRequest.toBuilder(); + } + + @Override + public @Nullable String bucketName() { + if (req.hasWriteObjectSpec() && req.getWriteObjectSpec().hasResource()) { + return req.getWriteObjectSpec().getResource().getBucket(); + } + return null; + } + + @Override + public String toString() { + return "BidiResumableWrite{" + "req=" + fmtProto(req) + ", res=" + fmtProto(res) + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ResumableWrite)) { + return false; + } + ResumableWrite resumableWrite = (ResumableWrite) o; + return Objects.equals(req, resumableWrite.getReq()) + && Objects.equals(res, resumableWrite.getRes()); + } + + @Override + public int hashCode() { + return Objects.hash(req, res); + } + + /** + * Helper function which is more specific than {@link Function#identity()}. Constraining the input + * and output to be exactly {@link BidiResumableWrite}. + */ + static BidiResumableWrite identity(BidiResumableWrite w) { + return w; + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java new file mode 100644 index 0000000000..09e4177c27 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BidiWriteCtx.java @@ -0,0 +1,119 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.StorageV2ProtoUtils.fmtProto; + +import com.google.cloud.storage.BidiWriteCtx.BidiWriteObjectRequestBuilderFactory; +import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; +import com.google.storage.v2.BidiWriteObjectRequest; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.checkerframework.checker.nullness.qual.Nullable; + +final class BidiWriteCtx { + + private final RequestFactoryT requestFactory; + + private final AtomicLong totalSentBytes; + private final AtomicLong confirmedBytes; + private final AtomicReference cumulativeCrc32c; + + BidiWriteCtx(RequestFactoryT requestFactory) { + this.requestFactory = requestFactory; + this.totalSentBytes = new AtomicLong(0); + this.confirmedBytes = new AtomicLong(0); + this.cumulativeCrc32c = new AtomicReference<>(); + } + + public RequestFactoryT getRequestFactory() { + return requestFactory; + } + + public BidiWriteObjectRequest.Builder newRequestBuilder() { + return requestFactory.newBuilder(); + } + + public AtomicLong getTotalSentBytes() { + return totalSentBytes; + } + + public AtomicLong getConfirmedBytes() { + return confirmedBytes; + } + + public AtomicReference getCumulativeCrc32c() { + return cumulativeCrc32c; + } + + // TODO: flush this out more + boolean isDirty() { + return confirmedBytes.get() == totalSentBytes.get(); + } + + @Override + public String toString() { + return "ServerState{" + + "requestFactory=" + + requestFactory + + ", totalSentBytes=" + + totalSentBytes + + ", confirmedBytes=" + + confirmedBytes + + ", totalSentCrc32c=" + + cumulativeCrc32c + + '}'; + } + + interface BidiWriteObjectRequestBuilderFactory { + BidiWriteObjectRequest.Builder newBuilder(); + + @Nullable + String bucketName(); + + static BidiSimpleWriteObjectRequestBuilderFactory simple(BidiWriteObjectRequest req) { + return new BidiSimpleWriteObjectRequestBuilderFactory(req); + } + } + + static final class BidiSimpleWriteObjectRequestBuilderFactory + implements BidiWriteObjectRequestBuilderFactory { + private final BidiWriteObjectRequest req; + + private BidiSimpleWriteObjectRequestBuilderFactory(BidiWriteObjectRequest req) { + this.req = req; + } + + @Override + public BidiWriteObjectRequest.Builder newBuilder() { + return req.toBuilder(); + } + + @Override + public @Nullable String bucketName() { + if (req.hasWriteObjectSpec() && req.getWriteObjectSpec().hasResource()) { + return req.getWriteObjectSpec().getResource().getBucket(); + } + return null; + } + + @Override + public String toString() { + return "SimpleBidiWriteObjectRequestBuilderFactory{" + "req=" + fmtProto(req) + '}'; + } + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java index 0f11cb19a4..43b68d8a0e 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java @@ -246,6 +246,21 @@ public static DefaultBlobWriteSessionConfig getDefault() { return new DefaultBlobWriteSessionConfig(ByteSizeConstants._16MiB); } + /** + * Factory to produce a resumable upload using a bi-directional stream. This should provide a + * small performance increase compared to a regular resumable upload. + * + *

Configuration of the buffer size can be performed via {@link + * BidiBlobWriteSessionConfig#withBufferSize(int)}. + * + * @since 2.34.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + @TransportCompatibility({Transport.GRPC}) + public static BidiBlobWriteSessionConfig bidiWrite() { + return new BidiBlobWriteSessionConfig(ByteSizeConstants._16MiB); + } + /** * Create a new {@link BlobWriteSessionConfig} which will first buffer the content of the object * to a temporary file under {@code java.io.tmpdir}. diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java new file mode 100644 index 0000000000..d19a880264 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java @@ -0,0 +1,165 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.api.core.SettableApiFuture; +import com.google.cloud.storage.ChunkSegmenter.ChunkSegment; +import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; +import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; +import com.google.storage.v2.BidiWriteObjectRequest; +import com.google.storage.v2.BidiWriteObjectResponse; +import com.google.storage.v2.ChecksummedData; +import com.google.storage.v2.ObjectChecksums; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.List; +import org.checkerframework.checker.nullness.qual.NonNull; + +final class GapicBidiUnbufferedWritableByteChannel< + RequestFactoryT extends BidiWriteCtx.BidiWriteObjectRequestBuilderFactory> + implements UnbufferedWritableByteChannel { + + private final SettableApiFuture resultFuture; + private final ChunkSegmenter chunkSegmenter; + + private final BidiWriteCtx writeCtx; + private final WriteFlushStrategy.BidiFlusher flusher; + + private boolean open = true; + private boolean finished = false; + + GapicBidiUnbufferedWritableByteChannel( + SettableApiFuture resultFuture, + ChunkSegmenter chunkSegmenter, + RequestFactoryT requestFactory, + WriteFlushStrategy.BidiFlusherFactory flusherFactory) { + this.resultFuture = resultFuture; + this.chunkSegmenter = chunkSegmenter; + + this.writeCtx = new BidiWriteCtx<>(requestFactory); + this.flusher = + flusherFactory.newFlusher( + requestFactory.bucketName(), writeCtx.getConfirmedBytes()::set, resultFuture::set); + } + + @Override + public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException { + return internalWrite(srcs, srcsOffset, srcsLength, false); + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public void close() throws IOException { + if (!finished) { + BidiWriteObjectRequest message = finishMessage(); + try { + flusher.close(message); + finished = true; + } catch (RuntimeException e) { + resultFuture.setException(e); + throw e; + } + } else { + flusher.close(null); + } + open = false; + } + + @VisibleForTesting + BidiWriteCtx getWriteCtx() { + return writeCtx; + } + + private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, boolean finalize) + throws ClosedChannelException { + if (!open) { + throw new ClosedChannelException(); + } + + ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength); + + List messages = new ArrayList<>(); + + int bytesConsumed = 0; + for (ChunkSegment datum : data) { + Crc32cLengthKnown crc32c = datum.getCrc32c(); + ByteString b = datum.getB(); + int contentSize = b.size(); + long offset = writeCtx.getTotalSentBytes().getAndAdd(contentSize); + Crc32cLengthKnown cumulative = + writeCtx + .getCumulativeCrc32c() + .accumulateAndGet(crc32c, chunkSegmenter.getHasher()::nullSafeConcat); + ChecksummedData.Builder checksummedData = ChecksummedData.newBuilder().setContent(b); + if (crc32c != null) { + checksummedData.setCrc32C(crc32c.getValue()); + } + BidiWriteObjectRequest.Builder builder = + writeCtx + .newRequestBuilder() + .setWriteOffset(offset) + .setChecksummedData(checksummedData.build()); + if (!datum.isOnlyFullBlocks()) { + builder.setFinishWrite(true); + if (cumulative != null) { + builder.setObjectChecksums( + ObjectChecksums.newBuilder().setCrc32C(cumulative.getValue()).build()); + } + finished = true; + } + + BidiWriteObjectRequest build = builder.build(); + messages.add(build); + bytesConsumed += contentSize; + } + if (finalize && !finished) { + messages.add(finishMessage()); + finished = true; + } + + try { + flusher.flush(messages); + } catch (RuntimeException e) { + resultFuture.setException(e); + throw e; + } + + return bytesConsumed; + } + + @NonNull + private BidiWriteObjectRequest finishMessage() { + long offset = writeCtx.getTotalSentBytes().get(); + Crc32cLengthKnown crc32cValue = writeCtx.getCumulativeCrc32c().get(); + + BidiWriteObjectRequest.Builder b = + writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset); + if (crc32cValue != null) { + b.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build()); + } + BidiWriteObjectRequest message = b.build(); + return message; + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java new file mode 100644 index 0000000000..e26d33bbb0 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiWritableByteChannelSessionBuilder.java @@ -0,0 +1,178 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static java.util.Objects.requireNonNull; + +import com.google.api.core.ApiFuture; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.rpc.BidiStreamingCallable; +import com.google.cloud.storage.ChannelSession.BufferedWriteSession; +import com.google.cloud.storage.Retrying.RetryingDependencies; +import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel; +import com.google.storage.v2.BidiWriteObjectRequest; +import com.google.storage.v2.BidiWriteObjectResponse; +import com.google.storage.v2.ServiceConstants.Values; +import java.nio.ByteBuffer; +import java.util.function.BiFunction; +import java.util.function.Function; + +final class GapicBidiWritableByteChannelSessionBuilder { + + private final BidiStreamingCallable write; + private Hasher hasher; + private ByteStringStrategy byteStringStrategy; + + GapicBidiWritableByteChannelSessionBuilder( + BidiStreamingCallable write) { + this.write = write; + this.hasher = Hasher.noop(); + this.byteStringStrategy = ByteStringStrategy.copy(); + } + + /** + * Set the {@link Hasher} to apply to the bytes passing through the built session's channel. + * + *

Default: {@link Hasher#noop()} + * + * @see Hasher#enabled() + * @see Hasher#noop() + */ + GapicBidiWritableByteChannelSessionBuilder setHasher(Hasher hasher) { + this.hasher = requireNonNull(hasher, "hasher must be non null"); + return this; + } + + /** + * Set the {@link ByteStringStrategy} to be used when constructing {@link + * com.google.protobuf.ByteString ByteString}s from {@link ByteBuffer}s. + * + *

Default: {@link ByteStringStrategy#copy()} + * + *

Note: usage of {@link ByteStringStrategy#noCopy()} requires that any {@link ByteBuffer} + * passed to the session's channel not be modified while {@link + * java.nio.channels.WritableByteChannel#write(ByteBuffer)} is processing. + * + * @see ByteStringStrategy#copy() + * @see ByteStringStrategy#noCopy() + */ + GapicBidiWritableByteChannelSessionBuilder setByteStringStrategy( + ByteStringStrategy byteStringStrategy) { + this.byteStringStrategy = + requireNonNull(byteStringStrategy, "byteStringStrategy must be non null"); + return this; + } + + /** + * When constructing a bidi channel session, there is always a {@link + * GapicBidiUnbufferedWritableByteChannel} at the bottom of it. This method creates a BiFunction + * which will instantiate the {@link GapicBidiUnbufferedWritableByteChannel} when provided with a + * {@code StartT} value and a {@code SettableApiFuture}. + * + *

As part of providing the function, the provided parameters {@code BidiFlusherFactory} and + * {@code f} are "bound" into the returned function. In conjunction with the configured fields of + * this class a new instance of {@link GapicBidiUnbufferedWritableByteChannel} can be constructed. + */ + private + BiFunction, UnbufferedWritableByteChannel> + bindFunction( + WriteFlushStrategy.BidiFlusherFactory flusherFactory, + Function f) { + // it is theoretically possible that the setter methods for the following variables could + // be called again between when this method is invoked and the resulting function is invoked. + // To ensure we are using the specified values at the point in time they are bound to the + // function read them into local variables which will be closed over rather than the class + // fields. + ByteStringStrategy boundStrategy = byteStringStrategy; + Hasher boundHasher = hasher; + return (start, resultFuture) -> + new GapicBidiUnbufferedWritableByteChannel<>( + resultFuture, + new ChunkSegmenter(boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE), + f.apply(start), + flusherFactory); + } + + GapicBidiWritableByteChannelSessionBuilder.ResumableUploadBuilder resumable() { + return new GapicBidiWritableByteChannelSessionBuilder.ResumableUploadBuilder(); + } + + final class ResumableUploadBuilder { + + private RetryingDependencies deps; + private ResultRetryAlgorithm alg; + + ResumableUploadBuilder() { + this.deps = RetryingDependencies.attemptOnce(); + this.alg = Retrying.neverRetry(); + } + + ResumableUploadBuilder withRetryConfig(RetryingDependencies deps, ResultRetryAlgorithm alg) { + this.deps = requireNonNull(deps, "deps must be non null"); + this.alg = requireNonNull(alg, "alg must be non null"); + return this; + } + + /** + * Buffer using {@code byteBuffer} worth of space before attempting to flush. + * + *

The provided {@link ByteBuffer} should be aligned with GCSs block size of 256 + * KiB. + */ + BufferedResumableUploadBuilder buffered(ByteBuffer byteBuffer) { + return buffered(BufferHandle.handleOf(byteBuffer)); + } + + BufferedResumableUploadBuilder buffered(BufferHandle bufferHandle) { + return new BufferedResumableUploadBuilder(bufferHandle); + } + + final class BufferedResumableUploadBuilder { + + private final BufferHandle bufferHandle; + + private ApiFuture start; + + BufferedResumableUploadBuilder(BufferHandle bufferHandle) { + this.bufferHandle = bufferHandle; + } + + /** + * Set the Future which will contain the ResumableWrite information necessary to open the + * Write stream. + */ + BufferedResumableUploadBuilder setStartAsync(ApiFuture start) { + this.start = requireNonNull(start, "start must be non null"); + return this; + } + + BufferedWritableByteChannelSession build() { + return new BufferedWriteSession<>( + requireNonNull(start, "start must be non null"), + bindFunction( + WriteFlushStrategy.defaultBidiFlusher( + write, deps, alg, Retrying::newCallContext), + BidiResumableWrite::identity) + .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c)) + .andThen(StorageByteChannels.writable()::createSynchronized)); + } + } + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java index 7b4c6a949a..ac52021308 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java @@ -18,9 +18,12 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.BidiStreamingCallable; import com.google.api.gax.rpc.ClientStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; import com.google.common.util.concurrent.MoreExecutors; +import com.google.storage.v2.BidiWriteObjectRequest; +import com.google.storage.v2.BidiWriteObjectResponse; import com.google.storage.v2.StartResumableWriteRequest; import com.google.storage.v2.StartResumableWriteResponse; import com.google.storage.v2.WriteObjectRequest; @@ -40,6 +43,11 @@ GapicWritableByteChannelSessionBuilder byteChannel( return new GapicWritableByteChannelSessionBuilder(write); } + GapicBidiWritableByteChannelSessionBuilder bidiByteChannel( + BidiStreamingCallable write) { + return new GapicBidiWritableByteChannelSessionBuilder(write); + } + ApiFuture resumableWrite( UnaryCallable x, WriteObjectRequest writeObjectRequest) { @@ -62,4 +70,27 @@ ApiFuture resumableWrite( (resp) -> new ResumableWrite(req, resp, f), MoreExecutors.directExecutor()); } + + ApiFuture bidiResumableWrite( + UnaryCallable x, + BidiWriteObjectRequest writeObjectRequest) { + StartResumableWriteRequest.Builder b = StartResumableWriteRequest.newBuilder(); + if (writeObjectRequest.hasWriteObjectSpec()) { + b.setWriteObjectSpec(writeObjectRequest.getWriteObjectSpec()); + } + if (writeObjectRequest.hasCommonObjectRequestParams()) { + b.setCommonObjectRequestParams(writeObjectRequest.getCommonObjectRequestParams()); + } + if (writeObjectRequest.hasObjectChecksums()) { + b.setObjectChecksums(writeObjectRequest.getObjectChecksums()); + } + StartResumableWriteRequest req = b.build(); + Function f = + uploadId -> + writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build(); + return ApiFutures.transform( + x.futureCall(req), + (resp) -> new BidiResumableWrite(req, resp, f), + MoreExecutors.directExecutor()); + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcRetryAlgorithmManager.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcRetryAlgorithmManager.java index 703cd152e5..ea481ab250 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcRetryAlgorithmManager.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcRetryAlgorithmManager.java @@ -20,6 +20,7 @@ import com.google.iam.v1.GetIamPolicyRequest; import com.google.iam.v1.SetIamPolicyRequest; import com.google.iam.v1.TestIamPermissionsRequest; +import com.google.storage.v2.BidiWriteObjectRequest; import com.google.storage.v2.ComposeObjectRequest; import com.google.storage.v2.CreateBucketRequest; import com.google.storage.v2.CreateHmacKeyRequest; @@ -201,4 +202,10 @@ public ResultRetryAlgorithm getFor(WriteObjectRequest req) { ? retryStrategy.getIdempotentHandler() : retryStrategy.getNonidempotentHandler(); } + + public ResultRetryAlgorithm getFor(BidiWriteObjectRequest req) { + return req.getWriteObjectSpec().hasIfGenerationMatch() + ? retryStrategy.getIdempotentHandler() + : retryStrategy.getNonidempotentHandler(); + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index 94ed6b33b0..53c73d4d57 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -84,6 +84,7 @@ import com.google.iam.v1.TestIamPermissionsRequest; import com.google.protobuf.ByteString; import com.google.protobuf.FieldMask; +import com.google.storage.v2.BidiWriteObjectRequest; import com.google.storage.v2.BucketAccessControl; import com.google.storage.v2.ComposeObjectRequest; import com.google.storage.v2.ComposeObjectRequest.SourceObject; @@ -1807,8 +1808,30 @@ WriteObjectRequest getWriteObjectRequest(BlobInfo info, Opts op return opts.writeObjectRequest().apply(requestBuilder).build(); } + BidiWriteObjectRequest getBidiWriteObjectRequest(BlobInfo info, Opts opts) { + Object object = codecs.blobInfo().encode(info); + Object.Builder objectBuilder = + object + .toBuilder() + // required if the data is changing + .clearChecksums() + // trimmed to shave payload size + .clearGeneration() + .clearMetageneration() + .clearSize() + .clearCreateTime() + .clearUpdateTime(); + WriteObjectSpec.Builder specBuilder = WriteObjectSpec.newBuilder().setResource(objectBuilder); + + BidiWriteObjectRequest.Builder requestBuilder = + BidiWriteObjectRequest.newBuilder().setWriteObjectSpec(specBuilder); + + return opts.bidiWriteObjectRequest().apply(requestBuilder).build(); + } + private UnbufferedReadableByteChannelSession unbufferedReadSession( BlobId blob, BlobSourceOption[] options) { + Opts opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts); ReadObjectRequest readObjectRequest = getReadObjectRequest(blob, opts); Set codes = @@ -1838,6 +1861,19 @@ ApiFuture startResumableWrite( req); } + ApiFuture startResumableWrite( + GrpcCallContext grpcCallContext, BidiWriteObjectRequest req) { + Set codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(req)); + GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext()); + return ResumableMedia.gapic() + .write() + .bidiResumableWrite( + storageClient + .startResumableWriteCallable() + .withDefaultCallContext(merge.withRetryableCodes(codes)), + req); + } + private SourceObject sourceObjectEncode(SourceBlob from) { SourceObject.Builder to = SourceObject.newBuilder(); to.setName(from.getName()); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/UnifiedOpts.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/UnifiedOpts.java index 18f79be572..dbf7bb6463 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/UnifiedOpts.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/UnifiedOpts.java @@ -37,6 +37,7 @@ import com.google.iam.v1.GetIamPolicyRequest; import com.google.protobuf.ByteString; import com.google.protobuf.FieldMask; +import com.google.storage.v2.BidiWriteObjectRequest; import com.google.storage.v2.CommonObjectRequestParams; import com.google.storage.v2.ComposeObjectRequest; import com.google.storage.v2.CreateBucketRequest; @@ -171,6 +172,10 @@ default Mapper writeObject() { return Mapper.identity(); } + default Mapper bidiWriteObject() { + return Mapper.identity(); + } + default Mapper updateObject() { return Mapper.identity(); } @@ -2266,6 +2271,10 @@ Mapper writeObjectRequest() { return fuseMappers(ObjectTargetOpt.class, ObjectTargetOpt::writeObject); } + Mapper bidiWriteObjectRequest() { + return fuseMappers(ObjectTargetOpt.class, ObjectTargetOpt::bidiWriteObject); + } + Mapper getObjectsRequest() { return fuseMappers(ObjectSourceOpt.class, ObjectSourceOpt::getObject); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java index 806c2b42c0..0a191c7daa 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/WriteFlushStrategy.java @@ -20,11 +20,14 @@ import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.gax.rpc.ApiStreamObserver; +import com.google.api.gax.rpc.BidiStreamingCallable; import com.google.api.gax.rpc.ClientStreamingCallable; import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.Retrying.RetryingDependencies; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.storage.v2.BidiWriteObjectRequest; +import com.google.storage.v2.BidiWriteObjectResponse; import com.google.storage.v2.WriteObjectRequest; import com.google.storage.v2.WriteObjectResponse; import java.util.List; @@ -72,6 +75,28 @@ static FlusherFactory fsyncEveryFlush( baseContextSupplier); } + /** + * Create a {@link BidiFlusher} which will keep a bidirectional stream open, flushing and sending + * the appropriate signals to GCS when the buffer is full. + */ + static BidiFlusherFactory defaultBidiFlusher( + BidiStreamingCallable write, + RetryingDependencies deps, + ResultRetryAlgorithm alg, + Supplier baseContextSupplier) { + return (String bucketName, + LongConsumer committedTotalBytesCallback, + Consumer onSuccessCallback) -> + new DefaultBidiFlusher( + write, + deps, + alg, + bucketName, + committedTotalBytesCallback, + onSuccessCallback, + baseContextSupplier); + } + /** * Create a {@link Flusher} which will "fsync" only on {@link Flusher#close(WriteObjectRequest)}. * Calls to {@link Flusher#flush(List)} will be sent but not synced. @@ -121,6 +146,27 @@ private static WriteObjectRequest possiblyPairDownRequest( return b.build(); } + private static BidiWriteObjectRequest possiblyPairDownBidiRequest( + BidiWriteObjectRequest message, boolean firstMessageOfStream) { + if (firstMessageOfStream && message.getWriteOffset() == 0) { + return message; + } + + BidiWriteObjectRequest.Builder b = message.toBuilder(); + if (!firstMessageOfStream) { + b.clearUploadId(); + } + + if (message.getWriteOffset() > 0) { + b.clearWriteObjectSpec(); + } + + if (message.getWriteOffset() > 0 && !message.getFinishWrite()) { + b.clearObjectChecksums(); + } + return b.build(); + } + @FunctionalInterface interface FlusherFactory { /** @@ -140,6 +186,25 @@ interface Flusher { void close(@Nullable WriteObjectRequest req); } + @FunctionalInterface + interface BidiFlusherFactory { + /** + * @param committedTotalBytesCallback Callback to signal the total number of bytes committed by + * this flusher. + * @param onSuccessCallback Callback to signal success, and provide the final response. + */ + BidiFlusher newFlusher( + String bucketName, + LongConsumer committedTotalBytesCallback, + Consumer onSuccessCallback); + } + + interface BidiFlusher { + void flush(@NonNull List segments); + + void close(@Nullable BidiWriteObjectRequest req); + } + private static final class FsyncEveryFlusher implements Flusher { private final ClientStreamingCallable write; @@ -199,6 +264,82 @@ public void close(@Nullable WriteObjectRequest req) { } } + public static final class DefaultBidiFlusher implements BidiFlusher { + + private final BidiStreamingCallable write; + private final RetryingDependencies deps; + private final ResultRetryAlgorithm alg; + private final String bucketName; + private final LongConsumer sizeCallback; + private final Consumer completeCallback; + private final Supplier baseContextSupplier; + private volatile ApiStreamObserver stream; + + private final BidiObserver responseObserver; + + private DefaultBidiFlusher( + BidiStreamingCallable write, + RetryingDependencies deps, + ResultRetryAlgorithm alg, + String bucketName, + LongConsumer sizeCallback, + Consumer completeCallback, + Supplier baseContextSupplier) { + this.write = write; + this.deps = deps; + this.alg = alg; + this.bucketName = bucketName; + this.sizeCallback = sizeCallback; + this.completeCallback = completeCallback; + this.baseContextSupplier = baseContextSupplier; + this.responseObserver = new BidiObserver(sizeCallback, completeCallback); + } + + public void flush(@NonNull List segments) { + ensureOpen(); + Retrying.run( + deps, + alg, + () -> { + boolean first = true; + for (BidiWriteObjectRequest message : segments) { + message = possiblyPairDownBidiRequest(message, first); + + stream.onNext(message); + first = false; + } + BidiWriteObjectRequest message = + BidiWriteObjectRequest.newBuilder().setFlush(true).setStateLookup(true).build(); + stream.onNext(message); + responseObserver.await(); + return null; + }, + Decoder.identity()); + } + + public void close(@Nullable BidiWriteObjectRequest req) { + ensureOpen(); + if (req != null) { + flush(ImmutableList.of(req)); + } + } + + private void ensureOpen() { + if (stream == null) { + synchronized (this) { + if (stream == null) { + GrpcCallContext internalContext = + contextWithBucketName(bucketName, baseContextSupplier.get()); + stream = + this.write + .withDefaultCallContext(internalContext) + .bidiStreamingCall(responseObserver); + } + } + } + } + } + private static final class FsyncOnClose implements Flusher { private final ClientStreamingCallable write; @@ -315,4 +456,66 @@ void await() { } } } + + static class BidiObserver implements ApiStreamObserver { + + private final LongConsumer sizeCallback; + private final Consumer completeCallback; + + private final SettableApiFuture invocationHandle; + private volatile BidiWriteObjectResponse last; + + BidiObserver(LongConsumer sizeCallback, Consumer completeCallback) { + this.sizeCallback = sizeCallback; + this.completeCallback = completeCallback; + this.invocationHandle = SettableApiFuture.create(); + } + + @Override + public void onNext(BidiWriteObjectResponse value) { + // incremental update + if (value.hasPersistedSize()) { + sizeCallback.accept(value.getPersistedSize()); + invocationHandle.set(null); + } else if (value.hasResource()) { + sizeCallback.accept(value.getResource().getSize()); + } + last = value; + } + + /** + * observed exceptions so far + * + *
    + *
  1. {@link com.google.api.gax.rpc.OutOfRangeException} + *
  2. {@link com.google.api.gax.rpc.AlreadyExistsException} + *
  3. {@link io.grpc.StatusRuntimeException} + *
+ */ + @Override + public void onError(Throwable t) { + invocationHandle.setException(t); + } + + @Override + public void onCompleted() { + + if (last != null && last.hasResource()) { + completeCallback.accept(last); + } + invocationHandle.set(null); + } + + void await() { + try { + invocationHandle.get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else { + throw new RuntimeException(e); + } + } + } + } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java index bf8132b318..763346b1f8 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java @@ -102,6 +102,23 @@ public void overrideDefaultBufferSize() throws Exception { } } + @Test + public void bidiTest() throws Exception { + StorageOptions options = null; + if (transport == Transport.GRPC) { + options = + ((GrpcStorageOptions) storage.getOptions()) + .toBuilder() + .setBlobWriteSessionConfig(BlobWriteSessionConfigs.bidiWrite()) + .build(); + } + assertWithMessage("unable to resolve options").that(options).isNotNull(); + + try (Storage s = options.getService()) { + doTest(s); + } + } + @Test public void closingAnOpenedSessionWithoutCallingWriteShouldMakeAnEmptyObject() throws IOException, ExecutionException, InterruptedException, TimeoutException { diff --git a/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/Bidi.java b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/Bidi.java new file mode 100644 index 0000000000..aa59e0b398 --- /dev/null +++ b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/Bidi.java @@ -0,0 +1,88 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage.benchmarking; + +import static com.google.cloud.storage.benchmarking.StorageSharedBenchmarkingUtils.generateCloudMonitoringResult; + +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BlobWriteSession; +import com.google.cloud.storage.DataGenerator; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobWriteOption; +import java.io.PrintWriter; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.Callable; + +class Bidi implements Callable { + private final Storage storageClient; + private final String bucketName; + private final int objectSize; + private final PrintWriter pw; + private final String api; + private final int workers; + + Bidi( + Storage storageClient, + String bucketName, + int objectSize, + PrintWriter pw, + String api, + int workers) { + this.storageClient = storageClient; + this.bucketName = bucketName; + this.objectSize = objectSize; + this.pw = pw; + this.api = api; + this.workers = workers; + } + + @Override + public String call() throws Exception { + String blobName = DataGenerator.base64Characters().genBytes(20).toString(); + BlobWriteSession sess = + storageClient.blobWriteSession( + BlobInfo.newBuilder(bucketName, blobName).build(), BlobWriteOption.doesNotExist()); + byte[] bytes = DataGenerator.base64Characters().genBytes(objectSize); + Clock clock = Clock.systemDefaultZone(); + Instant startTime = clock.instant(); + try (WritableByteChannel w = sess.open()) { + w.write(ByteBuffer.wrap(bytes)); + } + BlobInfo created = sess.getResult().get(); + Instant endTime = clock.instant(); + Duration elapsedTimeWrite = Duration.between(startTime, endTime); + printResult("BIDI", created, elapsedTimeWrite); + StorageSharedBenchmarkingUtils.cleanupObject(storageClient, created); + return "OK"; + } + + private void printResult(String op, BlobInfo created, Duration duration) { + pw.println( + generateCloudMonitoringResult( + op, + StorageSharedBenchmarkingUtils.calculateThroughput( + created.getSize().doubleValue(), duration), + created, + api, + workers) + .formatAsCustomMetric()); + } +} diff --git a/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/StorageSharedBenchmarkingCli.java b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/StorageSharedBenchmarkingCli.java index e6c4c0563d..836c28d01e 100644 --- a/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/StorageSharedBenchmarkingCli.java +++ b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/StorageSharedBenchmarkingCli.java @@ -19,6 +19,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ListenableFutureToApiFuture; import com.google.api.gax.retrying.RetrySettings; +import com.google.cloud.storage.BlobWriteSessionConfigs; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; import com.google.common.util.concurrent.ListenableFuture; @@ -106,6 +107,12 @@ public void run() { case "w1r3": runWorkload1(); break; + case "bidi": + runWorkloadBidi(); + break; + case "default-nobidi": + runWorkloadNoBidi(); + break; default: throw new IllegalStateException("Specify a workload to run"); } @@ -151,6 +158,36 @@ private void runWorkload1DirectPath() { } } + private void runWorkloadBidi() { + StorageOptions options = + StorageOptions.grpc() + .setProjectId(project) + .setBlobWriteSessionConfig(BlobWriteSessionConfigs.bidiWrite()) + .build(); + Storage storageClient = options.getService(); + try { + runBidi(storageClient); + } catch (Exception e) { + System.err.println("Failed to run workload bidi" + e.getMessage()); + System.exit(1); + } + } + + private void runWorkloadNoBidi() { + StorageOptions options = + StorageOptions.grpc() + .setProjectId(project) + .setBlobWriteSessionConfig(BlobWriteSessionConfigs.getDefault()) + .build(); + Storage storageClient = options.getService(); + try { + runBidi(storageClient); + } catch (Exception e) { + System.err.println("Failed to run workload no bidi" + e.getMessage()); + System.exit(1); + } + } + private void runW1R3(Storage storageClient) throws ExecutionException, InterruptedException { ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(workers)); @@ -173,6 +210,19 @@ private void runW1R3(Storage storageClient) throws ExecutionException, Interrupt } } + private void runBidi(Storage storageClient) throws ExecutionException, InterruptedException { + ListeningExecutorService executorService = + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(workers)); + for (int i = 0; i < samples; i++) { + Range objectSizeRange = Range.of(objectSize); + int objectSize = getRandomInt(objectSizeRange.min, objectSizeRange.max); + convert( + executorService.submit( + new Bidi(storageClient, bucket, objectSize, printWriter, api, workers))) + .get(); + } + } + private void runWarmup(Storage storageClient) throws ExecutionException, InterruptedException { if (warmup <= 0) { return; diff --git a/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/StorageSharedBenchmarkingUtils.java b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/StorageSharedBenchmarkingUtils.java index 137fee5162..524207233b 100644 --- a/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/StorageSharedBenchmarkingUtils.java +++ b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/StorageSharedBenchmarkingUtils.java @@ -15,7 +15,7 @@ */ package com.google.cloud.storage.benchmarking; -import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import java.time.Duration; @@ -23,7 +23,7 @@ class StorageSharedBenchmarkingUtils { public static long SSB_SIZE_THRESHOLD_BYTES = 1048576; public static int DEFAULT_NUMBER_OF_READS = 3; - public static void cleanupObject(Storage storage, Blob created) { + public static void cleanupObject(Storage storage, BlobInfo created) { storage.delete( created.getBlobId(), Storage.BlobSourceOption.generationMatch(created.getGeneration())); } @@ -36,4 +36,25 @@ public static double calculateThroughput(double size, Duration elapsedTime) { double throughput = adjustedSize / (elapsedTime.toMillis() / 1000D); return throughput; } + + public static CloudMonitoringResult generateCloudMonitoringResult( + String op, double throughput, BlobInfo created, String api, int workers) { + CloudMonitoringResult result = + CloudMonitoringResult.newBuilder() + .setLibrary("java") + .setApi(api) + .setOp(op) + .setWorkers(workers) + .setObjectSize(created.getSize().intValue()) + .setChunksize(created.getSize().intValue()) + .setCrc32cEnabled(false) + .setMd5Enabled(false) + .setCpuTimeUs(-1) + .setBucketName(created.getBucket()) + .setStatus("OK") + .setTransferSize(created.getSize().toString()) + .setThroughput(throughput) + .build(); + return result; + } } diff --git a/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/W1R3.java b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/W1R3.java index 83b357db27..7405d9cf1d 100644 --- a/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/W1R3.java +++ b/storage-shared-benchmarking/src/main/java/com/google/cloud/storage/benchmarking/W1R3.java @@ -16,6 +16,8 @@ package com.google.cloud.storage.benchmarking; +import static com.google.cloud.storage.benchmarking.StorageSharedBenchmarkingUtils.generateCloudMonitoringResult; + import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.DataGenerator; @@ -80,7 +82,7 @@ public String call() { printResult("READ[" + i + "]", created, elapsedTimeDownload); } } - StorageSharedBenchmarkingUtils.cleanupObject(storage, created); + StorageSharedBenchmarkingUtils.cleanupObject(storage, created.asBlobInfo()); } catch (Exception e) { CloudMonitoringResult result = CloudMonitoringResult.newBuilder() @@ -110,29 +112,10 @@ private void printResult(String op, Blob created, Duration duration) { op, StorageSharedBenchmarkingUtils.calculateThroughput( created.getSize().doubleValue(), duration), - created) + created.asBlobInfo(), + api, + workers) .formatAsCustomMetric()); } } - - private CloudMonitoringResult generateCloudMonitoringResult( - String op, double throughput, Blob created) { - CloudMonitoringResult result = - CloudMonitoringResult.newBuilder() - .setLibrary("java") - .setApi(api) - .setOp(op) - .setWorkers(workers) - .setObjectSize(created.getSize().intValue()) - .setChunksize(created.getSize().intValue()) - .setCrc32cEnabled(false) - .setMd5Enabled(false) - .setCpuTimeUs(-1) - .setBucketName(created.getBucket()) - .setStatus("OK") - .setTransferSize(created.getSize().toString()) - .setThroughput(throughput) - .build(); - return result; - } }