Skip to content

Commit

Permalink
Improve the rawProducer and rawSuperStreamProducer status (#337)
Browse files Browse the repository at this point in the history
* Improve the rawProducer and rawSuperStreamProducer status

*   Part of #336
     the aim is to standardise the behaviour.

*  Add AlreadyClosedException when a producer or super stream producer is closed
  In the same way as AMQP does with channels

*  Producer: Flush the pending messages when closed.
* Producer: Add AlreadyClosedException when closed. So it is the same as:
  AMQP and RawProducer and RawSuperStreamProducer

* Improve IsAKnownException function with the AggregateException that can contain
  known exceptions needed to reconnect the client

* Add EntitiesStateTests to test all the status and verify that all the entities have the
  same behaviour

* Assert the AlreadyClosedException Exception

---------
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio authored Dec 20, 2023
1 parent d5cdce4 commit cf8c927
Show file tree
Hide file tree
Showing 12 changed files with 234 additions and 74 deletions.
8 changes: 8 additions & 0 deletions RabbitMQ.Stream.Client/AbstractEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ public abstract class AbstractEntity : IClosable
protected abstract string GetStream();
protected abstract string DumpEntityConfiguration();

protected void ThrowIfClosed()
{
if (!IsOpen())
{
throw new AlreadyClosedException($"{DumpEntityConfiguration()} is closed.");
}
}

// here the _cancelTokenSource is disposed and the token is cancelled
// in producer is used to cancel the send task
// in consumer is used to cancel the receive task
Expand Down
39 changes: 39 additions & 0 deletions RabbitMQ.Stream.Client/ClientExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,42 @@
// Copyright (c) 2007-2023 VMware, Inc.

using System;
using System.Linq;
using System.Net.Sockets;

namespace RabbitMQ.Stream.Client
{
internal static class ClientExceptions
{

// <summary>
/// IsAKnownException returns true if the exception is a known exception
/// We need it to reconnect when the producer/consumer.
/// - LeaderNotFoundException is a temporary exception
/// It means that the leader is not available and the client can't reconnect.
/// Especially the Producer that needs to know the leader.
/// - SocketException
/// Client is trying to connect in a not ready endpoint.
/// It is usually a temporary situation.
/// - TimeoutException
/// Network call timed out. It is often a temporary situation and we should retry.
/// In this case we can try to reconnect.
///
/// For the other kind of exception, we just throw back the exception.
//</summary>
internal static bool IsAKnownException(Exception exception)
{
if (exception is AggregateException aggregateException)
{
var x = aggregateException.InnerExceptions.Select(x =>
x.GetType() == typeof(SocketException) || x.GetType() == typeof(TimeoutException) ||
x.GetType() == typeof(LeaderNotFoundException));
return x.Any();
}

return exception is (SocketException or TimeoutException or LeaderNotFoundException);
}

public static void MaybeThrowException(ResponseCode responseCode, string message)
{
if (responseCode is ResponseCode.Ok)
Expand All @@ -27,6 +58,14 @@ public static void MaybeThrowException(ResponseCode responseCode, string message
}
}

public class AlreadyClosedException : Exception
{
public AlreadyClosedException(string s)
: base(s)
{
}
}

public class ProtocolException : Exception
{
protected ProtocolException(string s)
Expand Down
3 changes: 3 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ RabbitMQ.Stream.Client.AbstractEntity.IsOpen() -> bool
RabbitMQ.Stream.Client.AbstractEntity.Logger.get -> Microsoft.Extensions.Logging.ILogger
RabbitMQ.Stream.Client.AbstractEntity.Logger.init -> void
RabbitMQ.Stream.Client.AbstractEntity.Shutdown(RabbitMQ.Stream.Client.EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.ResponseCode>
RabbitMQ.Stream.Client.AbstractEntity.ThrowIfClosed() -> void
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
RabbitMQ.Stream.Client.AlreadyClosedException
RabbitMQ.Stream.Client.AlreadyClosedException.AlreadyClosedException(string s) -> void
RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.AuthMechanism.External = 1 -> RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.AuthMechanism.Plain = 0 -> RabbitMQ.Stream.Client.AuthMechanism
Expand Down
4 changes: 4 additions & 0 deletions RabbitMQ.Stream.Client/RawProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() =>
/// <param name="compressionType">No Compression, Gzip Compression. Other types are not provided by default</param>
public async ValueTask Send(ulong publishingId, List<Message> subEntryMessages, CompressionType compressionType)
{
ThrowIfClosed();
if (subEntryMessages.Count != 0)
{
await SemaphoreAwaitAsync().ConfigureAwait(false);
Expand All @@ -239,6 +240,7 @@ private async Task SemaphoreAwaitAsync()
/// <param name="messages"></param>
public async ValueTask Send(List<(ulong, Message)> messages)
{
ThrowIfClosed();
PreValidateBatch(messages);
await InternalBatchSend(messages).ConfigureAwait(false);
}
Expand Down Expand Up @@ -275,6 +277,7 @@ internal void PreValidateBatch(List<(ulong, Message)> messages)

private async Task SendMessages(List<(ulong, Message)> messages, bool clearMessagesList = true)
{
ThrowIfClosed();
if (IsFilteringEnabled)
{
await _client.Publish(new PublishFilter(EntityId, messages, _config.Filter.FilterValue,
Expand Down Expand Up @@ -322,6 +325,7 @@ public async Task<ulong> GetLastPublishingId()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public async ValueTask Send(ulong publishingId, Message message)
{
ThrowIfClosed();
if (message.Size > _client.MaxFrameSize)
{
throw new InvalidOperationException($"Message size is to big. " +
Expand Down
11 changes: 11 additions & 0 deletions RabbitMQ.Stream.Client/RawSuperStreamProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ private async Task<IProducer> InitProducer(string stream)
return p;
}

protected void ThrowIfClosed()
{
if (!IsOpen())
{
throw new AlreadyClosedException($"Super stream {_config.SuperStream} is closed.");
}
}

private async Task<IProducer> GetProducer(string stream)
{
if (!_producers.ContainsKey(stream))
Expand Down Expand Up @@ -170,12 +178,14 @@ private async Task<IProducer> GetProducerForMessage(Message message)

public async ValueTask Send(ulong publishingId, Message message)
{
ThrowIfClosed();
var producer = await GetProducerForMessage(message).ConfigureAwait(false);
await producer.Send(publishingId, message).ConfigureAwait(false);
}

public async ValueTask Send(List<(ulong, Message)> messages)
{
ThrowIfClosed();
var aggregate = new List<(IProducer, List<(ulong, Message)>)>();

// this part is not super-optimized
Expand Down Expand Up @@ -203,6 +213,7 @@ public async ValueTask Send(List<(ulong, Message)> messages)

public async ValueTask Send(ulong publishingId, List<Message> subEntryMessages, CompressionType compressionType)
{
ThrowIfClosed();
var aggregate = new List<(IProducer, List<Message>)>();

// this part is not super-optimized
Expand Down
10 changes: 10 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ internal void Start()

internal void Stop()
{
FlushPendingMessages();
_invalidateTimer.Enabled = false;
_waitForConfirmationActionBlock.Complete();
}
Expand All @@ -129,6 +130,15 @@ await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value
}
}

private async void FlushPendingMessages()
{
foreach (var pair in _waitForConfirmation)
{
await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value.PublishingId, null)
.ConfigureAwait(false);
}
}

internal void AddUnConfirmedMessage(ulong publishingId, Message message)
{
AddUnConfirmedMessage(publishingId, new List<Message> { message });
Expand Down
40 changes: 36 additions & 4 deletions RabbitMQ.Stream.Client/Reliable/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ private Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
_logger = logger ?? NullLogger<Producer>.Instance;
}

private void ThrowIfClosed()
{
if (!_isOpen)
{
throw new AlreadyClosedException("Producer is closed");
}
}

/// <summary>
/// Create a new Producer.
/// <param name="producerConfig">Producer Configuration. Where StreamSystem and Stream are mandatory.</param>
Expand Down Expand Up @@ -241,6 +249,7 @@ internal async Task<ulong> GetLastPublishingId()

internal async ValueTask SendInternal(ulong publishingId, Message message)
{
ThrowIfClosed();
_confirmationPipe.AddUnConfirmedMessage(publishingId, message);
try
{
Expand All @@ -249,9 +258,16 @@ internal async ValueTask SendInternal(ulong publishingId, Message message)
// In this case it skips the publish until
// the producer is connected. Messages are safe since are stored
// on the _waitForConfirmation list. The user will get Timeout Error
if (!(_inReconnection))
if (!_inReconnection)
{
await _producer.Send(publishingId, message).ConfigureAwait(false);
if (_producer.IsOpen())
{
await _producer.Send(publishingId, message).ConfigureAwait(false);
}
else
{
_logger?.LogDebug("The internal producer is closed. Message will be timed out");
}
}
}

Expand Down Expand Up @@ -284,14 +300,22 @@ internal async ValueTask SendInternal(ulong publishingId, Message message)
/// In case of error the messages are considered as timed out, you will receive a confirmation with the status TimedOut.
public async ValueTask Send(List<Message> messages, CompressionType compressionType)
{
ThrowIfClosed();
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
Interlocked.Increment(ref _publishingId);
_confirmationPipe.AddUnConfirmedMessage(_publishingId, messages);
try
{
if (!_inReconnection)
{
await _producer.Send(_publishingId, messages, compressionType).ConfigureAwait(false);
if (_producer.IsOpen())
{
await _producer.Send(_publishingId, messages, compressionType).ConfigureAwait(false);
}
else
{
_logger?.LogDebug("The internal producer is closed. Message will be timed out");
}
}
}

Expand Down Expand Up @@ -330,6 +354,7 @@ public override string ToString()
/// In case of error the messages are considered as timed out, you will receive a confirmation with the status TimedOut.
public async ValueTask Send(List<Message> messages)
{
ThrowIfClosed();
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
var messagesToSend = new List<(ulong, Message)>();
foreach (var message in messages)
Expand All @@ -352,7 +377,14 @@ public async ValueTask Send(List<Message> messages)
// on the _waitForConfirmation list. The user will get Timeout Error
if (!(_inReconnection))
{
await _producer.Send(messagesToSend).ConfigureAwait(false);
if (_producer.IsOpen())
{
await _producer.Send(messagesToSend).ConfigureAwait(false);
}
else
{
_logger?.LogDebug("The internal producer is closed. Message will be timed out");
}
}
}

Expand Down
29 changes: 5 additions & 24 deletions RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// Copyright (c) 2007-2023 VMware, Inc.

using System;
using System.Net.Sockets;

using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -67,7 +67,7 @@ private async Task Init(bool boot, IReconnectStrategy reconnectStrategy)

catch (Exception e)
{
reconnect = IsAKnownException(e);
reconnect = ClientExceptions.IsAKnownException(e);
LogException(e);
if (!reconnect)
{
Expand Down Expand Up @@ -143,7 +143,8 @@ internal async Task HandleMetaDataMaybeReconnect(string stream, StreamSystem sys
await Task.Delay(500).ConfigureAwait(false);
if (await system.StreamExists(stream).ConfigureAwait(false))
{
BaseLogger.LogInformation("Meta data update stream: {StreamIdentifier}. The stream still exists. Client: {Identity}",
BaseLogger.LogInformation(
"Meta data update stream: {StreamIdentifier}. The stream still exists. Client: {Identity}",
stream,
ToString()
);
Expand All @@ -164,31 +165,11 @@ internal async Task HandleMetaDataMaybeReconnect(string stream, StreamSystem sys
}
}

// <summary>
/// IsAKnownException returns true if the exception is a known exception
/// We need it to reconnect when the producer/consumer.
/// - LeaderNotFoundException is a temporary exception
/// It means that the leader is not available and the client can't reconnect.
/// Especially the Producer that needs to know the leader.
/// - SocketException
/// Client is trying to connect in a not ready endpoint.
/// It is usually a temporary situation.
/// - TimeoutException
/// Some call went in timeout. Maybe a temporary DNS problem.
/// In this case we can try to reconnect.
///
/// For the other kind of exception, we just throw back the exception.
//</summary>
internal static bool IsAKnownException(Exception exception)
{
return exception is (SocketException or TimeoutException or LeaderNotFoundException);
}

private void LogException(Exception exception)
{
const string KnownExceptionTemplate = "{Identity} trying to reconnect due to exception";
const string UnknownExceptionTemplate = "{Identity} received an exception during initialization";
if (IsAKnownException(exception))
if (ClientExceptions.IsAKnownException(exception))
{
BaseLogger.LogError(exception, KnownExceptionTemplate, ToString());
}
Expand Down
Loading

0 comments on commit cf8c927

Please sign in to comment.