Skip to content

Commit

Permalink
Updating IoTHubConnectionSample.
Browse files Browse the repository at this point in the history
  • Loading branch information
conniey committed Jun 9, 2021
1 parent ee3b95e commit b2ddaff
Showing 1 changed file with 42 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,37 +89,53 @@ public static void main(String[] args) throws IOException {
final AccessToken accessToken = generateSharedAccessSignature(properties.getSharedAccessKeyName(),
properties.getSharedAccessKey(), resource, Duration.ofMinutes(10));

final ProtonJHandler handler = new ProtonJHandler("iot-connection-id", properties.getHostname(),
username, accessToken);
final Reactor reactor = Proton.reactor(handler);

// reactor.run() is a blocking call, so we'll run it and then stop it when we are completed.
Schedulers.boundedElastic().schedule(() -> reactor.run());

// Cache the result of this connection string so we can use it again without doing all the work again.
final Mono<String> eventHubsConnectionString =
getConnectionString(handler, properties, entityPath + "/$management")
.cache()
.doFinally(signal -> {
// After we're done fetching a compatible Event Hubs connection string, stop the reactor.
handler.closeAsync().block(Duration.ofSeconds(10));
reactor.stop();
});

final Mono<EventHubProducerAsyncClient> producerClientMono = eventHubsConnectionString
.map(connectionString -> {
return new EventHubClientBuilder()
.connectionString(connectionString)
.buildAsyncProducerClient();
});
final Reactor reactor = Proton.reactor();

// Leverage Mono.usingWhen to dispose of the resources after we finish using them.
final Mono<String> connectionStringMono = Mono.usingWhen(
Mono.fromCallable(() -> {
final ProtonJHandler handler = new ProtonJHandler("iot-connection-id", properties.getHostname(),
username, accessToken);
reactor.setHandler(handler);

// reactor.run() is a blocking call, so we schedule this on another thread. It'll stop processing items
// when we call reactor.stop() later on.
Schedulers.boundedElastic().schedule(() -> reactor.run());

return handler;
}),
handler -> getConnectionString(handler, properties, entityPath + "/$management"),
handler -> {
// After we're done fetching a compatible Event Hubs connection string, stop the reactor.
reactor.stop();
return handler.closeAsync();
})
// Cache the result of this operation so additional downstream subscribers can make use of the value
// instead of us having to create another reactor.
.cache();

// Leverage Mono.usingWhen so the producer client is disposed of after we finish using it.
// In production, users would probably cache the Mono's result, reusing the EventHubProducerAsyncClient and
// finally closing it.
final Mono<EventHubProperties> runOperation = Mono.usingWhen(producerClientMono,
producer -> producer.getEventHubProperties(),
producer -> Mono.fromRunnable(() -> producer.close()));
final Mono<EventHubProperties> runOperation = Mono.usingWhen(
connectionStringMono.map(connectionString -> {
System.out.println("Acquired Event Hubs compatible connection string.");

return new EventHubClientBuilder()
.connectionString(connectionString)
.buildAsyncProducerClient();
}),
producer -> {
System.out.println("Created producer client.");

return producer.getEventHubProperties();
},
producer -> Mono.fromRunnable(() -> {
System.out.println("Disposing of producer client.");
producer.close();
}));

// Blocking here to turn this into a synchronous operation because we no longer need asynchronous operations.
final EventHubProperties eventHubProperties = runOperation.block();
if (eventHubProperties == null) {
System.err.println("No properties were retrieved.");
Expand Down

0 comments on commit b2ddaff

Please sign in to comment.