Skip to content

Commit

Permalink
Fix incorect connection metrics tracking on instances pending shutdown
Browse files Browse the repository at this point in the history
While a spectator server instance was pending shutdown, it could start
reporting endlessly rising user counts until its actual shutdown. The
reason for this was questionable logic in `ConcurrentConnectionLimiter`.

When I wrote that thing I used `try...finally` in `OnConnectedAsync()`
and `OnDisconnectedAsync()` because I wanted to have multiple returns in
the body of the method and sitll call `await next()` at the end. However
I appear to have also forgotten that it means that `await next()` would
be called _even if the preceding code threw an exception_, and then _the
exception would be thrown anyway_.

This in the case of `StatefulUserHub`s led to the following sequence of
events:

1. `ConcurrentConnectionLimiter.OnConnectedAsync()` fires
2. It dies on its innards, but because of the `finally`, calls `await
   next()` anyway
3. Inside `await next()` is the hub's connection method, in particular
   `LoggingHub.OnConnectedAsync()`, which increments the datadog
   connected user gauge
4. Then after the finally, because any exception that was thrown was
   not caught, it will get _rethrown_, interrupting the connection flow
5. After this, `OnDisconnectedAsync()` will not get called for this
   connection, leading the counter to not decrement back down from the
   increment in point (3)

leading to the runaway increase in reported user counts.

To fix, stop using `try...finally` and split the blocks of code that
want early-returns to separate methods instead.
  • Loading branch information
bdach committed Jan 31, 2024
1 parent 0806da6 commit fc2a91a
Showing 1 changed file with 61 additions and 64 deletions.
125 changes: 61 additions & 64 deletions osu.Server.Spectator/ConcurrentConnectionLimiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,52 +36,51 @@ public ConcurrentConnectionLimiter(

public async Task OnConnectedAsync(HubLifetimeContext context, Func<HubLifetimeContext, Task> next)
{
try
await registerConnection(context);
await next(context);
}

private async Task registerConnection(HubLifetimeContext context)
{
int userId = context.Context.GetUserId();

using (var userState = await connectionStates.GetForUse(userId, true))
{
int userId = context.Context.GetUserId();
if (userState.Item == null)
{
log(context, "connection from first client instance");
userState.Item = new ConnectionState(context);
return;
}

using (var userState = await connectionStates.GetForUse(userId, true))
if (context.Context.GetTokenId() == userState.Item.TokenId)
{
if (userState.Item == null)
{
log(context, "connection from first client instance");
userState.Item = new ConnectionState(context);
return;
}
// The assumption is that the client has already dropped the old connection,
// so we don't bother to ask for a disconnection.

if (context.Context.GetTokenId() == userState.Item.TokenId)
{
// The assumption is that the client has already dropped the old connection,
// so we don't bother to ask for a disconnection.

log(context, "subsequent connection from same client instance, registering");
// Importantly, this will replace the old connection, ensuring it cannot be
// used to communicate on anymore.
userState.Item.RegisterConnectionId(context);
return;
}
log(context, "subsequent connection from same client instance, registering");
// Importantly, this will replace the old connection, ensuring it cannot be
// used to communicate on anymore.
userState.Item.RegisterConnectionId(context);
return;
}

log(context, "connection from new client instance, dropping existing state");
log(context, "connection from new client instance, dropping existing state");

foreach (var hubType in stateful_user_hubs)
foreach (var hubType in stateful_user_hubs)
{
var hubContextType = typeof(IHubContext<>).MakeGenericType(hubType);
var hubContext = serviceProvider.GetRequiredService(hubContextType) as IHubContext;

if (userState.Item.ConnectionIds.TryGetValue(hubType, out string? connectionId))
{
var hubContextType = typeof(IHubContext<>).MakeGenericType(hubType);
var hubContext = serviceProvider.GetRequiredService(hubContextType) as IHubContext;

if (userState.Item.ConnectionIds.TryGetValue(hubType, out string? connectionId))
{
hubContext?.Clients.Client(connectionId)
.SendCoreAsync(nameof(IStatefulUserHubClient.DisconnectRequested), Array.Empty<object>());
}
hubContext?.Clients.Client(connectionId)
.SendCoreAsync(nameof(IStatefulUserHubClient.DisconnectRequested), Array.Empty<object>());
}

log(context, "existing state dropped");
userState.Item = new ConnectionState(context);
}
}
finally
{
await next(context);

log(context, "existing state dropped");
userState.Item = new ConnectionState(context);
}
}

Expand Down Expand Up @@ -111,41 +110,39 @@ private static void log(HubLifetimeContext context, string message)

public async Task OnDisconnectedAsync(HubLifetimeContext context, Exception? exception, Func<HubLifetimeContext, Exception?, Task> next)
{
try
{
if (exception != null)
// network disconnection. wait for user to return.
return;
// if `exception` isn't null then the disconnection is not clean,
// so don't unregister yet in hopes that the user will return after a transient network failure or similar.
if (exception == null)
await unregisterConnection(context, exception);
await next(context, exception);
}

int userId = context.Context.GetUserId();
private async Task unregisterConnection(HubLifetimeContext context, Exception? exception)
{
int userId = context.Context.GetUserId();

using (var userState = await connectionStates.GetForUse(userId, true))
{
string? registeredConnectionId = null;
using (var userState = await connectionStates.GetForUse(userId, true))
{
string? registeredConnectionId = null;

bool tokenIdMatches = context.Context.GetTokenId() == userState.Item?.TokenId;
bool hubRegistered = userState.Item?.ConnectionIds.TryGetValue(context.Hub.GetType(), out registeredConnectionId) == true;
bool connectionIdMatches = registeredConnectionId == context.Context.ConnectionId;
bool tokenIdMatches = context.Context.GetTokenId() == userState.Item?.TokenId;
bool hubRegistered = userState.Item?.ConnectionIds.TryGetValue(context.Hub.GetType(), out registeredConnectionId) == true;
bool connectionIdMatches = registeredConnectionId == context.Context.ConnectionId;

bool connectionCanBeCleanedUp = tokenIdMatches && hubRegistered && connectionIdMatches;
bool connectionCanBeCleanedUp = tokenIdMatches && hubRegistered && connectionIdMatches;

if (connectionCanBeCleanedUp)
{
log(context, "disconnected from hub");
userState.Item!.ConnectionIds.Remove(context.Hub.GetType());
}
if (connectionCanBeCleanedUp)
{
log(context, "disconnected from hub");
userState.Item!.ConnectionIds.Remove(context.Hub.GetType());
}

if (userState.Item?.ConnectionIds.Count == 0)
{
log(context, "all connections closed, destroying state");
userState.Destroy();
}
if (userState.Item?.ConnectionIds.Count == 0)
{
log(context, "all connections closed, destroying state");
userState.Destroy();
}
}
finally
{
await next(context, exception);
}
}
}
}

0 comments on commit fc2a91a

Please sign in to comment.