Skip to content

Commit

Permalink
Set v2 as default for sync receiver, ensure sync receiver does lock r…
Browse files Browse the repository at this point in the history
…enewal and change live test to run on v2 clients by default (Azure#40511)

* Set v2 as default for sync receiver, ensure sync receiver does lock renewal and change live test to run on v2 clients by default

* attempt to use fresh entities for ServiceBusReceiverAsyncClientIntegrationTest::autoRenewLockOnReceiveMessage
  • Loading branch information
anuchandy authored Jun 8, 2024
1 parent 6bf439c commit de95691
Show file tree
Hide file tree
Showing 9 changed files with 19 additions and 13 deletions.
2 changes: 2 additions & 0 deletions sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Features Added

- Setting the v2 stack as the default for "Synchronous Receiver Client".

### Breaking Changes

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class LockRenewalOperation implements AutoCloseable, Disposable {
cancellationProcessor.onComplete();
}, () -> {
if (status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.COMPLETE)) {
logger.verbose("Renewing session lock task completed.");
logger.verbose("Renewing lock task completed.");
}

cancellationProcessor.onComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,7 @@ private static final class V2StackSupport {
private static final String NON_SESSION_SYNC_RECEIVE_KEY = "com.azure.messaging.servicebus.nonSession.syncReceive.v2";
private static final ConfigurationProperty<Boolean> NON_SESSION_SYNC_RECEIVE_PROPERTY = ConfigurationPropertyBuilder.ofBoolean(NON_SESSION_SYNC_RECEIVE_KEY)
.environmentVariableName(NON_SESSION_SYNC_RECEIVE_KEY)
.defaultValue(false) // 'Non-Session' Sync Receiver Client is not on the new v2 stack by default.
.defaultValue(true) // 'Non-Session' Sync Receiver Client is on the new v2 stack by default.
.shared(true)
.build();
private final AtomicReference<Boolean> nonSessionSyncReceiveFlag = new AtomicReference<>();
Expand Down Expand Up @@ -1172,7 +1172,7 @@ private static final class V2StackSupport {
private static final String SESSION_SYNC_RECEIVE_KEY = "com.azure.messaging.servicebus.session.syncReceive.v2";
private static final ConfigurationProperty<Boolean> SESSION_SYNC_RECEIVE_PROPERTY = ConfigurationPropertyBuilder.ofBoolean(SESSION_SYNC_RECEIVE_KEY)
.environmentVariableName(SESSION_SYNC_RECEIVE_KEY)
.defaultValue(false) // 'Session' Sync Receiver Client is not on the new v2 stack by default
.defaultValue(true) // 'Session' Sync Receiver Client is on the new v2 stack by default
.shared(true)
.build();
private final AtomicReference<Boolean> sessionSyncReceiveFlag = new AtomicReference<>();
Expand All @@ -1193,13 +1193,13 @@ boolean isNonSessionAsyncReceiveEnabled(Configuration configuration) {
}

/**
* Non-Session SyncClient is not on the v2 stack by default, but the application may opt into the v2 stack.
* Non-Session SyncClient is on the v2 stack by default, but the application may opt out.
*
* @param configuration the client configuration.
* @return true if Sync receive should use the v2 stack.
*/
boolean isNonSessionSyncReceiveEnabled(Configuration configuration) {
return isOptedIn(configuration, NON_SESSION_SYNC_RECEIVE_PROPERTY, nonSessionSyncReceiveFlag);
return !isOptedOut(configuration, NON_SESSION_SYNC_RECEIVE_PROPERTY, nonSessionSyncReceiveFlag);
}

/**
Expand Down Expand Up @@ -1233,13 +1233,13 @@ boolean isSessionReactorAsyncReceiveEnabled(Configuration configuration) {
}

/**
* Session SyncClient is not on the v2 stack by default, but the application may opt into the v2 stack.
* Session SyncClient is on the v2 stack by default, but the application may opt out.
*
* @param configuration the client configuration.
* @return true if session Sync receive should use the v2 stack.
*/
boolean isSessionSyncReceiveEnabled(Configuration configuration) {
return isOptedIn(configuration, SESSION_SYNC_RECEIVE_PROPERTY, sessionSyncReceiveFlag);
return !isOptedOut(configuration, SESSION_SYNC_RECEIVE_PROPERTY, sessionSyncReceiveFlag);
}

// Obtain the shared connection-cache based on the V2-Stack.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1932,7 +1932,8 @@ private Flux<ServiceBusReceivedMessage> nonSessionReactiveReceiveV2() {

Flux<ServiceBusReceivedMessage> nonSessionSyncReceiveV2() {
assert isOnV2 && !isSessionEnabled;
return getOrCreateConsumer().receive();
final Flux<ServiceBusReceivedMessage> messages = getOrCreateConsumer().receive();
return receiverOptions.isAutoLockRenewEnabled() ? messages.doOnNext(this::beginLockRenewal) : messages;
}

private Flux<ServiceBusReceivedMessage> sessionReactiveReceiveV2() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ protected ServiceBusClientBuilder getBuilder(boolean useCredentials) {
.clientOptions(optionsWithTracing)
.transportType(AmqpTransportType.AMQP)
.scheduler(scheduler)
.configuration(v1OrV2(false)); // // Disabling v2 to begin with.
.configuration(v1OrV2(true));

logger.info("Getting Builder using credentials : [{}] ", useCredentials);
if (useCredentials) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ protected ServiceBusClientBuilder getBuilder(boolean useCredentials, boolean sha
.retryOptions(RETRY_OPTIONS)
.transportType(AmqpTransportType.AMQP)
.scheduler(scheduler)
.configuration(v1OrV2(false)); // Disabling v2 to begin with.
.configuration(v1OrV2(true));
}

private ServiceBusSenderAsyncClient createSender(MessagingEntityType entityType, int entityIndex, boolean isSessionEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,15 +902,15 @@ void autoRenewLockOnReceiveMessage(MessagingEntityType entityType, boolean isSes
// Arrange
final AtomicInteger lockRenewCount = new AtomicInteger();

setSender(entityType, TestUtils.USE_CASE_DEFAULT, isSessionEnabled);
setSender(entityType, TestUtils.USE_CASE_AUTO_RENEW_RECEIVE, isSessionEnabled);

final String messageId = UUID.randomUUID().toString();
final ServiceBusMessage message = getMessage(messageId, isSessionEnabled);

// Send the message to verify.
sendMessage(message).block();

setReceiver(entityType, TestUtils.USE_CASE_DEFAULT, isSessionEnabled);
setReceiver(entityType, TestUtils.USE_CASE_AUTO_RENEW_RECEIVE, isSessionEnabled);

// Act & Assert
StepVerifier.create(receiver.receiveMessages().flatMap(received -> {
Expand Down Expand Up @@ -1262,6 +1262,7 @@ private void testRenewLock(MessagingEntityType entityType, Duration lockRenewalD
}

@Test
@Disabled("V2 low level async-receiver impl is missing a check to error if reactive app subscribed more than once.")
void receiveTwice() {
setSenderAndReceiver(MessagingEntityType.QUEUE, TestUtils.USE_CASE_DEFAULT, false);
final String messageId = UUID.randomUUID().toString();
Expand All @@ -1284,6 +1285,7 @@ void receiveTwice() {
}

@Test
@Disabled("V2 low level async-receiver impl is missing a check to error if reactive app subscribed more than once.")
void receiveActiveSubscription() {
setSenderAndReceiver(MessagingEntityType.QUEUE, TestUtils.USE_CASE_DEFAULT, false);
final String messageId = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class TestUtils {
static final int USE_CASE_MULTIPLE_SESSIONS1 = 29;
static final int USE_CASE_MULTIPLE_SESSIONS2 = 30;
static final int USE_CASE_MULTIPLE_SESSIONS3 = 31;
static final int USE_CASE_AUTO_RENEW_RECEIVE = 32;
static final Configuration GLOBAL_CONFIGURATION = Configuration.getGlobalConfiguration();

// An application property key to identify where in the stream this message was created.
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/test-resources.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
"namespaceName": "[concat('sb-java-', parameters('baseName'))]",
"queueName": "queue",
"queueSessionName": "queue-session",
"numberOfInstances": 32,
"numberOfInstances": 33,
"subscriptionName": "subscription",
"subscriptionSessionName": "subscription-session",
"serviceBusDataOwnerRoleId": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Authorization/roleDefinitions/090c5cfd-751d-490a-894a-3ce6f1109419')]",
Expand Down

0 comments on commit de95691

Please sign in to comment.