Skip to content

Commit

Permalink
feat: add new JournalingBlobWriteSessionConfig usable with gRPC trans…
Browse files Browse the repository at this point in the history
…port

### Overview

New BlobWriteSessionConfig that will use disk to "Journal" data before transmitting to GCS.

By journaling the data to disk we can rewind to an arbitrary offset in the case of failure, while being optimistic and keeping the stream open to gcs.

General flow of data into this new WritableByteChannel:
1. #write(ByteBuffer)
2. Write contents of ByteBuffer to recovery file on disk
3. force a flush to disk
4. transmit contents of ByteBuffer to GCS leaving stream open
5. return

If the stream to gcs is interrupted with a retriable error
1. query the offset that was successfully committed to GCS
2. rewind our transmit context to the offset
3. Open the recovery file at offset from query
4. stream contents from recovery file to GCS leaving stream open
5. Once recovery file contents are transmitted to GCS return

### Benchmark results

#### Setup
1. c2d-standard-32 in us-east1
    * with 4 local nvme sdd used for the recovery files
    * default NIC, premium network tier
    * debian 11 image
2. regional bucket in us-east1

#### Workload
generate a random file with size `1GiB..2GiB` upload random file to GCS using each of the following configurations:
1. DefaultBlobWriteSessionConfig with 16MiB chunk size
2. DefaultBlobWriteSessionConfig with 64MiB chunk size
3. BufferToDiskThenUpload with the following directories as buffer locations: /mnt/ssd1:/mnt/ssd2:/mnt/ssd3:/mnt/ssd4
4. JournalingBlobWriteSessionConfig with the following directories as journal locations: /mnt/ssd1:/mnt/ssd2:/mnt/ssd3:/mnt/ssd4

Run across `{1,2,4,8,16,32}` concurrent threads to evaluate contention and horizontal scaling.

#### Collected metrics
1. Generate random file
2. record begin instant
3. create new BlobWriteSession
4. BlobWriteSession#open()
5. Copy all bytes from random file to WritableByteChannel from 4
6. close the WritableByteChannel from 4
7. record end instant
8. report `objectSize,configUsed,elapsedTimeUs,Status,ThreadCount`

#### Results summary

Throuhgput in MiB/s, grouped by ThreadCount and UploadStrategy
```
                             count     mean     std     min      50%      75%      90%      99%      max
ThreadCount UploadStrategy
1           PUT 16MiB       4341.0   79.941   8.752  21.599   80.218   85.628   90.710   99.635  106.627
            PUT 64MiB       4341.0  100.410  11.555  20.490  100.022  108.208  115.214  128.251  139.710
            BtDtU           4341.0  104.728  22.527  39.265  110.374  122.335  130.899  146.897  158.975
            journaling      4341.0  125.820  31.332  45.502  133.590  149.027  161.899  188.716  201.938
2           PUT 16MiB       4237.0   80.000   8.525  15.814   80.693   85.651   90.241   97.958  106.677
            PUT 64MiB       4237.0  101.062  11.030  55.813  101.049  108.007  115.114  127.299  135.149
            BtDtU           4237.0  104.236  21.031   5.602  109.382  120.411  128.532  143.113  162.146
            journaling      4237.0  125.010  29.761  43.207  131.827  147.362  159.425  182.441  209.000
4           PUT 16MiB       4411.0   79.708   8.357  40.691   80.600   85.567   89.586   95.533  103.506
            PUT 64MiB       4411.0  100.536   9.947  58.084  100.846  107.209  113.144  122.172  131.974
            BtDtU           4411.0  103.421  21.314  36.401  108.778  119.887  128.550  144.903  158.948
            journaling      4411.0  123.705  30.707  40.082  130.553  147.581  159.995  186.684  222.646
8           PUT 16MiB       4260.0   79.314   8.393   7.148   80.153   85.175   89.319   95.475  100.757
            PUT 64MiB       4260.0   99.913  10.438  60.685  100.450  107.144  112.551  122.409  132.130
            BtDtU           4260.0  102.472  21.228  32.552  108.226  119.072  126.700  142.831  155.628
            journaling      4260.0  122.931  30.261  42.747  130.306  146.098  158.005  184.798  203.696
16          PUT 16MiB       4473.0   77.735   8.091  24.149   78.483   83.123   87.092   95.740  106.176
            PUT 64MiB       4473.0   97.690   9.987  45.342   97.768  103.996  109.807  122.202  140.906
            BtDtU           4473.0   99.314  21.090  39.412  104.270  116.041  124.532  139.305  148.162
            journaling      4473.0  118.956  30.486  44.253  122.585  143.344  156.484  182.211  200.777
32          PUT 16MiB       4024.0   72.923   8.045  20.205   73.601   78.575   82.341   88.970  100.665
            PUT 64MiB       4024.0   93.151  10.030  20.913   93.506   99.748  105.297  116.163  128.284
            BtDtU           4024.0   89.134  18.995  35.633   91.033  103.698  112.994  131.555  146.751
            journaling      4024.0  104.557  30.965  11.785   98.794  129.923  146.747  174.618  200.303
```
  • Loading branch information
