Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functionality to unregister message handler to servicebus sdk #6360

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ public interface IReceiverClient : IClientEntity
/// <remarks>Enable prefetch to speed up the receive rate.</remarks>
void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, MessageHandlerOptions messageHandlerOptions);

/// <summary>
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

Suggested change
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
/// Cancels the continuous reception of messages without closing the underlying service bus connection and unregisters the message handler.

/// <remarks>Register a message handler first, using <see cref="RegisterMessageHandler(Func{Message, CancellationToken, Task}, Func{ExceptionReceivedEventArgs, Task})"/>
/// or <see cref="RegisterMessageHandler(Func{Message, CancellationToken, Task}, MessageHandlerOptions)"/>.
/// Although the message handler is unregistered, active threads are not cancelled.</remarks>
/// </summary>
void UnregisterMessageHandler();

/// <summary>
/// Completes a <see cref="Message"/> using its lock token. This will delete the message from the queue.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,15 @@ public class MessageReceiver : ClientEntity, IMessageReceiver
readonly object messageReceivePumpSyncLock;
readonly ActiveClientLinkManager clientLinkManager;
readonly ServiceBusDiagnosticSource diagnosticSource;

int prefetchCount;
long lastPeekedSequenceNumber;
MessageReceivePump receivePump;
CancellationTokenSource receivePumpCancellationTokenSource;

// Signals that the entire processing should be immediately cancelled because the pump is getting disposed
CancellationTokenSource pumpCancellationTokenSource;

// Signals that message reception should stop but active processing should continue
CancellationTokenSource receiveCancellationTokenSource;

/// <summary>
/// Creates a new MessageReceiver from a <see cref="ServiceBusConnectionStringBuilder"/>.
Expand Down Expand Up @@ -899,6 +903,27 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.OnMessageHandler(messageHandlerOptions, handler);
}

/// <summary>
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

Suggested change
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
/// Cancels the continuous reception of messages without closing the underlying service bus connection and unregisters the message handler.

/// <remarks>Register a message handler first, using <see cref="RegisterMessageHandler(Func{Message, CancellationToken, Task}, Func{ExceptionReceivedEventArgs, Task})"/>
/// or <see cref="RegisterMessageHandler(Func{Message, CancellationToken, Task}, MessageHandlerOptions)"/>.
/// Although the message handler is unregistered, active threads are not cancelled.</remarks>
/// </summary>
public void UnregisterMessageHandler()
{
this.ThrowIfClosed();

lock (this.messageReceivePumpSyncLock)
{
if (this.receivePump != null)
{
this.receiveCancellationTokenSource.Cancel();
this.receiveCancellationTokenSource.Dispose();
this.receivePump = null;
}
}
}

/// <summary>
/// Registers a <see cref="ServiceBusPlugin"/> to be used with this receiver.
/// </summary>
Expand Down Expand Up @@ -1001,8 +1026,9 @@ protected override async Task OnClosingAsync()
{
if (this.receivePump != null)
{
this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
this.pumpCancellationTokenSource.Cancel();
this.pumpCancellationTokenSource.Dispose();
this.receiveCancellationTokenSource.Dispose();
this.receivePump = null;
}
}
Expand Down Expand Up @@ -1276,9 +1302,15 @@ protected virtual void OnMessageHandler(
{
throw new InvalidOperationException(Resources.MessageHandlerAlreadyRegistered);
}

// pump cancellation token source can be reused on message handlers
if (this.pumpCancellationTokenSource == null)
{
this.pumpCancellationTokenSource = new CancellationTokenSource();
}

this.receivePumpCancellationTokenSource = new CancellationTokenSource();
this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.receivePumpCancellationTokenSource.Token);
this.receiveCancellationTokenSource = new CancellationTokenSource();
this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.pumpCancellationTokenSource.Token, this.receiveCancellationTokenSource.Token);
}

