Skip to content

Commit

Permalink
[service bus] Use ServiceBusException rather than AmqpException and r…
Browse files Browse the repository at this point in the history
…ename ReceiveMode to ServiceBusReceiveMode (#17601)

ServiceBusException is basically a friendly envelope around an AmqpException. It's primary purpose is to give the user something simple they can try/catch that has a 'reason' code so they can programatically react to certain kinds of failures.

Also, renaming ReceiveMode to ServiceBusReceiveMode.

Fixes #17500 (exception type), #17555 (receive mode)
  • Loading branch information
richardpark-msft authored Nov 18, 2020
1 parent 7def120 commit 66eba6c
Show file tree
Hide file tree
Showing 55 changed files with 692 additions and 345 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.models.ReceiveMode;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
Expand All @@ -26,7 +26,7 @@ public class ReceiveAndDeleteMessageTest extends ServiceTest<ServiceBusStressOpt
* @param options to set performance test options.
*/
public ReceiveAndDeleteMessageTest(ServiceBusStressOptions options) {
super(options, ReceiveMode.RECEIVE_AND_DELETE);
super(options, ServiceBusReceiveMode.RECEIVE_AND_DELETE);
this.options = options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.models.ReceiveMode;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
Expand All @@ -27,7 +27,7 @@ public class ReceiveAndLockMessageTest extends ServiceTest<ServiceBusStressOptio
* @param options to set performance test options.
*/
public ReceiveAndLockMessageTest(ServiceBusStressOptions options) {
super(options, ReceiveMode.PEEK_LOCK);
super(options, ServiceBusReceiveMode.PEEK_LOCK);
this.options = options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.azure.messaging.servicebus.perf;

import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.models.ReceiveMode;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.publisher.Mono;

import java.util.UUID;
Expand All @@ -20,7 +20,7 @@ public class SendMessageTest extends ServiceTest<ServiceBusStressOptions> {
* @param options to set performance test options.
*/
public SendMessageTest(ServiceBusStressOptions options) {
super(options, ReceiveMode.PEEK_LOCK);
super(options, ServiceBusReceiveMode.PEEK_LOCK);
String messageId = UUID.randomUUID().toString();
message = new ServiceBusMessage(CONTENTS);
message.setMessageId(messageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.azure.messaging.servicebus.perf;

import com.azure.messaging.servicebus.ServiceBusMessage;
import com.azure.messaging.servicebus.models.ReceiveMode;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
Expand All @@ -22,7 +22,7 @@ public class SendMessagesTest extends ServiceTest<ServiceBusStressOptions> {
* @param options to set performance test options.
*/
public SendMessagesTest(ServiceBusStressOptions options) {
super(options, ReceiveMode.PEEK_LOCK);
super(options, ServiceBusReceiveMode.PEEK_LOCK);
messages = new ArrayList<>();
for (int i = 0; i < options.getMessagesToSend(); ++i) {
ServiceBusMessage message = new ServiceBusMessage(CONTENTS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
import com.azure.messaging.servicebus.ServiceBusSenderClient;
import com.azure.messaging.servicebus.models.ReceiveMode;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.azure.perf.test.core.PerfStressOptions;
import com.azure.perf.test.core.PerfStressTest;

Expand Down Expand Up @@ -44,7 +44,7 @@ abstract class ServiceTest<TOptions extends PerfStressOptions> extends PerfStres
* @param receiveMode to receive messages.
* @throws IllegalArgumentException if environment variable not being available.
*/
ServiceTest(TOptions options, ReceiveMode receiveMode) {
ServiceTest(TOptions options, ServiceBusReceiveMode receiveMode) {
super(options);
String connectionString = System.getenv(AZURE_SERVICE_BUS_CONNECTION_STRING);
if (CoreUtils.isNullOrEmpty(connectionString)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,27 @@

package com.azure.messaging.servicebus;

import com.azure.messaging.servicebus.models.ReceiveMode;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;

import java.time.Duration;

/**
* Options set when creating a service bus receiver.
*/
class ReceiverOptions {
private final ReceiveMode receiveMode;
private final ServiceBusReceiveMode receiveMode;
private final int prefetchCount;
private final boolean enableAutoComplete;
private final String sessionId;
private final Integer maxConcurrentSessions;
private final Duration maxLockRenewDuration;

ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
ReceiverOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete) {
this(receiveMode, prefetchCount, maxLockRenewDuration, enableAutoComplete, null, null);
}

ReceiverOptions(ReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
ReceiverOptions(ServiceBusReceiveMode receiveMode, int prefetchCount, Duration maxLockRenewDuration,
boolean enableAutoComplete, String sessionId, Integer maxConcurrentSessions) {
this.receiveMode = receiveMode;
this.prefetchCount = prefetchCount;
Expand All @@ -46,7 +46,7 @@ Duration getMaxLockRenewDuration() {
*
* @return the receive mode for the message.
*/
ReceiveMode getReceiveMode() {
ServiceBusReceiveMode getReceiveMode() {
return receiveMode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import com.azure.messaging.servicebus.implementation.ServiceBusReactorAmqpConnection;
import com.azure.messaging.servicebus.implementation.ServiceBusSharedKeyCredential;
import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions;
import com.azure.messaging.servicebus.models.ReceiveMode;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import com.azure.messaging.servicebus.models.SubQueue;
import org.apache.qpid.proton.engine.SslDomain;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -642,7 +642,7 @@ public final class ServiceBusSessionProcessorClientBuilder {
private final ServiceBusProcessorClientOptions processorClientOptions;
private final ServiceBusSessionReceiverClientBuilder sessionReceiverClientBuilder;
private Consumer<ServiceBusReceivedMessageContext> processMessage;
private Consumer<Throwable> processError;
private Consumer<ServiceBusErrorContext> processError;

private ServiceBusSessionProcessorClientBuilder() {
sessionReceiverClientBuilder = new ServiceBusSessionReceiverClientBuilder();
Expand All @@ -669,8 +669,8 @@ public ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConc
}

/**
* Sets the prefetch count of the processor. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link
* ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
* Sets the prefetch count of the processor. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and
* {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
*
* Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when
* and before the application starts the processor.
Expand Down Expand Up @@ -702,7 +702,7 @@ public ServiceBusSessionProcessorClientBuilder queueName(String queueName) {
*
* @return The modified {@link ServiceBusSessionProcessorClientBuilder} object.
*/
public ServiceBusSessionProcessorClientBuilder receiveMode(ReceiveMode receiveMode) {
public ServiceBusSessionProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) {
sessionReceiverClientBuilder.receiveMode(receiveMode);
return this;
}
Expand Down Expand Up @@ -750,7 +750,8 @@ public ServiceBusSessionProcessorClientBuilder processMessage(
*
* @return The updated {@link ServiceBusProcessorClientBuilder} object
*/
public ServiceBusSessionProcessorClientBuilder processError(Consumer<Throwable> processError) {
public ServiceBusSessionProcessorClientBuilder processError(
Consumer<ServiceBusErrorContext> processError) {
this.processError = processError;
return this;
}
Expand Down Expand Up @@ -820,7 +821,7 @@ public final class ServiceBusSessionReceiverClientBuilder {
private Integer maxConcurrentSessions = null;
private int prefetchCount = DEFAULT_PREFETCH_COUNT;
private String queueName;
private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK;
private ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.PEEK_LOCK;
private String subscriptionName;
private String topicName;
private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION;
Expand All @@ -843,8 +844,8 @@ public ServiceBusSessionReceiverClientBuilder disableAutoComplete() {

/**
* Sets the amount of time to continue auto-renewing the session lock. Setting {@link Duration#ZERO} or
* {@code null} disables auto-renewal. For {@link ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} mode,
* auto-renewal is disabled.
* {@code null} disables auto-renewal. For {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE}
* mode, auto-renewal is disabled.
*
* @param maxAutoLockRenewDuration the amount of time to continue auto-renewing the session lock.
* {@link Duration#ZERO} or {@code null} indicates that auto-renewal is disabled.
Expand Down Expand Up @@ -877,8 +878,8 @@ ServiceBusSessionReceiverClientBuilder maxConcurrentSessions(int maxConcurrentSe
}

/**
* Sets the prefetch count of the receiver. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link
* ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
* Sets the prefetch count of the receiver. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and
* {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
*
* Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when
* and before the application asks for one using {@link ServiceBusReceiverAsyncClient#receiveMessages()}.
Expand Down Expand Up @@ -915,7 +916,7 @@ public ServiceBusSessionReceiverClientBuilder queueName(String queueName) {
*
* @return The modified {@link ServiceBusSessionReceiverClientBuilder} object.
*/
public ServiceBusSessionReceiverClientBuilder receiveMode(ReceiveMode receiveMode) {
public ServiceBusSessionReceiverClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) {
this.receiveMode = receiveMode;
return this;
}
Expand Down Expand Up @@ -992,12 +993,12 @@ private ServiceBusReceiverAsyncClient buildAsyncClientForProcessor(boolean isAut
logger.warning(
"'enableAutoComplete' is not supported in synchronous client except through callback receive.");
enableAutoComplete = false;
} else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
} else if (enableAutoComplete && receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
logger.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode.");
enableAutoComplete = false;
}

if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
if (receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
maxAutoLockRenewDuration = Duration.ZERO;
}

Expand Down Expand Up @@ -1059,12 +1060,12 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp
logger.warning(
"'enableAutoComplete' is not supported in synchronous client except through callback receive.");
enableAutoComplete = false;
} else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
} else if (enableAutoComplete && receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
logger.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode.");
enableAutoComplete = false;
}

if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
if (receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
maxAutoLockRenewDuration = Duration.ZERO;
}

Expand Down Expand Up @@ -1095,16 +1096,16 @@ public final class ServiceBusProcessorClientBuilder {
private final ServiceBusReceiverClientBuilder serviceBusReceiverClientBuilder;
private final ServiceBusProcessorClientOptions processorClientOptions;
private Consumer<ServiceBusReceivedMessageContext> processMessage;
private Consumer<Throwable> processError;
private Consumer<ServiceBusErrorContext> processError;

private ServiceBusProcessorClientBuilder() {
serviceBusReceiverClientBuilder = new ServiceBusReceiverClientBuilder();
processorClientOptions = new ServiceBusProcessorClientOptions().setMaxConcurrentCalls(1);
}

/**
* Sets the prefetch count of the processor. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link
* ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
* Sets the prefetch count of the processor. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and
* {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
*
* Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when
* and before the application starts the processor.
Expand Down Expand Up @@ -1136,7 +1137,7 @@ public ServiceBusProcessorClientBuilder queueName(String queueName) {
*
* @return The modified {@link ServiceBusProcessorClientBuilder} object.
*/
public ServiceBusProcessorClientBuilder receiveMode(ReceiveMode receiveMode) {
public ServiceBusProcessorClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) {
serviceBusReceiverClientBuilder.receiveMode(receiveMode);
return this;
}
Expand Down Expand Up @@ -1184,7 +1185,7 @@ public ServiceBusProcessorClientBuilder processMessage(
*
* @return The updated {@link ServiceBusProcessorClientBuilder} object
*/
public ServiceBusProcessorClientBuilder processError(Consumer<Throwable> processError) {
public ServiceBusProcessorClientBuilder processError(Consumer<ServiceBusErrorContext> processError) {
this.processError = processError;
return this;
}
Expand Down Expand Up @@ -1254,7 +1255,7 @@ public final class ServiceBusReceiverClientBuilder {
private int prefetchCount = DEFAULT_PREFETCH_COUNT;
private String queueName;
private SubQueue subQueue;
private ReceiveMode receiveMode = ReceiveMode.PEEK_LOCK;
private ServiceBusReceiveMode receiveMode = ServiceBusReceiveMode.PEEK_LOCK;
private String subscriptionName;
private String topicName;
private Duration maxAutoLockRenewDuration = MAX_LOCK_RENEW_DEFAULT_DURATION;
Expand All @@ -1277,8 +1278,8 @@ public ServiceBusReceiverClientBuilder disableAutoComplete() {

/**
* Sets the amount of time to continue auto-renewing the lock. Setting {@link Duration#ZERO} or {@code null}
* disables auto-renewal. For {@link ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} mode, auto-renewal is
* disabled.
* disables auto-renewal. For {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} mode,
* auto-renewal is disabled.
*
* @param maxAutoLockRenewDuration the amount of time to continue auto-renewing the lock. {@link Duration#ZERO}
* or {@code null} indicates that auto-renewal is disabled.
Expand All @@ -1293,8 +1294,8 @@ public ServiceBusReceiverClientBuilder maxAutoLockRenewDuration(Duration maxAuto
}

/**
* Sets the prefetch count of the receiver. For both {@link ReceiveMode#PEEK_LOCK PEEK_LOCK} and {@link
* ReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
* Sets the prefetch count of the receiver. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and
* {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
*
* Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when
* and before the application asks for one using {@link ServiceBusReceiverAsyncClient#receiveMessages()}.
Expand Down Expand Up @@ -1331,7 +1332,7 @@ public ServiceBusReceiverClientBuilder queueName(String queueName) {
*
* @return The modified {@link ServiceBusReceiverClientBuilder} object.
*/
public ServiceBusReceiverClientBuilder receiveMode(ReceiveMode receiveMode) {
public ServiceBusReceiverClientBuilder receiveMode(ServiceBusReceiveMode receiveMode) {
this.receiveMode = receiveMode;
return this;
}
Expand Down Expand Up @@ -1421,12 +1422,12 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
logger.warning(
"'enableAutoComplete' is not supported in synchronous client except through callback receive.");
enableAutoComplete = false;
} else if (enableAutoComplete && receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
} else if (enableAutoComplete && receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
logger.warning("'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode.");
enableAutoComplete = false;
}

if (receiveMode == ReceiveMode.RECEIVE_AND_DELETE) {
if (receiveMode == ServiceBusReceiveMode.RECEIVE_AND_DELETE) {
maxAutoLockRenewDuration = Duration.ZERO;
}

Expand Down
Loading

0 comments on commit 66eba6c

Please sign in to comment.