diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs
index e961551815744..0700d25dbe6d8 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Testing/EventHubsTestEnvironment.cs
@@ -137,7 +137,7 @@ public sealed class EventHubsTestEnvironment: TestEnvironment
/// Initializes a new instance of .
///
///
- private EventHubsTestEnvironment() : base("eventhub")
+ public EventHubsTestEnvironment() : base("eventhub")
{
ParsedConnectionString = new Lazy(() => EventHubsConnectionStringProperties.Parse(EventHubsConnectionString), LazyThreadSafetyMode.ExecutionAndPublication);
ActiveEventHubsNamespace = new Lazy(EnsureEventHubsNamespace, LazyThreadSafetyMode.ExecutionAndPublication);
@@ -167,25 +167,6 @@ private EventHubsTestEnvironment() : base("eventhub")
///
public string BuildConnectionStringForEventHub(string eventHubName) => $"{ EventHubsConnectionString };EntityPath={ eventHubName }";
- ///
- /// Builds a connection string for the Event Hubs namespace used for Live tests, creating a shared access signature
- /// in place of the shared key.
- ///
- ///
- /// The name of the Event Hub to base the connection string on.
- /// The audience to use for the shared access signature.
- /// The duration, in minutes, that the signature should be considered valid for.
- ///
- /// The namespace connection string with a shared access signature based on the shared key of the current scope.
- ///
- public string BuildConnectionStringWithSharedAccessSignature(string eventHubName,
- string signatureAudience,
- int validDurationMinutes = 30)
- {
- var signature = new SharedAccessSignature(signatureAudience, SharedAccessKeyName, SharedAccessKey, TimeSpan.FromMinutes(validDurationMinutes));
- return $"Endpoint=sb://{ ParsedConnectionString.Value.FullyQualifiedNamespace };EntityPath={ eventHubName };SharedAccessSignature={ signature.Value }";
- }
-
///
/// Ensures that an Event Hubs namespace is available for the test run, using one if provided by the
/// or creating a new Azure resource specific
diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs
index d36f56756b3e2..cf764a957143a 100644
--- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs
+++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Connection/EventHubConnectionLiveTests.cs
@@ -81,7 +81,9 @@ public async Task ConnectionCanConnectToEventHubsUsingSharedAccessSignatureConne
{
var options = new EventHubConnectionOptions();
var audience = EventHubConnection.BuildConnectionAudience(options.TransportType, EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, scope.EventHubName);
- var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringWithSharedAccessSignature(scope.EventHubName, audience);
+ EventHubsTestEnvironment tempQualifier = EventHubsTestEnvironment.Instance;
+ var signature = new SharedAccessSignature(audience, tempQualifier.SharedAccessKeyName, tempQualifier.SharedAccessKey, TimeSpan.FromMinutes(30));
+ var connectionString = $"Endpoint=sb://{tempQualifier.FullyQualifiedNamespace };EntityPath={ scope.EventHubName };SharedAccessSignature={ signature.Value }";
await using (var connection = new TestConnectionWithTransport(connectionString, options))
{
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln
index a2c5395c24617..49671484fc38a 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/Microsoft.Azure.WebJobs.Extensions.EventHubs.sln
@@ -3,10 +3,14 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26124.0
MinimumVisualStudioVersion = 15.0.26124.0
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Extensions.EventHubs", "src\Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj", "{CFBC2DEF-E738-4380-A8DE-236433E15CF6}"
-EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests", "tests\Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj", "{91E0D968-2082-4959-A294-6F1B790ECECF}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Core.TestFramework", "..\..\core\Azure.Core.TestFramework\src\Azure.Core.TestFramework.csproj", "{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Extensions.EventHubs", "src\Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj", "{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Messaging.EventHubs", "..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj", "{B51ECD35-11DA-46D2-89D7-9DE3888CF896}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -20,18 +24,6 @@ Global
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
- {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x64.ActiveCfg = Debug|Any CPU
- {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x64.Build.0 = Debug|Any CPU
- {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x86.ActiveCfg = Debug|Any CPU
- {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x86.Build.0 = Debug|Any CPU
- {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|Any CPU.Build.0 = Release|Any CPU
- {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x64.ActiveCfg = Release|Any CPU
- {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x64.Build.0 = Release|Any CPU
- {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x86.ActiveCfg = Release|Any CPU
- {CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x86.Build.0 = Release|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Debug|x64.ActiveCfg = Debug|Any CPU
@@ -44,5 +36,41 @@ Global
{91E0D968-2082-4959-A294-6F1B790ECECF}.Release|x64.Build.0 = Release|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Release|x86.ActiveCfg = Release|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Release|x86.Build.0 = Release|Any CPU
+ {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x64.Build.0 = Debug|Any CPU
+ {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x86.Build.0 = Debug|Any CPU
+ {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|Any CPU.Build.0 = Release|Any CPU
+ {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x64.ActiveCfg = Release|Any CPU
+ {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x64.Build.0 = Release|Any CPU
+ {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x86.ActiveCfg = Release|Any CPU
+ {DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x86.Build.0 = Release|Any CPU
+ {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x64.Build.0 = Debug|Any CPU
+ {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x86.Build.0 = Debug|Any CPU
+ {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|Any CPU.Build.0 = Release|Any CPU
+ {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x64.ActiveCfg = Release|Any CPU
+ {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x64.Build.0 = Release|Any CPU
+ {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x86.ActiveCfg = Release|Any CPU
+ {B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x86.Build.0 = Release|Any CPU
+ {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x64.Build.0 = Debug|Any CPU
+ {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x86.Build.0 = Debug|Any CPU
+ {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|Any CPU.Build.0 = Release|Any CPU
+ {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x64.ActiveCfg = Release|Any CPU
+ {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x64.Build.0 = Release|Any CPU
+ {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x86.ActiveCfg = Release|Any CPU
+ {B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs
index 20c77bebd9e70..89c6e265be79c 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/api/Microsoft.Azure.WebJobs.Extensions.EventHubs.netstandard2.0.cs
@@ -24,11 +24,11 @@ namespace Microsoft.Azure.WebJobs.EventHubs
{
public partial class EventHubOptions : Microsoft.Azure.WebJobs.Hosting.IOptionsFormatter
{
- public const string LeaseContainerName = "azure-webjobs-eventhub";
public EventHubOptions() { }
public int BatchCheckpointFrequency { get { throw null; } set { } }
public Azure.Messaging.EventHubs.Primitives.EventProcessorOptions EventProcessorOptions { get { throw null; } }
public bool InvokeProcessorAfterReceiveTimeout { get { throw null; } set { } }
+ public string LeaseContainerName { get { throw null; } set { } }
public int MaxBatchSize { get { throw null; } set { } }
public void AddEventHubProducerClient(Azure.Messaging.EventHubs.Producer.EventHubProducerClient client) { }
public void AddEventHubProducerClient(string eventHubName, Azure.Messaging.EventHubs.Producer.EventHubProducerClient client) { }
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs
index ebd98b7f29167..ed4ce2a6ba4e9 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubExtensionConfigProvider.cs
@@ -113,7 +113,7 @@ private static LogLevel GetLogLevel(Exception ex)
private IAsyncCollector BuildFromAttribute(EventHubAttribute attribute)
{
EventHubProducerClient client = _options.Value.GetEventHubProducerClient(attribute.EventHubName, attribute.Connection);
- return new EventHubAsyncCollector(new EventHubProducerClientImpl(client));
+ return new EventHubAsyncCollector(new EventHubProducerClientImpl(client, _loggerFactory));
}
private static string ConvertEventDataToString(EventData x)
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs
index c55e9e26a16ce..e214e83a73dbc 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Config/EventHubOptions.cs
@@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Globalization;
using System.Text;
+using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Primitives;
using Azure.Messaging.EventHubs.Producer;
@@ -32,7 +33,8 @@ public class EventHubOptions : IOptionsFormatter
/// Name of the blob container that the EventHostProcessor instances uses to coordinate load balancing listening on an event hub.
/// Each event hub gets its own blob prefix within the container.
///
- public const string LeaseContainerName = "azure-webjobs-eventhub";
+ public string LeaseContainerName { get; set; } = "azure-webjobs-eventhub";
+
private int _batchCheckpointFrequency = 1;
public EventHubOptions()
@@ -256,27 +258,13 @@ internal EventProcessorHost GetEventProcessorHost(IConfiguration config, string
storageConnectionString = defaultStorageString;
}
- // If the connection string provides a hub name, that takes precedence.
- // Note that connection strings *can't* specify a consumerGroup, so must always be passed in.
- string actualPath = eventHubName;
- EventHubsConnectionStringBuilder sb = new EventHubsConnectionStringBuilder(creds.EventHubConnectionString);
- if (sb.EntityPath != null)
- {
- actualPath = sb.EntityPath;
- sb.EntityPath = null; // need to remove to use with EventProcessorHost
- }
-
- var @namespace = GetEventHubNamespace(sb);
- var blobPrefix = GetBlobPrefix(actualPath, @namespace);
-
// Use blob prefix support available in EPH starting in 2.2.6
EventProcessorHost host = new EventProcessorHost(
- eventHubPath: actualPath,
+ eventHubName: eventHubName,
consumerGroupName: consumerGroup,
- eventHubConnectionString: sb.ToString(),
+ eventHubConnectionString: creds.EventHubConnectionString,
storageConnectionString: storageConnectionString,
leaseContainerName: LeaseContainerName,
- legacyCheckpointStorageBlobPrefix: blobPrefix,
exceptionHandler: _exceptionHandler);
return host;
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubProducerClientImpl.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubProducerClientImpl.cs
index a7a08943e1fff..f1f54628bb5a5 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubProducerClientImpl.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/EventHubProducerClientImpl.cs
@@ -4,6 +4,8 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Producer;
+using Microsoft.Azure.WebJobs.Logging;
+using Microsoft.Extensions.Logging;
namespace Microsoft.Azure.WebJobs
{
@@ -11,10 +13,12 @@ namespace Microsoft.Azure.WebJobs
internal class EventHubProducerClientImpl : IEventHubProducerClient
{
private readonly EventHubProducerClient _client;
+ private readonly ILogger _logger;
- public EventHubProducerClientImpl(EventHubProducerClient client)
+ public EventHubProducerClientImpl(EventHubProducerClient client, ILoggerFactory loggerFactory)
{
_client = client;
+ _logger = loggerFactory?.CreateLogger(LogCategories.Executor);
}
public async Task CreateBatchAsync(CancellationToken cancellationToken)
@@ -24,7 +28,9 @@ public async Task CreateBatchAsync(CancellationToken cancellati
public async Task SendAsync(IEventDataBatch batch, CancellationToken cancellationToken)
{
- await _client.SendAsync(((EventDataBatchImpl) batch).Batch, cancellationToken).ConfigureAwait(false);
+ _logger?.LogDebug("Sending events to EventHub");
+ var eventDataBatch = ((EventDataBatchImpl) batch).Batch;
+ await _client.SendAsync(eventDataBatch, cancellationToken).ConfigureAwait(false);
}
}
}
\ No newline at end of file
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
index 06b485471cbd1..526ed8d0e9b91 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Listeners/EventHubListener.cs
@@ -10,7 +10,6 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
-using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
@@ -26,11 +25,6 @@ namespace Microsoft.Azure.WebJobs.EventHubs
internal sealed class EventHubListener : IListener, IEventProcessorFactory, IScaleMonitorProvider
{
private static readonly Dictionary EmptyScope = new Dictionary();
- private readonly string _functionId;
- private readonly string _eventHubName;
- private readonly string _consumerGroup;
- private readonly string _connectionString;
- private readonly string _storageConnectionString;
private readonly ITriggeredFunctionExecutor _executor;
private readonly EventProcessorHost _eventProcessorHost;
private readonly bool _singleDispatch;
@@ -42,10 +36,6 @@ internal sealed class EventHubListener : IListener, IEventProcessorFactory, ISca
public EventHubListener(
string functionId,
- string eventHubName,
- string consumerGroup,
- string connectionString,
- string storageConnectionString,
ITriggeredFunctionExecutor executor,
EventProcessorHost eventProcessorHost,
bool singleDispatch,
@@ -53,17 +43,19 @@ public EventHubListener(
ILogger logger,
BlobContainerClient blobContainer = null)
{
- _functionId = functionId;
- _eventHubName = eventHubName;
- _consumerGroup = consumerGroup;
- _connectionString = connectionString;
- _storageConnectionString = storageConnectionString;
_executor = executor;
_eventProcessorHost = eventProcessorHost;
_singleDispatch = singleDispatch;
_options = options;
_logger = logger;
- _scaleMonitor = new Lazy(() => new EventHubsScaleMonitor(_functionId, _eventHubName, _consumerGroup, _connectionString, _storageConnectionString, _logger, blobContainer));
+ _scaleMonitor = new Lazy(() => new EventHubsScaleMonitor(
+ functionId,
+ eventProcessorHost.EventHubName,
+ eventProcessorHost.ConsumerGroupName,
+ eventProcessorHost.EventHubConnectionString,
+ eventProcessorHost.StorageConnectionString,
+ _logger,
+ blobContainer));
}
void IListener.Cancel()
@@ -216,6 +208,7 @@ public async Task ProcessEventsAsync(ProcessorPartitionContext context, IEnumera
// code, and capture/log/persist failed events, since they won't be retried.
if (messages.Any())
{
+ context.CheckpointEvent = messages.Last();
await CheckpointAsync(context).ConfigureAwait(false);
}
}
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj
index a09729b209eb3..c021d4adac8e4 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj
@@ -4,14 +4,24 @@
netstandard2.0
Microsoft Azure WebJobs SDK EventHubs Extension
5.0.0-beta.1
- $(NoWarn);AZC0001;CS1591
+ $(NoWarn);AZC0001;CS1591;SA1636
sign.snk
-
+
+
+
+
+
+
+
+
+
+
+
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
index ef98095a1d858..fdca41bad5b12 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Processor/EventProcessorHost.cs
@@ -4,42 +4,33 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
-using System.Globalization;
-using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
-using System.Text.Json;
-using Azure;
using Azure.Messaging.EventHubs;
-using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Primitives;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
-using Azure.Storage.Blobs.Models;
-using System.Text.Json.Serialization;
namespace Microsoft.Azure.WebJobs.EventHubs.Processor
{
internal class EventProcessorHost
{
- private string EventHubPath { get; }
- private string ConsumerGroupName { get; }
- private string EventHubConnectionString { get; }
- private string StorageConnectionString { get; }
+ public string EventHubName { get; }
+ public string ConsumerGroupName { get; }
+ public string EventHubConnectionString { get; }
+ public string StorageConnectionString { get; }
private string LeaseContainerName { get; }
- private string LegacyCheckpointStorageBlobPrefix { get; }
private Processor CurrentProcessor { get; set; }
private Action ExceptionHandler { get; }
- public EventProcessorHost(string eventHubPath, string consumerGroupName, string eventHubConnectionString, string storageConnectionString, string leaseContainerName, string legacyCheckpointStorageBlobPrefix, Action exceptionHandler)
+ public EventProcessorHost(string eventHubName, string consumerGroupName, string eventHubConnectionString, string storageConnectionString, string leaseContainerName, Action exceptionHandler)
{
- EventHubPath = eventHubPath;
+ EventHubName = eventHubName;
ConsumerGroupName = consumerGroupName;
EventHubConnectionString = eventHubConnectionString;
StorageConnectionString = storageConnectionString;
LeaseContainerName = leaseContainerName;
- LegacyCheckpointStorageBlobPrefix = legacyCheckpointStorageBlobPrefix;
ExceptionHandler = exceptionHandler;
}
@@ -50,7 +41,15 @@ public async Task RegisterEventProcessorFactoryAsync(IEventProcessorFactory fact
throw new InvalidOperationException("Processor has already been started");
}
- CurrentProcessor = new Processor(maxBatchSize, ConsumerGroupName, EventHubConnectionString, LegacyCheckpointStorageBlobPrefix, EventHubPath, options, factory, invokeProcessorAfterReceiveTimeout, ExceptionHandler, new BlobContainerClient(StorageConnectionString, LeaseContainerName));
+ CurrentProcessor = new Processor(maxBatchSize,
+ ConsumerGroupName,
+ EventHubConnectionString,
+ EventHubName,
+ options,
+ factory,
+ invokeProcessorAfterReceiveTimeout,
+ ExceptionHandler,
+ new BlobContainerClient(StorageConnectionString, LeaseContainerName));
await CurrentProcessor.StartProcessingAsync().ConfigureAwait(false);
}
@@ -78,208 +77,41 @@ internal class Processor : EventProcessor
private Action ExceptionHandler { get; }
private ConcurrentDictionary LeaseInfos { get; }
- private BlobContainerClient ContainerClient { get; }
+ private BlobsCheckpointStore CheckpointStore { get; }
- // These constants match the constant used by the V5 EventHubs SDK.
- private const string OwnershipPrefixFormat = "{0}/{1}/{2}/ownership/";
- private const string CheckpointPrefixFormat = "{0}/{1}/{2}/checkpoint/";
- private const string OwnerIdentifierMetadataKey = "ownerid";
- private const string OffsetMetadataKey = "offset";
- private const string SequenceNumberMetadataKey = "sequencenumber";
-
- // In addition to being stored inside the lease blob, the V4 SDK also writes ownership information
- // as metadata on the blob, using this key.
- private const string OwningHostMedataKey = "OWNINGHOST";
-
- private string LegacyCheckpointStorageBlobPrefix { get; }
-
- public Processor(int eventBatchMaximumCount, string consumerGroup, string connectionString, string legacyCheckpointStorageBlobPrefix, string eventHubName, EventProcessorOptions options, IEventProcessorFactory processorFactory, bool invokeProcessorAfterRecieveTimeout, Action exceptionHandler, BlobContainerClient containerClient) : base(eventBatchMaximumCount, consumerGroup, connectionString, eventHubName, options)
+ public Processor(int eventBatchMaximumCount, string consumerGroup, string connectionString, string eventHubName, EventProcessorOptions options, IEventProcessorFactory processorFactory, bool invokeProcessorAfterRecieveTimeout, Action exceptionHandler, BlobContainerClient containerClient) : base(eventBatchMaximumCount, consumerGroup, connectionString, eventHubName, options)
{
ProcessorFactory = processorFactory;
InvokeProcessorAfterRecieveTimeout = invokeProcessorAfterRecieveTimeout;
ExceptionHandler = exceptionHandler;
LeaseInfos = new ConcurrentDictionary();
- ContainerClient = containerClient;
- LegacyCheckpointStorageBlobPrefix = legacyCheckpointStorageBlobPrefix;
+ CheckpointStore = new BlobsCheckpointStore(containerClient, RetryPolicy);
}
protected override async Task> ClaimOwnershipAsync(IEnumerable desiredOwnership, CancellationToken cancellationToken)
{
- List claimedOwnerships = new List();
-
- foreach (EventProcessorPartitionOwnership ownership in desiredOwnership)
- {
- Dictionary ownershipMetadata = new Dictionary()
- {
- { OwnerIdentifierMetadataKey, ownership.OwnerIdentifier },
- };
-
- // Construct the path to the blob and get a blob client for it so we can interact with it.
- string ownershipBlob = string.Format(CultureInfo.InvariantCulture, OwnershipPrefixFormat + ownership.PartitionId, ownership.FullyQualifiedNamespace.ToLowerInvariant(), ownership.EventHubName.ToLowerInvariant(), ownership.ConsumerGroup.ToLowerInvariant());
- BlobClient ownershipBlobClient = ContainerClient.GetBlobClient(ownershipBlob);
-
- try
- {
- if (ownership.Version == null)
- {
- // In this case, we are trying to claim ownership of a partition which was previously unowned, and hence did not have an ownership file. To ensure only a single host grabs the partition,
- // we use a conditional request so that we only create our blob in the case where it does not yet exist.
- BlobRequestConditions requestConditions = new BlobRequestConditions() { IfNoneMatch = new ETag("*") };
-
- using MemoryStream emptyStream = new MemoryStream(Array.Empty());
- BlobContentInfo info = await ownershipBlobClient.UploadAsync(emptyStream, metadata: ownershipMetadata, conditions: requestConditions, cancellationToken: cancellationToken).ConfigureAwait(false);
-
- claimedOwnerships.Add(new EventProcessorPartitionOwnership()
- {
- ConsumerGroup = ownership.ConsumerGroup,
- EventHubName = ownership.EventHubName,
- FullyQualifiedNamespace = ownership.FullyQualifiedNamespace,
- LastModifiedTime = info.LastModified,
- OwnerIdentifier = ownership.OwnerIdentifier,
- PartitionId = ownership.PartitionId,
- Version = info.ETag.ToString(),
- });
- }
- else
- {
- // In this case, the partition is owned by some other host. The ownership file already exists, so we just need to change metadata on it. But we should only do this if the metadata has not
- // changed between when we listed ownership and when we are trying to claim ownership, i.e. the ETag for the file has not changed.
- BlobRequestConditions requestConditions = new BlobRequestConditions() { IfMatch = new ETag(ownership.Version) };
- BlobInfo info = await ownershipBlobClient.SetMetadataAsync(ownershipMetadata, requestConditions, cancellationToken).ConfigureAwait(false);
-
- claimedOwnerships.Add(new EventProcessorPartitionOwnership()
- {
- ConsumerGroup = ownership.ConsumerGroup,
- EventHubName = ownership.EventHubName,
- FullyQualifiedNamespace = ownership.FullyQualifiedNamespace,
- LastModifiedTime = info.LastModified,
- OwnerIdentifier = ownership.OwnerIdentifier,
- PartitionId = ownership.PartitionId,
- Version = info.ETag.ToString(),
- });
- }
- }
- catch (RequestFailedException e) when (e.ErrorCode == BlobErrorCode.BlobAlreadyExists || e.ErrorCode == BlobErrorCode.ConditionNotMet)
- {
- // In this case, another host has claimed the partition before we did. That's safe to ignore. We'll still try to claim other partitions.
- }
- }
-
- return claimedOwnerships;
+ return await CheckpointStore.ClaimOwnershipAsync(desiredOwnership, cancellationToken).ConfigureAwait(false);
}
protected override async Task> ListCheckpointsAsync(CancellationToken cancellationToken)
{
- // First, we read information from the location that the EventHubs V5 SDK writes to.
- Dictionary checkpoints = new Dictionary();
- string checkpointBlobsPrefix = string.Format(CultureInfo.InvariantCulture, CheckpointPrefixFormat, FullyQualifiedNamespace.ToLowerInvariant(), EventHubName.ToLowerInvariant(), ConsumerGroup.ToLowerInvariant());
-
- await foreach (BlobItem item in ContainerClient.GetBlobsAsync(traits: BlobTraits.Metadata, prefix: checkpointBlobsPrefix, cancellationToken: cancellationToken).ConfigureAwait(false))
- {
- if (long.TryParse(item.Metadata[OffsetMetadataKey], NumberStyles.Integer, CultureInfo.InvariantCulture, out long offset) &&
- long.TryParse(item.Metadata[SequenceNumberMetadataKey], NumberStyles.Integer, CultureInfo.InvariantCulture, out long sequenceNumber))
- {
- string partitionId = item.Name.Substring(checkpointBlobsPrefix.Length);
-
- LeaseInfos.TryAdd(partitionId, new LeaseInfo(offset, sequenceNumber));
- checkpoints.Add(partitionId, new EventProcessorCheckpoint()
- {
- ConsumerGroup = ConsumerGroup,
- EventHubName = EventHubName,
- FullyQualifiedNamespace = FullyQualifiedNamespace,
- PartitionId = partitionId,
- StartingPosition = EventPosition.FromOffset(offset, isInclusive: false)
- });
- }
- }
-
- // Check to see if there are any additional checkpoints in the older location that the V4 SDK would write to. If so, use them (this is helpful when moving from the V4 to V5 SDK,
- // since it means we will not have to reprocess messages processed and checkpointed by the older SDK).
- string legacyCheckpointAndOwnershipPrefix = $"{LegacyCheckpointStorageBlobPrefix}{ConsumerGroup}/";
- await foreach (BlobItem item in ContainerClient.GetBlobsAsync(prefix: legacyCheckpointAndOwnershipPrefix, cancellationToken: cancellationToken).ConfigureAwait(false))
- {
- string partitionId = item.Name.Substring(legacyCheckpointAndOwnershipPrefix.Length);
- if (!checkpoints.ContainsKey(partitionId))
- {
- using MemoryStream checkpointStream = new MemoryStream();
- await ContainerClient.GetBlobClient(item.Name).DownloadToAsync(checkpointStream, cancellationToken: cancellationToken).ConfigureAwait(false);
- checkpointStream.Position = 0;
- BlobPartitionLease lease = await JsonSerializer.DeserializeAsync(checkpointStream, cancellationToken: cancellationToken).ConfigureAwait(false);
-
- if (long.TryParse(lease.Offset, out long offset))
- {
- LeaseInfos.TryAdd(partitionId, new LeaseInfo(offset, lease.SequenceNumber ?? 0));
- checkpoints.Add(partitionId, new EventProcessorCheckpoint()
- {
- ConsumerGroup = ConsumerGroup,
- EventHubName = EventHubName,
- FullyQualifiedNamespace = FullyQualifiedNamespace,
- PartitionId = partitionId,
- StartingPosition = EventPosition.FromOffset(offset, isInclusive: false)
- });
- }
- }
- }
-
- return checkpoints.Values;
+ return await CheckpointStore.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, cancellationToken).ConfigureAwait(false);
}
protected override async Task> ListOwnershipAsync(CancellationToken cancellationToken)
{
- List partitonOwnerships = new List();
- string ownershipBlobsPefix = string.Format(CultureInfo.InvariantCulture, OwnershipPrefixFormat, FullyQualifiedNamespace.ToLowerInvariant(), EventHubName.ToLowerInvariant(), ConsumerGroup.ToLowerInvariant());
-
- await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(traits: BlobTraits.Metadata, prefix: ownershipBlobsPefix, cancellationToken: cancellationToken).ConfigureAwait(false))
- {
- partitonOwnerships.Add(new EventProcessorPartitionOwnership()
- {
- ConsumerGroup = ConsumerGroup,
- EventHubName = EventHubName,
- FullyQualifiedNamespace = FullyQualifiedNamespace,
- LastModifiedTime = blob.Properties.LastModified.GetValueOrDefault(),
- OwnerIdentifier = blob.Metadata[OwnerIdentifierMetadataKey],
- PartitionId = blob.Name.Substring(ownershipBlobsPefix.Length),
- Version = blob.Properties.ETag.Value.ToString(),
- });
- }
-
- return partitonOwnerships;
+ return await CheckpointStore.ListOwnershipAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, cancellationToken).ConfigureAwait(false);
}
internal virtual async Task CheckpointAsync(string partitionId, EventData checkpointEvent, CancellationToken cancellationToken = default)
{
- string checkpointBlob = string.Format(CultureInfo.InvariantCulture, CheckpointPrefixFormat + partitionId, FullyQualifiedNamespace.ToLowerInvariant(), EventHubName.ToLowerInvariant(), ConsumerGroup.ToLowerInvariant());
- Dictionary checkpointMetadata = new Dictionary()
- {
- { OffsetMetadataKey, checkpointEvent.Offset.ToString(CultureInfo.InvariantCulture) },
- { SequenceNumberMetadataKey, checkpointEvent.SequenceNumber.ToString(CultureInfo.InvariantCulture) }
- };
-
- using MemoryStream emptyStream = new MemoryStream(Array.Empty());
- await ContainerClient.GetBlobClient(checkpointBlob).UploadAsync(emptyStream, metadata: checkpointMetadata, cancellationToken: cancellationToken).ConfigureAwait(false);
-
- LeaseInfos[partitionId] = new LeaseInfo(checkpointEvent.Offset, checkpointEvent.SequenceNumber);
-
- // In addition to writing a checkpoint in the V5 format, we also write one in the older V4 format, as some processes (e.g. the scale controller) expect
- // checkpoints in the older format. This also makes it possible to move to an earlier version of the SDK without having to re-process events.
- BlobPartitionLease lease = new BlobPartitionLease()
+ await CheckpointStore.UpdateCheckpointAsync(new EventProcessorCheckpoint()
{
PartitionId = partitionId,
- Owner = Identifier,
- Offset = checkpointEvent.Offset.ToString(CultureInfo.InvariantCulture),
- SequenceNumber = checkpointEvent.SequenceNumber,
- };
-
- using MemoryStream legacyCheckpointStream = new MemoryStream();
- await JsonSerializer.SerializeAsync(legacyCheckpointStream, lease, cancellationToken: cancellationToken).ConfigureAwait(false);
-
- string legacyCheckpointBlob = $"{LegacyCheckpointStorageBlobPrefix}{ConsumerGroup}/{partitionId}";
- Dictionary legacyCheckpoitMetadata = new Dictionary()
- {
- { OwningHostMedataKey, Identifier }
- };
-
- await ContainerClient.GetBlobClient(checkpointBlob).UploadAsync(legacyCheckpointStream, metadata: legacyCheckpoitMetadata, cancellationToken: cancellationToken).ConfigureAwait(false);
+ ConsumerGroup = ConsumerGroup,
+ EventHubName = EventHubName,
+ FullyQualifiedNamespace = FullyQualifiedNamespace
+ }, checkpointEvent, cancellationToken).ConfigureAwait(false);
}
internal virtual LeaseInfo GetLeaseInfo(string partitionId)
@@ -335,17 +167,6 @@ protected override Task OnPartitionProcessingStoppedAsync(Partition partition, P
{
return partition.Processor.CloseAsync(partition.Context, reason);
}
-
- // The V4 SDK stored lease and checkpoint information in a single JSON file with these properties
- private class BlobPartitionLease
- {
- public string PartitionId { get; set; }
- public string Owner { get; set; }
- public string Token { get; set; }
- public long? Epoch { get; set; }
- public string Offset { get; set; }
- public long? SequenceNumber { get; set; }
- }
}
}
}
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs
index 66c2ceda6398a..0bb9e19c2995a 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerAttributeBindingProvider.cs
@@ -69,17 +69,11 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex
var eventHostListener = _options.Value.GetEventProcessorHost(_config, resolvedEventHubName, resolvedConsumerGroup);
- string storageConnectionString = _config.GetWebJobsConnectionString(ConnectionStringNames.Storage);
-
Func> createListener =
(factoryContext, singleDispatch) =>
{
IListener listener = new EventHubListener(
factoryContext.Descriptor.Id,
- resolvedEventHubName,
- resolvedConsumerGroup,
- connectionString,
- storageConnectionString,
factoryContext.Executor,
eventHostListener,
singleDispatch,
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs
index c32242f8e397e..7513851fbf1bf 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/src/Triggers/EventHubTriggerBindingStrategy.cs
@@ -98,7 +98,7 @@ internal static void AddBindingData(Dictionary bindingData, Even
var partitionKeys = new string[length];
var offsets = new string[length];
var sequenceNumbers = new long[length];
- var enqueuedTimesUtc = new DateTimeOffset[length];
+ var enqueuedTimesUtc = new DateTime[length];
var properties = new IDictionary[length];
var systemProperties = new IDictionary[length];
@@ -114,7 +114,7 @@ internal static void AddBindingData(Dictionary bindingData, Even
partitionKeys[i] = events[i].PartitionKey;
offsets[i] = events[i].Offset.ToString(CultureInfo.InvariantCulture);
sequenceNumbers[i] = events[i].SequenceNumber;
- enqueuedTimesUtc[i] = events[i].EnqueuedTime;
+ enqueuedTimesUtc[i] = events[i].EnqueuedTime.DateTime;
properties[i] = events[i].Properties;
systemProperties[i] = GetSystemPropertiesForBinding(events[i]);
}
@@ -125,7 +125,7 @@ private static void AddBindingData(Dictionary bindingData, Event
SafeAddValue(() => bindingData.Add("PartitionKey", eventData.PartitionKey));
SafeAddValue(() => bindingData.Add("Offset", eventData.Offset));
SafeAddValue(() => bindingData.Add("SequenceNumber", eventData.SequenceNumber));
- SafeAddValue(() => bindingData.Add("EnqueuedTimeUtc", eventData.EnqueuedTime));
+ SafeAddValue(() => bindingData.Add("EnqueuedTimeUtc", eventData.EnqueuedTime.DateTime));
SafeAddValue(() => bindingData.Add("Properties", eventData.Properties));
SafeAddValue(() => bindingData.Add("SystemProperties", GetSystemPropertiesForBinding(eventData)));
}
@@ -155,7 +155,7 @@ private static IDictionary GetSystemPropertiesForBinding(EventDa
modifiedDictionary["SequenceNumber"] = eventData.SequenceNumber;
modifiedDictionary["Offset"] = eventData.Offset;
modifiedDictionary["PartitionKey"] = eventData.PartitionKey;
- modifiedDictionary["EnqueuedTimeUtc"] = eventData.EnqueuedTime;
+ modifiedDictionary["EnqueuedTimeUtc"] = eventData.EnqueuedTime.DateTime;
return modifiedDictionary;
}
}
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
index f2dc0a5ffb7da..a2df425094c55 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubEndToEndTests.cs
@@ -8,34 +8,68 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
+using Azure.Core.TestFramework;
using Azure.Messaging.EventHubs;
+using Azure.Messaging.EventHubs.Processor.Tests;
using Azure.Messaging.EventHubs.Producer;
+using Azure.Messaging.EventHubs.Tests;
+using Microsoft.Azure.WebJobs.EventHubs;
using Microsoft.Azure.WebJobs.Host.TestCommon;
using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
namespace Microsoft.Azure.WebJobs.Host.EndToEndTests
{
- [Category("Live")]
+ [NonParallelizable]
+ [LiveOnly]
public class EventHubEndToEndTests
{
- private const string TestHubName = "webjobstesthub";
+ private const string TestHubName = "%webjobstesthub%";
private const int Timeout = 30000;
private static EventWaitHandle _eventWait;
private static string _testId;
private static List _results;
- public EventHubEndToEndTests()
+ /// The active Event Hub resource scope for the test fixture.
+ private EventHubScope _eventHubScope;
+
+ /// The active Blob storage resource scope for the test fixture.
+ private StorageScope _storageScope;
+
+ ///
+ /// Performs the tasks needed to initialize the test fixture. This
+ /// method runs once for the entire fixture, prior to running any tests.
+ ///
+ ///
+ [SetUp]
+ public async Task FixtureSetUp()
{
_results = new List();
_testId = Guid.NewGuid().ToString();
_eventWait = new ManualResetEvent(initialState: false);
+ _eventHubScope = await EventHubScope.CreateAsync(2);
+ _storageScope = await StorageScope.CreateAsync();
+ }
+
+ ///
+ /// Performs the tasks needed to cleanup the test fixture after all
+ /// tests have run. This method runs once for the entire fixture.
+ ///
+ ///
+ [TearDown]
+ public async Task FixtureTearDown()
+ {
+ await Task.WhenAll
+ (
+ _eventHubScope.DisposeAsync().AsTask(),
+ _storageScope.DisposeAsync().AsTask()
+ );
}
[Test]
- [Ignore("Failing test. Tracked by #16715")]
public async Task EventHub_PocoBinding()
{
var tuple = BuildHost();
@@ -46,15 +80,14 @@ public async Task EventHub_PocoBinding()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
+ }
- var logs = tuple.Item2.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage);
+ var logs = tuple.Item2.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage);
- CollectionAssert.Contains($"PocoValues(foo,{_testId})", logs);
- }
+ CollectionAssert.Contains(logs, $"PocoValues(foo,{_testId})");
}
[Test]
- [Ignore("Failing test. Tracked by #16715")]
public async Task EventHub_StringBinding()
{
var tuple = BuildHost();
@@ -68,12 +101,11 @@ public async Task EventHub_StringBinding()
var logs = tuple.Item2.GetTestLoggerProvider().GetAllLogMessages().Select(p => p.FormattedMessage);
- CollectionAssert.Contains($"Input({_testId})", logs);
+ CollectionAssert.Contains(logs, $"Input({_testId})");
}
}
[Test]
- [Ignore("Failing test. Tracked by #16715")]
public async Task EventHub_SingleDispatch()
{
Tuple tuple = BuildHost();
@@ -84,30 +116,26 @@ public async Task EventHub_SingleDispatch()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
+ }
- // Wait for checkpointing
- await Task.Delay(3000);
-
- IEnumerable logMessages = tuple.Item2.GetTestLoggerProvider()
- .GetAllLogMessages();
+ IEnumerable logMessages = tuple.Item2.GetTestLoggerProvider()
+ .GetAllLogMessages();
- Assert.AreEqual(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
- && x.FormattedMessage.Contains("Trigger Details:")
- && x.FormattedMessage.Contains("Offset:")).Count(), 1);
+ Assert.AreEqual(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
+ && x.FormattedMessage.Contains("Trigger Details:")
+ && x.FormattedMessage.Contains("Offset:")).Count(), 1);
- Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
- && x.FormattedMessage.Contains("OpenAsync")).Count() > 0);
+ Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
+ && x.FormattedMessage.Contains("OpenAsync")).Count() > 0);
- Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
- && x.FormattedMessage.Contains("CheckpointAsync")).Count() > 0);
+ Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
+ && x.FormattedMessage.Contains("CheckpointAsync")).Count() > 0);
- Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
- && x.FormattedMessage.Contains("Sending events to EventHub")).Count() > 0);
- }
+ Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
+ && x.FormattedMessage.Contains("Sending events to EventHub")).Count() > 0);
}
[Test]
- [Ignore("Failing test. Tracked by #16715")]
public async Task EventHub_MultipleDispatch()
{
Tuple tuple = BuildHost();
@@ -121,30 +149,26 @@ public async Task EventHub_MultipleDispatch()
bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
+ }
- // Wait for checkpointing
- await Task.Delay(3000);
-
- IEnumerable logMessages = tuple.Item2.GetTestLoggerProvider()
- .GetAllLogMessages();
+ IEnumerable logMessages = tuple.Item2.GetTestLoggerProvider()
+ .GetAllLogMessages();
- Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
- && x.FormattedMessage.Contains("Trigger Details:")
- && x.FormattedMessage.Contains("Offset:")).Count() > 0);
+ Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
+ && x.FormattedMessage.Contains("Trigger Details:")
+ && x.FormattedMessage.Contains("Offset:")).Count() > 0);
- Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
- && x.FormattedMessage.Contains("OpenAsync")).Count() > 0);
+ Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
+ && x.FormattedMessage.Contains("OpenAsync")).Count() > 0);
- Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
- && x.FormattedMessage.Contains("CheckpointAsync")).Count() > 0);
+ Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
+ && x.FormattedMessage.Contains("CheckpointAsync")).Count() > 0);
- Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
- && x.FormattedMessage.Contains("Sending events to EventHub")).Count() > 0);
- }
+ Assert.True(logMessages.Where(x => !string.IsNullOrEmpty(x.FormattedMessage)
+ && x.FormattedMessage.Contains("Sending events to EventHub")).Count() > 0);
}
[Test]
- [Ignore("Failing test. Tracked by #16715")]
public async Task EventHub_PartitionKey()
{
Tuple tuple = BuildHost();
@@ -304,26 +328,35 @@ public static void ProcessMultiplePartitionEvents([EventHubTrigger(TestHubName)]
private Tuple BuildHost()
{
- JobHost jobHost = null;
-
- var config = new ConfigurationBuilder()
- .AddEnvironmentVariables()
- .AddTestSettings()
- .Build();
-
- const string connectionName = "AzureWebJobsTestHubConnection";
- string connection = config.GetConnectionStringOrSetting(connectionName);
- Assert.True(!string.IsNullOrEmpty(connection), $"Required test connection string '{connectionName}' is missing.");
-
+ var eventHubName = _eventHubScope.EventHubName;
+ JobHost jobHost;
IHost host = new HostBuilder()
+ .ConfigureAppConfiguration(builder =>
+ {
+ builder.AddInMemoryCollection(new Dictionary()
+ {
+ { "webjobstesthub", eventHubName },
+ { "AzureWebJobsStorage", StorageTestEnvironment.Instance.StorageConnectionString }
+ });
+ })
+ .ConfigureServices(services =>
+ {
+ // Speedup shutdown
+ services.Configure(options =>
+ {
+ options.LeaseContainerName = _storageScope.ContainerName;
+ options.EventProcessorOptions.MaximumWaitTime = TimeSpan.FromSeconds(5);
+ });
+ })
.ConfigureDefaultTestHost(b =>
{
b.AddEventHubs(options =>
{
// TODO: alternative?
//options.EventProcessorOptions.EnableReceiverRuntimeMetric = true;
- options.AddSender(TestHubName, connection);
- options.AddReceiver(TestHubName, connection);
+ var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(eventHubName);
+ options.AddSender(eventHubName, connectionString);
+ options.AddReceiver(eventHubName, connectionString);
});
})
.ConfigureLogging(b =>
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
index 053849c675d23..3cbcba40e4caa 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubListenerTests.cs
@@ -12,6 +12,7 @@
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
+using Microsoft.Azure.WebJobs.EventHubs.Processor;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Azure.WebJobs.Host.TestCommon;
@@ -191,16 +192,16 @@ public void GetMonitor_ReturnsExpectedValue()
var functionId = "FunctionId";
var eventHubName = "EventHubName";
var consumerGroup = "ConsumerGroup";
- var storageUri = new Uri("https://eventhubsteststorageaccount.blob.core.windows.net/");
var testLogger = new TestLogger("Test");
+ var host = new EventProcessorHost(
+ eventHubName,
+ consumerGroup,
+ "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=",
+ "DefaultEndpointsProtocol=https;AccountName=EventHubScaleMonitorFakeTestAccount;AccountKey=ABCDEFG;EndpointSuffix=core.windows.net", null, null);
var listener = new EventHubListener(
functionId,
- eventHubName,
- consumerGroup,
- "Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=",
- "DefaultEndpointsProtocol=https;AccountName=EventHubScaleMonitorFakeTestAccount;AccountKey=ABCDEFG;EndpointSuffix=core.windows.net",
new Mock(MockBehavior.Strict).Object,
- null,
+ host,
false,
new EventHubOptions(),
testLogger,
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs
index 96b20e710392c..bb226ca9e8eb5 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/EventHubTests.cs
@@ -84,7 +84,7 @@ public void GetBindingData_SingleDispatch_ReturnsExpectedValue()
Assert.AreEqual(evt.PartitionKey, bindingData["PartitionKey"]);
Assert.AreEqual(evt.Offset, bindingData["Offset"]);
Assert.AreEqual(evt.SequenceNumber, bindingData["SequenceNumber"]);
- Assert.AreEqual(evt.EnqueuedTime, bindingData["EnqueuedTimeUtc"]);
+ Assert.AreEqual(evt.EnqueuedTime.DateTime, bindingData["EnqueuedTimeUtc"]);
Assert.AreSame(evt.Properties, bindingData["Properties"]);
IDictionary bindingDataSysProps = bindingData["SystemProperties"] as Dictionary;
Assert.NotNull(bindingDataSysProps);
@@ -132,7 +132,7 @@ public void GetBindingData_MultipleDispatch_ReturnsExpectedValue()
Assert.AreEqual(events.Length, ((string[])bindingData["PartitionKeyArray"]).Length);
Assert.AreEqual(events.Length, ((string[])bindingData["OffsetArray"]).Length);
Assert.AreEqual(events.Length, ((long[])bindingData["SequenceNumberArray"]).Length);
- Assert.AreEqual(events.Length, ((DateTimeOffset[])bindingData["EnqueuedTimeUtcArray"]).Length);
+ Assert.AreEqual(events.Length, ((DateTime[])bindingData["EnqueuedTimeUtcArray"]).Length);
Assert.AreEqual(events.Length, ((IDictionary[])bindingData["PropertiesArray"]).Length);
Assert.AreEqual(events.Length, ((IDictionary[])bindingData["SystemPropertiesArray"]).Length);
@@ -252,7 +252,6 @@ internal static ProcessorPartitionContext GetPartitionContext(string partitionId
var processor = new EventProcessorHost.Processor(Int32.MaxValue,
consumerGroupName,
"Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abc123=",
- "",
eventHubPath,
new EventProcessorOptions(),
null,
diff --git a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj
index 69e889b6e5eb8..e5bae2dd8aebf 100644
--- a/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj
+++ b/sdk/eventhub/Microsoft.Azure.WebJobs.Extensions.EventHubs/tests/Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj
@@ -6,7 +6,12 @@
+
+
+
+
+
@@ -14,11 +19,16 @@
-
+
+
+
+
+
+
diff --git a/sdk/eventhub/test-resources.json b/sdk/eventhub/test-resources.json
index 75620cc74761b..439c2c27a9250 100755
--- a/sdk/eventhub/test-resources.json
+++ b/sdk/eventhub/test-resources.json
@@ -180,7 +180,7 @@
{
"type": "Microsoft.Authorization/roleAssignments",
"apiVersion": "2019-04-01-preview",
- "name": "[guid(resourceGroup().id, deployment().name, parameters('baseName'), variables('eventHubsDataOwnerRoleId'))]",
+ "name": "[guid(resourceGroup().id, deployment().name, parameters('baseName'), variables('eventHubsDataOwnerRoleId'), parameters('testApplicationOid'))]",
"properties": {
"roleDefinitionId": "[resourceId('Microsoft.Authorization/roleDefinitions', variables('eventHubsDataOwnerRoleId'))]",
"principalId": "[parameters('testApplicationOid')]",
@@ -190,7 +190,7 @@
{
"type": "Microsoft.Authorization/roleAssignments",
"apiVersion": "2019-04-01-preview",
- "name": "[guid(resourceGroup().id, deployment().name, parameters('baseName'), variables('contributorRoleId'))]",
+ "name": "[guid(resourceGroup().id, deployment().name, parameters('baseName'), variables('contributorRoleId'), parameters('testApplicationOid'))]",
"properties": {
"roleDefinitionId": "[resourceId('Microsoft.Authorization/roleDefinitions', variables('contributorRoleId'))]",
"principalId": "[parameters('testApplicationOid')]",