Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement an Event Hubs Shared Access Key Credential #21228

Merged
merged 14 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.azure.core.annotation.ServiceClientBuilder;
import com.azure.core.annotation.ServiceClientProtocol;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.exception.AzureException;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.Configuration;
Expand Down Expand Up @@ -340,6 +342,77 @@ public EventHubClientBuilder credential(String fullyQualifiedNamespace, String e
return this;
}

/**
* Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
*
* @param fullyQualifiedNamespace The fully qualified name for the Event Hubs namespace. This is likely to be
* similar to <strong>{@literal "{your-namespace}.servicebus.windows.net}"</strong>.
* @param eventHubName The name of the Event Hub to connect the client to.
* @param credential The shared access name and key credential to use for authorization.
* Access controls may be specified by the Event Hubs namespace or the requested Event Hub,
* depending on Azure configuration.
*
* @return The updated {@link EventHubClientBuilder} object.
* @throws IllegalArgumentException if {@code fullyQualifiedNamespace} or {@code eventHubName} is an empty
* string.
* @throws NullPointerException if {@code fullyQualifiedNamespace}, {@code eventHubName}, {@code credentials} is
* null.
*/
public EventHubClientBuilder credential(String fullyQualifiedNamespace, String eventHubName,
AzureNamedKeyCredential credential) {

this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
"'fullyQualifiedNamespace' cannot be null.");
this.eventHubName = Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null.");

if (CoreUtils.isNullOrEmpty(fullyQualifiedNamespace)) {
throw logger.logExceptionAsError(new IllegalArgumentException("'host' cannot be an empty string."));
} else if (CoreUtils.isNullOrEmpty(eventHubName)) {
throw logger.logExceptionAsError(new IllegalArgumentException("'eventHubName' cannot be an empty string."));
}

Objects.requireNonNull(credential, "'credential' cannot be null.");
this.credentials = new EventHubSharedKeyCredential(credential.getAzureNamedKey().getName(),
credential.getAzureNamedKey().getKey(), ClientConstants.TOKEN_VALIDITY);

return this;
}

/**
* Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
*
* @param fullyQualifiedNamespace The fully qualified name for the Event Hubs namespace. This is likely to be
* similar to <strong>{@literal "{your-namespace}.servicebus.windows.net}"</strong>.
* @param eventHubName The name of the Event Hub to connect the client to.
* @param credential The shared access signature credential to use for authorization.
* Access controls may be specified by the Event Hubs namespace or the requested Event Hub,
* depending on Azure configuration.
*
* @return The updated {@link EventHubClientBuilder} object.
* @throws IllegalArgumentException if {@code fullyQualifiedNamespace} or {@code eventHubName} is an empty
* string.
* @throws NullPointerException if {@code fullyQualifiedNamespace}, {@code eventHubName}, {@code credentials} is
* null.
*/
public EventHubClientBuilder credential(String fullyQualifiedNamespace, String eventHubName,
AzureSasCredential credential) {

this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
"'fullyQualifiedNamespace' cannot be null.");
this.eventHubName = Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null.");

if (CoreUtils.isNullOrEmpty(fullyQualifiedNamespace)) {
throw logger.logExceptionAsError(new IllegalArgumentException("'host' cannot be an empty string."));
} else if (CoreUtils.isNullOrEmpty(eventHubName)) {
throw logger.logExceptionAsError(new IllegalArgumentException("'eventHubName' cannot be an empty string."));
}

Objects.requireNonNull(credential, "'credential' cannot be null.");
this.credentials = new EventHubSharedKeyCredential(credential.getSignature());

return this;
}

