Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up SafeTimer usage, replace with PeriodicTimer where possible #8953

Merged
merged 1 commit into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/Orleans.Core/Core/ClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ static void ValidateSystemConfiguration(IServiceProvider serviceProvider)
/// <inheritdoc />
public async Task StartAsync(CancellationToken cancellationToken)
{
await _runtimeClient.Start(cancellationToken).ConfigureAwait(false);
await _runtimeClient.StartAsync(cancellationToken).ConfigureAwait(false);
await _clusterClientLifecycle.OnStart(cancellationToken).ConfigureAwait(false);
}

Expand All @@ -71,8 +71,7 @@ public async Task StopAsync(CancellationToken cancellationToken)
_logger.LogInformation("Client shutting down");

await _clusterClientLifecycle.OnStop(cancellationToken).ConfigureAwait(false);

_runtimeClient?.Reset();
await _runtimeClient.StopAsync(cancellationToken).WaitAsync(cancellationToken);
}
finally
{
Expand Down
6 changes: 5 additions & 1 deletion src/Orleans.Core/Core/DefaultClientServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public static void AddDefaultServices(IClientBuilder builder)

services.Add(ServiceDescriptor);

// Common services
services.AddLogging();
services.AddOptions();
services.TryAddSingleton<TimeProvider>(TimeProvider.System);

// Options logging
services.TryAddSingleton(typeof(IOptionFormatter<>), typeof(DefaultOptionsFormatter<>));
services.TryAddSingleton(typeof(IOptionFormatterResolver<>), typeof(DefaultOptionsFormatterResolver<>));
Expand All @@ -64,7 +69,6 @@ public static void AddDefaultServices(IClientBuilder builder)
services.AddFromExisting<IHostEnvironmentStatistics, OldEnvironmentStatistics>();
#pragma warning restore 618

services.AddLogging();
services.TryAddSingleton<GrainBindingsResolver>();
services.TryAddSingleton<LocalClientDetails>();
services.TryAddSingleton<OutsideRuntimeClient>();
Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Core/Messaging/ClientMessageCenter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,10 @@ private async Task EstablishInitialConnection(CancellationToken cancellationToke
}
}

public void Stop()
public async Task StopAsync(CancellationToken cancellationToken)
{
Running = false;
gatewayManager.Stop();
await gatewayManager.StopAsync(cancellationToken);
}

public void DispatchLocalMessage(Message message)
Expand Down
79 changes: 50 additions & 29 deletions src/Orleans.Core/Messaging/GatewayManager.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
Expand Down Expand Up @@ -27,28 +28,29 @@ internal class GatewayManager : IDisposable
private readonly ILogger logger;
private readonly ConnectionManager connectionManager;
private readonly GatewayOptions gatewayOptions;
private AsyncTaskSafeTimer gatewayRefreshTimer;
private List<SiloAddress> cachedLiveGateways;
private HashSet<SiloAddress> cachedLiveGatewaysSet;
private List<SiloAddress> knownGateways;
private readonly PeriodicTimer gatewayRefreshTimer;
private List<SiloAddress> cachedLiveGateways = [];
private HashSet<SiloAddress> cachedLiveGatewaysSet = [];
private List<SiloAddress> knownGateways = [];
private DateTime lastRefreshTime;
private int roundRobinCounter;
private bool gatewayRefreshCallInitiated;
private bool gatewayListProviderInitialized;

private readonly ILogger<SafeTimer> timerLogger;
private Task? gatewayRefreshTimerTask;

public GatewayManager(
IOptions<GatewayOptions> gatewayOptions,
IGatewayListProvider gatewayListProvider,
ILoggerFactory loggerFactory,
ConnectionManager connectionManager)
ConnectionManager connectionManager,
TimeProvider timeProvider)
{
this.gatewayOptions = gatewayOptions.Value;
this.logger = loggerFactory.CreateLogger<GatewayManager>();
this.connectionManager = connectionManager;
this.gatewayListProvider = gatewayListProvider;
this.timerLogger = loggerFactory.CreateLogger<SafeTimer>();

this.gatewayRefreshTimer = new PeriodicTimer(this.gatewayOptions.GatewayListRefreshPeriod, timeProvider);
}

public async Task StartAsync(CancellationToken cancellationToken)
Expand All @@ -59,13 +61,6 @@ public async Task StartAsync(CancellationToken cancellationToken)
gatewayListProviderInitialized = true;
}

