Skip to content

Commit

Permalink
New event to handle Metadata update (#332)
Browse files Browse the repository at this point in the history
With this PR #328 the client can handle multi-producers and consumers per connection.

This PR removes MetadataHandler and introduces OnMetadataUpdate event.

The event can handle multiple Metadata updates coming from the server. Metadata update is raised when a stream is deleted, or a replica is removed.

The server automatically removes the producers and consumers linked to the connection, here we need to remove these entities from the internal pool to be consistent.

- Refactor RawConsumer and RawProducer. Remove duplication code. Move the common code to the AbstractEntity Class

---------

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio authored Dec 18, 2023
1 parent 4cefb84 commit d5cdce4
Show file tree
Hide file tree
Showing 14 changed files with 463 additions and 242 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ jobs:
key: ${{ runner.os }}-v2-nuget-${{ hashFiles('**/*.csproj') }}
restore-keys: |
${{ runner.os }}-v2-nuget-
- name: Wait RabbitMQ is Up
run: docker exec ${{ job.services.rabbitmq.id }} rabbitmqctl wait --pid 1 --timeout 60
- name: Enable RabbitMQ Plugins
run: docker exec ${{ job.services.rabbitmq.id }} rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management rabbitmq_amqp1_0
- name: Restore
Expand Down
71 changes: 63 additions & 8 deletions RabbitMQ.Stream.Client/AbstractEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,87 @@
namespace RabbitMQ.Stream.Client
{

public abstract record EntityCommonConfig
{
internal ConnectionsPool Pool { get; set; }
}

internal enum EntityStatus
{
Open,
Closed,
Disposed
}
public abstract class AbstractEntity

public interface IClosable
{
public Task<ResponseCode> Close();
}

public abstract class AbstractEntity : IClosable
{
private readonly CancellationTokenSource _cancelTokenSource = new();
protected CancellationToken Token => _cancelTokenSource.Token;

protected ILogger Logger { get; init; }
internal EntityStatus _status = EntityStatus.Closed;

protected byte EntityId { get; set; }
protected abstract string GetStream();
protected abstract string DumpEntityConfiguration();

// here the _cancelTokenSource is disposed and the token is cancelled
// in producer is used to cancel the send task
// in consumer is used to cancel the receive task
protected void MaybeCancelToken()
private void MaybeCancelToken()
{

if (!_cancelTokenSource.IsCancellationRequested)
_cancelTokenSource.Cancel();
}

public abstract Task<ResponseCode> Close();

protected void Dispose(bool disposing, string entityInfo, ILogger logger)
/// <summary>
/// Remove the producer or consumer from the server
/// </summary>
/// <param name="ignoreIfAlreadyDeleted"> In case the producer or consumer is already removed from the server.
/// ex: metadata update </param>
/// <returns></returns>
protected abstract Task<ResponseCode> DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false);

/// <summary>
/// Internal close method. It is called by the public Close method.
/// Set the status to closed and remove the producer or consumer from the server ( if it is not already removed )
/// Close the TCP connection if it is not already closed or it is needed.
/// </summary>
/// <param name="config">The connection pool instance</param>
/// <param name="ignoreIfAlreadyDeleted"></param>
/// <returns></returns>
protected async Task<ResponseCode> Shutdown(EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false)
{
MaybeCancelToken();

if (!IsOpen()) // the client is already closed
{
return ResponseCode.Ok;
}

_status = EntityStatus.Closed;
var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false);

if (_client is { IsClosed: true })
{
return result;
}

var closed = await _client.MaybeClose($"closing: {EntityId}",
GetStream(), config.Pool)
.ConfigureAwait(false);
ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-Entity: {EntityId}");
Logger.LogDebug("{EntityInfo} is closed", DumpEntityConfiguration());
return result;
}

protected void Dispose(bool disposing)
{
if (!disposing)
{
Expand All @@ -51,12 +107,12 @@ protected void Dispose(bool disposing, string entityInfo, ILogger logger)
var closeTask = Close();
if (!closeTask.Wait(Consts.MidWait))
{
logger.LogWarning("Failed to close {EntityInfo} in time", entityInfo);
Logger?.LogWarning("Failed to close {EntityInfo} in time", DumpEntityConfiguration());
}
}
catch (Exception e)
{
logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", entityInfo, e.Message);
Logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", DumpEntityConfiguration(), e.Message);
}
finally
{
Expand All @@ -70,6 +126,5 @@ public bool IsOpen()
}

internal Client _client;

}
}
100 changes: 74 additions & 26 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public record ClientParameters
public string VirtualHost { get; set; } = "/";
public EndPoint Endpoint { get; set; } = new IPEndPoint(IPAddress.Loopback, 5552);

