Skip to content

Commit

Permalink
Increase the backoff strategy time (#345)
Browse files Browse the repository at this point in the history
* Increase the backoff strategy time

* The back-off reconnect strategy delay is increased significantly.
  The previous value was too aggressive:
  it caused a lot of reconnection attempts and a lot of server requests.
  The TCP port can be ready during the restart while the stream is not yet ready because it is still syncing.

* Introduce a random delay in the strategy to avoid having the same
  reconnection time when the client has multiple producers and consumers.

* Also introduce a random delay in the disconnection handling (connection-closed and metadata-update handlers), for the same reason as above.

---------

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio authored Jan 19, 2024
1 parent 083ff20 commit fa8f66b
Show file tree
Hide file tree
Showing 15 changed files with 122 additions and 62 deletions.
9 changes: 9 additions & 0 deletions RabbitMQ.Stream.Client/ClientExceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ internal static bool IsStreamNotAvailable(Exception exception)
return exception is CreateException { ResponseCode: ResponseCode.StreamNotAvailable };
}

/// <summary>
/// Verifies that the given stream metadata carries a leader broker.
/// </summary>
/// <param name="metaStreamInfo">Stream metadata to inspect.</param>
/// <exception cref="LeaderNotFoundException">
/// Thrown when <c>metaStreamInfo.Leader</c> equals <c>default(Broker)</c>.
/// </exception>
internal static void CheckLeader(StreamInfo metaStreamInfo)
{
    var hasLeader = !metaStreamInfo.Leader.Equals(default(Broker));
    if (hasLeader)
    {
        return;
    }

    var streams = string.Join(" ", metaStreamInfo.Stream);
    throw new LeaderNotFoundException($"No leader found for streams {streams}");
}