this.gatewayRefreshTimer = new AsyncTaskSafeTimer(
this.timerLogger,
RefreshSnapshotLiveGateways_TimerCallback,
null,
this.gatewayOptions.GatewayListRefreshPeriod,
this.gatewayOptions.GatewayListRefreshPeriod);

var knownGateways = await this.gatewayListProvider.GetGateways();
if (knownGateways.Count == 0)
{
Expand All @@ -81,19 +76,28 @@ public async Task StartAsync(CancellationToken cancellationToken)
Utils.EnumerableToString(knownGateways));

this.roundRobinCounter = this.gatewayOptions.PreferredGatewayIndex >= 0 ? this.gatewayOptions.PreferredGatewayIndex : Random.Shared.Next(knownGateways.Count);
this.knownGateways = this.cachedLiveGateways = knownGateways.Select(gw => gw.ToGatewayAddress()).ToList();
var newGateways = new List<SiloAddress>();
foreach (var gatewayUri in knownGateways)
{
if (gatewayUri?.ToGatewayAddress() is { } gatewayAddress)
{
newGateways.Add(gatewayAddress);
}
}

this.knownGateways = this.cachedLiveGateways = newGateways;
this.cachedLiveGatewaysSet = new HashSet<SiloAddress>(cachedLiveGateways);
this.lastRefreshTime = DateTime.UtcNow;
this.gatewayRefreshTimerTask ??= PeriodicallyRefreshGatewaySnapshot();
}

public void Stop()
public async Task StopAsync(CancellationToken cancellationToken)
{
if (gatewayRefreshTimer != null)
gatewayRefreshTimer.Dispose();
if (gatewayRefreshTimerTask is { } task)
{
Utils.SafeExecute(gatewayRefreshTimer.Dispose, logger);
await task.WaitAsync(cancellationToken);
}

gatewayRefreshTimer = null;
}

public void MarkAsDead(SiloAddress gateway)
Expand Down Expand Up @@ -217,22 +221,39 @@ internal void ExpediteUpdateLiveGatewaysSnapshot()
{
try
{
await RefreshSnapshotLiveGateways_TimerCallback(null);
gatewayRefreshCallInitiated = false;
await RefreshGatewaySnapshot();
}
catch
finally
{
// Intentionally ignore any exceptions here.
gatewayRefreshCallInitiated = false;
}
});
}

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")]
internal async Task RefreshSnapshotLiveGateways_TimerCallback(object context)
internal async Task PeriodicallyRefreshGatewaySnapshot()
{
await Task.Yield();

if (gatewayListProvider is null)
{
return;
}

while (await gatewayRefreshTimer.WaitForNextTickAsync())
{
await RefreshGatewaySnapshot();
}
}

private async Task RefreshGatewaySnapshot()
{
try
{
if (gatewayListProvider is null) return;
if (gatewayListProvider is null)
{
return;
}

// the listProvider.GetGateways() is not under lock.
var allGateways = await gatewayListProvider.GetGateways();
Expand All @@ -242,7 +263,7 @@ internal async Task RefreshSnapshotLiveGateways_TimerCallback(object context)
}
catch (Exception exc)
{
logger.LogError((int)ErrorCode.ProxyClient_GetGateways, exc, "Exception occurred during RefreshSnapshotLiveGateways_TimerCallback -> listProvider.GetGateways()");
logger.LogError((int)ErrorCode.ProxyClient_GetGateways, exc, "Error refreshing gateways.");
}
}

Expand Down Expand Up @@ -367,7 +388,7 @@ private async Task CloseEvictedGatewayConnections(List<SiloAddress> liveGateways

public void Dispose()
{
this.gatewayRefreshTimer?.Dispose();
this.gatewayRefreshTimer.Dispose();
}
}
}
77 changes: 50 additions & 27 deletions src/Orleans.Core/Runtime/OutsideRuntimeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ internal class OutsideRuntimeClient : IRuntimeClient, IDisposable, IClusterConne
private readonly ILoggerFactory loggerFactory;

private readonly SharedCallbackData sharedCallbackData;
private SafeTimer callbackTimer;
private readonly PeriodicTimer callbackTimer;
private Task callbackTimerTask;

