diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubClientFactory.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubClientFactory.cs index e81209464b79d..af6d7e8b7c867 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubClientFactory.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubClientFactory.cs @@ -129,7 +129,15 @@ internal IEventHubConsumerClient GetEventHubConsumerClient(string eventHubName, EventHubConsumerClient client = null; if (_options.RegisteredConsumerCredentials.TryGetValue(eventHubName, out var creds)) { - client = new EventHubConsumerClient(consumerGroup, creds.EventHubConnectionString, eventHubName); + client = new EventHubConsumerClient( + consumerGroup, + creds.EventHubConnectionString, + eventHubName, + new EventHubConsumerClientOptions + { + RetryOptions = _options.RetryOptions, + ConnectionOptions = _options.ConnectionOptions + }); } else if (!string.IsNullOrEmpty(connection)) { @@ -138,11 +146,27 @@ internal IEventHubConsumerClient GetEventHubConsumerClient(string eventHubName, if (info.FullyQualifiedEndpoint != null && info.TokenCredential != null) { - client = new EventHubConsumerClient(consumerGroup, info.FullyQualifiedEndpoint, eventHubName, info.TokenCredential); + client = new EventHubConsumerClient( + consumerGroup, + info.FullyQualifiedEndpoint, + eventHubName, + info.TokenCredential, + new EventHubConsumerClientOptions + { + RetryOptions = _options.RetryOptions, + ConnectionOptions = _options.ConnectionOptions + }); } else { - client = new EventHubConsumerClient(consumerGroup, NormalizeConnectionString(info.ConnectionString, eventHubName)); + client = new EventHubConsumerClient( + consumerGroup, + NormalizeConnectionString(info.ConnectionString, eventHubName), + new EventHubConsumerClientOptions + { + RetryOptions = _options.RetryOptions, + ConnectionOptions = _options.ConnectionOptions + }); } } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsClientFactoryTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsClientFactoryTests.cs index 0e1dc9d55ced2..77210c868e0bf 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsClientFactoryTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsClientFactoryTests.cs @@ -3,7 +3,13 @@ using System; using System.Collections.Generic; +using System.Reflection; using Azure.Identity; +using Azure.Messaging.EventHubs; +using Azure.Messaging.EventHubs.Consumer; +using Azure.Messaging.EventHubs.Primitives; +using Azure.Messaging.EventHubs.Producer; +using Microsoft.Azure.WebJobs.EventHubs.Processor; using Microsoft.Extensions.Azure; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Options; @@ -25,12 +31,12 @@ public void EntityPathInConnectionString(string expectedPathName, string connect EventHubOptions options = new EventHubOptions(); // Test sender - options.AddSender("k1", connectionString); + options.AddSender(expectedPathName, connectionString); var configuration = CreateConfiguration(); var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration)); - var client = factory.GetEventHubProducerClient("k1", null); + var client = factory.GetEventHubProducerClient(expectedPathName, null); Assert.AreEqual(expectedPathName, client.EventHubName); } @@ -44,7 +50,7 @@ public void GetEventHubClient_AddsConnection(string expectedPathName, string con var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration)); - var client = factory.GetEventHubProducerClient("k1", "connection"); + var client = factory.GetEventHubProducerClient(expectedPathName, "connection"); Assert.AreEqual(expectedPathName, client.EventHubName); } @@ -145,6 +151,120 @@ public void UsesRegisteredConnectionToStorageAccount() Assert.AreEqual("http://blobs/azure-webjobs-eventhub", client.Uri.ToString()); } + [TestCase("k1", ConnectionString)] + [TestCase("path2", ConnectionStringWithEventHub)] + public void RespectsConnectionOptionsForProducer(string expectedPathName, string connectionString) + { + var testEndpoint = new Uri("http://mycustomendpoint.com"); + EventHubOptions options = new EventHubOptions + { + ConnectionOptions = new EventHubConnectionOptions + { + CustomEndpointAddress = testEndpoint + }, + RetryOptions = new EventHubsRetryOptions + { + MaximumRetries = 10 + } + }; + + options.AddSender(expectedPathName, connectionString); + + var configuration = CreateConfiguration(); + var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration)); + + var producer = factory.GetEventHubProducerClient(expectedPathName, null); + EventHubConnection connection = (EventHubConnection)typeof(EventHubProducerClient).GetProperty("Connection", BindingFlags.NonPublic | BindingFlags.Instance) + .GetValue(producer); + EventHubConnectionOptions connectionOptions = (EventHubConnectionOptions)typeof(EventHubConnection).GetProperty("Options", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(connection); + + Assert.AreEqual(testEndpoint, connectionOptions.CustomEndpointAddress); + Assert.AreEqual(expectedPathName, producer.EventHubName); + + EventHubProducerClientOptions producerOptions = (EventHubProducerClientOptions)typeof(EventHubProducerClient).GetProperty("Options", BindingFlags.NonPublic | BindingFlags.Instance).GetValue(producer); + Assert.AreEqual(10, producerOptions.RetryOptions.MaximumRetries); + Assert.AreEqual(expectedPathName, producer.EventHubName); + } + + [TestCase("k1", ConnectionString)] + [TestCase("path2", ConnectionStringWithEventHub)] + public void RespectsConnectionOptionsForConsumer(string expectedPathName, string connectionString) + { + var testEndpoint = new Uri("http://mycustomendpoint.com"); + EventHubOptions options = new EventHubOptions + { + ConnectionOptions = new EventHubConnectionOptions + { + CustomEndpointAddress = testEndpoint + }, + RetryOptions = new EventHubsRetryOptions + { + MaximumRetries = 10 + } + }; + + options.AddReceiver(expectedPathName, connectionString); + + var configuration = CreateConfiguration(); + var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration)); + + var consumer = factory.GetEventHubConsumerClient(expectedPathName, null, "consumer"); + var consumerClient = (EventHubConsumerClient)typeof(EventHubConsumerClientImpl) + .GetField("_client", BindingFlags.NonPublic | BindingFlags.Instance) + .GetValue(consumer); + EventHubConnection connection = (EventHubConnection)typeof(EventHubConsumerClient) + .GetProperty("Connection", BindingFlags.NonPublic | BindingFlags.Instance) + .GetValue(consumerClient); + EventHubConnectionOptions connectionOptions = (EventHubConnectionOptions)typeof(EventHubConnection) + .GetProperty("Options", BindingFlags.NonPublic | BindingFlags.Instance) + .GetValue(connection); + Assert.AreEqual(testEndpoint, connectionOptions.CustomEndpointAddress); + + EventHubsRetryPolicy retryPolicy = (EventHubsRetryPolicy)typeof(EventHubConsumerClient) + .GetProperty("RetryPolicy", BindingFlags.NonPublic | BindingFlags.Instance) + .GetValue(consumerClient); + + // Reflection was still necessary here because BasicRetryOptions (which is the concrete derived type) + // is internal. + EventHubsRetryOptions retryOptions = (EventHubsRetryOptions)retryPolicy.GetType() + .GetProperty("Options", BindingFlags.Public | BindingFlags.Instance) + .GetValue(retryPolicy); + Assert.AreEqual(10, retryOptions.MaximumRetries); + Assert.AreEqual(expectedPathName, consumer.EventHubName); + } + + [TestCase("k1", ConnectionString)] + [TestCase("path2", ConnectionStringWithEventHub)] + public void RespectsConnectionOptionsForProcessor(string expectedPathName, string connectionString) + { + var testEndpoint = new Uri("http://mycustomendpoint.com"); + EventHubOptions options = new EventHubOptions + { + ConnectionOptions = new EventHubConnectionOptions + { + CustomEndpointAddress = testEndpoint + }, + RetryOptions = new EventHubsRetryOptions + { + MaximumRetries = 10 + } + }; + + options.AddReceiver(expectedPathName, connectionString); + + var configuration = CreateConfiguration(); + var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration)); + + var processor = factory.GetEventProcessorHost(expectedPathName, null, "consumer"); + EventProcessorOptions processorOptions = (EventProcessorOptions)typeof(EventProcessor) + .GetProperty("Options", BindingFlags.NonPublic | BindingFlags.Instance) + .GetValue(processor); + Assert.AreEqual(testEndpoint, processorOptions.ConnectionOptions.CustomEndpointAddress); + + Assert.AreEqual(10, processorOptions.RetryOptions.MaximumRetries); + Assert.AreEqual(expectedPathName, processor.EventHubName); + } + private IConfiguration CreateConfiguration(params KeyValuePair[] data) { return new ConfigurationBuilder().AddInMemoryCollection(data).Build();