Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable a way to Unregister Message Handler and Session Handler #14021

Merged
merged 23 commits into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8c46a41
add UnregisterMessageHandler method
DorothySun216 Aug 6, 2020
db9b322
Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/IReceiverCl…
DorothySun216 Aug 10, 2020
da77a24
Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageRece…
DorothySun216 Aug 10, 2020
5e1576a
Update the unregister method to be async and await for inflight opera…
DorothySun216 Aug 13, 2020
61f0998
Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/SubscriptionClie…
DorothySun216 Aug 13, 2020
2988d02
Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/Core/MessageRece…
DorothySun216 Aug 13, 2020
d3faba2
Update sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs
DorothySun216 Aug 13, 2020
4b921a6
Change name to have async suffix and add to existing onMessageQueueTests
DorothySun216 Aug 13, 2020
78aff54
Add UnregisterSessionHandlerAsync and corresponding tests
DorothySun216 Aug 13, 2020
25f6b8d
nit
DorothySun216 Aug 13, 2020
d4a5589
nit
DorothySun216 Aug 13, 2020
ecd820b
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-net i…
DorothySun216 Aug 20, 2020
cd8f044
Add a new cancellation type to not cancel inflight message handling o…
DorothySun216 Aug 23, 2020
fff0016
Add another type of cancellation token to session handler path
DorothySun216 Aug 24, 2020
fae7bf1
nit
DorothySun216 Aug 24, 2020
642dc36
Add a timeout parameter to unregister functions and add according uni…
DorothySun216 Sep 4, 2020
ffc4249
nit
DorothySun216 Sep 4, 2020
fbd31d0
cancel runningTaskCancellationTokenSource after unregister is done
DorothySun216 Sep 4, 2020
f4dac4b
change public API
DorothySun216 Sep 9, 2020
9692122
update the API header
DorothySun216 Sep 9, 2020
7741ad9
update the API definition
DorothySun216 Sep 10, 2020
2bb96a5
fix spacing
DorothySun216 Sep 10, 2020
c0ec74a
fix ApproveAzureServiceBus CIT test
DorothySun216 Sep 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ public interface IReceiverClient : IClientEntity
/// <remarks>Enable prefetch to speed up the receive rate.</remarks>
void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, MessageHandlerOptions messageHandlerOptions);

/// <summary>
/// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion
/// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously
/// registered.
/// <param name="inflightMessageHandlerTasksWaitTimeout"> is the waitTimeout for inflight message handling tasks.
/// </summary>
Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout);

/// <summary>
/// Completes a <see cref="Message"/> using its lock token. This will delete the message from the queue.
/// </summary>
Expand Down Expand Up @@ -115,4 +123,4 @@ public interface IReceiverClient : IClientEntity
/// </remarks>
Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ public class MessageReceiver : ClientEntity, IMessageReceiver
int prefetchCount;
long lastPeekedSequenceNumber;
MessageReceivePump receivePump;
// Cancellation token to cancel the message pump. Once this is fired, all future message handling operations registered by application will be
// cancelled.
CancellationTokenSource receivePumpCancellationTokenSource;
// Cancellation token to cancel the inflight message handling operations registered by application in the message pump.
CancellationTokenSource runningTaskCancellationTokenSource;

/// <summary>
/// Creates a new MessageReceiver from a <see cref="ServiceBusConnectionStringBuilder"/>.
Expand Down Expand Up @@ -899,6 +903,51 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.OnMessageHandler(messageHandlerOptions, handler);
}

