Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add main logic for Client Invocation #1687

Merged
merged 42 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
79a0567
main logic for client invocation
xingsy97 Sep 23, 2022
f3281af
add ClientInvocation main logic
xingsy97 Sep 23, 2022
790b6c0
bug fix
xingsy97 Sep 23, 2022
f594136
bug fix
xingsy97 Sep 23, 2022
9dccae7
bug fix
xingsy97 Sep 23, 2022
a7c8c8a
update
xingsy97 Sep 23, 2022
f1b96c1
update
xingsy97 Sep 23, 2022
b129204
update
xingsy97 Sep 26, 2022
f4e143c
sync with PR 1684 and some updates
xingsy97 Sep 27, 2022
ee60530
minor fix
xingsy97 Sep 27, 2022
d2a3178
update about nameProvider
xingsy97 Sep 27, 2022
8299c2d
Merge branch 'dev' into ci-main
xingsy97 Sep 28, 2022
d18743a
tmp
xingsy97 Sep 28, 2022
ecf28b4
update
xingsy97 Oct 10, 2022
789427d
Merge branch 'dev' into ci-main
xingsy97 Oct 10, 2022
a64dec1
solve conflict
xingsy97 Oct 14, 2022
9a57478
Merge branch 'dev' into ci-main
xingsy97 Oct 14, 2022
c8198d5
update
xingsy97 Oct 14, 2022
1ad6573
Merge branch 'dev' into ci-main
xingsy97 Oct 14, 2022
9265187
update
xingsy97 Oct 14, 2022
b93a8c1
minior fix
xingsy97 Oct 14, 2022
ec252ad
new-line character fix
xingsy97 Oct 14, 2022
886b985
minor fix
xingsy97 Oct 14, 2022
0c40d03
update
xingsy97 Oct 17, 2022
c337cd3
add new UTs and move existing UTs
xingsy97 Oct 20, 2022
3ba27a2
add & update UTs
xingsy97 Oct 20, 2022
4b1dc1e
Merge branch 'dev' into ci-main
xingsy97 Oct 20, 2022
fcb849e
migrate to ServiceConnection and ServiceLifetimeManager from their ba…
xingsy97 Oct 21, 2022
995cadb
Merge branch 'ci-main' of https://github.com/xingsy97/azure-signalr i…
xingsy97 Oct 21, 2022
d0a36d0
minor bug fix
xingsy97 Oct 21, 2022
fb5a38e
format clean
xingsy97 Oct 21, 2022
2d98f5a
bug fix for PingMessage
xingsy97 Oct 21, 2022
2c58f3b
bug fix & add log
xingsy97 Oct 21, 2022
e6dcc32
code clean
xingsy97 Oct 21, 2022
2168535
add RemoveInvocation for caller, improve exception handling
xingsy97 Oct 24, 2022
8a5356f
remove instanceId for caller AddInvocation
xingsy97 Oct 24, 2022
85f4b9f
make RemoveInvocation return void
xingsy97 Oct 24, 2022
67ff88c
update
xingsy97 Oct 24, 2022
51b1271
Merge branch 'dev' into ci-main
xingsy97 Oct 24, 2022
0e6631f
update
xingsy97 Oct 24, 2022
a80e79f
Merge branch 'ci-main' of https://github.com/xingsy97/azure-signalr i…
xingsy97 Oct 24, 2022
a1a38e2
minor fix
xingsy97 Oct 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build/dependencies.props
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
<MicrosoftAspNetCoreHttpConnectionsClientPackage3_0Version>3.0.0</MicrosoftAspNetCoreHttpConnectionsClientPackage3_0Version>
<MicrosoftAspNetCoreHttpConnectionsClientPackage3_1Version>3.1.9</MicrosoftAspNetCoreHttpConnectionsClientPackage3_1Version>
<MicrosoftAspNetCoreHttpConnectionsClientPackage5_0Version>5.0.1</MicrosoftAspNetCoreHttpConnectionsClientPackage5_0Version>
<MicrosoftAspNetCoreHttpConnectionsClientPackage7_0Version>7.0.0-preview.7.22376.6</MicrosoftAspNetCoreHttpConnectionsClientPackage7_0Version>
<MicrosoftAspNetCoreHttpConnectionsClientPackage6_0Version>6.0.10</MicrosoftAspNetCoreHttpConnectionsClientPackage6_0Version>
<MicrosoftAspNetCoreHttpConnectionsCommonPackageVersion>1.0.4</MicrosoftAspNetCoreHttpConnectionsCommonPackageVersion>
<MicrosoftAspNetCoreHttpConnectionsCommonPackage3_0Version>3.0.0</MicrosoftAspNetCoreHttpConnectionsCommonPackage3_0Version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ protected Task OnServiceErrorAsync(ServiceErrorMessage serviceErrorMessage)
return Task.CompletedTask;
}

