Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functionality to unregister message handler to servicebus sdk #6360

Closed
wants to merge 4 commits into from

Conversation

rangp
Copy link

@rangp rangp commented May 21, 2019

The MessageReceiver class and thus the topic and queue clients have the method RegisterMessageHandler that allow registration of a message handler that will be used to process incoming messages without the need to manually receive or complete those messages.

However there is no method to unregister the message handler and the only way to stop automatic message processing seems to be calling CloseAsync which will stop message reception but will also immediately close the service bus connection, making it impossible to complete messages that are currently processed, if the processing is long running (And PeekLock is used). Cancelling the processing upon CloseAsync is not an option in my use-case since the processing is kind of a transaction scope and has to be completed fully or not at all.

Hence I feel the need for functionality that allows one to cancel automatic message reception while still keeping the service bus connection open. This pull request introduces the method UnregisterMessageHandler implemented in a way that will solve my problem and I think it would be a consistent thing to add to the API, that could also be used to change message handlers without closing the service bus connection.

Kind regards
Peter

@msftclas
Copy link

msftclas commented May 21, 2019

CLA assistant check
All CLA requirements met.

@azuresdkci
Copy link
Contributor

Can one of the admins verify this patch?

@ramya-rao-a ramya-rao-a requested review from nemakam and binzywu and removed request for AlexGhiondea June 28, 2019 01:23
await Task.Delay(TimeSpan.FromSeconds(5));
}

Assert.True(count < messageCount);
Copy link
Contributor

@nemakam nemakam Jul 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

messageCount [](start = 40, length = 12)

Should this be count <= maxConcurrentCalls?

messageReceiver.UnregisterMessageHandler();

// recursively register a new handler and expect it to handle the open messages
HandlerRegistration(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HandlerRegistration(false); [](start = 28, length = 27)

Am I missing something? How is this even working? For maxConcurrentCall of say 5, there are 5 different instances of this handler code which are trying to call RegisterMessageHandler. And there is always a race condition where two of them try to register without unregistering in between. And that would throw.


Interlocked.Increment(ref count);
},
new MessageHandlerOptions(ExceptionReceivedHandler) { MaxConcurrentCalls = maxConcurrentCalls, AutoComplete = autoComplete });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExceptionReceivedHandler [](start = 46, length = 24)

I am thinking for these tests we need to make sure there are no exceptions. We need to have another variable and Assert that there are no exceptions received during this time.

messageReceiver.UnregisterMessageHandler();

// Simulate "long running" task
Thread.Sleep(1000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread.Sleep(1000); [](start = 28, length = 19)

await Task.Delay()


// Wait for the OnMessage Tasks to finish
var stopwatch = Stopwatch.StartNew();
while (stopwatch.Elapsed.TotalSeconds <= 30)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

30 [](start = 61, length = 2)

4 different permutations of this test, which essentially means 2 mins added run time. Not a blocker, but maybe consider optimizing this to a smaller number.

Copy link
Contributor

@nemakam nemakam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🕐

@@ -445,6 +445,16 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions);
}

/// <summary>
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

continuuous [](start = 24, length = 11)

typo

@@ -445,6 +445,16 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions);
}

/// <summary>
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
/// <remarks>Register a message handler first, using <see cref="RegisterMessageHandler(Func{Message, CancellationToken, Task}, Func{ExceptionReceivedEventArgs, Task})"/>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remarks [](start = 13, length = 7)

Also add a remark mentioning the behavior when this is called multiple times

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same in other clients


In reply to: 302802451 [](ancestors = 302802451)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, looks like there is a difference in the xml doc between different files. Could you make sure all xmldocs of this method have the same comment


In reply to: 302802465 [](ancestors = 302802465,302802451)

@nemakam
Copy link
Contributor

nemakam commented Jul 12, 2019

sealed class MessageReceivePump

This needs to be done for SessionReceivePump as well to keep it consistent.


Refers to: sdk/servicebus/Microsoft.Azure.ServiceBus/src/MessageReceivePump.cs:13 in 8637512. [](commit_id = 8637512, deletion_comment = False)


// Wait for the OnMessage Tasks to finish
var stopwatch = Stopwatch.StartNew();
while (stopwatch.Elapsed.TotalSeconds <= 60)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

60 [](start = 53, length = 2)

Again, could you reduce this further (and enable prefetch on the receiver so that messages are received faster)

}
await Task.Delay(TimeSpan.FromSeconds(5));
}
Assert.True(count == messageCount);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True [](start = 19, length = 4)

Assert.Equal

bool autoComplete,
int messageCount)
{
var count = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could you please take care of indentation?


if (messageReceiver.ReceiveMode == ReceiveMode.PeekLock && !autoComplete)
{
// Completion should still work, even after message handler is stopped
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Completion should still work, even after message handler is stopped [](start = 52, length = 70)

nit: indentation

using Microsoft.Azure.ServiceBus.Core;
using Xunit;

public sealed class MessageReceiverTests : SenderReceiverClientTestBase
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageReceiverTests [](start = 24, length = 20)

We don't have a test which ensures that once Unregister is called, there are no more messages that are received, i.e, ensure no handlers are called after that

@nemakam
Copy link
Contributor

nemakam commented Jul 12, 2019

@rangp, thanks for the PR. The overall design looks good. There are couple of small changes that I have requested.

@@ -419,6 +419,16 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.InnerSubscriptionClient.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions);
}

/// <summary>
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

Suggested change
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
/// Cancels the continuous reception of messages without closing the underlying service bus connection and unregisters the message handler.

@@ -63,6 +63,14 @@ public interface IReceiverClient : IClientEntity
/// <remarks>Enable prefetch to speed up the receive rate.</remarks>
void RegisterMessageHandler(Func<Message, CancellationToken, Task> handler, MessageHandlerOptions messageHandlerOptions);

/// <summary>
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

Suggested change
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
/// Cancels the continuous reception of messages without closing the underlying service bus connection and unregisters the message handler.

@@ -899,6 +903,27 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.OnMessageHandler(messageHandlerOptions, handler);
}

/// <summary>
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

Suggested change
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
/// Cancels the continuous reception of messages without closing the underlying service bus connection and unregisters the message handler.

@@ -445,6 +445,16 @@ public void RegisterMessageHandler(Func<Message, CancellationToken, Task> handle
this.InnerReceiver.RegisterMessageHandler(handler, messageHandlerOptions);
}

/// <summary>
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo

Suggested change
/// Cancels the continuuous reception of messages without closing the underlying service bus connection and unregisters the message handler.
/// Cancels the continuous reception of messages without closing the underlying service bus connection and unregisters the message handler.

@SeanFeldman
Copy link
Contributor

@nemakam given that IReceiverClient is changing, this will be a breaking change.
Should this be indicated somehow?

@nemakam
Copy link
Contributor

nemakam commented Aug 8, 2019

@SeanFeldman , yeah. Maybe we could initially put this into the MessageReceiver class and not the interface and have a separate PR for changes in interface which can be checked in whenever the next major version is to be released.

@rangp , could you please separate out the PR with changes in the IMessageReceiver in a separate PR so that this change need not wait for the next major release.

@pakrym
Copy link
Contributor

pakrym commented Nov 4, 2019

This PR is stale and will be closed, please reopen if there are new updates.

@pakrym pakrym closed this Nov 4, 2019
@DorothySun216
Copy link
Contributor

We published this functionality in 5.0.0: Enable a way to Unregister Message Handler and Session Handler PR 14021

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants