Skip to content

Commit

Permalink
Enable EventHubs WebJobs extension live tests (#17029)
Browse files Browse the repository at this point in the history
Goals:
1. Get E2E live tests running so we can start iterating while having some level of validation
  • Loading branch information
pakrym authored Nov 24, 2020
1 parent d1e4e46 commit 7f69b5c
Show file tree
Hide file tree
Showing 17 changed files with 222 additions and 356 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public sealed class EventHubsTestEnvironment: TestEnvironment
/// Initializes a new instance of <see cref="EventHubsTestEnvironment"/>.
/// </summary>
///
private EventHubsTestEnvironment() : base("eventhub")
public EventHubsTestEnvironment() : base("eventhub")
{
ParsedConnectionString = new Lazy<EventHubsConnectionStringProperties>(() => EventHubsConnectionStringProperties.Parse(EventHubsConnectionString), LazyThreadSafetyMode.ExecutionAndPublication);
ActiveEventHubsNamespace = new Lazy<NamespaceProperties>(EnsureEventHubsNamespace, LazyThreadSafetyMode.ExecutionAndPublication);
Expand Down Expand Up @@ -167,25 +167,6 @@ private EventHubsTestEnvironment() : base("eventhub")
///
public string BuildConnectionStringForEventHub(string eventHubName) => $"{ EventHubsConnectionString };EntityPath={ eventHubName }";

/// <summary>
/// Builds a connection string for the Event Hubs namespace used for Live tests, creating a shared access signature
/// in place of the shared key.
/// </summary>
///
/// <param name="eventHubName">The name of the Event Hub to base the connection string on.</param>
/// <param name="signatureAudience">The audience to use for the shared access signature.</param>
/// <param name="validDurationMinutes">The duration, in minutes, that the signature should be considered valid for.</param>
///
/// <returns>The namespace connection string with a shared access signature based on the shared key of the current scope.</value>
///
public string BuildConnectionStringWithSharedAccessSignature(string eventHubName,
string signatureAudience,
int validDurationMinutes = 30)
{
var signature = new SharedAccessSignature(signatureAudience, SharedAccessKeyName, SharedAccessKey, TimeSpan.FromMinutes(validDurationMinutes));
return $"Endpoint=sb://{ ParsedConnectionString.Value.FullyQualifiedNamespace };EntityPath={ eventHubName };SharedAccessSignature={ signature.Value }";
}

/// <summary>
/// Ensures that an Event Hubs namespace is available for the test run, using one if provided by the
/// <see cref="EventHubsNamespaceConnectionStringEnvironmentVariable" /> or creating a new Azure resource specific
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ public async Task ConnectionCanConnectToEventHubsUsingSharedAccessSignatureConne
{
var options = new EventHubConnectionOptions();
var audience = EventHubConnection.BuildConnectionAudience(options.TransportType, EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, scope.EventHubName);
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringWithSharedAccessSignature(scope.EventHubName, audience);
EventHubsTestEnvironment tempQualifier = EventHubsTestEnvironment.Instance;
var signature = new SharedAccessSignature(audience, tempQualifier.SharedAccessKeyName, tempQualifier.SharedAccessKey, TimeSpan.FromMinutes(30));
var connectionString = $"Endpoint=sb://{tempQualifier.FullyQualifiedNamespace };EntityPath={ scope.EventHubName };SharedAccessSignature={ signature.Value }";

await using (var connection = new TestConnectionWithTransport(connectionString, options))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26124.0
MinimumVisualStudioVersion = 15.0.26124.0
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Extensions.EventHubs", "src\Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj", "{CFBC2DEF-E738-4380-A8DE-236433E15CF6}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests", "tests\Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj", "{91E0D968-2082-4959-A294-6F1B790ECECF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Core.TestFramework", "..\..\core\Azure.Core.TestFramework\src\Azure.Core.TestFramework.csproj", "{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Extensions.EventHubs", "src\Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj", "{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Messaging.EventHubs", "..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj", "{B51ECD35-11DA-46D2-89D7-9DE3888CF896}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -20,18 +24,6 @@ Global
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x64.ActiveCfg = Debug|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x64.Build.0 = Debug|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x86.ActiveCfg = Debug|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x86.Build.0 = Debug|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|Any CPU.Build.0 = Release|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x64.ActiveCfg = Release|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x64.Build.0 = Release|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x86.ActiveCfg = Release|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x86.Build.0 = Release|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Debug|x64.ActiveCfg = Debug|Any CPU
Expand All @@ -44,5 +36,41 @@ Global
{91E0D968-2082-4959-A294-6F1B790ECECF}.Release|x64.Build.0 = Release|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Release|x86.ActiveCfg = Release|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Release|x86.Build.0 = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x64.ActiveCfg = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x64.Build.0 = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x86.ActiveCfg = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x86.Build.0 = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|Any CPU.Build.0 = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x64.ActiveCfg = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x64.Build.0 = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x86.ActiveCfg = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x86.Build.0 = Release|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x64.ActiveCfg = Debug|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x64.Build.0 = Debug|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x86.ActiveCfg = Debug|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x86.Build.0 = Debug|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|Any CPU.Build.0 = Release|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x64.ActiveCfg = Release|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x64.Build.0 = Release|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x86.ActiveCfg = Release|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x86.Build.0 = Release|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x64.ActiveCfg = Debug|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x64.Build.0 = Debug|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x86.ActiveCfg = Debug|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x86.Build.0 = Debug|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|Any CPU.Build.0 = Release|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x64.ActiveCfg = Release|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x64.Build.0 = Release|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x86.ActiveCfg = Release|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ namespace Microsoft.Azure.WebJobs.EventHubs
{
public partial class EventHubOptions : Microsoft.Azure.WebJobs.Hosting.IOptionsFormatter
{
public const string LeaseContainerName = "azure-webjobs-eventhub";
public EventHubOptions() { }
public int BatchCheckpointFrequency { get { throw null; } set { } }
public Azure.Messaging.EventHubs.Primitives.EventProcessorOptions EventProcessorOptions { get { throw null; } }
public bool InvokeProcessorAfterReceiveTimeout { get { throw null; } set { } }
public string LeaseContainerName { get { throw null; } set { } }
public int MaxBatchSize { get { throw null; } set { } }
public void AddEventHubProducerClient(Azure.Messaging.EventHubs.Producer.EventHubProducerClient client) { }
public void AddEventHubProducerClient(string eventHubName, Azure.Messaging.EventHubs.Producer.EventHubProducerClient client) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private static LogLevel GetLogLevel(Exception ex)
private IAsyncCollector<EventData> BuildFromAttribute(EventHubAttribute attribute)
{
EventHubProducerClient client = _options.Value.GetEventHubProducerClient(attribute.EventHubName, attribute.Connection);
return new EventHubAsyncCollector(new EventHubProducerClientImpl(client));
return new EventHubAsyncCollector(new EventHubProducerClientImpl(client, _loggerFactory));
}

private static string ConvertEventDataToString(EventData x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Globalization;
using System.Text;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Primitives;
using Azure.Messaging.EventHubs.Producer;
Expand All @@ -32,7 +33,8 @@ public class EventHubOptions : IOptionsFormatter
/// Name of the blob container that the EventHostProcessor instances uses to coordinate load balancing listening on an event hub.
/// Each event hub gets its own blob prefix within the container.
/// </summary>
public const string LeaseContainerName = "azure-webjobs-eventhub";
public string LeaseContainerName { get; set; } = "azure-webjobs-eventhub";

private int _batchCheckpointFrequency = 1;

public EventHubOptions()
Expand Down Expand Up @@ -256,27 +258,13 @@ internal EventProcessorHost GetEventProcessorHost(IConfiguration config, string
storageConnectionString = defaultStorageString;
}

// If the connection string provides a hub name, that takes precedence.
// Note that connection strings *can't* specify a consumerGroup, so must always be passed in.
string actualPath = eventHubName;
EventHubsConnectionStringBuilder sb = new EventHubsConnectionStringBuilder(creds.EventHubConnectionString);
if (sb.EntityPath != null)
{
actualPath = sb.EntityPath;
sb.EntityPath = null; // need to remove to use with EventProcessorHost
}

var @namespace = GetEventHubNamespace(sb);
var blobPrefix = GetBlobPrefix(actualPath, @namespace);

// Use blob prefix support available in EPH starting in 2.2.6
EventProcessorHost host = new EventProcessorHost(
eventHubPath: actualPath,
eventHubName: eventHubName,
consumerGroupName: consumerGroup,
eventHubConnectionString: sb.ToString(),
eventHubConnectionString: creds.EventHubConnectionString,
storageConnectionString: storageConnectionString,
leaseContainerName: LeaseContainerName,
legacyCheckpointStorageBlobPrefix: blobPrefix,
exceptionHandler: _exceptionHandler);

return host;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs
{
/// TODO: Remove when https://github.com/Azure/azure-sdk-for-net/issues/9117 is fixed
internal class EventHubProducerClientImpl : IEventHubProducerClient
{
private readonly EventHubProducerClient _client;
private readonly ILogger _logger;

public EventHubProducerClientImpl(EventHubProducerClient client)
public EventHubProducerClientImpl(EventHubProducerClient client, ILoggerFactory loggerFactory)
{
_client = client;
_logger = loggerFactory?.CreateLogger(LogCategories.Executor);
}

public async Task<IEventDataBatch> CreateBatchAsync(CancellationToken cancellationToken)
Expand All @@ -24,7 +28,9 @@ public async Task<IEventDataBatch> CreateBatchAsync(CancellationToken cancellati

public async Task SendAsync(IEventDataBatch batch, CancellationToken cancellationToken)
{
await _client.SendAsync(((EventDataBatchImpl) batch).Batch, cancellationToken).ConfigureAwait(false);
_logger?.LogDebug("Sending events to EventHub");
var eventDataBatch = ((EventDataBatchImpl) batch).Batch;
await _client.SendAsync(eventDataBatch, cancellationToken).ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
Expand All @@ -26,11 +25,6 @@ namespace Microsoft.Azure.WebJobs.EventHubs
internal sealed class EventHubListener : IListener, IEventProcessorFactory, IScaleMonitorProvider
{
private static readonly Dictionary<string, object> EmptyScope = new Dictionary<string, object>();
private readonly string _functionId;
private readonly string _eventHubName;
private readonly string _consumerGroup;
private readonly string _connectionString;
private readonly string _storageConnectionString;
private readonly ITriggeredFunctionExecutor _executor;
private readonly EventProcessorHost _eventProcessorHost;
private readonly bool _singleDispatch;
Expand All @@ -42,28 +36,26 @@ internal sealed class EventHubListener : IListener, IEventProcessorFactory, ISca

public EventHubListener(
string functionId,
string eventHubName,
string consumerGroup,
string connectionString,
string storageConnectionString,
ITriggeredFunctionExecutor executor,
EventProcessorHost eventProcessorHost,
bool singleDispatch,
EventHubOptions options,
ILogger logger,
BlobContainerClient blobContainer = null)
{
_functionId = functionId;
_eventHubName = eventHubName;
_consumerGroup = consumerGroup;
_connectionString = connectionString;
_storageConnectionString = storageConnectionString;
_executor = executor;
_eventProcessorHost = eventProcessorHost;
_singleDispatch = singleDispatch;
_options = options;
_logger = logger;
_scaleMonitor = new Lazy<EventHubsScaleMonitor>(() => new EventHubsScaleMonitor(_functionId, _eventHubName, _consumerGroup, _connectionString, _storageConnectionString, _logger, blobContainer));
_scaleMonitor = new Lazy<EventHubsScaleMonitor>(() => new EventHubsScaleMonitor(
functionId,
eventProcessorHost.EventHubName,
eventProcessorHost.ConsumerGroupName,
eventProcessorHost.EventHubConnectionString,
eventProcessorHost.StorageConnectionString,
_logger,
blobContainer));
}

void IListener.Cancel()
Expand Down Expand Up @@ -216,6 +208,7 @@ public async Task ProcessEventsAsync(ProcessorPartitionContext context, IEnumera
// code, and capture/log/persist failed events, since they won't be retried.
if (messages.Any())
{
context.CheckpointEvent = messages.Last();
await CheckpointAsync(context).ConfigureAwait(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,24 @@
<TargetFramework>netstandard2.0</TargetFramework>
<Description>Microsoft Azure WebJobs SDK EventHubs Extension</Description>
<Version>5.0.0-beta.1</Version>
<NoWarn>$(NoWarn);AZC0001;CS1591</NoWarn>
<NoWarn>$(NoWarn);AZC0001;CS1591;SA1636</NoWarn>
<AssemblyOriginatorKeyFile>sign.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" />
<PackageReference Include="Azure.Storage.Blobs" />
<PackageReference Include="Microsoft.Azure.WebJobs" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="../../Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj" />
</ItemGroup>

<ItemGroup>
<Compile Include="$(AzureCoreSharedSources)Argument.cs" />
<Compile Include="../../Azure.Messaging.EventHubs.Shared/src/Processor/StorageManager.cs"/>
<Compile Include="../../Azure.Messaging.EventHubs.Shared/src/Core/CancellationTokenExtensions.cs"/>
<Compile Include="../../Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/*.cs"/>
</ItemGroup>

</Project>
Loading

0 comments on commit 7f69b5c

Please sign in to comment.