protected Task OnPingMessageAsync(PingMessage pingMessage)
protected virtual Task OnPingMessageAsync(PingMessage pingMessage)
{
if (RuntimeServicePingMessage.TryGetOffline(pingMessage, out var instanceId))
{
Expand Down Expand Up @@ -550,7 +550,7 @@ private async Task ProcessIncomingAsync(ConnectionContext connection)
}
}

private Task DispatchMessageAsync(ServiceMessage message)
protected virtual Task DispatchMessageAsync(ServiceMessage message)
{
return message switch
{
Expand Down
5 changes: 5 additions & 0 deletions src/Microsoft.Azure.SignalR/DependencyInjectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ private static ISignalRServerBuilder AddAzureSignalRCore(this ISignalRServerBuil
.AddSingleton<IClientConnectionFactory, ClientConnectionFactory>()
.AddSingleton<IHostedService, HeartBeat>()
.AddSingleton<IAccessKeySynchronizer, AccessKeySynchronizer>()
#if NET7_0_OR_GREATER
.AddSingleton<IClientInvocationManager, ClientInvocationManager>()
#else
.AddSingleton<IClientInvocationManager, DummyClientInvocationManager>()
#endif
.AddSingleton(typeof(NegotiateHandler<>));

// If a custom router is added, do not add the default router
Expand Down
6 changes: 5 additions & 1 deletion src/Microsoft.Azure.SignalR/HubHost/ServiceHubDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ internal class ServiceHubDispatcher<THub> where THub : Hub
private readonly IEndpointRouter _router;
private readonly string _hubName;
private readonly IServiceEventHandler _serviceEventHandler;
private readonly IClientInvocationManager _clientInvocationManager;

protected readonly IServerNameProvider _nameProvider;

Expand All @@ -45,6 +46,7 @@ public ServiceHubDispatcher(
IServerNameProvider nameProvider,
ServerLifetimeManager serverLifetimeManager,
IClientConnectionFactory clientConnectionFactory,
IClientInvocationManager clientInvocationManager,
IServiceEventHandler serviceEventHandler)
{
_serviceProtocol = serviceProtocol;
Expand All @@ -62,6 +64,7 @@ public ServiceHubDispatcher(
_nameProvider = nameProvider;
_hubName = typeof(THub).Name;
_serviceEventHandler = serviceEventHandler;
_clientInvocationManager = clientInvocationManager;

serverLifetimeManager?.Register(ShutdownAsync);
}
Expand Down Expand Up @@ -150,7 +153,8 @@ internal virtual ServiceConnectionFactory GetServiceConnectionFactory(
connectionDelegate,
_clientConnectionFactory,
_nameProvider,
_serviceEventHandler)
_serviceEventHandler,
_clientInvocationManager)
{
ConfigureContext = contextConfig,
ShutdownMode = _options.GracefulShutdown.Mode
Expand Down
99 changes: 94 additions & 5 deletions src/Microsoft.Azure.SignalR/HubHost/ServiceLifetimeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Azure.SignalR.Common;
using Microsoft.Azure.SignalR.Protocol;
using Microsoft.Extensions.Logging;
Expand All @@ -18,8 +20,9 @@ internal class ServiceLifetimeManager<THub> : ServiceLifetimeManagerBase<THub> w
{
private const string MarkerNotConfiguredError =
"'AddAzureSignalR(...)' was called without a matching call to 'IApplicationBuilder.UseAzureSignalR(...)'.";

private readonly IClientInvocationManager _clientInvocationManager;
private readonly IClientConnectionManager _clientConnectionManager;
private readonly string _callerId;

public ServiceLifetimeManager(
IServiceConnectionManager<THub> serviceConnectionManager,
Expand All @@ -29,12 +32,15 @@ public ServiceLifetimeManager(
AzureSignalRMarkerService marker,
IOptions<HubOptions> globalHubOptions,
IOptions<HubOptions<THub>> hubOptions,
IBlazorDetector blazorDetector)
IBlazorDetector blazorDetector,
IServerNameProvider nameProvider,
IClientInvocationManager clientInvocationManager)
: base(
serviceConnectionManager,
protocolResolver,
globalHubOptions,
hubOptions, logger)
hubOptions,
logger)
{
// after core 3.0 UseAzureSignalR() is not required.
#if NETSTANDARD2_0
Expand All @@ -43,12 +49,19 @@ public ServiceLifetimeManager(
throw new InvalidOperationException(MarkerNotConfiguredError);
}
#endif
_clientConnectionManager = clientConnectionManager;

if (hubOptions.Value.SupportedProtocols != null && hubOptions.Value.SupportedProtocols.Any(x => x.Equals(Constants.Protocol.BlazorPack, StringComparison.OrdinalIgnoreCase)))
{
blazorDetector?.TrySetBlazor(typeof(THub).Name, true);
}

if (nameProvider == null)
{
throw new ArgumentNullException(nameof(nameProvider));
}
_callerId = nameProvider.GetName();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (nameProvider == null)
{
throw new ArgumentNullException(nameof(nameProvider));
}
_callerId = nameProvider.GetName();
_callerId = nameProvider?.GetName() ?? throw new ArgumentNullException(nameof(nameProvider));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


_clientInvocationManager = clientInvocationManager ?? throw new ArgumentNullException(nameof(clientInvocationManager));
_clientConnectionManager = clientConnectionManager ?? throw new ArgumentNullException(nameof(clientConnectionManager));
}

public override Task OnConnectedAsync(HubConnectionContext connection)
Expand Down Expand Up @@ -103,6 +116,82 @@ public override async Task SendConnectionAsync(string connectionId, string metho
}
}

#if NET7_0_OR_GREATER
vicancy marked this conversation as resolved.
Show resolved Hide resolved
public override async Task<T> InvokeConnectionAsync<T>(string connectionId, string methodName, object[] args, CancellationToken cancellationToken = default)
{
if (IsInvalidArgument(connectionId))
{
throw new ArgumentNullException(nameof(connectionId));
}

if (IsInvalidArgument(methodName))
{
throw new ArgumentNullException(nameof(methodName));
}

var invocationId = _clientInvocationManager.Caller.GenerateInvocationId(connectionId);

// Exception handling follows https://source.dot.net/#Microsoft.AspNetCore.SignalR.Core/DefaultHubLifetimeManager.cs,349
try
{
var message = AppendMessageTracingId(new ClientInvocationMessage(invocationId, connectionId, _callerId, SerializeAllProtocols(methodName, args, invocationId)));
await WriteAsync(message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can move to before L133

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved

if (_clientConnectionManager.ClientConnections.TryGetValue(connectionId, out var clientConnectionContext))
{
var instanceId = clientConnectionContext.InstanceId;
var task = _clientInvocationManager.Caller.AddInvocation<T>(connectionId, invocationId, instanceId, cancellationToken);
return await task;
}
}
catch
{
// Use `TryCompleteResult` to remove the failed invoation.
// If the invocation was not added by caller, it will be ignored.
// The content of error message is useless and will not be exposed to client.
_clientInvocationManager.Caller.TryCompleteResult(connectionId, CompletionMessage.WithError(invocationId, "Invocation Failed"));
throw;
vicancy marked this conversation as resolved.
Show resolved Hide resolved
}

// when condition `_clientConnectionManager.ClientConnections.TryGetValue` is false
throw new InvalidOperationException($"ConnectionId {connectionId} is invalid.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why throw here? If target client is not routed to this invoke server, false is expected.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is possible that connection is closed during this period

Copy link
Contributor Author

@xingsy97 xingsy97 Oct 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is a bug and was deleted.
If target connection is closed during method InvokeConnectionAsync, the triggered exception will be handled by try-catch

}

// Only route server will reach here
public override async Task SetConnectionResultAsync(string connectionId, CompletionMessage result)
{
if (IsInvalidArgument(connectionId))
{
throw new ArgumentException(NullOrEmptyStringErrorMessage, nameof(connectionId));
}
if (_clientConnectionManager.ClientConnections.TryGetValue(connectionId, out var clientConnectionContext))
{
// Block unknown `results` which belongs to neither Caller nor Router
// `TryCompletionResult` returns false when the corresponding invocation is not existing.
if (!_clientInvocationManager.Caller.TryCompleteResult(connectionId, result)
&& !_clientInvocationManager.Router.TryCompleteResult(connectionId, result))
{
return;
}

var protocol = clientConnectionContext.Protocol;
var message = AppendMessageTracingId(new ClientCompletionMessage(result.InvocationId, connectionId, _callerId, protocol, SerializeCompletionMessage(result, protocol)));
await WriteAsync(message);
}
}

public override bool TryGetReturnType(string invocationId, [NotNullWhen(true)] out Type type)
{
if (_clientInvocationManager.Router.ContainsInvocation(invocationId))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this check necessary? Simply try get from caller then try get from router?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this check. Also remove method Router.ContainsInvocation

{
return _clientInvocationManager.Router.TryGetInvocationReturnType(invocationId, out type);
}
else
{
return _clientInvocationManager.Caller.TryGetInvocationReturnType(invocationId, out type);
}
}
#endif

private MultiConnectionDataMessage CreateMessage(string connectionId, string methodName, object[] args, ClientConnectionContext serviceConnectionContext)
{
IDictionary<string, ReadOnlyMemory<byte>> payloads;
Expand Down
16 changes: 14 additions & 2 deletions src/Microsoft.Azure.SignalR/HubHost/ServiceLifetimeManagerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics.CodeAnalysis;
xingsy97 marked this conversation as resolved.
Show resolved Hide resolved
using Microsoft.AspNetCore.SignalR;
using Microsoft.AspNetCore.SignalR.Protocol;
using Microsoft.Azure.SignalR.Protocol;
Expand Down Expand Up @@ -283,10 +284,18 @@ protected static bool IsInvalidArgument(IReadOnlyList<object> list)
return list == null;
}

protected IDictionary<string, ReadOnlyMemory<byte>> SerializeAllProtocols(string method, object[] args)
protected IDictionary<string, ReadOnlyMemory<byte>> SerializeAllProtocols(string method, object[] args, string invocationId = null)
{
var payloads = new Dictionary<string, ReadOnlyMemory<byte>>();
var message = new InvocationMessage(method, args);
InvocationMessage message;
if (invocationId == null)
{
message = new InvocationMessage(method, args);
}
else
{
message = new InvocationMessage(invocationId, method, args);
}
var serializedHubMessages = _messageSerializer.SerializeMessage(message);
foreach (var serializedMessage in serializedHubMessages)
{
Expand All @@ -298,6 +307,9 @@ protected IDictionary<string, ReadOnlyMemory<byte>> SerializeAllProtocols(string
protected ReadOnlyMemory<byte> SerializeProtocol(string protocol, string method, object[] args) =>
_messageSerializer.SerializeMessage(protocol, new InvocationMessage(method, args));

protected ReadOnlyMemory<byte> SerializeCompletionMessage(CompletionMessage message, string protocol) =>
_messageSerializer.SerializeMessage(protocol, message);

protected virtual T AppendMessageTracingId<T>(T message) where T : ServiceMessage, IMessageWithTracingId
{
return message.WithTracingId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ internal partial class ServiceConnection : ServiceConnectionBase

private readonly ConnectionDelegate _connectionDelegate;

private readonly IClientInvocationManager _clientInvocationManager;

public Action<HttpContext> ConfigureContext { get; set; }

public ServiceConnection(IServiceProtocol serviceProtocol,
Expand All @@ -54,6 +56,7 @@ public ServiceConnection(IServiceProtocol serviceProtocol,
HubServiceEndpoint endpoint,
IServiceMessageHandler serviceMessageHandler,
IServiceEventHandler serviceEventHandler,
IClientInvocationManager clientInvocationManager,
ServiceConnectionType connectionType = ServiceConnectionType.Default,
GracefulShutdownMode mode = GracefulShutdownMode.Off,
int closeTimeOutMilliseconds = DefaultCloseTimeoutMilliseconds
Expand All @@ -64,6 +67,7 @@ public ServiceConnection(IServiceProtocol serviceProtocol,
_connectionDelegate = connectionDelegate;
_clientConnectionFactory = clientConnectionFactory;
_closeTimeOutMilliseconds = closeTimeOutMilliseconds;
_clientInvocationManager = clientInvocationManager;
}

protected override Task<ConnectionContext> CreateConnection(string target = null)
Expand Down Expand Up @@ -188,7 +192,31 @@ protected override async Task OnClientMessageAsync(ConnectionDataMessage connect
}
}

private async Task ProcessClientConnectionAsync(ClientConnectionContext connection)
protected override Task DispatchMessageAsync(ServiceMessage message)
{
return message switch
{
PingMessage pingMessage => OnPingMessageAsync(pingMessage),
ClientInvocationMessage clientInvocationMessage => OnClientInvocationAsync(clientInvocationMessage),
ServiceMappingMessage serviceMappingMessage => OnServiceMappingAsync(serviceMappingMessage),
ClientCompletionMessage clientCompletionMessage => OnClientCompletionAsync(clientCompletionMessage),
ErrorCompletionMessage errorCompletionMessage => OnErrorCompletionAsync(errorCompletionMessage),
_ => base.DispatchMessageAsync(message)
};
}

protected override Task OnPingMessageAsync(PingMessage pingMessage)
{
#if NET7_0_OR_GREATER
if (RuntimeServicePingMessage.TryGetOffline(pingMessage, out var instanceId))
{
_clientInvocationManager.Caller.CleanupInvocationsByInstance(instanceId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to clean Router here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added comments

}
#endif
return base.OnPingMessageAsync(pingMessage);
}

private async Task ProcessClientConnectionAsync(ClientConnectionContext connection)
{
try
{
Expand Down Expand Up @@ -434,7 +462,34 @@ private ClientConnectionContext RemoveClientConnection(string connectionId)
{
_connectionIds.TryRemove(connectionId, out _);
_clientConnectionManager.TryRemoveClientConnection(connectionId, out var connection);
#if NET7_0_OR_GREATER
vicancy marked this conversation as resolved.
Show resolved Hide resolved
_clientInvocationManager.Router.CleanupInvocationsByConnection(connectionId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we also do this for Caller? In the case server is both router and caller.

#endif
return connection;
}

private Task OnClientInvocationAsync(ClientInvocationMessage message)
{
_clientInvocationManager.Router.AddInvocation(message.ConnectionId, message.InvocationId, message.CallerServerId, default);
return Task.CompletedTask;
}

private Task OnServiceMappingAsync(ServiceMappingMessage message)
{
_clientInvocationManager.Caller.AddServiceMapping(message);
return Task.CompletedTask;
}

private Task OnClientCompletionAsync(ClientCompletionMessage clientCompletionMessage)
{
_clientInvocationManager.Caller.TryCompleteResult(clientCompletionMessage.ConnectionId, clientCompletionMessage);
return Task.CompletedTask;
}

private Task OnErrorCompletionAsync(ErrorCompletionMessage errorCompletionMessage)
{
_clientInvocationManager.Caller.TryCompleteResult(errorCompletionMessage.ConnectionId, errorCompletionMessage);
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal class ServiceConnectionFactory : IServiceConnectionFactory
private readonly IClientConnectionFactory _clientConnectionFactory;
private readonly IServerNameProvider _nameProvider;
private readonly IServiceEventHandler _serviceEventHandler;
private readonly IClientInvocationManager _clientInvocationManager;

public GracefulShutdownMode ShutdownMode { get; set; } = GracefulShutdownMode.Off;

Expand All @@ -29,7 +30,8 @@ public ServiceConnectionFactory(
ConnectionDelegate connectionDelegate,
IClientConnectionFactory clientConnectionFactory,
IServerNameProvider nameProvider,
IServiceEventHandler serviceEventHandler)
IServiceEventHandler serviceEventHandler,
IClientInvocationManager clientInvocationManager)
{
_serviceProtocol = serviceProtocol;
_clientConnectionManager = clientConnectionManager;
Expand All @@ -39,6 +41,7 @@ public ServiceConnectionFactory(
_clientConnectionFactory = clientConnectionFactory;
_nameProvider = nameProvider;
_serviceEventHandler = serviceEventHandler;
_clientInvocationManager = clientInvocationManager;
}

public virtual IServiceConnection Create(HubServiceEndpoint endpoint, IServiceMessageHandler serviceMessageHandler, ServiceConnectionType type)
Expand All @@ -55,6 +58,7 @@ public virtual IServiceConnection Create(HubServiceEndpoint endpoint, IServiceMe
endpoint,
serviceMessageHandler,
_serviceEventHandler,
_clientInvocationManager,
type,
ShutdownMode
)
Expand Down
Loading