/// <summary>
/// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion
/// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously
/// registered.
/// <param name="inflightMessageHandlerTasksWaitTimeout"> is the waitTimeout for inflight message handling tasks.
/// </summary>
public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout)
{
this.ThrowIfClosed();

if (inflightMessageHandlerTasksWaitTimeout <= TimeSpan.Zero)
{
throw Fx.Exception.ArgumentOutOfRange(nameof(inflightMessageHandlerTasksWaitTimeout), inflightMessageHandlerTasksWaitTimeout, Resources.TimeoutMustBePositiveNonZero.FormatForUser(nameof(inflightMessageHandlerTasksWaitTimeout), inflightMessageHandlerTasksWaitTimeout));
}

MessagingEventSource.Log.UnregisterMessageHandlerStart(this.ClientId);
lock (this.messageReceivePumpSyncLock)
{
if (this.receivePump == null || this.receivePumpCancellationTokenSource.IsCancellationRequested)
{
// Silently return if handler has already been unregistered.
return;
}

this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
}

Stopwatch stopWatch = Stopwatch.StartNew();
while (this.receivePump != null
&& stopWatch.Elapsed < inflightMessageHandlerTasksWaitTimeout
&& this.receivePump.maxConcurrentCallsSemaphoreSlim.CurrentCount < this.receivePump.registerHandlerOptions.MaxConcurrentCalls)
{
await Task.Delay(10).ConfigureAwait(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not very comfortable with this potential infinite loop. Imagine OnMessage delegate never completes.then UnregisterMessageHandler never completes. This Unregister doesn't guarantee a clean close, but only a timed unregister. I mean it will wait for some time for all OnMessage delegates to finish, but there is no guarantee. We should ideally take some max timeout or some default timeout for the Unregister.

Copy link
Member

@sidkri sidkri Sep 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be important to make this timeout configurable (and a default used if not specified). In Azure Functions, we provide a large amount of time for function execution to complete and need to provide the same amount of time while shutting down (this API will be invoked during shutdown)

}

lock (this.messageReceivePumpSyncLock)
{
this.runningTaskCancellationTokenSource.Cancel();
this.runningTaskCancellationTokenSource.Dispose();
this.receivePump = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest cancelling and disposing the runningTaskCancellationTokenSource here.

}
MessagingEventSource.Log.UnregisterMessageHandlerStop(this.ClientId);
}

/// <summary>
/// Registers a <see cref="ServiceBusPlugin"/> to be used with this receiver.
/// </summary>
Expand Down Expand Up @@ -1003,6 +1052,9 @@ protected override async Task OnClosingAsync()
{
this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
// For back-compatibility
this.runningTaskCancellationTokenSource.Cancel();
this.runningTaskCancellationTokenSource.Dispose();
this.receivePump = null;
}
}
Expand Down Expand Up @@ -1279,7 +1331,13 @@ protected virtual void OnMessageHandler(
}

this.receivePumpCancellationTokenSource = new CancellationTokenSource();
this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.receivePumpCancellationTokenSource.Token);

if (this.runningTaskCancellationTokenSource == null)
{
this.runningTaskCancellationTokenSource = new CancellationTokenSource();
}

this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.receivePumpCancellationTokenSource.Token, this.runningTaskCancellationTokenSource.Token);
}

try
Expand All @@ -1295,6 +1353,8 @@ protected virtual void OnMessageHandler(
{
this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
this.runningTaskCancellationTokenSource.Cancel();
this.runningTaskCancellationTokenSource.Dispose();
this.receivePump = null;
}
}
Expand Down
8 changes: 8 additions & 0 deletions sdk/servicebus/Microsoft.Azure.ServiceBus/src/IQueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,13 @@ public interface IQueueClient : IReceiverClient, ISenderClient
/// <param name="sessionHandlerOptions">Options used to configure the settings of the session pump.</param>
/// <remarks>Enable prefetch to speed up the receive rate. </remarks>
void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, SessionHandlerOptions sessionHandlerOptions);

/// <summary>
/// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion
/// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously
/// registered.
/// <param name="inflightSessionHandlerTasksWaitTimeout"> is the waitTimeout for inflight session handling tasks.
/// </summary>
Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add another function signature for UnregisterSession/MessageHandlerAsync that takes in empty parameter and set a default wait timeout?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could change the function signature like this and check for null in the code
Task UnregisterSessionHandlerAsync(TimeSpan? inflightSessionHandlerTasksWaitTimeout = null);

What should null represent though? It can't be zero or draining will not happen so infinite time seems like the best default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I thought about this but our current function signature doesn't have optional parameters, eg. ReceiveAsync() and ReceiveAsync(TimeSpan operationTimeout). So I am not sure if we want to introduce this.

Copy link
Member

@sidkri sidkri Sep 4, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. To be consistent then, that convention is appropriate. In the case of ReceiveAsync() its using the OperationTimeout value configured on the connection string or specified when the connection was created and technically is a timeout for a single operation v/s Unregister* which is allowing multiple handlers to complete execution as well as interact over the service bus connection (to complete/abandon messages). It may be ok to repurpose this.OperationTimeout but if we want a different default, I'd suggest just creating a new configuration element that has a default value. This allows consumers to configure it in a pass through way with managed services like Functions (as is the case for most of the service bus sdk configuration). With a configuration element v/s function argument, you can go back to having a single function "Unregister*()"

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,5 +114,13 @@ public interface ISubscriptionClient : IReceiverClient
/// <param name="sessionHandlerOptions">Options used to configure the settings of the session pump.</param>
/// <remarks>Enable prefetch to speed up the receive rate. </remarks>
void RegisterSessionHandler(Func<IMessageSession, Message, CancellationToken, Task> handler, SessionHandlerOptions sessionHandlerOptions);

