From 967af1a0663c6748fa0f9580a9dd9715fb0f348b Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Thu, 3 Dec 2020 13:24:20 -0800 Subject: [PATCH] Add token credential auth support (#17308) --- ...oft.Azure.WebJobs.Extensions.EventHubs.sln | 14 ++ ...obs.Extensions.EventHubs.netstandard2.0.cs | 7 +- .../src/Config/EventHubClientFactory.cs | 213 ++++++++++++++++++ .../Config/EventHubExtensionConfigProvider.cs | 44 ++-- .../src/Config/EventHubOptions.cs | 168 ++------------ .../EventHubWebJobsBuilderExtensions.cs | 8 +- .../src/EventHubAttribute.cs | 5 +- .../src/EventHubTriggerAttribute.cs | 4 +- .../src/Listeners/EventHubListener.cs | 4 +- ....Azure.WebJobs.Extensions.EventHubs.csproj | 3 + .../EventHubsConnectionStringBuilder.cs | 94 -------- .../src/Processor/EventProcessorHost.cs | 15 ++ ...EventHubTriggerAttributeBindingProvider.cs | 36 +-- .../src/Triggers/EventHubTriggerInput.cs | 2 - .../tests/BlobsCheckpointStoreTests.cs | 5 +- .../tests/EventHubEndToEndTests.cs | 149 +++++++++--- .../tests/EventHubListenerTests.cs | 5 +- .../tests/EventHubTests.cs | 26 --- .../tests/EventHubsClientFactoryTests.cs | 153 +++++++++++++ .../tests/EventHubsScaleMonitorTests.cs | 16 +- 20 files changed, 576 insertions(+), 395 deletions(-) create mode 100644 sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubClientFactory.cs delete mode 100644 sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventHubsConnectionStringBuilder.cs create mode 100644 sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsClientFactoryTests.cs diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln index 49671484fc38a..0913de28658e2 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln @@ -11,6 +11,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Ext EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Messaging.EventHubs", "..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj", "{B51ECD35-11DA-46D2-89D7-9DE3888CF896}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Extensions.Azure", "..\..\extensions\Microsoft.Extensions.Azure\src\Microsoft.Extensions.Azure.csproj", "{E772769A-7CE1-4FBC-A084-C0936EB2C766}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -72,5 +74,17 @@ Global {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x64.Build.0 = Release|Any CPU {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x86.ActiveCfg = Release|Any CPU {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x86.Build.0 = Release|Any CPU + {E772769A-7CE1-4FBC-A084-C0936EB2C766}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E772769A-7CE1-4FBC-A084-C0936EB2C766}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E772769A-7CE1-4FBC-A084-C0936EB2C766}.Debug|x64.ActiveCfg = Debug|Any CPU + {E772769A-7CE1-4FBC-A084-C0936EB2C766}.Debug|x64.Build.0 = Debug|Any CPU + {E772769A-7CE1-4FBC-A084-C0936EB2C766}.Debug|x86.ActiveCfg = Debug|Any CPU + {E772769A-7CE1-4FBC-A084-C0936EB2C766}.Debug|x86.Build.0 = Debug|Any CPU + {E772769A-7CE1-4FBC-A084-C0936EB2C766}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E772769A-7CE1-4FBC-A084-C0936EB2C766}.Release|Any CPU.Build.0 = Release|Any CPU + {E772769A-7CE1-4FBC-A084-C0936EB2C766}.Release|x64.ActiveCfg = Release|Any CPU + {E772769A-7CE1-4FBC-A084-C0936EB2C766}.Release|x64.Build.0 = Release|Any CPU + {E772769A-7CE1-4FBC-A084-C0936EB2C766}.Release|x86.ActiveCfg = Release|Any CPU + {E772769A-7CE1-4FBC-A084-C0936EB2C766}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs index 89c6e265be79c..c5913c73088db 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs @@ -5,9 +5,8 @@ namespace Microsoft.Azure.WebJobs public sealed partial class EventHubAttribute : System.Attribute { public EventHubAttribute(string eventHubName) { } - [Microsoft.Azure.WebJobs.Description.ConnectionStringAttribute] - public string Connection { get { throw null; } set { } } [Microsoft.Azure.WebJobs.Description.AutoResolveAttribute] + public string Connection { get { throw null; } set { } } public string EventHubName { get { throw null; } } } [Microsoft.Azure.WebJobs.Description.BindingAttribute] @@ -15,7 +14,9 @@ public EventHubAttribute(string eventHubName) { } public sealed partial class EventHubTriggerAttribute : System.Attribute { public EventHubTriggerAttribute(string eventHubName) { } + [Microsoft.Azure.WebJobs.Description.AutoResolveAttribute] public string Connection { get { throw null; } set { } } + [Microsoft.Azure.WebJobs.Description.AutoResolveAttribute] public string ConsumerGroup { get { throw null; } set { } } public string EventHubName { get { throw null; } } } @@ -30,8 +31,6 @@ public EventHubOptions() { } public bool InvokeProcessorAfterReceiveTimeout { get { throw null; } set { } } public string LeaseContainerName { get { throw null; } set { } } public int MaxBatchSize { get { throw null; } set { } } - public void AddEventHubProducerClient(Azure.Messaging.EventHubs.Producer.EventHubProducerClient client) { } - public void AddEventHubProducerClient(string eventHubName, Azure.Messaging.EventHubs.Producer.EventHubProducerClient client) { } public void AddReceiver(string eventHubName, string receiverConnectionString) { } public void AddReceiver(string eventHubName, string receiverConnectionString, string storageConnectionString) { } public void AddSender(string eventHubName, string sendConnectionString) { } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubClientFactory.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubClientFactory.cs new file mode 100644 index 0000000000000..08de06d34dd39 --- /dev/null +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubClientFactory.cs @@ -0,0 +1,213 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Concurrent; +using Azure.Core; +using Azure.Messaging.EventHubs.Consumer; +using Azure.Messaging.EventHubs.Producer; +using Azure.Storage.Blobs; +using Microsoft.Azure.WebJobs.EventHubs.Processor; +using Microsoft.Azure.WebJobs.Extensions.Clients.Shared; +using Microsoft.Azure.WebJobs.Host; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Options; + +namespace Microsoft.Azure.WebJobs.EventHubs +{ + internal class EventHubClientFactory + { + private readonly IConfiguration _configuration; + private readonly AzureComponentFactory _componentFactory; + private readonly EventHubOptions _options; + private readonly INameResolver _nameResolver; + private readonly ConcurrentDictionary _producerCache; + private readonly ConcurrentDictionary _consumerCache = new (); + + public EventHubClientFactory( + IConfiguration configuration, + AzureComponentFactory componentFactory, + IOptions options, + INameResolver nameResolver) + { + _configuration = configuration; + _componentFactory = componentFactory; + _options = options.Value; + _nameResolver = nameResolver; + _producerCache = new ConcurrentDictionary(_options.RegisteredProducers); + } + + internal EventHubProducerClient GetEventHubProducerClient(string eventHubName, string connection) + { + eventHubName = _nameResolver.ResolveWholeString(eventHubName); + + return _producerCache.GetOrAdd(eventHubName, key => + { + if (!string.IsNullOrWhiteSpace(connection)) + { + var info = ResolveConnectionInformation(connection); + + if (info.FullyQualifiedEndpoint != null && + info.TokenCredential != null) + { + return new EventHubProducerClient(info.FullyQualifiedEndpoint, eventHubName, info.TokenCredential); + } + + return new EventHubProducerClient(NormalizeConnectionString(info.ConnectionString, eventHubName)); + } + + throw new InvalidOperationException("No event hub sender named " + eventHubName); + }); + } + + internal EventProcessorHost GetEventProcessorHost(string eventHubName, string connection, string consumerGroup) + { + eventHubName = _nameResolver.ResolveWholeString(eventHubName); + consumerGroup ??= EventHubConsumerClient.DefaultConsumerGroupName; + + if (_options.RegisteredConsumerCredentials.TryGetValue(eventHubName, out var creds)) + { + return new EventProcessorHost(consumerGroup: consumerGroup, + connectionString: creds.EventHubConnectionString, + eventHubName: eventHubName, + options: _options.EventProcessorOptions, + eventBatchMaximumCount: _options.MaxBatchSize, + invokeProcessorAfterReceiveTimeout: _options.InvokeProcessorAfterReceiveTimeout, + exceptionHandler: _options.ExceptionHandler); + } + else if (!string.IsNullOrEmpty(connection)) + { + var info = ResolveConnectionInformation(connection); + + if (info.FullyQualifiedEndpoint != null && + info.TokenCredential != null) + { + return new EventProcessorHost(consumerGroup: consumerGroup, + fullyQualifiedNamespace: info.FullyQualifiedEndpoint, + eventHubName: eventHubName, + credential: info.TokenCredential, + options: _options.EventProcessorOptions, + eventBatchMaximumCount: _options.MaxBatchSize, + invokeProcessorAfterReceiveTimeout: _options.InvokeProcessorAfterReceiveTimeout, + exceptionHandler: _options.ExceptionHandler); + } + + return new EventProcessorHost(consumerGroup: consumerGroup, + connectionString: NormalizeConnectionString(info.ConnectionString, eventHubName), + eventHubName: eventHubName, + options: _options.EventProcessorOptions, + eventBatchMaximumCount: _options.MaxBatchSize, + invokeProcessorAfterReceiveTimeout: _options.InvokeProcessorAfterReceiveTimeout, + exceptionHandler: _options.ExceptionHandler); + } + + throw new InvalidOperationException("No event hub receiver named " + eventHubName); + } + + internal IEventHubConsumerClient GetEventHubConsumerClient(string eventHubName, string connection, string consumerGroup) + { + eventHubName = _nameResolver.ResolveWholeString(eventHubName); + consumerGroup ??= EventHubConsumerClient.DefaultConsumerGroupName; + + return _consumerCache.GetOrAdd(eventHubName, name => + { + EventHubConsumerClient client = null; + if (_options.RegisteredConsumerCredentials.TryGetValue(eventHubName, out var creds)) + { + client = new EventHubConsumerClient(consumerGroup, creds.EventHubConnectionString, eventHubName); + } + else if (!string.IsNullOrEmpty(connection)) + { + var info = ResolveConnectionInformation(connection); + + if (info.FullyQualifiedEndpoint != null && + info.TokenCredential != null) + { + client = new EventHubConsumerClient(consumerGroup, info.FullyQualifiedEndpoint, eventHubName, info.TokenCredential); + } + else + { + client = new EventHubConsumerClient(consumerGroup, NormalizeConnectionString(info.ConnectionString, eventHubName)); + } + } + + if (client != null) + { + return new EventHubConsumerClientImpl(client); + } + + throw new InvalidOperationException("No event hub receiver named " + eventHubName); + }); + } + + internal BlobContainerClient GetCheckpointStoreClient(string eventHubName) + { + string storageConnectionString = null; + if (_options.RegisteredConsumerCredentials.TryGetValue(eventHubName, out var creds)) + { + storageConnectionString = creds.StorageConnectionString; + } + + // Fall back to default if not explicitly registered + return new BlobContainerClient(storageConnectionString ?? _configuration.GetWebJobsConnectionString(ConnectionStringNames.Storage), _options.LeaseContainerName); + } + + internal static string NormalizeConnectionString(string originalConnectionString, string eventHubName) + { + var connectionString = ConnectionString.Parse(originalConnectionString); + + if (!connectionString.ContainsSegmentKey("EntityPath")) + { + connectionString.Add("EntityPath", eventHubName); + } + + return connectionString.ToString(); + } + + private EventHubsConnectionInformation ResolveConnectionInformation(string connection) + { + IConfigurationSection connectionSection = _configuration.GetWebJobsConnectionStringSection(connection); + if (!connectionSection.Exists()) + { + // Not found + throw new InvalidOperationException($"EventHub account connection string '{connection}' does not exist." + + $"Make sure that it is a defined App Setting."); + } + + if (!string.IsNullOrWhiteSpace(connectionSection.Value)) + { + return new EventHubsConnectionInformation(connectionSection.Value); + } + + var fullyQualifiedNamespace = connectionSection["fullyQualifiedNamespace"]; + if (string.IsNullOrWhiteSpace(fullyQualifiedNamespace)) + { + // Not found + throw new InvalidOperationException($"Connection should have an 'fullyQualifiedNamespace' property or be a string representing a connection string."); + } + + var credential = _componentFactory.CreateTokenCredential(connectionSection); + + return new EventHubsConnectionInformation(fullyQualifiedNamespace, credential); + } + + private record EventHubsConnectionInformation + { + public EventHubsConnectionInformation(string connectionString) + { + ConnectionString = connectionString; + } + + public EventHubsConnectionInformation(string fullyQualifiedEndpoint, TokenCredential tokenCredential) + { + FullyQualifiedEndpoint = fullyQualifiedEndpoint; + TokenCredential = tokenCredential; + } + + public string ConnectionString { get; } + public string FullyQualifiedEndpoint { get; } + public TokenCredential TokenCredential { get; } + } + } +} \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs index ed4ce2a6ba4e9..195117a843882 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs @@ -22,22 +22,24 @@ namespace Microsoft.Azure.WebJobs.EventHubs [Extension("EventHubs", configurationSection: "EventHubs")] internal class EventHubExtensionConfigProvider : IExtensionConfigProvider { - private IConfiguration _config; private readonly IOptions _options; private readonly ILoggerFactory _loggerFactory; private readonly IConverterManager _converterManager; - private readonly INameResolver _nameResolver; private readonly IWebJobsExtensionConfiguration _configuration; - - public EventHubExtensionConfigProvider(IConfiguration config, IOptions options, ILoggerFactory loggerFactory, - IConverterManager converterManager, INameResolver nameResolver, IWebJobsExtensionConfiguration configuration) + private readonly EventHubClientFactory _clientFactory; + + public EventHubExtensionConfigProvider( + IOptions options, + ILoggerFactory loggerFactory, + IConverterManager converterManager, + IWebJobsExtensionConfiguration configuration, + EventHubClientFactory clientFactory) { - _config = config; _options = options; _loggerFactory = loggerFactory; _converterManager = converterManager; - _nameResolver = nameResolver; _configuration = configuration; + _clientFactory = clientFactory; } internal Action ExceptionHandler { get; set; } @@ -54,7 +56,7 @@ public void Initialize(ExtensionConfigContext context) throw new ArgumentNullException(nameof(context)); } - _options.Value.SetExceptionHandler(ExceptionReceivedHandler); + _options.Value.ExceptionHandler = ExceptionReceivedHandler; _configuration.ConfigurationSection.Bind(_options); context @@ -65,7 +67,7 @@ public void Initialize(ExtensionConfigContext context) .AddOpenConverter(ConvertPocoToEventData); // register our trigger binding provider - var triggerBindingProvider = new EventHubTriggerAttributeBindingProvider(_config, _nameResolver, _converterManager, _options, _loggerFactory); + var triggerBindingProvider = new EventHubTriggerAttributeBindingProvider(_converterManager, _options, _loggerFactory, _clientFactory); context.AddBindingRule() .BindToTrigger(triggerBindingProvider); @@ -74,10 +76,7 @@ public void Initialize(ExtensionConfigContext context) .BindToCollector(BuildFromAttribute); context.AddBindingRule() - .BindToInput(attribute => - { - return _options.Value.GetEventHubProducerClient(attribute.EventHubName, attribute.Connection); - }); + .BindToInput(attribute => _clientFactory.GetEventHubProducerClient(attribute.EventHubName, attribute.Connection)); ExceptionHandler = (e => { @@ -93,26 +92,9 @@ internal static void LogExceptionReceivedEvent(ExceptionReceivedEventArgs e, ILo Utility.LogException(e.Exception, message, logger); } - private static LogLevel GetLogLevel(Exception ex) - { - var ehex = ex as EventHubsException; - if (!(ex is OperationCanceledException) && (ehex == null || !ehex.IsTransient)) - { - // any non-transient exceptions or unknown exception types - // we want to log as errors - return LogLevel.Error; - } - else - { - // transient messaging errors we log as info so we have a record - // of them, but we don't treat them as actual errors - return LogLevel.Information; - } - } - private IAsyncCollector BuildFromAttribute(EventHubAttribute attribute) { - EventHubProducerClient client = _options.Value.GetEventHubProducerClient(attribute.EventHubName, attribute.Connection); + EventHubProducerClient client = _clientFactory.GetEventHubProducerClient(attribute.EventHubName, attribute.Connection); return new EventHubAsyncCollector(new EventHubProducerClientImpl(client, _loggerFactory)); } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs index 23fa767b1cf10..42c7ba57d1123 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs @@ -2,21 +2,14 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Globalization; using System.Text; -using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; -using Azure.Messaging.EventHubs.Core; using Azure.Messaging.EventHubs.Primitives; -using Azure.Messaging.EventHubs.Processor; using Azure.Messaging.EventHubs.Producer; -using Azure.Storage.Blobs; using Microsoft.Azure.WebJobs.EventHubs.Processor; using Microsoft.Azure.WebJobs.Hosting; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Newtonsoft.Json.Linq; @@ -24,15 +17,6 @@ namespace Microsoft.Azure.WebJobs.EventHubs { public class EventHubOptions : IOptionsFormatter { - // Event Hub Names are case-insensitive. - // The same path can have multiple connection strings with different permissions (sending and receiving), - // so we track senders and receivers separately and infer which one to use based on the EventHub (sender) vs. EventHubTrigger (receiver) attribute. - // Connection strings may also encapsulate different endpoints. - - // The client cache must be thread safe because clients are accessed/added on the function - private readonly ConcurrentDictionary _clients = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); - private readonly Dictionary _receiverCreds = new Dictionary(StringComparer.OrdinalIgnoreCase); - /// /// Name of the blob container that the EventHostProcessor instances uses to coordinate load balancing listening on an event hub. /// Each event hub gets its own blob prefix within the container. @@ -58,10 +42,7 @@ public EventHubOptions() /// public int BatchCheckpointFrequency { - get - { - return _batchCheckpointFrequency; - } + get => _batchCheckpointFrequency; set { @@ -80,10 +61,7 @@ public int BatchCheckpointFrequency /// public int MaxBatchSize { - get - { - return _maxBatchSize; - } + get => _maxBatchSize; set { @@ -99,38 +77,21 @@ public int MaxBatchSize public EventProcessorOptions EventProcessorOptions { get; } - private Action _exceptionHandler; + internal Action ExceptionHandler { get; set; } - internal void SetExceptionHandler(Action exceptionHandler) - { - if (exceptionHandler == null) - { - throw new ArgumentNullException(nameof(exceptionHandler)); - } - - _exceptionHandler = exceptionHandler; - } - - /// - /// Add an existing client for sending messages to an event hub. Infer the eventHub name from client.path - /// - /// - public void AddEventHubProducerClient(EventHubProducerClient client) - { - if (client == null) - { - throw new ArgumentNullException(nameof(client)); - } - string eventHubName = client.EventHubName; - AddEventHubProducerClient(eventHubName, client); - } + // Event Hub Names are case-insensitive. + // The same path can have multiple connection strings with different permissions (sending and receiving), + // so we track senders and receivers separately and infer which one to use based on the EventHub (sender) vs. EventHubTrigger (receiver) attribute. + // Connection strings may also encapsulate different endpoints. + internal Dictionary RegisteredProducers { get; } = new (); + internal Dictionary RegisteredConsumerCredentials { get; } = new (); /// - /// Add an existing client for sending messages to an event hub. Infer the eventHub name from client.path + /// Add an existing client for sending messages to an event hub. /// /// name of the event hub /// - public void AddEventHubProducerClient(string eventHubName, EventHubProducerClient client) + internal void AddEventHubProducerClient(string eventHubName, EventHubProducerClient client) { if (eventHubName == null) { @@ -141,7 +102,7 @@ public void AddEventHubProducerClient(string eventHubName, EventHubProducerClien throw new ArgumentNullException(nameof(client)); } - _clients[eventHubName] = client; + RegisteredProducers[eventHubName] = client; } /// @@ -160,13 +121,7 @@ public void AddSender(string eventHubName, string sendConnectionString) throw new ArgumentNullException(nameof(sendConnectionString)); } - EventHubsConnectionStringBuilder sb = new EventHubsConnectionStringBuilder(sendConnectionString); - if (string.IsNullOrWhiteSpace(sb.EntityPath)) - { - sb.EntityPath = eventHubName; - } - - var client = new EventHubProducerClient(sb.ToString()); + var client = new EventHubProducerClient(EventHubClientFactory.NormalizeConnectionString(sendConnectionString, eventHubName)); AddEventHubProducerClient(eventHubName, client); } @@ -186,7 +141,7 @@ public void AddReceiver(string eventHubName, string receiverConnectionString) throw new ArgumentNullException(nameof(receiverConnectionString)); } - this._receiverCreds[eventHubName] = new ReceiverCreds + RegisteredConsumerCredentials[eventHubName] = new ReceiverCredentials { EventHubConnectionString = receiverConnectionString }; @@ -213,96 +168,13 @@ public void AddReceiver(string eventHubName, string receiverConnectionString, st throw new ArgumentNullException(nameof(storageConnectionString)); } - this._receiverCreds[eventHubName] = new ReceiverCreds + RegisteredConsumerCredentials[eventHubName] = new ReceiverCredentials { EventHubConnectionString = receiverConnectionString, StorageConnectionString = storageConnectionString }; } - internal EventHubProducerClient GetEventHubProducerClient(string eventHubName, string connection) - { - EventHubProducerClient client; - - if (string.IsNullOrEmpty(eventHubName)) - { - EventHubsConnectionStringBuilder builder = new EventHubsConnectionStringBuilder(connection); - eventHubName = builder.EntityPath; - } - - if (_clients.TryGetValue(eventHubName, out client)) - { - return client; - } - else if (!string.IsNullOrWhiteSpace(connection)) - { - return _clients.GetOrAdd(eventHubName, key => - { - AddSender(key, connection); - return _clients[key]; - }); - } - throw new InvalidOperationException("No event hub sender named " + eventHubName); - } - - // Lookup a listener for receiving events given the name provided in the [EventHubTrigger] attribute. - internal EventProcessorHost GetEventProcessorHost(string eventHubName, string consumerGroup) - { - ReceiverCreds creds; - if (this._receiverCreds.TryGetValue(eventHubName, out creds)) - { - consumerGroup ??= EventHubConsumerClient.DefaultConsumerGroupName; - - // Use blob prefix support available in EPH starting in 2.2.6 - EventProcessorHost host = new EventProcessorHost(consumerGroup: consumerGroup, - connectionString: creds.EventHubConnectionString, - eventHubName: eventHubName, - options: this.EventProcessorOptions, - eventBatchMaximumCount: _maxBatchSize, - invokeProcessorAfterReceiveTimeout: InvokeProcessorAfterReceiveTimeout, exceptionHandler: _exceptionHandler); - - return host; - } - - throw new InvalidOperationException("No event hub receiver named " + eventHubName); - } - - internal IEventHubConsumerClient GetEventHubConsumerClient(string eventHubName, string consumerGroup) - { - ReceiverCreds creds; - if (this._receiverCreds.TryGetValue(eventHubName, out creds)) - { - consumerGroup ??= EventHubConsumerClient.DefaultConsumerGroupName; - - // Use blob prefix support available in EPH starting in 2.2.6 - return new EventHubConsumerClientImpl(new EventHubConsumerClient( - consumerGroup, - creds.EventHubConnectionString, - eventHubName)); - } - - throw new InvalidOperationException("No event hub receiver named " + eventHubName); - } - - - internal string GetCheckpointStoreConnectionString(IConfiguration config, string eventHubName) - { - ReceiverCreds creds; - if (this._receiverCreds.TryGetValue(eventHubName, out creds)) - { - var storageConnectionString = creds.StorageConnectionString; - if (storageConnectionString == null) - { - string defaultStorageString = config.GetWebJobsConnectionString(ConnectionStringNames.Storage); - storageConnectionString = defaultStorageString; - } - - return storageConnectionString; - } - - throw new InvalidOperationException("No event hub receiver named " + eventHubName); - } - private static string EscapeStorageCharacter(char character) { var ordinalValue = (ushort)character; @@ -352,14 +224,6 @@ private static string EscapeBlobPath(string path) return sb.ToString(); } - internal static string GetEventHubNamespace(EventHubsConnectionStringBuilder connectionString) - { - // EventHubs only have 1 endpoint. - var url = connectionString.Endpoint; - var @namespace = url.Host; - return @namespace; - } - /// /// Get the blob prefix used with EventProcessorHost for a given event hub. /// @@ -413,7 +277,7 @@ public string Format() // Hold credentials for a given eventHub name. // Multiple consumer groups (and multiple listeners) on the same hub can share the same credentials. - private class ReceiverCreds + internal class ReceiverCredentials { // Required. public string EventHubConnectionString { get; set; } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubWebJobsBuilderExtensions.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubWebJobsBuilderExtensions.cs index 16a3662e08371..014c8c1ec4f3c 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubWebJobsBuilderExtensions.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubWebJobsBuilderExtensions.cs @@ -4,6 +4,7 @@ using System; using Microsoft.Azure.WebJobs; using Microsoft.Azure.WebJobs.EventHubs; +using Microsoft.Extensions.Azure; using Microsoft.Extensions.DependencyInjection; namespace Microsoft.Extensions.Hosting @@ -37,10 +38,9 @@ public static IWebJobsBuilder AddEventHubs(this IWebJobsBuilder builder, Action< builder.AddExtension() .BindOptions(); - builder.Services.Configure(options => - { - configure(options); - }); + builder.Services.AddAzureClientsCore(); + builder.Services.AddSingleton(); + builder.Services.Configure(configure); return builder; } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubAttribute.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubAttribute.cs index e340b59a4c608..58e64fa2180a1 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubAttribute.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubAttribute.cs @@ -25,13 +25,12 @@ public EventHubAttribute(string eventHubName) /// /// The name of the event hub. /// - [AutoResolve] - public string EventHubName { get; private set; } + public string EventHubName { get; } /// /// Gets or sets the optional connection string name that contains the Event Hub connection string. If missing, tries to use a registered event hub sender. /// - [ConnectionString] + [AutoResolve] public string Connection { get; set; } } } \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubTriggerAttribute.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubTriggerAttribute.cs index e9570890b72ed..8165785239f2a 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubTriggerAttribute.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubTriggerAttribute.cs @@ -25,16 +25,18 @@ public EventHubTriggerAttribute(string eventHubName) /// /// Name of the event hub. /// - public string EventHubName { get; private set; } + public string EventHubName { get; } /// /// Optional Name of the consumer group. If missing, then use the default name, "$Default" /// + [AutoResolve] public string ConsumerGroup { get; set; } /// /// Gets or sets the optional app setting name that contains the Event Hub connection string. If missing, tries to use a registered event hub receiver. /// + [AutoResolve] public string Connection { get; set; } } } \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs index 75639ba2f5545..b8380d8cdc7a6 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs @@ -38,7 +38,7 @@ public EventHubListener( ITriggeredFunctionExecutor executor, EventProcessorHost eventProcessorHost, bool singleDispatch, - Func clientFactory, + IEventHubConsumerClient consumerClient, BlobsCheckpointStore checkpointStore, EventHubOptions options, ILogger logger) @@ -53,7 +53,7 @@ public EventHubListener( _scaleMonitor = new Lazy( () => new EventHubsScaleMonitor( functionId, - clientFactory(), + consumerClient, checkpointStore, _logger)); } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj index a07e3f740b577..cccc3144bb5a5 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj @@ -15,15 +15,18 @@ + + + True True diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventHubsConnectionStringBuilder.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventHubsConnectionStringBuilder.cs deleted file mode 100644 index 595a96113bd31..0000000000000 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventHubsConnectionStringBuilder.cs +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Collections.Generic; -using System.Linq; - -namespace Microsoft.Azure.WebJobs.EventHubs.Processor -{ - /// - /// A simple builder which is useful for making small changes to parts of an EventHubs connection string. This class provides no validation, it is expected - /// that if the connection string is invalid it will be detected when it is passed to the actual event hub client. - /// - internal class EventHubsConnectionStringBuilder - { - private const string EntityPathConnectionStringKeyName = "EntityPath"; - private const string EndpointConnectionStringKeyName = "Endpoint"; - - private static readonly char[] ComponentsSplitChars = new char[] { ';' }; - private static readonly char[] ComponentSplitChars = new char[] { '=' }; - - private Dictionary _components; - - public string EntityPath - { - get - { - if (_components.TryGetValue(EntityPathConnectionStringKeyName, out string value)) - { - return value; - } - - return null; - } - - set - { - if (value == null) - { - _components.Remove(EntityPathConnectionStringKeyName); - } - else - { - _components[EntityPathConnectionStringKeyName] = value; - } - } - } - - public Uri Endpoint { - get - { - if (_components.TryGetValue(EndpointConnectionStringKeyName, out string value)) - { - return new Uri(value); - } - - return null; - } - - set - { - if (value == null) - { - _components.Remove(EndpointConnectionStringKeyName); - } - else - { - _components[EndpointConnectionStringKeyName] = value.ToString(); - } - } - } - - public EventHubsConnectionStringBuilder(string connectionString) - { - if (connectionString == null) - { - throw new ArgumentNullException(nameof(connectionString)); - } - - _components = new Dictionary(StringComparer.OrdinalIgnoreCase); - - foreach (string component in connectionString.Split(ComponentsSplitChars, StringSplitOptions.RemoveEmptyEntries)) - { - var parts = component.Split(ComponentSplitChars, 2); - _components[parts[0]] = parts[1]; - } - } - - public override string ToString() - { - return string.Join(";", _components.Select(x => { return $"{x.Key}={x.Value}"; })); - } - } -} diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs index 066d708668c4e..647e123ec93d0 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs @@ -7,6 +7,7 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using Azure.Core; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Primitives; using Azure.Messaging.EventHubs.Processor; @@ -41,6 +42,20 @@ public EventProcessorHost(string consumerGroup, _leaseInfos = new ConcurrentDictionary(); } + public EventProcessorHost(string consumerGroup, + string fullyQualifiedNamespace, + TokenCredential credential, + string eventHubName, + EventProcessorOptions options, + int eventBatchMaximumCount, + bool invokeProcessorAfterReceiveTimeout, + Action exceptionHandler) : base(eventBatchMaximumCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options) + { + _invokeProcessorAfterReceiveTimeout = invokeProcessorAfterReceiveTimeout; + _exceptionHandler = exceptionHandler; + _leaseInfos = new ConcurrentDictionary(); + } + protected override async Task> ClaimOwnershipAsync(IEnumerable desiredOwnership, CancellationToken cancellationToken) { return await _checkpointStore.ClaimOwnershipAsync(desiredOwnership, cancellationToken).ConfigureAwait(false); diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs index 30368a0ff6eb1..b5090123a5fe2 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs @@ -4,17 +4,13 @@ using System; using System.Reflection; using System.Threading.Tasks; -using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Core; -using Azure.Messaging.EventHubs.Primitives; using Azure.Messaging.EventHubs.Processor; using Azure.Storage.Blobs; -using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Bindings; using Microsoft.Azure.WebJobs.Host.Listeners; using Microsoft.Azure.WebJobs.Host.Triggers; using Microsoft.Azure.WebJobs.Logging; -using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -22,23 +18,20 @@ namespace Microsoft.Azure.WebJobs.EventHubs { internal class EventHubTriggerAttributeBindingProvider : ITriggerBindingProvider { - private readonly INameResolver _nameResolver; private readonly ILogger _logger; - private readonly IConfiguration _config; private readonly IOptions _options; + private readonly EventHubClientFactory _clientFactory; private readonly IConverterManager _converterManager; public EventHubTriggerAttributeBindingProvider( - IConfiguration configuration, - INameResolver nameResolver, IConverterManager converterManager, IOptions options, - ILoggerFactory loggerFactory) + ILoggerFactory loggerFactory, + EventHubClientFactory clientFactory) { - _config = configuration; - _nameResolver = nameResolver; _converterManager = converterManager; _options = options; + _clientFactory = clientFactory; _logger = loggerFactory?.CreateLogger(LogCategories.CreateTriggerCategory("EventHub")); } @@ -58,26 +51,11 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex return Task.FromResult(null); } - string resolvedEventHubName = _nameResolver.ResolveWholeString(attribute.EventHubName); - - string consumerGroup = attribute.ConsumerGroup ?? EventHubConsumerClient.DefaultConsumerGroupName; - string resolvedConsumerGroup = _nameResolver.ResolveWholeString(consumerGroup); - - if (!string.IsNullOrWhiteSpace(attribute.Connection)) - { - var connection = _nameResolver.ResolveWholeString(attribute.Connection); - var connectionString = _config.GetConnectionStringOrSetting(connection); - _options.Value.AddReceiver(resolvedEventHubName, connectionString); - } - - var eventHostListener = _options.Value.GetEventProcessorHost(resolvedEventHubName, resolvedConsumerGroup); - var checkpointStoreConnectionString = _options.Value.GetCheckpointStoreConnectionString(_config, resolvedEventHubName); - Func> createListener = (factoryContext, singleDispatch) => { var checkpointStore = new BlobsCheckpointStore( - new BlobContainerClient(checkpointStoreConnectionString, _options.Value.LeaseContainerName), + _clientFactory.GetCheckpointStoreClient(attribute.EventHubName), _options.Value.EventProcessorOptions.RetryOptions.ToRetryPolicy(), factoryContext.Descriptor.Id, _logger); @@ -85,9 +63,9 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex IListener listener = new EventHubListener( factoryContext.Descriptor.Id, factoryContext.Executor, - eventHostListener, + _clientFactory.GetEventProcessorHost(attribute.EventHubName, attribute.Connection, attribute.ConsumerGroup), singleDispatch, - () => _options.Value.GetEventHubConsumerClient(resolvedEventHubName, consumerGroup), + _clientFactory.GetEventHubConsumerClient(attribute.EventHubName, attribute.Connection, attribute.ConsumerGroup), checkpointStore, _options.Value, _logger); diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerInput.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerInput.cs index 4b6348b2b2fd2..115e0c07a03c6 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerInput.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerInput.cs @@ -2,11 +2,9 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using Azure.Messaging.EventHubs; -using Azure.Messaging.EventHubs.Consumer; using System.Collections.Generic; using System.Globalization; using Azure.Messaging.EventHubs.Primitives; -using Microsoft.Azure.WebJobs.EventHubs.Processor; namespace Microsoft.Azure.WebJobs.EventHubs { diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/BlobsCheckpointStoreTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/BlobsCheckpointStoreTests.cs index 74d716b27aa4c..acb99c636e4eb 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/BlobsCheckpointStoreTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/BlobsCheckpointStoreTests.cs @@ -15,6 +15,7 @@ using Microsoft.Azure.WebJobs.Host.TestCommon; using Moq; using NUnit.Framework; +using LogLevel = Microsoft.Extensions.Logging.LogLevel; namespace Microsoft.Azure.WebJobs.EventHubs.UnitTests { @@ -42,7 +43,7 @@ public void ListCheckpointsAsync_LogsOnRequestErrors() Assert.ThrowsAsync(async () => await store.ListCheckpointsAsync(_namespace, _eventHubName, _consumerGroup, CancellationToken.None)); - var warning = testLoggerProvider.GetAllLogMessages().Single(p => p.Level == Extensions.Logging.LogLevel.Warning); + var warning = testLoggerProvider.GetAllLogMessages().Single(p => p.Level == LogLevel.Warning); var expectedWarning = "Function 'EventHubsTriggerFunction': An exception occurred when listing checkpoints for " + "FullyQualifiedNamespace: 'TestNamespace'; EventHubName: 'TestEventHubName'; ConsumerGroup: 'TestConsumerGroup'."; Assert.AreEqual(expectedWarning, warning.FormattedMessage); @@ -72,7 +73,7 @@ public async Task ListCheckpointsAsync_LogsOnInvalidCheckpoints() await store.ListCheckpointsAsync(_namespace, _eventHubName, _consumerGroup, CancellationToken.None); - var warning = testLoggerProvider.GetAllLogMessages().Single(p => p.Level == Extensions.Logging.LogLevel.Warning); + var warning = testLoggerProvider.GetAllLogMessages().Single(p => p.Level == LogLevel.Warning); var expectedWarning = "Function 'EventHubsTriggerFunction': An invalid checkpoint was found for partition: '0' of " + "FullyQualifiedNamespace: 'TestNamespace'; EventHubName: 'TestEventHubName'; ConsumerGroup: 'TestConsumerGroup'. " + "This checkpoint is not valid and will be ignored."; diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs index a2df425094c55..06f606252e42e 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs @@ -72,17 +72,17 @@ await Task.WhenAll [Test] public async Task EventHub_PocoBinding() { - var tuple = BuildHost(); - using (var host = tuple.Item1) + var (jobHost, host) = BuildHost(); + using (jobHost) { var method = typeof(EventHubTestBindToPocoJobs).GetMethod(nameof(EventHubTestBindToPocoJobs.SendEvent_TestHub), BindingFlags.Static | BindingFlags.Public); - await host.CallAsync(method, new { input = "{ Name: 'foo', Value: '" + _testId +"' }" }); + await jobHost.CallAsync(method, new { input = "{ Name: 'foo', Value: '" + _testId +"' }" }); bool result = _eventWait.WaitOne(Timeout); Assert.True(result); } - var logs = tuple.Item2.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage); + var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage); CollectionAssert.Contains(logs, $"PocoValues(foo,{_testId})"); } @@ -90,16 +90,16 @@ public async Task EventHub_PocoBinding() [Test] public async Task EventHub_StringBinding() { - var tuple = BuildHost(); - using (var host = tuple.Item1) + var (jobHost, host) = BuildHost(); + using (jobHost) { var method = typeof(EventHubTestBindToStringJobs).GetMethod(nameof(EventHubTestBindToStringJobs.SendEvent_TestHub), BindingFlags.Static | BindingFlags.Public); - await host.CallAsync(method, new { input = _testId }); + await jobHost.CallAsync(method, new { input = _testId }); bool result = _eventWait.WaitOne(Timeout); Assert.True(result); - var logs = tuple.Item2.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage); + var logs = host.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage); CollectionAssert.Contains(logs, $"Input({_testId})"); } @@ -108,17 +108,17 @@ public async Task EventHub_StringBinding() [Test] public async Task EventHub_SingleDispatch() { - Tuple tuple = BuildHost(); - using (var host = tuple.Item1) + var (jobHost, host) = BuildHost(); + using (jobHost) { var method = typeof(EventHubTestSingleDispatchJobs).GetMethod(nameof(EventHubTestSingleDispatchJobs.SendEvent_TestHub), BindingFlags.Static | BindingFlags.Public); - await host.CallAsync(method, new { input = _testId }); + await jobHost.CallAsync(method, new { input = _testId }); bool result = _eventWait.WaitOne(Timeout); Assert.True(result); } - IEnumerable logMessages = tuple.Item2.GetTestLoggerProvider() + IEnumerable logMessages = host.GetTestLoggerProvider() .GetAllLogMessages(); Assert.AreEqual(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) @@ -135,23 +135,72 @@ public async Task EventHub_SingleDispatch() && x.FormattedMessage.Contains("Sending events to EventHub")).Count() > 0); } + [Test] + public async Task CanSendAndReceive_ConnectionStringUsingAddMethods() + { + await AssertCanSendReceiveMessage(host => + host.ConfigureServices(services => + services.Configure(options => + { + options.AddSender(_eventHubScope.EventHubName, EventHubsTestEnvironment.Instance.EventHubsConnectionString); + options.AddReceiver(_eventHubScope.EventHubName, EventHubsTestEnvironment.Instance.EventHubsConnectionString); + }))); + } + + [Test] + public async Task CanSendAndReceive_ConnectionStringInConfiguration() + { + await AssertCanSendReceiveMessage(host => + host.ConfigureAppConfiguration(configurationBuilder => + configurationBuilder.AddInMemoryCollection(new Dictionary() + { + {"TestConnection", EventHubsTestEnvironment.Instance.EventHubsConnectionString} + }))); + } + + [Test] + public async Task CanSendAndReceive_TokenCredentialInConfiguration() + { + await AssertCanSendReceiveMessage(host => + host.ConfigureAppConfiguration(configurationBuilder => + configurationBuilder.AddInMemoryCollection(new Dictionary() + { + {"TestConnection:fullyQualifiedNamespace", EventHubsTestEnvironment.Instance.FullyQualifiedNamespace}, + {"TestConnection:clientId", EventHubsTestEnvironment.Instance.ClientId}, + {"TestConnection:clientSecret", EventHubsTestEnvironment.Instance.ClientSecret}, + {"TestConnection:tenantId", EventHubsTestEnvironment.Instance.TenantId}, + }))); + } + + public async Task AssertCanSendReceiveMessage(Action hostConfiguration) + { + var (jobHost, host) = BuildHost(hostConfiguration); + using (jobHost) + { + await jobHost.CallAsync(nameof(EventHubTestSingleDispatchJobWithConnection.SendEvent_TestHub), new { input = _testId }); + + bool result = _eventWait.WaitOne(Timeout); + Assert.True(result); + } + } + [Test] public async Task EventHub_MultipleDispatch() { - Tuple tuple = BuildHost(); - using (var host = tuple.Item1) + var (jobHost, host) = BuildHost(); + using (jobHost) { // send some events BEFORE starting the host, to ensure // the events are received in batch var method = typeof(EventHubTestMultipleDispatchJobs).GetMethod("SendEvents_TestHub", BindingFlags.Static | BindingFlags.Public); int numEvents = 5; - await host.CallAsync(method, new { numEvents = numEvents, input = _testId }); + await jobHost.CallAsync(method, new { numEvents = numEvents, input = _testId }); bool result = _eventWait.WaitOne(Timeout); Assert.True(result); } - IEnumerable logMessages = tuple.Item2.GetTestLoggerProvider() + IEnumerable logMessages = host.GetTestLoggerProvider() .GetAllLogMessages(); Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) @@ -171,12 +220,12 @@ public async Task EventHub_MultipleDispatch() [Test] public async Task EventHub_PartitionKey() { - Tuple tuple = BuildHost(); - using (var host = tuple.Item1) + var (jobHost, host) = BuildHost(); + using (jobHost) { var method = typeof(EventHubPartitionKeyTestJobs).GetMethod("SendEvents_TestHub", BindingFlags.Static | BindingFlags.Public); _eventWait = new ManualResetEvent(initialState: false); - await host.CallAsync(method, new { input = _testId }); + await jobHost.CallAsync(method, new { input = _testId }); bool result = _eventWait.WaitOne(Timeout); @@ -326,17 +375,53 @@ public static void ProcessMultiplePartitionEvents([EventHubTrigger(TestHubName)] } } - private Tuple BuildHost() + public class EventHubTestSingleDispatchJobWithConnection + { + public static void SendEvent_TestHub(string input, [EventHub(TestHubName, Connection = "TestConnection")] out EventData evt) + { + evt = new EventData(Encoding.UTF8.GetBytes(input)); + evt.Properties.Add("TestProp1", "value1"); + evt.Properties.Add("TestProp2", "value2"); + } + + public static void ProcessSingleEvent([EventHubTrigger(TestHubName, Connection = "TestConnection")] string evt, DateTime enqueuedTimeUtc, IDictionary properties) + { + // filter for the ID the current test is using + if (evt == _testId) + { + Assert.True((DateTime.Now - enqueuedTimeUtc).TotalSeconds < 30); + + Assert.AreEqual("value1", properties["TestProp1"]); + Assert.AreEqual("value2", properties["TestProp2"]); + + _eventWait.Set(); + } + } + } + + private (JobHost, IHost) BuildHost(Action configurationDelegate = null) { var eventHubName = _eventHubScope.EventHubName; - JobHost jobHost; - IHost host = new HostBuilder() + + configurationDelegate ??= builder => + { + builder.ConfigureServices(services => + { + services.Configure(options => + { + options.AddSender(eventHubName, EventHubsTestEnvironment.Instance.EventHubsConnectionString); + options.AddReceiver(eventHubName, EventHubsTestEnvironment.Instance.EventHubsConnectionString); + }); + }); + }; + + var hostBuilder = new HostBuilder() .ConfigureAppConfiguration(builder => { builder.AddInMemoryCollection(new Dictionary() { - { "webjobstesthub", eventHubName }, - { "AzureWebJobsStorage", StorageTestEnvironment.Instance.StorageConnectionString } + {"webjobstesthub", eventHubName}, + {"AzureWebJobsStorage", StorageTestEnvironment.Instance.StorageConnectionString} }); }) .ConfigureServices(services => @@ -352,23 +437,21 @@ private Tuple BuildHost() { b.AddEventHubs(options => { - // TODO: alternative? - //options.EventProcessorOptions.EnableReceiverRuntimeMetric = true; - var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(eventHubName); - options.AddSender(eventHubName, connectionString); - options.AddReceiver(eventHubName, connectionString); + options.EventProcessorOptions.TrackLastEnqueuedEventProperties = true; }); }) .ConfigureLogging(b => { b.SetMinimumLevel(LogLevel.Debug); - }) - .Build(); + }); + + configurationDelegate(hostBuilder); + var host = hostBuilder.Build(); - jobHost = host.GetJobHost(); + var jobHost = host.GetJobHost(); jobHost.StartAsync().GetAwaiter().GetResult(); - return new Tuple(jobHost, host); + return (jobHost, host); } public class TestPoco { diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs index 6920dbd99450a..1796e7c5f62c1 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs @@ -4,14 +4,11 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Reflection; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.EventHubs; -using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Primitives; using Azure.Messaging.EventHubs.Processor; -using Azure.Storage.Blobs; using Microsoft.Azure.WebJobs.EventHubs.Listeners; using Microsoft.Azure.WebJobs.EventHubs.Processor; using Microsoft.Azure.WebJobs.Host.Executors; @@ -229,7 +226,7 @@ public void GetMonitor_ReturnsExpectedValue() Mock.Of(), host, false, - () => consumerClientMock.Object, + consumerClientMock.Object, Mock.Of(), new EventHubOptions(), testLogger); diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs index 33aa82cbe9c69..b663a9428c8f9 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs @@ -7,8 +7,6 @@ using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Primitives; -using Azure.Messaging.EventHubs.Processor; -using Azure.Storage.Blobs; using Microsoft.Azure.WebJobs.EventHubs.Processor; using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.TestCommon; @@ -16,7 +14,6 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Options; -using Moq; using NUnit.Framework; namespace Microsoft.Azure.WebJobs.EventHubs.UnitTests @@ -160,29 +157,6 @@ public void TriggerStrategy() Assert.Null(contract["partitioncontext"]); // case insensitive } - // Validate that if connection string has EntityPath, that takes precedence over the parameter. - [TestCase("k1", "Endpoint=sb://test89123-ns-x.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=secretkey")] - [TestCase("path2", "Endpoint=sb://test89123-ns-x.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=secretkey;EntityPath=path2")] - public void EntityPathInConnectionString(string expectedPathName, string connectionString) - { - EventHubOptions options = new EventHubOptions(); - - // Test sender - options.AddSender("k1", connectionString); - var client = options.GetEventHubProducerClient("k1", null); - Assert.AreEqual(expectedPathName, client.EventHubName); - } - - // Validate that if connection string has EntityPath, that takes precedence over the parameter. - [TestCase("k1", "Endpoint=sb://test89123-ns-x.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=secretkey")] - [TestCase("path2", "Endpoint=sb://test89123-ns-x.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=secretkey;EntityPath=path2")] - public void GetEventHubClient_AddsConnection(string expectedPathName, string connectionString) - { - EventHubOptions options = new EventHubOptions(); - var client = options.GetEventHubProducerClient("k1", connectionString); - Assert.AreEqual(expectedPathName, client.EventHubName); - } - [TestCase("e", "n1", "n1/e/")] [TestCase("e--1", "host_.path.foo", "host_.path.foo/e--1/")] [TestCase("Ab", "Cd", "cd/ab/")] diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsClientFactoryTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsClientFactoryTests.cs new file mode 100644 index 0000000000000..0e1dc9d55ced2 --- /dev/null +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsClientFactoryTests.cs @@ -0,0 +1,153 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using Azure.Identity; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Options; +using Moq; +using NUnit.Framework; + +namespace Microsoft.Azure.WebJobs.EventHubs.UnitTests +{ + public class EventHubsClientFactoryTests + { + private const string ConnectionString = "Endpoint=sb://test89123-ns-x.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=secretkey"; + private const string ConnectionStringWithEventHub = "Endpoint=sb://test89123-ns-x.servicebus.windows.net/;SharedAccessKeyName=ReceiveRule;SharedAccessKey=secretkey;EntityPath=path2"; + + // Validate that if connection string has EntityPath, that takes precedence over the parameter. + [TestCase("k1", ConnectionString)] + [TestCase("path2", ConnectionStringWithEventHub)] + public void EntityPathInConnectionString(string expectedPathName, string connectionString) + { + EventHubOptions options = new EventHubOptions(); + + // Test sender + options.AddSender("k1", connectionString); + + var configuration = CreateConfiguration(); + var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration)); + + var client = factory.GetEventHubProducerClient("k1", null); + Assert.AreEqual(expectedPathName, client.EventHubName); + } + + // Validate that if connection string has EntityPath, that takes precedence over the parameter. + [TestCase("k1", ConnectionString)] + [TestCase("path2", ConnectionStringWithEventHub)] + public void GetEventHubClient_AddsConnection(string expectedPathName, string connectionString) + { + EventHubOptions options = new EventHubOptions(); + var configuration = CreateConfiguration(new KeyValuePair("connection", connectionString)); + + var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration)); + + var client = factory.GetEventHubProducerClient("k1", "connection"); + Assert.AreEqual(expectedPathName, client.EventHubName); + } + + [Test] + public void CreatesClientsFromConfigWithConnectionString() + { + EventHubOptions options = new EventHubOptions(); + var configuration = CreateConfiguration(new KeyValuePair("connection", ConnectionString)); + + var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration)); + var producer = factory.GetEventHubProducerClient("k1", "connection"); + var consumer = factory.GetEventHubConsumerClient("k1", "connection", null); + var host = factory.GetEventProcessorHost("k1", "connection", null); + + Assert.AreEqual("k1", producer.EventHubName); + Assert.AreEqual("k1", consumer.EventHubName); + Assert.AreEqual("k1", host.EventHubName); + + Assert.AreEqual("test89123-ns-x.servicebus.windows.net", producer.FullyQualifiedNamespace); + Assert.AreEqual("test89123-ns-x.servicebus.windows.net", consumer.FullyQualifiedNamespace); + Assert.AreEqual("test89123-ns-x.servicebus.windows.net", host.FullyQualifiedNamespace); + } + + [Test] + public void CreatesClientsFromConfigWithFullyQualifiedNamespace() + { + EventHubOptions options = new EventHubOptions(); + var componentFactoryMock = new Mock(); + componentFactoryMock.Setup(c => c.CreateTokenCredential( + It.Is(c=> c["fullyQualifiedNamespace"] != null))) + .Returns(new DefaultAzureCredential()); + + var configuration = CreateConfiguration(new KeyValuePair("connection:fullyQualifiedNamespace", "test89123-ns-x.servicebus.windows.net")); + + var factory = new EventHubClientFactory(configuration, componentFactoryMock.Object, Options.Create(options), new DefaultNameResolver(configuration)); + var producer = factory.GetEventHubProducerClient("k1", "connection"); + var consumer = factory.GetEventHubConsumerClient("k1", "connection", null); + var host = factory.GetEventProcessorHost("k1", "connection", null); + + Assert.AreEqual("k1", producer.EventHubName); + Assert.AreEqual("k1", consumer.EventHubName); + Assert.AreEqual("k1", host.EventHubName); + + Assert.AreEqual("test89123-ns-x.servicebus.windows.net", producer.FullyQualifiedNamespace); + Assert.AreEqual("test89123-ns-x.servicebus.windows.net", consumer.FullyQualifiedNamespace); + Assert.AreEqual("test89123-ns-x.servicebus.windows.net", host.FullyQualifiedNamespace); + } + + [Test] + public void ConsumersAndProducersAreCached() + { + EventHubOptions options = new EventHubOptions(); + var configuration = CreateConfiguration(new KeyValuePair("connection", ConnectionString)); + + var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration)); + var producer = factory.GetEventHubProducerClient("k1", "connection"); + var consumer = factory.GetEventHubConsumerClient("k1", "connection", null); + var producer2 = factory.GetEventHubProducerClient("k1", "connection"); + var consumer2 = factory.GetEventHubConsumerClient("k1", "connection", null); + + Assert.AreSame(producer, producer2); + Assert.AreSame(consumer, consumer2); + } + + [Test] + public void UsesDefaultConnectionToStorageAccount() + { + EventHubOptions options = new EventHubOptions(); + + // Test sender + options.AddReceiver("k1", ConnectionString); + + var configuration = CreateConfiguration(new KeyValuePair("AzureWebJobsStorage", "UseDevelopmentStorage=true")); + + var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration)); + + var client = factory.GetCheckpointStoreClient("k1"); + Assert.AreEqual("azure-webjobs-eventhub", client.Name); + Assert.AreEqual("devstoreaccount1", client.AccountName); + } + + [Test] + public void UsesRegisteredConnectionToStorageAccount() + { + EventHubOptions options = new EventHubOptions(); + + // Test sender + options.AddReceiver("k1", + ConnectionString, + "BlobEndpoint=http://blobs/;AccountName=test;AccountKey=abc2564="); + + var configuration = CreateConfiguration(); + + var factory = new EventHubClientFactory(configuration, Mock.Of(), Options.Create(options), new DefaultNameResolver(configuration)); + + var client = factory.GetCheckpointStoreClient("k1"); + Assert.AreEqual("azure-webjobs-eventhub", client.Name); + Assert.AreEqual("http://blobs/azure-webjobs-eventhub", client.Uri.ToString()); + } + + private IConfiguration CreateConfiguration(params KeyValuePair[] data) + { + return new ConfigurationBuilder().AddInMemoryCollection(data).Build(); + } + } +} \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsScaleMonitorTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsScaleMonitorTests.cs index ab41a36ab2f0c..c4f58180367d3 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsScaleMonitorTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsScaleMonitorTests.cs @@ -243,10 +243,10 @@ public void GetScaleStatus_InstancesPerPartitionThresholdExceeded_ReturnsVote_Sc var logs = _loggerProvider.GetAllLogMessages().ToArray(); var log = logs[0]; - Assert.AreEqual(Extensions.Logging.LogLevel.Information, log.Level); + Assert.AreEqual(LogLevel.Information, log.Level); Assert.AreEqual("WorkerCount (17) > PartitionCount (16).", log.FormattedMessage); log = logs[1]; - Assert.AreEqual(Extensions.Logging.LogLevel.Information, log.Level); + Assert.AreEqual(LogLevel.Information, log.Level); Assert.AreEqual($"Number of instances (17) is too high relative to number of partitions (16) for EventHubs entity ({_eventHubName}, {_consumerGroup}).", log.FormattedMessage); // verify again with a non generic context instance @@ -283,10 +283,10 @@ public void GetScaleStatus_EventsPerWorkerThresholdExceeded_ReturnsVote_ScaleOut var logs = _loggerProvider.GetAllLogMessages().ToArray(); var log = logs[0]; - Assert.AreEqual(Extensions.Logging.LogLevel.Information, log.Level); + Assert.AreEqual(LogLevel.Information, log.Level); Assert.AreEqual("EventCount (2900) > WorkerCount (1) * 1,000.", log.FormattedMessage); log = logs[1]; - Assert.AreEqual(Extensions.Logging.LogLevel.Information, log.Level); + Assert.AreEqual(LogLevel.Information, log.Level); Assert.AreEqual($"Event count (2900) for EventHubs entity ({_eventHubName}, {_consumerGroup}) " + $"is too high relative to the number of instances (1).", log.FormattedMessage); @@ -323,7 +323,7 @@ public void GetScaleStatus_EventHubIdle_ReturnsVote_ScaleIn() var logs = _loggerProvider.GetAllLogMessages().ToArray(); var log = logs[0]; - Assert.AreEqual(Extensions.Logging.LogLevel.Information, log.Level); + Assert.AreEqual(LogLevel.Information, log.Level); Assert.AreEqual($"'{_eventHubName}' is idle.", log.FormattedMessage); } @@ -350,7 +350,7 @@ public void GetScaleStatus_EventCountIncreasing_ReturnsVote_ScaleOut() var logs = _loggerProvider.GetAllLogMessages().ToArray(); var log = logs[0]; - Assert.AreEqual(Extensions.Logging.LogLevel.Information, log.Level); + Assert.AreEqual(LogLevel.Information, log.Level); Assert.AreEqual($"Event count is increasing for '{_eventHubName}'.", log.FormattedMessage); } @@ -377,7 +377,7 @@ public void GetScaleStatus_EventCountDecreasing_ReturnsVote_ScaleOut() var logs = _loggerProvider.GetAllLogMessages().ToArray(); var log = logs[0]; - Assert.AreEqual(Extensions.Logging.LogLevel.Information, log.Level); + Assert.AreEqual(LogLevel.Information, log.Level); Assert.AreEqual($"Event count is decreasing for '{_eventHubName}'.", log.FormattedMessage); } @@ -404,7 +404,7 @@ public void GetScaleStatus_EventHubSteady_ReturnsVote_ScaleIn() var logs = _loggerProvider.GetAllLogMessages().ToArray(); var log = logs[0]; - Assert.AreEqual(Extensions.Logging.LogLevel.Information, log.Level); + Assert.AreEqual(LogLevel.Information, log.Level); Assert.AreEqual($"EventHubs entity '{_eventHubName}' is steady.", log.FormattedMessage); } [Test]