-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Service bus adding max auto lock renew duration in processor client #20317
Conversation
ReceiverOptions getReceiverOptions() { | ||
ReceiverOptions receiverOptions = null; | ||
final ServiceBusReceiverAsyncClient client = asyncClient.get(); | ||
if (client != null) { | ||
receiverOptions = client.getReceiverOptions(); | ||
} | ||
return receiverOptions; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't think this method is required. This is added just to test if the value is set. Instead, we should test that the maxLockRenewDuration is honored if it is set in the builder.
...ssaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java
Outdated
Show resolved
Hide resolved
...ssaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
// Assert & Act | ||
System.out.println("Starting the processor"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove all System.out.print* statements.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some output help to debug if any issue.
I have changed them to use logger.
ServiceBusReceivedMessage message = context.getMessage(); | ||
if (message.getMessageId().equals(expectedMessageId)) { | ||
System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(), | ||
message.getSequenceNumber(), message.getBody()); | ||
countDownLatch.countDown(); | ||
} else { | ||
System.out.printf("Received message, message id did not match. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(), | ||
message.getSequenceNumber(), message.getBody()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does this ensure that the lock is renewed until maxAutoLockRenewalDuration
and stops after? This will count down the latch immediately after the message is received resulting in the test passing without verifying the lock duration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before this change, the default message lock is set to 5 minute. And user was not able to overwrite it.
Now user set it to 15 seconds.
The message has to be received twice, first time we lock the message and we do not complete it, the message goes back to the queue and received again. The count down has to count twice for same message id.
This ensure that we honor, user's maxAutoLockRenewalDuration instead of 5 minutes.
The fact that lock is released and message come back, that shows that the lock renew operation stops .
...ing-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java
Outdated
Show resolved
Hide resolved
MessagingEntityType entityType, int entityIndex, boolean sharedConnection, AmqpRetryOptions amqpRetryOptions) { | ||
|
||
ServiceBusClientBuilder builder = getBuilder(useCredentials, sharedConnection); | ||
builder.retryOptions(amqpRetryOptions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to override the default retry options?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, we need to set this test case. The default value is 1 minute. For this test setting it to 30 seconds.
void receiveMessage(MessagingEntityType entityType, boolean isSessionEnabled) throws InterruptedException { | ||
// Arrange | ||
// The message is locked for this duration at a time. | ||
final int lockTimeoutDurationSeconds = 15; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is set as the tryTimeoutDuration. Why do we need to configure this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'tryTimeoutDuration' is used as 'inactivity time' : means if there is no activity on link (no message arriving in this time), It will close the link. The is defaulted to 60 seconds for my test, so I am setting it to 30 seconds.
if (isAutoCompleteEnabled) { | ||
lockCleanup.accept(messageContext); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to check isAutoCompleteEnabled
to clean up the auto-renew lock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isAutoCompleteEnabled : If user has disabled autoComplete, we need lock renew to continue , we do not want to clean up in this case. When lock renew expire, it will be cleaned at that time.
754aa91
into
Azure:feature/servicebus-support-amqp-data-types-17614
* Service bus adding max auto lock renew duration in processor client (#20317) * Added API in builder for "adding max auto lock renew duration in processor client" * Service Bus - Merging previously approved -cross transaction feature - into feature branch - preparation for April release. (#20356) * ServiceBus - cross entity transaction feature (#19863) New Feature: Cross entity transaction API * Service bus : Amqp Types SEQUENCE and VALUE (#20285) * Adding feature AMQP SEQUENCE/VALUE data type implementation * Merge master into branch * beta version change * getting ready to release beta * Updating beta version for event hubs * Fixing unit test case * Removing unwanted import * Fixing test subscriber name (#20666) Co-authored-by: hemanttanwar <hetanwar@users.noreply.github.com> * SB april beta release updates (#20672) * SB april beta release updates * Increment package version after release of com.azure azure-messaging-servicebus (#20690) * Use multiple subscribers for `maxConcurrentCalls` in ServiceBusProcessorClient (#21085) * Merge feature branch into release branch * Updated core-amqp version to release as beta
…22426) * Service bus adding max auto lock renew duration in processor client (#20317) * Added API in builder for "adding max auto lock renew duration in processor client" * Service Bus - Merging previously approved -cross transaction feature - into feature branch - preparation for April release. (#20356) * ServiceBus - cross entity transaction feature (#19863) New Feature: Cross entity transaction API * Service bus : Amqp Types SEQUENCE and VALUE (#20285) * Adding feature AMQP SEQUENCE/VALUE data type implementation * added subQueue API
We missed this API earlier in processor client, so adding this.
Also adding live test for Processor client to send and receive message.
Links : [FEATURE REQ] Provide the ability to configure maxAutoRenewDuration for ServiceBusProcessorClient #19929
PS: April beta release will be from branch "feature/servicebus-support-amqp-data-types-17614" : This I am merging this feature in this branch.