-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add functionality to unregister message handler to servicebus sdk #6360
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -50,11 +50,15 @@ public class MessageReceiver : ClientEntity, IMessageReceiver | |||||
readonly object messageReceivePumpSyncLock; | ||||||
readonly ActiveClientLinkManager clientLinkManager; | ||||||
readonly ServiceBusDiagnosticSource diagnosticSource; | ||||||
|
||||||
int prefetchCount; | ||||||
long lastPeekedSequenceNumber; | ||||||
MessageReceivePump receivePump; | ||||||
CancellationTokenSource receivePumpCancellationTokenSource; | ||||||
|
||||||
// Signals that the entire processing should be immediately cancelled because the pump is getting disposed | ||||||
CancellationTokenSource pumpCancellationTokenSource; | ||||||
|
||||||
// Signals that message reception should stop but active processing should continue | ||||||
CancellationTokenSource receiveCancellationTokenSource; | ||||||
|
||||||
/// <summary> | ||||||
/// Creates a new MessageReceiver from a <see cref="ServiceBusConnectionStringBuilder"/>. | ||||||
|
@@ -899,6 +903,27 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle | |||||
this.OnMessageHandler(messageHandlerOptions, handler); | ||||||
} | ||||||
|
||||||
/// <summary> | ||||||
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typo
Suggested change
|
||||||
/// <remarks>Register a message handler first, using <see cref="RegisterMessageHandler(Func{Message, CancellationToken, Task}, Func{ExceptionReceivedEventArgs, Task})"/> | ||||||
/// or <see cref="RegisterMessageHandler(Func{Message, CancellationToken, Task}, MessageHandlerOptions)"/>. | ||||||
/// Although the message handler is unregistered, active threads are not cancelled.</remarks> | ||||||
/// </summary> | ||||||
public void UnregisterMessageHandler() | ||||||
{ | ||||||
this.ThrowIfClosed(); | ||||||
|
||||||
lock (this.messageReceivePumpSyncLock) | ||||||
{ | ||||||
if (this.receivePump != null) | ||||||
{ | ||||||
this.receiveCancellationTokenSource.Cancel(); | ||||||
this.receiveCancellationTokenSource.Dispose(); | ||||||
this.receivePump = null; | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
/// <summary> | ||||||
/// Registers a <see cref="ServiceBusPlugin"/> to be used with this receiver. | ||||||
/// </summary> | ||||||
|
@@ -1001,8 +1026,9 @@ protected override async Task OnClosingAsync() | |||||
{ | ||||||
if (this.receivePump != null) | ||||||
{ | ||||||
this.receivePumpCancellationTokenSource.Cancel(); | ||||||
this.receivePumpCancellationTokenSource.Dispose(); | ||||||
this.pumpCancellationTokenSource.Cancel(); | ||||||
this.pumpCancellationTokenSource.Dispose(); | ||||||
this.receiveCancellationTokenSource.Dispose(); | ||||||
this.receivePump = null; | ||||||
} | ||||||
} | ||||||
|
@@ -1276,9 +1302,15 @@ protected virtual void OnMessageHandler( | |||||
{ | ||||||
throw new InvalidOperationException(Resources.MessageHandlerAlreadyRegistered); | ||||||
} | ||||||
|
||||||
// pump cancellation token source can be reused on message handlers | ||||||
if (this.pumpCancellationTokenSource == null) | ||||||
{ | ||||||
this.pumpCancellationTokenSource = new CancellationTokenSource(); | ||||||
} | ||||||
|
||||||
this.receivePumpCancellationTokenSource = new CancellationTokenSource(); | ||||||
this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.receivePumpCancellationTokenSource.Token); | ||||||
this.receiveCancellationTokenSource = new CancellationTokenSource(); | ||||||
this.receivePump = new MessageReceivePump(this, registerHandlerOptions, callback, this.ServiceBusConnection.Endpoint, this.pumpCancellationTokenSource.Token, this.receiveCancellationTokenSource.Token); | ||||||
} | ||||||
|
||||||
try | ||||||
|
@@ -1292,8 +1324,9 @@ protected virtual void OnMessageHandler( | |||||
{ | ||||||
if (this.receivePump != null) | ||||||
{ | ||||||
this.receivePumpCancellationTokenSource.Cancel(); | ||||||
this.receivePumpCancellationTokenSource.Dispose(); | ||||||
this.pumpCancellationTokenSource.Cancel(); | ||||||
this.pumpCancellationTokenSource.Dispose(); | ||||||
this.receiveCancellationTokenSource.Dispose(); | ||||||
this.receivePump = null; | ||||||
} | ||||||
} | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -445,6 +445,16 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle | |||||
this.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions); | ||||||
} | ||||||
|
||||||
/// <summary> | ||||||
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
typo There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typo
Suggested change
|
||||||
/// <remarks>Register a message handler first, using <see cref="RegisterMessageHandler(Func{Message, CancellationToken, Task}, Func{ExceptionReceivedEventArgs, Task})"/> | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Also add a remark mentioning the behavior when this is called multiple times There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, looks like there is a difference in the xml doc between different files. Could you make sure all xmldocs of this method have the same comment In reply to: 302802465 [](ancestors = 302802465,302802451) |
||||||
/// or <see cref="RegisterMessageHandler(Func{Message, CancellationToken, Task}, MessageHandlerOptions)"/></remarks> | ||||||
/// </summary> | ||||||
public void UnregisterMessageHandler() | ||||||
{ | ||||||
this.InnerReceiver.UnregisterMessageHandler(); | ||||||
} | ||||||
|
||||||
/// <summary> | ||||||
/// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to receive session-messages. | ||||||
/// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the queue client. | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -419,6 +419,16 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle | |||||
this.InnerSubscriptionClient.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions); | ||||||
} | ||||||
|
||||||
/// <summary> | ||||||
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. typo
Suggested change
|
||||||
/// <remarks>Register a message handler first, using <see cref="RegisterMessageHandler(Func{Message, CancellationToken, Task}, Func{ExceptionReceivedEventArgs, Task})"/> | ||||||
/// or <see cref="RegisterMessageHandler(Func{Message, CancellationToken, Task}, MessageHandlerOptions)"/></remarks> | ||||||
/// </summary> | ||||||
public void UnregisterMessageHandler() | ||||||
{ | ||||||
this.InnerSubscriptionClient.InnerReceiver.UnregisterMessageHandler(); | ||||||
} | ||||||
|
||||||
/// <summary> | ||||||
/// Receive session messages continuously from the queue. Registers a message handler and begins a new thread to receive session-messages. | ||||||
/// This handler(<see cref="Func{IMessageSession, Message, CancellationToken, Task}"/>) is awaited on every time a new message is received by the subscription client. | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
// Copyright (c) Microsoft. All rights reserved. | ||
// Licensed under the MIT license. See LICENSE file in the project root for full license information. | ||
|
||
namespace Microsoft.Azure.ServiceBus.UnitTests | ||
{ | ||
using System.Collections.Generic; | ||
using System.Threading.Tasks; | ||
using Microsoft.Azure.ServiceBus.Core; | ||
using Xunit; | ||
|
||
public sealed class MessageReceiverTests : SenderReceiverClientTestBase | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We don't have a test which ensures that once |
||
{ | ||
public static IEnumerable<object[]> TestPermutations => new object[][] | ||
{ | ||
// Expected structure: { usePartitionedTopic, useSessionTopic, maxCurrentCalls } | ||
new object[] { false, false, 1 }, | ||
new object[] { false, false, 5 }, | ||
new object[] { true, false, 1 }, | ||
new object[] { true, false, 5 }, | ||
}; | ||
|
||
[Theory] | ||
[MemberData(nameof(TestPermutations))] | ||
[LiveTest] | ||
[DisplayTestMethodName] | ||
public Task UnregisterMessageHandlerPeekLockWithAutoCompleteTrue(bool partitioned, bool sessionEnabled, int maxConcurrentCalls) | ||
{ | ||
return this.UnregisterMessageHandlerAsync(partitioned, sessionEnabled, maxConcurrentCalls, ReceiveMode.PeekLock, true); | ||
} | ||
|
||
[Theory] | ||
[MemberData(nameof(TestPermutations))] | ||
[LiveTest] | ||
[DisplayTestMethodName] | ||
public Task UnregisterMessageHandlerReceiveDelete(bool partitioned, bool sessionEnabled, int maxConcurrentCalls) | ||
{ | ||
return this.UnregisterMessageHandlerAsync(partitioned, sessionEnabled, maxConcurrentCalls, ReceiveMode.ReceiveAndDelete, false); | ||
} | ||
|
||
private async Task UnregisterMessageHandlerAsync(bool partitioned, bool sessionEnabled, int maxConcurrentCalls, ReceiveMode mode, bool autoComplete) | ||
{ | ||
const int messageCount = 10; | ||
|
||
await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicName, subscriptionName) => | ||
{ | ||
var topicClient = new TopicClient(TestUtility.NamespaceConnectionString, topicName); | ||
var subscriptionClient = new SubscriptionClient( | ||
TestUtility.NamespaceConnectionString, | ||
topicName, | ||
subscriptionName, | ||
mode); | ||
|
||
try | ||
{ | ||
await this.MessageHandlerUnregisterAsyncTestCase( | ||
topicClient.InnerSender, | ||
subscriptionClient.InnerSubscriptionClient.InnerReceiver, | ||
maxConcurrentCalls, | ||
autoComplete, | ||
messageCount); | ||
} | ||
finally | ||
{ | ||
await subscriptionClient.CloseAsync(); | ||
await topicClient.CloseAsync(); | ||
} | ||
}); | ||
} | ||
|
||
|
||
[Theory] | ||
[MemberData(nameof(TestPermutations))] | ||
[LiveTest] | ||
[DisplayTestMethodName] | ||
public Task MultipleMessageHandlersPeekLockWithAutoCompleteTrue(bool partitioned, bool sessionEnabled, int maxConcurrentCalls) | ||
{ | ||
return this.MultipleMessageHandlersAsync(partitioned, sessionEnabled, maxConcurrentCalls, ReceiveMode.PeekLock, true); | ||
} | ||
|
||
[Theory] | ||
[MemberData(nameof(TestPermutations))] | ||
[LiveTest] | ||
[DisplayTestMethodName] | ||
public Task MultipleMessageHandlersReceiveDelete(bool partitioned, bool sessionEnabled, int maxConcurrentCalls) | ||
{ | ||
return this.MultipleMessageHandlersAsync(partitioned, sessionEnabled, maxConcurrentCalls, ReceiveMode.ReceiveAndDelete, false); | ||
} | ||
|
||
private async Task MultipleMessageHandlersAsync(bool partitioned, bool sessionEnabled, int maxConcurrentCalls, ReceiveMode mode, bool autoComplete) | ||
{ | ||
const int messageCount = 10; | ||
|
||
await ServiceBusScope.UsingTopicAsync(partitioned, sessionEnabled, async (topicName, subscriptionName) => | ||
{ | ||
var topicClient = new TopicClient(TestUtility.NamespaceConnectionString, topicName); | ||
var subscriptionClient = new SubscriptionClient( | ||
TestUtility.NamespaceConnectionString, | ||
topicName, | ||
subscriptionName, | ||
mode); | ||
|
||
try | ||
{ | ||
await this.MultipleMessageHandlerAsyncTestCase( | ||
topicClient.InnerSender, | ||
subscriptionClient.InnerSubscriptionClient.InnerReceiver, | ||
maxConcurrentCalls, | ||
autoComplete, | ||
messageCount); | ||
} | ||
finally | ||
{ | ||
await subscriptionClient.CloseAsync(); | ||
await topicClient.CloseAsync(); | ||
} | ||
}); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo