Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for file share reliable download #22504

Merged
merged 12 commits into from
Jul 1, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -671,20 +671,6 @@ class APISpec extends StorageSpec {
}
}

/**
 * Pipeline policy used by the download retry tests. After the rest of the pipeline has produced a response, it
 * inspects the "x-ms-range" header of the request that was sent: anything other than "bytes=2-6" is reported as an
 * IllegalArgumentException, otherwise a MockDownloadHttpResponse built from the real response, status 206 and a
 * body Flux that errors with an IOException is returned.
 */
class MockRetryRangeResponsePolicy implements HttpPipelinePolicy {
    @Override
    Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
        return next.process().flatMap { HttpResponse response ->
            def sentRange = response.getRequest().getHeaders().getValue("x-ms-range")
            if (sentRange == "bytes=2-6") {
                // ETag can be a dummy value. It's not validated, but DownloadResponse requires one
                return Mono.<HttpResponse> just(new MockDownloadHttpResponse(response, 206, Flux.error(new IOException())))
            }
            // Missing or unexpected range header: surface it as a test failure.
            return Mono.<HttpResponse> error(new IllegalArgumentException("The range header was not set correctly on retry."))
        }
    }
}

/**
* Injects one retry-able IOException failure per url.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import com.azure.storage.common.test.shared.extensions.LiveOnly
import com.azure.storage.common.test.shared.extensions.PlaybackOnly
import com.azure.storage.common.test.shared.extensions.RequiredServiceVersion
import com.azure.storage.common.test.shared.policy.MockFailureResponsePolicy
import com.azure.storage.common.test.shared.policy.MockRetryRangeResponsePolicy
import reactor.core.Exceptions
import reactor.core.publisher.Flux
import reactor.core.publisher.Hooks
Expand Down Expand Up @@ -377,7 +378,7 @@ class BlobAPITest extends APISpec {
constructed in BlobClient.download().
*/
setup:
def bu2 = getBlobClient(env.primaryAccount.credential, bc.getBlobUrl(), new MockRetryRangeResponsePolicy())
def bu2 = getBlobClient(env.primaryAccount.credential, bc.getBlobUrl(), new MockRetryRangeResponsePolicy("bytes=2-6"))

when:
def range = new BlobRange(2, 5L)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.common.test.shared.policy;

import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.policy.HttpPipelinePolicy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;

/**
 * Test pipeline policy that validates the {@code x-ms-range} header of each request passing through it.
 *
 * <p>When the header matches the expected value, the real response is replaced by a {@code MockDownloadHttpResponse}
 * built with status 206 and a body {@link Flux} that errors with an {@link IOException}. When the header is missing
 * or different, the returned {@link Mono} errors with an {@link IllegalArgumentException}.</p>
 */
public class MockRetryRangeResponsePolicy implements HttpPipelinePolicy {

    private final String rangeMatch;

    /**
     * Creates the policy.
     *
     * @param rangeMatch The exact {@code x-ms-range} header value that requests are expected to carry, for example
     * {@code "bytes=2-6"}.
     */
    public MockRetryRangeResponsePolicy(String rangeMatch) {
        this.rangeMatch = rangeMatch;
    }

    /**
     * Lets the rest of the pipeline run, then checks the range header of the request attached to the response.
     *
     * @param context The call context (not inspected by this policy).
     * @param next The remainder of the pipeline.
     * @return A {@link Mono} that emits the mocked failing response on a header match, or errors with
     * {@link IllegalArgumentException} otherwise.
     */
    @Override
    public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
        return next.process().flatMap(response -> {
            // The header lookup is expected to yield null when the request carries no x-ms-range header at all.
            // Treat that as a mismatch instead of dereferencing it, so the test gets the descriptive
            // IllegalArgumentException rather than a NullPointerException.
            String actualRange = response.getRequest().getHeaders().getValue("x-ms-range");
            if (actualRange == null || !actualRange.equals(rangeMatch)) {
                return Mono.error(new IllegalArgumentException("The range header was not set correctly on retry."));
            } else {
                // ETag can be a dummy value. It's not validated, but DownloadResponse requires one
                return Mono.just(new MockDownloadHttpResponse(response, 206, Flux.error(new IOException())));
            }
        });
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -485,20 +485,6 @@ class APISpec extends StorageSpec {
}
}

/**
 * Pipeline policy used by the download retry tests. After the rest of the pipeline has produced a response, it
 * inspects the "x-ms-range" header of the request that was sent: anything other than "bytes=2-6" is reported as an
 * IllegalArgumentException, otherwise a MockDownloadHttpResponse built from the real response, status 206 and a
 * body Flux that errors with an IOException is returned.
 */
class MockRetryRangeResponsePolicy implements HttpPipelinePolicy {
    @Override
    Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
        return next.process().flatMap { HttpResponse response ->
            def sentRange = response.getRequest().getHeaders().getValue("x-ms-range")
            if (sentRange == "bytes=2-6") {
                // ETag can be a dummy value. It's not validated, but DownloadResponse requires one
                return Mono.<HttpResponse> just(new MockDownloadHttpResponse(response, 206, Flux.error(new IOException())))
            }
            // Missing or unexpected range header: surface it as a test failure.
            return Mono.<HttpResponse> error(new IllegalArgumentException("The range header was not set correctly on retry."))
        }
    }
}

def getMockRequest() {
HttpHeaders headers = new HttpHeaders()
headers.put(Constants.HeaderConstants.CONTENT_ENCODING, "en-US")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import com.azure.storage.blob.models.BlockListType
import com.azure.storage.common.test.shared.extensions.LiveOnly
import com.azure.storage.common.test.shared.extensions.RequiredServiceVersion
import com.azure.storage.common.test.shared.policy.MockFailureResponsePolicy
import com.azure.storage.common.test.shared.policy.MockRetryRangeResponsePolicy
import com.azure.storage.file.datalake.models.DownloadRetryOptions
import com.azure.storage.file.datalake.models.AccessTier
import com.azure.storage.file.datalake.models.DataLakeRequestConditions
Expand Down Expand Up @@ -979,7 +980,7 @@ class FileAPITest extends APISpec {
constructed in BlobClient.download().
*/
setup:
def fileClient = getFileClient(env.dataLakeAccount.credential, fc.getPathUrl(), new MockRetryRangeResponsePolicy())
def fileClient = getFileClient(env.dataLakeAccount.credential, fc.getPathUrl(), new MockRetryRangeResponsePolicy("bytes=2-6"))

fc.append(new ByteArrayInputStream(data.defaultBytes), 0, data.defaultDataSize)
fc.flush(data.defaultDataSize)
Expand Down
1 change: 1 addition & 0 deletions sdk/storage/azure-storage-file-share/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Release History

## 12.11.0-beta.1 (Unreleased)
- Added support to reliably download a file.
- Added support for the 2020-10-02 service version.

## 12.10.0 (2021-06-09)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.azure.core.http.rest.PagedResponseBase;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.http.rest.StreamResponse;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
Expand Down Expand Up @@ -48,6 +49,7 @@
import com.azure.storage.file.share.implementation.util.ShareSasImplUtil;
import com.azure.storage.file.share.models.CloseHandlesInfo;
import com.azure.storage.file.share.models.CopyStatusType;
import com.azure.storage.file.share.models.DownloadRetryOptions;
import com.azure.storage.file.share.models.HandleItem;
import com.azure.storage.file.share.models.LeaseDurationType;
import com.azure.storage.file.share.models.LeaseStateType;
Expand All @@ -58,6 +60,7 @@
import com.azure.storage.file.share.models.ShareErrorCode;
import com.azure.storage.file.share.models.ShareFileCopyInfo;
import com.azure.storage.file.share.models.ShareFileDownloadAsyncResponse;
import com.azure.storage.file.share.models.ShareFileDownloadHeaders;
import com.azure.storage.file.share.models.ShareFileHttpHeaders;
import com.azure.storage.file.share.models.ShareFileInfo;
import com.azure.storage.file.share.models.ShareFileMetadataInfo;
Expand All @@ -70,6 +73,7 @@
import com.azure.storage.file.share.models.ShareFileUploadRangeFromUrlInfo;
import com.azure.storage.file.share.models.ShareRequestConditions;
import com.azure.storage.file.share.models.ShareStorageException;
import com.azure.storage.file.share.options.ShareFileDownloadOptions;
import com.azure.storage.file.share.options.ShareFileListRangesDiffOptions;
import com.azure.storage.file.share.sas.ShareServiceSasSignatureValues;
import reactor.core.Exceptions;
Expand All @@ -93,6 +97,7 @@
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -134,6 +139,7 @@ public class ShareFileAsyncClient {
static final long FILE_DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024L;
static final long FILE_MAX_PUT_RANGE_SIZE = 4 * Constants.MB;
private static final long DOWNLOAD_UPLOAD_CHUNK_TIMEOUT = 300;
private static final Duration TIMEOUT_VALUE = Duration.ofSeconds(60);

private final AzureFileStorageImpl azureFileStorageClient;
private final String shareName;
Expand Down Expand Up @@ -734,7 +740,8 @@ private Mono<Response<ShareFileProperties>> downloadResponseInChunk(Response<Sha
}
return chunks;
}).flatMapMany(Flux::fromIterable).flatMap(chunk ->
downloadWithResponse(chunk, false, requestConditions, context)
downloadWithResponse(new ShareFileDownloadOptions().setRange(chunk).setRangeContentMd5(false)
.setRequestConditions(requestConditions), context)
.map(ShareFileDownloadAsyncResponse::getValue)
.subscribeOn(Schedulers.elastic())
.flatMap(fbb -> FluxUtil
Expand Down Expand Up @@ -778,8 +785,7 @@ private void channelCleanUp(AsynchronousFileChannel channel) {
*/
public Flux<ByteBuffer> download() {
try {
return downloadWithResponse(null, null).flatMapMany(
ShareFileDownloadAsyncResponse::getValue);
return downloadWithResponse(null).flatMapMany(ShareFileDownloadAsyncResponse::getValue);
} catch (RuntimeException ex) {
return fluxError(logger, ex);
}
Expand Down Expand Up @@ -826,25 +832,111 @@ public Mono<ShareFileDownloadAsyncResponse> downloadWithResponse(ShareFileRange
*/
public Mono<ShareFileDownloadAsyncResponse> downloadWithResponse(ShareFileRange range, Boolean rangeGetContentMD5,
ShareRequestConditions requestConditions) {
return downloadWithResponse(new ShareFileDownloadOptions().setRange(range)
.setRangeContentMd5(rangeGetContentMD5).setRequestConditions(requestConditions));
}

/**
* Downloads a file from the system, including its metadata and properties
*
* <p><strong>Code Samples</strong></p>
*
* <p>Download the file from 1024 to 2048 bytes with its metadata and properties and without the contentMD5. </p>
*
* {@codesnippet com.azure.storage.file.share.ShareFileAsyncClient.downloadWithResponse#ShareFileDownloadOptions}
*
* <p>For more information, see the
* <a href="https://docs.microsoft.com/rest/api/storageservices/get-file">Azure Docs</a>.</p>
*
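* @param options {@link ShareFileDownloadOptions} describing the range to download, whether the MD5 hash of the
* range should be returned (supported as long as the range is less than or equal to 4 MB in size), the request
* conditions, and the download retry options.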
* @return A reactive response containing response data and the file data.
*/
public Mono<ShareFileDownloadAsyncResponse> downloadWithResponse(ShareFileDownloadOptions options) {
try {
return withContext(context -> downloadWithResponse(range, rangeGetContentMD5,
requestConditions, context));
return withContext(context -> downloadWithResponse(options, context));
} catch (RuntimeException ex) {
return monoError(logger, ex);
}
}

Mono<ShareFileDownloadAsyncResponse> downloadWithResponse(ShareFileRange range, Boolean rangeGetContentMD5,
Mono<ShareFileDownloadAsyncResponse> downloadWithResponse(ShareFileDownloadOptions options, Context context) {
options = options == null ? new ShareFileDownloadOptions() : options;
ShareFileRange range = options.getRange() == null ? new ShareFileRange(0) : options.getRange();
ShareRequestConditions requestConditions = options.getRequestConditions() == null
? new ShareRequestConditions() : options.getRequestConditions();
DownloadRetryOptions retryOptions = options.getRetryOptions() == null ? new DownloadRetryOptions()
: options.getRetryOptions();
Boolean getRangeContentMd5 = options.getRangeContentMd5();

return downloadRange(range, getRangeContentMd5, requestConditions, context)
.map(response -> {
String eTag = ModelHelper.getETag(response.getHeaders());
ShareFileDownloadHeaders headers = ModelHelper.transformFileDownloadHeaders(response.getHeaders());

long finalEnd;
if (range.getEnd() == null) {
finalEnd = headers.getContentRange() == null ? headers.getContentLength()
: Long.parseLong(headers.getContentRange().split("/")[1]);
} else {
finalEnd = range.getEnd();
}

Flux<ByteBuffer> bufferFlux = FluxUtil.createRetriableDownloadFlux(
() -> response.getValue().timeout(TIMEOUT_VALUE),
(throwable, offset) -> {
if (!(throwable instanceof IOException || throwable instanceof TimeoutException)) {
return Flux.error(throwable);
}

long newCount = finalEnd - (offset - range.getStart());

/*
It is possible that the network stream will throw an error after emitting all data but before
completing. Issuing a retry at this stage would leave the download in a bad state with incorrect count
and offset values. Because we have read the intended amount of data, we can ignore the error at the end
of the stream.
*/
if (newCount == 0) {
logger.warning("Exception encountered in ReliableDownload after all data read from the network but "
+ "but before stream signaled completion. Returning success as all data was downloaded. "
+ "Exception message: " + throwable.getMessage());
return Flux.empty();
}

try {
return downloadRange(
new ShareFileRange(offset, range.getEnd()), getRangeContentMd5,
requestConditions, context).flatMapMany(r -> {
String receivedETag = ModelHelper.getETag(r.getHeaders());
if ((eTag == null && receivedETag == null)
|| (eTag != null && eTag.equals(receivedETag))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we assume the ETag isn't null? In what scenario would it be null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

return r.getValue().timeout(TIMEOUT_VALUE);
} else {
return Flux.<ByteBuffer>error(
new ConcurrentModificationException(String.format("File has been modified "
+ "concurrently. Expected eTag: %s, Received eTag: %s", eTag,
receivedETag)));
}
});
} catch (Exception e) {
return Flux.error(e);
}
},
retryOptions.getMaxRetryRequests(),
range.getStart()
).switchIfEmpty(Flux.just(ByteBuffer.wrap(new byte[0])));

return new ShareFileDownloadAsyncResponse(response.getRequest(), response.getStatusCode(),
response.getHeaders(), bufferFlux, headers);
});
}

private Mono<StreamResponse> downloadRange(ShareFileRange range, Boolean rangeGetContentMD5,
ShareRequestConditions requestConditions, Context context) {
requestConditions = requestConditions == null ? new ShareRequestConditions() : requestConditions;
String rangeString = range == null ? null : range.toString();

return azureFileStorageClient.getFiles()
.downloadWithResponseAsync(shareName, filePath, null, rangeString, rangeGetContentMD5,
requestConditions.getLeaseId(), context)
.map(response -> new ShareFileDownloadAsyncResponse(response.getRequest(), response.getStatusCode(),
response.getHeaders(), response.getValue(),
ModelHelper.transformFileDownloadHeaders(response.getHeaders())));
return azureFileStorageClient.getFiles().downloadWithResponseAsync(shareName, filePath, null,
rangeString, rangeGetContentMD5, requestConditions.getLeaseId(), context);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.azure.storage.file.share.models.ShareFileUploadRangeOptions;
import com.azure.storage.file.share.models.ShareRequestConditions;
import com.azure.storage.file.share.models.ShareStorageException;
import com.azure.storage.file.share.options.ShareFileDownloadOptions;
import com.azure.storage.file.share.options.ShareFileListRangesDiffOptions;
import com.azure.storage.file.share.sas.ShareServiceSasSignatureValues;
import reactor.core.Exceptions;
Expand Down Expand Up @@ -561,10 +562,36 @@ public ShareFileDownloadResponse downloadWithResponse(OutputStream stream, Share
*/
public ShareFileDownloadResponse downloadWithResponse(OutputStream stream, ShareFileRange range,
Boolean rangeGetContentMD5, ShareRequestConditions requestConditions, Duration timeout, Context context) {
return downloadWithResponse(stream, new ShareFileDownloadOptions().setRange(range)
.setRangeContentMd5(rangeGetContentMD5).setRequestConditions(requestConditions), timeout, context);
}

/**
* Downloads a file from the system, including its metadata and properties
*
* <p><strong>Code Samples</strong></p>
*
* <p>Download the file from 1024 to 2048 bytes with its metadata and properties and without the contentMD5. </p>
*
* {@codesnippet com.azure.storage.file.share.ShareFileClient.downloadWithResponse#OutputStream-ShareFileDownloadOptions-Duration-Context}
*
* <p>For more information, see the
* <a href="https://docs.microsoft.com/rest/api/storageservices/get-file">Azure Docs</a>.</p>
*
* @param stream A non-null {@link OutputStream} where the downloaded data will be written.
* @param options {@link ShareFileDownloadOptions}
* @param timeout An optional timeout applied to the operation. If a response is not returned before the timeout
* concludes a {@link RuntimeException} will be thrown.
* @param context Additional context that is passed through the Http pipeline during the service call.
* @return A response containing the headers and response status code
* @throws NullPointerException If {@code stream} is {@code null}.
* @throws RuntimeException if the operation doesn't complete before the timeout concludes.
*/
public ShareFileDownloadResponse downloadWithResponse(OutputStream stream, ShareFileDownloadOptions options,
Duration timeout, Context context) {
Objects.requireNonNull(stream, "'stream' cannot be null.");

Mono<ShareFileDownloadResponse> download = shareFileAsyncClient.downloadWithResponse(range, rangeGetContentMD5,
requestConditions, context)
Mono<ShareFileDownloadResponse> download = shareFileAsyncClient.downloadWithResponse(options, context)
.flatMap(response -> response.getValue().reduce(stream, (outputStream, buffer) -> {
try {
outputStream.write(FluxUtil.byteBufferToArray(buffer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,13 @@ public static ShareFileDownloadHeaders transformFileDownloadHeaders(HttpHeaders
}
}

public static String getETag(HttpHeaders headers) {
if (headers == null) {
return null;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we assume headers aren't null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that seems like a fair assumption. Will remove.

return headers.getValue("ETag");
}

public static ShareFileItemProperties transformFileProperty(FileProperty property) {
if (property == null) {
return null;
Expand Down
Loading