Skip to content

Commit

Permalink
Optimizations for default block size on HTBB
Browse files Browse the repository at this point in the history
  • Loading branch information
rickle-msft authored Oct 22, 2019
1 parent 9fedee1 commit b820645
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,20 @@
* #getPageBlobAsyncClient() getPageBlobAsyncClient} to construct a client that allows blob specific operations.
*
* <p>
* Please refer to the <a href=https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs>Azure
* Please refer to the
* <a href="https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs">Azure
* Docs</a> for more information.
*/
public class BlobAsyncClient extends BlobAsyncClientBase {
public static final int BLOB_DEFAULT_UPLOAD_BLOCK_SIZE = 4 * Constants.MB;
public static final int BLOB_DEFAULT_NUMBER_OF_BUFFERS = 8;
/**
* If a blob is known to be greater than 100MB, using a larger block size will trigger some server-side
* optimizations. If the block size is not set and the size of the blob is known to be greater than 100MB, this
* value will be used.
*/
public static final int BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE = 8 * Constants.MB;
static final int BLOB_MAX_UPLOAD_BLOCK_SIZE = 100 * Constants.MB;

private final ClientLogger logger = new ClientLogger(BlobAsyncClient.class);

/**
Expand All @@ -94,6 +101,7 @@ protected BlobAsyncClient(HttpPipeline pipeline, String url, BlobServiceVersion
* Creates a new {@link BlobAsyncClient} linked to the {@code snapshot} of this blob resource.
*
* @param snapshot the identifier for a specific snapshot of this blob
*
* @return a {@link BlobAsyncClient} used to interact with the specific snapshot.
*/
@Override
Expand Down Expand Up @@ -174,6 +182,7 @@ private SpecializedBlobClientBuilder prepareBuilder() {
* {@code Flux} be replayable. In other words, it does not have to support multiple subscribers and is not expected
* to produce the same values across subscriptions.
* @param parallelTransferOptions {@link ParallelTransferOptions} used to configure buffered uploading.
*
* @return A reactive response containing the information of the uploaded block blob.
*/
public Mono<BlockBlobItem> upload(Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions) {
Expand All @@ -193,17 +202,17 @@ public Mono<BlockBlobItem> upload(Flux<ByteBuffer> data, ParallelTransferOptions
* see the <a href="https://docs.microsoft.com/rest/api/storageservices/put-block">Azure Docs for Put Block</a> and
* the <a href="https://docs.microsoft.com/rest/api/storageservices/put-block-list">Azure Docs for Put Block List</a>.
* <p>
* The data passed need not support multiple subscriptions/be replayable as is required in other upload methods when
* retries are enabled, and the length of the data need not be known in advance. Therefore, this method should
* The data passed need not support multiple subscriptions/be replayable as is required in other upload methods
* when retries are enabled, and the length of the data need not be known in advance. Therefore, this method should
* support uploading any arbitrary data source, including network streams. This behavior is possible because this
* method will perform some internal buffering as configured by the blockSize and numBuffers parameters, so while
* this method may offer additional convenience, it will not be as performant as other options, which should be
* preferred when possible.
* <p>
* Typically, the greater the number of buffers used, the greater the possible parallelism when transferring the
* data. Larger buffers means we will have to stage fewer blocks and therefore require fewer IO operations. The
* trade-offs between these values are context-dependent, so some experimentation may be required to optimize inputs
* for a given scenario.
* trade-offs between these values are context-dependent, so some experimentation may be required to optimize
* inputs for a given scenario.
*
* <p><strong>Code Samples</strong></p>
*
Expand Down Expand Up @@ -232,18 +241,16 @@ public Mono<Response<BlockBlobItem>> uploadWithResponse(Flux<ByteBuffer> data,
Objects.requireNonNull(data, "'data' must not be null");
BlobRequestConditions accessConditionsFinal = accessConditions == null
? new BlobRequestConditions() : accessConditions;
final ParallelTransferOptions finalParallelTransferOptions = parallelTransferOptions == null
? new ParallelTransferOptions() : parallelTransferOptions;
int blockSize = finalParallelTransferOptions.getBlockSize();
int numBuffers = finalParallelTransferOptions.getNumBuffers();
ProgressReceiver progressReceiver = finalParallelTransferOptions.getProgressReceiver();
final ParallelTransferOptions finalParallelTransferOptions = new ParallelTransferOptions();
finalParallelTransferOptions.populateAndApplyDefaults(parallelTransferOptions);

// See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
AtomicLong totalProgress = new AtomicLong(0);
Lock progressLock = new ReentrantLock();

// Validation done in the constructor.
UploadBufferPool pool = new UploadBufferPool(numBuffers, blockSize);
UploadBufferPool pool = new UploadBufferPool(finalParallelTransferOptions.getNumBuffers(),
finalParallelTransferOptions.getBlockSize());

/*
Break the source Flux into chunks that are <= chunk size. This makes filling the pooled buffers much easier
Expand All @@ -254,29 +261,30 @@ as we can guarantee we only need at most two buffers for any call to write (two
Flux<ByteBuffer> chunkedSource = data
.filter(ByteBuffer::hasRemaining)
.flatMapSequential(buffer -> {
if (buffer.remaining() <= blockSize) {
if (buffer.remaining() <= finalParallelTransferOptions.getBlockSize()) {
return Flux.just(buffer);
}
int numSplits = (int) Math.ceil(buffer.remaining() / (double) blockSize);
int numSplits =
(int) Math.ceil(buffer.remaining() / (double) finalParallelTransferOptions.getBlockSize());
return Flux.range(0, numSplits)
.map(i -> {
ByteBuffer duplicate = buffer.duplicate().asReadOnlyBuffer();
duplicate.position(i * blockSize);
duplicate.limit(Math.min(duplicate.limit(), (i + 1) * blockSize));
duplicate.position(i * finalParallelTransferOptions.getBlockSize());
duplicate.limit(Math.min(duplicate.limit(),
(i + 1) * finalParallelTransferOptions.getBlockSize()));
return duplicate;
});
});

/*
Write to the pool and upload the output.
*/
/*
Write to the pool and upload the output.
*/
return chunkedSource.concatMap(pool::write)
.concatWith(Flux.defer(pool::flush))
.flatMapSequential(buffer -> {
// Report progress as necessary.
Flux<ByteBuffer> progressData = ProgressReporter.addParallelProgressReporting(Flux.just(buffer),
progressReceiver, progressLock, totalProgress);

finalParallelTransferOptions.getProgressReceiver(), progressLock, totalProgress);

final String blockId = Base64.getEncoder().encodeToString(
UUID.randomUUID().toString().getBytes(UTF_8));
Expand All @@ -296,7 +304,6 @@ as we can guarantee we only need at most two buffers for any call to write (two
} catch (RuntimeException ex) {
return monoError(logger, ex);
}

}

/**
Expand All @@ -308,6 +315,7 @@ as we can guarantee we only need at most two buffers for any call to write (two
* {@codesnippet com.azure.storage.blob.BlobAsyncClient.uploadFromFile#String}
*
* @param filePath Path to the upload file
*
* @return An empty response
*/
public Mono<Void> uploadFromFile(String filePath) {
Expand All @@ -334,6 +342,7 @@ public Mono<Void> uploadFromFile(String filePath) {
* @param tier {@link AccessTier} for the destination blob.
* @param accessConditions {@link BlobRequestConditions}
* @return An empty response
*
* @throws IllegalArgumentException If {@code blockSize} is less than 0 or greater than 100MB
* @throws UncheckedIOException If an I/O error occurs
*/
Expand All @@ -342,10 +351,8 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
BlobHttpHeaders headers, Map<String, String> metadata, AccessTier tier,
BlobRequestConditions accessConditions) {
try {
final ParallelTransferOptions finalParallelTransferOptions = parallelTransferOptions == null
? new ParallelTransferOptions()
: parallelTransferOptions;
ProgressReceiver progressReceiver = finalParallelTransferOptions.getProgressReceiver();
ParallelTransferOptions finalParallelTransferOptions = new ParallelTransferOptions();
finalParallelTransferOptions.populateAndApplyDefaults(parallelTransferOptions);

// See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
AtomicLong totalProgress = new AtomicLong(0);
Expand All @@ -354,14 +361,15 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
return Mono.using(() -> uploadFileResourceSupplier(filePath),
channel -> {
final SortedMap<Long, String> blockIds = new TreeMap<>();
return Flux.fromIterable(sliceFile(filePath, finalParallelTransferOptions.getBlockSize()))
return Flux.fromIterable(sliceFile(filePath, finalParallelTransferOptions.getBlockSize(),
parallelTransferOptions == null || parallelTransferOptions.getBlockSize() == null))
.doOnNext(chunk -> blockIds.put(chunk.getOffset(), getBlockID()))
.flatMap(chunk -> {
String blockId = blockIds.get(chunk.getOffset());

Flux<ByteBuffer> progressData = ProgressReporter.addParallelProgressReporting(
FluxUtil.readFile(channel, chunk.getOffset(), chunk.getCount()),
progressReceiver, progressLock, totalProgress);
finalParallelTransferOptions.getProgressReceiver(), progressLock, totalProgress);

return getBlockBlobAsyncClient()
.stageBlockWithResponse(blockId, progressData, chunk.getCount(), null);
Expand All @@ -386,7 +394,9 @@ public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parall
* Resource Supplier for UploadFile
*
* @param filePath The path for the file
*
* @return {@code AsynchronousFileChannel}
*
* @throws UncheckedIOException an input output exception.
*/
protected AsynchronousFileChannel uploadFileResourceSupplier(String filePath) {
Expand All @@ -409,9 +419,12 @@ private String getBlockID() {
return Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
}

private List<BlobRange> sliceFile(String path, int blockSize) {
private List<BlobRange> sliceFile(String path, int blockSize, boolean enableHtbbOptimizations) {
File file = new File(path);
assert file.exists();
if (file.length() > 100 * Constants.MB && enableHtbbOptimizations) {
blockSize = BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE;
}
List<BlobRange> ranges = new ArrayList<>();
for (long pos = 0; pos < file.length(); pos += blockSize) {
long count = blockSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,44 @@

import com.azure.core.annotation.Fluent;
import com.azure.storage.blob.ProgressReceiver;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.implementation.StorageImplUtils;

import static com.azure.storage.blob.BlobAsyncClient.BLOB_DEFAULT_NUMBER_OF_BUFFERS;
import static com.azure.storage.blob.BlobAsyncClient.BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;

/**
* This class contains configuration used to parallelize data transfer operations.
*/
@Fluent
public class ParallelTransferOptions {

private static final int BLOB_DEFAULT_UPLOAD_BLOCK_SIZE = 4 * Constants.MB;
private static final int BLOB_MAX_BLOCK_SIZE = 100 * Constants.MB;
public final class ParallelTransferOptions {

private static final int BLOB_DEFAULT_NUMBER_OF_PARALLEL_TRANSFERS = 8;
private static final int BLOB_MAX_UPLOAD_BLOCK_SIZE = 100 * Constants.MB;

private int blockSize;
private int numBuffers;
private Integer blockSize;
private Integer numBuffers;
private ProgressReceiver progressReceiver;

/**
* Creates a new {@link ParallelTransferOptions} with default parameters applied.
* blockSize = 4MB
* numBuffers = 8
*/
public ParallelTransferOptions() {
this.blockSize = BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
this.numBuffers = BLOB_DEFAULT_NUMBER_OF_PARALLEL_TRANSFERS;
this.progressReceiver = null;
}

/**
* Gets the block size (chunk size) to transfer at a time.
* @return The block size.
*/
public int getBlockSize() {
public Integer getBlockSize() {
return this.blockSize;
}

/**
* Gets the number of buffers being used for a transfer operation.
* @return The number of buffers.
*/
public int getNumBuffers() {
public Integer getNumBuffers() {
return this.numBuffers;
}

Expand All @@ -68,30 +64,56 @@ public ProgressReceiver getProgressReceiver() {
* @return The updated ParallelTransferOptions object.
* @throws IllegalArgumentException when block size is less than 0 or greater than max blob block size (100MB).
*/
public ParallelTransferOptions setBlockSize(int blockSize) {
StorageImplUtils.assertInBounds("blockSize", blockSize, 0, BLOB_MAX_BLOCK_SIZE);
public ParallelTransferOptions setBlockSize(Integer blockSize) {
if (blockSize != null) {
StorageImplUtils.assertInBounds("blockSize", blockSize, 0, BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES);
}
this.blockSize = blockSize;
return this;
}

/**
* Sets the number of buffers being used for an upload/download operation.
* @param numBuffers The number of buffers.
* For upload, The number of buffers is the maximum number of buffers this method should allocate.
* For buffered upload only, the number of buffers is the maximum number of buffers this method should allocate.
* Must be at least two. Typically, the larger the number of buffers, the more parallel, and thus faster, the
* upload portion of this operation will be. The amount of memory consumed by this method may be up to
* blockSize * numBuffers.
* @return The updated ParallelTransferOptions object.
* @throws IllegalArgumentException when numBuffers is less than 2.
*/
public ParallelTransferOptions setNumBuffers(int numBuffers) {
StorageImplUtils.assertInBounds("numBuffers", numBuffers, 2, Integer.MAX_VALUE);
public ParallelTransferOptions setNumBuffers(Integer numBuffers) {
if (numBuffers != null) {
StorageImplUtils.assertInBounds("numBuffers", numBuffers, 2, Integer.MAX_VALUE);
}
this.numBuffers = numBuffers;
return this;
}

/**
* Sets the progress receiver for parallel reporting.
*
* <p>The supplied value is stored as-is; no validation is performed here, so {@code null} is accepted.
*
* @param progressReceiver The progress receiver.
* @return The updated ParallelTransferOptions object (this instance), so calls can be chained.
*/
public ParallelTransferOptions setProgressReceiver(ProgressReceiver progressReceiver) {
this.progressReceiver = progressReceiver;
return this;
}

/**
 * RESERVED FOR INTERNAL USE.
 *
 * <p>Copies the settings of {@code other} into this instance through the regular setters. A block size or
 * buffer count that {@code other} reports as {@code null} is replaced by the corresponding default constant;
 * the progress receiver is copied unchanged.
 *
 * @param other The customer provided transfer options. If it has non-null values, they will be used, otherwise
 * defaults will be set. When {@code other} itself is {@code null}, a freshly constructed instance is read in
 * its place.
 */
public void populateAndApplyDefaults(ParallelTransferOptions other) {
    final ParallelTransferOptions source = (other != null) ? other : new ParallelTransferOptions();

    // Resolve the block size first; the setter validates it before anything else is read.
    Integer resolvedBlockSize = source.getBlockSize();
    if (resolvedBlockSize == null) {
        resolvedBlockSize = Integer.valueOf(BLOB_DEFAULT_UPLOAD_BLOCK_SIZE);
    }
    this.setBlockSize(resolvedBlockSize);

    Integer resolvedNumBuffers = source.getNumBuffers();
    if (resolvedNumBuffers == null) {
        resolvedNumBuffers = Integer.valueOf(BLOB_DEFAULT_NUMBER_OF_BUFFERS);
    }
    this.setNumBuffers(resolvedNumBuffers);

    this.setProgressReceiver(source.getProgressReceiver());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.storage.blob.specialized;

import static com.azure.core.implementation.util.FluxUtil.withContext;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.RequestConditions;
Expand Down Expand Up @@ -65,7 +66,6 @@

import static com.azure.core.implementation.util.FluxUtil.fluxError;
import static com.azure.core.implementation.util.FluxUtil.monoError;
import static com.azure.core.implementation.util.FluxUtil.withContext;

/**
* This class provides a client that contains all operations that apply to any blob type.
Expand Down Expand Up @@ -747,11 +747,9 @@ public Mono<Response<BlobProperties>> downloadToFileWithResponse(String filePath
// TODO (gapra) : Investigate if this can be parallelized, and include the parallelTransfers parameter.
Mono<Response<BlobProperties>> downloadToFileWithResponse(String filePath, BlobRange range,
ParallelTransferOptions parallelTransferOptions, ReliableDownloadOptions options,
BlobRequestConditions accessConditions, boolean rangeGetContentMd5, Context context) {
final ParallelTransferOptions finalParallelTransferOptions = parallelTransferOptions == null
? new ParallelTransferOptions()
: parallelTransferOptions;
ProgressReceiver progressReceiver = finalParallelTransferOptions.getProgressReceiver();
BlobRequestConditions accessConditions, boolean rangeGetContentMD5, Context context) {
ParallelTransferOptions finalParallelTransferOptions = new ParallelTransferOptions();
finalParallelTransferOptions.populateAndApplyDefaults(parallelTransferOptions);

// See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
AtomicLong totalProgress = new AtomicLong(0);
Expand All @@ -760,9 +758,9 @@ Mono<Response<BlobProperties>> downloadToFileWithResponse(String filePath, BlobR
return Mono.using(() -> downloadToFileResourceSupplier(filePath),
channel -> getPropertiesWithResponse(accessConditions)
.flatMap(response -> processInRange(channel, response,
range, finalParallelTransferOptions.getBlockSize(), options, accessConditions, rangeGetContentMd5,
context, totalProgress, progressLock, progressReceiver)), this::downloadToFileCleanup);

range, finalParallelTransferOptions.getBlockSize(), options, accessConditions, rangeGetContentMD5,
context, totalProgress, progressLock, finalParallelTransferOptions.getProgressReceiver())),
this::downloadToFileCleanup);
}

private Mono<Response<BlobProperties>> processInRange(AsynchronousFileChannel channel,
Expand Down
Loading

0 comments on commit b820645

Please sign in to comment.