Skip to content

Commit

Permalink
Updating IoTHubConnectionSample.
Browse files Browse the repository at this point in the history
  • Loading branch information
conniey committed Jun 9, 2021
1 parent 28914d4 commit ee3b95e
Showing 1 changed file with 29 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

import static java.nio.charset.StandardCharsets.UTF_8;

Expand All @@ -66,7 +67,7 @@
* @see <a href="https://docs.microsoft.com/azure/iot-hub/iot-hub-dev-guide-sas#security-tokens">Generating
* security tokens.</a>
*/
public class IoTHubConnectionSample {
public final class IoTHubConnectionSample {

/**
* Main method for sample.
Expand Down Expand Up @@ -105,16 +106,32 @@ public static void main(String[] args) throws IOException {
reactor.stop();
});

final Mono<EventHubProducerAsyncClient> producerClient = eventHubsConnectionString
final Mono<EventHubProducerAsyncClient> producerClientMono = eventHubsConnectionString
.map(connectionString -> {
return new EventHubClientBuilder()
.connectionString(connectionString)
.buildAsyncProducerClient();
});

final EventHubProducerAsyncClient client = producerClient.block();
System.out.println("Created client: " + client.getFullyQualifiedNamespace());
client.close();
// Leverage Mono.usingWhen so the producer client is disposed of after we finish using it.
// In production, users would probably cache the Mono's result, reusing the EventHubProducerAsyncClient and
// finally closing it.
final Mono<EventHubProperties> runOperation = Mono.usingWhen(producerClientMono,
producer -> producer.getEventHubProperties(),
producer -> Mono.fromRunnable(() -> producer.close()));

final EventHubProperties eventHubProperties = runOperation.block();
if (eventHubProperties == null) {
System.err.println("No properties were retrieved.");
return;
}

final String partitionIds = eventHubProperties.getPartitionIds()
.stream()
.collect(Collectors.joining(", "));

System.out.printf("Event Hub Name: [%s]. Created At: %s. partitionIds: [%s]%n", eventHubProperties.getName(),
eventHubProperties.getCreatedAt(), partitionIds);
}

/**
Expand Down Expand Up @@ -294,7 +311,7 @@ public void onConnectionInit(Event e) {

@Override
public void onConnectionRemoteOpen(Event e) {
System.out.println("ConnectionRemoteOpen: " + e.getConnection().getRemoteState());
System.out.println("Connection state: " + e.getConnection().getRemoteState());
final Connection connection = e.getConnection();
connectionSink.emitValue(connection, Sinks.EmitFailureHandler.FAIL_FAST);
}
Expand Down Expand Up @@ -349,16 +366,16 @@ private static AccessToken generateSharedAccessSignature(String policyName, Stri
final String stringToSign = URLEncoder.encode(resourceUri, UTF_8) + "\n" + expiresOnEpochSeconds;
final byte[] decodedKey = Base64.getDecoder().decode(sharedAccessKey);

Mac sha256HMAC;
SecretKeySpec secretKey;
String HASH_ALGORITHM = "HmacSHA256";
final Mac sha256HMAC;
final SecretKeySpec secretKey;
final String hmacSHA256 = "HmacSHA256";
try {
sha256HMAC = Mac.getInstance(HASH_ALGORITHM);
secretKey = new SecretKeySpec(decodedKey, HASH_ALGORITHM);
sha256HMAC = Mac.getInstance(hmacSHA256);
secretKey = new SecretKeySpec(decodedKey, hmacSHA256);
sha256HMAC.init(secretKey);
} catch (NoSuchAlgorithmException e) {
throw new UnsupportedOperationException(
String.format("Unable to create hashing algorithm '%s'", HASH_ALGORITHM), e);
String.format("Unable to create hashing algorithm '%s'", hmacSHA256), e);
} catch (InvalidKeyException e) {
throw new IllegalArgumentException("'sharedAccessKey' is an invalid value for the hashing algorithm.", e);
}
Expand Down

0 comments on commit ee3b95e

Please sign in to comment.