Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented Immutable Storage with Versioning #21718

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.azure.storage.blob.implementation.util.ModelHelper;
import com.azure.storage.blob.models.AccessTier;
import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobImmutabilityPolicy;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlockBlobItem;
Expand Down Expand Up @@ -515,16 +516,19 @@ buffers is not a common scenario for async like it is in sync (and we already bu
final BlobRequestConditions requestConditions = options.getRequestConditions() == null
? new BlobRequestConditions() : options.getRequestConditions();
final boolean computeMd5 = options.isComputeMd5();
final BlobImmutabilityPolicy immutabilityPolicy = options.getImmutabilityPolicy() == null
? new BlobImmutabilityPolicy() : options.getImmutabilityPolicy();
final Boolean legalHold = options.isLegalHold();

BlockBlobAsyncClient blockBlobAsyncClient = getBlockBlobAsyncClient();

Function<Flux<ByteBuffer>, Mono<Response<BlockBlobItem>>> uploadInChunksFunction = (stream) ->
uploadInChunks(blockBlobAsyncClient, stream, parallelTransferOptions, headers, metadata, tags,
tier, requestConditions, computeMd5);
tier, requestConditions, computeMd5, immutabilityPolicy, legalHold);

BiFunction<Flux<ByteBuffer>, Long, Mono<Response<BlockBlobItem>>> uploadFullBlobFunction =
(stream, length) -> uploadFullBlob(blockBlobAsyncClient, stream, length, parallelTransferOptions,
headers, metadata, tags, tier, requestConditions, computeMd5);
headers, metadata, tags, tier, requestConditions, computeMd5, immutabilityPolicy, legalHold);