/**
* Sets the proxy configuration to use for {@link EventHubAsyncClient}. When a proxy is configured, {@link
* AmqpTransportType#AMQP_WEB_SOCKETS} must be used for the transport type.
Expand Down Expand Up @@ -484,7 +557,7 @@ public EventHubConsumerClient buildConsumerClient() {
* either {@link #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. Or, if a
* proxy is specified but the transport type is not {@link AmqpTransportType#AMQP_WEB_SOCKETS web sockets}.
*/
public EventHubProducerAsyncClient buildAsyncProducerClient() {
public EventHubProducerClient buildAsyncProducerClient() {
v-hongli1 marked this conversation as resolved.
Show resolved Hide resolved
return buildAsyncClient().createProducer();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyAuthenticationType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.core.amqp.implementation.ConnectionStringProperties;
import com.azure.core.util.Configuration;
import com.azure.core.credential.AzureNamedKeyCredential;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
Expand All @@ -21,10 +25,14 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import reactor.test.StepVerifier;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class EventHubClientBuilderTest {
import static java.nio.charset.StandardCharsets.UTF_8;

public class EventHubClientBuilderTest extends IntegrationTestBase {
private static final String NAMESPACE_NAME = "dummyNamespaceName";
private static final String DEFAULT_DOMAIN_NAME = "servicebus.windows.net/";

Expand All @@ -40,6 +48,35 @@ public class EventHubClientBuilderTest {
ENDPOINT, SHARED_ACCESS_KEY_NAME, SHARED_ACCESS_KEY, EVENT_HUB_NAME);
private static final Proxy PROXY_ADDRESS = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(PROXY_HOST, Integer.parseInt(PROXY_PORT)));

private static final String TEST_CONTENTS = "SSLorem ipsum dolor sit amet, consectetur adipiscing elit. Donec "
+ "vehicula posuere lobortis. Aliquam finibus volutpat dolor, faucibus pellentesque ipsum bibendum vitae. "
+ "Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Ut sit amet "
+ "urna hendrerit, dapibus justo a, sodales justo. Mauris finibus augue id pulvinar congue. Nam maximus "
+ "luctus ipsum, at commodo ligula euismod ac. Phasellus vitae lacus sit amet diam porta placerat. \n"
+ "Ut sodales efficitur sapien ut posuere. Morbi sed tellus est. Proin eu erat purus. Proin massa nunc, "
+ "condimentum id iaculis dignissim, consectetur et odio. Cras suscipit sem eu libero aliquam tincidunt. "
+ "Nullam ut arcu suscipit, eleifend velit in, cursus libero. Ut eleifend facilisis odio sit amet feugiat. "
+ "Phasellus at nunc sit amet elit sagittis commodo ac in nisi. Fusce vitae aliquam quam. Integer vel nibh "
+ "euismod, tempus elit vitae, pharetra est. Duis vulputate enim a elementum dignissim. Morbi dictum enim id "
+ "elit scelerisque, in elementum nulla pharetra. \n"
+ "Aenean aliquet aliquet condimentum. Proin dapibus dui id libero tempus feugiat. Sed commodo ligula a "
+ "lectus mattis, vitae tincidunt velit auctor. Fusce quis semper dui. Phasellus eu efficitur sem. Ut non sem"
+ " sit amet enim condimentum venenatis id dictum massa. Nullam sagittis lacus a neque sodales, et ultrices "
+ "arcu mattis. Aliquam erat volutpat. \n"
+ "Aenean fringilla quam elit, id mattis purus vestibulum nec. Praesent porta eros in dapibus molestie. "
+ "Vestibulum orci libero, tincidunt et turpis eget, condimentum lobortis enim. Fusce suscipit ante et mauris"
+ " consequat cursus nec laoreet lorem. Maecenas in sollicitudin diam, non tincidunt purus. Nunc mauris "
+ "purus, laoreet eget interdum vitae, placerat a sapien. In mi risus, blandit eu facilisis nec, molestie "
+ "suscipit leo. Pellentesque molestie urna vitae dui faucibus bibendum. \n"
+ "Donec quis ipsum ultricies, imperdiet ex vel, scelerisque eros. Ut at urna arcu. Vestibulum rutrum odio "
+ "dolor, vitae cursus nunc pulvinar vel. Donec accumsan sapien in malesuada tempor. Maecenas in condimentum "
+ "eros. Sed vestibulum facilisis massa a iaculis. Etiam et nibh felis. Donec maximus, sem quis vestibulum "
+ "gravida, turpis risus congue dolor, pharetra tincidunt lectus nisi at velit.";

EventHubClientBuilderTest() {
super(new ClientLogger(EventHubClientBuilderTest.class));
}

@Test
public void missingConnectionString() {
final EventHubClientBuilder builder = new EventHubClientBuilder();
Expand Down Expand Up @@ -125,6 +162,115 @@ public void testProxyOptionsConfiguration(String proxyConfiguration, boolean exp
Assertions.assertEquals(expectedClientCreation, clientCreated);
}

@Test
v-hongli1 marked this conversation as resolved.
Show resolved Hide resolved
public void sendAndReceiveEventByAzureNameKeyCredential() {
ConnectionStringProperties properties = getConnectionStringProperties();
String fullyQualifiedNamespace = getFullyQualifiedDomainName();
String sharedAccessKeyName = properties.getSharedAccessKeyName();;
String sharedAccessKey = properties.getSharedAccessKey();
String eventHubName = getEventHubName();

final EventData testData = new EventData(TEST_CONTENTS.getBytes(UTF_8));

EventHubProducerClient asyncProducerClient = new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, eventHubName,
new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey))
.buildAsyncProducerClient();
asyncProducerClient.send(testData);
v-hongli1 marked this conversation as resolved.
Show resolved Hide resolved

EventHubConsumerAsyncClient asyncConsumerClient = new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, eventHubName,
new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey))
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.buildAsyncConsumerClient();
StepVerifier.create(asyncConsumerClient.receive())
.assertNext(consumer -> Assertions.assertEquals(consumer.getData().getBody(), TEST_CONTENTS.getBytes(UTF_8)))
.verifyComplete();

asyncProducerClient.close();
asyncConsumerClient.close();
}


