Skip to content

Commit

Permalink
Download to File Investigation (#34907)
Browse files Browse the repository at this point in the history
Download to File Investigation
  • Loading branch information
alzimmermsft authored and ibrahimrabab committed May 16, 2023
1 parent 80e46f0 commit a05f337
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.azure.core.http.rest.StreamResponse;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.ProgressReporter;
import com.azure.core.util.io.IOUtils;
import com.azure.storage.blob.implementation.accesshelpers.BlobDownloadAsyncResponseConstructorProxy;
import com.azure.storage.blob.implementation.models.BlobsDownloadHeaders;
import com.azure.storage.blob.implementation.util.ModelHelper;
Expand All @@ -35,8 +34,8 @@ public final class BlobDownloadAsyncResponse extends ResponseBase<BlobDownloadHe
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);

private final StreamResponse sourceResponse;
private final BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume;
private final DownloadRetryOptions retryOptions;
//private final BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume;
//private final DownloadRetryOptions retryOptions;

/**
* Constructs a {@link BlobDownloadAsyncResponse}.
Expand All @@ -51,8 +50,8 @@ public BlobDownloadAsyncResponse(HttpRequest request, int statusCode, HttpHeader
BlobDownloadHeaders deserializedHeaders) {
super(request, statusCode, headers, value, deserializedHeaders);
this.sourceResponse = null;
this.onErrorResume = null;
this.retryOptions = null;
//this.onErrorResume = null;
//this.retryOptions = null;
}

/**
Expand All @@ -67,8 +66,8 @@ public BlobDownloadAsyncResponse(HttpRequest request, int statusCode, HttpHeader
super(sourceResponse.getRequest(), sourceResponse.getStatusCode(), sourceResponse.getHeaders(),
createResponseFlux(sourceResponse, onErrorResume, retryOptions), extractHeaders(sourceResponse));
this.sourceResponse = Objects.requireNonNull(sourceResponse, "'sourceResponse' must not be null");
this.onErrorResume = Objects.requireNonNull(onErrorResume, "'onErrorResume' must not be null");
this.retryOptions = Objects.requireNonNull(retryOptions, "'retryOptions' must not be null");
//this.onErrorResume = Objects.requireNonNull(onErrorResume, "'onErrorResume' must not be null");
//this.retryOptions = Objects.requireNonNull(retryOptions, "'retryOptions' must not be null");
}

private static BlobDownloadHeaders extractHeaders(StreamResponse response) {
Expand All @@ -93,10 +92,14 @@ private static Flux<ByteBuffer> createResponseFlux(StreamResponse sourceResponse
*/
public Mono<Void> writeValueToAsync(AsynchronousByteChannel channel, ProgressReporter progressReporter) {
Objects.requireNonNull(channel, "'channel' must not be null");
if (sourceResponse != null) {
return IOUtils.transferStreamResponseToAsynchronousByteChannel(channel, sourceResponse, onErrorResume,
progressReporter, retryOptions.getMaxRetryRequests());
} else if (super.getValue() != null) {
// Retaining commented out code as there will be a fix within the Core libraries, specifically HTTP Netty,
// where this performance enhancement will be re-enabled again in the future.
// if (sourceResponse != null) {
// return IOUtils.transferStreamResponseToAsynchronousByteChannel(channel, sourceResponse, onErrorResume,
// progressReporter, retryOptions.getMaxRetryRequests());
// }

if (super.getValue() != null) {
return FluxUtil.writeToAsynchronousByteChannel(
FluxUtil.addProgressReporting(super.getValue(), progressReporter), channel);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -126,9 +128,11 @@
* refer to the {@link BlockBlobClient}, {@link PageBlobClient}, or {@link AppendBlobClient} for upload options.
*/
public class BlobAsyncClientBase {

private static final ClientLogger LOGGER = new ClientLogger(BlobAsyncClientBase.class);

private static final Set<OpenOption> DEFAULT_OPEN_OPTIONS_SET = Collections.unmodifiableSet(new HashSet<>(
Arrays.asList(StandardOpenOption.CREATE_NEW, StandardOpenOption.READ, StandardOpenOption.WRITE)));

/**
* Backing REST client for the blob client.
*/
Expand Down Expand Up @@ -1533,10 +1537,7 @@ Mono<Response<BlobProperties>> downloadToFileWithResponse(BlobDownloadToFileOpti
// Default behavior is not to overwrite
Set<OpenOption> openOptions = options.getOpenOptions();
if (openOptions == null) {
openOptions = new HashSet<>();
openOptions.add(StandardOpenOption.CREATE_NEW);
openOptions.add(StandardOpenOption.WRITE);
openOptions.add(StandardOpenOption.READ);
openOptions = DEFAULT_OPEN_OPTIONS_SET;
}

AsynchronousFileChannel channel = downloadToFileResourceSupplier(options.getFilePath(), openOptions);
Expand Down Expand Up @@ -1587,8 +1588,8 @@ private Mono<Response<BlobProperties>> downloadToFileImpl(AsynchronousFileChanne
.flatMap(chunkNum -> ChunkedDownloadUtils.downloadChunk(chunkNum, initialResponse,
finalRange, finalParallelTransferOptions, finalConditions, newCount, downloadFunc,
response -> writeBodyToFile(response, file, chunkNum, finalParallelTransferOptions,
progressReporter == null ? null : progressReporter.createChild()
).flux()), finalParallelTransferOptions.getMaxConcurrency())
progressReporter == null ? null : progressReporter.createChild()).flux()),
finalParallelTransferOptions.getMaxConcurrency())

// Only the first download call returns a value.
.then(Mono.just(ModelHelper.buildBlobPropertiesResponse(initialResponse)));
Expand Down

0 comments on commit a05f337

Please sign in to comment.