Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storage][DataMovement] Add source/destination checkpoint data to Job Plan file #39411

Merged
merged 10 commits into from
Oct 23, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -283,27 +283,19 @@ protected override async Task<bool> DeleteIfExistsAsync(CancellationToken cancel
return await BlobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}

/// <summary>
/// Gets the source checkpoint data for this resource that will be written to the checkpointer.
/// </summary>
/// <returns>A <see cref="StorageResourceCheckpointData"/> containing the checkpoint information for this resource.</returns>
protected override StorageResourceCheckpointData GetSourceCheckpointData()
public override StorageResourceCheckpointData GetSourceCheckpointData()
jalauzon-msft marked this conversation as resolved.
Show resolved Hide resolved
{
return new BlobSourceCheckpointData();
return new BlobSourceCheckpointData(BlobType.Append);
}

/// <summary>
/// Gets the destination checkpoint data for this resource that will be written to the checkpointer.
/// </summary>
/// <returns>A <see cref="StorageResourceCheckpointData"/> containing the checkpoint information for this resource.</returns>
protected override StorageResourceCheckpointData GetDestinationCheckpointData()
public override StorageResourceCheckpointData GetDestinationCheckpointData()
{
return new BlobDestinationCheckpointData(
BlobType.Append,
_options.HttpHeaders,
_options.AccessTier,
_options.Metadata,
_options.Tags,
_options?.HttpHeaders,
_options?.AccessTier,
_options?.Metadata,
_options?.Tags,
default); // TODO: Update when we support encryption scopes
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.IO;
using Azure.Storage.Blobs.Models;

namespace Azure.Storage.DataMovement.Blobs
{
/// <summary>
/// Base class for Blob source and destination checkpoint data
/// which contains shared fields.
/// </summary>
internal abstract class BlobCheckpointData : StorageResourceCheckpointData
jalauzon-msft marked this conversation as resolved.
Show resolved Hide resolved
{
/// <summary>
/// Schema version.
/// </summary>
public int Version;
jalauzon-msft marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// The type of blob.
/// </summary>
public BlobType BlobType;

/// <summary>
/// Initializes the fields shared by blob source and destination checkpoint data.
/// </summary>
/// <remarks>
/// Declared <c>protected</c> because this class is abstract: only derived
/// types can invoke the constructor, so a public one would be misleading.
/// </remarks>
/// <param name="version">The schema version to store in <see cref="Version"/>.</param>
/// <param name="blobType">The blob type to store in <see cref="BlobType"/>.</param>
protected BlobCheckpointData(int version, BlobType blobType)
{
    Version = version;
    BlobType = blobType;
}

/// <summary>
/// Verifies that a schema version read from checkpoint data is the one expected.
/// </summary>
/// <param name="expected">The schema version the caller supports.</param>
/// <param name="actual">The schema version that was actually encountered.</param>
protected static void CheckSchemaVersion(int expected, int actual)
{
    // Matching versions need no further handling.
    if (actual == expected)
    {
        return;
    }

    // Any mismatch is reported using the version that was actually found.
    throw Errors.UnsupportedJobSchemaVersionHeader(actual.ToString());
}
jalauzon-msft marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.IO;
using System.Text;
using Azure.Core;
Expand All @@ -11,18 +12,8 @@

namespace Azure.Storage.DataMovement.Blobs
{
internal class BlobDestinationCheckpointData : StorageResourceCheckpointData
internal class BlobDestinationCheckpointData : BlobCheckpointData
{
/// <summary>
/// Schema version.
/// </summary>
public int Version;

/// <summary>
/// The type of the destination blob.
/// </summary>
public BlobType BlobType;

/// <summary>
/// The content headers for the destination blob.
/// </summary>
Expand Down Expand Up @@ -65,29 +56,28 @@ public BlobDestinationCheckpointData(
Metadata metadata,
Tags blobTags,
string cpkScope)
: base(DataMovementBlobConstants.DestinationCheckpointData.SchemaVersion, blobType)
{
Version = DataMovementBlobConstants.DestinationJobPartHeader.SchemaVersion;
BlobType = blobType;
ContentHeaders = contentHeaders;
_contentTypeBytes = ContentHeaders?.ContentType != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentType) : new byte[0];
_contentEncodingBytes = ContentHeaders?.ContentEncoding != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentEncoding) : new byte[0];
_contentLanguageBytes = ContentHeaders?.ContentLanguage != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentLanguage) : new byte[0];
_contentDispositionBytes = ContentHeaders?.ContentDisposition != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentDisposition) : new byte[0];
_cacheControlBytes = ContentHeaders?.CacheControl != default ? Encoding.UTF8.GetBytes(ContentHeaders.CacheControl) : new byte[0];
_contentTypeBytes = ContentHeaders?.ContentType != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentType) : Array.Empty<byte>();
_contentEncodingBytes = ContentHeaders?.ContentEncoding != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentEncoding) : Array.Empty<byte>();
_contentLanguageBytes = ContentHeaders?.ContentLanguage != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentLanguage) : Array.Empty<byte>();
_contentDispositionBytes = ContentHeaders?.ContentDisposition != default ? Encoding.UTF8.GetBytes(ContentHeaders.ContentDisposition) : Array.Empty<byte>();
_cacheControlBytes = ContentHeaders?.CacheControl != default ? Encoding.UTF8.GetBytes(ContentHeaders.CacheControl) : Array.Empty<byte>();
AccessTier = accessTier;
Metadata = metadata;
_metadataBytes = Metadata != default ? Encoding.UTF8.GetBytes(Metadata.DictionaryToString()) : new byte[0];
_metadataBytes = Metadata != default ? Encoding.UTF8.GetBytes(Metadata.DictionaryToString()) : Array.Empty<byte>();
Tags = blobTags;
_tagsBytes = Tags != default ? Encoding.UTF8.GetBytes(Tags.DictionaryToString()) : new byte[0];
_tagsBytes = Tags != default ? Encoding.UTF8.GetBytes(Tags.DictionaryToString()) : Array.Empty<byte>();
CpkScope = cpkScope;
_cpkScopeBytes = CpkScope != default ? Encoding.UTF8.GetBytes(CpkScope) : new byte[0];
_cpkScopeBytes = CpkScope != default ? Encoding.UTF8.GetBytes(CpkScope) : Array.Empty<byte>();
}