public Action<MetaDataUpdate> MetadataHandler { get; set; } = _ => { };
public delegate void MetadataUpdateHandler(MetaDataUpdate update);

public event MetadataUpdateHandler OnMetadataUpdate;
public Action<Exception> UnhandledExceptionHandler { get; set; } = _ => { };
public TimeSpan Heartbeat { get; set; } = TimeSpan.FromMinutes(1);

Expand All @@ -71,6 +73,11 @@ public string ClientProvidedName
public AddressResolver AddressResolver { get; set; } = null;

public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain;

internal void FireMetadataUpdate(MetaDataUpdate metaDataUpdate)
{
OnMetadataUpdate?.Invoke(metaDataUpdate);
}
}

internal readonly struct OutgoingMsg : ICommand
Expand Down Expand Up @@ -213,7 +220,8 @@ await client
.ConfigureAwait(false);
logger?.LogDebug("Sasl mechanism: {Mechanisms}", saslHandshakeResponse.Mechanisms);

var isValid = saslHandshakeResponse.Mechanisms.Contains(parameters.AuthMechanism.ToString().ToUpperInvariant(),
var isValid = saslHandshakeResponse.Mechanisms.Contains(
parameters.AuthMechanism.ToString().ToUpperInvariant(),
StringComparer.OrdinalIgnoreCase);
if (!isValid)
{
Expand All @@ -225,7 +233,8 @@ await client
var authResponse =
await client
.Request<SaslAuthenticateRequest, SaslAuthenticateResponse>(corr =>
new SaslAuthenticateRequest(corr, parameters.AuthMechanism.ToString().ToUpperInvariant(), saslData))
new SaslAuthenticateRequest(corr, parameters.AuthMechanism.ToString().ToUpperInvariant(),
saslData))
.ConfigureAwait(false);
ClientExceptions.MaybeThrowException(authResponse.ResponseCode, parameters.UserName);

Expand Down Expand Up @@ -322,22 +331,28 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
return (publisherId, response);
}

public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId)
public async Task<DeletePublisherResponse> DeletePublisher(byte publisherId,
bool ignoreIfAlreadyRemoved = false)
{
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
try
{
var result =
await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
new DeletePublisherRequest(corr, publisherId)).ConfigureAwait(false);
if (!ignoreIfAlreadyRemoved)
{
var result =
await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
new DeletePublisherRequest(corr, publisherId)).ConfigureAwait(false);

return result;
return result;
}
}
finally
{
publishers.Remove(publisherId);
_poolSemaphore.Release();
}

return new DeletePublisherResponse();
}

public async Task<(byte, SubscribeResponse)> Subscribe(string stream, IOffsetType offsetType,
Expand Down Expand Up @@ -386,20 +401,24 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
return (subscriptionId, response);
}

public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId)
public async Task<UnsubscribeResponse> Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false)
{
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
try
{
// here we reduce a bit the timeout to avoid waiting too much
// if the client is busy with read operations it can take time to process the unsubscribe
// but the subscribe is removed.
var result =
await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
new UnsubscribeRequest(corr, subscriptionId), TimeSpan.FromSeconds(5)).ConfigureAwait(false);
_logger.LogDebug("Unsubscribe request : {SubscriptionId}", subscriptionId);

return result;
if (!ignoreIfAlreadyRemoved)
{
// here we reduce a bit the timeout to avoid waiting too much
// if the client is busy with read operations it can take time to process the unsubscribe
// but the subscribe is removed.
var result =
await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
new UnsubscribeRequest(corr, subscriptionId),
TimeSpan.FromSeconds(5)).ConfigureAwait(false);
_logger.LogDebug("Unsubscribe request : {SubscriptionId}", subscriptionId);

return result;
}
}
finally
{
Expand All @@ -408,6 +427,8 @@ await Request<UnsubscribeRequest, UnsubscribeResponse>(corr =>
consumers.Remove(subscriptionId);
_poolSemaphore.Release();
}

return new UnsubscribeResponse();
}