BenWhitehead committed Sep 6, 2023
1 parent 497991c commit 2ab83ea
Show file tree
Hide file tree
Showing 19 changed files with 1,263 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,115 @@
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;

/**
* Factory class to select and construct {@link BlobWriteSessionConfig}s.
*
* <p>There are several strategies which can be used to upload a {@link Blob} to Google Cloud
* Storage. This class provides factories which allow you to select the appropriate strategy for
* your workload.
*
* <table>
* <caption>Comparison of Strategies</caption>
* <tr>
* <th>Strategy</th>
* <th>Factory Method(s)</th>
* <th>Description</th>
* <th>Retry Support</th>
* <th>Transports Supported</th>
* <th>Cloud Storage API used</th>
* <th>Considerations</th>
* </tr>
* <tr>
* <td>Default (Chunk based upload)</td>
* <td>{@link #getDefault()}</td>
* <td>
* Buffer up to a configurable amount of bytes in memory, write to Cloud Storage when
* full or close. Buffer size is configurable via
* {@link DefaultBlobWriteSessionConfig#withChunkSize(int)}
* </td>
* <td>
* Each chunk is retried up to the limitations specified in
* {@link StorageOptions#getRetrySettings()}
* </td>
* <td>gRPC</td>
* <td><a href="https://cloud.google.com/storage/docs/resumable-uploads">Resumable Upload</a></td>
* <td>The network will only be used for the following operations:
* <ol>
* <li>Creating the Resumable Upload Session</li>
* <li>Transmitting zero or more incremental chunks</li>
* <li>Transmitting the final chunk and finalizing the Resumable Upload Session</li>
* <li>
* If any of the above are interrupted with a retryable error, the Resumable Upload Session
* will be queried to reconcile client side state with Cloud Storage
* </li>
* </ol>
* </td>
* </tr>
* <tr>
* <td>Buffer to disk then upload</td>
* <td>
* <ul>
* <li>{@link #bufferToDiskThenUpload(Path)}</li>
* <li>{@link #bufferToDiskThenUpload(Collection) bufferToDiskThenUpload(Collection&lt;Path>)}</li>
* <li>{@link #bufferToTempDirThenUpload()}</li>
* </ul>
* </td>
* <td>
* Buffer bytes to a temporary file on disk. On {@link WritableByteChannel#close() close()}
* upload the entire files contents to Cloud Storage. Delete the temporary file.
* </td>
* <td>
* Upload the file in the fewest number of RPC possible retrying within the limitations
* specified in {@link StorageOptions#getRetrySettings()}
* </td>
* <td>gRPC</td>
* <td><a href="https://cloud.google.com/storage/docs/resumable-uploads">Resumable Upload</a></td>
* <td>
* <ol>
* <li>A Resumable Upload Session will be used to upload the file on disk.</li>
* <li>
* If the upload is interrupted with a retryable error, the Resumable Upload Session will
* be queried to restart the upload from Cloud Storage's last received byte
* </li>
* </ol>
* </td>
* </tr>
* <tr>
* <td>Journal to disk while uploading</td>
* <td>{@link #journaling(Collection) journaling(Collection&lt;Path>)}</td>
* <td>
* Create a Resumable Upload Session, before transmitting bytes to Cloud Storage write
* to a recovery file on disk. If the stream to Cloud Storage is interrupted with a
* retryable error query the offset of the Resumable Upload Session, then open the recovery
* file from the offset and transmit the bytes to Cloud Storage.
* </td>
* <td>gRPC</td>
* <td><a href="https://cloud.google.com/storage/docs/resumable-uploads">Resumable Upload</a></td>
* <td>
* <ol>
* <li>
* The stream to Cloud Storage will be held open until a) the write is complete
* b) the stream is interrupted
* </li>
* <li>
* Because the bytes are journaled to disk, the upload to Cloud Storage can only
* be as fast as the disk.
* </li>
* <li>
* The use of <a href="https://cloud.google.com/compute/docs/disks/local-ssd#nvme">Compute
* Engine Local NVMe SSD</a> is strongly encouraged compared to Compute Engine Persistent
* Disk.
* </li>
* </ol>
* </td>
* </tr>
* </table>
*
* @see BlobWriteSessionConfig
* @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig)
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
Expand Down Expand Up @@ -56,11 +158,11 @@ public static DefaultBlobWriteSessionConfig getDefault() {
* Create a new {@link BlobWriteSessionConfig} which will first buffer the content of the object
* to a temporary file under {@code java.io.tmpdir}.
*
* <p>Once the file on disk is closed, the entire file will then be uploaded to Google Cloud
* Storage.
* <p>Once the file on disk is closed, the entire file will then be uploaded to Cloud Storage.
*
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
* @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig)
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOException {
Expand All @@ -72,11 +174,11 @@ public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOExcept
* Create a new {@link BlobWriteSessionConfig} which will first buffer the content of the object
* to a temporary file under the specified {@code path}.
*
* <p>Once the file on disk is closed, the entire file will then be uploaded to Google Cloud
* Storage.
* <p>Once the file on disk is closed, the entire file will then be uploaded to Cloud Storage.
*
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
* @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig)
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IOException {
Expand All @@ -87,18 +189,34 @@ public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IO
* Create a new {@link BlobWriteSessionConfig} which will first buffer the content of the object
* to a temporary file under one of the specified {@code paths}.
*
* <p>Once the file on disk is closed, the entire file will then be uploaded to Google Cloud
* Storage.
* <p>Once the file on disk is closed, the entire file will then be uploaded to Cloud Storage.
*
* <p>The specifics of how the work is spread across multiple paths is undefined and subject to
* change.
*
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
* @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig)
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public static BufferToDiskThenUpload bufferToDiskThenUpload(Collection<Path> paths)
throws IOException {
return new BufferToDiskThenUpload(ImmutableList.copyOf(paths), false);
}

/**
* Create a new {@link BlobWriteSessionConfig} which will journal writes to a temporary file under
* one of the specified {@code paths} before transmitting the bytes to Cloud Storage.
*
* <p>The specifics of how the work is spread across multiple paths is undefined and subject to
* change.
*
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
* @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig)
* @since 2.27.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public static JournalingBlobWriteSessionConfig journaling(Collection<Path> paths) {
return new JournalingBlobWriteSessionConfig(ImmutableList.copyOf(paths), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.cloud.storage.RecoveryFileManager.RecoveryVolumeSinkFactory;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
import com.google.cloud.storage.UnifiedOpts.Opts;
Expand Down Expand Up @@ -96,8 +97,7 @@ WriterFactory createFactory(Clock clock) throws IOException {
return new Factory(recoveryFileManager, clock, gcs);
}

private RecoveryFileManager.RecoverVolumeSinkFactory getRecoverVolumeSinkFactory(
Clock clock, Duration window) {
private RecoveryVolumeSinkFactory getRecoverVolumeSinkFactory(Clock clock, Duration window) {
return path -> {
ThroughputSink windowed = ThroughputSink.windowed(ThroughputMovingWindow.of(window), clock);
if (includeLoggingSink) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package com.google.cloud.storage;

import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -137,4 +139,19 @@ static int alignSize(int size, int alignmentMultiple) {
} // else size is already aligned
return alignedSize;
}

static int fillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException {
int total = 0;
while (buf.hasRemaining()) {
int read = c.read(buf);
if (read != -1) {
total += read;
} else if (total == 0) {
return -1;
} else {
break;
}
}
return total;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ Hasher getHasher() {
return hasher;
}

ChunkSegment[] segmentBuffer(ByteBuffer bb) {
return segmentBuffers(new ByteBuffer[] {bb}, 0, 1);
}

/**
* Given {@code bbs}, yield N segments, where each segment is at most {@code maxSegmentSize}
* bytes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public int getChunkSize() {
*
* <p><i>Default:</i> {@code 16777216 (16 MiB)}
*
* @param chunkSize The number of bytes each chunk should be. Must be >= {@code 262144 (256 KiB)}
* @param chunkSize The number of bytes each chunk should be. Must be &gt;= {@code 262144 (256
* KiB)}
* @return The new instance
* @see #getChunkSize()
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ public int write(ByteBuffer src) throws IOException {
ByteBuffer slice = src.slice();
Buffers.limit(slice, bufferRemaining);
int write = channel.write(slice);
Buffers.position(src, srcPosition + write);
int newPosition = srcPosition + write;
Buffers.position(src, newPosition);
bytesConsumed += write;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.storage.ChannelSession.BufferedWriteSession;
import com.google.cloud.storage.ChannelSession.UnbufferedWriteSession;
import com.google.cloud.storage.Retrying.RetryingDependencies;
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
import com.google.cloud.storage.WriteCtx.WriteObjectRequestBuilderFactory;
import com.google.cloud.storage.WriteFlushStrategy.FlusherFactory;
import com.google.storage.v2.QueryWriteStatusRequest;
import com.google.storage.v2.QueryWriteStatusResponse;
import com.google.storage.v2.ServiceConstants.Values;
import com.google.storage.v2.WriteObjectRequest;
import com.google.storage.v2.WriteObjectResponse;
Expand Down Expand Up @@ -106,6 +109,10 @@ ResumableUploadBuilder resumable() {
return new ResumableUploadBuilder();
}

JournalingResumableUploadBuilder journaling() {
return new JournalingResumableUploadBuilder();
}

/**
* When constructing any of our channel sessions, there is always a {@link
* GapicUnbufferedWritableByteChannel} at the bottom of it. This method creates a BiFunction which
Expand Down Expand Up @@ -332,4 +339,102 @@ BufferedWritableByteChannelSession<WriteObjectResponse> build() {
}
}
}

final class JournalingResumableUploadBuilder {

private RetryingDependencies deps;
private ResultRetryAlgorithm<?> alg;
private BufferHandle bufferHandle;
private BufferHandle recoveryBuffer;
private RecoveryFile recoveryFile;
private UnaryCallable<QueryWriteStatusRequest, QueryWriteStatusResponse> query;

JournalingResumableUploadBuilder() {
this.deps = RetryingDependencies.attemptOnce();
this.alg = Retrying.neverRetry();
}

JournalingResumableUploadBuilder withRetryConfig(
RetryingDependencies deps,
ResultRetryAlgorithm<?> alg,
UnaryCallable<QueryWriteStatusRequest, QueryWriteStatusResponse> query) {
this.deps = requireNonNull(deps, "deps must be non null");
this.alg = requireNonNull(alg, "alg must be non null");
this.query = requireNonNull(query, "query must be non null");
return this;
}

JournalingResumableUploadBuilder withBuffer(BufferHandle bufferHandle) {
this.bufferHandle = requireNonNull(bufferHandle, "bufferHandle must be non null");
return this;
}

JournalingResumableUploadBuilder withRecoveryBuffer(BufferHandle bufferHandle) {
this.recoveryBuffer = requireNonNull(bufferHandle, "bufferHandle must be non null");
return this;
}

JournalingResumableUploadBuilder withRecoveryFile(RecoveryFile recoveryFile) {
this.recoveryFile = requireNonNull(recoveryFile, "recoveryFile must be non null");
return this;
}

/**
* Set the Future which will contain the ResumableWrite information necessary to open the Write
* stream.
*/
BuildableJournalingResumableUploadBuilder setStartAsync(
ApiFuture<WriteCtx<ResumableWrite>> start) {
requireNonNull(start, "start must be non null");
return new BuildableJournalingResumableUploadBuilder(start);
}