@Test
public void sendAndReceiveEventByAzureSasCredential() {
ConnectionStringProperties properties = getConnectionStringProperties(true);
String fullyQualifiedNamespace = getFullyQualifiedDomainName();
String sharedAccessSignature = properties.getSharedAccessSignature();
String eventHubName = getEventHubName();

final EventData testData = new EventData(TEST_CONTENTS.getBytes(UTF_8));

EventHubProducerClient asyncProducerClient = new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, eventHubName,
new AzureSasCredential(sharedAccessSignature))
.buildAsyncProducerClient();
asyncProducerClient.send(testData);

EventHubConsumerAsyncClient asyncConsumerClient = new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, eventHubName,
new AzureSasCredential(sharedAccessSignature))
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.buildAsyncConsumerClient();
StepVerifier.create(asyncConsumerClient.receive())
.assertNext(consumer -> Assertions.assertEquals(consumer.getData().getBody(), TEST_CONTENTS.getBytes(UTF_8)))
.verifyComplete();

asyncProducerClient.close();
asyncConsumerClient.close();
}

@Test
public void testConnectionWithAzureNameKeyCredential() {
String fullyQualifiedNamespace = "sb-name.servicebus.windows.net";
String sharedAccessKeyName = "SharedAccessKeyName test-value";
String sharedAccessKey = "SharedAccessKey test-value";
String eventHubName = "test-event-hub-name";

assertThrows(NullPointerException.class, () -> new EventHubClientBuilder()
.credential(null, eventHubName,
new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey)));

assertThrows(NullPointerException.class, () -> new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, null,
new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey)));

assertThrows(IllegalArgumentException.class, () -> new EventHubClientBuilder()
.credential("", eventHubName,
new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey)));

assertThrows(IllegalArgumentException.class, () -> new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, "",
new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey)));

assertThrows(NullPointerException.class, () -> new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, eventHubName, (AzureNamedKeyCredential) null));

}

@Test
public void testConnectionWithAzureSasCredential() {
String fullyQualifiedNamespace = "sb-name.servicebus.windows.net";
String sharedAccessSignature = "SharedAccessSignature test-value";
String eventHubName = "test-event-hub-name";

assertThrows(NullPointerException.class, () -> new EventHubClientBuilder()
.credential(null, eventHubName, new AzureSasCredential(sharedAccessSignature)));

assertThrows(NullPointerException.class, () -> new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, null, new AzureSasCredential(sharedAccessSignature)));

assertThrows(IllegalArgumentException.class, () -> new EventHubClientBuilder()
.credential("", eventHubName, new AzureSasCredential(sharedAccessSignature)));

assertThrows(IllegalArgumentException.class, () -> new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, "", new AzureSasCredential(sharedAccessSignature)));

assertThrows(NullPointerException.class, () -> new EventHubClientBuilder()
.credential(fullyQualifiedNamespace, eventHubName, (AzureSasCredential) null));

}

private static Stream<Arguments> getProxyConfigurations() {
return Stream.of(
Arguments.of("http://localhost:8080", true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,11 @@ protected static EventHubClientBuilder createBuilder(boolean useCredentials) {
}

protected static ConnectionStringProperties getConnectionStringProperties() {
return new ConnectionStringProperties(getConnectionString());
return new ConnectionStringProperties(getConnectionString(false));
}

protected static ConnectionStringProperties getConnectionStringProperties(boolean withSas) {
return new ConnectionStringProperties(getConnectionString(withSas));
}

/**
Expand Down