Skip to content

Commit

Permalink
Merge pull request #90 from takenet/feature/581985-update-azure-servi…
Browse files Browse the repository at this point in the history
…cebus-lib

Migrating to Azure.Messaging.ServiceBus from Microsoft.Azure.ServiceBus
  • Loading branch information
Joaomlg authored Mar 8, 2024
2 parents 4443b2e + bb350cb commit b922666
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 50 deletions.
89 changes: 47 additions & 42 deletions src/Take.Elephant.Azure/AzureServiceBusQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;
using Microsoft.Azure.ServiceBus.Management;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

namespace Take.Elephant.Azure
{
Expand All @@ -16,61 +15,70 @@ public class AzureServiceBusQueue<T> : IBlockingQueue<T>, IBatchSenderQueue<T>,

private readonly string _entityPath;
private readonly ISerializer<T> _serializer;
private QueueDescription _queueDescription;
private readonly ReceiveMode _receiveMode;
private readonly MessageSender _messageSender;
private readonly MessageReceiver _messageReceiver;
private readonly ManagementClient _managementClient;
private CreateQueueOptions _queueOptions;
private readonly ServiceBusReceiveMode _receiveMode;
private readonly ServiceBusClient _client;
private readonly ServiceBusSender _messageSender;
private readonly ServiceBusReceiver _messageReceiver;
private readonly ServiceBusAdministrationClient _administrationClient;
private readonly SemaphoreSlim _queueCreationSemaphore;
private bool _queueExists;

public AzureServiceBusQueue(
string connectionString,
string entityPath,
ISerializer<T> serializer,
ReceiveMode receiveMode = ReceiveMode.PeekLock,
RetryPolicy retryPolicy = null,
ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.PeekLock,
ServiceBusRetryOptions retryOptions = null,
int receiverPrefetchCount = 0,
QueueDescription queueDescription = null)
CreateQueueOptions queueOptions = null)
{
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
_entityPath = entityPath;
_receiveMode = receiveMode;
_messageSender = new MessageSender(connectionString, entityPath, retryPolicy);
_messageReceiver = new MessageReceiver(connectionString, entityPath, _receiveMode, retryPolicy, receiverPrefetchCount);
_managementClient = new ManagementClient(connectionString);
_client = new ServiceBusClient(connectionString, new ServiceBusClientOptions
{
RetryOptions = retryOptions ?? new ServiceBusRetryOptions(),
});
_messageSender = _client.CreateSender(entityPath);
_messageReceiver = _client.CreateReceiver(entityPath, new ServiceBusReceiverOptions
{
ReceiveMode = _receiveMode,
PrefetchCount = receiverPrefetchCount
});
_administrationClient = new ServiceBusAdministrationClient(connectionString);
_queueCreationSemaphore = new SemaphoreSlim(1, 1);

if (queueDescription != null)
if (queueOptions != null)
{
queueDescription.Path = _entityPath;
_queueDescription = queueDescription;
queueOptions.Name = _entityPath;
_queueOptions = queueOptions;
}
else
{
_queueDescription = new QueueDescription(_entityPath);
_queueOptions = new CreateQueueOptions(_entityPath);
}
}

public virtual async Task EnqueueAsync(T item, CancellationToken cancellationToken = default)
{
await CreateQueueIfNotExistsAsync(cancellationToken);
var message = CreateMessage(item);
await _messageSender.SendAsync(message);
await _messageSender.SendMessageAsync(message, cancellationToken);
}

public virtual async Task EnqueueBatchAsync(IEnumerable<T> items, CancellationToken cancellationToken = default)
{
await CreateQueueIfNotExistsAsync(cancellationToken);
var batch = new List<Message>();
var batch = new List<ServiceBusMessage>();

foreach (var item in items)
{
var message = CreateMessage(item);
batch.Add(message);
}

await _messageSender.SendAsync(batch);
await _messageSender.SendMessagesAsync(batch, cancellationToken);
}

public virtual async Task<T> DequeueOrDefaultAsync(CancellationToken cancellationToken = default)
Expand All @@ -79,14 +87,13 @@ public virtual async Task<T> DequeueOrDefaultAsync(CancellationToken cancellatio

try
{
var message = await _messageReceiver.ReceiveAsync(
TimeSpan.FromMilliseconds(MIN_RECEIVE_TIMEOUT));
var message = await _messageReceiver.ReceiveMessageAsync(TimeSpan.FromMilliseconds(MIN_RECEIVE_TIMEOUT), cancellationToken);
if (message != null)
{
return await CreateItemAndCompleteAsync(message);
return await CreateItemAndCompleteAsync(message, cancellationToken);
}
}
catch (ServiceBusTimeoutException) { }
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.ServiceTimeout) { }

