From 9ef22e05ee02702aa08035ecbf61da64dd5fdb61 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Tue, 17 Oct 2023 11:48:23 -0700 Subject: [PATCH 01/10] Add checkpoint data to job plan file --- .../src/AppendBlobStorageResource.cs | 12 +- .../src/BlobDestinationCheckpointData.cs | 18 +-- .../src/BlobSourceCheckpointData.cs | 2 +- .../src/BlobStorageResourceContainer.cs | 16 +++ .../src/BlockBlobStorageResource.cs | 12 +- .../src/PageBlobStorageResource.cs | 12 +- .../ShareDirectoryStorageResourceContainer.cs | 10 ++ .../src/ShareFileStorageResource.cs | 4 +- .../src/LocalDestinationCheckpointData.cs | 2 +- .../LocalDirectoryStorageResourceContainer.cs | 10 ++ .../src/LocalFileStorageResource.cs | 12 +- .../src/LocalSourceCheckpointData.cs | 2 +- .../src/Shared/DataMovementConstants.cs | 6 +- .../src/Shared/JobPlan/JobPlanHeader.cs | 113 +++++++++++++++--- .../src/Shared/JobPlanExtensions.cs | 21 ++-- .../src/Shared/LocalTransferCheckpointer.cs | 4 +- .../src/StorageResource.cs | 12 ++ .../src/StorageResourceCheckpointData.cs | 2 +- .../src/StorageResourceItem.cs | 12 -- .../tests/JobPlanHeaderTests.cs | 28 ++++- .../tests/LocalTransferCheckpointerFactory.cs | 14 ++- .../tests/MockResourceCheckpointData.cs | 36 ++++++ .../tests/MockStorageResource.cs | 12 +- .../tests/Resources/SampleJobPlanFile.b1.ndm | Bin 108 -> 166 bytes .../tests/Shared/CheckpointerTesting.cs | 33 ----- .../Shared/MemoryStorageResourceContainer.cs | 10 ++ .../tests/Shared/MemoryStorageResourceItem.cs | 4 +- 27 files changed, 273 insertions(+), 146 deletions(-) create mode 100644 sdk/storage/Azure.Storage.DataMovement/tests/MockResourceCheckpointData.cs diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/AppendBlobStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/AppendBlobStorageResource.cs index 80c85f3ffa618..8e6ccd9d83f74 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/AppendBlobStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/AppendBlobStorageResource.cs @@ -283,20 +283,12 @@ protected override async Task DeleteIfExistsAsync(CancellationToken cancel return await BlobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false); } - /// - /// Gets the source checkpoint data for this resource that will be written to the checkpointer. - /// - /// A containing the checkpoint information for this resource. - protected override StorageResourceCheckpointData GetSourceCheckpointData() + public override StorageResourceCheckpointData GetSourceCheckpointData() { return new BlobSourceCheckpointData(); } - /// - /// Gets the destination checkpoint data for this resource that will be written to the checkpointer. - /// - /// A containing the checkpoint information for this resource. - protected override StorageResourceCheckpointData GetDestinationCheckpointData() + public override StorageResourceCheckpointData GetDestinationCheckpointData() { return new BlobDestinationCheckpointData( BlobType.Append, diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs index ac906a2a04991..738667ae93e97 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs @@ -83,7 +83,7 @@ public BlobDestinationCheckpointData( _cpkScopeBytes = CpkScope != default ? Encoding.UTF8.GetBytes(CpkScope) : new byte[0]; } - protected override void Serialize(Stream stream) + public override void Serialize(Stream stream) { Argument.AssertNotNull(stream, nameof(stream)); @@ -97,31 +97,31 @@ protected override void Serialize(Stream stream) writer.Write((byte)BlobType); // ContentType offset/length - WriteVariableLengthFieldInfo(writer, _contentTypeBytes, ref currentVariableLengthIndex); + WriteVariableLengthFieldInfo(writer, _contentTypeBytes.Length, ref currentVariableLengthIndex); // ContentEncoding offset/length - WriteVariableLengthFieldInfo(writer, _contentEncodingBytes, ref currentVariableLengthIndex); + WriteVariableLengthFieldInfo(writer, _contentEncodingBytes.Length, ref currentVariableLengthIndex); // ContentLanguage offset/length - WriteVariableLengthFieldInfo(writer, _contentLanguageBytes, ref currentVariableLengthIndex); + WriteVariableLengthFieldInfo(writer, _contentLanguageBytes.Length, ref currentVariableLengthIndex); // ContentDisposition offset/length - WriteVariableLengthFieldInfo(writer, _contentDispositionBytes, ref currentVariableLengthIndex); + WriteVariableLengthFieldInfo(writer, _contentDispositionBytes.Length, ref currentVariableLengthIndex); // CacheControl offset/length - WriteVariableLengthFieldInfo(writer, _cacheControlBytes, ref currentVariableLengthIndex); + WriteVariableLengthFieldInfo(writer, _cacheControlBytes.Length, ref currentVariableLengthIndex); // AccessTier writer.Write((byte)AccessTier.ToJobPlanAccessTier()); // Metadata offset/length - WriteVariableLengthFieldInfo(writer, _metadataBytes, ref currentVariableLengthIndex); + WriteVariableLengthFieldInfo(writer, _metadataBytes.Length, ref currentVariableLengthIndex); // Tags offset/length - WriteVariableLengthFieldInfo(writer, _tagsBytes, ref currentVariableLengthIndex); + WriteVariableLengthFieldInfo(writer, _tagsBytes.Length, ref currentVariableLengthIndex); // CpkScope offset/length - WriteVariableLengthFieldInfo(writer, _cpkScopeBytes, ref currentVariableLengthIndex); + WriteVariableLengthFieldInfo(writer, _cpkScopeBytes.Length, ref currentVariableLengthIndex); writer.Write(_contentTypeBytes); writer.Write(_contentEncodingBytes); diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobSourceCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobSourceCheckpointData.cs index 480b290c6f1f7..47a290dbe02c8 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobSourceCheckpointData.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobSourceCheckpointData.cs @@ -9,7 +9,7 @@ internal class BlobSourceCheckpointData : StorageResourceCheckpointData { public override int Length => 0; - protected override void Serialize(Stream stream) + public override void Serialize(Stream stream) { } } diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobStorageResourceContainer.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobStorageResourceContainer.cs index e07d9bfdce039..a5f790189ff1c 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobStorageResourceContainer.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobStorageResourceContainer.cs @@ -129,6 +129,22 @@ protected override async IAsyncEnumerable GetStorageResourcesAs } } + public override StorageResourceCheckpointData GetSourceCheckpointData() + { + return new BlobSourceCheckpointData(); + } + + public override StorageResourceCheckpointData GetDestinationCheckpointData() + { + return new BlobDestinationCheckpointData( + _options.BlobType, + _options.BlobOptions.HttpHeaders, + _options.BlobOptions.AccessTier, + _options.BlobOptions.Metadata, + _options.BlobOptions.Tags, + default); // TODO: Update when we support encryption scopes + } + private string ApplyOptionalPrefix(string path) => IsDirectory ? string.Join("/", DirectoryPrefix, path) diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlockBlobStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlockBlobStorageResource.cs index b9810b23c2ee4..2cc8e99e519ff 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlockBlobStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlockBlobStorageResource.cs @@ -325,20 +325,12 @@ protected override async Task DeleteIfExistsAsync(CancellationToken cancel return await BlobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false); } - /// - /// Gets the source checkpoint data for this resource that will be written to the checkpointer. - /// - /// A containing the checkpoint information for this resource. - protected override StorageResourceCheckpointData GetSourceCheckpointData() + public override StorageResourceCheckpointData GetSourceCheckpointData() { return new BlobSourceCheckpointData(); } - /// - /// Gets the destination checkpoint data for this resource that will be written to the checkpointer. - /// - /// A containing the checkpoint information for this resource. - protected override StorageResourceCheckpointData GetDestinationCheckpointData() + public override StorageResourceCheckpointData GetDestinationCheckpointData() { return new BlobDestinationCheckpointData( BlobType.Block, diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/PageBlobStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/PageBlobStorageResource.cs index 6b7710fa25ec3..f56f7d170ed73 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/PageBlobStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/PageBlobStorageResource.cs @@ -287,20 +287,12 @@ protected override async Task DeleteIfExistsAsync(CancellationToken cancel return await BlobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false); } - /// - /// Gets the source checkpoint data for this resource that will be written to the checkpointer. - /// - /// A containing the checkpoint information for this resource. - protected override StorageResourceCheckpointData GetSourceCheckpointData() + public override StorageResourceCheckpointData GetSourceCheckpointData() { return new BlobSourceCheckpointData(); } - /// - /// Gets the destination checkpoint data for this resource that will be written to the checkpointer. - /// - /// A containing the checkpoint information for this resource. - protected override StorageResourceCheckpointData GetDestinationCheckpointData() + public override StorageResourceCheckpointData GetDestinationCheckpointData() { return new BlobDestinationCheckpointData( BlobType.Page, diff --git a/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareDirectoryStorageResourceContainer.cs b/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareDirectoryStorageResourceContainer.cs index 161024ebb0fa8..4993ba818c4bc 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareDirectoryStorageResourceContainer.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareDirectoryStorageResourceContainer.cs @@ -49,5 +49,15 @@ protected override async IAsyncEnumerable GetStorageResourcesAs yield return new ShareFileStorageResource(client, ResourceOptions); } } + + public override StorageResourceCheckpointData GetSourceCheckpointData() + { + throw new NotImplementedException(); + } + + public override StorageResourceCheckpointData GetDestinationCheckpointData() + { + throw new NotImplementedException(); + } } } diff --git a/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileStorageResource.cs index 819a094e74a4c..138f2832d58f2 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileStorageResource.cs @@ -201,12 +201,12 @@ protected override async Task ReadStreamAsync( return response.Value.ToStorageResourceReadStreamResult(); } - protected override StorageResourceCheckpointData GetSourceCheckpointData() + public override StorageResourceCheckpointData GetSourceCheckpointData() { throw new NotImplementedException(); } - protected override StorageResourceCheckpointData GetDestinationCheckpointData() + public override StorageResourceCheckpointData GetDestinationCheckpointData() { throw new NotImplementedException(); } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/LocalDestinationCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement/src/LocalDestinationCheckpointData.cs index 97a45b05a20af..d592b5faaceab 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/LocalDestinationCheckpointData.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/LocalDestinationCheckpointData.cs @@ -9,7 +9,7 @@ internal class LocalDestinationCheckpointData : StorageResourceCheckpointData { public override int Length => 0; - protected internal override void Serialize(Stream stream) + public override void Serialize(Stream stream) { } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/LocalDirectoryStorageResourceContainer.cs b/sdk/storage/Azure.Storage.DataMovement/src/LocalDirectoryStorageResourceContainer.cs index 5e11dd6a633e5..a919df7668101 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/LocalDirectoryStorageResourceContainer.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/LocalDirectoryStorageResourceContainer.cs @@ -79,5 +79,15 @@ protected internal override async IAsyncEnumerable GetStorageRe } } } + + public override StorageResourceCheckpointData GetSourceCheckpointData() + { + return new LocalSourceCheckpointData(); + } + + public override StorageResourceCheckpointData GetDestinationCheckpointData() + { + return new LocalDestinationCheckpointData(); + } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs index ae32bd3705ef8..cdb84b6fb3008 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/LocalFileStorageResource.cs @@ -276,20 +276,12 @@ protected internal override Task DeleteIfExistsAsync(CancellationToken can return Task.FromResult(false); } - /// - /// Gets the source checkpoint data for this resource that will be written to the checkpointer. - /// - /// A containing the checkpoint information for this resource. - protected internal override StorageResourceCheckpointData GetSourceCheckpointData() + public override StorageResourceCheckpointData GetSourceCheckpointData() { return new LocalSourceCheckpointData(); } - /// - /// Gets the destination checkpoint data for this resource that will be written to the checkpointer. - /// - /// A containing the checkpoint information for this resource. - protected internal override StorageResourceCheckpointData GetDestinationCheckpointData() + public override StorageResourceCheckpointData GetDestinationCheckpointData() { return new LocalDestinationCheckpointData(); } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/LocalSourceCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement/src/LocalSourceCheckpointData.cs index 48e09453d0495..d4aa8a90877bb 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/LocalSourceCheckpointData.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/LocalSourceCheckpointData.cs @@ -9,7 +9,7 @@ internal class LocalSourceCheckpointData : StorageResourceCheckpointData { public override int Length => 0; - protected internal override void Serialize(Stream stream) + public override void Serialize(Stream stream) { } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs index c71a94ae80e9c..951c58d7851de 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/DataMovementConstants.cs @@ -88,7 +88,11 @@ internal static class JobPlanFile internal const int ParentSourcePathLengthIndex = ParentSourcePathOffsetIndex + IntSizeInBytes; internal const int ParentDestPathOffsetIndex = ParentSourcePathLengthIndex + IntSizeInBytes; internal const int ParentDestPathLengthIndex = ParentDestPathOffsetIndex + IntSizeInBytes; - internal const int VariableLengthStartIndex = ParentDestPathLengthIndex + IntSizeInBytes; + internal const int SourceCheckpointDataOffsetIndex = ParentDestPathLengthIndex + IntSizeInBytes; + internal const int SourceCheckpointDataLengthIndex = SourceCheckpointDataOffsetIndex + IntSizeInBytes; + internal const int DestinationCheckpointDataOffsetIndex = SourceCheckpointDataLengthIndex + IntSizeInBytes; + internal const int DestinationCheckpointDataLengthIndex = DestinationCheckpointDataOffsetIndex + IntSizeInBytes; + internal const int VariableLengthStartIndex = DestinationCheckpointDataLengthIndex + IntSizeInBytes; } /// diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlan/JobPlanHeader.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlan/JobPlanHeader.cs index 2226b249558e8..c8433d652b685 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlan/JobPlanHeader.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlan/JobPlanHeader.cs @@ -65,6 +65,21 @@ internal class JobPlanHeader /// public string ParentDestinationPath; + /// + /// Additional checkpoint data specific to the source resource. + /// Only populated when using . + /// + public byte[] SourceCheckpointData; + + /// + /// Additional checkpoint data specific to the destination resource. + /// Only populated when using . + /// + public byte[] DestinationCheckpointData; + + private StorageResourceCheckpointData _sourceCheckpointData; + private StorageResourceCheckpointData _destinationCheckpointData; + public JobPlanHeader( string version, string transferId, @@ -76,7 +91,9 @@ public JobPlanHeader( bool enumerationComplete, DataTransferStatus jobStatus, string parentSourcePath, - string parentDestinationPath) + string parentDestinationPath, + StorageResourceCheckpointData sourceCheckpointData, + StorageResourceCheckpointData destinationCheckpointData) { Argument.AssertNotNull(version, nameof(version)); Argument.AssertNotNullOrEmpty(transferId, nameof(transferId)); @@ -85,6 +102,8 @@ public JobPlanHeader( Argument.AssertNotNull(createTime, nameof(createTime)); Argument.AssertNotNullOrEmpty(parentSourcePath, nameof(parentSourcePath)); Argument.AssertNotNullOrEmpty(parentDestinationPath, nameof(parentDestinationPath)); + Argument.AssertNotNull(sourceCheckpointData, nameof(sourceCheckpointData)); + Argument.AssertNotNull(destinationCheckpointData, nameof(destinationCheckpointData)); if (sourceProviderId.Length > DataMovementConstants.JobPlanFile.ProviderIdMaxLength) { @@ -106,6 +125,39 @@ public JobPlanHeader( JobStatus = jobStatus; ParentSourcePath = parentSourcePath; ParentDestinationPath = parentDestinationPath; + + _sourceCheckpointData = sourceCheckpointData; + _destinationCheckpointData = destinationCheckpointData; + } + + private JobPlanHeader( + string version, + string transferId, + DateTimeOffset createTime, + JobPlanOperation operationType, + string sourceProviderId, + string destinationProviderId, + bool isContainer, + bool enumerationComplete, + DataTransferStatus jobStatus, + string parentSourcePath, + string parentDestinationPath, + byte[] sourceCheckpointData, + byte[] destinationCheckpointData) + { + Version = version; + TransferId = transferId; + CreateTime = createTime; + OperationType = operationType; + SourceProviderId = sourceProviderId; + DestinationProviderId = destinationProviderId; + IsContainer = isContainer; + EnumerationComplete = enumerationComplete; + JobStatus = jobStatus; + ParentSourcePath = parentSourcePath; + ParentDestinationPath = parentDestinationPath; + SourceCheckpointData = sourceCheckpointData; + DestinationCheckpointData = destinationCheckpointData; } public void Serialize(Stream stream) @@ -145,17 +197,26 @@ public void Serialize(Stream stream) // ParentSourcePath offset/length byte[] parentSourcePathBytes = Encoding.UTF8.GetBytes(ParentSourcePath); - JobPlanExtensions.WriteVariableLengthFieldInfo(writer, parentSourcePathBytes, ref currentVariableLengthIndex); + JobPlanExtensions.WriteVariableLengthFieldInfo(writer, parentSourcePathBytes.Length, ref currentVariableLengthIndex); // ParentDestinationPath offset/length byte[] parentDestinationPathBytes = Encoding.UTF8.GetBytes(ParentDestinationPath); - JobPlanExtensions.WriteVariableLengthFieldInfo(writer, parentDestinationPathBytes, ref currentVariableLengthIndex); + JobPlanExtensions.WriteVariableLengthFieldInfo(writer, parentDestinationPathBytes.Length, ref currentVariableLengthIndex); + + // SourceCheckpointData offset/length + JobPlanExtensions.WriteVariableLengthFieldInfo(writer, _sourceCheckpointData.Length, ref currentVariableLengthIndex); + + // DestinationCheckpointData offset/length + JobPlanExtensions.WriteVariableLengthFieldInfo(writer, _destinationCheckpointData.Length, ref currentVariableLengthIndex); // ParentSourcePath writer.Write(parentSourcePathBytes); // ParentDestinationPath writer.Write(parentDestinationPathBytes); + + _sourceCheckpointData.Serialize(stream); + _destinationCheckpointData.Serialize(stream); } public static JobPlanHeader Deserialize(Stream stream) @@ -201,17 +262,21 @@ public static JobPlanHeader Deserialize(Stream stream) // JobStatus JobPlanStatus jobPlanStatus = (JobPlanStatus)reader.ReadInt32(); - // ParentSourcePath offset + // ParentSourcePath offset/length int parentSourcePathOffset = reader.ReadInt32(); - - // ParentSourcePath length int parentSourcePathLength = reader.ReadInt32(); - // ParentDestPath offset - int parentDestPathOffset = reader.ReadInt32(); + // ParentDestinationPath offset/length + int parentDestinationPathOffset = reader.ReadInt32(); + int parentDestinationPathLength = reader.ReadInt32(); + + // SourceCheckpointData offset/length + int sourceCheckpointDataOffset = reader.ReadInt32(); + int sourceCheckpointDataLength = reader.ReadInt32(); - // ParentDestPath length - int parentDestPathLength = reader.ReadInt32(); + // DestinationCheckpointData offset/length + int destinationCheckpointDataOffset = reader.ReadInt32(); + int destinationCheckpointDataLength = reader.ReadInt32(); // ParentSourcePath string parentSourcePath = null; @@ -224,11 +289,27 @@ public static JobPlanHeader Deserialize(Stream stream) // ParentDestinationPath string parentDestinationPath = null; - if (parentDestPathOffset > 0) + if (parentDestinationPathOffset > 0) + { + reader.BaseStream.Position = parentDestinationPathOffset; + byte[] parentDestinationPathBytes = reader.ReadBytes(parentDestinationPathLength); + parentDestinationPath = parentDestinationPathBytes.ToString(parentDestinationPathLength); + } + + // SourceCheckpointData + byte[] sourceCheckpointData = Array.Empty(); + if (sourceCheckpointDataOffset > 0) + { + reader.BaseStream.Position = sourceCheckpointDataOffset; + sourceCheckpointData = reader.ReadBytes(sourceCheckpointDataLength); + } + + // DestinationCheckpointData + byte[] destinationCheckpointData = Array.Empty(); + if (destinationCheckpointDataOffset > 0) { - reader.BaseStream.Position = parentDestPathOffset; - byte[] parentDestinationPathBytes = reader.ReadBytes(parentDestPathLength); - parentDestinationPath = parentDestinationPathBytes.ToString(parentDestPathLength); + reader.BaseStream.Position = destinationCheckpointDataOffset; + destinationCheckpointData = reader.ReadBytes(destinationCheckpointDataLength); } return new JobPlanHeader( @@ -242,7 +323,9 @@ public static JobPlanHeader Deserialize(Stream stream) enumerationComplete, jobPlanStatus.ToDataTransferStatus(), parentSourcePath, - parentDestinationPath); + parentDestinationPath, + sourceCheckpointData, + destinationCheckpointData); } private static void WritePaddedString(BinaryWriter writer, string value, int setSizeInBytes) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlanExtensions.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlanExtensions.cs index 3722b92bf1e33..c0d41f0395100 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlanExtensions.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/JobPlanExtensions.cs @@ -257,25 +257,24 @@ internal static DataTransferStatus ToDataTransferStatus(this JobPlanStatus jobPl } /// - /// Writes the length and offset field for the given byte array - /// and increments currentVariableLengthIndex accordingly. + /// Writes the given length and offset and increments currentOffset accordingly. /// /// The writer to write to. - /// The data to write info about. - /// - /// A reference to the current index of the variable length fields + /// The length of the variable length field. + /// + /// A reference to the current offset of the variable length fields /// that will be used to set the offset and then incremented. /// internal static void WriteVariableLengthFieldInfo( BinaryWriter writer, - byte[] bytes, - ref int currentVariableLengthIndex) + int length, + ref int currentOffset) { // Write the offset, -1 if size is 0 - if (bytes.Length > 0) + if (length > 0) { - writer.Write(currentVariableLengthIndex); - currentVariableLengthIndex += bytes.Length; + writer.Write(currentOffset); + currentOffset += length; } else { @@ -283,7 +282,7 @@ internal static void WriteVariableLengthFieldInfo( } // Write the length - writer.Write(bytes.Length); + writer.Write(length); } internal static string ToSanitizedString(this Uri uri) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/LocalTransferCheckpointer.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/LocalTransferCheckpointer.cs index 17160e35b1f4e..28c4747af434a 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/LocalTransferCheckpointer.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/LocalTransferCheckpointer.cs @@ -79,7 +79,9 @@ public override async Task AddNewJobAsync( false, /* enumerationComplete */ new DataTransferStatusInternal(), source.Uri.ToSanitizedString(), - destination.Uri.ToSanitizedString()); + destination.Uri.ToSanitizedString(), + source.GetSourceCheckpointData(), + destination.GetDestinationCheckpointData()); using (Stream headerStream = new MemoryStream()) { diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StorageResource.cs b/sdk/storage/Azure.Storage.DataMovement/src/StorageResource.cs index cc04d0c37eece..d120c99383759 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StorageResource.cs @@ -32,5 +32,17 @@ protected StorageResource() /// NOTE: Must be no more than 5 characters long. /// public abstract string ProviderId { get; } + + /// + /// Gets the source checkpoint data for this resource that will be written to the checkpointer. + /// + /// A containing the checkpoint information for this resource. + public abstract StorageResourceCheckpointData GetSourceCheckpointData(); + + /// + /// Gets the destination checkpoint data for this resource that will be written to the checkpointer. + /// + /// A containing the checkpoint information for this resource. + public abstract StorageResourceCheckpointData GetDestinationCheckpointData(); } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceCheckpointData.cs index fa23a33e53bc8..d47613fe417e4 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceCheckpointData.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceCheckpointData.cs @@ -19,6 +19,6 @@ public abstract class StorageResourceCheckpointData /// Serializes the checkpoint data into the given stream. /// /// The stream to serialize the data into. - protected internal abstract void Serialize(Stream stream); + public abstract void Serialize(Stream stream); } } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceItem.cs b/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceItem.cs index fb9f0d2bbbf05..dc8744473d227 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceItem.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/StorageResourceItem.cs @@ -184,17 +184,5 @@ protected internal abstract Task CopyBlockFromUriAsync( /// Otherwise if the storage resource does not exist, false will be returned. /// protected internal abstract Task DeleteIfExistsAsync(CancellationToken cancellationToken = default); - - /// - /// Gets the source checkpoint data for this resource that will be written to the checkpointer. - /// - /// A containing the checkpoint information for this resource. - protected internal abstract StorageResourceCheckpointData GetSourceCheckpointData(); - - /// - /// Gets the destination checkpoint data for this resource that will be written to the checkpointer. - /// - /// A containing the checkpoint information for this resource. - protected internal abstract StorageResourceCheckpointData GetDestinationCheckpointData(); } } diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/JobPlanHeaderTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/JobPlanHeaderTests.cs index 73fd0a9b10bac..758d18b13e9fe 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/JobPlanHeaderTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/JobPlanHeaderTests.cs @@ -10,14 +10,34 @@ namespace Azure.Storage.DataMovement.Tests { public class JobPlanHeaderTests : DataMovementTestBase { + private MockResourceCheckpointData _checkpointData = new(); + public JobPlanHeaderTests(bool async) : base(async, default) { } + private JobPlanHeader CreateJobPlanHeader() + { + return new JobPlanHeader( + DataMovementConstants.JobPlanFile.SchemaVersion, + DefaultTransferId, + DefaultCreateTime, + DefaultJobPlanOperation, + DefaultSourceProviderId, + DefaultDestinationProviderId, + isContainer: false, + enumerationComplete: false, + DefaultJobStatus, + DefaultSourcePath, + DefaultDestinationPath, + _checkpointData, + _checkpointData); + } + [Test] public void Ctor() { - JobPlanHeader header = CreateDefaultJobHeader(); + JobPlanHeader header = CreateJobPlanHeader(); Assert.AreEqual(DataMovementConstants.JobPlanFile.SchemaVersion, header.Version); Assert.AreEqual(DefaultTransferId, header.TransferId); @@ -35,7 +55,7 @@ public void Ctor() [Test] public void Serialize() { - JobPlanHeader header = CreateDefaultJobHeader(); + JobPlanHeader header = CreateJobPlanHeader(); string samplePath = Path.Combine("Resources", "SampleJobPlanFile.b1.ndm"); using (MemoryStream headerStream = new MemoryStream(DataMovementConstants.JobPlanFile.VariableLengthStartIndex)) @@ -54,7 +74,7 @@ public void Serialize() [Test] public void Deserialize() { - JobPlanHeader header = CreateDefaultJobHeader(); + JobPlanHeader header = CreateJobPlanHeader(); using (Stream stream = new MemoryStream(DataMovementConstants.JobPlanFile.VariableLengthStartIndex)) { @@ -87,6 +107,8 @@ private void DeserializeAndVerify(Stream stream, string version) Assert.AreEqual(DefaultJobStatus, deserialized.JobStatus); Assert.AreEqual(DefaultSourcePath, deserialized.ParentSourcePath); Assert.AreEqual(DefaultDestinationPath, deserialized.ParentDestinationPath); + CollectionAssert.AreEqual(_checkpointData.Bytes, deserialized.SourceCheckpointData); + CollectionAssert.AreEqual(_checkpointData.Bytes, deserialized.DestinationCheckpointData); } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerFactory.cs b/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerFactory.cs index 4508a7e1aa937..b45fe3f43ad6c 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerFactory.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerFactory.cs @@ -47,6 +47,7 @@ internal static readonly DateTimeOffset _testStartTime internal const JobPartPlanRehydratePriorityType _testRehydratePriorityType = JobPartPlanRehydratePriorityType.None; internal static readonly DataTransferStatus _testJobStatus = new DataTransferStatusInternal(DataTransferState.Queued, false, false); internal static readonly DataTransferStatus _testPartStatus = new DataTransferStatusInternal(DataTransferState.Queued, false, false); + internal static readonly MockResourceCheckpointData _mockCheckpointData = new MockResourceCheckpointData(); private string _checkpointerPath; @@ -68,7 +69,7 @@ public LocalTransferCheckpointer BuildCheckpointer(long transferCount) jobPartCount: _partCountDefault); } - // Return constructed chekcpointer + // Return constructed checkpointer return new LocalTransferCheckpointer(_checkpointerPath); } @@ -153,9 +154,14 @@ internal void CreateStubJobPlanFile( string sourceProviderId = _testSourceProviderId, string destinationProviderId = _testDestinationProviderId, bool isContainer = false, - DataTransferStatus status = default) + DataTransferStatus status = default, + StorageResourceCheckpointData sourceCheckpointData = default, + StorageResourceCheckpointData destinationCheckpointData = default) { status ??= new DataTransferStatus(); + sourceCheckpointData ??= _mockCheckpointData; + destinationCheckpointData ??= _mockCheckpointData; + JobPlanHeader header = new JobPlanHeader( DataMovementConstants.JobPlanFile.SchemaVersion, transferId, @@ -167,7 +173,9 @@ internal void CreateStubJobPlanFile( false, /* enumerationComplete */ status, parentSourcePath, - parentDestinationPath); + parentDestinationPath, + sourceCheckpointData, + destinationCheckpointData); string filePath = Path.Combine(checkpointPath, $"{transferId}.{DataMovementConstants.JobPlanFile.FileExtension}"); using (FileStream stream = File.Create(filePath)) diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/MockResourceCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement/tests/MockResourceCheckpointData.cs new file mode 100644 index 0000000000000..865f1c76bd2a1 --- /dev/null +++ b/sdk/storage/Azure.Storage.DataMovement/tests/MockResourceCheckpointData.cs @@ -0,0 +1,36 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.IO; +using System.Text; + +namespace Azure.Storage.DataMovement.Tests +{ + public class MockResourceCheckpointData : StorageResourceCheckpointData + { + public byte[] Bytes; + + public override int Length => Bytes.Length; + + public MockResourceCheckpointData() + { + string testString = "Hello World!"; + using (MemoryStream stream = new MemoryStream()) + { + BinaryWriter writer = new BinaryWriter(stream); + + writer.Write(false); + writer.Write(42); + writer.Write(123860); + writer.Write(Encoding.UTF8.GetBytes(testString)); + + Bytes = stream.ToArray(); + } + } + + public override void Serialize(Stream stream) + { + stream.Write(Bytes, 0, Bytes.Length); + } + } +} diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResource.cs index fafdd03f10fad..616e984ad3efa 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResource.cs @@ -87,20 +87,12 @@ protected internal override Task ReadStreamAsyn return Task.FromResult(new StorageResourceReadStreamResult(_readStream)); } - /// - /// Gets the source checkpoint data for this resource that will be written to the checkpointer. - /// - /// A containing the checkpoint information for this resource. - protected internal override StorageResourceCheckpointData GetSourceCheckpointData() + public override StorageResourceCheckpointData GetSourceCheckpointData() { return null; } - /// - /// Gets the destination checkpoint data for this resource that will be written to the checkpointer. - /// - /// A containing the checkpoint information for this resource. - protected internal override StorageResourceCheckpointData GetDestinationCheckpointData() + public override StorageResourceCheckpointData GetDestinationCheckpointData() { return null; } diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Resources/SampleJobPlanFile.b1.ndm b/sdk/storage/Azure.Storage.DataMovement/tests/Resources/SampleJobPlanFile.b1.ndm index 8966811c3de500711141fbfe67361133dc827c8d..d02558e0f809cca8bb8dbb76656474bb494b442f 100644 GIT binary patch delta 77 zcmc~<#yG)TGMs^dK>&!;fmjrXYoPSRi4Hni3|c_mmB)+>9;rDw`3m9rML8*oSOozu Cdl6m$ delta 18 YcmZ3+m@~ng)185VK>&!uCkE&M045~^p8x;= diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/CheckpointerTesting.cs b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/CheckpointerTesting.cs index 186e48337b531..538a75d48cca9 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/CheckpointerTesting.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/CheckpointerTesting.cs @@ -178,39 +178,6 @@ internal static JobPartPlanHeader CreateDefaultJobPartHeader( atomicPartStatus: atomicPartStatus); } - internal static JobPlanHeader CreateDefaultJobHeader( - string version = DataMovementConstants.JobPlanFile.SchemaVersion, - string transferId = DefaultTransferId, - DateTimeOffset createTime = default, - JobPlanOperation operationType = DefaultJobPlanOperation, - string sourceProviderId = DefaultSourceProviderId, - string destinationProviderId = DefaultDestinationProviderId, - bool isContainer = false, - bool enumerationComplete = false, - DataTransferStatus jobStatus = default, - string parentSourcePath = DefaultSourcePath, - string parentDestinationPath = DefaultDestinationPath) - { - if (createTime == default) - { - createTime = DefaultCreateTime; - } - jobStatus ??= DefaultJobStatus; - - return new JobPlanHeader( - version, - transferId, - createTime, - operationType, - sourceProviderId, - destinationProviderId, - isContainer, - enumerationComplete, - jobStatus, - parentSourcePath, - parentDestinationPath); - } - internal static async Task AssertJobPlanHeaderAsync(JobPartPlanHeader header, Stream stream) { int headerSize = DataMovementConstants.JobPartPlanFile.JobPartHeaderSizeInBytes; diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryStorageResourceContainer.cs b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryStorageResourceContainer.cs index f40571647fc9c..978e75fa39d77 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryStorageResourceContainer.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryStorageResourceContainer.cs @@ -54,6 +54,16 @@ protected internal override async IAsyncEnumerable GetStorageRe } } + public override StorageResourceCheckpointData GetDestinationCheckpointData() + { + throw new NotImplementedException(); + } + + public override StorageResourceCheckpointData GetSourceCheckpointData() + { + throw new NotImplementedException(); + } + private IEnumerable GetStorageResources(bool includeContainers) { Queue queue = new(); diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryStorageResourceItem.cs b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryStorageResourceItem.cs index 7363eaba93d16..410dfc531a172 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryStorageResourceItem.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/Shared/MemoryStorageResourceItem.cs @@ -68,7 +68,7 @@ protected internal override Task GetCopyAuthorizationHeaderAs throw new NotImplementedException(); } - protected internal override StorageResourceCheckpointData GetDestinationCheckpointData() + public override StorageResourceCheckpointData GetDestinationCheckpointData() { throw new NotImplementedException(); } @@ -78,7 +78,7 @@ protected internal override Task GetPropertiesAsync(C return Task.FromResult(new StorageResourceProperties(default, default, Buffer.Length, default)); } - protected internal override StorageResourceCheckpointData GetSourceCheckpointData() + public override StorageResourceCheckpointData GetSourceCheckpointData() { throw new NotImplementedException(); } From dec12b62076bb2005817edbe11dc800a86dc5437 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Tue, 17 Oct 2023 15:23:27 -0700 Subject: [PATCH 02/10] Add checkpoint data to properties --- .../src/CheckpointerExtensions.cs | 2 ++ .../src/DataTransferProperties.cs | 10 ++++++++++ .../tests/GetTransfersTests.cs | 8 ++++++-- .../tests/JobPlanHeaderTests.cs | 10 ++++------ .../tests/LocalTransferCheckpointerFactory.cs | 5 ++--- .../tests/MockResourceCheckpointData.cs | 4 ++++ 6 files changed, 28 insertions(+), 11 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/CheckpointerExtensions.cs b/sdk/storage/Azure.Storage.DataMovement/src/CheckpointerExtensions.cs index c5ffedb841ef5..965e9620e79a7 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/CheckpointerExtensions.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/CheckpointerExtensions.cs @@ -70,9 +70,11 @@ internal static async Task GetDataTransferPropertiesAsyn SourceTypeId = sourceTypeId, SourceUri = new Uri(header.ParentSourcePath), SourceProviderId = header.SourceProviderId, + SourceCheckpointData = header.SourceCheckpointData, DestinationTypeId = destinationTypeId, DestinationUri = new Uri(header.ParentDestinationPath), DestinationProviderId = header.DestinationProviderId, + DestinationCheckpointData = header.DestinationCheckpointData, IsContainer = header.IsContainer, }; } diff --git a/sdk/storage/Azure.Storage.DataMovement/src/DataTransferProperties.cs b/sdk/storage/Azure.Storage.DataMovement/src/DataTransferProperties.cs index f3b1605da236a..6ddf75526257f 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/DataTransferProperties.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/DataTransferProperties.cs @@ -36,6 +36,11 @@ public class DataTransferProperties /// public virtual string SourceProviderId { get; internal set; } + /// + /// The additional checkpoint data specific to the source resource. + /// + public virtual byte[] SourceCheckpointData { get; internal set; } + /// /// Contains the type id for the destination resource to use during rehydration. /// Will be null if is true. @@ -52,6 +57,11 @@ public class DataTransferProperties /// public virtual string DestinationProviderId { get; internal set; } + /// + /// The additional checkpoint data specific to the destination resource. + /// + public virtual byte[] DestinationCheckpointData { get; internal set; } + /// /// Defines whether or not this was a container transfer, in order to rehydrate the StorageResource. /// diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/GetTransfersTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/GetTransfersTests.cs index eccbc82438ff1..a1f85c4887a77 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/GetTransfersTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/GetTransfersTests.cs @@ -5,7 +5,6 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -using Azure.Storage.DataMovement.JobPlan; using NUnit.Framework; namespace Azure.Storage.DataMovement.Tests @@ -323,7 +322,9 @@ private void AddTransferFromDataTransferProperties( parentDestinationPath: properties.DestinationUri.AbsoluteUri, sourceProviderId: properties.SourceProviderId, destinationProviderId: properties.DestinationProviderId, - isContainer: properties.IsContainer); + isContainer: properties.IsContainer, + sourceCheckpointData: MockResourceCheckpointData.DefaultInstance, + destinationCheckpointData: MockResourceCheckpointData.DefaultInstance); if (properties.IsContainer) { @@ -381,6 +382,9 @@ private void AssertTransferProperties(DataTransferProperties expected, DataTrans Assert.AreEqual(expected.DestinationTypeId, actual.DestinationTypeId); Assert.AreEqual(expected.DestinationUri.AbsoluteUri.TrimEnd('\\', '/'), actual.DestinationUri.AbsoluteUri.TrimEnd('\\', '/')); Assert.AreEqual(expected.IsContainer, actual.IsContainer); + + CollectionAssert.AreEqual(MockResourceCheckpointData.DefaultInstance.Bytes, actual.SourceCheckpointData); + CollectionAssert.AreEqual(MockResourceCheckpointData.DefaultInstance.Bytes, actual.DestinationCheckpointData); } private string GetTypeIdForProvider(string providerId) diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/JobPlanHeaderTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/JobPlanHeaderTests.cs index 758d18b13e9fe..208f2bf54c103 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/JobPlanHeaderTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/JobPlanHeaderTests.cs @@ -10,8 +10,6 @@ namespace Azure.Storage.DataMovement.Tests { public class JobPlanHeaderTests : DataMovementTestBase { - private MockResourceCheckpointData _checkpointData = new(); - public JobPlanHeaderTests(bool async) : base(async, default) { } @@ -30,8 +28,8 @@ private JobPlanHeader CreateJobPlanHeader() DefaultJobStatus, DefaultSourcePath, DefaultDestinationPath, - _checkpointData, - _checkpointData); + MockResourceCheckpointData.DefaultInstance, + MockResourceCheckpointData.DefaultInstance); } [Test] @@ -107,8 +105,8 @@ private void DeserializeAndVerify(Stream stream, string version) Assert.AreEqual(DefaultJobStatus, deserialized.JobStatus); Assert.AreEqual(DefaultSourcePath, deserialized.ParentSourcePath); Assert.AreEqual(DefaultDestinationPath, deserialized.ParentDestinationPath); - CollectionAssert.AreEqual(_checkpointData.Bytes, deserialized.SourceCheckpointData); - CollectionAssert.AreEqual(_checkpointData.Bytes, deserialized.DestinationCheckpointData); + CollectionAssert.AreEqual(MockResourceCheckpointData.DefaultInstance.Bytes, deserialized.SourceCheckpointData); + CollectionAssert.AreEqual(MockResourceCheckpointData.DefaultInstance.Bytes, deserialized.DestinationCheckpointData); } } } diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerFactory.cs b/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerFactory.cs index b45fe3f43ad6c..608a6f024bd1b 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerFactory.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/LocalTransferCheckpointerFactory.cs @@ -47,7 +47,6 @@ internal static readonly DateTimeOffset _testStartTime internal const JobPartPlanRehydratePriorityType _testRehydratePriorityType = JobPartPlanRehydratePriorityType.None; internal static readonly DataTransferStatus _testJobStatus = new DataTransferStatusInternal(DataTransferState.Queued, false, false); internal static readonly DataTransferStatus _testPartStatus = new DataTransferStatusInternal(DataTransferState.Queued, false, false); - internal static readonly MockResourceCheckpointData _mockCheckpointData = new MockResourceCheckpointData(); private string _checkpointerPath; @@ -159,8 +158,8 @@ internal void CreateStubJobPlanFile( StorageResourceCheckpointData destinationCheckpointData = default) { status ??= new DataTransferStatus(); - sourceCheckpointData ??= _mockCheckpointData; - destinationCheckpointData ??= _mockCheckpointData; + sourceCheckpointData ??= MockResourceCheckpointData.DefaultInstance; + destinationCheckpointData ??= MockResourceCheckpointData.DefaultInstance; JobPlanHeader header = new JobPlanHeader( DataMovementConstants.JobPlanFile.SchemaVersion, diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/MockResourceCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement/tests/MockResourceCheckpointData.cs index 865f1c76bd2a1..855d7411db734 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/MockResourceCheckpointData.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/MockResourceCheckpointData.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. +using System; using System.IO; using System.Text; @@ -12,6 +13,9 @@ public class MockResourceCheckpointData : StorageResourceCheckpointData public override int Length => Bytes.Length; + public static MockResourceCheckpointData DefaultInstance => s_instance.Value; + private static Lazy s_instance = new(() => new MockResourceCheckpointData()); + public MockResourceCheckpointData() { string testString = "Hello World!"; From 49380da5b6504566372d5ed0010be3e5f276a26e Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Wed, 18 Oct 2023 17:59:30 -0700 Subject: [PATCH 03/10] Consume checkpoint data in provider --- .../src/AppendBlobStorageResource.cs | 8 +- .../src/BlobDestinationCheckpointData.cs | 23 +- .../src/BlobsStorageResourceProvider.cs | 197 +++++------- .../src/BlockBlobStorageResource.cs | 8 +- .../src/DataMovementBlobsExtensions.cs | 187 +++--------- .../src/PageBlobStorageResource.cs | 8 +- .../tests/RehydrateBlobResourceTests.cs | 282 +++++------------- 7 files changed, 221 insertions(+), 492 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/AppendBlobStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/AppendBlobStorageResource.cs index 8e6ccd9d83f74..56380612ac6e6 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/AppendBlobStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/AppendBlobStorageResource.cs @@ -292,10 +292,10 @@ public override StorageResourceCheckpointData GetDestinationCheckpointData() { return new BlobDestinationCheckpointData( BlobType.Append, - _options.HttpHeaders, - _options.AccessTier, - _options.Metadata, - _options.Tags, + _options?.HttpHeaders, + _options?.AccessTier, + _options?.Metadata, + _options?.Tags, default); // TODO: Update when we support encryption scopes } diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs index 738667ae93e97..d4cd967f0c36e 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. +using System; using System.IO; using System.Text; using Azure.Core; @@ -69,18 +70,18 @@ public BlobDestinationCheckpointData( Version = DataMovementBlobConstants.DestinationJobPartHeader.SchemaVersion; BlobType = blobType; ContentHeaders = contentHeaders; - _contentTypeBytes = ContentHeaders?.ContentType != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentType) : new byte[0]; - _contentEncodingBytes = ContentHeaders?.ContentEncoding != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentEncoding) : new byte[0]; - _contentLanguageBytes = ContentHeaders?.ContentLanguage != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentLanguage) : new byte[0]; - _contentDispositionBytes = ContentHeaders?.ContentDisposition != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentDisposition) : new byte[0]; - _cacheControlBytes = ContentHeaders?.CacheControl != default ? Encoding.UTF8.GetBytes(ContentHeaders.CacheControl) : new byte[0]; + _contentTypeBytes = ContentHeaders?.ContentType != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentType) : Array.Empty(); + _contentEncodingBytes = ContentHeaders?.ContentEncoding != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentEncoding) : Array.Empty(); + _contentLanguageBytes = ContentHeaders?.ContentLanguage != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentLanguage) : Array.Empty(); + _contentDispositionBytes = ContentHeaders?.ContentDisposition != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentDisposition) : Array.Empty(); + _cacheControlBytes = ContentHeaders?.CacheControl != default ? Encoding.UTF8.GetBytes(ContentHeaders.CacheControl) : Array.Empty(); AccessTier = accessTier; Metadata = metadata; - _metadataBytes = Metadata != default ? Encoding.UTF8.GetBytes(Metadata.DictionaryToString()) : new byte[0]; + _metadataBytes = Metadata != default ? Encoding.UTF8.GetBytes(Metadata.DictionaryToString()) : Array.Empty(); Tags = blobTags; - _tagsBytes = Tags != default ? Encoding.UTF8.GetBytes(Tags.DictionaryToString()) : new byte[0]; + _tagsBytes = Tags != default ? Encoding.UTF8.GetBytes(Tags.DictionaryToString()) : Array.Empty(); CpkScope = cpkScope; - _cpkScopeBytes = CpkScope != default ? Encoding.UTF8.GetBytes(CpkScope) : new byte[0]; + _cpkScopeBytes = CpkScope != default ? Encoding.UTF8.GetBytes(CpkScope) : Array.Empty(); } public override void Serialize(Stream stream) @@ -168,7 +169,11 @@ internal static BlobDestinationCheckpointData Deserialize(Stream stream) // AccessTier JobPlanAccessTier jobPlanAccessTier = (JobPlanAccessTier)reader.ReadByte(); - AccessTier accessTier = new AccessTier(jobPlanAccessTier.ToString()); + AccessTier? accessTier = default; + if (!jobPlanAccessTier.Equals(JobPlanAccessTier.None)) + { + accessTier = new AccessTier(jobPlanAccessTier.ToString()); + } // Metadata offset/length int metadataOffset = reader.ReadInt32(); diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobsStorageResourceProvider.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobsStorageResourceProvider.cs index 35796dffd72ac..7344b3f7d4bb6 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobsStorageResourceProvider.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobsStorageResourceProvider.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using Azure.Core; using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; using Azure.Storage.Blobs.Specialized; namespace Azure.Storage.DataMovement.Blobs @@ -222,19 +223,20 @@ public BlobsStorageResourceProvider(GetAzureSasCredential getAzureSasCredentialA #region Abstract Class Implementation /// - protected override async Task FromSourceAsync(DataTransferProperties properties, CancellationToken cancellationToken) - => await FromTransferPropertiesAsync(properties, getSource: true, cancellationToken).ConfigureAwait(false); + protected override Task FromSourceAsync(DataTransferProperties properties, CancellationToken cancellationToken) + => Task.FromResult(FromTransferProperties(properties, getSource: true, cancellationToken)); /// - protected override async Task FromDestinationAsync(DataTransferProperties properties, CancellationToken cancellationToken) - => await FromTransferPropertiesAsync(properties, getSource: false, cancellationToken).ConfigureAwait(false); + protected override Task FromDestinationAsync(DataTransferProperties properties, CancellationToken cancellationToken) + => Task.FromResult(FromTransferProperties(properties, getSource: false, cancellationToken)); - private async Task FromTransferPropertiesAsync( + private StorageResource FromTransferProperties( DataTransferProperties properties, bool getSource, CancellationToken cancellationToken) { - ResourceType type = GetType(getSource ? properties.SourceTypeId : properties.DestinationTypeId, properties.IsContainer); + BlobDestinationCheckpointData checkpointData = properties.GetCheckpointData(); + ResourceType type = GetType(checkpointData.BlobType, properties.IsContainer); Uri uri = getSource ? properties.SourceUri : properties.DestinationUri; IBlobResourceRehydrator rehydrator = type switch { @@ -246,29 +248,29 @@ private async Task FromTransferPropertiesAsync( }; return _credentialType switch { - CredentialType.None => await rehydrator.RehydrateAsync( + CredentialType.None => rehydrator.Rehydrate( properties, + checkpointData, getSource, - cancellationToken) - .ConfigureAwait(false), - CredentialType.SharedKey => await rehydrator.RehydrateAsync( + cancellationToken), + CredentialType.SharedKey => rehydrator.Rehydrate( properties, + checkpointData, getSource, _getStorageSharedKeyCredential(uri, getSource), - cancellationToken) - .ConfigureAwait(false), - CredentialType.Token => await rehydrator.RehydrateAsync( + cancellationToken), + CredentialType.Token => rehydrator.Rehydrate( properties, + checkpointData, getSource, _getTokenCredential(uri, getSource), - cancellationToken) - .ConfigureAwait(false), - CredentialType.Sas => await rehydrator.RehydrateAsync( + cancellationToken), + CredentialType.Sas => rehydrator.Rehydrate( properties, + checkpointData, getSource, _getAzureSasCredential(uri, getSource), - cancellationToken) - .ConfigureAwait(false), + cancellationToken), _ => throw BadCredentialTypeException(_credentialType), }; } @@ -476,22 +478,26 @@ public StorageResource FromClient( /// private interface IBlobResourceRehydrator { - Task RehydrateAsync( + StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, CancellationToken cancellationToken); - Task RehydrateAsync( + StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, StorageSharedKeyCredential credential, CancellationToken cancellationToken); - Task RehydrateAsync( + StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, TokenCredential credential, CancellationToken cancellationToken); - Task RehydrateAsync( + StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, AzureSasCredential credential, CancellationToken cancellationToken); @@ -499,21 +505,6 @@ Task RehydrateAsync( private class BlobContainerResourceRehydrator : IBlobResourceRehydrator { - private async Task GetOptionsAsync( - DataTransferProperties transferProperties, - bool isSource, - CancellationToken cancellationToken) - { - Argument.AssertNotNull(transferProperties, nameof(transferProperties)); - TransferCheckpointer checkpointer = transferProperties.Checkpointer.GetCheckpointer(); - - return await checkpointer.GetBlobContainerOptionsAsync( - GetPrefix(transferProperties, isSource), - transferProperties.TransferId, - isSource, - cancellationToken).ConfigureAwait(false); - } - private Uri GetUri(DataTransferProperties properties, bool getSource) => getSource ? properties.SourceUri : properties.DestinationUri; @@ -526,220 +517,194 @@ private Uri GetContainerUri(DataTransferProperties properties, bool getSource) BlobName = "" }.ToUri(); - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, CancellationToken cancellationToken) => new BlobStorageResourceContainer( new BlobContainerClient(GetContainerUri(properties, isSource)), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, StorageSharedKeyCredential credential, CancellationToken cancellationToken) => new BlobStorageResourceContainer( new BlobContainerClient(GetContainerUri(properties, isSource), credential), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, TokenCredential credential, CancellationToken cancellationToken) => new BlobStorageResourceContainer( new BlobContainerClient(GetContainerUri(properties, isSource), credential), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, AzureSasCredential credential, CancellationToken cancellationToken) => new BlobStorageResourceContainer( new BlobContainerClient(GetContainerUri(properties, isSource), credential), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); } private class BlockBlobResourceRehydrator : IBlobResourceRehydrator { - private async Task GetOptionsAsync( - DataTransferProperties transferProperties, - bool isSource, - CancellationToken cancellationToken) - { - Argument.AssertNotNull(transferProperties, nameof(transferProperties)); - TransferCheckpointer checkpointer = transferProperties.Checkpointer.GetCheckpointer(); - - return await checkpointer.GetBlockBlobResourceOptionsAsync( - transferProperties.TransferId, - isSource, - cancellationToken).ConfigureAwait(false); - } - private Uri GetUri(DataTransferProperties properties, bool getSource) => getSource ? properties.SourceUri : properties.DestinationUri; - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, CancellationToken cancellationToken) => new BlockBlobStorageResource( new BlockBlobClient(GetUri(properties, isSource)), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetBlockBlobResourceOptions() : default); - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, StorageSharedKeyCredential credential, CancellationToken cancellationToken) => new BlockBlobStorageResource( new BlockBlobClient(GetUri(properties, isSource), credential), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetBlockBlobResourceOptions() : default); - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, TokenCredential credential, CancellationToken cancellationToken) => new BlockBlobStorageResource( new BlockBlobClient(GetUri(properties, isSource), credential), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetBlockBlobResourceOptions() : default); - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, AzureSasCredential credential, CancellationToken cancellationToken) => new BlockBlobStorageResource( new BlockBlobClient(GetUri(properties, isSource), credential), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetBlockBlobResourceOptions() : default); } private class PageBlobResourceRehydrator : IBlobResourceRehydrator { - private async Task GetOptionsAsync( - DataTransferProperties transferProperties, - bool isSource, - CancellationToken cancellationToken) - { - Argument.AssertNotNull(transferProperties, nameof(transferProperties)); - TransferCheckpointer checkpointer = transferProperties.Checkpointer.GetCheckpointer(); - - return await checkpointer.GetPageBlobResourceOptionsAsync( - transferProperties.TransferId, - isSource, - cancellationToken).ConfigureAwait(false); - } - private Uri GetUri(DataTransferProperties properties, bool getSource) => getSource ? properties.SourceUri : properties.DestinationUri; - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, CancellationToken cancellationToken) => new PageBlobStorageResource( new PageBlobClient(GetUri(properties, isSource)), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetPageBlobResourceOptions() : default); - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, StorageSharedKeyCredential credential, CancellationToken cancellationToken) => new PageBlobStorageResource( new PageBlobClient(GetUri(properties, isSource), credential), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetPageBlobResourceOptions() : default); - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, TokenCredential credential, CancellationToken cancellationToken) => new PageBlobStorageResource( new PageBlobClient(GetUri(properties, isSource), credential), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetPageBlobResourceOptions() : default); - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, AzureSasCredential credential, CancellationToken cancellationToken) => new PageBlobStorageResource( new PageBlobClient(GetUri(properties, isSource), credential), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetPageBlobResourceOptions() : default); } private class AppendBlobResourceRehydrator : IBlobResourceRehydrator { - private async Task GetOptionsAsync( - DataTransferProperties transferProperties, - bool isSource, - CancellationToken cancellationToken) - { - Argument.AssertNotNull(transferProperties, nameof(transferProperties)); - TransferCheckpointer checkpointer = transferProperties.Checkpointer.GetCheckpointer(); - - return new AppendBlobStorageResourceOptions(await checkpointer.GetBlobResourceOptionsAsync( - transferProperties.TransferId, - isSource, - cancellationToken).ConfigureAwait(false)); - } - private Uri GetUri(DataTransferProperties properties, bool getSource) => getSource ? properties.SourceUri : properties.DestinationUri; - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, CancellationToken cancellationToken) => new AppendBlobStorageResource( new AppendBlobClient(GetUri(properties, isSource)), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetAppendBlobResourceOptions() : default); - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, StorageSharedKeyCredential credential, CancellationToken cancellationToken) => new AppendBlobStorageResource( new AppendBlobClient(GetUri(properties, isSource), credential), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetAppendBlobResourceOptions() : default); - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, TokenCredential credential, CancellationToken cancellationToken) => new AppendBlobStorageResource( new AppendBlobClient(GetUri(properties, isSource), credential), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetAppendBlobResourceOptions() : default); - public async Task RehydrateAsync( + public StorageResource Rehydrate( DataTransferProperties properties, + BlobDestinationCheckpointData checkpointData, bool isSource, AzureSasCredential credential, CancellationToken cancellationToken) => new AppendBlobStorageResource( new AppendBlobClient(GetUri(properties, isSource), credential), - await GetOptionsAsync(properties, isSource, cancellationToken).ConfigureAwait(false)); + !isSource ? checkpointData.GetAppendBlobResourceOptions() : default); } #endregion - private static ResourceType GetType(string typeId, bool isContainer) + private static ResourceType GetType(BlobType blobType, bool isContainer) { if (isContainer) { return ResourceType.BlobContainer; } - return typeId switch + return blobType switch { - "BlockBlob" => ResourceType.BlockBlob, - "PageBlob" => ResourceType.PageBlob, - "AppendBlob" => ResourceType.AppendBlob, + BlobType.Block => ResourceType.BlockBlob, + BlobType.Page => ResourceType.PageBlob, + BlobType.Append => ResourceType.AppendBlob, _ => ResourceType.Unknown }; } diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlockBlobStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlockBlobStorageResource.cs index 2cc8e99e519ff..ec0c033b20dfe 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlockBlobStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlockBlobStorageResource.cs @@ -334,10 +334,10 @@ public override StorageResourceCheckpointData GetDestinationCheckpointData() { return new BlobDestinationCheckpointData( BlobType.Block, - _options.HttpHeaders, - _options.AccessTier, - _options.Metadata, - _options.Tags, + _options?.HttpHeaders, + _options?.AccessTier, + _options?.Metadata, + _options?.Tags, default); // TODO: Update when we support encryption scopes } diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/DataMovementBlobsExtensions.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/DataMovementBlobsExtensions.cs index eba8b26e9eb94..543e4088f7b4b 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/DataMovementBlobsExtensions.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/DataMovementBlobsExtensions.cs @@ -5,6 +5,7 @@ using System.Threading; using Azure.Storage.Blobs.Models; using Azure.Storage.DataMovement.JobPlan; +using System.IO; namespace Azure.Storage.DataMovement.Blobs { @@ -406,179 +407,59 @@ internal static PageBlobUploadPagesFromUriOptions ToUploadPagesFromUriOptions( }; } - internal static async Task GetBlockBlobResourceOptionsAsync( - this TransferCheckpointer checkpointer, - string transferId, - bool isSource, - CancellationToken cancellationToken) + internal static BlobDestinationCheckpointData GetCheckpointData(this DataTransferProperties properties) { - BlobStorageResourceOptions baseOptions = await checkpointer.GetBlobResourceOptionsAsync( - transferId, - isSource, - cancellationToken).ConfigureAwait(false); - BlockBlobStorageResourceOptions options = new(baseOptions); - - // Get AccessTier - if (!isSource) + using (MemoryStream stream = new(properties.DestinationCheckpointData)) { - int startIndex = DataMovementConstants.JobPartPlanFile.DstBlobBlockBlobTierIndex; - JobPartPlanBlockBlobTier accessTier = (JobPartPlanBlockBlobTier)await checkpointer.GetByteValue( - transferId, - startIndex, - cancellationToken: cancellationToken).ConfigureAwait(false); - options.AccessTier = accessTier.ToAccessTier(); + return BlobDestinationCheckpointData.Deserialize(stream); } - return options; } - internal static async Task GetPageBlobResourceOptionsAsync( - this TransferCheckpointer checkpointer, - string transferId, - bool isSource, - CancellationToken cancellationToken) + internal static BlobStorageResourceOptions GetBlobResourceOptions( + this BlobDestinationCheckpointData checkpointData) { - BlobStorageResourceOptions baseOptions = await checkpointer.GetBlobResourceOptionsAsync( - transferId, - isSource, - cancellationToken).ConfigureAwait(false); - PageBlobStorageResourceOptions options = new(baseOptions); - - if (!isSource) + return new() { - // Get AccessTier - int startIndex = DataMovementConstants.JobPartPlanFile.DstBlobPageBlobTierIndex; - JobPartPlanPageBlobTier accessTier = (JobPartPlanPageBlobTier)await checkpointer.GetByteValue( - transferId, - startIndex, - cancellationToken: cancellationToken).ConfigureAwait(false); - options.AccessTier = accessTier.ToAccessTier(); - } - return options; + Metadata = checkpointData.Metadata, + Tags = checkpointData.Tags, + HttpHeaders = checkpointData.ContentHeaders, + AccessTier = checkpointData.AccessTier, + // LegalHold = checkpointData.LegalHold + }; } - internal static async Task GetBlobResourceOptionsAsync( - this TransferCheckpointer checkpointer, - string transferId, - bool isSource, - CancellationToken cancellationToken) + internal static BlockBlobStorageResourceOptions GetBlockBlobResourceOptions( + this BlobDestinationCheckpointData checkpointData) { - BlobStorageResourceOptions options = new BlobStorageResourceOptions(); - - // TODO: parse out the rest of the parameters from the Job Part Plan File - - if (!isSource) - { - // Get Metadata - int metadataIndex = DataMovementConstants.JobPartPlanFile.DstBlobMetadataLengthIndex; - int metadataReadLength = DataMovementConstants.JobPartPlanFile.DstBlobTagsLengthIndex - metadataIndex; - string metadata = await checkpointer.GetHeaderUShortValue( - transferId, - metadataIndex, - metadataReadLength, - DataMovementConstants.JobPartPlanFile.MetadataStrNumBytes, - cancellationToken).ConfigureAwait(false); - options.Metadata = metadata.ToDictionary(nameof(metadata)); - - // Get blob tags - int tagsIndex = DataMovementConstants.JobPartPlanFile.DstBlobTagsLengthIndex; - int tagsReadLength = DataMovementConstants.JobPartPlanFile.DstBlobIsSourceEncrypted - tagsIndex; - string tags = await checkpointer.GetHeaderLongValue( - transferId, - tagsIndex, - tagsReadLength, - DataMovementConstants.JobPartPlanFile.BlobTagsStrNumBytes, - cancellationToken).ConfigureAwait(false); - options.Tags = tags.ToDictionary(nameof(tags)); - } - return options; + BlobStorageResourceOptions baseOptions = checkpointData.GetBlobResourceOptions(); + return new BlockBlobStorageResourceOptions(baseOptions); } - internal static async Task GetBlobContainerOptionsAsync( - this TransferCheckpointer checkpointer, - string directoryPrefix, - string transferId, - bool isSource, - CancellationToken cancellationToken) + internal static PageBlobStorageResourceOptions GetPageBlobResourceOptions( + this BlobDestinationCheckpointData checkpointData) { - BlobStorageResourceOptions baseOptions = await checkpointer.GetBlobResourceOptionsAsync( - transferId, - isSource, - cancellationToken).ConfigureAwait(false); - BlobStorageResourceContainerOptions options = new() - { - BlobDirectoryPrefix = directoryPrefix, - BlobOptions = baseOptions, - }; - - return options; + BlobStorageResourceOptions baseOptions = checkpointData.GetBlobResourceOptions(); + return new PageBlobStorageResourceOptions(baseOptions); } - private static AccessTier ToAccessTier(this JobPartPlanBlockBlobTier tier) + internal static AppendBlobStorageResourceOptions GetAppendBlobResourceOptions( + this BlobDestinationCheckpointData checkpointData) { - if (JobPartPlanBlockBlobTier.Archive == tier) - { - return AccessTier.Archive; - } - else if (JobPartPlanBlockBlobTier.Cool == tier) - { - return AccessTier.Cool; - } - else if (JobPartPlanBlockBlobTier.Cold == tier) - { - return AccessTier.Cold; - } - else // including JobPartPlanBlockBlobTier.Hot == tier - { - return AccessTier.Hot; - } + BlobStorageResourceOptions baseOptions = checkpointData.GetBlobResourceOptions(); + return new AppendBlobStorageResourceOptions(baseOptions); } - private static AccessTier ToAccessTier(this JobPartPlanPageBlobTier tier) + internal static BlobStorageResourceContainerOptions GetBlobContainerOptions( + this BlobDestinationCheckpointData checkpointData, + string directoryPrefix) { - if (JobPartPlanPageBlobTier.P4 == tier) - { - return AccessTier.P4; - } - else if (JobPartPlanPageBlobTier.P6 == tier) - { - return AccessTier.P6; - } - else if (JobPartPlanPageBlobTier.P10 == tier) - { - return AccessTier.P10; - } - else if (JobPartPlanPageBlobTier.P15 == tier) - { - return AccessTier.P15; - } - else if (JobPartPlanPageBlobTier.P20 == tier) - { - return AccessTier.P20; - } - else if (JobPartPlanPageBlobTier.P30 == tier) + BlobStorageResourceOptions baseOptions = checkpointData.GetBlobResourceOptions(); + return new BlobStorageResourceContainerOptions() { - return AccessTier.P30; - } - else if (JobPartPlanPageBlobTier.P40 == tier) - { - return AccessTier.P40; - } - else if (JobPartPlanPageBlobTier.P50 == tier) - { - return AccessTier.P50; - } - else if (JobPartPlanPageBlobTier.P60 == tier) - { - return AccessTier.P60; - } - else if (JobPartPlanPageBlobTier.P70 == tier) - { - return AccessTier.P70; - } - else // including JobPartPlanPageBlobTier.P80 == tier - { - return AccessTier.P80; - } + BlobType = checkpointData.BlobType, + BlobDirectoryPrefix = directoryPrefix, + BlobOptions = baseOptions, + }; } } } diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/PageBlobStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/PageBlobStorageResource.cs index f56f7d170ed73..d3413f8074fb9 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/PageBlobStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/PageBlobStorageResource.cs @@ -296,10 +296,10 @@ public override StorageResourceCheckpointData GetDestinationCheckpointData() { return new BlobDestinationCheckpointData( BlobType.Page, - _options.HttpHeaders, - _options.AccessTier, - _options.Metadata, - _options.Tags, + _options?.HttpHeaders, + _options?.AccessTier, + _options?.Metadata, + _options?.Tags, default); // TODO: Update when we support encryption scopes } diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/RehydrateBlobResourceTests.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/RehydrateBlobResourceTests.cs index e116a7b5eaf4b..1bd639c6dd6f6 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/RehydrateBlobResourceTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/RehydrateBlobResourceTests.cs @@ -8,7 +8,6 @@ using System.Threading.Tasks; using Azure.Storage.Blobs.Models; using Azure.Storage.Test; -using DMBlobs::Azure.Storage.DataMovement.JobPlan; using DMBlobs::Azure.Storage.DataMovement.Blobs; using Moq; using NUnit.Framework; @@ -51,6 +50,41 @@ private static string ToProviderId(StorageResourceType type) }; } + private static BlobDestinationCheckpointData GetPopulatedCheckpointData( + BlobType blobType, + AccessTier? accessTier = default) + { + BlobHttpHeaders headers = new() + { + ContentType = "text/plain", + ContentEncoding = "gzip", + ContentLanguage = "en-US", + ContentDisposition = "inline", + CacheControl = "no-cache", + }; + return new BlobDestinationCheckpointData( + blobType, + headers, + accessTier, + DataProvider.BuildMetadata(), + DataProvider.BuildTags(), + "encryption-scope"); + } + + private static BlobDestinationCheckpointData GetDefaultCheckpointData(BlobType blobType) + { + return new BlobDestinationCheckpointData(blobType, default, default, default, default, default); + } + + private static byte[] GetBytes(StorageResourceCheckpointData checkpointData) + { + using (MemoryStream stream = new MemoryStream()) + { + checkpointData.Serialize(stream); + return stream.ToArray(); + } + } + private static Mock GetProperties( string checkpointerPath, string transferId, @@ -60,7 +94,8 @@ private static Mock GetProperties( string destinationResourceId, string sourceProviderId, string destinationProviderId, - bool isContainer) + bool isContainer, + BlobDestinationCheckpointData destinationCheckpointData) { var mock = new Mock(MockBehavior.Strict); mock.Setup(p => p.TransferId).Returns(transferId); @@ -71,95 +106,17 @@ private static Mock GetProperties( mock.Setup(p => p.DestinationTypeId).Returns(destinationResourceId); mock.Setup(p => p.SourceProviderId).Returns(sourceProviderId); mock.Setup(p => p.DestinationProviderId).Returns(destinationProviderId); + mock.Setup(p => p.SourceCheckpointData).Returns(Array.Empty()); + mock.Setup(p => p.DestinationCheckpointData).Returns(GetBytes(destinationCheckpointData)); mock.Setup(p => p.IsContainer).Returns(isContainer); return mock; } - private JobPlanOperation GetPlanOperation( - StorageResourceType sourceType, - StorageResourceType destinationType) - { - if (sourceType == StorageResourceType.Local) - { - return JobPlanOperation.Upload; - } - else if (destinationType == StorageResourceType.Local) - { - return JobPlanOperation.Download; - } - return JobPlanOperation.ServiceToService; - } - - private async Task AddJobPartToCheckpointer( - TransferCheckpointer checkpointer, - string transferId, - StorageResourceType sourceType, - List sourcePaths, - StorageResourceType destinationType, - List destinationPaths, - int partCount = 1, - JobPartPlanHeader header = default) - { - // Populate sourcePaths if not provided - if (sourcePaths == default) - { - string sourcePath = "sample-source"; - sourcePaths = new List(); - for (int i = 0; i < partCount; i++) - { - sourcePaths.Add(Path.Combine(sourcePath, $"file{i}")); - } - } - // Populate destPaths if not provided - if (destinationPaths == default) - { - string destPath = "sample-dest"; - destinationPaths = new List(); - for (int i = 0; i < partCount; i++) - { - destinationPaths.Add(Path.Combine(destPath, $"file{i}")); - } - } - - JobPlanOperation operationType = GetPlanOperation(sourceType, destinationType); - - // Use mock resources that don't correspond to correct paths - var sourceMock = new Mock(); - sourceMock.Setup(s => s.Uri).Returns(new Uri(CheckpointerTesting.DefaultWebSourcePath)); - sourceMock.Setup(s => s.ProviderId).Returns(ToProviderId(sourceType)); - var destMock = new Mock(); - destMock.Setup(s => s.Uri).Returns(new Uri(CheckpointerTesting.DefaultWebDestinationPath)); - destMock.Setup(s => s.ProviderId).Returns(ToProviderId(destinationType)); - await checkpointer.AddNewJobAsync(transferId, sourceMock.Object, destMock.Object); - - for (int currentPart = 0; currentPart < partCount; currentPart++) - { - header ??= CheckpointerTesting.CreateDefaultJobPartHeader( - transferId: transferId, - partNumber: currentPart, - sourcePath: sourcePaths[currentPart], - destinationPath: destinationPaths[currentPart], - fromTo: operationType); - - using (Stream stream = new MemoryStream()) - { - header.Serialize(stream); - - await checkpointer.AddNewJobPartAsync( - transferId: transferId, - partNumber: currentPart, - chunksTotal: 1, - headerStream: stream); - } - } - } - [Test] public async Task RehydrateBlockBlob( [Values(true, false)] bool isSource) { using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory(); - TransferCheckpointer checkpointer = new LocalTransferCheckpointer(test.DirectoryPath); string transferId = GetNewTransferId(); string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource"; string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest"; @@ -177,15 +134,8 @@ public async Task RehydrateBlockBlob( ToResourceId(destinationType), ToProviderId(sourceType), ToProviderId(destinationType), - isContainer: false).Object; - - await AddJobPartToCheckpointer( - checkpointer, - transferId, - sourceType, - new List() { sourcePath }, - destinationType, - new List() { destinationPath }); + isContainer: false, + GetDefaultCheckpointData(BlobType.Block)).Object; StorageResource storageResource = isSource ? await new BlobsStorageResourceProvider().FromSourceInternalHookAsync(transferProperties) @@ -199,7 +149,6 @@ await AddJobPartToCheckpointer( public async Task RehydrateBlockBlob_Options() { using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory(); - TransferCheckpointer checkpointer = new LocalTransferCheckpointer(test.DirectoryPath); string transferId = GetNewTransferId(); string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource"; string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest"; @@ -207,6 +156,7 @@ public async Task RehydrateBlockBlob_Options() StorageResourceType sourceType = StorageResourceType.BlockBlob; StorageResourceType destinationType = StorageResourceType.BlockBlob; + BlobDestinationCheckpointData checkpointData = GetPopulatedCheckpointData(BlobType.Block, AccessTier.Cool); DataTransferProperties transferProperties = GetProperties( test.DirectoryPath, transferId, @@ -216,37 +166,21 @@ public async Task RehydrateBlockBlob_Options() ToResourceId(destinationType), ToProviderId(sourceType), ToProviderId(destinationType), - isContainer: false).Object; - - IDictionary metadata = DataProvider.BuildMetadata(); - IDictionary blobTags = DataProvider.BuildTags(); - - JobPartPlanHeader header = CheckpointerTesting.CreateDefaultJobPartHeader( - transferId: transferId, - partNumber: 0, - sourcePath: sourcePath, - destinationPath: destinationPath, - fromTo: GetPlanOperation(sourceType, destinationType), - blobTags: blobTags, - metadata: metadata, - blockBlobTier: JobPartPlanBlockBlobTier.Cool); - - await AddJobPartToCheckpointer( - checkpointer, - transferId, - sourceType, - new List() { sourcePath }, - destinationType, - new List() { destinationPath }, - header: header); + isContainer: false, + checkpointData).Object; BlockBlobStorageResource storageResource = (BlockBlobStorageResource)await new BlobsStorageResourceProvider() .FromDestinationInternalHookAsync(transferProperties); Assert.AreEqual(destinationPath, storageResource.Uri.AbsoluteUri); - Assert.AreEqual(AccessTier.Cool, storageResource._options.AccessTier); - Assert.AreEqual(metadata, storageResource._options.Metadata); - Assert.AreEqual(blobTags, storageResource._options.Tags); + Assert.AreEqual(checkpointData.AccessTier, storageResource._options.AccessTier); + Assert.AreEqual(checkpointData.Metadata, storageResource._options.Metadata); + Assert.AreEqual(checkpointData.Tags, storageResource._options.Tags); + Assert.AreEqual(checkpointData.ContentHeaders.ContentType, storageResource._options.HttpHeaders.ContentType); + Assert.AreEqual(checkpointData.ContentHeaders.ContentEncoding, storageResource._options.HttpHeaders.ContentEncoding); + Assert.AreEqual(checkpointData.ContentHeaders.ContentLanguage, storageResource._options.HttpHeaders.ContentLanguage); + Assert.AreEqual(checkpointData.ContentHeaders.ContentDisposition, storageResource._options.HttpHeaders.ContentDisposition); + Assert.AreEqual(checkpointData.ContentHeaders.CacheControl, storageResource._options.HttpHeaders.CacheControl); } [Test] @@ -254,7 +188,6 @@ public async Task RehydratePageBlob( [Values(true, false)] bool isSource) { using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory(); - TransferCheckpointer checkpointer = new LocalTransferCheckpointer(test.DirectoryPath); string transferId = GetNewTransferId(); string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource"; string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest"; @@ -272,15 +205,8 @@ public async Task RehydratePageBlob( ToResourceId(destinationType), ToProviderId(sourceType), ToProviderId(destinationType), - isContainer: false).Object; - - await AddJobPartToCheckpointer( - checkpointer, - transferId, - sourceType, - new List() { sourcePath }, - destinationType, - new List() { destinationPath }); + isContainer: false, + GetDefaultCheckpointData(BlobType.Page)).Object; StorageResource storageResource = isSource ? await new BlobsStorageResourceProvider().FromSourceInternalHookAsync(transferProperties) @@ -294,7 +220,6 @@ await AddJobPartToCheckpointer( public async Task RehydratePageBlob_Options() { using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory(); - TransferCheckpointer checkpointer = new LocalTransferCheckpointer(test.DirectoryPath); string transferId = GetNewTransferId(); string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource"; string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest"; @@ -302,6 +227,7 @@ public async Task RehydratePageBlob_Options() StorageResourceType sourceType = StorageResourceType.PageBlob; StorageResourceType destinationType = StorageResourceType.PageBlob; + BlobDestinationCheckpointData checkpointData = GetPopulatedCheckpointData(BlobType.Page, AccessTier.P30); DataTransferProperties transferProperties = GetProperties( test.DirectoryPath, transferId, @@ -311,37 +237,21 @@ public async Task RehydratePageBlob_Options() ToResourceId(destinationType), ToProviderId(sourceType), ToProviderId(destinationType), - isContainer: false).Object; - - IDictionary metadata = DataProvider.BuildMetadata(); - IDictionary blobTags = DataProvider.BuildTags(); - - JobPartPlanHeader header = CheckpointerTesting.CreateDefaultJobPartHeader( - transferId: transferId, - partNumber: 0, - sourcePath: sourcePath, - destinationPath: destinationPath, - fromTo: GetPlanOperation(sourceType, destinationType), - blobTags: blobTags, - metadata: metadata, - pageBlobTier: JobPartPlanPageBlobTier.P30); - - await AddJobPartToCheckpointer( - checkpointer, - transferId, - sourceType, - new List() { sourcePath }, - destinationType, - new List() { destinationPath }, - header: header); + isContainer: false, + checkpointData).Object; PageBlobStorageResource storageResource = (PageBlobStorageResource)await new BlobsStorageResourceProvider() .FromDestinationInternalHookAsync(transferProperties); Assert.AreEqual(destinationPath, storageResource.Uri.AbsoluteUri); - Assert.AreEqual(AccessTier.P30, storageResource._options.AccessTier); - Assert.AreEqual(metadata, storageResource._options.Metadata); - Assert.AreEqual(blobTags, storageResource._options.Tags); + Assert.AreEqual(checkpointData.AccessTier, storageResource._options.AccessTier); + Assert.AreEqual(checkpointData.Metadata, storageResource._options.Metadata); + Assert.AreEqual(checkpointData.Tags, storageResource._options.Tags); + Assert.AreEqual(checkpointData.ContentHeaders.ContentType, storageResource._options.HttpHeaders.ContentType); + Assert.AreEqual(checkpointData.ContentHeaders.ContentEncoding, storageResource._options.HttpHeaders.ContentEncoding); + Assert.AreEqual(checkpointData.ContentHeaders.ContentLanguage, storageResource._options.HttpHeaders.ContentLanguage); + Assert.AreEqual(checkpointData.ContentHeaders.ContentDisposition, storageResource._options.HttpHeaders.ContentDisposition); + Assert.AreEqual(checkpointData.ContentHeaders.CacheControl, storageResource._options.HttpHeaders.CacheControl); } [Test] @@ -349,7 +259,6 @@ public async Task RehydrateAppendBlob( [Values(true, false)] bool isSource) { using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory(); - TransferCheckpointer checkpointer = new LocalTransferCheckpointer(test.DirectoryPath); string transferId = GetNewTransferId(); string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource"; string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest"; @@ -367,15 +276,8 @@ public async Task RehydrateAppendBlob( ToResourceId(destinationType), ToProviderId(sourceType), ToProviderId(destinationType), - isContainer: false).Object; - - await AddJobPartToCheckpointer( - checkpointer, - transferId, - sourceType, - new List() { sourcePath }, - destinationType, - new List() { destinationPath }); + isContainer: false, + GetDefaultCheckpointData(BlobType.Append)).Object; StorageResource storageResource = isSource ? await new BlobsStorageResourceProvider().FromSourceInternalHookAsync(transferProperties) @@ -389,7 +291,6 @@ await AddJobPartToCheckpointer( public async Task RehydrateAppendBlob_Options() { using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory(); - TransferCheckpointer checkpointer = new LocalTransferCheckpointer(test.DirectoryPath); string transferId = GetNewTransferId(); string sourcePath = "https://storageaccount.blob.core.windows.net/container/blobsource"; string destinationPath = "https://storageaccount.blob.core.windows.net/container/blobdest"; @@ -397,6 +298,7 @@ public async Task RehydrateAppendBlob_Options() StorageResourceType sourceType = StorageResourceType.AppendBlob; StorageResourceType destinationType = StorageResourceType.AppendBlob; + BlobDestinationCheckpointData checkpointData = GetPopulatedCheckpointData(BlobType.Append, accessTier: default); DataTransferProperties transferProperties = GetProperties( test.DirectoryPath, transferId, @@ -406,44 +308,28 @@ public async Task RehydrateAppendBlob_Options() ToResourceId(destinationType), ToProviderId(sourceType), ToProviderId(destinationType), - isContainer: false).Object; - - IDictionary metadata = DataProvider.BuildMetadata(); - IDictionary blobTags = DataProvider.BuildTags(); - - JobPartPlanHeader header = CheckpointerTesting.CreateDefaultJobPartHeader( - transferId: transferId, - partNumber: 0, - sourcePath: sourcePath, - destinationPath: destinationPath, - fromTo: GetPlanOperation(sourceType, destinationType), - blobTags: blobTags, - metadata: metadata); - - await AddJobPartToCheckpointer( - checkpointer, - transferId, - sourceType, - new List() { sourcePath }, - destinationType, - new List() { destinationPath }, - header: header); + isContainer: false, + checkpointData).Object; AppendBlobStorageResource storageResource = (AppendBlobStorageResource)await new BlobsStorageResourceProvider() .FromDestinationInternalHookAsync(transferProperties); Assert.AreEqual(destinationPath, storageResource.Uri.AbsoluteUri); - Assert.AreEqual(metadata, storageResource._options.Metadata); - Assert.AreEqual(blobTags, storageResource._options.Tags); + Assert.AreEqual(checkpointData.AccessTier, storageResource._options.AccessTier); + Assert.AreEqual(checkpointData.Metadata, storageResource._options.Metadata); + Assert.AreEqual(checkpointData.Tags, storageResource._options.Tags); + Assert.AreEqual(checkpointData.ContentHeaders.ContentType, storageResource._options.HttpHeaders.ContentType); + Assert.AreEqual(checkpointData.ContentHeaders.ContentEncoding, storageResource._options.HttpHeaders.ContentEncoding); + Assert.AreEqual(checkpointData.ContentHeaders.ContentLanguage, storageResource._options.HttpHeaders.ContentLanguage); + Assert.AreEqual(checkpointData.ContentHeaders.ContentDisposition, storageResource._options.HttpHeaders.ContentDisposition); + Assert.AreEqual(checkpointData.ContentHeaders.CacheControl, storageResource._options.HttpHeaders.CacheControl); } [Test] - [Combinatorial] public async Task RehydrateBlobContainer( [Values(true, false)] bool isSource) { using DisposingLocalDirectory test = DisposingLocalDirectory.GetTestDirectory(); - TransferCheckpointer checkpointer = new LocalTransferCheckpointer(test.DirectoryPath); string transferId = GetNewTransferId(); List sourcePaths = new List(); string sourceParentPath = "https://storageaccount.blob.core.windows.net/sourcecontainer"; @@ -471,16 +357,8 @@ public async Task RehydrateBlobContainer( ToResourceId(destinationType), ToProviderId(sourceType), ToProviderId(destinationType), - isContainer: true).Object; - - await AddJobPartToCheckpointer( - checkpointer, - transferId, - sourceType, - sourcePaths, - destinationType, - destinationPaths, - jobPartCount); + isContainer: true, + GetDefaultCheckpointData(BlobType.Block)).Object; StorageResource storageResource = isSource ? await new BlobsStorageResourceProvider().FromSourceInternalHookAsync(transferProperties) From a1be6312fc8cb93bbca5274937809a788f410be6 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Thu, 19 Oct 2023 11:16:01 -0700 Subject: [PATCH 04/10] Add source checkpoint data for Blob --- .../src/AppendBlobStorageResource.cs | 2 +- .../src/BlobCheckpointData.cs | 40 +++++++++ .../src/BlobDestinationCheckpointData.cs | 29 ++----- .../src/BlobSourceCheckpointData.cs | 34 +++++++- .../src/BlobStorageResourceContainer.cs | 13 +-- .../src/BlobsStorageResourceProvider.cs | 83 ++++++++++--------- .../src/BlockBlobStorageResource.cs | 2 +- .../src/DataMovementBlobConstants.cs | 11 ++- .../src/DataMovementBlobsExtensions.cs | 16 +++- .../src/PageBlobStorageResource.cs | 2 +- .../BlobDestinationCheckpointDataTests.cs | 8 +- .../tests/BlobSourceCheckpointDataTests.cs | 64 ++++++++++++++ .../tests/RehydrateBlobResourceTests.cs | 35 +++++--- 13 files changed, 244 insertions(+), 95 deletions(-) create mode 100644 sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobCheckpointData.cs create mode 100644 sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobSourceCheckpointDataTests.cs diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/AppendBlobStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/AppendBlobStorageResource.cs index 56380612ac6e6..efa9243fe3f83 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/AppendBlobStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/AppendBlobStorageResource.cs @@ -285,7 +285,7 @@ protected override async Task DeleteIfExistsAsync(CancellationToken cancel public override StorageResourceCheckpointData GetSourceCheckpointData() { - return new BlobSourceCheckpointData(); + return new BlobSourceCheckpointData(BlobType.Append); } public override StorageResourceCheckpointData GetDestinationCheckpointData() diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobCheckpointData.cs new file mode 100644 index 0000000000000..2893b055d898f --- /dev/null +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobCheckpointData.cs @@ -0,0 +1,40 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.IO; +using Azure.Storage.Blobs.Models; + +namespace Azure.Storage.DataMovement.Blobs +{ + /// + /// Base class for Blob source and destination checkpoint data + /// which contains shared fields. + /// + internal abstract class BlobCheckpointData : StorageResourceCheckpointData + { + /// + /// Schema version. + /// + public int Version; + + /// + /// The type of blob. + /// + public BlobType BlobType; + + public BlobCheckpointData(int version, BlobType blobType) + { + Version = version; + BlobType = blobType; + } + + protected static void CheckSchemaVersion(int expected, int actual) + { + if (expected != actual) + { + throw Errors.UnsupportedJobSchemaVersionHeader(actual.ToString()); + } + } + } +} diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs index d4cd967f0c36e..0dd6ca9bafe26 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs @@ -12,18 +12,8 @@ namespace Azure.Storage.DataMovement.Blobs { - internal class BlobDestinationCheckpointData : StorageResourceCheckpointData + internal class BlobDestinationCheckpointData : BlobCheckpointData { - /// - /// Schema version. - /// - public int Version; - - /// - /// The type of the destination blob. - /// - public BlobType BlobType; - /// /// The content headers for the destination blob. /// @@ -66,9 +56,8 @@ public BlobDestinationCheckpointData( Metadata metadata, Tags blobTags, string cpkScope) + : base(DataMovementBlobConstants.DestinationCheckpointData.SchemaVersion, blobType) { - Version = DataMovementBlobConstants.DestinationJobPartHeader.SchemaVersion; - BlobType = blobType; ContentHeaders = contentHeaders; _contentTypeBytes = ContentHeaders?.ContentType != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentType) : Array.Empty(); _contentEncodingBytes = ContentHeaders?.ContentEncoding != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentEncoding) : Array.Empty(); @@ -88,7 +77,7 @@ public override void Serialize(Stream stream) { Argument.AssertNotNull(stream, nameof(stream)); - int currentVariableLengthIndex = DataMovementBlobConstants.DestinationJobPartHeader.VariableLengthStartIndex; + int currentVariableLengthIndex = DataMovementBlobConstants.DestinationCheckpointData.VariableLengthStartIndex; BinaryWriter writer = new BinaryWriter(stream); // Version @@ -142,7 +131,7 @@ internal static BlobDestinationCheckpointData Deserialize(Stream stream) // Version int version = reader.ReadInt32(); - CheckSchemaVersion(version); + CheckSchemaVersion(DataMovementBlobConstants.DestinationCheckpointData.SchemaVersion, version); // BlobType BlobType blobType = (BlobType)reader.ReadByte(); @@ -272,7 +261,7 @@ internal static BlobDestinationCheckpointData Deserialize(Stream stream) private int CalculateLength() { // Length is fixed size fields plus length of each variable length field - int length = DataMovementBlobConstants.DestinationJobPartHeader.VariableLengthStartIndex; + int length = DataMovementBlobConstants.DestinationCheckpointData.VariableLengthStartIndex; length += _contentTypeBytes.Length; length += _contentEncodingBytes.Length; length += _contentLanguageBytes.Length; @@ -283,13 +272,5 @@ private int CalculateLength() length += _cpkScopeBytes.Length; return length; } - - private static void CheckSchemaVersion(int version) - { - if (version != DataMovementBlobConstants.DestinationJobPartHeader.SchemaVersion) - { - throw Errors.UnsupportedJobSchemaVersionHeader(version.ToString()); - } - } } } diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobSourceCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobSourceCheckpointData.cs index 47a290dbe02c8..e03afb23d3c65 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobSourceCheckpointData.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobSourceCheckpointData.cs @@ -2,15 +2,45 @@ // Licensed under the MIT License. using System.IO; +using Azure.Core; +using Azure.Storage.Blobs.Models; namespace Azure.Storage.DataMovement.Blobs { - internal class BlobSourceCheckpointData : StorageResourceCheckpointData + internal class BlobSourceCheckpointData : BlobCheckpointData { - public override int Length => 0; + public BlobSourceCheckpointData(BlobType blobType) + : base(DataMovementBlobConstants.SourceCheckpointData.SchemaVersion, blobType) + { + } + + public override int Length => DataMovementBlobConstants.SourceCheckpointData.DataSize; public override void Serialize(Stream stream) { + Argument.AssertNotNull(stream, nameof(stream)); + BinaryWriter writer = new BinaryWriter(stream); + + // Version + writer.Write(Version); + + // BlobType + writer.Write((byte)BlobType); + } + + internal static BlobSourceCheckpointData Deserialize(Stream stream) + { + Argument.AssertNotNull(stream, nameof(stream)); + BinaryReader reader = new BinaryReader(stream); + + // Version + int version = reader.ReadInt32(); + CheckSchemaVersion(DataMovementBlobConstants.SourceCheckpointData.SchemaVersion, version); + + // BlobType + BlobType blobType = (BlobType)reader.ReadByte(); + + return new BlobSourceCheckpointData(blobType); } } } diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobStorageResourceContainer.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobStorageResourceContainer.cs index a5f790189ff1c..02ea5e1f1fef6 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobStorageResourceContainer.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobStorageResourceContainer.cs @@ -131,17 +131,18 @@ protected override async IAsyncEnumerable GetStorageResourcesAs public override StorageResourceCheckpointData GetSourceCheckpointData() { - return new BlobSourceCheckpointData(); + // Source blob type does not matter for container + return new BlobSourceCheckpointData(BlobType.Block); } public override StorageResourceCheckpointData GetDestinationCheckpointData() { return new BlobDestinationCheckpointData( - _options.BlobType, - _options.BlobOptions.HttpHeaders, - _options.BlobOptions.AccessTier, - _options.BlobOptions.Metadata, - _options.BlobOptions.Tags, + _options?.BlobType ?? BlobType.Block, + _options?.BlobOptions?.HttpHeaders, + _options?.BlobOptions?.AccessTier, + _options?.BlobOptions?.Metadata, + _options?.BlobOptions?.Tags, default); // TODO: Update when we support encryption scopes } diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobsStorageResourceProvider.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobsStorageResourceProvider.cs index 7344b3f7d4bb6..f7a6d926ab350 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobsStorageResourceProvider.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobsStorageResourceProvider.cs @@ -235,7 +235,8 @@ private StorageResource FromTransferProperties( bool getSource, CancellationToken cancellationToken) { - BlobDestinationCheckpointData checkpointData = properties.GetCheckpointData(); + BlobCheckpointData checkpointData = properties.GetCheckpointData(getSource); + ResourceType type = GetType(checkpointData.BlobType, properties.IsContainer); Uri uri = getSource ? properties.SourceUri : properties.DestinationUri; IBlobResourceRehydrator rehydrator = type switch @@ -250,24 +251,24 @@ private StorageResource FromTransferProperties( { CredentialType.None => rehydrator.Rehydrate( properties, - checkpointData, + checkpointData as BlobDestinationCheckpointData, getSource, cancellationToken), CredentialType.SharedKey => rehydrator.Rehydrate( properties, - checkpointData, + checkpointData as BlobDestinationCheckpointData, getSource, _getStorageSharedKeyCredential(uri, getSource), cancellationToken), CredentialType.Token => rehydrator.Rehydrate( properties, - checkpointData, + checkpointData as BlobDestinationCheckpointData, getSource, _getTokenCredential(uri, getSource), cancellationToken), CredentialType.Sas => rehydrator.Rehydrate( properties, - checkpointData, + checkpointData as BlobDestinationCheckpointData, getSource, _getAzureSasCredential(uri, getSource), cancellationToken), @@ -480,24 +481,24 @@ private interface IBlobResourceRehydrator { StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, CancellationToken cancellationToken); StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, StorageSharedKeyCredential credential, CancellationToken cancellationToken); StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, TokenCredential credential, CancellationToken cancellationToken); StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, AzureSasCredential credential, CancellationToken cancellationToken); @@ -519,42 +520,42 @@ private Uri GetContainerUri(DataTransferProperties properties, bool getSource) public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, CancellationToken cancellationToken) => new BlobStorageResourceContainer( new BlobContainerClient(GetContainerUri(properties, isSource)), - !isSource ? checkpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); + !isSource ? destinationCheckpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, StorageSharedKeyCredential credential, CancellationToken cancellationToken) => new BlobStorageResourceContainer( new BlobContainerClient(GetContainerUri(properties, isSource), credential), - !isSource ? checkpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); + !isSource ? destinationCheckpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, TokenCredential credential, CancellationToken cancellationToken) => new BlobStorageResourceContainer( new BlobContainerClient(GetContainerUri(properties, isSource), credential), - !isSource ? checkpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); + !isSource ? destinationCheckpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, AzureSasCredential credential, CancellationToken cancellationToken) => new BlobStorageResourceContainer( new BlobContainerClient(GetContainerUri(properties, isSource), credential), - !isSource ? checkpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); + !isSource ? destinationCheckpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); } private class BlockBlobResourceRehydrator : IBlobResourceRehydrator @@ -564,42 +565,42 @@ private Uri GetUri(DataTransferProperties properties, bool getSource) public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, CancellationToken cancellationToken) => new BlockBlobStorageResource( new BlockBlobClient(GetUri(properties, isSource)), - !isSource ? checkpointData.GetBlockBlobResourceOptions() : default); + !isSource ? destinationCheckpointData.GetBlockBlobResourceOptions() : default); public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, StorageSharedKeyCredential credential, CancellationToken cancellationToken) => new BlockBlobStorageResource( new BlockBlobClient(GetUri(properties, isSource), credential), - !isSource ? checkpointData.GetBlockBlobResourceOptions() : default); + !isSource ? destinationCheckpointData.GetBlockBlobResourceOptions() : default); public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, TokenCredential credential, CancellationToken cancellationToken) => new BlockBlobStorageResource( new BlockBlobClient(GetUri(properties, isSource), credential), - !isSource ? checkpointData.GetBlockBlobResourceOptions() : default); + !isSource ? destinationCheckpointData.GetBlockBlobResourceOptions() : default); public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, AzureSasCredential credential, CancellationToken cancellationToken) => new BlockBlobStorageResource( new BlockBlobClient(GetUri(properties, isSource), credential), - !isSource ? checkpointData.GetBlockBlobResourceOptions() : default); + !isSource ? destinationCheckpointData.GetBlockBlobResourceOptions() : default); } private class PageBlobResourceRehydrator : IBlobResourceRehydrator @@ -609,42 +610,42 @@ private Uri GetUri(DataTransferProperties properties, bool getSource) public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, CancellationToken cancellationToken) => new PageBlobStorageResource( new PageBlobClient(GetUri(properties, isSource)), - !isSource ? checkpointData.GetPageBlobResourceOptions() : default); + !isSource ? destinationCheckpointData.GetPageBlobResourceOptions() : default); public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, StorageSharedKeyCredential credential, CancellationToken cancellationToken) => new PageBlobStorageResource( new PageBlobClient(GetUri(properties, isSource), credential), - !isSource ? checkpointData.GetPageBlobResourceOptions() : default); + !isSource ? destinationCheckpointData.GetPageBlobResourceOptions() : default); public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, TokenCredential credential, CancellationToken cancellationToken) => new PageBlobStorageResource( new PageBlobClient(GetUri(properties, isSource), credential), - !isSource ? checkpointData.GetPageBlobResourceOptions() : default); + !isSource ? destinationCheckpointData.GetPageBlobResourceOptions() : default); public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, AzureSasCredential credential, CancellationToken cancellationToken) => new PageBlobStorageResource( new PageBlobClient(GetUri(properties, isSource), credential), - !isSource ? checkpointData.GetPageBlobResourceOptions() : default); + !isSource ? destinationCheckpointData.GetPageBlobResourceOptions() : default); } private class AppendBlobResourceRehydrator : IBlobResourceRehydrator @@ -654,42 +655,42 @@ private Uri GetUri(DataTransferProperties properties, bool getSource) public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, CancellationToken cancellationToken) => new AppendBlobStorageResource( new AppendBlobClient(GetUri(properties, isSource)), - !isSource ? checkpointData.GetAppendBlobResourceOptions() : default); + !isSource ? destinationCheckpointData.GetAppendBlobResourceOptions() : default); public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, StorageSharedKeyCredential credential, CancellationToken cancellationToken) => new AppendBlobStorageResource( new AppendBlobClient(GetUri(properties, isSource), credential), - !isSource ? checkpointData.GetAppendBlobResourceOptions() : default); + !isSource ? destinationCheckpointData.GetAppendBlobResourceOptions() : default); public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, TokenCredential credential, CancellationToken cancellationToken) => new AppendBlobStorageResource( new AppendBlobClient(GetUri(properties, isSource), credential), - !isSource ? checkpointData.GetAppendBlobResourceOptions() : default); + !isSource ? destinationCheckpointData.GetAppendBlobResourceOptions() : default); public StorageResource Rehydrate( DataTransferProperties properties, - BlobDestinationCheckpointData checkpointData, + BlobDestinationCheckpointData destinationCheckpointData, bool isSource, AzureSasCredential credential, CancellationToken cancellationToken) => new AppendBlobStorageResource( new AppendBlobClient(GetUri(properties, isSource), credential), - !isSource ? checkpointData.GetAppendBlobResourceOptions() : default); + !isSource ? destinationCheckpointData.GetAppendBlobResourceOptions() : default); } #endregion diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlockBlobStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlockBlobStorageResource.cs index ec0c033b20dfe..86fa48ce6c785 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlockBlobStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlockBlobStorageResource.cs @@ -327,7 +327,7 @@ protected override async Task DeleteIfExistsAsync(CancellationToken cancel public override StorageResourceCheckpointData GetSourceCheckpointData() { - return new BlobSourceCheckpointData(); + return new BlobSourceCheckpointData(BlobType.Block); } public override StorageResourceCheckpointData GetDestinationCheckpointData() diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/DataMovementBlobConstants.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/DataMovementBlobConstants.cs index 84f0c05266d55..117ebf153bbaf 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/DataMovementBlobConstants.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/DataMovementBlobConstants.cs @@ -7,7 +7,16 @@ namespace Azure.Storage.DataMovement.Blobs { internal class DataMovementBlobConstants { - internal class DestinationJobPartHeader + internal class SourceCheckpointData + { + internal const int SchemaVersion = 1; + + internal const int VersionIndex = 0; + internal const int BlobTypeIndex = VersionIndex + IntSizeInBytes; + internal const int DataSize = BlobTypeIndex + OneByte; + } + + internal class DestinationCheckpointData { internal const int SchemaVersion = 1; diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/DataMovementBlobsExtensions.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/DataMovementBlobsExtensions.cs index 543e4088f7b4b..52243239fb1d3 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/DataMovementBlobsExtensions.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/DataMovementBlobsExtensions.cs @@ -407,11 +407,21 @@ internal static PageBlobUploadPagesFromUriOptions ToUploadPagesFromUriOptions( }; } - internal static BlobDestinationCheckpointData GetCheckpointData(this DataTransferProperties properties) + internal static BlobCheckpointData GetCheckpointData(this DataTransferProperties properties, bool isSource) { - using (MemoryStream stream = new(properties.DestinationCheckpointData)) + if (isSource) { - return BlobDestinationCheckpointData.Deserialize(stream); + using (MemoryStream stream = new(properties.SourceCheckpointData)) + { + return BlobSourceCheckpointData.Deserialize(stream); + } + } + else + { + using (MemoryStream stream = new(properties.DestinationCheckpointData)) + { + return BlobDestinationCheckpointData.Deserialize(stream); + } } } diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/PageBlobStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/PageBlobStorageResource.cs index d3413f8074fb9..660e0d9219517 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/PageBlobStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/PageBlobStorageResource.cs @@ -289,7 +289,7 @@ protected override async Task DeleteIfExistsAsync(CancellationToken cancel public override StorageResourceCheckpointData GetSourceCheckpointData() { - return new BlobSourceCheckpointData(); + return new BlobSourceCheckpointData(BlobType.Page); } public override StorageResourceCheckpointData GetDestinationCheckpointData() diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobDestinationCheckpointDataTests.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobDestinationCheckpointDataTests.cs index 08f6e13e64f27..5c318bf3742ff 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobDestinationCheckpointDataTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobDestinationCheckpointDataTests.cs @@ -48,7 +48,7 @@ public void Ctor() { BlobDestinationCheckpointData data = CreateDefault(); - Assert.AreEqual(DataMovementBlobConstants.DestinationJobPartHeader.SchemaVersion, data.Version); + Assert.AreEqual(DataMovementBlobConstants.DestinationCheckpointData.SchemaVersion, data.Version); Assert.AreEqual(DefaultBlobType, data.BlobType); Assert.AreEqual(DefaultContentType, data.ContentHeaders.ContentType); Assert.AreEqual(DefaultContentEncoding, data.ContentHeaders.ContentEncoding); @@ -67,7 +67,7 @@ public void Serialize() BlobDestinationCheckpointData data = CreateDefault(); string samplePath = Path.Combine("Resources", "BlobDestinationCheckpointData.1.bin"); - using (MemoryStream dataStream = new MemoryStream(DataMovementBlobConstants.DestinationJobPartHeader.VariableLengthStartIndex)) + using (MemoryStream dataStream = new MemoryStream(DataMovementBlobConstants.DestinationCheckpointData.VariableLengthStartIndex)) using (FileStream fileStream = File.OpenRead(samplePath)) { data.Serialize(dataStream); @@ -85,11 +85,11 @@ public void Deserialize() { BlobDestinationCheckpointData data = CreateDefault(); - using (Stream stream = new MemoryStream(DataMovementBlobConstants.DestinationJobPartHeader.VariableLengthStartIndex)) + using (Stream stream = new MemoryStream(DataMovementBlobConstants.DestinationCheckpointData.VariableLengthStartIndex)) { data.Serialize(stream); stream.Position = 0; - DeserializeAndVerify(stream, DataMovementBlobConstants.DestinationJobPartHeader.SchemaVersion); + DeserializeAndVerify(stream, DataMovementBlobConstants.DestinationCheckpointData.SchemaVersion); } } diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobSourceCheckpointDataTests.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobSourceCheckpointDataTests.cs new file mode 100644 index 0000000000000..69f279af66cc4 --- /dev/null +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobSourceCheckpointDataTests.cs @@ -0,0 +1,64 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +extern alias DMBlobs; + +using System.IO; +using Azure.Storage.Blobs.Models; +using DMBlobs::Azure.Storage.DataMovement.Blobs; +using NUnit.Framework; + +namespace Azure.Storage.DataMovement.Blobs.Tests +{ + public class BlobSourceCheckpointDataTests + { + [Test] + public void Ctor() + { + BlobSourceCheckpointData data = new(BlobType.Block); + + Assert.AreEqual(DataMovementBlobConstants.SourceCheckpointData.SchemaVersion, data.Version); + Assert.AreEqual(BlobType.Block, data.BlobType); + } + + [Test] + public void Serialize() + { + BlobSourceCheckpointData data = new(BlobType.Block); + + byte[] expected; + using (MemoryStream stream = new MemoryStream(DataMovementBlobConstants.SourceCheckpointData.DataSize)) + { + BinaryWriter writer = new BinaryWriter(stream); + writer.Write(DataMovementBlobConstants.SourceCheckpointData.SchemaVersion); + writer.Write((byte)BlobType.Block); + expected = stream.ToArray(); + } + + byte[] actual; + using (MemoryStream stream = new MemoryStream(DataMovementBlobConstants.SourceCheckpointData.DataSize)) + { + data.Serialize(stream); + actual = stream.ToArray(); + } + + CollectionAssert.AreEqual(expected, actual); + } + + [Test] + public void Deserialize() + { + BlobSourceCheckpointData data = new(BlobType.Block); + + using (Stream stream = new MemoryStream(DataMovementBlobConstants.DestinationCheckpointData.VariableLengthStartIndex)) + { + data.Serialize(stream); + stream.Position = 0; + BlobSourceCheckpointData deserialized = BlobSourceCheckpointData.Deserialize(stream); + + Assert.AreEqual(DataMovementBlobConstants.SourceCheckpointData.SchemaVersion, deserialized.Version); + Assert.AreEqual(BlobType.Block, deserialized.BlobType); + } + } + } +} diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/RehydrateBlobResourceTests.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/RehydrateBlobResourceTests.cs index 1bd639c6dd6f6..5b61e6bf7df29 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/RehydrateBlobResourceTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/RehydrateBlobResourceTests.cs @@ -50,7 +50,12 @@ private static string ToProviderId(StorageResourceType type) }; } - private static BlobDestinationCheckpointData GetPopulatedCheckpointData( + private static BlobSourceCheckpointData GetSourceCheckpointData(BlobType blobType) + { + return new BlobSourceCheckpointData(blobType); + } + + private static BlobDestinationCheckpointData GetPopulatedDestinationCheckpointData( BlobType blobType, AccessTier? accessTier = default) { @@ -71,12 +76,12 @@ private static BlobDestinationCheckpointData GetPopulatedCheckpointData( "encryption-scope"); } - private static BlobDestinationCheckpointData GetDefaultCheckpointData(BlobType blobType) + private static BlobDestinationCheckpointData GetDefaultDestinationCheckpointData(BlobType blobType) { return new BlobDestinationCheckpointData(blobType, default, default, default, default, default); } - private static byte[] GetBytes(StorageResourceCheckpointData checkpointData) + private static byte[] GetBytes(BlobCheckpointData checkpointData) { using (MemoryStream stream = new MemoryStream()) { @@ -95,6 +100,7 @@ private static Mock GetProperties( string sourceProviderId, string destinationProviderId, bool isContainer, + BlobSourceCheckpointData sourceCheckpointData, BlobDestinationCheckpointData destinationCheckpointData) { var mock = new Mock(MockBehavior.Strict); @@ -106,7 +112,7 @@ private static Mock GetProperties( mock.Setup(p => p.DestinationTypeId).Returns(destinationResourceId); mock.Setup(p => p.SourceProviderId).Returns(sourceProviderId); mock.Setup(p => p.DestinationProviderId).Returns(destinationProviderId); - mock.Setup(p => p.SourceCheckpointData).Returns(Array.Empty()); + mock.Setup(p => p.SourceCheckpointData).Returns(GetBytes(sourceCheckpointData)); mock.Setup(p => p.DestinationCheckpointData).Returns(GetBytes(destinationCheckpointData)); mock.Setup(p => p.IsContainer).Returns(isContainer); return mock; @@ -135,7 +141,8 @@ public async Task RehydrateBlockBlob( ToProviderId(sourceType), ToProviderId(destinationType), isContainer: false, - GetDefaultCheckpointData(BlobType.Block)).Object; + GetSourceCheckpointData(BlobType.Block), + GetDefaultDestinationCheckpointData(BlobType.Block)).Object; StorageResource storageResource = isSource ? await new BlobsStorageResourceProvider().FromSourceInternalHookAsync(transferProperties) @@ -156,7 +163,7 @@ public async Task RehydrateBlockBlob_Options() StorageResourceType sourceType = StorageResourceType.BlockBlob; StorageResourceType destinationType = StorageResourceType.BlockBlob; - BlobDestinationCheckpointData checkpointData = GetPopulatedCheckpointData(BlobType.Block, AccessTier.Cool); + BlobDestinationCheckpointData checkpointData = GetPopulatedDestinationCheckpointData(BlobType.Block, AccessTier.Cool); DataTransferProperties transferProperties = GetProperties( test.DirectoryPath, transferId, @@ -167,6 +174,7 @@ public async Task RehydrateBlockBlob_Options() ToProviderId(sourceType), ToProviderId(destinationType), isContainer: false, + GetSourceCheckpointData(BlobType.Block), checkpointData).Object; BlockBlobStorageResource storageResource = (BlockBlobStorageResource)await new BlobsStorageResourceProvider() @@ -206,7 +214,8 @@ public async Task RehydratePageBlob( ToProviderId(sourceType), ToProviderId(destinationType), isContainer: false, - GetDefaultCheckpointData(BlobType.Page)).Object; + GetSourceCheckpointData(BlobType.Page), + GetDefaultDestinationCheckpointData(BlobType.Page)).Object; StorageResource storageResource = isSource ? await new BlobsStorageResourceProvider().FromSourceInternalHookAsync(transferProperties) @@ -227,7 +236,7 @@ public async Task RehydratePageBlob_Options() StorageResourceType sourceType = StorageResourceType.PageBlob; StorageResourceType destinationType = StorageResourceType.PageBlob; - BlobDestinationCheckpointData checkpointData = GetPopulatedCheckpointData(BlobType.Page, AccessTier.P30); + BlobDestinationCheckpointData checkpointData = GetPopulatedDestinationCheckpointData(BlobType.Page, AccessTier.P30); DataTransferProperties transferProperties = GetProperties( test.DirectoryPath, transferId, @@ -238,6 +247,7 @@ public async Task RehydratePageBlob_Options() ToProviderId(sourceType), ToProviderId(destinationType), isContainer: false, + GetSourceCheckpointData(BlobType.Page), checkpointData).Object; PageBlobStorageResource storageResource = (PageBlobStorageResource)await new BlobsStorageResourceProvider() @@ -277,7 +287,8 @@ public async Task RehydrateAppendBlob( ToProviderId(sourceType), ToProviderId(destinationType), isContainer: false, - GetDefaultCheckpointData(BlobType.Append)).Object; + GetSourceCheckpointData(BlobType.Append), + GetDefaultDestinationCheckpointData(BlobType.Append)).Object; StorageResource storageResource = isSource ? await new BlobsStorageResourceProvider().FromSourceInternalHookAsync(transferProperties) @@ -298,7 +309,7 @@ public async Task RehydrateAppendBlob_Options() StorageResourceType sourceType = StorageResourceType.AppendBlob; StorageResourceType destinationType = StorageResourceType.AppendBlob; - BlobDestinationCheckpointData checkpointData = GetPopulatedCheckpointData(BlobType.Append, accessTier: default); + BlobDestinationCheckpointData checkpointData = GetPopulatedDestinationCheckpointData(BlobType.Append, accessTier: default); DataTransferProperties transferProperties = GetProperties( test.DirectoryPath, transferId, @@ -309,6 +320,7 @@ public async Task RehydrateAppendBlob_Options() ToProviderId(sourceType), ToProviderId(destinationType), isContainer: false, + GetSourceCheckpointData(BlobType.Append), checkpointData).Object; AppendBlobStorageResource storageResource = (AppendBlobStorageResource)await new BlobsStorageResourceProvider() @@ -358,7 +370,8 @@ public async Task RehydrateBlobContainer( ToProviderId(sourceType), ToProviderId(destinationType), isContainer: true, - GetDefaultCheckpointData(BlobType.Block)).Object; + GetSourceCheckpointData(BlobType.Block), + GetDefaultDestinationCheckpointData(BlobType.Block)).Object; StorageResource storageResource = isSource ? await new BlobsStorageResourceProvider().FromSourceInternalHookAsync(transferProperties) From 71a8c06d9dde0bb8f668cf696e7942063d58982c Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Thu, 19 Oct 2023 17:09:08 -0700 Subject: [PATCH 05/10] Fix container sources --- .../src/BlobsStorageResourceProvider.cs | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobsStorageResourceProvider.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobsStorageResourceProvider.cs index f7a6d926ab350..ee1c86396c115 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobsStorageResourceProvider.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobsStorageResourceProvider.cs @@ -506,6 +506,26 @@ StorageResource Rehydrate( private class BlobContainerResourceRehydrator : IBlobResourceRehydrator { + private BlobStorageResourceContainerOptions GetOptions( + DataTransferProperties transferProperties, + BlobDestinationCheckpointData destinationCheckpointData, + bool isSource) + { + Argument.AssertNotNull(transferProperties, nameof(transferProperties)); + + if (isSource) + { + return new BlobStorageResourceContainerOptions() + { + BlobDirectoryPrefix = GetPrefix(transferProperties, isSource) + }; + } + else + { + return destinationCheckpointData.GetBlobContainerOptions(GetPrefix(transferProperties, isSource)); + } + } + private Uri GetUri(DataTransferProperties properties, bool getSource) => getSource ? properties.SourceUri : properties.DestinationUri; @@ -525,7 +545,7 @@ public StorageResource Rehydrate( CancellationToken cancellationToken) => new BlobStorageResourceContainer( new BlobContainerClient(GetContainerUri(properties, isSource)), - !isSource ? destinationCheckpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); + GetOptions(properties, destinationCheckpointData, isSource)); public StorageResource Rehydrate( DataTransferProperties properties, @@ -535,7 +555,7 @@ public StorageResource Rehydrate( CancellationToken cancellationToken) => new BlobStorageResourceContainer( new BlobContainerClient(GetContainerUri(properties, isSource), credential), - !isSource ? destinationCheckpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); + GetOptions(properties, destinationCheckpointData, isSource)); public StorageResource Rehydrate( DataTransferProperties properties, @@ -545,7 +565,7 @@ public StorageResource Rehydrate( CancellationToken cancellationToken) => new BlobStorageResourceContainer( new BlobContainerClient(GetContainerUri(properties, isSource), credential), - !isSource ? destinationCheckpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); + GetOptions(properties, destinationCheckpointData, isSource)); public StorageResource Rehydrate( DataTransferProperties properties, @@ -555,7 +575,7 @@ public StorageResource Rehydrate( CancellationToken cancellationToken) => new BlobStorageResourceContainer( new BlobContainerClient(GetContainerUri(properties, isSource), credential), - !isSource ? destinationCheckpointData.GetBlobContainerOptions(GetPrefix(properties, isSource)) : default); + GetOptions(properties, destinationCheckpointData, isSource)); } private class BlockBlobResourceRehydrator : IBlobResourceRehydrator From ee7bc13d1881d0c3d49e53094fee74f2202c70fd Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Thu, 19 Oct 2023 18:13:42 -0700 Subject: [PATCH 06/10] Add back reader lock, add options e2e test --- .../src/Shared/LocalTransferCheckpointer.cs | 10 ++- .../tests/PauseResumeTransferTests.cs | 73 +++++++++++++++++++ 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/src/Shared/LocalTransferCheckpointer.cs b/sdk/storage/Azure.Storage.DataMovement/src/Shared/LocalTransferCheckpointer.cs index 28c4747af434a..a2dbca82d5ace 100644 --- a/sdk/storage/Azure.Storage.DataMovement/src/Shared/LocalTransferCheckpointer.cs +++ b/sdk/storage/Azure.Storage.DataMovement/src/Shared/LocalTransferCheckpointer.cs @@ -154,13 +154,19 @@ public override async Task ReadJobPlanFileAsync( CancellationHelper.ThrowIfCancellationRequested(cancellationToken); if (_transferStates.TryGetValue(transferId, out JobPlanFile jobPlanFile)) { + // Lock MMF + await jobPlanFile.WriteLock.WaitAsync().ConfigureAwait(false); + using (MemoryMappedFile mmf = MemoryMappedFile.CreateFromFile(jobPlanFile.FilePath)) using (MemoryMappedViewStream mmfStream = mmf.CreateViewStream(offset, length, MemoryMappedFileAccess.Read)) { await mmfStream.CopyToAsync(copiedStream).ConfigureAwait(false); - copiedStream.Position = 0; - return copiedStream; } + + // Release MMF + jobPlanFile.WriteLock.Release(); + copiedStream.Position = 0; + return copiedStream; } else { diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferTests.cs b/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferTests.cs index 5ac73d8e39fbf..27c5a53803705 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferTests.cs @@ -15,6 +15,7 @@ using Azure.Storage.Blobs.Specialized; using Azure.Storage.Blobs.Tests; using Azure.Storage.DataMovement.JobPlan; +using Azure.Storage.Test; using DMBlobs::Azure.Storage.DataMovement.Blobs; using Moq; using NUnit.Framework; @@ -560,6 +561,78 @@ await AssertSourceAndDestinationAsync( destinationContainer: destinationContainer.Container); } + [Ignore("https://github.com/Azure/azure-sdk-for-net/issues/35439")] + [Test] + [LiveOnly] + [TestCase(TransferDirection.Upload)] + [TestCase(TransferDirection.Copy)] + public async Task ResumeTransferAsync_Options(TransferDirection transferType) + { + // Arrange + using DisposingLocalDirectory checkpointerDirectory = DisposingLocalDirectory.GetTestDirectory(); + using DisposingLocalDirectory localDirectory = DisposingLocalDirectory.GetTestDirectory(); + await using DisposingContainer blobContainer = await GetTestContainerAsync(publicAccessType: PublicAccessType.BlobContainer); + + BlobsStorageResourceProvider blobProvider = new(GetSharedKeyCredential()); + LocalFilesStorageResourceProvider localProvider = new(); + TransferManagerOptions options = new TransferManagerOptions() + { + CheckpointerOptions = new TransferCheckpointStoreOptions(checkpointerDirectory.DirectoryPath), + ErrorHandling = DataTransferErrorMode.ContinueOnFailure, + ResumeProviders = new() { blobProvider, localProvider }, + }; + TransferManager transferManager = new TransferManager(options); + + BlockBlobStorageResourceOptions testOptions = new() + { + Metadata = DataProvider.BuildMetadata(), + Tags = DataProvider.BuildTags(), + AccessTier = AccessTier.Cool, + HttpHeaders = new BlobHttpHeaders() + { + ContentLanguage = "en-US", + }, + }; + + long size = Constants.KB; + StorageResource source; + StorageResource destination; + if (transferType == TransferDirection.Upload) + { + source = await CreateLocalFileSourceResourceAsync(size, localDirectory.DirectoryPath, localProvider); + destination = CreateBlobDestinationResource(blobContainer.Container, blobProvider, options: testOptions); + } + else // Copy + { + source = await CreateBlobSourceResourceAsync(size, GetNewBlobName(), blobContainer.Container, blobProvider); + destination = CreateBlobDestinationResource(blobContainer.Container, blobProvider, options: testOptions); + } + + DataTransfer transfer = await transferManager.StartTransferAsync(source, destination); + + // Act - Pause Job + CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await transferManager.PauseTransferIfRunningAsync(transfer.Id, cancellationTokenSource.Token); + Assert.AreEqual(DataTransferState.Paused, transfer.TransferStatus.State); + + // Act - Resume Job + DataTransfer resumeTransfer = await transferManager.ResumeTransferAsync(transfer.Id); + CancellationTokenSource waitTransferCompletion = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await resumeTransfer.WaitForCompletionAsync(waitTransferCompletion.Token); + + // Assert + Assert.AreEqual(DataTransferState.Completed, resumeTransfer.TransferStatus.State); + Assert.IsTrue(resumeTransfer.HasCompleted); + + BlobUriBuilder builder = new BlobUriBuilder(destination.Uri); + BlockBlobClient blob = blobContainer.Container.GetBlockBlobClient(builder.BlobName); + BlobProperties props = (await blob.GetPropertiesAsync()).Value; + Assert.AreEqual(testOptions.Metadata, props.Metadata); + Assert.AreEqual(testOptions.Tags.Count, props.TagCount); + Assert.AreEqual(testOptions.AccessTier, new AccessTier(props.AccessTier)); + Assert.AreEqual(testOptions.HttpHeaders.ContentLanguage, props.ContentLanguage); + } + private async Task CreateBlobDirectorySourceResourceAsync( long size, int blobCount, From e3d78bdcd010f2f4945e223a42d676fa27cf0edd Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Thu, 19 Oct 2023 18:19:34 -0700 Subject: [PATCH 07/10] Export API --- .../api/Azure.Storage.DataMovement.net6.0.cs | 8 +++++--- .../api/Azure.Storage.DataMovement.netstandard2.0.cs | 8 +++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs index 6a23aa7170c48..3e2cd2474113c 100644 --- a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs +++ b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.net6.0.cs @@ -63,10 +63,12 @@ public partial class DataTransferProperties { protected internal DataTransferProperties() { } public virtual Azure.Storage.DataMovement.TransferCheckpointStoreOptions Checkpointer { get { throw null; } } + public virtual byte[] DestinationCheckpointData { get { throw null; } } public virtual string DestinationProviderId { get { throw null; } } public virtual string DestinationTypeId { get { throw null; } } public virtual System.Uri DestinationUri { get { throw null; } } public virtual bool IsContainer { get { throw null; } } + public virtual byte[] SourceCheckpointData { get { throw null; } } public virtual string SourceProviderId { get { throw null; } } public virtual string SourceTypeId { get { throw null; } } public virtual System.Uri SourceUri { get { throw null; } } @@ -117,12 +119,14 @@ protected StorageResource() { } protected internal abstract bool IsContainer { get; } public abstract string ProviderId { get; } public abstract System.Uri Uri { get; } + public abstract Azure.Storage.DataMovement.StorageResourceCheckpointData GetDestinationCheckpointData(); + public abstract Azure.Storage.DataMovement.StorageResourceCheckpointData GetSourceCheckpointData(); } public abstract partial class StorageResourceCheckpointData { protected StorageResourceCheckpointData() { } public abstract int Length { get; } - protected internal abstract void Serialize(System.IO.Stream stream); + public abstract void Serialize(System.IO.Stream stream); } public abstract partial class StorageResourceContainer : Azure.Storage.DataMovement.StorageResource { @@ -157,9 +161,7 @@ protected StorageResourceItem() { } protected internal abstract System.Threading.Tasks.Task CopyFromUriAsync(Azure.Storage.DataMovement.StorageResourceItem sourceResource, bool overwrite, long completeLength, Azure.Storage.DataMovement.StorageResourceCopyFromUriOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); protected internal abstract System.Threading.Tasks.Task DeleteIfExistsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); protected internal abstract System.Threading.Tasks.Task GetCopyAuthorizationHeaderAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); - protected internal abstract Azure.Storage.DataMovement.StorageResourceCheckpointData GetDestinationCheckpointData(); protected internal abstract System.Threading.Tasks.Task GetPropertiesAsync(System.Threading.CancellationToken token = default(System.Threading.CancellationToken)); - protected internal abstract Azure.Storage.DataMovement.StorageResourceCheckpointData GetSourceCheckpointData(); protected internal abstract System.Threading.Tasks.Task ReadStreamAsync(long position = (long)0, long? length = default(long?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); } public partial class StorageResourceProperties diff --git a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs index 6a23aa7170c48..3e2cd2474113c 100644 --- a/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs +++ b/sdk/storage/Azure.Storage.DataMovement/api/Azure.Storage.DataMovement.netstandard2.0.cs @@ -63,10 +63,12 @@ public partial class DataTransferProperties { protected internal DataTransferProperties() { } public virtual Azure.Storage.DataMovement.TransferCheckpointStoreOptions Checkpointer { get { throw null; } } + public virtual byte[] DestinationCheckpointData { get { throw null; } } public virtual string DestinationProviderId { get { throw null; } } public virtual string DestinationTypeId { get { throw null; } } public virtual System.Uri DestinationUri { get { throw null; } } public virtual bool IsContainer { get { throw null; } } + public virtual byte[] SourceCheckpointData { get { throw null; } } public virtual string SourceProviderId { get { throw null; } } public virtual string SourceTypeId { get { throw null; } } public virtual System.Uri SourceUri { get { throw null; } } @@ -117,12 +119,14 @@ protected StorageResource() { } protected internal abstract bool IsContainer { get; } public abstract string ProviderId { get; } public abstract System.Uri Uri { get; } + public abstract Azure.Storage.DataMovement.StorageResourceCheckpointData GetDestinationCheckpointData(); + public abstract Azure.Storage.DataMovement.StorageResourceCheckpointData GetSourceCheckpointData(); } public abstract partial class StorageResourceCheckpointData { protected StorageResourceCheckpointData() { } public abstract int Length { get; } - protected internal abstract void Serialize(System.IO.Stream stream); + public abstract void Serialize(System.IO.Stream stream); } public abstract partial class StorageResourceContainer : Azure.Storage.DataMovement.StorageResource { @@ -157,9 +161,7 @@ protected StorageResourceItem() { } protected internal abstract System.Threading.Tasks.Task CopyFromUriAsync(Azure.Storage.DataMovement.StorageResourceItem sourceResource, bool overwrite, long completeLength, Azure.Storage.DataMovement.StorageResourceCopyFromUriOptions options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); protected internal abstract System.Threading.Tasks.Task DeleteIfExistsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); protected internal abstract System.Threading.Tasks.Task GetCopyAuthorizationHeaderAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); - protected internal abstract Azure.Storage.DataMovement.StorageResourceCheckpointData GetDestinationCheckpointData(); protected internal abstract System.Threading.Tasks.Task GetPropertiesAsync(System.Threading.CancellationToken token = default(System.Threading.CancellationToken)); - protected internal abstract Azure.Storage.DataMovement.StorageResourceCheckpointData GetSourceCheckpointData(); protected internal abstract System.Threading.Tasks.Task ReadStreamAsync(long position = (long)0, long? length = default(long?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)); } public partial class StorageResourceProperties From 65857f90bc0f6a6b7ad21b870171b9feb7f78ef2 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Thu, 19 Oct 2023 19:45:21 -0700 Subject: [PATCH 08/10] Add stubs for Files --- .../ShareDirectoryStorageResourceContainer.cs | 4 ++-- .../src/ShareFileDestinationCheckpointData.cs | 16 ++++++++++++++++ .../src/ShareFileSourceCheckpointData.cs | 16 ++++++++++++++++ .../src/ShareFileStorageResource.cs | 4 ++-- 4 files changed, 36 insertions(+), 4 deletions(-) create mode 100644 sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileDestinationCheckpointData.cs create mode 100644 sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileSourceCheckpointData.cs diff --git a/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareDirectoryStorageResourceContainer.cs b/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareDirectoryStorageResourceContainer.cs index 4993ba818c4bc..21aeb776b40aa 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareDirectoryStorageResourceContainer.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareDirectoryStorageResourceContainer.cs @@ -52,12 +52,12 @@ protected override async IAsyncEnumerable GetStorageResourcesAs public override StorageResourceCheckpointData GetSourceCheckpointData() { - throw new NotImplementedException(); + return new ShareFileSourceCheckpointData(); } public override StorageResourceCheckpointData GetDestinationCheckpointData() { - throw new NotImplementedException(); + return new ShareFileDestinationCheckpointData(); } } } diff --git a/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileDestinationCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileDestinationCheckpointData.cs new file mode 100644 index 0000000000000..2a1624015970d --- /dev/null +++ b/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileDestinationCheckpointData.cs @@ -0,0 +1,16 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.IO; + +namespace Azure.Storage.DataMovement.Files.Shares +{ + internal class ShareFileDestinationCheckpointData : StorageResourceCheckpointData + { + public override int Length => 0; + + public override void Serialize(Stream stream) + { + } + } +} diff --git a/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileSourceCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileSourceCheckpointData.cs new file mode 100644 index 0000000000000..daa9f96a0e8f3 --- /dev/null +++ b/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileSourceCheckpointData.cs @@ -0,0 +1,16 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System.IO; + +namespace Azure.Storage.DataMovement.Files.Shares +{ + internal class ShareFileSourceCheckpointData : StorageResourceCheckpointData + { + public override int Length => 0; + + public override void Serialize(Stream stream) + { + } + } +} diff --git a/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileStorageResource.cs index 138f2832d58f2..ed5fd2c7b9503 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Files.Shares/src/ShareFileStorageResource.cs @@ -203,12 +203,12 @@ protected override async Task ReadStreamAsync( public override StorageResourceCheckpointData GetSourceCheckpointData() { - throw new NotImplementedException(); + return new ShareFileSourceCheckpointData(); } public override StorageResourceCheckpointData GetDestinationCheckpointData() { - throw new NotImplementedException(); + return new ShareFileDestinationCheckpointData(); } } From 3da866f577f7384b00736f3a27814ba3ca4305c2 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Fri, 20 Oct 2023 14:04:49 -0700 Subject: [PATCH 09/10] Feedback, fix tests --- .../src/BlobCheckpointData.cs | 8 -------- .../src/BlobDestinationCheckpointData.cs | 5 ++++- .../src/BlobSourceCheckpointData.cs | 5 ++++- .../tests/MockStorageResource.cs | 4 ++-- 4 files changed, 10 insertions(+), 12 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobCheckpointData.cs index 2893b055d898f..3fbbd5725d9bb 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobCheckpointData.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobCheckpointData.cs @@ -28,13 +28,5 @@ public BlobCheckpointData(int version, BlobType blobType) Version = version; BlobType = blobType; } - - protected static void CheckSchemaVersion(int expected, int actual) - { - if (expected != actual) - { - throw Errors.UnsupportedJobSchemaVersionHeader(actual.ToString()); - } - } } } diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs index 0dd6ca9bafe26..75ae4edbf0310 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobDestinationCheckpointData.cs @@ -131,7 +131,10 @@ internal static BlobDestinationCheckpointData Deserialize(Stream stream) // Version int version = reader.ReadInt32(); - CheckSchemaVersion(DataMovementBlobConstants.DestinationCheckpointData.SchemaVersion, version); + if (version != DataMovementBlobConstants.DestinationCheckpointData.SchemaVersion) + { + throw Errors.UnsupportedJobSchemaVersionHeader(version.ToString()); + } // BlobType BlobType blobType = (BlobType)reader.ReadByte(); diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobSourceCheckpointData.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobSourceCheckpointData.cs index e03afb23d3c65..83170b9ed972d 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobSourceCheckpointData.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/src/BlobSourceCheckpointData.cs @@ -35,7 +35,10 @@ internal static BlobSourceCheckpointData Deserialize(Stream stream) // Version int version = reader.ReadInt32(); - CheckSchemaVersion(DataMovementBlobConstants.SourceCheckpointData.SchemaVersion, version); + if (version != DataMovementBlobConstants.SourceCheckpointData.SchemaVersion) + { + throw Errors.UnsupportedJobSchemaVersionHeader(version.ToString()); + } // BlobType BlobType blobType = (BlobType)reader.ReadByte(); diff --git a/sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResource.cs b/sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResource.cs index 616e984ad3efa..2365d67820029 100644 --- a/sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResource.cs +++ b/sdk/storage/Azure.Storage.DataMovement/tests/MockStorageResource.cs @@ -89,12 +89,12 @@ protected internal override Task ReadStreamAsyn public override StorageResourceCheckpointData GetSourceCheckpointData() { - return null; + return new MockResourceCheckpointData(); } public override StorageResourceCheckpointData GetDestinationCheckpointData() { - return null; + return new MockResourceCheckpointData(); } protected internal override async Task CopyFromStreamAsync(Stream stream, long streamLength, bool overwrite, long completeLength, StorageResourceWriteToOffsetOptions options = null, CancellationToken cancellationToken = default) From 07506d0ae05e7c06e01cf48ecbe8d79b42183f51 Mon Sep 17 00:00:00 2001 From: Jacob Lauzon Date: Mon, 23 Oct 2023 12:45:59 -0700 Subject: [PATCH 10/10] Test feedback --- .../tests/BlobSourceCheckpointDataTests.cs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobSourceCheckpointDataTests.cs b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobSourceCheckpointDataTests.cs index 69f279af66cc4..b7b9119a05022 100644 --- a/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobSourceCheckpointDataTests.cs +++ b/sdk/storage/Azure.Storage.DataMovement.Blobs/tests/BlobSourceCheckpointDataTests.cs @@ -46,9 +46,12 @@ public void Serialize() } [Test] - public void Deserialize() + [TestCase(BlobType.Block)] + [TestCase(BlobType.Page)] + [TestCase(BlobType.Append)] + public void Deserialize(BlobType blobType) { - BlobSourceCheckpointData data = new(BlobType.Block); + BlobSourceCheckpointData data = new(blobType); using (Stream stream = new MemoryStream(DataMovementBlobConstants.DestinationCheckpointData.VariableLengthStartIndex)) { @@ -57,7 +60,7 @@ public void Deserialize() BlobSourceCheckpointData deserialized = BlobSourceCheckpointData.Deserialize(stream); Assert.AreEqual(DataMovementBlobConstants.SourceCheckpointData.SchemaVersion, deserialized.Version); - Assert.AreEqual(BlobType.Block, deserialized.BlobType); + Assert.AreEqual(blobType, deserialized.BlobType); } } }