public GrainAddress CurrentActivationAddress
{
get;
Expand All @@ -60,16 +62,18 @@ public OutsideRuntimeClient(
ILoggerFactory loggerFactory,
IOptions<ClientMessagingOptions> clientMessagingOptions,
MessagingTrace messagingTrace,
IServiceProvider serviceProvider)
IServiceProvider serviceProvider,
TimeProvider timeProvider)
{
TimeProvider = timeProvider;
this.ServiceProvider = serviceProvider;
_localClientDetails = localClientDetails;
this.loggerFactory = loggerFactory;
this.messagingTrace = messagingTrace;
this.logger = loggerFactory.CreateLogger<OutsideRuntimeClient>();
callbacks = new ConcurrentDictionary<CorrelationId, CallbackData>();
this.clientMessagingOptions = clientMessagingOptions.Value;

this.callbackTimer = new PeriodicTimer(TimeSpan.FromTicks(Math.Min(this.clientMessagingOptions.ResponseTimeout.Ticks, TimeSpan.FromSeconds(1).Ticks)), timeProvider);
this.sharedCallbackData = new SharedCallbackData(
msg => this.UnregisterCallback(msg.Id),
this.loggerFactory.CreateLogger<CallbackData>(),
Expand Down Expand Up @@ -104,10 +108,7 @@ internal void ConsumeServices()
this.messagingTrace,
this.loggerFactory.CreateLogger<ClientGrainContext>());

var timerLogger = this.loggerFactory.CreateLogger<SafeTimer>();
var minTicks = Math.Min(this.clientMessagingOptions.ResponseTimeout.Ticks, TimeSpan.FromSeconds(1).Ticks);
var period = TimeSpan.FromTicks(minTicks);
this.callbackTimer = new SafeTimer(timerLogger, this.OnCallbackExpiryTick, null, period, period);
this.callbackTimerTask = Task.Run(MonitorCallbackExpiry);

this.GrainReferenceRuntime = this.ServiceProvider.GetRequiredService<IGrainReferenceRuntime>();

Expand All @@ -129,7 +130,9 @@ internal void ConsumeServices()

public IServiceProvider ServiceProvider { get; private set; }

public async Task Start(CancellationToken cancellationToken)
public TimeProvider TimeProvider { get; }

public async Task StartAsync(CancellationToken cancellationToken)
{
ConsumeServices();

Expand All @@ -140,6 +143,22 @@ public async Task Start(CancellationToken cancellationToken)
logger.LogInformation((int)ErrorCode.ProxyClient_StartDone, "Started client with address {ActivationAddress} and id {ClientId}", CurrentActivationAddress.ToString(), _localClientDetails.ClientId);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
this.callbackTimer.Dispose();
if (this.callbackTimerTask is { } task)
{
await task.WaitAsync(cancellationToken);
}

if (MessageCenter is { } messageCenter)
{
await messageCenter.StopAsync(cancellationToken);
}

ConstructorReset();
}

// used for testing to (carefully!) allow two clients in the same process
private async Task StartInternal(CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -313,18 +332,6 @@ private void UnregisterCallback(CorrelationId id)
callbacks.TryRemove(id, out _);
}

public void Reset()
{
Utils.SafeExecute(() =>
{
if (MessageCenter != null)
{
MessageCenter.Stop();
}
}, logger, "Client.Stop-Transport");
ConstructorReset();
}

private void ConstructorReset()
{
Utils.SafeExecute(() => this.Dispose());
Expand Down Expand Up @@ -380,7 +387,7 @@ public void Dispose()
if (this.disposing) return;
this.disposing = true;

Utils.SafeExecute(() => this.callbackTimer?.Dispose());
Utils.SafeExecute(() => this.callbackTimer.Dispose());

Utils.SafeExecute(() => MessageCenter?.Dispose());

Expand Down Expand Up @@ -434,14 +441,30 @@ public void NotifyGatewayCountChanged(int currentNumberOfGateways, int previousN
}
}

private void OnCallbackExpiryTick(object state)
private async Task MonitorCallbackExpiry()
{
var currentStopwatchTicks = ValueStopwatch.GetTimestamp();
foreach (var pair in callbacks)
while (await callbackTimer.WaitForNextTickAsync())
{
var callback = pair.Value;
if (callback.IsCompleted) continue;
if (callback.IsExpired(currentStopwatchTicks)) callback.OnTimeout();
try
{
var currentStopwatchTicks = ValueStopwatch.GetTimestamp();
foreach (var (_, callback) in callbacks)
{
if (callback.IsCompleted)
{
continue;
}

if (callback.IsExpired(currentStopwatchTicks))
{
callback.OnTimeout();
}
}
}
catch (Exception ex)
{
logger.LogWarning(ex, "Error while processing callback expiry.");
}
}
}

Expand Down
Loading
Loading