return default;
}
Expand All @@ -109,14 +116,13 @@ public virtual async Task<T> DequeueAsync(CancellationToken cancellationToken)
timeout = MAX_RECEIVE_TIMEOUT;
}

var message = await _messageReceiver.ReceiveAsync(
TimeSpan.FromMilliseconds(timeout));
var message = await _messageReceiver.ReceiveMessageAsync(TimeSpan.FromMilliseconds(timeout), cancellationToken);
if (message != null)
{
return await CreateItemAndCompleteAsync(message);
return await CreateItemAndCompleteAsync(message, cancellationToken);
}
}
catch (ServiceBusTimeoutException) { }
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.ServiceTimeout) { }

tryCount++;
}
Expand All @@ -125,9 +131,9 @@ public virtual async Task<T> DequeueAsync(CancellationToken cancellationToken)
public virtual async Task<long> GetLengthAsync(CancellationToken cancellationToken = default)
{
await CreateQueueIfNotExistsAsync(cancellationToken);

var queueRuntimeInfo = await _managementClient.GetQueueRuntimeInfoAsync(_entityPath, cancellationToken);
return queueRuntimeInfo.MessageCount;
var queueRuntimeProperties = await _administrationClient.GetQueueRuntimePropertiesAsync(_entityPath, cancellationToken);
return queueRuntimeProperties.Value.TotalMessageCount;
}

public virtual async Task CloseAsync(CancellationToken cancellationToken)
Expand All @@ -136,8 +142,7 @@ public virtual async Task CloseAsync(CancellationToken cancellationToken)

await Task.WhenAll(
_messageSender.CloseAsync(),
_messageReceiver.CloseAsync(),
_managementClient.CloseAsync());
_messageReceiver.CloseAsync());
}

private async Task CreateQueueIfNotExistsAsync(CancellationToken cancellationToken = default(CancellationToken))
Expand All @@ -149,15 +154,15 @@ await Task.WhenAll(
{
if (_queueExists) return;

_queueExists = await _managementClient.QueueExistsAsync(_entityPath, cancellationToken);
_queueExists = await _administrationClient.QueueExistsAsync(_entityPath, cancellationToken);

if (!_queueExists)
{
try
{
_queueDescription = await _managementClient.CreateQueueAsync(_queueDescription, cancellationToken);
await _administrationClient.CreateQueueAsync(_queueOptions, cancellationToken);
}
catch (MessagingEntityAlreadyExistsException)
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists)
{
// Concurrency creation handling
}
Expand All @@ -171,19 +176,19 @@ await Task.WhenAll(
}
}

private Message CreateMessage(T item)
private ServiceBusMessage CreateMessage(T item)
{
var serializedItem = _serializer.Serialize(item);
return new Message(Encoding.UTF8.GetBytes(serializedItem));
return new ServiceBusMessage(Encoding.UTF8.GetBytes(serializedItem));
}

private async Task<T> CreateItemAndCompleteAsync(Message message)
private async Task<T> CreateItemAndCompleteAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken)
{
var serializedItem = Encoding.UTF8.GetString(message.Body);
var item = _serializer.Deserialize(serializedItem);
if (_receiveMode == ReceiveMode.PeekLock)
if (_receiveMode == ServiceBusReceiveMode.PeekLock)
{
await _messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
await _messageReceiver.CompleteMessageAsync(message, cancellationToken);
}
return item;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Take.Elephant.Azure/Take.Elephant.Azure.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.4" />
<PackageReference Include="Azure.Storage.Queues" Version="12.12.0" />
<PackageReference Include="Dawn.Guard" Version="1.6.0" />
<PackageReference Include="Microsoft.Azure.EventHubs" Version="3.0.0" />
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="5.2.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0" />
</ItemGroup>
<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using Microsoft.Azure.ServiceBus.Management;
using System.Threading.Tasks;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;
using Take.Elephant.Azure;
using Take.Elephant.Tests.Redis;
using Xunit;
Expand All @@ -26,14 +26,12 @@ public override IQueue<Item> Create()

private async Task DeleteQueueAsync(string connectionString, string path)
{
var managementClient = new ManagementClient(connectionString);
var administrationClient = new ServiceBusAdministrationClient(connectionString);

if (await managementClient.QueueExistsAsync(path))
if (await administrationClient.QueueExistsAsync(path))
{
await managementClient.DeleteQueueAsync(path);
await administrationClient.DeleteQueueAsync(path);
}

await managementClient.CloseAsync();
}
}
}

0 comments on commit b922666

Please sign in to comment.