try
Expand All @@ -1292,8 +1324,9 @@ protected virtual void OnMessageHandler(
{
if (this.receivePump != null)
{
this.receivePumpCancellationTokenSource.Cancel();
this.receivePumpCancellationTokenSource.Dispose();
this.pumpCancellationTokenSource.Cancel();
this.pumpCancellationTokenSource.Dispose();
this.receiveCancellationTokenSource.Dispose();
this.receivePump = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,28 @@ sealed class MessageReceivePump
readonly string endpoint;
readonly MessageHandlerOptions registerHandlerOptions;
readonly IMessageReceiver messageReceiver;

// Signals that the entire processing should be immediately cancelled because the pump is getting disposed
readonly CancellationToken pumpCancellationToken;

// Signals that message reception should stop but active processing should continue
readonly CancellationToken receiveCancellationToken;
readonly SemaphoreSlim maxConcurrentCallsSemaphoreSlim;
readonly ServiceBusDiagnosticSource diagnosticSource;

public MessageReceivePump(IMessageReceiver messageReceiver,
MessageHandlerOptions registerHandlerOptions,
Func<Message, CancellationToken, Task> callback,
Uri endpoint,
CancellationToken pumpCancellationToken)
CancellationToken pumpCancellationToken,
CancellationToken receiveCancellationToken)
{
this.messageReceiver = messageReceiver ?? throw new ArgumentNullException(nameof(messageReceiver));
this.registerHandlerOptions = registerHandlerOptions;
this.onMessageCallback = callback;
this.endpoint = endpoint.Authority;
this.pumpCancellationToken = pumpCancellationToken;
this.receiveCancellationToken = receiveCancellationToken;
this.maxConcurrentCallsSemaphoreSlim = new SemaphoreSlim(this.registerHandlerOptions.MaxConcurrentCalls);
this.diagnosticSource = new ServiceBusDiagnosticSource(messageReceiver.Path, endpoint);
}
Expand All @@ -55,7 +62,7 @@ Task RaiseExceptionReceived(Exception e, string action)

async Task MessagePumpTaskAsync()
{
while (!this.pumpCancellationToken.IsCancellationRequested)
while (!this.pumpCancellationToken.IsCancellationRequested && ! this.receiveCancellationToken.IsCancellationRequested)
{
Message message = null;
try
Expand Down
10 changes: 10 additions & 0 deletions sdk/servicebus/Microsoft.Azure.ServiceBus/src/QueueClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,16 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions);
}

/// <summary>
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continuuous [](start = 24, length = 11)

typo

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

Suggested change
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
/// Cancels the continuous reception of messages without closing the underlying service bus connection and unregisters the message handler.

/// <remarks>Register a message handler first, using <see cref="RegisterMessageHandler(Func{Message, CancellationToken, Task}, Func{ExceptionReceivedEventArgs, Task})"/>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remarks [](start = 13, length = 7)

Also add a remark mentioning the behavior when this is called multiple times

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same in other clients


In reply to: 302802451 [](ancestors = 302802451)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, looks like there is a difference in the xml doc between different files. Could you make sure all xmldocs of this method have the same comment


In reply to: 302802465 [](ancestors = 302802465,302802451)

/// or <see cref="RegisterMessageHandler(Func{Message, CancellationToken, Task}, MessageHandlerOptions)"/></remarks>
/// </summary>
public void UnregisterMessageHandler()
{
this.InnerReceiver.UnregisterMessageHandler();
}

/// <summary>
/// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to receive session-messages.
/// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the queue client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,16 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.InnerSubscriptionClient.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions);
}

/// <summary>
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

Suggested change
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
/// Cancels the continuous reception of messages without closing the underlying service bus connection and unregisters the message handler.

/// <remarks>Register a message handler first, using <see cref="RegisterMessageHandler(Func{Message, CancellationToken, Task}, Func{ExceptionReceivedEventArgs, Task})"/>
/// or <see cref="RegisterMessageHandler(Func{Message, CancellationToken, Task}, MessageHandlerOptions)"/></remarks>
/// </summary>
public void UnregisterMessageHandler()
{
this.InnerSubscriptionClient.InnerReceiver.UnregisterMessageHandler();
}

/// <summary>
/// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to receive session-messages.
/// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the subscription client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ namespace Microsoft.Azure.ServiceBus
public System.Threading.Tasks.Task<long> ScheduleMessageAsync(Microsoft.Azure.ServiceBus.Message message, System.DateTimeOffset scheduleEnqueueTimeUtc) { }
public System.Threading.Tasks.Task SendAsync(Microsoft.Azure.ServiceBus.Message message) { }
public System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IList<Microsoft.Azure.ServiceBus.Message> messageList) { }
public void UnregisterMessageHandler() { }
public override void UnregisterPlugin(string serviceBusPluginName) { }
}
public sealed class QuotaExceededException : Microsoft.Azure.ServiceBus.ServiceBusException
Expand Down Expand Up @@ -447,6 +448,7 @@ namespace Microsoft.Azure.ServiceBus
public void RegisterSessionHandler(System.Func<Microsoft.Azure.ServiceBus.IMessageSession, Microsoft.Azure.ServiceBus.Message, System.Threading.CancellationToken, System.Threading.Tasks.Task> handler, System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
public void RegisterSessionHandler(System.Func<Microsoft.Azure.ServiceBus.IMessageSession, Microsoft.Azure.ServiceBus.Message, System.Threading.CancellationToken, System.Threading.Tasks.Task> handler, Microsoft.Azure.ServiceBus.SessionHandlerOptions sessionHandlerOptions) { }
public System.Threading.Tasks.Task RemoveRuleAsync(string ruleName) { }
public void UnregisterMessageHandler() { }
public override void UnregisterPlugin(string serviceBusPluginName) { }
}
public class TopicClient : Microsoft.Azure.ServiceBus.ClientEntity, Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity, Microsoft.Azure.ServiceBus.ITopicClient
Expand Down Expand Up @@ -520,6 +522,7 @@ namespace Microsoft.Azure.ServiceBus.Core
System.Threading.Tasks.Task DeadLetterAsync(string lockToken, string deadLetterReason, string deadLetterErrorDescription = null);
void RegisterMessageHandler(System.Func<Microsoft.Azure.ServiceBus.Message, System.Threading.CancellationToken, System.Threading.Tasks.Task> handler, System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler);
void RegisterMessageHandler(System.Func<Microsoft.Azure.ServiceBus.Message, System.Threading.CancellationToken, System.Threading.Tasks.Task> handler, Microsoft.Azure.ServiceBus.MessageHandlerOptions messageHandlerOptions);
void UnregisterMessageHandler();
}
public interface ISenderClient : Microsoft.Azure.ServiceBus.IClientEntity
{
Expand Down Expand Up @@ -572,6 +575,7 @@ namespace Microsoft.Azure.ServiceBus.Core
public override void RegisterPlugin(Microsoft.Azure.ServiceBus.Core.ServiceBusPlugin serviceBusPlugin) { }
public System.Threading.Tasks.Task RenewLockAsync(Microsoft.Azure.ServiceBus.Message message) { }
public System.Threading.Tasks.Task<System.DateTime> RenewLockAsync(string lockToken) { }
public void UnregisterMessageHandler() { }
public override void UnregisterPlugin(string serviceBusPluginName) { }
}
public class MessageSender : Microsoft.Azure.ServiceBus.ClientEntity, Microsoft.Azure.ServiceBus.Core.IMessageSender, Microsoft.Azure.ServiceBus.Core.ISenderClient, Microsoft.Azure.ServiceBus.IClientEntity
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.Azure.ServiceBus.UnitTests
{
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Core;
using Xunit;

public sealed class MessageReceiverTests : SenderReceiverClientTestBase
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageReceiverTests [](start = 24, length = 20)

We don't have a test which ensures that once Unregister is called, there are no more messages that are received, i.e, ensure no handlers are called after that

{
public static IEnumerable<object[]> TestPermutations => new object[][]
{
// Expected structure: { usePartitionedTopic, useSessionTopic, maxCurrentCalls }
new object[] { false, false, 1 },
new object[] { false, false, 5 },
new object[] { true, false, 1 },
new object[] { true, false, 5 },
};

[Theory]
[MemberData(nameof(TestPermutations))]
[LiveTest]
[DisplayTestMethodName]
public Task UnregisterMessageHandlerPeekLockWithAutoCompleteTrue(bool partitioned, bool sessionEnabled, int maxConcurrentCalls)
{
return this.UnregisterMessageHandlerAsync(partitioned, sessionEnabled, maxConcurrentCalls, ReceiveMode.PeekLock, true);
}

[Theory]
[MemberData(nameof(TestPermutations))]
[LiveTest]
[DisplayTestMethodName]
public Task UnregisterMessageHandlerReceiveDelete(bool partitioned, bool sessionEnabled, int maxConcurrentCalls)
{
return this.UnregisterMessageHandlerAsync(partitioned, sessionEnabled, maxConcurrentCalls, ReceiveMode.ReceiveAndDelete, false);
}

private async Task UnregisterMessageHandlerAsync(bool partitioned, bool sessionEnabled, int maxConcurrentCalls, ReceiveMode mode, bool autoComplete)
{
const int messageCount = 10;

await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicName, subscriptionName) =>
{
var topicClient = new TopicClient(TestUtility.NamespaceConnectionString, topicName);
var subscriptionClient = new SubscriptionClient(
TestUtility.NamespaceConnectionString,
topicName,
subscriptionName,
mode);

try
{
await this.MessageHandlerUnregisterAsyncTestCase(
topicClient.InnerSender,
subscriptionClient.InnerSubscriptionClient.InnerReceiver,
maxConcurrentCalls,
autoComplete,
messageCount);
}
finally
{
await subscriptionClient.CloseAsync();
await topicClient.CloseAsync();
}
});
}


