Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Random opinionated changes #2

Merged
merged 20 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class AddMessageAsyncTests : CustomAzuriteTestContainer
public async Task AddMessageAsync_GivenSomeSimpleString_ShouldAddTheMessageToTheQueue()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var message = "hello";

// Act.
Expand All @@ -24,7 +24,7 @@ public async Task AddMessageAsync_GivenSomeSimpleString_ShouldAddTheMessageToThe
public async Task AddMessageAsync_GivenASimpleInt_ShouldAddTheMessageToTheQueue()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var message = 1234;

// Act.
Expand All @@ -40,7 +40,7 @@ public async Task AddMessageAsync_GivenASimpleInt_ShouldAddTheMessageToTheQueue(
public async Task AddMessageAsync_GivenAComplexInstance_ShouldAddTheMessageToTheQueue()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var message = new FakeMessage(10);

// Act.
Expand All @@ -57,7 +57,7 @@ public async Task AddMessageAsync_GivenAComplexInstance_ShouldAddTheMessageToThe
public async Task AddMessageAsync_GivenALargeComplexInstance_ShouldAddTheMessageToABlogAndThenAGuidToTheQueue()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var message = new FakeMessage(QueueClient.MessageMaxBytes + 1);

// Act.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class AddMessagesAsyncTests : CustomAzuriteTestContainer
public async Task AddMessagesAsync_GivenSomeSimpleStrings_ShouldAddThemAllToTheQueue()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var messages = new[] { "aaa", "bbb", "ccc", "ddd" };

// Act.
Expand All @@ -29,7 +29,7 @@ public async Task AddMessagesAsync_GivenSomeSimpleStrings_ShouldAddThemAllToTheQ
public async Task AddMessagesAsync_GivenSomeComplexInstances_ShouldAddThemAllToTheQueue()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var messages = new[]
{
new FakeMessage(20),
Expand Down Expand Up @@ -57,7 +57,7 @@ public async Task AddMessagesAsync_GivenSomeComplexInstances_ShouldAddThemAllToT
public async Task AddMessagesAsync_GivenSomeLargeComplexInstances_ShouldAddThemAllToTheblobAndQueue()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var messages = new[]
{
new FakeMessage(QueueClient.MessageMaxBytes + 1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ public class DeleteMessageAsyncTests : CustomAzuriteTestContainer
public async Task DeleteMessageAsync_GivenALargeComplexInstance_ShouldDeleteTheBlobItemAndQueueMessage()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var message = new FakeMessage(QueueClient.MessageMaxBytes + 1);

await HybridQueue.AddMessageAsync(message, default);
var retrievedMessage = await HybridQueue.GetMessageAsync<FakeMessage>(cancellationToken)!;
var retrievedMessage = await HybridQueue.GetMessageAsync<FakeMessage>(cancellationToken);
retrievedMessage.ShouldNotBeNull();

// Act.
await HybridQueue.DeleteMessageAsync(retrievedMessage, default);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public class GetMessagesAsyncTests : CustomAzuriteTestContainer
public async Task GetMessagesAsync_GivenAnInvalidMaxMessages_ShouldThrowAnException(int maxMessages)
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;

// Act.
var exception = await Should.ThrowAsync<ArgumentOutOfRangeException>(HybridQueue.GetMessagesAsync<string>(
Expand Down
15 changes: 15 additions & 0 deletions src/SimpleAzure.Storage.HybridQueue/Helpers.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;

namespace WorldDomination.SimpleAzure.Storage.HybridQueues;

internal static class Helpers
Expand All @@ -13,4 +16,16 @@ internal static bool IsASimpleType(this Type type) =>
type.IsPrimitive ||
type == typeof(string) ||
type == typeof(decimal);

/// <summary>
/// Has the same intention as the null suppression operator (!) but throws an exception at the use site,
/// rather than the use site which occurs some unknown time later.
PureKrome marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
internal static T AssumeNotNull<T>(
[NotNull] this T? item,
[CallerArgumentExpression(nameof(item))] string? expr = null)
where T : class
{
return item ?? throw new InvalidOperationException($"Expected '{expr}' to be non-null.");
}
}
2 changes: 1 addition & 1 deletion src/SimpleAzure.Storage.HybridQueue/HybridMessage.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
namespace WorldDomination.SimpleAzure.Storage.HybridQueues;

public record class HybridMessage<T>(T? Content, string MessageId, string PopeReceipt, Guid? BlobId);
public sealed record HybridMessage<T>(T Content, string MessageId, string PopeReceipt, Guid? BlobId);
PureKrome marked this conversation as resolved.
Show resolved Hide resolved
148 changes: 56 additions & 92 deletions src/SimpleAzure.Storage.HybridQueue/HybridQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,12 @@

namespace WorldDomination.SimpleAzure.Storage.HybridQueues;

public class HybridQueue : IHybridQueue
public sealed class HybridQueue(QueueClient queueClient, BlobContainerClient blobContainerClient, ILogger<HybridQueue> logger) : IHybridQueue
{
private readonly QueueClient _queueClient;
private readonly BlobContainerClient _blobContainerClient;
private readonly ILogger<HybridQueue> _logger;
private readonly QueueClient _queueClient = queueClient;
private readonly BlobContainerClient _blobContainerClient = blobContainerClient;
private readonly ILogger<HybridQueue> _logger = logger;
PureKrome marked this conversation as resolved.
Show resolved Hide resolved

/// <inheritdoc />
PureKrome marked this conversation as resolved.
Show resolved Hide resolved
public HybridQueue(QueueClient queueClient, BlobContainerClient blobContainerClient, ILogger<HybridQueue> logger)
{
_queueClient = queueClient;
_blobContainerClient = blobContainerClient;
_logger = logger;
}

/// <inheritdoc />
public async Task AddMessageAsync<T>(T item, TimeSpan? initialVisibilityDelay, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(item);
Expand All @@ -33,27 +24,23 @@
string message;

// Don't waste effort serializing a string. It's already in a format that's ready to go.
if (typeof(T).IsASimpleType())
if (item is string stringItem)
{
if (item is string someString)
{
_logger.LogDebug("Item is a SimpleType: string.");

message = someString; // Note: shouldn't allocate new memory. Should just be a reference to existing memory.
}
else
{
_logger.LogDebug("Item is a SimpleType: something other than a string.");
_logger.LogDebug("Item is a SimpleType: string.");
message = stringItem;
}
else if (typeof(T).IsASimpleType())
{
_logger.LogDebug("Item is a SimpleType: something other than a string.");

message = item.ToString()!;
}
// IsASimpleType ensures that item is a primitive type, or decimal, and none of them
// return null from their ToString method.
message = item.ToString().AssumeNotNull();
PureKrome marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
// It's a complex type, so serialize this as Json.

_logger.LogDebug("Item is a ComplexType: {complexType}", item.GetType().ToString());

message = JsonSerializer.Serialize(item);
}

Expand All @@ -70,7 +57,7 @@

var blobId = Guid.NewGuid().ToString(); // Unique Name/Identifier of this blob item.
var binaryData = new BinaryData(message);
await _blobContainerClient.UploadBlobAsync(blobId, binaryData, cancellationToken);
await _blobContainerClient.UploadBlobAsync(blobId, binaryData, cancellationToken).ConfigureAwait(false);
PureKrome marked this conversation as resolved.
Show resolved Hide resolved
PureKrome marked this conversation as resolved.
Show resolved Hide resolved

message = blobId;

Expand All @@ -81,12 +68,11 @@
message,
initialVisibilityDelay,
null,
cancellationToken);
cancellationToken).ConfigureAwait(false);

_logger.LogDebug("Finished adding an Item to the queue.");
}

/// <inheritdoc />
public async Task AddMessagesAsync<T>(
IEnumerable<T> contents,
TimeSpan? initialVisibilityDelay,
Expand All @@ -105,21 +91,15 @@
}

// Lets batch up these messages to make sure the awaiting of all the tasks doesn't go too crazy.
var contentsSize = contents.Count();
var finalBatchSize = contentsSize > batchSize
? batchSize
: contentsSize;

foreach (var batch in contents.Chunk(finalBatchSize))
foreach (var batch in contents.Chunk(batchSize))
PureKrome marked this conversation as resolved.
Show resolved Hide resolved
{
var tasks = batch.Select(content => AddMessageAsync(content, initialVisibilityDelay, cancellationToken));

// Execute this batch.
await Task.WhenAll(tasks);
await Task.WhenAll(tasks).ConfigureAwait(false);
}
}

/// <inheritdoc />
public async Task DeleteMessageAsync<T>(HybridMessage<T> hybridMessage, CancellationToken cancellationToken)
{
using var _ = _logger.BeginCustomScope(
Expand All @@ -131,51 +111,42 @@
_logger.LogDebug("Deleting a message.");

// We start with any blobs.
if (hybridMessage.BlobId.HasValue)
if (hybridMessage.BlobId is { } blobId)
PureKrome marked this conversation as resolved.
Show resolved Hide resolved
{
_logger.LogDebug("Deleting message from Blob Container.");

var blobClient = _blobContainerClient.GetBlobClient(hybridMessage.BlobId.Value.ToString());
var blobResponse = await blobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken);

var blobClient = _blobContainerClient.GetBlobClient(blobId.ToString());
var blobResponse = await blobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
if (!blobResponse.Value)
{
_logger.LogWarning("Failed to delete message from Blob Container.");
}
}

var queueResponse = await _queueClient.DeleteMessageAsync(hybridMessage.MessageId, hybridMessage.PopeReceipt, cancellationToken);
var queueResponse = await _queueClient.DeleteMessageAsync(hybridMessage.MessageId, hybridMessage.PopeReceipt, cancellationToken).ConfigureAwait(false);

_logger.LogDebug("Deleted a message from the queue.");
}

/// <inheritdoc />
public async Task<HybridMessage<T?>> GetMessageAsync<T>(TimeSpan? visibilityTimeout, CancellationToken cancellationToken)
public async Task<HybridMessage<T>?> GetMessageAsync<T>(TimeSpan? visibilityTimeout, CancellationToken cancellationToken)
PureKrome marked this conversation as resolved.
Show resolved Hide resolved
{
var messages = await GetMessagesAsync<T>(1, visibilityTimeout, cancellationToken);
var messages = await GetMessagesAsync<T>(1, visibilityTimeout, cancellationToken).ConfigureAwait(false);

if (messages?.Any() ?? false)
return messages switch
PureKrome marked this conversation as resolved.
Show resolved Hide resolved
{
if (messages.Count() > 1)
{
throw new InvalidOperationException($"Expected 1 message but received {messages.Count()} messages");
}

return messages.First();
}

return default;
[] => null,

Check warning on line 137 in src/SimpleAzure.Storage.HybridQueue/HybridQueue.cs

View check run for this annotation

Codecov / codecov/patch

src/SimpleAzure.Storage.HybridQueue/HybridQueue.cs#L137

Added line #L137 was not covered by tests
PureKrome marked this conversation as resolved.
Show resolved Hide resolved
[{ } first] => first,
PureKrome marked this conversation as resolved.
Show resolved Hide resolved
_ => throw new InvalidOperationException($"Expected 1 message but received {messages.Count} messages")

Check warning on line 139 in src/SimpleAzure.Storage.HybridQueue/HybridQueue.cs

View check run for this annotation

Codecov / codecov/patch

src/SimpleAzure.Storage.HybridQueue/HybridQueue.cs#L139

Added line #L139 was not covered by tests
};
}

/// <inheritdoc />
public async Task<IEnumerable<HybridMessage<T>>> GetMessagesAsync<T>(
public async Task<IReadOnlyList<HybridMessage<T>>> GetMessagesAsync<T>(
int maxMessages,
TimeSpan? visibilityTimeout,
CancellationToken cancellationToken)
{
// Note: Why 32? That's the limit for Azure to pop at once.
if (maxMessages < 1 ||
maxMessages > 32)
if (maxMessages is < 1 or > 32)
PureKrome marked this conversation as resolved.
Show resolved Hide resolved
{
throw new ArgumentOutOfRangeException(nameof(maxMessages));
}
Expand All @@ -186,73 +157,66 @@

_logger.LogDebug("About to receive queue message.");

var response = await _queueClient.ReceiveMessagesAsync(maxMessages, visibilityTimeout, cancellationToken);

if (response == null ||
response.Value == null)
var response = await _queueClient.ReceiveMessagesAsync(maxMessages, visibilityTimeout, cancellationToken).ConfigureAwait(false);
if (response?.Value is not { } messages)
PureKrome marked this conversation as resolved.
Show resolved Hide resolved
{
_logger.LogDebug("Response was null or there were no Queue messages retrieved.");

return Enumerable.Empty<HybridMessage<T>>();
return Array.Empty<HybridMessage<T>>();

Check warning on line 164 in src/SimpleAzure.Storage.HybridQueue/HybridQueue.cs

View check run for this annotation

Codecov / codecov/patch

src/SimpleAzure.Storage.HybridQueue/HybridQueue.cs#L164

Added line #L164 was not covered by tests
}

_logger.LogDebug("Received {} messages from queue.", response.Value.Length);

var hybridMessageTasks = response.Value.Select(x => ParseMessageAsync<T>(x, cancellationToken));
_logger.LogDebug("Received {} messages from queue.", messages.Length);

var hybridMessages = await Task.WhenAll(hybridMessageTasks);
var hybridMessageTasks = messages.Select(x => ParseMessageAsync<T>(x, cancellationToken));

var hybridMessages = await Task.WhenAll(hybridMessageTasks).ConfigureAwait(false);
return hybridMessages;
}

private async Task<HybridMessage<T>> ParseMessageAsync<T>(QueueMessage queueMessage, CancellationToken cancellationToken)
{
var message = queueMessage.Body?.ToString();

if (message == null)
{
return new HybridMessage<T>(default, queueMessage.MessageId, queueMessage.PopReceipt, null);
}
var message = queueMessage.Body.ToString().AssumeNotNull();

if (Guid.TryParse(message, out var blobId))
{
using var _ = _logger.BeginCustomScope((nameof(blobId), blobId));

_logger.LogDebug("Retreiving item via Blob Storage.");
_logger.LogDebug("Retrieving item via Blob Storage.");
PureKrome marked this conversation as resolved.
Show resolved Hide resolved

// Lets grab the item from the Blob.
var blobClient = _blobContainerClient.GetBlobClient(blobId.ToString());
using var stream = await blobClient.OpenReadAsync(null, cancellationToken);
using var stream = await blobClient.OpenReadAsync(null, cancellationToken).ConfigureAwait(false);

_logger.LogDebug("About to deserialize stream for a blob item from Blob Storage.");
var blobItem = await JsonSerializer.DeserializeAsync<T>(stream, cancellationToken: cancellationToken)!;
var blobItem = await JsonSerializer.DeserializeAsync<T>(stream, cancellationToken: cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Finished deserializing stream for a blob item from Blob Storage.");

var hybridMessage = new HybridMessage<T>(blobItem, queueMessage.MessageId, queueMessage.PopReceipt, blobId);
if (blobItem is null)
PureKrome marked this conversation as resolved.
Show resolved Hide resolved
{
throw new InvalidOperationException($"Could not deserialize blob '{blobId}' for message '{queueMessage.MessageId}'.");

Check warning on line 195 in src/SimpleAzure.Storage.HybridQueue/HybridQueue.cs

View check run for this annotation

Codecov / codecov/patch

src/SimpleAzure.Storage.HybridQueue/HybridQueue.cs#L194-L195

Added lines #L194 - L195 were not covered by tests
}

return hybridMessage;
return new HybridMessage<T>(blobItem, queueMessage.MessageId, queueMessage.PopReceipt, blobId);
}
else if (typeof(T).IsASimpleType())
{
_logger.LogDebug("Retreiving item: which is a simle type and not a guid/not in Blob Storage.");
_logger.LogDebug("Retrieving item: which is a simle type and not a guid/not in Blob Storage.");

// Do we have a GUID? Guid's are used to represent the blobId.
// Do we have a GUID? Guids are used to represent the blobId.
var value = (T)Convert.ChangeType(message, typeof(T));
var hybridMessage = new HybridMessage<T>(value, queueMessage.MessageId, queueMessage.PopReceipt, null);

return hybridMessage;
return new HybridMessage<T>(value, queueMessage.MessageId, queueMessage.PopReceipt, null);
}
else
{
// Complex type, so lets assume it was serialized as Json ... so now we deserialize it.
_logger.LogDebug("Retrieving a complex item: assumed as json so deserializing it.");

_logger.LogDebug("Retreiving a complex item: assumed as json so deserializing it.");

var item = JsonSerializer.Deserialize<T>(message)!;

var hybridMessage = new HybridMessage<T>(item, queueMessage.MessageId, queueMessage.PopReceipt, null);
var item = JsonSerializer.Deserialize<T>(message);
if (item is null)
{
throw new InvalidOperationException($"Could not deserialize complex type for message '{queueMessage.MessageId}'.");

Check warning on line 216 in src/SimpleAzure.Storage.HybridQueue/HybridQueue.cs

View check run for this annotation

Codecov / codecov/patch

src/SimpleAzure.Storage.HybridQueue/HybridQueue.cs#L215-L216

Added lines #L215 - L216 were not covered by tests
}

return hybridMessage;
return new HybridMessage<T>(item, queueMessage.MessageId, queueMessage.PopReceipt, null);
}
}
}
Loading