final class BuildableJournalingResumableUploadBuilder {
private final ApiFuture<WriteCtx<ResumableWrite>> start;

private BuildableJournalingResumableUploadBuilder(ApiFuture<WriteCtx<ResumableWrite>> start) {
this.start = start;
}

BufferedWritableByteChannelSession<WriteObjectResponse> build() {
return new BufferedWriteSession<>(
requireNonNull(start, "start must be non null"),
bindFunction()
.andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))
.andThen(StorageByteChannels.writable()::createSynchronized));
}

private BiFunction<
WriteCtx<ResumableWrite>,
SettableApiFuture<WriteObjectResponse>,
UnbufferedWritableByteChannel>
bindFunction() {
// it is theoretically possible that the setter methods for the following variables could
// be called again between when this method is invoked and the resulting function is
// invoked.
// To ensure we are using the specified values at the point in time they are bound to the
// function read them into local variables which will be closed over rather than the class
// fields.
RetryingDependencies deps = JournalingResumableUploadBuilder.this.deps;
ResultRetryAlgorithm<?> alg = JournalingResumableUploadBuilder.this.alg;
BufferHandle recoveryBuffer = JournalingResumableUploadBuilder.this.recoveryBuffer;
RecoveryFile recoveryFile = JournalingResumableUploadBuilder.this.recoveryFile;
UnaryCallable<QueryWriteStatusRequest, QueryWriteStatusResponse> query =
JournalingResumableUploadBuilder.this.query;
ByteStringStrategy boundStrategy = byteStringStrategy;
Hasher boundHasher = hasher;
return (writeCtx, resultFuture) ->
new SyncAndUploadUnbufferedWritableByteChannel(
write,
query,
resultFuture,
new ChunkSegmenter(boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE),
deps,
alg,
writeCtx,
recoveryFile,
recoveryBuffer);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1768,7 +1768,7 @@ ReadObjectRequest getReadObjectRequest(BlobId blob, Opts<ObjectSourceOpt> opts)
return opts.readObjectRequest().apply(builder).build();
}

private WriteObjectRequest getWriteObjectRequest(BlobInfo info, Opts<ObjectTargetOpt> opts) {
WriteObjectRequest getWriteObjectRequest(BlobInfo info, Opts<ObjectTargetOpt> opts) {
Object object = codecs.blobInfo().encode(info);
Object.Builder objectBuilder =
object
Expand Down
Loading

0 comments on commit 2ab83ea

Please sign in to comment.