protected override void Serialize(Stream stream)
public override void Serialize(Stream stream)
{
Argument.AssertNotNull(stream, nameof(stream));

int currentVariableLengthIndex = DataMovementBlobConstants.DestinationJobPartHeader.VariableLengthStartIndex;
int currentVariableLengthIndex = DataMovementBlobConstants.DestinationCheckpointData.VariableLengthStartIndex;
BinaryWriter writer = new BinaryWriter(stream);

// Version
Expand All @@ -97,31 +87,31 @@ protected override void Serialize(Stream stream)
writer.Write((byte)BlobType);

// ContentType offset/length
WriteVariableLengthFieldInfo(writer, _contentTypeBytes, ref currentVariableLengthIndex);
WriteVariableLengthFieldInfo(writer, _contentTypeBytes.Length, ref currentVariableLengthIndex);

// ContentEncoding offset/length
WriteVariableLengthFieldInfo(writer, _contentEncodingBytes, ref currentVariableLengthIndex);
WriteVariableLengthFieldInfo(writer, _contentEncodingBytes.Length, ref currentVariableLengthIndex);

// ContentLanguage offset/length
WriteVariableLengthFieldInfo(writer, _contentLanguageBytes, ref currentVariableLengthIndex);
WriteVariableLengthFieldInfo(writer, _contentLanguageBytes.Length, ref currentVariableLengthIndex);

// ContentDisposition offset/length
WriteVariableLengthFieldInfo(writer, _contentDispositionBytes, ref currentVariableLengthIndex);
WriteVariableLengthFieldInfo(writer, _contentDispositionBytes.Length, ref currentVariableLengthIndex);

// CacheControl offset/length
WriteVariableLengthFieldInfo(writer, _cacheControlBytes, ref currentVariableLengthIndex);
WriteVariableLengthFieldInfo(writer, _cacheControlBytes.Length, ref currentVariableLengthIndex);

// AccessTier
writer.Write((byte)AccessTier.ToJobPlanAccessTier());

// Metadata offset/length
WriteVariableLengthFieldInfo(writer, _metadataBytes, ref currentVariableLengthIndex);
WriteVariableLengthFieldInfo(writer, _metadataBytes.Length, ref currentVariableLengthIndex);

// Tags offset/length
WriteVariableLengthFieldInfo(writer, _tagsBytes, ref currentVariableLengthIndex);
WriteVariableLengthFieldInfo(writer, _tagsBytes.Length, ref currentVariableLengthIndex);

// CpkScope offset/length
WriteVariableLengthFieldInfo(writer, _cpkScopeBytes, ref currentVariableLengthIndex);
WriteVariableLengthFieldInfo(writer, _cpkScopeBytes.Length, ref currentVariableLengthIndex);

writer.Write(_contentTypeBytes);
writer.Write(_contentEncodingBytes);
Expand All @@ -141,7 +131,7 @@ internal static BlobDestinationCheckpointData Deserialize(Stream stream)

// Version
int version = reader.ReadInt32();
CheckSchemaVersion(version);
CheckSchemaVersion(DataMovementBlobConstants.DestinationCheckpointData.SchemaVersion, version);

// BlobType
BlobType blobType = (BlobType)reader.ReadByte();
Expand All @@ -168,7 +158,11 @@ internal static BlobDestinationCheckpointData Deserialize(Stream stream)

// AccessTier
JobPlanAccessTier jobPlanAccessTier = (JobPlanAccessTier)reader.ReadByte();
AccessTier accessTier = new AccessTier(jobPlanAccessTier.ToString());
AccessTier? accessTier = default;
if (!jobPlanAccessTier.Equals(JobPlanAccessTier.None))
{
accessTier = new AccessTier(jobPlanAccessTier.ToString());
}
jaschrep-msft marked this conversation as resolved.
Show resolved Hide resolved

// Metadata offset/length
int metadataOffset = reader.ReadInt32();
Expand Down Expand Up @@ -267,7 +261,7 @@ internal static BlobDestinationCheckpointData Deserialize(Stream stream)
private int CalculateLength()
{
// Length is fixed size fields plus length of each variable length field
int length = DataMovementBlobConstants.DestinationJobPartHeader.VariableLengthStartIndex;
int length = DataMovementBlobConstants.DestinationCheckpointData.VariableLengthStartIndex;
length += _contentTypeBytes.Length;
length += _contentEncodingBytes.Length;
length += _contentLanguageBytes.Length;
Expand All @@ -278,13 +272,5 @@ private int CalculateLength()
length += _cpkScopeBytes.Length;
return length;
}

private static void CheckSchemaVersion(int version)
{
if (version != DataMovementBlobConstants.DestinationJobPartHeader.SchemaVersion)
{
throw Errors.UnsupportedJobSchemaVersionHeader(version.ToString());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,45 @@
// Licensed under the MIT License.

using System.IO;
using Azure.Core;
using Azure.Storage.Blobs.Models;

namespace Azure.Storage.DataMovement.Blobs
{
internal class BlobSourceCheckpointData : StorageResourceCheckpointData
internal class BlobSourceCheckpointData : BlobCheckpointData
{
public override int Length => 0;
public BlobSourceCheckpointData(BlobType blobType)
: base(DataMovementBlobConstants.SourceCheckpointData.SchemaVersion, blobType)
jalauzon-msft marked this conversation as resolved.
Show resolved Hide resolved
{
}

/// <summary>
/// The length reported for this checkpoint data, taken from the fixed
/// <c>DataMovementBlobConstants.SourceCheckpointData.DataSize</c> constant.
/// </summary>
// NOTE(review): presumably this equals the number of bytes written by Serialize
// (the int schema version plus a one-byte blob type) - confirm against the constant.
public override int Length => DataMovementBlobConstants.SourceCheckpointData.DataSize;

protected override void Serialize(Stream stream)
/// <summary>
/// Writes this checkpoint data to <paramref name="stream"/>: the schema
/// version first, followed by the blob type narrowed to a single byte.
/// </summary>
/// <param name="stream">The stream to write to; asserted to be non-null.</param>
public override void Serialize(Stream stream)
{
    Argument.AssertNotNull(stream, nameof(stream));

    BinaryWriter output = new BinaryWriter(stream);
    output.Write(Version);          // schema version, written as an int
    output.Write((byte)BlobType);   // blob type, written as one byte
}

/// <summary>
/// Reads source checkpoint data from <paramref name="stream"/>, expecting the
/// schema version followed by a one-byte blob type.
/// </summary>
/// <param name="stream">The stream to read from; asserted to be non-null.</param>
/// <returns>A <see cref="BlobSourceCheckpointData"/> carrying the blob type that was read.</returns>
internal static BlobSourceCheckpointData Deserialize(Stream stream)
{
    Argument.AssertNotNull(stream, nameof(stream));

    BinaryReader input = new BinaryReader(stream);

    // The schema version comes first and is validated before anything else is read.
    int schemaVersion = input.ReadInt32();
    CheckSchemaVersion(DataMovementBlobConstants.SourceCheckpointData.SchemaVersion, schemaVersion);

    // The blob type follows as a single byte.
    return new BlobSourceCheckpointData((BlobType)input.ReadByte());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,23 @@ protected override async IAsyncEnumerable<StorageResource> GetStorageResourcesAs
}
}

/// <summary>
/// Gets the source checkpoint data for this container resource.
/// </summary>
/// <returns>A <see cref="BlobSourceCheckpointData"/> created with <see cref="BlobType.Block"/>.</returns>
public override StorageResourceCheckpointData GetSourceCheckpointData()
    // Source blob type does not matter for container
    => new BlobSourceCheckpointData(BlobType.Block);

/// <summary>
/// Gets the destination checkpoint data for this container resource, built
/// from the configured options when they are present.
/// </summary>
/// <returns>
/// A <see cref="BlobDestinationCheckpointData"/> whose blob type falls back to
/// <see cref="BlobType.Block"/> when no blob type is available from the options.
/// </returns>
public override StorageResourceCheckpointData GetDestinationCheckpointData()
{
    BlobType destinationType = _options?.BlobType ?? BlobType.Block;

    // Both the options and the per-blob options may be absent; every lookup tolerates that.
    var blobOptions = _options?.BlobOptions;

    return new BlobDestinationCheckpointData(
        destinationType,
        blobOptions?.HttpHeaders,
        blobOptions?.AccessTier,
        blobOptions?.Metadata,
        blobOptions?.Tags,
        default); // TODO: Update when we support encryption scopes
}

private string ApplyOptionalPrefix(string path)
=> IsDirectory
? string.Join("/", DirectoryPrefix, path)
Expand Down
Loading
Loading