From 08cf218ddf4554d70874830ca54507919baa4b48 Mon Sep 17 00:00:00 2001 From: Richard Park Date: Tue, 17 Nov 2020 17:31:53 -0800 Subject: [PATCH 1/4] Updating the sample to show a bit more about how to write a long-running processor. --- .../servicebus/ServiceBusProcessorSample.java | 48 +++++++++++++------ 1 file changed, 34 insertions(+), 14 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java index 7a2018e0ac724..341d1112e9581 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java @@ -3,6 +3,7 @@ package com.azure.messaging.servicebus; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -24,18 +25,40 @@ public static void main(String[] args) throws InterruptedException { System.out.println("Received message " + message.getBody().toString()); }; + final CountDownLatch countdownLatch = new CountDownLatch(1); + // Consumer that handles any errors that occur when receiving messages Consumer errorHandler = errorContext -> { - System.out.println("Error when receiving messages " + errorContext.getException().getMessage()); if (errorContext.getException() instanceof ServiceBusException) { - ServiceBusException serviceBusException = (ServiceBusException) errorContext.getException(); - System.out.printf("Error source %s, reason %s\n", serviceBusException.getErrorSource(), - serviceBusException.getReason()); + final ServiceBusException serviceBusException = (ServiceBusException) errorContext.getException(); + final ServiceBusFailureReason reason = serviceBusException.getReason(); + + if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED + || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND + || reason == ServiceBusFailureReason.UNAUTHORIZED) { + System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s\n", + reason, serviceBusException.toString()); + countdownLatch.countDown(); + } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) { + System.out.printf("Message lock lost for message: %s", errorContext.getException().toString()); + } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) { + try { + // choosing an arbitrary amount of time to wait. + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } else { + System.out.printf("Error source %s, reason %s, message: %s\n", serviceBusException.getErrorSource(), + reason, errorContext.getException().toString()); + } + } else { + System.out.printf("Exception: %s\n", errorContext.getException().toString()); } }; // Create an instance of the processor through the ServiceBusClientBuilder - ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() + final ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() .connectionString("<< connection-string >>") .processor() .queueName("<< queue name >>") @@ -46,16 +69,13 @@ public static void main(String[] args) throws InterruptedException { System.out.println("Starting the processor"); processorClient.start(); - TimeUnit.SECONDS.sleep(10); - System.out.println("Stopping the processor"); - processorClient.stop(); - - TimeUnit.SECONDS.sleep(10); - System.out.println("Resuming the processor"); - processorClient.start(); + System.out.println("Listening for 10 seconds..."); + if (countdownLatch.await(10, TimeUnit.SECONDS)) { + System.out.println("Closing processor due to fatal error"); + } else { + System.out.println("Closing processor"); + } - TimeUnit.SECONDS.sleep(10); - System.out.println("Closing the processor"); processorClient.close(); } } From 477e88c27dc4b98467f55d3477fbf21f3b31ad21 Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Wed, 18 Nov 2020 09:43:53 -0800 Subject: [PATCH 2/4] Update sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java Co-authored-by: Srikanta <51379715+srnagar@users.noreply.github.com> --- .../azure/messaging/servicebus/ServiceBusProcessorSample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java index 341d1112e9581..596123954de04 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java @@ -50,7 +50,7 @@ public static void main(String[] args) throws InterruptedException { } } else { System.out.printf("Error source %s, reason %s, message: %s\n", serviceBusException.getErrorSource(), - reason, errorContext.getException().toString()); + reason, errorContext.getException().getMessage()); } } else { System.out.printf("Exception: %s\n", errorContext.getException().toString()); From 366faa180947a854dbb3e7bbd14a5d8004e3521d Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Wed, 18 Nov 2020 09:44:02 -0800 Subject: [PATCH 3/4] Update sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java Co-authored-by: Srikanta <51379715+srnagar@users.noreply.github.com> --- .../azure/messaging/servicebus/ServiceBusProcessorSample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java index 596123954de04..afdaaa7653d74 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ServiceBusProcessorSample.java @@ -37,7 +37,7 @@ public static void main(String[] args) throws InterruptedException { || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND || reason == ServiceBusFailureReason.UNAUTHORIZED) { System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s\n", - reason, serviceBusException.toString()); + reason, serviceBusException.getMessage()); countdownLatch.countDown(); } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) { System.out.printf("Message lock lost for message: %s", errorContext.getException().toString()); From 4b1a7a2a9632fad36ecc3384d84b63af8068ba78 Mon Sep 17 00:00:00 2001 From: Richard Park Date: Wed, 18 Nov 2020 14:04:07 -0800 Subject: [PATCH 4/4] kick CI