From 4394fc9f72859b3c38be18fa8244afe123a9deb6 Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Mon, 16 Nov 2020 14:59:36 -0800 Subject: [PATCH 1/8] Enable WebJobs EventHubs tests --- ...oft.Azure.WebJobs.Extensions.EventHubs.sln | 14 ++++++++ .../src/Config/EventHubOptions.cs | 1 + .../src/Listeners/EventHubListener.cs | 23 +++++-------- .../src/Processor/EventProcessorHost.cs | 12 ++++--- ...EventHubTriggerAttributeBindingProvider.cs | 6 ---- .../EventHubTriggerBindingStrategy.cs | 4 +-- .../tests/EventHubEndToEndTests.cs | 32 +++++++------------ .../tests/EventHubListenerTests.cs | 14 ++++---- .../tests/EventHubsTestEnvironment.cs | 20 ++++++++++++ ....WebJobs.Extensions.EventHubs.Tests.csproj | 1 + sdk/eventhub/test-resources.json | 26 +++++++++++++-- 11 files changed, 96 insertions(+), 57 deletions(-) create mode 100644 sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsTestEnvironment.cs diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln index a2c5395c24617..e9a30dd211f3c 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln @@ -7,6 +7,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Ext EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests", "tests\Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj", "{91E0D968-2082-4959-A294-6F1B790ECECF}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Core.TestFramework", "..\..\core\Azure.Core.TestFramework\src\Azure.Core.TestFramework.csproj", "{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -44,5 +46,17 @@ Global {91E0D968-2082-4959-A294-6F1B790ECECF}.Release|x64.Build.0 = Release|Any CPU {91E0D968-2082-4959-A294-6F1B790ECECF}.Release|x86.ActiveCfg = Release|Any CPU {91E0D968-2082-4959-A294-6F1B790ECECF}.Release|x86.Build.0 = Release|Any CPU + {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x64.ActiveCfg = Debug|Any CPU + {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x64.Build.0 = Debug|Any CPU + {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x86.ActiveCfg = Debug|Any CPU + {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x86.Build.0 = Debug|Any CPU + {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|Any CPU.Build.0 = Release|Any CPU + {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x64.ActiveCfg = Release|Any CPU + {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x64.Build.0 = Release|Any CPU + {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x86.ActiveCfg = Release|Any CPU + {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs index c55e9e26a16ce..8658c3a59d9ad 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs @@ -271,6 +271,7 @@ internal EventProcessorHost GetEventProcessorHost(IConfiguration config, string // Use blob prefix support available in EPH starting in 2.2.6 EventProcessorHost host = new EventProcessorHost( + eventHubName: eventHubName, eventHubPath: actualPath, consumerGroupName: consumerGroup, eventHubConnectionString: sb.ToString(), diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs index f32192bc0639f..ea58a8cae59a2 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs @@ -26,11 +26,6 @@ namespace Microsoft.Azure.WebJobs.EventHubs internal sealed class EventHubListener : IListener, IEventProcessorFactory, IScaleMonitorProvider { private static readonly Dictionary EmptyScope = new Dictionary(); - private readonly string _functionId; - private readonly string _eventHubName; - private readonly string _consumerGroup; - private readonly string _connectionString; - private readonly string _storageConnectionString; private readonly ITriggeredFunctionExecutor _executor; private readonly EventProcessorHost _eventProcessorHost; private readonly bool _singleDispatch; @@ -42,10 +37,6 @@ internal sealed class EventHubListener : IListener, IEventProcessorFactory, ISca public EventHubListener( string functionId, - string eventHubName, - string consumerGroup, - string connectionString, - string storageConnectionString, ITriggeredFunctionExecutor executor, EventProcessorHost eventProcessorHost, bool singleDispatch, @@ -53,17 +44,19 @@ public EventHubListener( ILogger logger, BlobContainerClient blobContainer = null) { - _functionId = functionId; - _eventHubName = eventHubName; - _consumerGroup = consumerGroup; - _connectionString = connectionString; - _storageConnectionString = storageConnectionString; _executor = executor; _eventProcessorHost = eventProcessorHost; _singleDispatch = singleDispatch; _options = options; _logger = logger; - _scaleMonitor = new Lazy(() => new EventHubsScaleMonitor(_functionId, _eventHubName, _consumerGroup, _connectionString, _storageConnectionString, _logger, blobContainer)); + _scaleMonitor = new Lazy(() => new EventHubsScaleMonitor( + functionId, + eventProcessorHost.EventHubName, + eventProcessorHost.ConsumerGroupName, + eventProcessorHost.EventHubConnectionString, + eventProcessorHost.StorageConnectionString, + _logger, + blobContainer)); } void IListener.Cancel() diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs index 21a09057aa873..d4d6e995e2e02 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs @@ -23,17 +23,19 @@ namespace Microsoft.Azure.WebJobs.EventHubs.Processor { internal class EventProcessorHost { - private string EventHubPath { get; } - private string ConsumerGroupName { get; } - private string EventHubConnectionString { get; } - private string StorageConnectionString { get; } + public string EventHubName { get; } + public string EventHubPath { get; } + public string ConsumerGroupName { get; } + public string EventHubConnectionString { get; } + public string StorageConnectionString { get; } private string LeaseContainerName { get; } private string LegacyCheckpointStorageBlobPrefix { get; } private Processor CurrentProcessor { get; set; } private Action ExceptionHandler { get; } - public EventProcessorHost(string eventHubPath, string consumerGroupName, string eventHubConnectionString, string storageConnectionString, string leaseContainerName, string legacyCheckpointStorageBlobPrefix, Action exceptionHandler) + public EventProcessorHost(string eventHubName, string eventHubPath, string consumerGroupName, string eventHubConnectionString, string storageConnectionString, string leaseContainerName, string legacyCheckpointStorageBlobPrefix, Action exceptionHandler) { + EventHubName = eventHubName; EventHubPath = eventHubPath; ConsumerGroupName = consumerGroupName; EventHubConnectionString = eventHubConnectionString; diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs index 66c2ceda6398a..0bb9e19c2995a 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs @@ -69,17 +69,11 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex var eventHostListener = _options.Value.GetEventProcessorHost(_config, resolvedEventHubName, resolvedConsumerGroup); - string storageConnectionString = _config.GetWebJobsConnectionString(ConnectionStringNames.Storage); - Func> createListener = (factoryContext, singleDispatch) => { IListener listener = new EventHubListener( factoryContext.Descriptor.Id, - resolvedEventHubName, - resolvedConsumerGroup, - connectionString, - storageConnectionString, factoryContext.Executor, eventHostListener, singleDispatch, diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs index c32242f8e397e..385f0c2a1a50a 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs @@ -98,7 +98,7 @@ internal static void AddBindingData(Dictionary bindingData, Even var partitionKeys = new string[length]; var offsets = new string[length]; var sequenceNumbers = new long[length]; - var enqueuedTimesUtc = new DateTimeOffset[length]; + var enqueuedTimesUtc = new DateTime[length]; var properties = new IDictionary[length]; var systemProperties = new IDictionary[length]; @@ -114,7 +114,7 @@ internal static void AddBindingData(Dictionary bindingData, Even partitionKeys[i] = events[i].PartitionKey; offsets[i] = events[i].Offset.ToString(CultureInfo.InvariantCulture); sequenceNumbers[i] = events[i].SequenceNumber; - enqueuedTimesUtc[i] = events[i].EnqueuedTime; + enqueuedTimesUtc[i] = events[i].EnqueuedTime.DateTime; properties[i] = events[i].Properties; systemProperties[i] = GetSystemPropertiesForBinding(events[i]); } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs index f2dc0a5ffb7da..1ad38f5c96b81 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs @@ -8,6 +8,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; +using Azure.Core.TestFramework; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer; using Microsoft.Azure.WebJobs.Host.TestCommon; @@ -18,8 +19,7 @@ namespace Microsoft.Azure.WebJobs.Host.EndToEndTests { - [Category("Live")] - public class EventHubEndToEndTests + public class EventHubEndToEndTests: LiveTestBase { private const string TestHubName = "webjobstesthub"; private const int Timeout = 30000; @@ -35,7 +35,6 @@ public EventHubEndToEndTests() } [Test] - [Ignore("Failing test. Tracked by #16715")] public async Task EventHub_PocoBinding() { var tuple = BuildHost(); @@ -54,7 +53,6 @@ public async Task EventHub_PocoBinding() } [Test] - [Ignore("Failing test. Tracked by #16715")] public async Task EventHub_StringBinding() { var tuple = BuildHost(); @@ -73,7 +71,6 @@ public async Task EventHub_StringBinding() } [Test] - [Ignore("Failing test. Tracked by #16715")] public async Task EventHub_SingleDispatch() { Tuple tuple = BuildHost(); @@ -107,7 +104,6 @@ public async Task EventHub_SingleDispatch() } [Test] - [Ignore("Failing test. Tracked by #16715")] public async Task EventHub_MultipleDispatch() { Tuple tuple = BuildHost(); @@ -144,7 +140,6 @@ public async Task EventHub_MultipleDispatch() } [Test] - [Ignore("Failing test. Tracked by #16715")] public async Task EventHub_PartitionKey() { Tuple tuple = BuildHost(); @@ -304,26 +299,23 @@ public static void ProcessMultiplePartitionEvents([EventHubTrigger(TestHubName)] private Tuple BuildHost() { - JobHost jobHost = null; - - var config = new ConfigurationBuilder() - .AddEnvironmentVariables() - .AddTestSettings() - .Build(); - - const string connectionName = "AzureWebJobsTestHubConnection"; - string connection = config.GetConnectionStringOrSetting(connectionName); - Assert.True(!string.IsNullOrEmpty(connection), $"Required test connection string '{connectionName}' is missing."); - + JobHost jobHost; IHost host = new HostBuilder() + .ConfigureAppConfiguration(builder => + { + builder.AddInMemoryCollection(new Dictionary() + { + { "AzureWebJobsStorage", TestEnvironment.StorageAccountConnectionString } + }); + }) .ConfigureDefaultTestHost(b => { b.AddEventHubs(options => { // TODO: alternative? //options.EventProcessorOptions.EnableReceiverRuntimeMetric = true; - options.AddSender(TestHubName, connection); - options.AddReceiver(TestHubName, connection); + options.AddSender(TestHubName, TestEnvironment.EventHubsNamespaceConnectionString); + options.AddReceiver(TestHubName, TestEnvironment.EventHubsNamespaceConnectionString); }); }) .ConfigureLogging(b => diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs index 053849c675d23..75d169267c532 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs @@ -12,6 +12,7 @@ using Azure.Messaging.EventHubs.Processor; using Azure.Storage.Blobs; using Microsoft.Azure.WebJobs.EventHubs.Listeners; +using Microsoft.Azure.WebJobs.EventHubs.Processor; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Azure.WebJobs.Host.TestCommon; @@ -191,16 +192,17 @@ public void GetMonitor_ReturnsExpectedValue() var functionId = "FunctionId"; var eventHubName = "EventHubName"; var consumerGroup = "ConsumerGroup"; - var storageUri = new Uri("https://eventhubsteststorageaccount.blob.core.windows.net/"); var testLogger = new TestLogger("Test"); + var host = new EventProcessorHost( + eventHubName, + null, + consumerGroup, + "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=", + "DefaultEndpointsProtocol=https;AccountName=EventHubScaleMonitorFakeTestAccount;AccountKey=ABCDEFG;EndpointSuffix=core.windows.net", null, null, null); var listener = new EventHubListener( functionId, - eventHubName, - consumerGroup, - "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=", - "DefaultEndpointsProtocol=https;AccountName=EventHubScaleMonitorFakeTestAccount;AccountKey=ABCDEFG;EndpointSuffix=core.windows.net", new Mock(MockBehavior.Strict).Object, - null, + host, false, new EventHubOptions(), testLogger, diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsTestEnvironment.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsTestEnvironment.cs new file mode 100644 index 0000000000000..18e9ec32fad0c --- /dev/null +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsTestEnvironment.cs @@ -0,0 +1,20 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Azure.Core.TestFramework; + +namespace Microsoft.Azure.WebJobs.Host.EndToEndTests +{ + public class EventHubsTestEnvironment: TestEnvironment + { + /// The Event Hubs namespace to use for the test run. + public string EventHubsNamespaceConnectionString => GetVariable("EVENTHUB_NAMESPACE_CONNECTION_STRING"); + + /// The environment variable value for the storage account connection string, lazily evaluated. + public string StorageAccountConnectionString => GetVariable("EVENTHUB_PROCESSOR_STORAGE_CONNECTION_STRING"); + + public EventHubsTestEnvironment() : base("eventhub") + { + } + } +} \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj index f8bfb6cab68e1..68331576b46e4 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj @@ -18,6 +18,7 @@ + diff --git a/sdk/eventhub/test-resources.json b/sdk/eventhub/test-resources.json index 75620cc74761b..304ff431181f0 100755 --- a/sdk/eventhub/test-resources.json +++ b/sdk/eventhub/test-resources.json @@ -87,6 +87,15 @@ "tier": "Standard" } }, + { + "type": "Microsoft.EventHub/namespaces/eventhubs", + "apiVersion": "2017-04-01", + "name": "[concat(variables('eventHubsNamespace'), '/webjobstesthub')]", + "location": "[parameters('location')]", + "dependsOn": [ + "[resourceId('Microsoft.EventHub/namespaces', variables('eventHubsNamespace'))]" + ] + }, { "type": "Microsoft.Storage/storageAccounts", "apiVersion": "2019-04-01", @@ -175,12 +184,23 @@ "deleteRetentionPolicy": { "enabled": false } - } + }, + "resources": [ + { + "name": "azure-webjobs-eventhub", + "type": "containers", + "apiVersion": "2019-04-01", + "dependsOn": [ + "[resourceId('Microsoft.Storage/storageAccounts/blobServices', variables('storageAccount'), 'default')]" + ], + "properties": {} + } + ] }, { "type": "Microsoft.Authorization/roleAssignments", "apiVersion": "2019-04-01-preview", - "name": "[guid(resourceGroup().id, deployment().name, parameters('baseName'), variables('eventHubsDataOwnerRoleId'))]", + "name": "[guid(resourceGroup().id, deployment().name, parameters('baseName'), variables('eventHubsDataOwnerRoleId'), parameters('testApplicationOid'))]", "properties": { "roleDefinitionId": "[resourceId('Microsoft.Authorization/roleDefinitions', variables('eventHubsDataOwnerRoleId'))]", "principalId": "[parameters('testApplicationOid')]", @@ -190,7 +210,7 @@ { "type": "Microsoft.Authorization/roleAssignments", "apiVersion": "2019-04-01-preview", - "name": "[guid(resourceGroup().id, deployment().name, parameters('baseName'), variables('contributorRoleId'))]", + "name": "[guid(resourceGroup().id, deployment().name, parameters('baseName'), variables('contributorRoleId'), parameters('testApplicationOid'))]", "properties": { "roleDefinitionId": "[resourceId('Microsoft.Authorization/roleDefinitions', variables('contributorRoleId'))]", "principalId": "[parameters('testApplicationOid')]", From c083087111bf28a4b1a19bbe87a36344f2556bc6 Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Tue, 17 Nov 2020 10:00:16 -0800 Subject: [PATCH 2/8] progress? --- .../src/EventHubProducerClientImpl.cs | 3 +- .../src/Listeners/EventHubListener.cs | 2 +- ....Azure.WebJobs.Extensions.EventHubs.csproj | 7 ++ .../src/Processor/EventProcessorHost.cs | 83 +++++++++++-------- .../EventHubTriggerBindingStrategy.cs | 4 +- .../tests/EventHubEndToEndTests.cs | 2 + ....WebJobs.Extensions.EventHubs.Tests.csproj | 1 + 7 files changed, 65 insertions(+), 37 deletions(-) diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubProducerClientImpl.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubProducerClientImpl.cs index a7a08943e1fff..085e2c6da2874 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubProducerClientImpl.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubProducerClientImpl.cs @@ -24,7 +24,8 @@ public async Task CreateBatchAsync(CancellationToken cancellati public async Task SendAsync(IEventDataBatch batch, CancellationToken cancellationToken) { - await _client.SendAsync(((EventDataBatchImpl) batch).Batch, cancellationToken).ConfigureAwait(false); + var eventDataBatch = ((EventDataBatchImpl) batch).Batch; + await _client.SendAsync(eventDataBatch, cancellationToken).ConfigureAwait(false); } } } \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs index ea58a8cae59a2..18510ae770353 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs @@ -10,7 +10,6 @@ using System.Threading; using System.Threading.Tasks; using Azure.Messaging.EventHubs; -using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor; using Azure.Storage.Blobs; using Microsoft.Azure.WebJobs.EventHubs.Listeners; @@ -209,6 +208,7 @@ public async Task ProcessEventsAsync(ProcessorPartitionContext context, IEnumera // code, and capture/log/persist failed events, since they won't be retried. if (messages.Any()) { + context.CheckpointEvent = messages.Last(); await CheckpointAsync(context).ConfigureAwait(false); } } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj index a09729b209eb3..6a5dcb4552e36 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj @@ -14,4 +14,11 @@ + + + + + + + diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs index d4d6e995e2e02..278c3d034ed0f 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs @@ -17,7 +17,6 @@ using Azure.Messaging.EventHubs.Processor; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; -using System.Text.Json.Serialization; namespace Microsoft.Azure.WebJobs.EventHubs.Processor { @@ -52,7 +51,16 @@ public async Task RegisterEventProcessorFactoryAsync(IEventProcessorFactory fact throw new InvalidOperationException("Processor has already been started"); } - CurrentProcessor = new Processor(maxBatchSize, ConsumerGroupName, EventHubConnectionString, LegacyCheckpointStorageBlobPrefix, EventHubPath, options, factory, invokeProcessorAfterReceiveTimeout, ExceptionHandler, new BlobContainerClient(StorageConnectionString, LeaseContainerName)); + CurrentProcessor = new Processor(maxBatchSize, + ConsumerGroupName, + EventHubConnectionString, + LegacyCheckpointStorageBlobPrefix, + EventHubPath, + options, + factory, + invokeProcessorAfterReceiveTimeout, + ExceptionHandler, + new BlobContainerClient(StorageConnectionString, LeaseContainerName)); await CurrentProcessor.StartProcessingAsync().ConfigureAwait(false); } @@ -178,8 +186,11 @@ protected override async Task> ListCheckpo await foreach (BlobItem item in ContainerClient.GetBlobsAsync(traits: BlobTraits.Metadata, prefix: checkpointBlobsPrefix, cancellationToken: cancellationToken).ConfigureAwait(false)) { - if (long.TryParse(item.Metadata[OffsetMetadataKey], NumberStyles.Integer, CultureInfo.InvariantCulture, out long offset) && - long.TryParse(item.Metadata[SequenceNumberMetadataKey], NumberStyles.Integer, CultureInfo.InvariantCulture, out long sequenceNumber)) + if (item.Metadata.TryGetValue(OffsetMetadataKey, out string offsetMetadata) && + long.TryParse(offsetMetadata, NumberStyles.Integer, CultureInfo.InvariantCulture, out long offset) && + item.Metadata.TryGetValue(SequenceNumberMetadataKey, out string sequenceNumberMetadata) && + long.TryParse(sequenceNumberMetadata, NumberStyles.Integer, CultureInfo.InvariantCulture, out long sequenceNumber) && + offset < -100) { string partitionId = item.Name.Substring(checkpointBlobsPrefix.Length); @@ -197,31 +208,34 @@ protected override async Task> ListCheckpo // Check to see if there are any additional checkpoints in the older location that the V4 SDK would write to. If so, use them (this is helpful when moving from the V4 to V5 SDK, // since it means we will not have to reprocess messages processed and checkpointed by the older SDK). - string legacyCheckpointAndOwnershipPrefix = $"{LegacyCheckpointStorageBlobPrefix}{ConsumerGroup}/"; - await foreach (BlobItem item in ContainerClient.GetBlobsAsync(prefix: legacyCheckpointAndOwnershipPrefix, cancellationToken: cancellationToken).ConfigureAwait(false)) - { - string partitionId = item.Name.Substring(legacyCheckpointAndOwnershipPrefix.Length); - if (!checkpoints.ContainsKey(partitionId)) - { - using MemoryStream checkpointStream = new MemoryStream(); - await ContainerClient.GetBlobClient(item.Name).DownloadToAsync(checkpointStream, cancellationToken: cancellationToken).ConfigureAwait(false); - checkpointStream.Position = 0; - BlobPartitionLease lease = await JsonSerializer.DeserializeAsync(checkpointStream, cancellationToken: cancellationToken).ConfigureAwait(false); - if (long.TryParse(lease.Offset, out long offset)) - { - LeaseInfos.TryAdd(partitionId, new LeaseInfo(offset, lease.SequenceNumber ?? 0)); - checkpoints.Add(partitionId, new EventProcessorCheckpoint() - { - ConsumerGroup = ConsumerGroup, - EventHubName = EventHubName, - FullyQualifiedNamespace = FullyQualifiedNamespace, - PartitionId = partitionId, - StartingPosition = EventPosition.FromOffset(offset, isInclusive: false) - }); - } - } - } + // TODO: Reenable reading V4 checkpoints + + // string legacyCheckpointAndOwnershipPrefix = $"{LegacyCheckpointStorageBlobPrefix}{ConsumerGroup}/"; + // await foreach (BlobItem item in ContainerClient.GetBlobsAsync(prefix: legacyCheckpointAndOwnershipPrefix, cancellationToken: cancellationToken).ConfigureAwait(false)) + // { + // string partitionId = item.Name.Substring(legacyCheckpointAndOwnershipPrefix.Length); + // if (!checkpoints.ContainsKey(partitionId)) + // { + // using MemoryStream checkpointStream = new MemoryStream(); + // await ContainerClient.GetBlobClient(item.Name).DownloadToAsync(checkpointStream, cancellationToken: cancellationToken).ConfigureAwait(false); + // checkpointStream.Position = 0; + // BlobPartitionLease lease = await JsonSerializer.DeserializeAsync(checkpointStream, cancellationToken: cancellationToken).ConfigureAwait(false); + // + // if (long.TryParse(lease.Offset, out long offset)) + // { + // LeaseInfos.TryAdd(partitionId, new LeaseInfo(offset, lease.SequenceNumber ?? 0)); + // checkpoints.Add(partitionId, new EventProcessorCheckpoint() + // { + // ConsumerGroup = ConsumerGroup, + // EventHubName = EventHubName, + // FullyQualifiedNamespace = FullyQualifiedNamespace, + // PartitionId = partitionId, + // StartingPosition = EventPosition.FromOffset(offset, isInclusive: false) + // }); + // } + // } + // } return checkpoints.Values; } @@ -244,13 +258,16 @@ protected override async Task> Lis Version = blob.Properties.ETag.Value.ToString(), }); } - return partitonOwnerships; } internal virtual async Task CheckpointAsync(string partitionId, EventData checkpointEvent, CancellationToken cancellationToken = default) { - string checkpointBlob = string.Format(CultureInfo.InvariantCulture, CheckpointPrefixFormat + partitionId, FullyQualifiedNamespace.ToLowerInvariant(), EventHubName.ToLowerInvariant(), ConsumerGroup.ToLowerInvariant()); + string checkpointBlob = string.Format(CultureInfo.InvariantCulture, + CheckpointPrefixFormat + partitionId, + FullyQualifiedNamespace.ToLowerInvariant(), + EventHubName.ToLowerInvariant(), + ConsumerGroup.ToLowerInvariant()); Dictionary checkpointMetadata = new Dictionary() { { OffsetMetadataKey, checkpointEvent.Offset.ToString(CultureInfo.InvariantCulture) }, @@ -273,15 +290,15 @@ internal virtual async Task CheckpointAsync(string partitionId, EventData checkp }; using MemoryStream legacyCheckpointStream = new MemoryStream(); - await JsonSerializer.SerializeAsync(legacyCheckpointStream, lease).ConfigureAwait(false); + await JsonSerializer.SerializeAsync(legacyCheckpointStream, lease, cancellationToken: cancellationToken).ConfigureAwait(false); string legacyCheckpointBlob = $"{LegacyCheckpointStorageBlobPrefix}{ConsumerGroup}/{partitionId}"; - Dictionary legacyCheckpoitMetadata = new Dictionary() + Dictionary legacyCheckpointMetadata = new Dictionary() { { OwningHostMedataKey, Identifier } }; - await ContainerClient.GetBlobClient(checkpointBlob).UploadAsync(legacyCheckpointStream, metadata: legacyCheckpoitMetadata, cancellationToken: cancellationToken).ConfigureAwait(false); + await ContainerClient.GetBlobClient(legacyCheckpointBlob).UploadAsync(legacyCheckpointStream, metadata: legacyCheckpointMetadata, cancellationToken: cancellationToken).ConfigureAwait(false); } internal virtual LeaseInfo GetLeaseInfo(string partitionId) diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs index 385f0c2a1a50a..92d1752d331f1 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs @@ -125,7 +125,7 @@ private static void AddBindingData(Dictionary bindingData, Event SafeAddValue(() => bindingData.Add("PartitionKey", eventData.PartitionKey)); SafeAddValue(() => bindingData.Add("Offset", eventData.Offset)); SafeAddValue(() => bindingData.Add("SequenceNumber", eventData.SequenceNumber)); - SafeAddValue(() => bindingData.Add("EnqueuedTimeUtc", eventData.EnqueuedTime)); + SafeAddValue(() => bindingData.Add("EnqueuedTimeUtc", eventData.EnqueuedTime.Date)); SafeAddValue(() => bindingData.Add("Properties", eventData.Properties)); SafeAddValue(() => bindingData.Add("SystemProperties", GetSystemPropertiesForBinding(eventData))); } @@ -155,7 +155,7 @@ private static IDictionary GetSystemPropertiesForBinding(EventDa modifiedDictionary["SequenceNumber"] = eventData.SequenceNumber; modifiedDictionary["Offset"] = eventData.Offset; modifiedDictionary["PartitionKey"] = eventData.PartitionKey; - modifiedDictionary["EnqueuedTimeUtc"] = eventData.EnqueuedTime; + modifiedDictionary["EnqueuedTimeUtc"] = eventData.EnqueuedTime.DateTime; return modifiedDictionary; } } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs index 1ad38f5c96b81..caebdfd51e837 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs @@ -169,6 +169,7 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName)] string evt, string partitionKey, DateTime enqueuedTimeUtc, IDictionary properties, IDictionary systemProperties) { + Console.WriteLine($"Got event Offset: {systemProperties["Offset"]} SequenceNumber: {systemProperties["SequenceNumber"]} Partition {partitionKey}"); // filter for the ID the current test is using if (evt == _testId) { @@ -320,6 +321,7 @@ private Tuple BuildHost() }) .ConfigureLogging(b => { + b.AddConsole(); b.SetMinimumLevel(LogLevel.Debug); }) .Build(); diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj index 68331576b46e4..b6f512d5c34e6 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj @@ -7,6 +7,7 @@ + From 8641af3da5bde22970be70e68fac3ab37f04c337 Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Tue, 17 Nov 2020 14:00:14 -0800 Subject: [PATCH 3/8] Use shared blob store --- .../src/Config/EventHubOptions.cs | 1 - ....Azure.WebJobs.Extensions.EventHubs.csproj | 6 +- .../src/Processor/EventProcessorHost.cs | 220 +----------------- .../tests/EventHubListenerTests.cs | 2 +- .../tests/EventHubTests.cs | 1 - 5 files changed, 16 insertions(+), 214 deletions(-) diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs index 8658c3a59d9ad..ff390e84c1c10 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs @@ -277,7 +277,6 @@ internal EventProcessorHost GetEventProcessorHost(IConfiguration config, string eventHubConnectionString: sb.ToString(), storageConnectionString: storageConnectionString, leaseContainerName: LeaseContainerName, - legacyCheckpointStorageBlobPrefix: blobPrefix, exceptionHandler: _exceptionHandler); return host; diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj index 6a5dcb4552e36..6d14388318a76 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj @@ -4,7 +4,7 @@ netstandard2.0 Microsoft Azure WebJobs SDK EventHubs Extension 5.0.0-beta.1 - $(NoWarn);AZC0001;CS1591 + $(NoWarn);AZC0001;CS1591;SA1636 sign.snk @@ -15,10 +15,10 @@ + - - + diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs index 278c3d034ed0f..30e9682d51b52 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs @@ -4,19 +4,13 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Globalization; -using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; -using System.Text.Json; -using Azure; using Azure.Messaging.EventHubs; -using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Primitives; using Azure.Messaging.EventHubs.Processor; using Azure.Storage.Blobs; -using Azure.Storage.Blobs.Models; namespace Microsoft.Azure.WebJobs.EventHubs.Processor { @@ -28,11 +22,10 @@ internal class EventProcessorHost public string EventHubConnectionString { get; } public string StorageConnectionString { get; } private string LeaseContainerName { get; } - private string LegacyCheckpointStorageBlobPrefix { get; } private Processor CurrentProcessor { get; set; } private Action ExceptionHandler { get; } - public EventProcessorHost(string eventHubName, string eventHubPath, string consumerGroupName, string eventHubConnectionString, string storageConnectionString, string leaseContainerName, string legacyCheckpointStorageBlobPrefix, Action exceptionHandler) + public EventProcessorHost(string eventHubName, string eventHubPath, string consumerGroupName, string eventHubConnectionString, string storageConnectionString, string leaseContainerName, Action exceptionHandler) { EventHubName = eventHubName; EventHubPath = eventHubPath; @@ -40,7 +33,6 @@ public EventProcessorHost(string eventHubName, string eventHubPath, string consu EventHubConnectionString = eventHubConnectionString; StorageConnectionString = storageConnectionString; LeaseContainerName = leaseContainerName; - LegacyCheckpointStorageBlobPrefix = legacyCheckpointStorageBlobPrefix; ExceptionHandler = exceptionHandler; } @@ -54,7 +46,6 @@ public async Task RegisterEventProcessorFactoryAsync(IEventProcessorFactory fact CurrentProcessor = new Processor(maxBatchSize, ConsumerGroupName, EventHubConnectionString, - LegacyCheckpointStorageBlobPrefix, EventHubPath, options, factory, @@ -88,217 +79,41 @@ internal class Processor : EventProcessor private Action ExceptionHandler { get; } private ConcurrentDictionary LeaseInfos { get; } - private BlobContainerClient ContainerClient { get; } + private BlobsCheckpointStore CheckpointStore { get; } - // These constants match the constant used by the V5 EventHubs SDK. - private const string OwnershipPrefixFormat = "{0}/{1}/{2}/ownership/"; - private const string CheckpointPrefixFormat = "{0}/{1}/{2}/checkpoint/"; - private const string OwnerIdentifierMetadataKey = "ownerid"; - private const string OffsetMetadataKey = "offset"; - private const string SequenceNumberMetadataKey = "sequencenumber"; - - // In addition to being stored inside the lease blob, the V4 SDK also writes ownership information - // as metadata on the blob, using this key. - private const string OwningHostMedataKey = "OWNINGHOST"; - - private string LegacyCheckpointStorageBlobPrefix { get; } - - public Processor(int eventBatchMaximumCount, string consumerGroup, string connectionString, string legacyCheckpointStorageBlobPrefix, string eventHubName, EventProcessorOptions options, IEventProcessorFactory processorFactory, bool invokeProcessorAfterRecieveTimeout, Action exceptionHandler, BlobContainerClient containerClient) : base(eventBatchMaximumCount, consumerGroup, connectionString, eventHubName, options) + public Processor(int eventBatchMaximumCount, string consumerGroup, string connectionString, string eventHubName, EventProcessorOptions options, IEventProcessorFactory processorFactory, bool invokeProcessorAfterRecieveTimeout, Action exceptionHandler, BlobContainerClient containerClient) : base(eventBatchMaximumCount, consumerGroup, connectionString, eventHubName, options) { ProcessorFactory = processorFactory; InvokeProcessorAfterRecieveTimeout = invokeProcessorAfterRecieveTimeout; ExceptionHandler = exceptionHandler; LeaseInfos = new ConcurrentDictionary(); - ContainerClient = containerClient; - LegacyCheckpointStorageBlobPrefix = legacyCheckpointStorageBlobPrefix; + CheckpointStore = new BlobsCheckpointStore(containerClient, RetryPolicy); } protected override async Task> ClaimOwnershipAsync(IEnumerable desiredOwnership, CancellationToken cancellationToken) { - List claimedOwnerships = new List(); - - foreach (EventProcessorPartitionOwnership ownership in desiredOwnership) - { - Dictionary ownershipMetadata = new Dictionary() - { - { OwnerIdentifierMetadataKey, ownership.OwnerIdentifier }, - }; - - // Construct the path to the blob and get a blob client for it so we can interact with it. - string ownershipBlob = string.Format(CultureInfo.InvariantCulture, OwnershipPrefixFormat + ownership.PartitionId, ownership.FullyQualifiedNamespace.ToLowerInvariant(), ownership.EventHubName.ToLowerInvariant(), ownership.ConsumerGroup.ToLowerInvariant()); - BlobClient ownershipBlobClient = ContainerClient.GetBlobClient(ownershipBlob); - - try - { - if (ownership.Version == null) - { - // In this case, we are trying to claim ownership of a partition which was previously unowned, and hence did not have an ownership file. To ensure only a single host grabs the partition, - // we use a conditional request so that we only create our blob in the case where it does not yet exist. - BlobRequestConditions requestConditions = new BlobRequestConditions() { IfNoneMatch = new ETag("*") }; - - using MemoryStream emptyStream = new MemoryStream(Array.Empty()); - BlobContentInfo info = await ownershipBlobClient.UploadAsync(emptyStream, metadata: ownershipMetadata, conditions: requestConditions, cancellationToken: cancellationToken).ConfigureAwait(false); - - claimedOwnerships.Add(new EventProcessorPartitionOwnership() - { - ConsumerGroup = ownership.ConsumerGroup, - EventHubName = ownership.EventHubName, - FullyQualifiedNamespace = ownership.FullyQualifiedNamespace, - LastModifiedTime = info.LastModified, - OwnerIdentifier = ownership.OwnerIdentifier, - PartitionId = ownership.PartitionId, - Version = info.ETag.ToString(), - }); - } - else - { - // In this case, the partition is owned by some other host. The ownership file already exists, so we just need to change metadata on it. But we should only do this if the metadata has not - // changed between when we listed ownership and when we are trying to claim ownership, i.e. the ETag for the file has not changed. - BlobRequestConditions requestConditions = new BlobRequestConditions() { IfMatch = new ETag(ownership.Version) }; - BlobInfo info = await ownershipBlobClient.SetMetadataAsync(ownershipMetadata, requestConditions, cancellationToken).ConfigureAwait(false); - - claimedOwnerships.Add(new EventProcessorPartitionOwnership() - { - ConsumerGroup = ownership.ConsumerGroup, - EventHubName = ownership.EventHubName, - FullyQualifiedNamespace = ownership.FullyQualifiedNamespace, - LastModifiedTime = info.LastModified, - OwnerIdentifier = ownership.OwnerIdentifier, - PartitionId = ownership.PartitionId, - Version = info.ETag.ToString(), - }); - } - } - catch (RequestFailedException e) when (e.ErrorCode == BlobErrorCode.BlobAlreadyExists || e.ErrorCode == BlobErrorCode.ConditionNotMet) - { - // In this case, another host has claimed the partition before we did. That's safe to ignore. We'll still try to claim other partitions. - } - } - - return claimedOwnerships; + return await CheckpointStore.ClaimOwnershipAsync(desiredOwnership, cancellationToken).ConfigureAwait(false); } protected override async Task> ListCheckpointsAsync(CancellationToken cancellationToken) { - // First, we read information from the location that the EventHubs V5 SDK writes to. - Dictionary checkpoints = new Dictionary(); - string checkpointBlobsPrefix = string.Format(CultureInfo.InvariantCulture, CheckpointPrefixFormat, FullyQualifiedNamespace.ToLowerInvariant(), EventHubName.ToLowerInvariant(), ConsumerGroup.ToLowerInvariant()); - - await foreach (BlobItem item in ContainerClient.GetBlobsAsync(traits: BlobTraits.Metadata, prefix: checkpointBlobsPrefix, cancellationToken: cancellationToken).ConfigureAwait(false)) - { - if (item.Metadata.TryGetValue(OffsetMetadataKey, out string offsetMetadata) && - long.TryParse(offsetMetadata, NumberStyles.Integer, CultureInfo.InvariantCulture, out long offset) && - item.Metadata.TryGetValue(SequenceNumberMetadataKey, out string sequenceNumberMetadata) && - long.TryParse(sequenceNumberMetadata, NumberStyles.Integer, CultureInfo.InvariantCulture, out long sequenceNumber) && - offset < -100) - { - string partitionId = item.Name.Substring(checkpointBlobsPrefix.Length); - - LeaseInfos.TryAdd(partitionId, new LeaseInfo(offset, sequenceNumber)); - checkpoints.Add(partitionId, new EventProcessorCheckpoint() - { - ConsumerGroup = ConsumerGroup, - EventHubName = EventHubName, - FullyQualifiedNamespace = FullyQualifiedNamespace, - PartitionId = partitionId, - StartingPosition = EventPosition.FromOffset(offset, isInclusive: false) - }); - } - } - - // Check to see if there are any additional checkpoints in the older location that the V4 SDK would write to. If so, use them (this is helpful when moving from the V4 to V5 SDK, - // since it means we will not have to reprocess messages processed and checkpointed by the older SDK). - - // TODO: Reenable reading V4 checkpoints - - // string legacyCheckpointAndOwnershipPrefix = $"{LegacyCheckpointStorageBlobPrefix}{ConsumerGroup}/"; - // await foreach (BlobItem item in ContainerClient.GetBlobsAsync(prefix: legacyCheckpointAndOwnershipPrefix, cancellationToken: cancellationToken).ConfigureAwait(false)) - // { - // string partitionId = item.Name.Substring(legacyCheckpointAndOwnershipPrefix.Length); - // if (!checkpoints.ContainsKey(partitionId)) - // { - // using MemoryStream checkpointStream = new MemoryStream(); - // await ContainerClient.GetBlobClient(item.Name).DownloadToAsync(checkpointStream, cancellationToken: cancellationToken).ConfigureAwait(false); - // checkpointStream.Position = 0; - // BlobPartitionLease lease = await JsonSerializer.DeserializeAsync(checkpointStream, cancellationToken: cancellationToken).ConfigureAwait(false); - // - // if (long.TryParse(lease.Offset, out long offset)) - // { - // LeaseInfos.TryAdd(partitionId, new LeaseInfo(offset, lease.SequenceNumber ?? 0)); - // checkpoints.Add(partitionId, new EventProcessorCheckpoint() - // { - // ConsumerGroup = ConsumerGroup, - // EventHubName = EventHubName, - // FullyQualifiedNamespace = FullyQualifiedNamespace, - // PartitionId = partitionId, - // StartingPosition = EventPosition.FromOffset(offset, isInclusive: false) - // }); - // } - // } - // } - - return checkpoints.Values; + return await CheckpointStore.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, cancellationToken).ConfigureAwait(false); } protected override async Task> ListOwnershipAsync(CancellationToken cancellationToken) { - List partitonOwnerships = new List(); - string ownershipBlobsPefix = string.Format(CultureInfo.InvariantCulture, OwnershipPrefixFormat, FullyQualifiedNamespace.ToLowerInvariant(), EventHubName.ToLowerInvariant(), ConsumerGroup.ToLowerInvariant()); - - await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(traits: BlobTraits.Metadata, prefix: ownershipBlobsPefix, cancellationToken: cancellationToken).ConfigureAwait(false)) - { - partitonOwnerships.Add(new EventProcessorPartitionOwnership() - { - ConsumerGroup = ConsumerGroup, - EventHubName = EventHubName, - FullyQualifiedNamespace = FullyQualifiedNamespace, - LastModifiedTime = blob.Properties.LastModified.GetValueOrDefault(), - OwnerIdentifier = blob.Metadata[OwnerIdentifierMetadataKey], - PartitionId = blob.Name.Substring(ownershipBlobsPefix.Length), - Version = blob.Properties.ETag.Value.ToString(), - }); - } - return partitonOwnerships; + return await CheckpointStore.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, cancellationToken).ConfigureAwait(false); } internal virtual async Task CheckpointAsync(string partitionId, EventData checkpointEvent, CancellationToken cancellationToken = default) { - string checkpointBlob = string.Format(CultureInfo.InvariantCulture, - CheckpointPrefixFormat + partitionId, - FullyQualifiedNamespace.ToLowerInvariant(), - EventHubName.ToLowerInvariant(), - ConsumerGroup.ToLowerInvariant()); - Dictionary checkpointMetadata = new Dictionary() - { - { OffsetMetadataKey, checkpointEvent.Offset.ToString(CultureInfo.InvariantCulture) }, - { SequenceNumberMetadataKey, checkpointEvent.SequenceNumber.ToString(CultureInfo.InvariantCulture) } - }; - - using MemoryStream emptyStream = new MemoryStream(Array.Empty()); - await ContainerClient.GetBlobClient(checkpointBlob).UploadAsync(emptyStream, metadata: checkpointMetadata, cancellationToken: cancellationToken).ConfigureAwait(false); - - LeaseInfos[partitionId] = new LeaseInfo(checkpointEvent.Offset, checkpointEvent.SequenceNumber); - - // In addition to writing a checkpoint in the V5 format, we also write one in the older V4 format, as some processes (e.g. the scale controller) expect - // checkpoints in the older format. This also makes it possible to move to an earlier version of the SDK without having to re-process events. - BlobPartitionLease lease = new BlobPartitionLease() + await CheckpointStore.UpdateCheckpointAsync(new EventProcessorCheckpoint() { PartitionId = partitionId, - Owner = Identifier, - Offset = checkpointEvent.Offset.ToString(CultureInfo.InvariantCulture), - SequenceNumber = checkpointEvent.SequenceNumber, - }; - - using MemoryStream legacyCheckpointStream = new MemoryStream(); - await JsonSerializer.SerializeAsync(legacyCheckpointStream, lease, cancellationToken: cancellationToken).ConfigureAwait(false); - - string legacyCheckpointBlob = $"{LegacyCheckpointStorageBlobPrefix}{ConsumerGroup}/{partitionId}"; - Dictionary legacyCheckpointMetadata = new Dictionary() - { - { OwningHostMedataKey, Identifier } - }; - - await ContainerClient.GetBlobClient(legacyCheckpointBlob).UploadAsync(legacyCheckpointStream, metadata: legacyCheckpointMetadata, cancellationToken: cancellationToken).ConfigureAwait(false); + ConsumerGroup = ConsumerGroup, + EventHubName = EventHubName, + FullyQualifiedNamespace = FullyQualifiedNamespace + }, checkpointEvent, cancellationToken).ConfigureAwait(false); } internal virtual LeaseInfo GetLeaseInfo(string partitionId) @@ -354,17 +169,6 @@ protected override Task OnPartitionProcessingStoppedAsync(Partition partition, P { return partition.Processor.CloseAsync(partition.Context, reason); } - - // The V4 SDK stored lease and checkpoint information in a single JSON file with these properties - private class BlobPartitionLease - { - public string PartitionId { get; set; } - public string Owner { get; set; } - public string Token { get; set; } - public long? Epoch { get; set; } - public string Offset { get; set; } - public long? SequenceNumber { get; set; } - } } } } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs index 75d169267c532..05eaa9fa91ebb 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs @@ -198,7 +198,7 @@ public void GetMonitor_ReturnsExpectedValue() null, consumerGroup, "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=", - "DefaultEndpointsProtocol=https;AccountName=EventHubScaleMonitorFakeTestAccount;AccountKey=ABCDEFG;EndpointSuffix=core.windows.net", null, null, null); + "DefaultEndpointsProtocol=https;AccountName=EventHubScaleMonitorFakeTestAccount;AccountKey=ABCDEFG;EndpointSuffix=core.windows.net", null, null); var listener = new EventHubListener( functionId, new Mock(MockBehavior.Strict).Object, diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs index 96b20e710392c..36273718e6652 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs @@ -252,7 +252,6 @@ internal static ProcessorPartitionContext GetPartitionContext(string partitionId var processor = new EventProcessorHost.Processor(Int32.MaxValue, consumerGroupName, "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=", - "", eventHubPath, new EventProcessorOptions(), null, From a83e8ae5037c70fd1bf5f7341cc94c1c3f2556da Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Tue, 17 Nov 2020 14:31:13 -0800 Subject: [PATCH 4/8] works --- .../src/Config/EventHubExtensionConfigProvider.cs | 2 +- .../src/EventHubProducerClientImpl.cs | 7 ++++++- .../src/Triggers/EventHubTriggerBindingStrategy.cs | 2 +- .../tests/EventHubEndToEndTests.cs | 8 ++++---- 4 files changed, 12 insertions(+), 7 deletions(-) diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs index ebd98b7f29167..ed4ce2a6ba4e9 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs @@ -113,7 +113,7 @@ private static LogLevel GetLogLevel(Exception ex) private IAsyncCollector BuildFromAttribute(EventHubAttribute attribute) { EventHubProducerClient client = _options.Value.GetEventHubProducerClient(attribute.EventHubName, attribute.Connection); - return new EventHubAsyncCollector(new EventHubProducerClientImpl(client)); + return new EventHubAsyncCollector(new EventHubProducerClientImpl(client, _loggerFactory)); } private static string ConvertEventDataToString(EventData x) diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubProducerClientImpl.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubProducerClientImpl.cs index 085e2c6da2874..f1f54628bb5a5 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubProducerClientImpl.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubProducerClientImpl.cs @@ -4,6 +4,8 @@ using System.Threading; using System.Threading.Tasks; using Azure.Messaging.EventHubs.Producer; +using Microsoft.Azure.WebJobs.Logging; +using Microsoft.Extensions.Logging; namespace Microsoft.Azure.WebJobs { @@ -11,10 +13,12 @@ namespace Microsoft.Azure.WebJobs internal class EventHubProducerClientImpl : IEventHubProducerClient { private readonly EventHubProducerClient _client; + private readonly ILogger _logger; - public EventHubProducerClientImpl(EventHubProducerClient client) + public EventHubProducerClientImpl(EventHubProducerClient client, ILoggerFactory loggerFactory) { _client = client; + _logger = loggerFactory?.CreateLogger(LogCategories.Executor); } public async Task CreateBatchAsync(CancellationToken cancellationToken) @@ -24,6 +28,7 @@ public async Task CreateBatchAsync(CancellationToken cancellati public async Task SendAsync(IEventDataBatch batch, CancellationToken cancellationToken) { + _logger?.LogDebug("Sending events to EventHub"); var eventDataBatch = ((EventDataBatchImpl) batch).Batch; await _client.SendAsync(eventDataBatch, cancellationToken).ConfigureAwait(false); } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs index 92d1752d331f1..7513851fbf1bf 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs @@ -125,7 +125,7 @@ private static void AddBindingData(Dictionary bindingData, Event SafeAddValue(() => bindingData.Add("PartitionKey", eventData.PartitionKey)); SafeAddValue(() => bindingData.Add("Offset", eventData.Offset)); SafeAddValue(() => bindingData.Add("SequenceNumber", eventData.SequenceNumber)); - SafeAddValue(() => bindingData.Add("EnqueuedTimeUtc", eventData.EnqueuedTime.Date)); + SafeAddValue(() => bindingData.Add("EnqueuedTimeUtc", eventData.EnqueuedTime.DateTime)); SafeAddValue(() => bindingData.Add("Properties", eventData.Properties)); SafeAddValue(() => bindingData.Add("SystemProperties", GetSystemPropertiesForBinding(eventData))); } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs index caebdfd51e837..6ce93e63cd531 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs @@ -27,7 +27,8 @@ public class EventHubEndToEndTests: LiveTestBase private static string _testId; private static List _results; - public EventHubEndToEndTests() + [SetUp] + public void SetUp() { _results = new List(); _testId = Guid.NewGuid().ToString(); @@ -48,7 +49,7 @@ public async Task EventHub_PocoBinding() var logs = tuple.Item2.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage); - CollectionAssert.Contains($"PocoValues(foo,{_testId})", logs); + CollectionAssert.Contains(logs, $"PocoValues(foo,{_testId})"); } } @@ -66,7 +67,7 @@ public async Task EventHub_StringBinding() var logs = tuple.Item2.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage); - CollectionAssert.Contains($"Input({_testId})", logs); + CollectionAssert.Contains(logs, $"Input({_testId})"); } } @@ -169,7 +170,6 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName)] string evt, string partitionKey, DateTime enqueuedTimeUtc, IDictionary properties, IDictionary systemProperties) { - Console.WriteLine($"Got event Offset: {systemProperties["Offset"]} SequenceNumber: {systemProperties["SequenceNumber"]} Partition {partitionKey}"); // filter for the ID the current test is using if (evt == _testId) { From eb03e705e7d960d42b860f27fc313587adfe1949 Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Tue, 17 Nov 2020 15:11:26 -0800 Subject: [PATCH 5/8] fix broken tests --- .../tests/EventHubTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs index 36273718e6652..bb226ca9e8eb5 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs @@ -84,7 +84,7 @@ public void GetBindingData_SingleDispatch_ReturnsExpectedValue() Assert.AreEqual(evt.PartitionKey, bindingData["PartitionKey"]); Assert.AreEqual(evt.Offset, bindingData["Offset"]); Assert.AreEqual(evt.SequenceNumber, bindingData["SequenceNumber"]); - Assert.AreEqual(evt.EnqueuedTime, bindingData["EnqueuedTimeUtc"]); + Assert.AreEqual(evt.EnqueuedTime.DateTime, bindingData["EnqueuedTimeUtc"]); Assert.AreSame(evt.Properties, bindingData["Properties"]); IDictionary bindingDataSysProps = bindingData["SystemProperties"] as Dictionary; Assert.NotNull(bindingDataSysProps); @@ -132,7 +132,7 @@ public void GetBindingData_MultipleDispatch_ReturnsExpectedValue() Assert.AreEqual(events.Length, ((string[])bindingData["PartitionKeyArray"]).Length); Assert.AreEqual(events.Length, ((string[])bindingData["OffsetArray"]).Length); Assert.AreEqual(events.Length, ((long[])bindingData["SequenceNumberArray"]).Length); - Assert.AreEqual(events.Length, ((DateTimeOffset[])bindingData["EnqueuedTimeUtcArray"]).Length); + Assert.AreEqual(events.Length, ((DateTime[])bindingData["EnqueuedTimeUtcArray"]).Length); Assert.AreEqual(events.Length, ((IDictionary[])bindingData["PropertiesArray"]).Length); Assert.AreEqual(events.Length, ((IDictionary[])bindingData["SystemPropertiesArray"]).Length); From 553e62b00efba67a3407ebc8e547657a5b96c0d6 Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Wed, 18 Nov 2020 13:38:36 -0800 Subject: [PATCH 6/8] testing changes --- .../src/Testing/EventHubsTestEnvironment.cs | 21 +--- .../Connection/EventHubConnectionLiveTests.cs | 4 +- ...oft.Azure.WebJobs.Extensions.EventHubs.sln | 42 ++++--- ....Azure.WebJobs.Extensions.EventHubs.csproj | 5 +- .../tests/EventHubEndToEndTests.cs | 114 ++++++++++++------ .../tests/EventHubsTestEnvironment.cs | 20 --- ....WebJobs.Extensions.EventHubs.Tests.csproj | 11 +- 7 files changed, 121 insertions(+), 96 deletions(-) delete mode 100644 sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsTestEnvironment.cs diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs index e961551815744..0700d25dbe6d8 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs @@ -137,7 +137,7 @@ public sealed class EventHubsTestEnvironment: TestEnvironment /// Initializes a new instance of . /// /// - private EventHubsTestEnvironment() : base("eventhub") + public EventHubsTestEnvironment() : base("eventhub") { ParsedConnectionString = new Lazy(() => EventHubsConnectionStringProperties.Parse(EventHubsConnectionString), LazyThreadSafetyMode.ExecutionAndPublication); ActiveEventHubsNamespace = new Lazy(EnsureEventHubsNamespace, LazyThreadSafetyMode.ExecutionAndPublication); @@ -167,25 +167,6 @@ private EventHubsTestEnvironment() : base("eventhub") /// public string BuildConnectionStringForEventHub(string eventHubName) => $"{ EventHubsConnectionString };EntityPath={ eventHubName }"; - /// - /// Builds a connection string for the Event Hubs namespace used for Live tests, creating a shared access signature - /// in place of the shared key. - /// - /// - /// The name of the Event Hub to base the connection string on. - /// The audience to use for the shared access signature. - /// The duration, in minutes, that the signature should be considered valid for. - /// - /// The namespace connection string with a shared access signature based on the shared key of the current scope. - /// - public string BuildConnectionStringWithSharedAccessSignature(string eventHubName, - string signatureAudience, - int validDurationMinutes = 30) - { - var signature = new SharedAccessSignature(signatureAudience, SharedAccessKeyName, SharedAccessKey, TimeSpan.FromMinutes(validDurationMinutes)); - return $"Endpoint=sb://{ ParsedConnectionString.Value.FullyQualifiedNamespace };EntityPath={ eventHubName };SharedAccessSignature={ signature.Value }"; - } - /// /// Ensures that an Event Hubs namespace is available for the test run, using one if provided by the /// or creating a new Azure resource specific diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs index d36f56756b3e2..cf764a957143a 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs @@ -81,7 +81,9 @@ public async Task ConnectionCanConnectToEventHubsUsingSharedAccessSignatureConne { var options = new EventHubConnectionOptions(); var audience = EventHubConnection.BuildConnectionAudience(options.TransportType, EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, scope.EventHubName); - var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringWithSharedAccessSignature(scope.EventHubName, audience); + EventHubsTestEnvironment tempQualifier = EventHubsTestEnvironment.Instance; + var signature = new SharedAccessSignature(audience, tempQualifier.SharedAccessKeyName, tempQualifier.SharedAccessKey, TimeSpan.FromMinutes(30)); + var connectionString = $"Endpoint=sb://{tempQualifier.FullyQualifiedNamespace };EntityPath={ scope.EventHubName };SharedAccessSignature={ signature.Value }"; await using (var connection = new TestConnectionWithTransport(connectionString, options)) { diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln index e9a30dd211f3c..49671484fc38a 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln @@ -3,12 +3,14 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 VisualStudioVersion = 15.0.26124.0 MinimumVisualStudioVersion = 15.0.26124.0 -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Extensions.EventHubs", "src\Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj", "{CFBC2DEF-E738-4380-A8DE-236433E15CF6}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests", "tests\Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj", "{91E0D968-2082-4959-A294-6F1B790ECECF}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Core.TestFramework", "..\..\core\Azure.Core.TestFramework\src\Azure.Core.TestFramework.csproj", "{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Extensions.EventHubs", "src\Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj", "{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Messaging.EventHubs", "..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj", "{B51ECD35-11DA-46D2-89D7-9DE3888CF896}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -22,18 +24,6 @@ Global HideSolutionNode = FALSE EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution - {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|Any CPU.Build.0 = Debug|Any CPU - {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x64.ActiveCfg = Debug|Any CPU - {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x64.Build.0 = Debug|Any CPU - {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x86.ActiveCfg = Debug|Any CPU - {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x86.Build.0 = Debug|Any CPU - {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|Any CPU.ActiveCfg = Release|Any CPU - {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|Any CPU.Build.0 = Release|Any CPU - {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x64.ActiveCfg = Release|Any CPU - {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x64.Build.0 = Release|Any CPU - {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x86.ActiveCfg = Release|Any CPU - {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x86.Build.0 = Release|Any CPU {91E0D968-2082-4959-A294-6F1B790ECECF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {91E0D968-2082-4959-A294-6F1B790ECECF}.Debug|Any CPU.Build.0 = Debug|Any CPU {91E0D968-2082-4959-A294-6F1B790ECECF}.Debug|x64.ActiveCfg = Debug|Any CPU @@ -58,5 +48,29 @@ Global {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x64.Build.0 = Release|Any CPU {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x86.ActiveCfg = Release|Any CPU {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x86.Build.0 = Release|Any CPU + {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x64.ActiveCfg = Debug|Any CPU + {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x64.Build.0 = Debug|Any CPU + {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x86.ActiveCfg = Debug|Any CPU + {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x86.Build.0 = Debug|Any CPU + {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|Any CPU.Build.0 = Release|Any CPU + {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x64.ActiveCfg = Release|Any CPU + {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x64.Build.0 = Release|Any CPU + {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x86.ActiveCfg = Release|Any CPU + {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x86.Build.0 = Release|Any CPU + {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x64.ActiveCfg = Debug|Any CPU + {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x64.Build.0 = Debug|Any CPU + {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x86.ActiveCfg = Debug|Any CPU + {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x86.Build.0 = Debug|Any CPU + {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|Any CPU.Build.0 = Release|Any CPU + {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x64.ActiveCfg = Release|Any CPU + {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x64.Build.0 = Release|Any CPU + {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x86.ActiveCfg = Release|Any CPU + {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj index 6d14388318a76..c021d4adac8e4 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj @@ -9,11 +9,14 @@ - + + + + diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs index 6ce93e63cd531..ac42ec6dd15cc 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs @@ -10,29 +10,63 @@ using System.Threading.Tasks; using Azure.Core.TestFramework; using Azure.Messaging.EventHubs; +using Azure.Messaging.EventHubs.Processor.Tests; using Azure.Messaging.EventHubs.Producer; +using Azure.Messaging.EventHubs.Tests; +using Microsoft.Azure.WebJobs.EventHubs; using Microsoft.Azure.WebJobs.Host.TestCommon; using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using NUnit.Framework; namespace Microsoft.Azure.WebJobs.Host.EndToEndTests { - public class EventHubEndToEndTests: LiveTestBase + [NonParallelizable] + [LiveOnly] + public class EventHubEndToEndTests { - private const string TestHubName = "webjobstesthub"; + private const string TestHubName = "%webjobstesthub%"; private const int Timeout = 30000; private static EventWaitHandle _eventWait; private static string _testId; private static List _results; + /// The active Event Hub resource scope for the test fixture. + private EventHubScope _eventHubScope; + + /// The active Blob storage resource scope for the test fixture. + private StorageScope _storageScope; + + /// + /// Performs the tasks needed to initialize the test fixture. This + /// method runs once for the entire fixture, prior to running any tests. + /// + /// [SetUp] - public void SetUp() + public async Task FixtureSetUp() { _results = new List(); _testId = Guid.NewGuid().ToString(); _eventWait = new ManualResetEvent(initialState: false); + _eventHubScope = await EventHubScope.CreateAsync(2); + _storageScope = await StorageScope.CreateAsync(); + } + + /// + /// Performs the tasks needed to cleanup the test fixture after all + /// tests have run. This method runs once for the entire fixture. + /// + /// + [TearDown] + public async Task FixtureTearDown() + { + await Task.WhenAll + ( + _eventHubScope.DisposeAsync().AsTask(), + _storageScope.DisposeAsync().AsTask() + ); } [Test] @@ -46,11 +80,11 @@ public async Task EventHub_PocoBinding() bool result = _eventWait.WaitOne(Timeout); Assert.True(result); + } - var logs = tuple.Item2.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage); + var logs = tuple.Item2.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage); - CollectionAssert.Contains(logs, $"PocoValues(foo,{_testId})"); - } + CollectionAssert.Contains(logs, $"PocoValues(foo,{_testId})"); } [Test] @@ -82,26 +116,23 @@ public async Task EventHub_SingleDispatch() bool result = _eventWait.WaitOne(Timeout); Assert.True(result); + } - // Wait for checkpointing - await Task.Delay(3000); - - IEnumerable logMessages = tuple.Item2.GetTestLoggerProvider() - .GetAllLogMessages(); + IEnumerable logMessages = tuple.Item2.GetTestLoggerProvider() + .GetAllLogMessages(); - Assert.AreEqual(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) - && x.FormattedMessage.Contains("Trigger Details:") - && x.FormattedMessage.Contains("Offset:")).Count(), 1); + Assert.AreEqual(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("Trigger Details:") + && x.FormattedMessage.Contains("Offset:")).Count(), 1); - Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) - && x.FormattedMessage.Contains("OpenAsync")).Count() > 0); + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("OpenAsync")).Count() > 0); - Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) - && x.FormattedMessage.Contains("CheckpointAsync")).Count() > 0); + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("CheckpointAsync")).Count() > 0); - Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) - && x.FormattedMessage.Contains("Sending events to EventHub")).Count() > 0); - } + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("Sending events to EventHub")).Count() > 0); } [Test] @@ -118,26 +149,23 @@ public async Task EventHub_MultipleDispatch() bool result = _eventWait.WaitOne(Timeout); Assert.True(result); + } - // Wait for checkpointing - await Task.Delay(3000); - - IEnumerable logMessages = tuple.Item2.GetTestLoggerProvider() - .GetAllLogMessages(); + IEnumerable logMessages = tuple.Item2.GetTestLoggerProvider() + .GetAllLogMessages(); - Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) - && x.FormattedMessage.Contains("Trigger Details:") - && x.FormattedMessage.Contains("Offset:")).Count() > 0); + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("Trigger Details:") + && x.FormattedMessage.Contains("Offset:")).Count() > 0); - Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) - && x.FormattedMessage.Contains("OpenAsync")).Count() > 0); + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("OpenAsync")).Count() > 0); - Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) - && x.FormattedMessage.Contains("CheckpointAsync")).Count() > 0); + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("CheckpointAsync")).Count() > 0); - Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) - && x.FormattedMessage.Contains("Sending events to EventHub")).Count() > 0); - } + Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage) + && x.FormattedMessage.Contains("Sending events to EventHub")).Count() > 0); } [Test] @@ -300,23 +328,31 @@ public static void ProcessMultiplePartitionEvents([EventHubTrigger(TestHubName)] private Tuple BuildHost() { + var eventHubName = _eventHubScope.EventHubName; JobHost jobHost; IHost host = new HostBuilder() .ConfigureAppConfiguration(builder => { builder.AddInMemoryCollection(new Dictionary() { - { "AzureWebJobsStorage", TestEnvironment.StorageAccountConnectionString } + { "webjobstesthub", eventHubName }, + { "AzureWebJobsStorage", StorageTestEnvironment.Instance.StorageConnectionString } }); }) + .ConfigureServices(services => + { + // Speedup shutdown + services.Configure(options => options.EventProcessorOptions.MaximumWaitTime = TimeSpan.FromSeconds(5)); + }) .ConfigureDefaultTestHost(b => { b.AddEventHubs(options => { // TODO: alternative? //options.EventProcessorOptions.EnableReceiverRuntimeMetric = true; - options.AddSender(TestHubName, TestEnvironment.EventHubsNamespaceConnectionString); - options.AddReceiver(TestHubName, TestEnvironment.EventHubsNamespaceConnectionString); + var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(eventHubName); + options.AddSender(eventHubName, connectionString); + options.AddReceiver(eventHubName, connectionString); }); }) .ConfigureLogging(b => diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsTestEnvironment.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsTestEnvironment.cs deleted file mode 100644 index 18e9ec32fad0c..0000000000000 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubsTestEnvironment.cs +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using Azure.Core.TestFramework; - -namespace Microsoft.Azure.WebJobs.Host.EndToEndTests -{ - public class EventHubsTestEnvironment: TestEnvironment - { - /// The Event Hubs namespace to use for the test run. - public string EventHubsNamespaceConnectionString => GetVariable("EVENTHUB_NAMESPACE_CONNECTION_STRING"); - - /// The environment variable value for the storage account connection string, lazily evaluated. - public string StorageAccountConnectionString => GetVariable("EVENTHUB_PROCESSOR_STORAGE_CONNECTION_STRING"); - - public EventHubsTestEnvironment() : base("eventhub") - { - } - } -} \ No newline at end of file diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj index b6f512d5c34e6..ae28a33febc4a 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj @@ -6,8 +6,13 @@ + + + + + @@ -15,12 +20,16 @@ - + + + + + From a811707c630d59342a5d14ec926088fda861573d Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Wed, 18 Nov 2020 14:30:37 -0800 Subject: [PATCH 7/8] more --- .../src/Config/EventHubOptions.cs | 20 ++++------------- .../src/Processor/EventProcessorHost.cs | 6 ++--- .../tests/EventHubEndToEndTests.cs | 6 ++++- .../tests/EventHubListenerTests.cs | 1 - sdk/eventhub/test-resources.json | 22 +------------------ 5 files changed, 12 insertions(+), 43 deletions(-) diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs index ff390e84c1c10..e214e83a73dbc 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs @@ -6,6 +6,7 @@ using System.Collections.Generic; using System.Globalization; using System.Text; +using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Primitives; using Azure.Messaging.EventHubs.Producer; @@ -32,7 +33,8 @@ public class EventHubOptions : IOptionsFormatter /// Name of the blob container that the EventHostProcessor instances uses to coordinate load balancing listening on an event hub. /// Each event hub gets its own blob prefix within the container. /// - public const string LeaseContainerName = "azure-webjobs-eventhub"; + public string LeaseContainerName { get; set; } = "azure-webjobs-eventhub"; + private int _batchCheckpointFrequency = 1; public EventHubOptions() @@ -256,25 +258,11 @@ internal EventProcessorHost GetEventProcessorHost(IConfiguration config, string storageConnectionString = defaultStorageString; } - // If the connection string provides a hub name, that takes precedence. - // Note that connection strings *can't* specify a consumerGroup, so must always be passed in. - string actualPath = eventHubName; - EventHubsConnectionStringBuilder sb = new EventHubsConnectionStringBuilder(creds.EventHubConnectionString); - if (sb.EntityPath != null) - { - actualPath = sb.EntityPath; - sb.EntityPath = null; // need to remove to use with EventProcessorHost - } - - var @namespace = GetEventHubNamespace(sb); - var blobPrefix = GetBlobPrefix(actualPath, @namespace); - // Use blob prefix support available in EPH starting in 2.2.6 EventProcessorHost host = new EventProcessorHost( eventHubName: eventHubName, - eventHubPath: actualPath, consumerGroupName: consumerGroup, - eventHubConnectionString: sb.ToString(), + eventHubConnectionString: creds.EventHubConnectionString, storageConnectionString: storageConnectionString, leaseContainerName: LeaseContainerName, exceptionHandler: _exceptionHandler); diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs index 30e9682d51b52..fdca41bad5b12 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs @@ -17,7 +17,6 @@ namespace Microsoft.Azure.WebJobs.EventHubs.Processor internal class EventProcessorHost { public string EventHubName { get; } - public string EventHubPath { get; } public string ConsumerGroupName { get; } public string EventHubConnectionString { get; } public string StorageConnectionString { get; } @@ -25,10 +24,9 @@ internal class EventProcessorHost private Processor CurrentProcessor { get; set; } private Action ExceptionHandler { get; } - public EventProcessorHost(string eventHubName, string eventHubPath, string consumerGroupName, string eventHubConnectionString, string storageConnectionString, string leaseContainerName, Action exceptionHandler) + public EventProcessorHost(string eventHubName, string consumerGroupName, string eventHubConnectionString, string storageConnectionString, string leaseContainerName, Action exceptionHandler) { EventHubName = eventHubName; - EventHubPath = eventHubPath; ConsumerGroupName = consumerGroupName; EventHubConnectionString = eventHubConnectionString; StorageConnectionString = storageConnectionString; @@ -46,7 +44,7 @@ public async Task RegisterEventProcessorFactoryAsync(IEventProcessorFactory fact CurrentProcessor = new Processor(maxBatchSize, ConsumerGroupName, EventHubConnectionString, - EventHubPath, + EventHubName, options, factory, invokeProcessorAfterReceiveTimeout, diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs index ac42ec6dd15cc..3f009a40f5f0a 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs @@ -342,7 +342,11 @@ private Tuple BuildHost() .ConfigureServices(services => { // Speedup shutdown - services.Configure(options => options.EventProcessorOptions.MaximumWaitTime = TimeSpan.FromSeconds(5)); + services.Configure(options => + { + options.LeaseContainerName = _storageScope.ContainerName; + options.EventProcessorOptions.MaximumWaitTime = TimeSpan.FromSeconds(5); + }); }) .ConfigureDefaultTestHost(b => { diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs index 05eaa9fa91ebb..3cbcba40e4caa 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs @@ -195,7 +195,6 @@ public void GetMonitor_ReturnsExpectedValue() var testLogger = new TestLogger("Test"); var host = new EventProcessorHost( eventHubName, - null, consumerGroup, "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=", "DefaultEndpointsProtocol=https;AccountName=EventHubScaleMonitorFakeTestAccount;AccountKey=ABCDEFG;EndpointSuffix=core.windows.net", null, null); diff --git a/sdk/eventhub/test-resources.json b/sdk/eventhub/test-resources.json index 304ff431181f0..439c2c27a9250 100755 --- a/sdk/eventhub/test-resources.json +++ b/sdk/eventhub/test-resources.json @@ -87,15 +87,6 @@ "tier": "Standard" } }, - { - "type": "Microsoft.EventHub/namespaces/eventhubs", - "apiVersion": "2017-04-01", - "name": "[concat(variables('eventHubsNamespace'), '/webjobstesthub')]", - "location": "[parameters('location')]", - "dependsOn": [ - "[resourceId('Microsoft.EventHub/namespaces', variables('eventHubsNamespace'))]" - ] - }, { "type": "Microsoft.Storage/storageAccounts", "apiVersion": "2019-04-01", @@ -184,18 +175,7 @@ "deleteRetentionPolicy": { "enabled": false } - }, - "resources": [ - { - "name": "azure-webjobs-eventhub", - "type": "containers", - "apiVersion": "2019-04-01", - "dependsOn": [ - "[resourceId('Microsoft.Storage/storageAccounts/blobServices', variables('storageAccount'), 'default')]" - ], - "properties": {} - } - ] + } }, { "type": "Microsoft.Authorization/roleAssignments", From 2e0ce90d6536c75a1ca01a44d5be04e6af6cb8ba Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Wed, 18 Nov 2020 15:29:40 -0800 Subject: [PATCH 8/8] Pull out auto console logging --- ...crosoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs | 2 +- .../tests/EventHubEndToEndTests.cs | 1 - .../Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs index 20c77bebd9e70..89c6e265be79c 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs @@ -24,11 +24,11 @@ namespace Microsoft.Azure.WebJobs.EventHubs { public partial class EventHubOptions : Microsoft.Azure.WebJobs.Hosting.IOptionsFormatter { - public const string LeaseContainerName = "azure-webjobs-eventhub"; public EventHubOptions() { } public int BatchCheckpointFrequency { get { throw null; } set { } } public Azure.Messaging.EventHubs.Primitives.EventProcessorOptions EventProcessorOptions { get { throw null; } } public bool InvokeProcessorAfterReceiveTimeout { get { throw null; } set { } } + public string LeaseContainerName { get { throw null; } set { } } public int MaxBatchSize { get { throw null; } set { } } public void AddEventHubProducerClient(Azure.Messaging.EventHubs.Producer.EventHubProducerClient client) { } public void AddEventHubProducerClient(string eventHubName, Azure.Messaging.EventHubs.Producer.EventHubProducerClient client) { } diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs index 3f009a40f5f0a..a2df425094c55 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs @@ -361,7 +361,6 @@ private Tuple BuildHost() }) .ConfigureLogging(b => { - b.AddConsole(); b.SetMinimumLevel(LogLevel.Debug); }) .Build(); diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj index ae28a33febc4a..3148382ef0384 100644 --- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj +++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj @@ -11,7 +11,6 @@ -