[Theory]
[MemberData(nameof(TestPermutations))]
[LiveTest]
[DisplayTestMethodName]
public Task MultipleMessageHandlersPeekLockWithAutoCompleteTrue(bool partitioned, bool sessionEnabled, int maxConcurrentCalls)
{
return this.MultipleMessageHandlersAsync(partitioned, sessionEnabled, maxConcurrentCalls, ReceiveMode.PeekLock, true);
}

[Theory]
[MemberData(nameof(TestPermutations))]
[LiveTest]
[DisplayTestMethodName]
public Task MultipleMessageHandlersReceiveDelete(bool partitioned, bool sessionEnabled, int maxConcurrentCalls)
{
return this.MultipleMessageHandlersAsync(partitioned, sessionEnabled, maxConcurrentCalls, ReceiveMode.ReceiveAndDelete, false);
}

private async Task MultipleMessageHandlersAsync(bool partitioned, bool sessionEnabled, int maxConcurrentCalls, ReceiveMode mode, bool autoComplete)
{
const int messageCount = 10;

await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicName, subscriptionName) =>
{
var topicClient = new TopicClient(TestUtility.NamespaceConnectionString, topicName);
var subscriptionClient = new SubscriptionClient(
TestUtility.NamespaceConnectionString,
topicName,
subscriptionName,
mode);

try
{
await this.MultipleMessageHandlerAsyncTestCase(
topicClient.InnerSender,
subscriptionClient.InnerSubscriptionClient.InnerReceiver,
maxConcurrentCalls,
autoComplete,
messageCount);
}
finally
{
await subscriptionClient.CloseAsync();
await topicClient.CloseAsync();
}
});
}
}
}
Loading