Skip to content

Commit

Permalink
Can optionally force content into the blob regardless of size (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
PureKrome authored Nov 20, 2023
1 parent 7920ea3 commit f27fab2
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,22 @@ public async Task AddMessageAsync_GivenSomeSimpleString_ShouldAddTheMessageToThe
retrievedMessage.Content.ShouldBe(message);
}

[Fact]
public async Task AddMessageAsync_GivenSomeSimpleStringAndFocedOntoTheBlob_ShouldAddTheMessageToTheBlobAndQueue()
{
// Arrange.
var cancellationToken = CancellationToken.None;
var message = "hello";

// Act.
await HybridQueue.AddMessageAsync(message, null, true, cancellationToken);

// Assert.
var retrievedMessage = await HybridQueue.GetMessageAsync<string>(cancellationToken);
retrievedMessage.ShouldNotBeNull();
retrievedMessage.Content.ShouldBe(message);
}

[Fact]
public async Task AddMessageAsync_GivenASimpleInt_ShouldAddTheMessageToTheQueue()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,27 @@ public async Task AddMessagesAsync_GivenSomeSimpleStrings_ShouldAddThemAllToTheQ
.ShouldBeEquivalentTo(messages.OrderBy(s => s));
}

[Fact]
public async Task AddMessagesAsync_GivenSomeSimpleStringsAndForcedOntoTheBlob_ShouldAddThemAllToTheBlobAndQueue()
{
// Arrange.
var cancellationToken = CancellationToken.None;
var messages = new[] { "aaa", "bbb", "ccc", "ddd" };

// Act.
await HybridQueue.AddMessagesAsync(messages, null, 10, true, cancellationToken);

// Assert.
var retrievedMessage = await HybridQueue.GetMessagesAsync<string>(cancellationToken);
retrievedMessage.ShouldNotBeNull();

retrievedMessage
.Select(hm => hm.Content)
.ToArray()
.OrderBy(s => s)
.ShouldBeEquivalentTo(messages.OrderBy(s => s));
}

