Skip to content

Commit

Permalink
[service bus] Update ServiceBusProcessor sample to demo how to write …
Browse files Browse the repository at this point in the history
…a long-running processor. (#17633)

As part of the work to add in a ServiceBusErrorContext we also want to showcase how users can tease out the various errors that are reported. 

This PR updates the current processor sample to angle more towards how to keep a ServiceBusProcessor running long-term, including handling certain errors that _might_ be fatal (it's always up to the user to choose to terminate the processor).

Fix for #17490
  • Loading branch information
richardpark-msft authored Nov 18, 2020
1 parent ac42bfe commit 3be698a
Showing 1 changed file with 34 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.messaging.servicebus;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

Expand All @@ -24,18 +25,40 @@ public static void main(String[] args) throws InterruptedException {
System.out.println("Received message " + message.getBody().toString());
};

final CountDownLatch countdownLatch = new CountDownLatch(1);

// Consumer that handles any errors that occur when receiving messages
Consumer<ServiceBusErrorContext> errorHandler = errorContext -> {
System.out.println("Error when receiving messages " + errorContext.getException().getMessage());
if (errorContext.getException() instanceof ServiceBusException) {
ServiceBusException serviceBusException = (ServiceBusException) errorContext.getException();
System.out.printf("Error source %s, reason %s\n", serviceBusException.getErrorSource(),
serviceBusException.getReason());
final ServiceBusException serviceBusException = (ServiceBusException) errorContext.getException();
final ServiceBusFailureReason reason = serviceBusException.getReason();

if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED
|| reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND
|| reason == ServiceBusFailureReason.UNAUTHORIZED) {
System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s\n",
reason, serviceBusException.getMessage());
countdownLatch.countDown();
} else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) {
System.out.printf("Message lock lost for message: %s", errorContext.getException().toString());
} else if (reason == ServiceBusFailureReason.SERVICE_BUSY) {
try {
// choosing an arbitrary amount of time to wait.
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.printf("Error source %s, reason %s, message: %s\n", serviceBusException.getErrorSource(),
reason, errorContext.getException().getMessage());
}
} else {
System.out.printf("Exception: %s\n", errorContext.getException().toString());
}
};

// Create an instance of the processor through the ServiceBusClientBuilder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
final ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
.connectionString("<< connection-string >>")
.processor()
.queueName("<< queue name >>")
Expand All @@ -46,16 +69,13 @@ public static void main(String[] args) throws InterruptedException {
System.out.println("Starting the processor");
processorClient.start();

TimeUnit.SECONDS.sleep(10);
System.out.println("Stopping the processor");
processorClient.stop();

TimeUnit.SECONDS.sleep(10);
System.out.println("Resuming the processor");
processorClient.start();
System.out.println("Listening for 10 seconds...");
if (countdownLatch.await(10, TimeUnit.SECONDS)) {
System.out.println("Closing processor due to fatal error");
} else {
System.out.println("Closing processor");
}

TimeUnit.SECONDS.sleep(10);
System.out.println("Closing the processor");
processorClient.close();
}
}

0 comments on commit 3be698a

Please sign in to comment.