Skip to content

Commit

Permalink
Random opinionated changes (#2)
Browse files Browse the repository at this point in the history
* Fix spelling mistakes

A clean error list is a happy error list

* Improve string type checking

There is no need to check if a string is a simple type, only to then check if its a string. Additionally, by capturing the result of the type check, we avoid the extra as (or cast)

* Assume nullability of primitive types

At the end of the day, what we want here is null warning suppression, but doing it with an exclamation mark can be quite hidden, and can just result in exceptions sometime later at runtime. The AssumeNotNull method is an aggressive version that throws at the site of the actual null.

Also increased the language version to be able to use nameof to refer to a parameter.

* Don't allow null messages

The system never puts nulls in, so it was a little odd that it allowed for them to be retreived. Instead, if a message can't be retreived then the whole message may as well be null.

* Use primary constructor

C# 12 is nce

* Remove extra logging message

The BeginScope method is on ILogger, not ILogger<T>, so there was no difference between this and the method below

* Use pattern matching to simplify code

In particular with Nullable<T> its much nicer to use patterns, rather than .HasValue and then .Value. There is also a minor perf improvement as .Value has to check, and throw, for nulls.

* Return array instead of IEnumerable

Task.WhenAll returns an array, but returning it as an IEnumerable meant the GetMessageAsync method has to enumerate the results no less than 4 times!

* Remove unnecessary batch size calc

Mainly I wanted to remove yet another enumeration of a collection, but also because this calculation seemed useless to me. If we want to batch into chunks of 20, and there are only 10, then surely it will produce one chunk whether the batch size is set to 10 or 20.

* Remove inheritdocs

The IDE will automatically show the doc comments from the interface member, without needing to specify inheritdoc everywhere. Also removed some whitespace I saw while checking that no, this isn't producing a documentation file.

* Tweak visibility

Sealed performs better 😛

* Use CancellationToken.None

This is the convention, and clearly communicates intent

* Move helper methods to extension methods

Default interface implementations are tempting to use as helper methods, but they are not great, as any new implementor will, at best, think they have to implement them, and at worst get compile errors. Helpers written as extension methods in the same namespace are just as discoverable, and should work regardless of implementation.

* Remove unused using

Must have crept in when rebasing

* Specify ConfigureAwait(false)

No need to force things to stay on the same thread when doing async network I/O to Azure

* Get really really opinionated

Why not go all out :D

* Don't use collection expressions

It didn't build in a codespace, so I guess this is too new.

* Copy and paste doc, rather than use inherit doc, so that there are more places to update if anything changes.

* Change array return to IReadOnlyList

Check out how awesome patterns are! Didn't have to update any of them even though the return type now has a Count property instaed of a Length property! Yay patterns!!

* Use a local for the return value
  • Loading branch information
davidwengier authored Nov 13, 2023
1 parent e4956f8 commit 5167351
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class AddMessageAsyncTests : CustomAzuriteTestContainer
public async Task AddMessageAsync_GivenSomeSimpleString_ShouldAddTheMessageToTheQueue()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var message = "hello";

// Act.
Expand All @@ -24,7 +24,7 @@ public async Task AddMessageAsync_GivenSomeSimpleString_ShouldAddTheMessageToThe
public async Task AddMessageAsync_GivenASimpleInt_ShouldAddTheMessageToTheQueue()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var message = 1234;

// Act.
Expand All @@ -40,7 +40,7 @@ public async Task AddMessageAsync_GivenASimpleInt_ShouldAddTheMessageToTheQueue(
public async Task AddMessageAsync_GivenAComplexInstance_ShouldAddTheMessageToTheQueue()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var message = new FakeMessage(10);

// Act.
Expand All @@ -57,7 +57,7 @@ public async Task AddMessageAsync_GivenAComplexInstance_ShouldAddTheMessageToThe
public async Task AddMessageAsync_GivenALargeComplexInstance_ShouldAddTheMessageToABlogAndThenAGuidToTheQueue()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var message = new FakeMessage(QueueClient.MessageMaxBytes + 1);

// Act.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class AddMessagesAsyncTests : CustomAzuriteTestContainer
public async Task AddMessagesAsync_GivenSomeSimpleStrings_ShouldAddThemAllToTheQueue()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var messages = new[] { "aaa", "bbb", "ccc", "ddd" };

// Act.
Expand All @@ -29,7 +29,7 @@ public async Task AddMessagesAsync_GivenSomeSimpleStrings_ShouldAddThemAllToTheQ
public async Task AddMessagesAsync_GivenSomeComplexInstances_ShouldAddThemAllToTheQueue()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var messages = new[]
{
new FakeMessage(20),
Expand Down Expand Up @@ -57,7 +57,7 @@ public async Task AddMessagesAsync_GivenSomeComplexInstances_ShouldAddThemAllToT
public async Task AddMessagesAsync_GivenSomeLargeComplexInstances_ShouldAddThemAllToTheblobAndQueue()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var messages = new[]
{
new FakeMessage(QueueClient.MessageMaxBytes + 1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ public class DeleteMessageAsyncTests : CustomAzuriteTestContainer
public async Task DeleteMessageAsync_GivenALargeComplexInstance_ShouldDeleteTheBlobItemAndQueueMessage()
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;
var message = new FakeMessage(QueueClient.MessageMaxBytes + 1);

await HybridQueue.AddMessageAsync(message, default);
var retrievedMessage = await HybridQueue.GetMessageAsync<FakeMessage>(cancellationToken)!;
var retrievedMessage = await HybridQueue.GetMessageAsync<FakeMessage>(cancellationToken);
retrievedMessage.ShouldNotBeNull();

// Act.
await HybridQueue.DeleteMessageAsync(retrievedMessage, default);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public class GetMessagesAsyncTests : CustomAzuriteTestContainer
public async Task GetMessagesAsync_GivenAnInvalidMaxMessages_ShouldThrowAnException(int maxMessages)
{
// Arrange.
var cancellationToken = new CancellationToken();
var cancellationToken = CancellationToken.None;

// Act.
var exception = await Should.ThrowAsync<ArgumentOutOfRangeException>(HybridQueue.GetMessagesAsync<string>(
Expand Down
15 changes: 15 additions & 0 deletions src/SimpleAzure.Storage.HybridQueue/Helpers.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;

namespace WorldDomination.SimpleAzure.Storage.HybridQueues;

internal static class Helpers
Expand All @@ -13,4 +16,16 @@ internal static bool IsASimpleType(this Type type) =>
type.IsPrimitive ||
type == typeof(string) ||
type == typeof(decimal);

/// <summary>
/// Has the same intention as the null suppression operator (!) but throws an exception at the use site,
/// rather than the use site which occurs some unknown time later.
/// </summary>
internal static T AssumeNotNull<T>(
[NotNull] this T? item,
[CallerArgumentExpression(nameof(item))] string? expr = null)
where T : class
{
return item ?? throw new InvalidOperationException($"Expected '{expr}' to be non-null.");
}
}
2 changes: 1 addition & 1 deletion src/SimpleAzure.Storage.HybridQueue/HybridMessage.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
namespace WorldDomination.SimpleAzure.Storage.HybridQueues;

public record class HybridMessage<T>(T? Content, string MessageId, string PopeReceipt, Guid? BlobId);
public sealed record HybridMessage<T>(T Content, string MessageId, string PopeReceipt, Guid? BlobId);
148 changes: 56 additions & 92 deletions src/SimpleAzure.Storage.HybridQueue/HybridQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,12 @@

namespace WorldDomination.SimpleAzure.Storage.HybridQueues;

public class HybridQueue : IHybridQueue
public sealed class HybridQueue(QueueClient queueClient, BlobContainerClient blobContainerClient, ILogger<HybridQueue> logger) : IHybridQueue
{
private readonly QueueClient _queueClient;
private readonly BlobContainerClient _blobContainerClient;
private readonly ILogger<HybridQueue> _logger;
private readonly QueueClient _queueClient = queueClient;
private readonly BlobContainerClient _blobContainerClient = blobContainerClient;
private readonly ILogger<HybridQueue> _logger = logger;

/// <inheritdoc />
public HybridQueue(QueueClient queueClient, BlobContainerClient blobContainerClient, ILogger<HybridQueue> logger)
{
_queueClient = queueClient;
_blobContainerClient = blobContainerClient;
_logger = logger;
}

/// <inheritdoc />
public async Task AddMessageAsync<T>(T item, TimeSpan? initialVisibilityDelay, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(item);
Expand All @@ -33,27 +24,23 @@ public async Task AddMessageAsync<T>(T item, TimeSpan? initialVisibilityDelay, C
string message;

// Don't waste effort serializing a string. It's already in a format that's ready to go.
if (typeof(T).IsASimpleType())
if (item is string stringItem)
{
if (item is string someString)
{
_logger.LogDebug("Item is a SimpleType: string.");

message = someString; // Note: shouldn't allocate new memory. Should just be a reference to existing memory.
}
else
{
_logger.LogDebug("Item is a SimpleType: something other than a string.");
_logger.LogDebug("Item is a SimpleType: string.");
message = stringItem;
}
else if (typeof(T).IsASimpleType())
{
_logger.LogDebug("Item is a SimpleType: something other than a string.");

message = item.ToString()!;
}
// IsASimpleType ensures that item is a primitive type, or decimal, and none of them
// return null from their ToString method.
message = item.ToString().AssumeNotNull();
}
else
{
// It's a complex type, so serialize this as Json.

_logger.LogDebug("Item is a ComplexType: {complexType}", item.GetType().ToString());

message = JsonSerializer.Serialize(item);
}

Expand All @@ -70,7 +57,7 @@ public async Task AddMessageAsync<T>(T item, TimeSpan? initialVisibilityDelay, C

var blobId = Guid.NewGuid().ToString(); // Unique Name/Identifier of this blob item.
var binaryData = new BinaryData(message);
await _blobContainerClient.UploadBlobAsync(blobId, binaryData, cancellationToken);
await _blobContainerClient.UploadBlobAsync(blobId, binaryData, cancellationToken).ConfigureAwait(false);

message = blobId;

Expand All @@ -81,12 +68,11 @@ await _queueClient.SendMessageAsync(
message,
initialVisibilityDelay,
null,
cancellationToken);
cancellationToken).ConfigureAwait(false);

_logger.LogDebug("Finished adding an Item to the queue.");
}

/// <inheritdoc />
public async Task AddMessagesAsync<T>(
IEnumerable<T> contents,
TimeSpan? initialVisibilityDelay,
Expand All @@ -105,21 +91,15 @@ public async Task AddMessagesAsync<T>(
}

// Lets batch up these messages to make sure the awaiting of all the tasks doesn't go too crazy.
var contentsSize = contents.Count();
var finalBatchSize = contentsSize > batchSize
? batchSize
: contentsSize;

foreach (var batch in contents.Chunk(finalBatchSize))
foreach (var batch in contents.Chunk(batchSize))
{
var tasks = batch.Select(content => AddMessageAsync(content, initialVisibilityDelay, cancellationToken));

// Execute this batch.
await Task.WhenAll(tasks);
await Task.WhenAll(tasks).ConfigureAwait(false);
}
}

/// <inheritdoc />
public async Task DeleteMessageAsync<T>(HybridMessage<T> hybridMessage, CancellationToken cancellationToken)
{
using var _ = _logger.BeginCustomScope(
Expand All @@ -131,51 +111,42 @@ public async Task DeleteMessageAsync<T>(HybridMessage<T> hybridMessage, Cancella
_logger.LogDebug("Deleting a message.");

// We start with any blobs.
if (hybridMessage.BlobId.HasValue)
if (hybridMessage.BlobId is { } blobId)
{
_logger.LogDebug("Deleting message from Blob Container.");

var blobClient = _blobContainerClient.GetBlobClient(hybridMessage.BlobId.Value.ToString());
var blobResponse = await blobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken);

var blobClient = _blobContainerClient.GetBlobClient(blobId.ToString());
var blobResponse = await blobClient.DeleteIfExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
if (!blobResponse.Value)
{
_logger.LogWarning("Failed to delete message from Blob Container.");
}
}

var queueResponse = await _queueClient.DeleteMessageAsync(hybridMessage.MessageId, hybridMessage.PopeReceipt, cancellationToken);
var queueResponse = await _queueClient.DeleteMessageAsync(hybridMessage.MessageId, hybridMessage.PopeReceipt, cancellationToken).ConfigureAwait(false);

_logger.LogDebug("Deleted a message from the queue.");
}

/// <inheritdoc />
public async Task<HybridMessage<T?>> GetMessageAsync<T>(TimeSpan? visibilityTimeout, CancellationToken cancellationToken)
public async Task<HybridMessage<T>?> GetMessageAsync<T>(TimeSpan? visibilityTimeout, CancellationToken cancellationToken)
{
var messages = await GetMessagesAsync<T>(1, visibilityTimeout, cancellationToken);
var messages = await GetMessagesAsync<T>(1, visibilityTimeout, cancellationToken).ConfigureAwait(false);

if (messages?.Any() ?? false)
return messages switch
{
if (messages.Count() > 1)
{
throw new InvalidOperationException($"Expected 1 message but received {messages.Count()} messages");
}

return messages.First();
}

return default;
[] => null,
[{ } first] => first,
_ => throw new InvalidOperationException($"Expected 1 message but received {messages.Count} messages")
};
}

/// <inheritdoc />
public async Task<IEnumerable<HybridMessage<T>>> GetMessagesAsync<T>(
public async Task<IReadOnlyList<HybridMessage<T>>> GetMessagesAsync<T>(
int maxMessages,
TimeSpan? visibilityTimeout,
CancellationToken cancellationToken)
{
// Note: Why 32? That's the limit for Azure to pop at once.
if (maxMessages < 1 ||
maxMessages > 32)
if (maxMessages is < 1 or > 32)
{
throw new ArgumentOutOfRangeException(nameof(maxMessages));
}
Expand All @@ -186,73 +157,66 @@ public async Task<IEnumerable<HybridMessage<T>>> GetMessagesAsync<T>(

_logger.LogDebug("About to receive queue message.");

var response = await _queueClient.ReceiveMessagesAsync(maxMessages, visibilityTimeout, cancellationToken);

if (response == null ||
response.Value == null)
var response = await _queueClient.ReceiveMessagesAsync(maxMessages, visibilityTimeout, cancellationToken).ConfigureAwait(false);
if (response?.Value is not { } messages)
{
_logger.LogDebug("Response was null or there were no Queue messages retrieved.");

return Enumerable.Empty<HybridMessage<T>>();
return Array.Empty<HybridMessage<T>>();
}

_logger.LogDebug("Received {} messages from queue.", response.Value.Length);

var hybridMessageTasks = response.Value.Select(x => ParseMessageAsync<T>(x, cancellationToken));
_logger.LogDebug("Received {} messages from queue.", messages.Length);

var hybridMessages = await Task.WhenAll(hybridMessageTasks);
var hybridMessageTasks = messages.Select(x => ParseMessageAsync<T>(x, cancellationToken));

var hybridMessages = await Task.WhenAll(hybridMessageTasks).ConfigureAwait(false);
return hybridMessages;
}

private async Task<HybridMessage<T>> ParseMessageAsync<T>(QueueMessage queueMessage, CancellationToken cancellationToken)
{
var message = queueMessage.Body?.ToString();

if (message == null)
{
return new HybridMessage<T>(default, queueMessage.MessageId, queueMessage.PopReceipt, null);
}
var message = queueMessage.Body.ToString().AssumeNotNull();

if (Guid.TryParse(message, out var blobId))
{
using var _ = _logger.BeginCustomScope((nameof(blobId), blobId));

_logger.LogDebug("Retreiving item via Blob Storage.");
_logger.LogDebug("Retrieving item via Blob Storage.");

// Lets grab the item from the Blob.
var blobClient = _blobContainerClient.GetBlobClient(blobId.ToString());
using var stream = await blobClient.OpenReadAsync(null, cancellationToken);
using var stream = await blobClient.OpenReadAsync(null, cancellationToken).ConfigureAwait(false);

_logger.LogDebug("About to deserialize stream for a blob item from Blob Storage.");
var blobItem = await JsonSerializer.DeserializeAsync<T>(stream, cancellationToken: cancellationToken)!;
var blobItem = await JsonSerializer.DeserializeAsync<T>(stream, cancellationToken: cancellationToken).ConfigureAwait(false);
_logger.LogDebug("Finished deserializing stream for a blob item from Blob Storage.");

var hybridMessage = new HybridMessage<T>(blobItem, queueMessage.MessageId, queueMessage.PopReceipt, blobId);
if (blobItem is null)
{
throw new InvalidOperationException($"Could not deserialize blob '{blobId}' for message '{queueMessage.MessageId}'.");
}

return hybridMessage;
return new HybridMessage<T>(blobItem, queueMessage.MessageId, queueMessage.PopReceipt, blobId);
}
else if (typeof(T).IsASimpleType())
{
_logger.LogDebug("Retreiving item: which is a simle type and not a guid/not in Blob Storage.");
_logger.LogDebug("Retrieving item: which is a simle type and not a guid/not in Blob Storage.");

// Do we have a GUID? Guid's are used to represent the blobId.
// Do we have a GUID? Guids are used to represent the blobId.
var value = (T)Convert.ChangeType(message, typeof(T));
var hybridMessage = new HybridMessage<T>(value, queueMessage.MessageId, queueMessage.PopReceipt, null);

return hybridMessage;
return new HybridMessage<T>(value, queueMessage.MessageId, queueMessage.PopReceipt, null);
}
else
{
// Complex type, so lets assume it was serialized as Json ... so now we deserialize it.
_logger.LogDebug("Retrieving a complex item: assumed as json so deserializing it.");

_logger.LogDebug("Retreiving a complex item: assumed as json so deserializing it.");

var item = JsonSerializer.Deserialize<T>(message)!;

var hybridMessage = new HybridMessage<T>(item, queueMessage.MessageId, queueMessage.PopReceipt, null);
var item = JsonSerializer.Deserialize<T>(message);
if (item is null)
{
throw new InvalidOperationException($"Could not deserialize complex type for message '{queueMessage.MessageId}'.");
}

return hybridMessage;
return new HybridMessage<T>(item, queueMessage.MessageId, queueMessage.PopReceipt, null);
}
}
}
Loading

0 comments on commit 5167351

Please sign in to comment.