From b2ddaff78cc9557556aedb4d2de5814dd0cd4271 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 9 Jun 2021 10:12:30 -0700 Subject: [PATCH] Updating IoTHubConnectionSample. --- .../eventhubs/IoTHubConnectionSample.java | 68 ++++++++++++------- 1 file changed, 42 insertions(+), 26 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/IoTHubConnectionSample.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/IoTHubConnectionSample.java index 4a35be9d99fc3..a2c830722bb24 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/IoTHubConnectionSample.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/IoTHubConnectionSample.java @@ -89,37 +89,53 @@ public static void main(String[] args) throws IOException { final AccessToken accessToken = generateSharedAccessSignature(properties.getSharedAccessKeyName(), properties.getSharedAccessKey(), resource, Duration.ofMinutes(10)); - final ProtonJHandler handler = new ProtonJHandler("iot-connection-id", properties.getHostname(), - username, accessToken); - final Reactor reactor = Proton.reactor(handler); - - // reactor.run() is a blocking call, so we'll run it and then stop it when we are completed. - Schedulers.boundedElastic().schedule(() -> reactor.run()); - - // Cache the result of this connection string so we can use it again without doing all the work again. - final Mono eventHubsConnectionString = - getConnectionString(handler, properties, entityPath + "/$management") - .cache() - .doFinally(signal -> { - // After we're done fetching a compatible Event Hubs connection string, stop the reactor. - handler.closeAsync().block(Duration.ofSeconds(10)); - reactor.stop(); - }); - - final Mono producerClientMono = eventHubsConnectionString - .map(connectionString -> { - return new EventHubClientBuilder() - .connectionString(connectionString) - .buildAsyncProducerClient(); - }); + final Reactor reactor = Proton.reactor(); + + // Leverage Mono.usingWhen to dispose of the resources after we finish using them. + final Mono connectionStringMono = Mono.usingWhen( + Mono.fromCallable(() -> { + final ProtonJHandler handler = new ProtonJHandler("iot-connection-id", properties.getHostname(), + username, accessToken); + reactor.setHandler(handler); + + // reactor.run() is a blocking call, so we schedule this on another thread. It'll stop processing items + // when we call reactor.stop() later on. + Schedulers.boundedElastic().schedule(() -> reactor.run()); + + return handler; + }), + handler -> getConnectionString(handler, properties, entityPath + "/$management"), + handler -> { + // After we're done fetching a compatible Event Hubs connection string, stop the reactor. + reactor.stop(); + return handler.closeAsync(); + }) + // Cache the result of this operation so additional downstream subscribers can make use of the value + // instead of us having to create another reactor. + .cache(); // Leverage Mono.usingWhen so the producer client is disposed of after we finish using it. // In production, users would probably cache the Mono's result, reusing the EventHubProducerAsyncClient and // finally closing it. - final Mono runOperation = Mono.usingWhen(producerClientMono, - producer -> producer.getEventHubProperties(), - producer -> Mono.fromRunnable(() -> producer.close())); + final Mono runOperation = Mono.usingWhen( + connectionStringMono.map(connectionString -> { + System.out.println("Acquired Event Hubs compatible connection string."); + return new EventHubClientBuilder() + .connectionString(connectionString) + .buildAsyncProducerClient(); + }), + producer -> { + System.out.println("Created producer client."); + + return producer.getEventHubProperties(); + }, + producer -> Mono.fromRunnable(() -> { + System.out.println("Disposing of producer client."); + producer.close(); + })); + + // Blocking here to turn this into a synchronous operation because we no longer need asynchronous operations. final EventHubProperties eventHubProperties = runOperation.block(); if (eventHubProperties == null) { System.err.println("No properties were retrieved.");