/// <summary>
/// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion
/// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously
/// registered.
/// <param name="inflightSessionHandlerTasksWaitTimeout"> is the waitTimeout for inflight session handling tasks.
/// </summary>
Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,25 +12,28 @@ namespace Microsoft.Azure.ServiceBus

sealed class MessageReceivePump
{
public readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim;
public readonly MessageHandlerOptions registerHandlerOptions;
readonly Func<Message, CancellationToken, Task> onMessageCallback;
readonly string endpoint;
readonly MessageHandlerOptions registerHandlerOptions;
readonly IMessageReceiver messageReceiver;
readonly CancellationToken pumpCancellationToken;
readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim;
readonly CancellationToken runningTaskCancellationToken;
readonly ServiceBusDiagnosticSource diagnosticSource;

public MessageReceivePump(IMessageReceiver messageReceiver,
MessageHandlerOptions registerHandlerOptions,
Func<Message, CancellationToken, Task> callback,
Uri endpoint,
CancellationToken pumpCancellationToken)
CancellationToken pumpCancellationToken,
CancellationToken runningTaskCancellationToken)
{
this.messageReceiver = messageReceiver ?? throw new ArgumentNullException(nameof(messageReceiver));
this.registerHandlerOptions = registerHandlerOptions;
this.onMessageCallback = callback;
this.endpoint = endpoint.Authority;
this.pumpCancellationToken = pumpCancellationToken;
this.runningTaskCancellationToken = runningTaskCancellationToken;
this.maxConcurrentCallsSemaphoreSlim = new SemaphoreSlim(this.registerHandlerOptions.MaxConcurrentCalls);
this.diagnosticSource = new ServiceBusDiagnosticSource(messageReceiver.Path, endpoint);
}
Expand Down Expand Up @@ -163,7 +166,7 @@ async Task MessageDispatchTask(Message message)
try
{
MessagingEventSource.Log.MessageReceiverPumpUserCallbackStart(this.messageReceiver.ClientId, message);
await this.onMessageCallback(message, this.pumpCancellationToken).ConfigureAwait(false);
await this.onMessageCallback(message, this.runningTaskCancellationToken).ConfigureAwait(false);

MessagingEventSource.Log.MessageReceiverPumpUserCallbackStop(this.messageReceiver.ClientId, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1380,6 +1380,42 @@ public void ManagementSerializationException(string objectName, string details =
this.WriteEvent(117, objectName, details);
}
}

[Event(118, Level = EventLevel.Informational, Message = "{0}: Unregister MessageHandler start.")]
public void UnregisterMessageHandlerStart(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(118, clientId);
}
}

[Event(119, Level = EventLevel.Informational, Message = "{0}: Unregister MessageHandler done.")]
public void UnregisterMessageHandlerStop(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(119, clientId);
}
}

[Event(120, Level = EventLevel.Informational, Message = "{0}: Unregister SessionHandler start.")]
public void UnregisterSessionHandlerStart(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(120, clientId);
}
}

[Event(121, Level = EventLevel.Informational, Message = "{0}: Unregister SessionHandler done.")]
public void UnregisterSessionHandlerStop(string clientId)
{
if (this.IsEnabled())
{
this.WriteEvent(121, clientId);
}
}
}

internal static class TraceHelper
Expand Down
24 changes: 24 additions & 0 deletions sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,18 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions);
}

/// <summary>
/// Unregister message handler from the receiver if there is an active message handler registered. This operation waits for the completion
/// of inflight receive and message handling operations to finish and unregisters future receives on the message handler which previously
/// registered.
/// <param name="inflightMessageHandlerTasksWaitTimeout"> is the waitTimeout for inflight message handling tasks.
/// </summary>
public async Task UnregisterMessageHandlerAsync(TimeSpan inflightMessageHandlerTasksWaitTimeout)
{
this.ThrowIfClosed();
await this.InnerReceiver.UnregisterMessageHandlerAsync(inflightMessageHandlerTasksWaitTimeout).ConfigureAwait(false);
}

/// <summary>
/// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to receive session-messages.
/// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the queue client.
Expand Down Expand Up @@ -476,6 +488,18 @@ public void RegisterSessionHandler(Func<IMessageSession, Message, CancellationTo
this.SessionPumpHost.OnSessionHandler(handler, sessionHandlerOptions);
}