Flux<ByteBuffer> data = options.getDataFlux() == null ? Utility.convertStreamToByteBuffer(
options.getDataStream(), options.getLength(),
Expand All @@ -541,7 +545,8 @@ buffers is not a common scenario for async like it is in sync (and we already bu
private Mono<Response<BlockBlobItem>> uploadFullBlob(BlockBlobAsyncClient blockBlobAsyncClient,
Flux<ByteBuffer> data, long length, ParallelTransferOptions parallelTransferOptions, BlobHttpHeaders headers,
Map<String, String> metadata, Map<String, String> tags, AccessTier tier,
BlobRequestConditions requestConditions, boolean computeMd5) {
BlobRequestConditions requestConditions, boolean computeMd5, BlobImmutabilityPolicy immutabilityPolicy,
Boolean legalHold) {

/*
Note that there is no need to buffer here as the flux returned by the size gate in this case is created
Expand All @@ -559,14 +564,17 @@ private Mono<Response<BlockBlobItem>> uploadFullBlob(BlockBlobAsyncClient blockB
.setTags(tags)
.setTier(tier)
.setRequestConditions(requestConditions)
.setContentMd5(fluxMd5Wrapper.getMd5()))
.setContentMd5(fluxMd5Wrapper.getMd5())
.setImmutabilityPolicy(immutabilityPolicy)
.setLegalHold(legalHold))
.flatMap(blockBlobAsyncClient::uploadWithResponse);
}

private Mono<Response<BlockBlobItem>> uploadInChunks(BlockBlobAsyncClient blockBlobAsyncClient,
Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions, BlobHttpHeaders headers,
Map<String, String> metadata, Map<String, String> tags, AccessTier tier,
BlobRequestConditions requestConditions, boolean computeMd5) {
BlobRequestConditions requestConditions, boolean computeMd5, BlobImmutabilityPolicy immutabilityPolicy,
Boolean legalHold) {
// TODO: Sample/api reference
// See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
AtomicLong totalProgress = new AtomicLong();
Expand Down Expand Up @@ -609,7 +617,8 @@ private Mono<Response<BlockBlobItem>> uploadInChunks(BlockBlobAsyncClient blockB
.flatMap(ids ->
blockBlobAsyncClient.commitBlockListWithResponse(new BlockBlobCommitBlockListOptions(ids)
.setHeaders(headers).setMetadata(metadata).setTags(tags).setTier(tier)
.setRequestConditions(requestConditions)));
.setRequestConditions(requestConditions).setImmutabilityPolicy(immutabilityPolicy)
.setLegalHold(legalHold)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ Mono<Response<BlobContainerProperties>> getPropertiesWithResponse(String leaseId
hd.getLastModified(), hd.getXMsLeaseDuration(), hd.getXMsLeaseState(), hd.getXMsLeaseStatus(),
hd.getXMsBlobPublicAccess(), Boolean.TRUE.equals(hd.isXMsHasImmutabilityPolicy()),
Boolean.TRUE.equals(hd.isXMsHasLegalHold()), hd.getXMsDefaultEncryptionScope(),
hd.isXMsDenyEncryptionScopeOverride());
hd.isXMsDenyEncryptionScopeOverride(), hd.isXMsImmutableStorageWithVersioningEnabled());
return new SimpleResponse<>(rb, properties);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.azure.core.http.HttpClient;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.HttpPipelinePosition;
import com.azure.core.http.policy.AzureSasCredentialPolicy;
import com.azure.core.http.policy.BearerTokenAuthenticationPolicy;
import com.azure.core.http.policy.HttpLogOptions;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.ClientOptions;
Expand All @@ -25,6 +27,7 @@
import com.azure.storage.common.implementation.connectionstring.StorageConnectionString;
import com.azure.storage.common.implementation.connectionstring.StorageEndpoint;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.StorageSharedKeyCredentialPolicy;

import java.net.MalformedURLException;
import java.net.URL;
Expand Down Expand Up @@ -100,11 +103,6 @@ public BlobServiceAsyncClient buildAsyncClient() {

boolean anonymousAccess = false;

if (Objects.isNull(storageSharedKeyCredential) && Objects.isNull(tokenCredential)
&& Objects.isNull(azureSasCredential) && Objects.isNull(sasToken)) {
anonymousAccess = true;
}

if (Objects.nonNull(customerProvidedKey) && Objects.nonNull(encryptionScope)) {
throw logger.logExceptionAsError(new IllegalArgumentException("Customer provided key and encryption "
+ "scope cannot both be set"));
Expand All @@ -116,6 +114,23 @@ public BlobServiceAsyncClient buildAsyncClient() {
endpoint, retryOptions, logOptions,
clientOptions, httpClient, perCallPolicies, perRetryPolicies, configuration, logger);

boolean foundCredential = false;
for (int i = 0; i < pipeline.getPolicyCount(); i++) {
if (pipeline.getPolicy(i) instanceof StorageSharedKeyCredentialPolicy) {
foundCredential = true;
break;
}
if (pipeline.getPolicy(i) instanceof BearerTokenAuthenticationPolicy) {
foundCredential = true;
break;
}
if (pipeline.getPolicy(i) instanceof AzureSasCredentialPolicy) {
foundCredential = true;
break;
}
}
anonymousAccess = !foundCredential;

return new BlobServiceAsyncClient(pipeline, endpoint, serviceVersion, accountName, customerProvidedKey,
encryptionScope, blobContainerEncryptionScope, anonymousAccess);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import com.azure.storage.blob.implementation.models.AppendBlobsAppendBlockResponse;
import com.azure.storage.blob.implementation.models.AppendBlobsCreateResponse;
import com.azure.storage.blob.implementation.models.AppendBlobsSealResponse;
import com.azure.storage.blob.implementation.models.BlobImmutabilityPolicyMode;
import com.azure.storage.blob.implementation.models.EncryptionScope;
import com.azure.storage.blob.implementation.models.StorageErrorException;
import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobImmutabilityPolicyMode;
import com.azure.storage.blob.models.CpkInfo;
import com.azure.storage.blob.models.EncryptionAlgorithmType;
import java.net.URL;
Expand Down Expand Up @@ -162,6 +162,7 @@ Mono<AppendBlobsAppendBlockFromUrlResponse> appendBlockFromUrl(
@HeaderParam("x-ms-source-if-none-match") String sourceIfNoneMatch,
@HeaderParam("x-ms-version") String version,
@HeaderParam("x-ms-client-request-id") String requestId,
@HeaderParam("x-ms-copy-source-authorization") String copySourceAuthorization,
@HeaderParam("Accept") String accept,
Context context);

Expand Down Expand Up @@ -497,6 +498,8 @@ public Mono<AppendBlobsAppendBlockResponse> appendBlockWithResponseAsync(
* @param sourceIfNoneMatch Specify an ETag value to operate only on blobs without a matching value.
* @param requestId Provides a client-generated, opaque value with a 1 KB character limit that is recorded in the
* analytics logs when storage analytics logging is enabled.
* @param copySourceAuthorization Only Bearer type is supported. Credentials should be a valid OAuth access token to
* copy source.
* @param cpkInfo Parameter group.
* @param encryptionScope Parameter group.
* @param context The context to associate with this operation.
Expand Down Expand Up @@ -529,6 +532,7 @@ public Mono<AppendBlobsAppendBlockFromUrlResponse> appendBlockFromUrlWithRespons
String sourceIfMatch,
String sourceIfNoneMatch,
String requestId,
String copySourceAuthorization,
CpkInfo cpkInfo,
EncryptionScope encryptionScope,
Context context) {
Expand Down Expand Up @@ -595,6 +599,7 @@ public Mono<AppendBlobsAppendBlockFromUrlResponse> appendBlockFromUrlWithRespons
sourceIfNoneMatch,
this.client.getVersion(),
requestId,
copySourceAuthorization,
accept,
context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.azure.core.util.DateTimeRfc1123;
import com.azure.storage.blob.implementation.models.BlobDeleteType;
import com.azure.storage.blob.implementation.models.BlobExpiryOptions;
import com.azure.storage.blob.implementation.models.BlobImmutabilityPolicyMode;
import com.azure.storage.blob.implementation.models.BlobTags;
import com.azure.storage.blob.implementation.models.BlobsAbortCopyFromURLResponse;
import com.azure.storage.blob.implementation.models.BlobsAcquireLeaseResponse;
Expand Down Expand Up @@ -62,6 +61,7 @@
import com.azure.storage.blob.implementation.models.StorageErrorException;
import com.azure.storage.blob.models.AccessTier;
import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobImmutabilityPolicyMode;
import com.azure.storage.blob.models.CpkInfo;
import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
import com.azure.storage.blob.models.EncryptionAlgorithmType;
Expand Down Expand Up @@ -302,10 +302,12 @@ Mono<BlobsSetHttpHeadersResponse> setHttpHeaders(

@Put("/{containerName}/{blob}")
@ExpectedResponses({200})
@UnexpectedResponseExceptionType(StorageErrorException.class)
@UnexpectedResponseExceptionType(com.azure.storage.blob.models.BlobStorageException.class)
Mono<BlobsSetImmutabilityPolicyResponse> setImmutabilityPolicy(
@HostParam("url") String url,
@QueryParam("comp") String comp,
@PathParam("containerName") String containerName,
@PathParam("blob") String blob,
@QueryParam("timeout") Integer timeout,
@HeaderParam("x-ms-version") String version,
@HeaderParam("x-ms-client-request-id") String requestId,
Expand All @@ -317,10 +319,12 @@ Mono<BlobsSetImmutabilityPolicyResponse> setImmutabilityPolicy(

@Delete("/{containerName}/{blob}")
@ExpectedResponses({200})
@UnexpectedResponseExceptionType(StorageErrorException.class)
@UnexpectedResponseExceptionType(com.azure.storage.blob.models.BlobStorageException.class)
Mono<BlobsDeleteImmutabilityPolicyResponse> deleteImmutabilityPolicy(
@HostParam("url") String url,
@QueryParam("comp") String comp,
@PathParam("containerName") String containerName,
@PathParam("blob") String blob,
@QueryParam("timeout") Integer timeout,
@HeaderParam("x-ms-version") String version,
@HeaderParam("x-ms-client-request-id") String requestId,
Expand All @@ -329,10 +333,12 @@ Mono<BlobsDeleteImmutabilityPolicyResponse> deleteImmutabilityPolicy(

@Put("/{containerName}/{blob}")
@ExpectedResponses({200})
@UnexpectedResponseExceptionType(StorageErrorException.class)
@UnexpectedResponseExceptionType(com.azure.storage.blob.models.BlobStorageException.class)
Mono<BlobsSetLegalHoldResponse> setLegalHold(
@HostParam("url") String url,
@QueryParam("comp") String comp,
@PathParam("containerName") String containerName,
@PathParam("blob") String blob,
@QueryParam("timeout") Integer timeout,
@HeaderParam("x-ms-version") String version,
@HeaderParam("x-ms-client-request-id") String requestId,
Expand Down Expand Up @@ -561,6 +567,7 @@ Mono<BlobsCopyFromURLResponse> copyFromURL(
@HeaderParam("x-ms-immutability-policy-until-date") DateTimeRfc1123 immutabilityPolicyExpiry,
@HeaderParam("x-ms-immutability-policy-mode") BlobImmutabilityPolicyMode immutabilityPolicyMode,
@HeaderParam("x-ms-legal-hold") Boolean legalHold,
@HeaderParam("x-ms-copy-source-authorization") String copySourceAuthorization,
@HeaderParam("Accept") String accept,
Context context);

Expand Down Expand Up @@ -1395,6 +1402,8 @@ public Mono<BlobsSetHttpHeadersResponse> setHttpHeadersWithResponseAsync(
/**
* The Set Immutability Policy operation sets the immutability policy on the blob.
*
* @param containerName The container name.
* @param blob The blob name.
* @param timeout The timeout parameter is expressed in seconds. For more information, see &lt;a
* href="https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/setting-timeouts-for-blob-service-operations"&gt;Setting
* Timeouts for Blob Service Operations.&lt;/a&gt;.
Expand All @@ -1412,6 +1421,8 @@ public Mono<BlobsSetHttpHeadersResponse> setHttpHeadersWithResponseAsync(
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<BlobsSetImmutabilityPolicyResponse> setImmutabilityPolicyWithResponseAsync(
String containerName,
String blob,
Integer timeout,
String requestId,
OffsetDateTime ifUnmodifiedSince,
Expand All @@ -1427,6 +1438,8 @@ public Mono<BlobsSetImmutabilityPolicyResponse> setImmutabilityPolicyWithRespons
return service.setImmutabilityPolicy(
this.client.getUrl(),
comp,
containerName,
blob,
timeout,
this.client.getVersion(),
requestId,
Expand All @@ -1440,6 +1453,8 @@ public Mono<BlobsSetImmutabilityPolicyResponse> setImmutabilityPolicyWithRespons
/**
* The Delete Immutability Policy operation deletes the immutability policy on the blob.
*
* @param containerName The container name.
* @param blob The blob name.
* @param timeout The timeout parameter is expressed in seconds. For more information, see &lt;a
* href="https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/setting-timeouts-for-blob-service-operations"&gt;Setting
* Timeouts for Blob Service Operations.&lt;/a&gt;.
Expand All @@ -1453,16 +1468,26 @@ public Mono<BlobsSetImmutabilityPolicyResponse> setImmutabilityPolicyWithRespons
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<BlobsDeleteImmutabilityPolicyResponse> deleteImmutabilityPolicyWithResponseAsync(
Integer timeout, String requestId, Context context) {
String containerName, String blob, Integer timeout, String requestId, Context context) {
final String comp = "immutabilityPolicies";
final String accept = "application/xml";
return service.deleteImmutabilityPolicy(
this.client.getUrl(), comp, timeout, this.client.getVersion(), requestId, accept, context);
this.client.getUrl(),
comp,
containerName,
blob,
timeout,
this.client.getVersion(),
requestId,
accept,
context);
}

/**
* The Set Legal Hold operation sets a legal hold on the blob.
*
* @param containerName The container name.
* @param blob The blob name.
* @param legalHold Specified if a legal hold should be set on the blob.
* @param timeout The timeout parameter is expressed in seconds. For more information, see &lt;a
* href="https://docs.microsoft.com/en-us/rest/api/storageservices/fileservices/setting-timeouts-for-blob-service-operations"&gt;Setting
Expand All @@ -1477,11 +1502,20 @@ public Mono<BlobsDeleteImmutabilityPolicyResponse> deleteImmutabilityPolicyWithR
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<BlobsSetLegalHoldResponse> setLegalHoldWithResponseAsync(
boolean legalHold, Integer timeout, String requestId, Context context) {
String containerName, String blob, boolean legalHold, Integer timeout, String requestId, Context context) {
final String comp = "legalhold";
final String accept = "application/xml";
return service.setLegalHold(
this.client.getUrl(), comp, timeout, this.client.getVersion(), requestId, legalHold, accept, context);
this.client.getUrl(),
comp,
containerName,
blob,
timeout,
this.client.getVersion(),
requestId,
legalHold,
accept,
context);
}

/**
Expand Down Expand Up @@ -2167,6 +2201,8 @@ public Mono<BlobsStartCopyFromURLResponse> startCopyFromURLWithResponseAsync(
* @param immutabilityPolicyExpiry Specifies the date time when the blobs immutability policy is set to expire.
* @param immutabilityPolicyMode Specifies the immutability policy mode to set on the blob.
* @param legalHold Specified if a legal hold should be set on the blob.
* @param copySourceAuthorization Only Bearer type is supported. Credentials should be a valid OAuth access token to
* copy source.
* @param context The context to associate with this operation.
* @throws IllegalArgumentException thrown if parameters fail the validation.
* @throws StorageErrorException thrown if the request is rejected by server.
Expand Down Expand Up @@ -2197,6 +2233,7 @@ public Mono<BlobsCopyFromURLResponse> copyFromURLWithResponseAsync(
OffsetDateTime immutabilityPolicyExpiry,
BlobImmutabilityPolicyMode immutabilityPolicyMode,
Boolean legalHold,
String copySourceAuthorization,
Context context) {
final String xMsRequiresSync = "true";
final String accept = "application/xml";
Expand Down Expand Up @@ -2237,6 +2274,7 @@ public Mono<BlobsCopyFromURLResponse> copyFromURLWithResponseAsync(
immutabilityPolicyExpiryConverted,
immutabilityPolicyMode,
legalHold,
copySourceAuthorization,
accept,
context);
}
Expand Down
Loading