public static void MaybeThrowException(ResponseCode responseCode, string message)
{
if (responseCode is ResponseCode.Ok)
Expand Down
11 changes: 6 additions & 5 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,6 @@ RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.get
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.set -> void
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy
RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy.WhenConnected(string itemIdentifier) -> System.Threading.Tasks.ValueTask
RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy.WhenDisconnected(string itemIdentifier) -> System.Threading.Tasks.ValueTask<bool>
RabbitMQ.Stream.Client.Reliable.Consumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.set -> void
Expand All @@ -199,6 +196,9 @@ RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.IsOpen() -> bool
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Send(ulong publishing, RabbitMQ.Stream.Client.Message message) -> System.Threading.Tasks.ValueTask
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig.DeduplicatingProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream, string reference) -> void
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenConnected(string itemIdentifier) -> System.Threading.Tasks.ValueTask
RabbitMQ.Stream.Client.Reliable.IReconnectStrategy.WhenDisconnected(string itemIdentifier) -> System.Threading.Tasks.ValueTask<bool>
RabbitMQ.Stream.Client.Reliable.Producer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
Expand All @@ -208,8 +208,8 @@ RabbitMQ.Stream.Client.Reliable.ProducerFactory._producer -> RabbitMQ.Stream.Cli
RabbitMQ.Stream.Client.Reliable.ReliableBase.CompareStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus toTest) -> bool
RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus status) -> void
RabbitMQ.Stream.Client.Reliable.ReliableBase._status -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ReconnectStrategy.get -> RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ResourceAvailableReconnectStrategy.get -> RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ReconnectStrategy.get -> RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ResourceAvailableReconnectStrategy.get -> RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ResourceAvailableReconnectStrategy.set -> void
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Closed = 3 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
Expand Down Expand Up @@ -264,5 +264,6 @@ static RabbitMQ.Stream.Client.RawProducer.Create(RabbitMQ.Stream.Client.ClientPa
static RabbitMQ.Stream.Client.RawSuperStreamConsumer.Create(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamConsumer
static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, System.Collections.Generic.IDictionary<string, RabbitMQ.Stream.Client.StreamInfo> streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamProducer
static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer>
static RabbitMQ.Stream.Client.Reliable.ReliableBase.RandomWait() -> System.Threading.Tasks.Task
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupLeaderConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
static RabbitMQ.Stream.Client.RoutingHelper<T>.LookupRandomConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IClient>
5 changes: 2 additions & 3 deletions RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ private RawConsumerConfig FromStreamConfig(string stream)
}

consumer?.Dispose();
_streamInfos.Remove(stream);

if (_config.ConnectionClosedHandler != null)
{
Expand All @@ -117,7 +116,6 @@ private RawConsumerConfig FromStreamConfig(string stream)
{
_consumers.TryRemove(update.Stream, out var consumer);
consumer?.Close();
_streamInfos.Remove(update.Stream);
if (_config.MetadataHandler != null)
{
await _config.MetadataHandler(update).ConfigureAwait(false);
Expand Down Expand Up @@ -159,12 +157,13 @@ private async Task StartConsumers()

public async Task ReconnectPartition(StreamInfo streamInfo)
{
ClientExceptions.CheckLeader(streamInfo);
await _semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
_consumers.TryRemove(streamInfo.Stream, out var consumer);
consumer?.Dispose();
_streamInfos.TryAdd(streamInfo.Stream, streamInfo); // add the new stream infos
_streamInfos[streamInfo.Stream] = streamInfo;
await MaybeAddConsumer(streamInfo.Stream).ConfigureAwait(false);
}
finally
Expand Down
1 change: 1 addition & 0 deletions RabbitMQ.Stream.Client/RawSuperStreamProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ private async Task<IProducer> MaybeAddAndGetProducer(string stream)

public async Task ReconnectPartition(StreamInfo streamInfo)
{
ClientExceptions.CheckLeader(streamInfo);
await _semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
Expand Down
1 change: 0 additions & 1 deletion RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using RabbitMQ.Stream.Client.Reconnect;

namespace RabbitMQ.Stream.Client.Reliable;

Expand Down
2 changes: 2 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ private async Task<IConsumer> SuperConsumer(bool boot)
OffsetSpec = offsetSpecs,
ConnectionClosedHandler = async (closeReason, partitionStream) =>
{
await RandomWait().ConfigureAwait(false);
if (closeReason == ConnectionClosedReason.Normal)
{
BaseLogger.LogInformation("{Identity} is closed normally", ToString());
Expand All @@ -134,6 +135,7 @@ await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r)
},
MetadataHandler = async update =>
{
await RandomWait().ConfigureAwait(false);
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r)
.ConfigureAwait(false);
Expand Down
24 changes: 6 additions & 18 deletions RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace RabbitMQ.Stream.Client.Reconnect;
namespace RabbitMQ.Stream.Client.Reliable;

/// <summary>
/// IReconnectStrategy is the interface to reconnect the TCP client
Expand Down Expand Up @@ -48,36 +48,24 @@ public BackOffReconnectStrategy(ILogger logger = null)
// else the backoff will be too long
private void MaybeResetTentatives()
{
if (Tentatives > 5)
if (Tentatives > 4)
{
Tentatives = 1;
}
}

public async ValueTask<bool> WhenDisconnected(string connectionIdentifier)
{

Tentatives <<= 1;
var next = Random.Shared.Next(Tentatives * 1000, Tentatives * 3000);
_logger.LogInformation(
"{ConnectionIdentifier} disconnected, check if reconnection needed in {ReconnectionDelayMs} ms",
connectionIdentifier,
Tentatives * 100
next
);
await Task.Delay(TimeSpan.FromMilliseconds(Tentatives * 100)).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromMilliseconds(next)).ConfigureAwait(false);
MaybeResetTentatives();
return true;
// this will be in another commit
// Tentatives <<= 1;
// var next = Random.Shared.Next(Tentatives * 1000, Tentatives * 2000);
// _logger.LogInformation(
// "{ConnectionIdentifier} disconnected, check if reconnection needed in {ReconnectionDelayMs} ms",
// connectionIdentifier,
// next
// );
//
// await Task.Delay(TimeSpan.FromMilliseconds(next)).ConfigureAwait(false);
// MaybeResetTentatives();
// return true;
}

public ValueTask WhenConnected(string connectionIdentifier)
Expand Down Expand Up @@ -112,7 +100,7 @@ public async ValueTask<bool> WhenDisconnected(string resourceIdentifier)
{
Tentatives <<= 1;
_logger.LogInformation(
"{ConnectionIdentifier} resource not available, retry in {ReconnectionDelayMs} seconds",
"{ConnectionIdentifier} resource not available, retry in {ReconnectionDelayS} seconds",
resourceIdentifier,
Tentatives
);
Expand Down
1 change: 0 additions & 1 deletion RabbitMQ.Stream.Client/Reliable/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using RabbitMQ.Stream.Client.Reconnect;

namespace RabbitMQ.Stream.Client.Reliable;

Expand Down
4 changes: 4 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ private async Task<IProducer> SuperStreamProducer(bool boot)
Filter = _producerConfig.Filter,
ConnectionClosedHandler = async (closeReason, partitionStream) =>
{
await RandomWait().ConfigureAwait(false);
if (closeReason == ConnectionClosedReason.Normal)
{
BaseLogger.LogDebug("{Identity} is closed normally", ToString());
Expand All @@ -59,6 +60,7 @@ await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r)
},
MetadataHandler = async update =>
{
await RandomWait().ConfigureAwait(false);
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r)
.ConfigureAwait(false);
Expand Down Expand Up @@ -99,10 +101,12 @@ private async Task<IProducer> StandardProducer()
Filter = _producerConfig.Filter,
MetadataHandler = async _ =>
{
await RandomWait().ConfigureAwait(false);
await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream).ConfigureAwait(false);
},
ConnectionClosedHandler = async (closeReason) =>
{
await RandomWait().ConfigureAwait(false);
if (closeReason == ConnectionClosedReason.Normal)
{
BaseLogger.LogDebug("{Identity} is closed normally", ToString());
Expand Down
28 changes: 22 additions & 6 deletions RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using RabbitMQ.Stream.Client.Reconnect;

namespace RabbitMQ.Stream.Client.Reliable;

Expand Down Expand Up @@ -50,6 +49,11 @@ public abstract class ReliableBase
private readonly object _lock = new();
protected ReliableEntityStatus _status = ReliableEntityStatus.Initialization;

/// <summary>
/// Asynchronously waits for the delay provided by <c>Consts.RandomMid()</c>.
/// </summary>
/// <returns>A task that completes once the delay has elapsed.</returns>
protected static async Task RandomWait()
{
    var delay = Consts.RandomMid();
    var pause = Task.Delay(delay);
    await pause.ConfigureAwait(false);
}

protected void UpdateStatus(ReliableEntityStatus status)
{
lock (_lock)
Expand Down Expand Up @@ -112,7 +116,6 @@ private async Task MaybeInit(bool boot)

reconnect = true;
LogException(e);

}

if (reconnect)
Expand All @@ -130,6 +133,7 @@ private async Task Init(bool boot)
BaseLogger.LogDebug("{Identity} is already closed. The init will be skipped", ToString());
return;
}

// each time that the client is initialized, we need to reset the status
// if we are here it means that the entity is not open for some reason like:
// first time initialization or reconnect due to an IsAKnownException
Expand All @@ -156,7 +160,8 @@ private async Task Init(bool boot)
/// <summary>
/// When the clients receives a meta data update, it doesn't know
/// If the stream exists or not. It just knows that the stream topology has changed.
/// the method CheckIfStreamIsAvailable checks if the stream exists.
/// the method CheckIfStreamIsAvailable checks if the stream exists
/// and if the leader is available.
/// </summary>
/// <param name="stream">stream name</param>
/// <param name="system">stream system</param>
Expand All @@ -172,7 +177,16 @@ private async Task<bool> CheckIfStreamIsAvailable(string stream, StreamSystem sy
{
exists = await system.StreamExists(stream).ConfigureAwait(false);
var available = exists ? "available" : "not available";
await _resourceAvailableReconnectStrategy.WhenConnected($"{stream} is {available}")
if (exists)
{
// It is not enough to check if the stream exists
// we need to check if the stream has the leader
var streamInfo = await system.StreamInfo(stream).ConfigureAwait(false);
ClientExceptions.CheckLeader(streamInfo);
available += " and has a valid leader";
}

await _resourceAvailableReconnectStrategy.WhenConnected($"{stream} for {ToString()} is {available}")
.ConfigureAwait(false);
break;
}
Expand Down Expand Up @@ -229,7 +243,8 @@ private async Task MaybeReconnect()
}
}

private async Task MaybeReconnectPartition(StreamInfo streamInfo, string info, Func<StreamInfo, Task> reconnectPartitionFunc)
private async Task MaybeReconnectPartition(StreamInfo streamInfo, string info,
Func<StreamInfo, Task> reconnectPartitionFunc)
{
var reconnect = await _reconnectStrategy
.WhenDisconnected($"Super Stream partition: {streamInfo.Stream} for {info}").ConfigureAwait(false);
Expand Down Expand Up @@ -285,7 +300,8 @@ private void LogException(Exception exception)
/// <param name="system">Stream System</param>
/// <param name="stream">Partition Stream</param>
/// <param name="reconnectPartitionFunc">Function to reconnect the partition</param>
internal async Task OnEntityClosed(StreamSystem system, string stream, Func<StreamInfo, Task> reconnectPartitionFunc)
internal async Task OnEntityClosed(StreamSystem system, string stream,
Func<StreamInfo, Task> reconnectPartitionFunc)
{
var streamExists = false;
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
Expand Down
18 changes: 7 additions & 11 deletions RabbitMQ.Stream.Client/StreamSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,6 @@ private async Task MayBeReconnectLocator()
}
}

/// <summary>
/// Ensures the stream metadata has a leader; throws otherwise.
/// </summary>
/// <param name="metaStreamInfo">Stream metadata whose leader is checked.</param>
/// <exception cref="LeaderNotFoundException">
/// Thrown when the leader equals <c>default(Broker)</c>.
/// </exception>
private static void CheckLeader(StreamInfo metaStreamInfo)
{
    if (!metaStreamInfo.Leader.Equals(default(Broker)))
    {
        return;
    }

    var message = $"No leader found for streams {string.Join(" ", metaStreamInfo.Stream)}";
    throw new LeaderNotFoundException(message);
}

public async Task<ISuperStreamProducer> CreateRawSuperStreamProducer(
RawSuperStreamProducerConfig rawSuperStreamProducerConfig, ILogger logger = null)
{
Expand Down Expand Up @@ -197,6 +188,11 @@ public async Task<ISuperStreamProducer> CreateRawSuperStreamProducer(
streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false);
}

foreach (var (_, value) in streamInfos)
{
ClientExceptions.CheckLeader(value);
}

var r = RawSuperStreamProducer.Create(rawSuperStreamProducerConfig,
streamInfos,
_clientParameters with { ClientProvidedName = rawSuperStreamProducerConfig.ClientProvidedName },
Expand Down Expand Up @@ -273,7 +269,7 @@ public async Task<IProducer> CreateRawProducer(RawProducerConfig rawProducerConf
throw new CreateProducerException($"producer could not be created code: {metaStreamInfo.ResponseCode}");
}

CheckLeader(metaStreamInfo);
ClientExceptions.CheckLeader(metaStreamInfo);

try
{
Expand Down Expand Up @@ -432,7 +428,7 @@ public async Task<IConsumer> CreateRawConsumer(RawConsumerConfig rawConsumerConf
metaStreamInfo.ResponseCode);
}

CheckLeader(metaStreamInfo);
ClientExceptions.CheckLeader(metaStreamInfo);

try
{
Expand Down
Loading

0 comments on commit fa8f66b

Please sign in to comment.