public async Task<PartitionsQueryResponse> QueryPartition(string superStream)
Expand Down Expand Up @@ -477,12 +498,25 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
case PublishConfirm.Key:
PublishConfirm.Read(frame, out var confirm);
confirmFrames += 1;
var (confirmCallback, _) = publishers[confirm.PublisherId];
confirmCallback(confirm.PublishingIds);
if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment))
if (publishers.TryGetValue(confirm.PublisherId, out var publisherConf))
{
if (confirmSegment.Array != null)
ArrayPool<ulong>.Shared.Return(confirmSegment.Array);
var (confirmCallback, _) = publisherConf;
confirmCallback(confirm.PublishingIds);
if (MemoryMarshal.TryGetArray(confirm.PublishingIds, out var confirmSegment))
{
if (confirmSegment.Array != null)
ArrayPool<ulong>.Shared.Return(confirmSegment.Array);
}
}
else
{
// the producer is not found, this can happen when the producer is closing
// and there are still confirmation on the wire
// we can ignore the error since the producer does not exists anymore
_logger?.LogDebug(
"Could not find stream producer {ID} or producer is closing." +
"A possible cause it that the producer was closed and the are still confirmation on the wire. ",
confirm.PublisherId);
}

break;
Expand All @@ -507,12 +541,26 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
break;
case PublishError.Key:
PublishError.Read(frame, out var error);
var (_, errorCallback) = publishers[error.PublisherId];
errorCallback(error.PublishingErrors);
if (publishers.TryGetValue(error.PublisherId, out var publisher))
{
var (_, errorCallback) = publisher;
errorCallback(error.PublishingErrors);
}
else
{
// the producer is not found, this can happen when the producer is closing
// and there are still confirmation on the wire
// we can ignore the error since the producer does not exists anymore
_logger?.LogDebug(
"Could not find stream producer {ID} or producer is closing." +
"A possible cause it that the producer was closed and the are still confirmation on the wire. ",
error.PublisherId);
}

break;
case MetaDataUpdate.Key:
MetaDataUpdate.Read(frame, out var metaDataUpdate);
Parameters.MetadataHandler(metaDataUpdate);
Parameters.FireMetadataUpdate(metaDataUpdate);
break;
case TuneResponse.Key:
TuneResponse.Read(frame, out var tuneResponse);
Expand Down
7 changes: 2 additions & 5 deletions RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,18 @@

namespace RabbitMQ.Stream.Client;

public interface IConsumer
public interface IConsumer : IClosable
{
public Task StoreOffset(ulong offset);
public Task<ResponseCode> Close();
public void Dispose();

public ConsumerInfo Info { get; }
}

public record IConsumerConfig : INamedEntity
public record IConsumerConfig : EntityCommonConfig, INamedEntity
{
private ushort _initialCredits = Consts.ConsumerInitialCredits;

internal ConnectionsPool Pool { get; set; }

// StoredOffsetSpec configuration it is needed to keep the offset spec.
// since the offset can be decided from the ConsumerConfig.OffsetSpec.
// and from ConsumerConfig.ConsumerUpdateListener.
Expand Down
8 changes: 2 additions & 6 deletions RabbitMQ.Stream.Client/IProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace RabbitMQ.Stream.Client;
// - Super-Stream producer
// </summary>

public interface IProducer
public interface IProducer : IClosable
{
/// <summary>
/// Send the message to the stream in asynchronous mode.
Expand Down Expand Up @@ -49,8 +49,6 @@ public interface IProducer
/// <returns></returns>
public ValueTask Send(ulong publishingId, List<Message> subEntryMessages, CompressionType compressionType);

public Task<ResponseCode> Close();

/// <summary>
/// Return the last publishing id.
/// </summary>
Expand Down Expand Up @@ -83,11 +81,9 @@ public record ProducerFilter
public Func<Message, string> FilterValue { get; set; } = null;
}

public record IProducerConfig : INamedEntity
public record IProducerConfig : EntityCommonConfig, INamedEntity
{

internal ConnectionsPool Pool { get; set; }

public string Reference { get; set; }
public int MaxInFlight { get; set; } = 1_000;
public string ClientProvidedName { get; set; } = "dotnet-stream-raw-producer";
Expand Down
Loading

0 comments on commit d5cdce4

Please sign in to comment.