diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java index 1da1431a127a0..4d0e12a6845bf 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java @@ -19,6 +19,8 @@ import com.azure.core.amqp.models.CbsAuthorizationType; import com.azure.core.annotation.ServiceClientBuilder; import com.azure.core.annotation.ServiceClientProtocol; +import com.azure.core.credential.AzureNamedKeyCredential; +import com.azure.core.credential.AzureSasCredential; import com.azure.core.credential.TokenCredential; import com.azure.core.exception.AzureException; import com.azure.core.util.ClientOptions; @@ -235,6 +237,55 @@ public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, TokenC return this; } + /** + * Sets the credential for the Service Bus resource. + * + * @param fullyQualifiedNamespace for the Service Bus. + * @param credential {@link AzureNamedKeyCredential} to be used for authentication. + * + * @return The updated {@link ServiceBusClientBuilder} object. + */ + public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureNamedKeyCredential credential) { + + this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, + "'fullyQualifiedNamespace' cannot be null."); + Objects.requireNonNull(credential, "'credential' cannot be null."); + + this.credentials = new ServiceBusSharedKeyCredential(credential.getAzureNamedKey().getName(), + credential.getAzureNamedKey().getKey(), ServiceBusConstants.TOKEN_VALIDITY); + + if (CoreUtils.isNullOrEmpty(fullyQualifiedNamespace)) { + throw logger.logExceptionAsError( + new IllegalArgumentException("'fullyQualifiedNamespace' cannot be an empty string.")); + } + + return this; + } + + /** + * Sets the credential for the Service Bus resource. + * + * @param fullyQualifiedNamespace for the Service Bus. + * @param credential {@link AzureSasCredential} to be used for authentication. + * + * @return The updated {@link ServiceBusClientBuilder} object. + */ + public ServiceBusClientBuilder credential(String fullyQualifiedNamespace, AzureSasCredential credential) { + + this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, + "'fullyQualifiedNamespace' cannot be null."); + Objects.requireNonNull(credential, "'credential' cannot be null."); + + this.credentials = new ServiceBusSharedKeyCredential(credential.getSignature()); + + if (CoreUtils.isNullOrEmpty(fullyQualifiedNamespace)) { + throw logger.logExceptionAsError( + new IllegalArgumentException("'fullyQualifiedNamespace' cannot be an empty string.")); + } + + return this; + } + /** * Sets the proxy configuration to use for {@link ServiceBusSenderAsyncClient}. When a proxy is configured, {@link * AmqpTransportType#AMQP_WEB_SOCKETS} must be used for the transport type. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java index 4d65ca84d8379..fc191238fb2d7 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/IntegrationTestBase.java @@ -6,6 +6,7 @@ import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.ProxyAuthenticationType; import com.azure.core.amqp.ProxyOptions; +import com.azure.core.amqp.implementation.ConnectionStringProperties; import com.azure.core.test.TestBase; import com.azure.core.test.TestMode; import com.azure.core.util.AsyncCloseable; @@ -19,6 +20,7 @@ import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder; import com.azure.messaging.servicebus.implementation.DispositionStatus; import com.azure.messaging.servicebus.implementation.MessagingEntityType; + import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -114,8 +116,20 @@ public TestMode getTestMode() { return CoreUtils.isNullOrEmpty(getConnectionString()) ? TestMode.PLAYBACK : TestMode.RECORD; } - public String getConnectionString() { - return TestUtils.getConnectionString(); + public static String getConnectionString() { + return TestUtils.getConnectionString(false); + } + + public static String getConnectionString(boolean withSas) { + return TestUtils.getConnectionString(withSas); + } + + protected static ConnectionStringProperties getConnectionStringProperties() { + return new ConnectionStringProperties(getConnectionString(false)); + } + + protected static ConnectionStringProperties getConnectionStringProperties(boolean withSas) { + return new ConnectionStringProperties(getConnectionString(withSas)); } public String getFullyQualifiedDomainName() { diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java index 80d57e93ce8c7..1c13357d2f2c2 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusClientBuilderTest.java @@ -6,7 +6,11 @@ import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.ProxyAuthenticationType; import com.azure.core.amqp.ProxyOptions; +import com.azure.core.amqp.implementation.ConnectionStringProperties; +import com.azure.core.credential.AzureNamedKeyCredential; +import com.azure.core.credential.AzureSasCredential; import com.azure.core.util.Configuration; +import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusReceiverClientBuilder; import com.azure.messaging.servicebus.ServiceBusClientBuilder.ServiceBusSenderClientBuilder; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; @@ -16,6 +20,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import reactor.test.StepVerifier; import java.net.InetSocketAddress; import java.net.Proxy; @@ -28,7 +33,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -class ServiceBusClientBuilderTest { +import static java.nio.charset.StandardCharsets.UTF_8; + +class ServiceBusClientBuilderTest extends IntegrationTestBase { private static final String NAMESPACE_NAME = "dummyNamespaceName"; private static final String DEFAULT_DOMAIN_NAME = "servicebus.windows.net/"; private static final String ENDPOINT_FORMAT = "sb://%s.%s"; @@ -51,6 +58,12 @@ class ServiceBusClientBuilderTest { ENDPOINT, SHARED_ACCESS_KEY_NAME, SHARED_ACCESS_KEY, QUEUE_NAME); private static final Proxy PROXY_ADDRESS = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(PROXY_HOST, Integer.parseInt(PROXY_PORT))); + private static final String TEST_MESSAGE = "SSLorem ipsum dolor sit amet, consectetur adipiscing elit. Donec vehicula posuere lobortis. Aliquam finibus volutpat dolor, faucibus pellentesque ipsum bibendum vitae. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Ut sit amet urna hendrerit, dapibus justo a, sodales justo. Mauris finibus augue id pulvinar congue. Nam maximus luctus ipsum, at commodo ligula euismod ac. Phasellus vitae lacus sit amet diam porta placerat. \nUt sodales efficitur sapien ut posuere. Morbi sed tellus est. Proin eu erat purus. Proin massa nunc, condimentum id iaculis dignissim, consectetur et odio. Cras suscipit sem eu libero aliquam tincidunt. Nullam ut arcu suscipit, eleifend velit in, cursus libero. Ut eleifend facilisis odio sit amet feugiat. Phasellus at nunc sit amet elit sagittis commodo ac in nisi. Fusce vitae aliquam quam. Integer vel nibh euismod, tempus elit vitae, pharetra est. Duis vulputate enim a elementum dignissim. Morbi dictum enim id elit scelerisque, in elementum nulla pharetra. \nAenean aliquet aliquet condimentum. Proin dapibus dui id libero tempus feugiat. Sed commodo ligula a lectus mattis, vitae tincidunt velit auctor. Fusce quis semper dui. Phasellus eu efficitur sem. Ut non sem sit amet enim condimentum venenatis id dictum massa. Nullam sagittis lacus a neque sodales, et ultrices arcu mattis. Aliquam erat volutpat. \nAenean fringilla quam elit, id mattis purus vestibulum nec. Praesent porta eros in dapibus molestie. Vestibulum orci libero, tincidunt et turpis eget, condimentum lobortis enim. Fusce suscipit ante et mauris consequat cursus nec laoreet lorem. Maecenas in sollicitudin diam, non tincidunt purus. Nunc mauris purus, laoreet eget interdum vitae, placerat a sapien. In mi risus, blandit eu facilisis nec, molestie suscipit leo. Pellentesque molestie urna vitae dui faucibus bibendum. \nDonec quis ipsum ultricies, imperdiet ex vel, scelerisque eros. Ut at urna arcu. Vestibulum rutrum odio dolor, vitae cursus nunc pulvinar vel. Donec accumsan sapien in malesuada tempor. Maecenas in condimentum eros. Sed vestibulum facilisis massa a iaculis. Etiam et nibh felis. Donec maximus, sem quis vestibulum gravida, turpis risus congue dolor, pharetra tincidunt lectus nisi at velit."; + + ServiceBusClientBuilderTest() { + super(new ClientLogger(ServiceBusClientBuilderTest.class)); + } + @Test void deadLetterqueueClient() { // Arrange @@ -243,6 +256,103 @@ public void testConnectionStringWithSas() { .connectionString("Endpoint=sb://sb-name.servicebus.windows.net/;EntityPath=sb-name")); } + @Test + public void testBatchSendEventByAzureNameKeyCredential() { + ConnectionStringProperties properties = getConnectionStringProperties(); + String fullyQualifiedNamespace = getFullyQualifiedDomainName(); + String sharedAccessKeyName = properties.getSharedAccessKeyName(); + String sharedAccessKey = properties.getSharedAccessKey(); + String queueName = getQueueName(0); + + final ServiceBusMessage testData = new ServiceBusMessage(TEST_MESSAGE.getBytes(UTF_8)); + + ServiceBusSenderAsyncClient senderAsyncClient = new ServiceBusClientBuilder() + .credential(fullyQualifiedNamespace, new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey)) + .sender() + .queueName(queueName) + .buildAsyncClient(); + try { + StepVerifier.create( + senderAsyncClient.createMessageBatch().flatMap(batch -> { + assertTrue(batch.tryAddMessage(testData)); + return senderAsyncClient.sendMessages(batch); + }) + ).verifyComplete(); + } finally { + senderAsyncClient.close(); + } + } + + + @Test + public void testBatchSendEventByAzureSasCredential() { + ConnectionStringProperties properties = getConnectionStringProperties(true); + String fullyQualifiedNamespace = getFullyQualifiedDomainName(); + String sharedAccessSignature = properties.getSharedAccessSignature(); + String queueName = getQueueName(0); + + final ServiceBusMessage testData = new ServiceBusMessage(TEST_MESSAGE.getBytes(UTF_8)); + + ServiceBusSenderAsyncClient senderAsyncClient = new ServiceBusClientBuilder() + .credential(fullyQualifiedNamespace, + new AzureSasCredential(sharedAccessSignature)) + .sender() + .queueName(queueName) + .buildAsyncClient(); + try { + StepVerifier.create( + senderAsyncClient.createMessageBatch().flatMap(batch -> { + assertTrue(batch.tryAddMessage(testData)); + return senderAsyncClient.sendMessages(batch); + }) + ).verifyComplete(); + } finally { + senderAsyncClient.close(); + } + } + + @Test + public void testConnectionWithAzureNameKeyCredential() { + String fullyQualifiedNamespace = "sb-name.servicebus.windows.net"; + String sharedAccessKeyName = "SharedAccessKeyName test-value"; + String sharedAccessKey = "SharedAccessKey test-value"; + + assertThrows(NullPointerException.class, () -> new ServiceBusClientBuilder() + .credential(null, + new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey))); + + assertThrows(IllegalArgumentException.class, () -> new ServiceBusClientBuilder() + .credential("", + new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey))); + + assertThrows(IllegalArgumentException.class, () -> new ServiceBusClientBuilder() + .credential(fullyQualifiedNamespace, + new AzureNamedKeyCredential(sharedAccessKeyName, sharedAccessKey))); + + assertThrows(NullPointerException.class, () -> new ServiceBusClientBuilder() + .credential(fullyQualifiedNamespace, (AzureNamedKeyCredential) null)); + + } + + @Test + public void testConnectionWithAzureSasCredential() { + String fullyQualifiedNamespace = "sb-name.servicebus.windows.net"; + String sharedAccessSignature = "SharedAccessSignature test-value"; + + assertThrows(NullPointerException.class, () -> new ServiceBusClientBuilder() + .credential(null, new AzureSasCredential(sharedAccessSignature))); + + assertThrows(IllegalArgumentException.class, () -> new ServiceBusClientBuilder() + .credential("", new AzureSasCredential(sharedAccessSignature))); + + assertThrows(IllegalArgumentException.class, () -> new ServiceBusClientBuilder() + .credential(fullyQualifiedNamespace, new AzureSasCredential(sharedAccessSignature))); + + assertThrows(NullPointerException.class, () -> new ServiceBusClientBuilder() + .credential(fullyQualifiedNamespace, (AzureSasCredential) null)); + + } + private static Stream getProxyConfigurations() { return Stream.of( Arguments.of("http://localhost:8080", true), diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java index 9d26ff6d04867..a5f6343b38792 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSessionManagerTest.java @@ -14,32 +14,27 @@ import com.azure.core.credential.TokenCredential; import com.azure.core.util.ClientOptions; import com.azure.core.util.logging.ClientLogger; -import com.azure.messaging.servicebus.implementation.MessagingEntityType; -import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection; -import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor; -import com.azure.messaging.servicebus.implementation.ServiceBusConstants; -import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode; -import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink; +import com.azure.messaging.servicebus.implementation.*; import com.azure.messaging.servicebus.models.ServiceBusReceiveMode; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.message.Message; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; +import org.junit.jupiter.api.AfterEach; import org.mockito.Mock; -import org.mockito.Mockito; +import org.mockito.Captor; +import org.mockito.ArgumentCaptor; import org.mockito.MockitoAnnotations; -import reactor.core.publisher.EmitterProcessor; -import reactor.core.publisher.Flux; +import org.mockito.Mockito; +import reactor.core.publisher.ReplayProcessor; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; -import reactor.core.publisher.ReplayProcessor; +import reactor.core.publisher.EmitterProcessor; +import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java index 5671b2a6e84d1..b197595326f21 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/TestUtils.java @@ -81,7 +81,10 @@ public class TestUtils { * * @return The namespace connection string. */ - public static String getConnectionString() { + public static String getConnectionString(boolean withSas) { + if (withSas) { + return System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING_WITH_SAS"); + } return System.getenv("AZURE_SERVICEBUS_NAMESPACE_CONNECTION_STRING"); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClientIntegrationTest.java index 3109e1fd9d190..f769c2baae884 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/administration/ServiceBusAdministrationAsyncClientIntegrationTest.java @@ -829,7 +829,7 @@ void getSubscriptionRuntimePropertiesUnauthorizedClient(HttpClient httpClient) { // Arrange final String connectionString = interceptorManager.isPlaybackMode() ? "Endpoint=sb://foo.servicebus.windows.net;SharedAccessKeyName=dummyKey;SharedAccessKey=dummyAccessKey" - : TestUtils.getConnectionString(); + : TestUtils.getConnectionString(false); final String connectionStringUpdated = connectionString.replace("SharedAccessKey=", "SharedAccessKey=fake-key-"); @@ -990,7 +990,7 @@ void updateRuleResponse(HttpClient httpClient) { private ServiceBusAdministrationAsyncClient createClient(HttpClient httpClient) { final String connectionString = interceptorManager.isPlaybackMode() ? "Endpoint=sb://foo.servicebus.windows.net;SharedAccessKeyName=dummyKey;SharedAccessKey=dummyAccessKey" - : TestUtils.getConnectionString(); + : TestUtils.getConnectionString(false); final ServiceBusAdministrationClientBuilder builder = new ServiceBusAdministrationClientBuilder() .httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BODY_AND_HEADERS)) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusAdministrationClientImplIntegrationTests.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusAdministrationClientImplIntegrationTests.java index 9f20d26223c38..11d990e90690c 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusAdministrationClientImplIntegrationTests.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusAdministrationClientImplIntegrationTests.java @@ -234,7 +234,7 @@ void listQueuesImplementation(HttpClient httpClient) { private ServiceBusManagementClientImpl createClient(HttpClient httpClient) { final String connectionString = interceptorManager.isPlaybackMode() ? "Endpoint=sb://foo.servicebus.windows.net;SharedAccessKeyName=dummyKey;SharedAccessKey=dummyAccessKey" - : TestUtils.getConnectionString(); + : TestUtils.getConnectionString(false); final ConnectionStringProperties properties = new ConnectionStringProperties(connectionString); final ServiceBusSharedKeyCredential credential = new ServiceBusSharedKeyCredential( properties.getSharedAccessKeyName(), properties.getSharedAccessKey());