/// <summary>
/// Unregister session handler from the receiver if there is an active session handler registered. This operation waits for the completion
/// of inflight receive and session handling operations to finish and unregisters future receives on the session handler which previously
/// registered.
/// <param name="inflightSessionHandlerTasksWaitTimeout"> is the waitTimeout for inflight session handling tasks.
/// </summary>
public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout)
{
this.ThrowIfClosed();
await this.SessionPumpHost.UnregisterSessionHandlerAsync(inflightSessionHandlerTasksWaitTimeout).ConfigureAwait(false);
}

/// <summary>
/// Schedules a message to appear on Service Bus at a later time.
/// </summary>
Expand Down
54 changes: 53 additions & 1 deletion sdk/servicebus/Microsoft.Azure.ServiceBus/src/SessionPumpHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

namespace Microsoft.Azure.ServiceBus
{
using Microsoft.Azure.ServiceBus.Primitives;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -12,6 +14,7 @@ internal sealed class SessionPumpHost
readonly object syncLock;
SessionReceivePump sessionReceivePump;
CancellationTokenSource sessionPumpCancellationTokenSource;
CancellationTokenSource runningTaskCancellationTokenSource;
readonly Uri endpoint;

public SessionPumpHost(string clientId, ReceiveMode receiveMode, ISessionClient sessionClient, Uri endpoint)
Expand All @@ -35,6 +38,9 @@ public void Close()
{
this.sessionPumpCancellationTokenSource?.Cancel();
this.sessionPumpCancellationTokenSource?.Dispose();
// For back-compatibility
this.runningTaskCancellationTokenSource?.Cancel();
this.runningTaskCancellationTokenSource?.Dispose();
this.sessionReceivePump = null;
}
}
Expand All @@ -53,14 +59,22 @@ public void OnSessionHandler(
}

this.sessionPumpCancellationTokenSource = new CancellationTokenSource();

// Running task cancellation token source can be reused if previously UnregisterSessionHandlerAsync was called
if (this.runningTaskCancellationTokenSource == null)
{
this.runningTaskCancellationTokenSource = new CancellationTokenSource();
}

this.sessionReceivePump = new SessionReceivePump(
this.ClientId,
this.SessionClient,
this.ReceiveMode,
sessionHandlerOptions,
callback,
this.endpoint,
this.sessionPumpCancellationTokenSource.Token);
this.sessionPumpCancellationTokenSource.Token,
this.runningTaskCancellationTokenSource.Token);
}

try
Expand All @@ -82,5 +96,43 @@ public void OnSessionHandler(

MessagingEventSource.Log.RegisterOnSessionHandlerStop(this.ClientId);
}

public async Task UnregisterSessionHandlerAsync(TimeSpan inflightSessionHandlerTasksWaitTimeout)
{
if (inflightSessionHandlerTasksWaitTimeout <= TimeSpan.Zero)
{
throw Fx.Exception.ArgumentOutOfRange(nameof(inflightSessionHandlerTasksWaitTimeout), inflightSessionHandlerTasksWaitTimeout, Resources.TimeoutMustBePositiveNonZero.FormatForUser(nameof(inflightSessionHandlerTasksWaitTimeout), inflightSessionHandlerTasksWaitTimeout));
}

MessagingEventSource.Log.UnregisterSessionHandlerStart(this.ClientId);
lock (this.syncLock)
{
if (this.sessionReceivePump == null || this.sessionPumpCancellationTokenSource.IsCancellationRequested)
{
// Silently return if handler has already been unregistered.
return;
}

this.sessionPumpCancellationTokenSource.Cancel();
this.sessionPumpCancellationTokenSource.Dispose();
}

Stopwatch stopWatch = Stopwatch.StartNew();
while (this.sessionReceivePump != null
&& stopWatch.Elapsed < inflightSessionHandlerTasksWaitTimeout
&& (this.sessionReceivePump.maxConcurrentSessionsSemaphoreSlim.CurrentCount <
this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentSessions
|| this.sessionReceivePump.maxPendingAcceptSessionsSemaphoreSlim.CurrentCount <
this.sessionReceivePump.sessionHandlerOptions.MaxConcurrentAcceptSessionCalls))
{
await Task.Delay(10).ConfigureAwait(false);
}

lock (this.sessionPumpCancellationTokenSource)
{
this.sessionReceivePump = null;
}
MessagingEventSource.Log.UnregisterSessionHandlerStop(this.ClientId);
}
}
}
Loading