[Fact]
public async Task AddMessagesAsync_GivenSomeComplexInstances_ShouldAddThemAllToTheQueue()
{
Expand Down
71 changes: 48 additions & 23 deletions src/SimpleAzure.Storage.HybridQueue/HybridQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,32 @@ public sealed class HybridQueue(
private readonly BlobContainerClient _blobContainerClient = blobContainerClient;
private readonly ILogger<HybridQueue> _logger = logger;

public async Task AddMessageAsync<T>(T item, TimeSpan? initialVisibilityDelay, CancellationToken cancellationToken)
public async Task AddMessageAsync<T>(
T item,
TimeSpan? initialVisibilityDelay,
bool isForcedOntoBlob,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(item);

using var _ = _logger.BeginCustomScope(("queueName", _queueClient.Name));
using var _ = _logger.BeginCustomScope(
("queueName", _queueClient.Name),
(nameof(isForcedOntoBlob), isForcedOntoBlob));

_logger.LogDebug("Adding a message to a Hybrid Queue.");

string message;


// Don't waste effort serializing a string. It's already in a format that's ready to go.
if (item is string stringItem)
if (!isForcedOntoBlob &&
item is string stringItem)
{
_logger.LogDebug("Item is a SimpleType: string.");
message = stringItem;
}
else if (typeof(T).IsASimpleType())
else if (!isForcedOntoBlob &&
typeof(T).IsASimpleType())
{
_logger.LogDebug("Item is a SimpleType: something other than a string.");

Expand All @@ -42,29 +51,25 @@ public async Task AddMessageAsync<T>(T item, TimeSpan? initialVisibilityDelay, C
}
else
{
// It's a complex type, so serialize this as Json.
_logger.LogDebug("Item is a ComplexType: {complexType}", item.GetType().ToString());
if (isForcedOntoBlob)
{
_logger.LogDebug("Is forced onto blob == true.");
}
else
{
_logger.LogDebug("Item is a ComplexType: {complexType}", item.GetType().ToString());
}

message = JsonSerializer.Serialize(item);
}

// Is this item/content _too big_ for a normal queue-message?
var messageSize = Encoding.UTF8.GetByteCount(message);
if (messageSize > _queueClient.MessageMaxBytes)
var messageSize = isForcedOntoBlob
? -1 // Don't need to determine the byte count because we
: Encoding.UTF8.GetByteCount(message);
if (isForcedOntoBlob || messageSize > _queueClient.MessageMaxBytes)
{
// Yes - yes it is. Too big.
// So lets store the content in Blob.
// Then get the BlobId
// Then store the BlobId GUID in the queue message.

_logger.LogDebug("Item is too large to fit into a queue. Storing into a blob then a queue. Item size: {itemSize}", messageSize);

var blobId = Guid.NewGuid().ToString(); // Unique Name/Identifier of this blob item.
var binaryData = new BinaryData(message);
await _blobContainerClient.UploadBlobAsync(blobId, binaryData, cancellationToken).ConfigureAwait(false);

message = blobId;

_logger.LogDebug("Item added to blob. BlobId: {blobId}. Status: {responseStatus}", blobId);
message = await AddMessageToStoreThenQueueAsync(message, messageSize, cancellationToken).ConfigureAwait(false);
}

await _queueClient.SendMessageAsync(
Expand All @@ -80,6 +85,7 @@ public async Task AddMessagesAsync<T>(
IEnumerable<T> contents,
TimeSpan? initialVisibilityDelay,
int batchSize,
bool isForcedOntoBlob,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(contents);
Expand All @@ -96,7 +102,7 @@ public async Task AddMessagesAsync<T>(
// Lets batch up these messages to make sure the awaiting of all the tasks doesn't go too crazy.
foreach (var batch in contents.Chunk(batchSize))
{
var tasks = batch.Select(content => AddMessageAsync(content, initialVisibilityDelay, cancellationToken));
var tasks = batch.Select(content => AddMessageAsync(content, initialVisibilityDelay, isForcedOntoBlob, cancellationToken));

// Execute this batch.
await Task.WhenAll(tasks).ConfigureAwait(false);
Expand Down Expand Up @@ -176,6 +182,25 @@ public async Task<IReadOnlyList<HybridMessage<T>>> GetMessagesAsync<T>(
return hybridMessages;
}

private async Task<string> AddMessageToStoreThenQueueAsync(string message, int messageSize, CancellationToken cancellationToken)
{
// Yes - yes it is. Too big.
// So lets store the content in Blob.
// Then get the BlobId
// Then store the BlobId GUID in the queue message.

_logger.LogDebug("Item is too large to fit into a queue. Storing into a blob then a queue. Item size: {itemSize}", messageSize);

var blobId = Guid.NewGuid().ToString(); // Unique Name/Identifier of this blob item.
var binaryData = new BinaryData(message);
await _blobContainerClient.UploadBlobAsync(blobId, binaryData, cancellationToken).ConfigureAwait(false);

message = blobId;

_logger.LogDebug("Item added to blob. BlobId: {blobId}.", blobId);
return message;
}

private async Task<HybridMessage<T>> ParseMessageAsync<T>(QueueMessage queueMessage, CancellationToken cancellationToken)
{
var message = queueMessage.Body.ToString().AssumeNotNull();
Expand Down
10 changes: 7 additions & 3 deletions src/SimpleAzure.Storage.HybridQueue/IHybridQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,38 @@ namespace WorldDomination.SimpleAzure.Storage.HybridQueues;
public interface IHybridQueue
{
/// <summary>
/// Initiates an asynchronous operation to add an item to the queue.
/// Initiates an asynchronous operation to add an item to the queue and potentially the backing blob.
/// </summary>
/// <typeparam name="T">Type of item.</typeparam>
/// <param name="item">An item to add to the queue.</param>
/// <param name="initialVisibilityDelay">How long to initially hide the message.</param>
/// <param name="isForcedOntoBlob">The item is placed on the blob regardless of the item's size.</param>
/// <param name="cancellationToken">A System.Threading.CancellationToken to observe while waiting for a task to complete.</param>
/// <returns>A System.Threading.Tasks.Task object that represents the asynchronous operation.</returns>
/// <remarks>If the item is a IsPrimitive (int, etc) or a string then it's stored -as is-. Otherwise, it is serialized to Json and then stored as Json.(</remarks>
Task AddMessageAsync<T>(
T item,
TimeSpan? initialVisibilityDelay,
bool isForcedOntoBlob,
CancellationToken cancellationToken);


/// <summary>
/// Initiates an asynchronous operation to add a batch messages to the queue.
/// Initiates an asynchronous operation to add a batch of messages to the queue and potentially the backing blob.
/// </summary>
/// <typeparam name="T">Type of item.</typeparam>
/// <param name="contents">Collection of content to add to the queue.</param>
/// <param name="initialVisibilityDelay">How long to initially hide the message.</param>
/// <param name="batchSize">Number of messages per batch, to store as one parallel execution.</param>
/// <param name="isForcedOntoBlob">The item is placed on the blob regardless of the item's size.</param>
/// <param name="cancellationToken">A System.Threading.CancellationToken to observe while waiting for a task to complete.</param>
/// <returns>A System.Threading.Tasks.Task object that represents the asynchronous operation.</returns>
/// <remarks>If any item is a IsPrimitive (int, etc) or a string then it's stored -as is-. Otherwise, it is serialized to Json and then stored as Json.(</remarks>
Task AddMessagesAsync<T>(
IEnumerable<T> contents,
TimeSpan? initialVisibilityDelay,
int batchSize,
bool isForcedOntoBlob,
CancellationToken cancellationToken);

/// <summary>
Expand All @@ -42,7 +46,7 @@ Task AddMessagesAsync<T>(
Task DeleteMessageAsync<T>(HybridMessage<T> hybridMessage, CancellationToken cancellationToken);

/// <summary>
/// Retrieves a message from a queue and wraps it in a simple Message class.
/// Retrieves a message from a queue and potentially the backing blob. It will then wrap it in a simple Message class.
/// </summary>
/// <typeparam name="T">Type of item.</typeparam>
/// <param name="visibilityTimeout">A System.TimeSpan specifying the visibility timeout interval.</param>
Expand Down
12 changes: 6 additions & 6 deletions src/SimpleAzure.Storage.HybridQueue/IHybridQueueExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ namespace WorldDomination.SimpleAzure.Storage.HybridQueues;
public static class IHybridQueueExtensions
{
/// <summary>
/// Initiates an asynchronous operation to add an item to the queue.
/// Initiates an asynchronous operation to add an item to the queue and potentially the backing blob.
/// </summary>
/// <typeparam name="T">Type of item.</typeparam>
/// <param name="queue">The queue to operate on</param>
/// <param name="queue">The queue to operate on.</param>
/// <param name="item">An item to add to the queue.</param>
/// <param name="cancellationToken">A System.Threading.CancellationToken to observe while waiting for a task to complete.</param>
/// <returns>A System.Threading.Tasks.Task object that represents the asynchronous operation.</returns>
/// <remarks>If the item is a IsPrimitive (int, etc) or a string then it's stored -as is-. Otherwise, it is serialized to Json and then stored as Json.(</remarks>
public static Task AddMessageAsync<T>(this IHybridQueue queue, T item, CancellationToken cancellationToken) =>
queue.AddMessageAsync(item, null, cancellationToken);
queue.AddMessageAsync(item, null, false, cancellationToken);

/// <summary>
/// Initiates an asynchronous operation to add a batch messages to the queue.
/// Initiates an asynchronous operation to add a batch of messages to the queue and potentially the backing blob.
/// </summary>
/// <typeparam name="T">Type of item.</typeparam>
/// <param name="queue">The queue to operate on</param>
Expand All @@ -24,10 +24,10 @@ public static Task AddMessageAsync<T>(this IHybridQueue queue, T item, Cancellat
/// <returns>A System.Threading.Tasks.Task object that represents the asynchronous operation.</returns>
/// <remarks>If any item is a IsPrimitive (int, etc) or a string then it's stored -as is-. Otherwise, it is serialized to Json and then stored as Json.(</remarks>
public static Task AddMessagesAsync<T>(this IHybridQueue queue, IEnumerable<T> contents, CancellationToken cancellationToken) =>
queue.AddMessagesAsync(contents, null, 25, cancellationToken);
queue.AddMessagesAsync(contents, null, 25, false, cancellationToken);

/// <summary>
/// Retrieves a message from a queue and wraps it in a simple Message class.
/// Retrieves a message from a queue and potentially the backing blob. It will then wrap it in a simple Message class.
/// </summary>
/// <typeparam name="T">Type of item.</typeparam>
/// <param name="queue">The queue to operate on</param>
Expand Down

0 comments on commit f27fab2

Please sign in to comment.