Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
Y-Sindo committed Oct 26, 2023
2 parents f5cf33f + 5c42ded commit 7418236
Show file tree
Hide file tree
Showing 48 changed files with 1,695 additions and 517 deletions.
2 changes: 1 addition & 1 deletion docs/management-sdk-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ This SDK can communicates to Azure SignalR Service with two transport types:
| | Transient | Persistent |
| ------------------------------ | ------------------ | --------------------------------- |
| Default JSON library | `Newtonsoft.Json` | The same as Asp.Net Core SignalR: <br>`Newtonsoft.Json` for .NET Standard 2.0; <br>`System.Text.Json` for .NET Core App 3.1 and above |
| MessaegPack clients support | since v1.21.0 | since v1.20.0 |
| MessagePack clients support | since v1.21.0 | since v1.20.0 |

#### Json serialization
See [Customizing Json Serialization in Management SDK](./advanced-topics/json-object-serializer.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,22 @@ namespace Microsoft.Azure.SignalR.AspNet
{
internal partial class ServiceConnection : ServiceConnectionBase
{
private const string ReconnectMessage = "asrs:reconnect";

private static readonly Dictionary<string, string> CustomHeader = new Dictionary<string, string>
{{Constants.AsrsUserAgent, ProductInfo.GetProductInfo()}};

private const string ReconnectMessage = "asrs:reconnect";

private static readonly TimeSpan CloseApplicationTimeout = TimeSpan.FromSeconds(5);

private readonly ConcurrentDictionary<string, ClientConnectionContext> _clientConnections =
new ConcurrentDictionary<string, ClientConnectionContext>(StringComparer.Ordinal);

private readonly IConnectionFactory _connectionFactory;

private readonly IClientConnectionManager _clientConnectionManager;

private readonly AckHandler _ackHandler;

public ServiceConnection(
string serverId,
string connectionId,
Expand All @@ -42,6 +45,7 @@ public ServiceConnection(
ILoggerFactory loggerFactory,
IServiceMessageHandler serviceMessageHandler,
IServiceEventHandler serviceEventHandler,
AckHandler ackHandler,
ServiceConnectionType connectionType = ServiceConnectionType.Default)
: base(
serviceProtocol,
Expand All @@ -55,6 +59,7 @@ public ServiceConnection(
{
_connectionFactory = connectionFactory;
_clientConnectionManager = clientConnectionManager;
_ackHandler = ackHandler;
}

protected override Task<ConnectionContext> CreateConnection(string target = null)
Expand Down Expand Up @@ -147,6 +152,17 @@ protected virtual async Task CleanupConnectionsAsyncCore(string instanceId = nul
}
}

private static string GetString(ReadOnlySequence<byte> buffer)
{
if (buffer.IsSingleSegment)
{
MemoryMarshal.TryGetArray(buffer.First, out var segment);
return Encoding.UTF8.GetString(segment.Array, segment.Offset, segment.Count);
}

return Encoding.UTF8.GetString(buffer.ToArray());
}

private async Task ForwardMessageToApplication(string connectionId, ServiceMessage message)
{
if (_clientConnections.TryGetValue(connectionId, out var clientContext))
Expand Down Expand Up @@ -230,6 +246,7 @@ private async Task OnConnectedAsyncCore(ClientConnectionContext clientContext, O
catch (Exception e)
{
Log.ConnectedStartingFailed(Logger, connectionId, e);

// Should not wait for application task inside the application task
_ = PerformDisconnectCore(connectionId, false);
_ = SafeWriteAsync(new CloseConnectionMessage(connectionId, e.Message));
Expand Down Expand Up @@ -276,14 +293,18 @@ private async Task ProcessMessageAsync(ClientConnectionContext clientContext, Ca
case OpenConnectionMessage openConnectionMessage:
await OnConnectedAsyncCore(clientContext, openConnectionMessage);
break;

case CloseConnectionMessage closeConnectionMessage:

// should not wait for application task when inside the application task
// As the messages are in a queue, close message should be after all the other messages
await PerformDisconnectCore(closeConnectionMessage.ConnectionId, false);
return;

case ConnectionDataMessage connectionDataMessage:
ProcessOutgoingMessages(clientContext, connectionDataMessage);
break;

default:
break;
}
Expand All @@ -304,17 +325,6 @@ private async Task ProcessMessageAsync(ClientConnectionContext clientContext, Ca
}
}

private static string GetString(ReadOnlySequence<byte> buffer)
{
if (buffer.IsSingleSegment)
{
MemoryMarshal.TryGetArray(buffer.First, out var segment);
return Encoding.UTF8.GetString(segment.Array, segment.Offset, segment.Count);
}

return Encoding.UTF8.GetString(buffer.ToArray());
}

private string GetInstanceId(IDictionary<string, StringValues> header)
{
if (header.TryGetValue(Constants.AsrsInstanceId, out var instanceId))
Expand All @@ -324,4 +334,4 @@ private string GetInstanceId(IDictionary<string, StringValues> header)
return null;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ServiceConnectionFactory(
_serviceEventHandler = serviceEventHandler;
}

public IServiceConnection Create(HubServiceEndpoint endpoint, IServiceMessageHandler serviceMessageHandler, ServiceConnectionType type)
public IServiceConnection Create(HubServiceEndpoint endpoint, IServiceMessageHandler serviceMessageHandler, AckHandler ackHandler, ServiceConnectionType type)
{
return new ServiceConnection(
_nameProvider.GetName(),
Expand All @@ -41,6 +41,7 @@ public IServiceConnection Create(HubServiceEndpoint endpoint, IServiceMessageHan
_logger,
serviceMessageHandler,
_serviceEventHandler,
ackHandler,
type);
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/Microsoft.Azure.SignalR.Common/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,5 +113,11 @@ public static class ErrorCodes
public const string InfoUserNotInGroup = "Info.User.NotInGroup";
public const string ErrorConnectionNotExisted = "Error.Connection.NotExisted";
}

public static class HttpClientNames
{
public const string Resilient = "Resilient";
public const string MessageResilient = "MessageResilient";
}
}
}
15 changes: 12 additions & 3 deletions src/Microsoft.Azure.SignalR.Common/Endpoints/AadAccessKey.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System;
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
Expand Down Expand Up @@ -154,7 +157,14 @@ private async Task AuthorizeWithRetryAsync(CancellationToken ctoken = default)
catch (Exception e)
{
latest = e;
await Task.Delay(AuthorizeRetryInterval);
try
{
await Task.Delay(AuthorizeRetryInterval, ctoken);
}
catch (OperationCanceledException)
{
break;
}
}
}

Expand All @@ -169,7 +179,6 @@ private async Task AuthorizeWithTokenAsync(string accessToken, CancellationToken
await new RestClient().SendAsync(
api,
HttpMethod.Get,
"",
handleExpectedResponseAsync: HandleHttpResponseAsync,
cancellationToken: ctoken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ private async Task UpdateAccessKeyAsync(AadAccessKey key)
try
{
await key.UpdateAccessKeyAsync();
Log.SucceedToAuthorizeAccessKey(logger, key.Endpoint.AbsoluteUri);
Log.SucceedToAuthorizeAccessKey(logger, key.AuthorizeUrl);
}
catch (Exception e)
{
Log.FailedToAuthorizeAccessKey(logger, key.Endpoint.AbsoluteUri, e);
Log.FailedToAuthorizeAccessKey(logger, key.AuthorizeUrl, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
{
internal interface IServiceConnectionFactory
{
IServiceConnection Create(HubServiceEndpoint endpoint, IServiceMessageHandler serviceMessageHandler, ServiceConnectionType type);
IServiceConnection Create(HubServiceEndpoint endpoint, IServiceMessageHandler serviceMessageHandler, AckHandler ackHandler, ServiceConnectionType type);
}
}
Loading

0 comments on commit 7418236

Please sign in to comment.