Skip to content

Commit

Permalink
Merge pull request ppy#214 from bdach/fix-broken-tracking
Browse files Browse the repository at this point in the history
Fix incorect connection metrics tracking on instances pending shutdown
  • Loading branch information
peppy authored Feb 1, 2024
2 parents 0806da6 + fc2a91a commit 1e6947b
Showing 1 changed file with 61 additions and 64 deletions.
125 changes: 61 additions & 64 deletions osu.Server.Spectator/ConcurrentConnectionLimiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,52 +36,51 @@ public ConcurrentConnectionLimiter(

public async Task OnConnectedAsync(HubLifetimeContext context, Func<HubLifetimeContext, Task> next)
{
try
await registerConnection(context);
await next(context);
}

private async Task registerConnection(HubLifetimeContext context)
{
int userId = context.Context.GetUserId();

using (var userState = await connectionStates.GetForUse(userId, true))
{
int userId = context.Context.GetUserId();
if (userState.Item == null)
{
log(context, "connection from first client instance");
userState.Item = new ConnectionState(context);
return;
}

using (var userState = await connectionStates.GetForUse(userId, true))
if (context.Context.GetTokenId() == userState.Item.TokenId)
{
if (userState.Item == null)
{
log(context, "connection from first client instance");
userState.Item = new ConnectionState(context);
return;
}
// The assumption is that the client has already dropped the old connection,
// so we don't bother to ask for a disconnection.

if (context.Context.GetTokenId() == userState.Item.TokenId)
{
// The assumption is that the client has already dropped the old connection,
// so we don't bother to ask for a disconnection.

log(context, "subsequent connection from same client instance, registering");
// Importantly, this will replace the old connection, ensuring it cannot be
// used to communicate on anymore.
userState.Item.RegisterConnectionId(context);
return;
}
log(context, "subsequent connection from same client instance, registering");
// Importantly, this will replace the old connection, ensuring it cannot be
// used to communicate on anymore.
userState.Item.RegisterConnectionId(context);
return;
}

log(context, "connection from new client instance, dropping existing state");
log(context, "connection from new client instance, dropping existing state");

foreach (var hubType in stateful_user_hubs)
foreach (var hubType in stateful_user_hubs)
{
var hubContextType = typeof(IHubContext<>).MakeGenericType(hubType);
var hubContext = serviceProvider.GetRequiredService(hubContextType) as IHubContext;

if (userState.Item.ConnectionIds.TryGetValue(hubType, out string? connectionId))
{
var hubContextType = typeof(IHubContext<>).MakeGenericType(hubType);
var hubContext = serviceProvider.GetRequiredService(hubContextType) as IHubContext;

if (userState.Item.ConnectionIds.TryGetValue(hubType, out string? connectionId))
{
hubContext?.Clients.Client(connectionId)
.SendCoreAsync(nameof(IStatefulUserHubClient.DisconnectRequested), Array.Empty<object>());
}
hubContext?.Clients.Client(connectionId)
.SendCoreAsync(nameof(IStatefulUserHubClient.DisconnectRequested), Array.Empty<object>());
}

log(context, "existing state dropped");
userState.Item = new ConnectionState(context);
}
}
finally
{
await next(context);

log(context, "existing state dropped");
userState.Item = new ConnectionState(context);
}
}

Expand Down Expand Up @@ -111,41 +110,39 @@ private static void log(HubLifetimeContext context, string message)

public async Task OnDisconnectedAsync(HubLifetimeContext context, Exception? exception, Func<HubLifetimeContext, Exception?, Task> next)
{
try
{
if (exception != null)
// network disconnection. wait for user to return.
return;
// if `exception` isn't null then the disconnection is not clean,
// so don't unregister yet in hopes that the user will return after a transient network failure or similar.
if (exception == null)
await unregisterConnection(context, exception);
await next(context, exception);
}

int userId = context.Context.GetUserId();
private async Task unregisterConnection(HubLifetimeContext context, Exception? exception)
{
int userId = context.Context.GetUserId();

using (var userState = await connectionStates.GetForUse(userId, true))
{
string? registeredConnectionId = null;
using (var userState = await connectionStates.GetForUse(userId, true))
{
string? registeredConnectionId = null;

bool tokenIdMatches = context.Context.GetTokenId() == userState.Item?.TokenId;
bool hubRegistered = userState.Item?.ConnectionIds.TryGetValue(context.Hub.GetType(), out registeredConnectionId) == true;
bool connectionIdMatches = registeredConnectionId == context.Context.ConnectionId;
bool tokenIdMatches = context.Context.GetTokenId() == userState.Item?.TokenId;
bool hubRegistered = userState.Item?.ConnectionIds.TryGetValue(context.Hub.GetType(), out registeredConnectionId) == true;
bool connectionIdMatches = registeredConnectionId == context.Context.ConnectionId;

bool connectionCanBeCleanedUp = tokenIdMatches && hubRegistered && connectionIdMatches;
bool connectionCanBeCleanedUp = tokenIdMatches && hubRegistered && connectionIdMatches;

if (connectionCanBeCleanedUp)
{
log(context, "disconnected from hub");
userState.Item!.ConnectionIds.Remove(context.Hub.GetType());
}
if (connectionCanBeCleanedUp)
{
log(context, "disconnected from hub");
userState.Item!.ConnectionIds.Remove(context.Hub.GetType());
}

if (userState.Item?.ConnectionIds.Count == 0)
{
log(context, "all connections closed, destroying state");
userState.Destroy();
}
if (userState.Item?.ConnectionIds.Count == 0)
{
log(context, "all connections closed, destroying state");
userState.Destroy();
}
}
finally
{
await next(context, exception);
}
}
}
}

0 comments on commit 1e6947b

Please sign in to comment.