Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable EventHubs WebJobs extension live tests #17029

Merged
11 commits merged into from
Nov 24, 2020
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public sealed class EventHubsTestEnvironment: TestEnvironment
/// Initializes a new instance of <see cref="EventHubsTestEnvironment"/>.
/// </summary>
///
private EventHubsTestEnvironment() : base("eventhub")
public EventHubsTestEnvironment() : base("eventhub")
{
ParsedConnectionString = new Lazy<EventHubsConnectionStringProperties>(() => EventHubsConnectionStringProperties.Parse(EventHubsConnectionString), LazyThreadSafetyMode.ExecutionAndPublication);
ActiveEventHubsNamespace = new Lazy<NamespaceProperties>(EnsureEventHubsNamespace, LazyThreadSafetyMode.ExecutionAndPublication);
Expand Down Expand Up @@ -167,25 +167,6 @@ private EventHubsTestEnvironment() : base("eventhub")
///
public string BuildConnectionStringForEventHub(string eventHubName) => $"{ EventHubsConnectionString };EntityPath={ eventHubName }";

/// <summary>
/// Builds a connection string for the Event Hubs namespace used for Live tests, creating a shared access signature
/// in place of the shared key.
/// </summary>
///
/// <param name="eventHubName">The name of the Event Hub to base the connection string on.</param>
/// <param name="signatureAudience">The audience to use for the shared access signature.</param>
/// <param name="validDurationMinutes">The duration, in minutes, that the signature should be considered valid for.</param>
///
/// <returns>The namespace connection string with a shared access signature based on the shared key of the current scope.</value>
///
public string BuildConnectionStringWithSharedAccessSignature(string eventHubName,
string signatureAudience,
int validDurationMinutes = 30)
{
var signature = new SharedAccessSignature(signatureAudience, SharedAccessKeyName, SharedAccessKey, TimeSpan.FromMinutes(validDurationMinutes));
return $"Endpoint=sb://{ ParsedConnectionString.Value.FullyQualifiedNamespace };EntityPath={ eventHubName };SharedAccessSignature={ signature.Value }";
}

/// <summary>
/// Ensures that an Event Hubs namespace is available for the test run, using one if provided by the
/// <see cref="EventHubsNamespaceConnectionStringEnvironmentVariable" /> or creating a new Azure resource specific
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ public async Task ConnectionCanConnectToEventHubsUsingSharedAccessSignatureConne
{
var options = new EventHubConnectionOptions();
var audience = EventHubConnection.BuildConnectionAudience(options.TransportType, EventHubsTestEnvironment.Instance.FullyQualifiedNamespace, scope.EventHubName);
var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringWithSharedAccessSignature(scope.EventHubName, audience);
EventHubsTestEnvironment tempQualifier = EventHubsTestEnvironment.Instance;
var signature = new SharedAccessSignature(audience, tempQualifier.SharedAccessKeyName, tempQualifier.SharedAccessKey, TimeSpan.FromMinutes(30));
var connectionString = $"Endpoint=sb://{tempQualifier.FullyQualifiedNamespace };EntityPath={ scope.EventHubName };SharedAccessSignature={ signature.Value }";

await using (var connection = new TestConnectionWithTransport(connectionString, options))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26124.0
MinimumVisualStudioVersion = 15.0.26124.0
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Extensions.EventHubs", "src\Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj", "{CFBC2DEF-E738-4380-A8DE-236433E15CF6}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests", "tests\Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj", "{91E0D968-2082-4959-A294-6F1B790ECECF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Core.TestFramework", "..\..\core\Azure.Core.TestFramework\src\Azure.Core.TestFramework.csproj", "{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Extensions.EventHubs", "src\Microsoft.Azure.WebJobs.Extensions.EventHubs.csproj", "{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Messaging.EventHubs", "..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj", "{B51ECD35-11DA-46D2-89D7-9DE3888CF896}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -20,18 +24,6 @@ Global
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x64.ActiveCfg = Debug|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x64.Build.0 = Debug|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x86.ActiveCfg = Debug|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Debug|x86.Build.0 = Debug|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|Any CPU.Build.0 = Release|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x64.ActiveCfg = Release|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x64.Build.0 = Release|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x86.ActiveCfg = Release|Any CPU
{CFBC2DEF-E738-4380-A8DE-236433E15CF6}.Release|x86.Build.0 = Release|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Debug|x64.ActiveCfg = Debug|Any CPU
Expand All @@ -44,5 +36,41 @@ Global
{91E0D968-2082-4959-A294-6F1B790ECECF}.Release|x64.Build.0 = Release|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Release|x86.ActiveCfg = Release|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Release|x86.Build.0 = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x64.ActiveCfg = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x64.Build.0 = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x86.ActiveCfg = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x86.Build.0 = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|Any CPU.Build.0 = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x64.ActiveCfg = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x64.Build.0 = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x86.ActiveCfg = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x86.Build.0 = Release|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x64.ActiveCfg = Debug|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x64.Build.0 = Debug|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x86.ActiveCfg = Debug|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Debug|x86.Build.0 = Debug|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|Any CPU.Build.0 = Release|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x64.ActiveCfg = Release|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x64.Build.0 = Release|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x86.ActiveCfg = Release|Any CPU
{B5B06EAD-54CA-4F69-959C-1AEEEE19B832}.Release|x86.Build.0 = Release|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x64.ActiveCfg = Debug|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x64.Build.0 = Debug|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x86.ActiveCfg = Debug|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Debug|x86.Build.0 = Debug|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|Any CPU.Build.0 = Release|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x64.ActiveCfg = Release|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x64.Build.0 = Release|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x86.ActiveCfg = Release|Any CPU
{B51ECD35-11DA-46D2-89D7-9DE3888CF896}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ namespace Microsoft.Azure.WebJobs.EventHubs
{
public partial class EventHubOptions : Microsoft.Azure.WebJobs.Hosting.IOptionsFormatter
{
public const string LeaseContainerName = "azure-webjobs-eventhub";
public EventHubOptions() { }
public int BatchCheckpointFrequency { get { throw null; } set { } }
public Azure.Messaging.EventHubs.Primitives.EventProcessorOptions EventProcessorOptions { get { throw null; } }
public bool InvokeProcessorAfterReceiveTimeout { get { throw null; } set { } }
public string LeaseContainerName { get { throw null; } set { } }
public int MaxBatchSize { get { throw null; } set { } }
public void AddEventHubProducerClient(Azure.Messaging.EventHubs.Producer.EventHubProducerClient client) { }
public void AddEventHubProducerClient(string eventHubName, Azure.Messaging.EventHubs.Producer.EventHubProducerClient client) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private static LogLevel GetLogLevel(Exception ex)
private IAsyncCollector<EventData> BuildFromAttribute(EventHubAttribute attribute)
{
EventHubProducerClient client = _options.Value.GetEventHubProducerClient(attribute.EventHubName, attribute.Connection);
return new EventHubAsyncCollector(new EventHubProducerClientImpl(client));
return new EventHubAsyncCollector(new EventHubProducerClientImpl(client, _loggerFactory));
}

private static string ConvertEventDataToString(EventData x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Globalization;
using System.Text;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Primitives;
using Azure.Messaging.EventHubs.Producer;
Expand All @@ -32,7 +33,8 @@ public class EventHubOptions : IOptionsFormatter
/// Name of the blob container that the EventHostProcessor instances uses to coordinate load balancing listening on an event hub.
/// Each event hub gets its own blob prefix within the container.
/// </summary>
public const string LeaseContainerName = "azure-webjobs-eventhub";
public string LeaseContainerName { get; set; } = "azure-webjobs-eventhub";

private int _batchCheckpointFrequency = 1;

public EventHubOptions()
Expand Down Expand Up @@ -256,27 +258,13 @@ internal EventProcessorHost GetEventProcessorHost(IConfiguration config, string
storageConnectionString = defaultStorageString;
}

// If the connection string provides a hub name, that takes precedence.
// Note that connection strings *can't* specify a consumerGroup, so must always be passed in.
string actualPath = eventHubName;
EventHubsConnectionStringBuilder sb = new EventHubsConnectionStringBuilder(creds.EventHubConnectionString);
if (sb.EntityPath != null)
{
actualPath = sb.EntityPath;
sb.EntityPath = null; // need to remove to use with EventProcessorHost
}

var @namespace = GetEventHubNamespace(sb);
var blobPrefix = GetBlobPrefix(actualPath, @namespace);

// Use blob prefix support available in EPH starting in 2.2.6
EventProcessorHost host = new EventProcessorHost(
eventHubPath: actualPath,
eventHubName: eventHubName,
pakrym marked this conversation as resolved.
Show resolved Hide resolved
consumerGroupName: consumerGroup,
eventHubConnectionString: sb.ToString(),
eventHubConnectionString: creds.EventHubConnectionString,
storageConnectionString: storageConnectionString,
leaseContainerName: LeaseContainerName,
legacyCheckpointStorageBlobPrefix: blobPrefix,
exceptionHandler: _exceptionHandler);

return host;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs
{
/// TODO: Remove when https://github.com/Azure/azure-sdk-for-net/issues/9117 is fixed
internal class EventHubProducerClientImpl : IEventHubProducerClient
{
private readonly EventHubProducerClient _client;
private readonly ILogger _logger;

public EventHubProducerClientImpl(EventHubProducerClient client)
public EventHubProducerClientImpl(EventHubProducerClient client, ILoggerFactory loggerFactory)
{
_client = client;
_logger = loggerFactory?.CreateLogger(LogCategories.Executor);
}

public async Task<IEventDataBatch> CreateBatchAsync(CancellationToken cancellationToken)
Expand All @@ -24,7 +28,9 @@ public async Task<IEventDataBatch> CreateBatchAsync(CancellationToken cancellati

public async Task SendAsync(IEventDataBatch batch, CancellationToken cancellationToken)
{
await _client.SendAsync(((EventDataBatchImpl) batch).Batch, cancellationToken).ConfigureAwait(false);
_logger?.LogDebug("Sending events to EventHub");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some tests rely on this message.

var eventDataBatch = ((EventDataBatchImpl) batch).Batch;
await _client.SendAsync(eventDataBatch, cancellationToken).ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
Expand All @@ -26,11 +25,6 @@ namespace Microsoft.Azure.WebJobs.EventHubs
internal sealed class EventHubListener : IListener, IEventProcessorFactory, IScaleMonitorProvider
{
private static readonly Dictionary<string, object> EmptyScope = new Dictionary<string, object>();
private readonly string _functionId;
private readonly string _eventHubName;
private readonly string _consumerGroup;
private readonly string _connectionString;
private readonly string _storageConnectionString;
private readonly ITriggeredFunctionExecutor _executor;
private readonly EventProcessorHost _eventProcessorHost;
private readonly bool _singleDispatch;
Expand All @@ -42,28 +36,26 @@ internal sealed class EventHubListener : IListener, IEventProcessorFactory, ISca

public EventHubListener(
string functionId,
string eventHubName,
string consumerGroup,
string connectionString,
string storageConnectionString,
ITriggeredFunctionExecutor executor,
EventProcessorHost eventProcessorHost,
bool singleDispatch,
EventHubOptions options,
ILogger logger,
BlobContainerClient blobContainer = null)
{
_functionId = functionId;
_eventHubName = eventHubName;
_consumerGroup = consumerGroup;
_connectionString = connectionString;
_storageConnectionString = storageConnectionString;
_executor = executor;
_eventProcessorHost = eventProcessorHost;
_singleDispatch = singleDispatch;
_options = options;
_logger = logger;
_scaleMonitor = new Lazy<EventHubsScaleMonitor>(() => new EventHubsScaleMonitor(_functionId, _eventHubName, _consumerGroup, _connectionString, _storageConnectionString, _logger, blobContainer));
_scaleMonitor = new Lazy<EventHubsScaleMonitor>(() => new EventHubsScaleMonitor(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be ThreadSafetyMode.PublicationOnly or is it expensive to potentially throw one out in a race?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a benefit to it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand it, a minor one. Instead of grabbing a primitive to ensure that only one thread can create the value, it will allow multiple threads to race for it and keep just the first via atomic swap.

functionId,
eventProcessorHost.EventHubName,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not extremely happy with the data flow here but I started with removing the duplication first.

eventProcessorHost.ConsumerGroupName,
eventProcessorHost.EventHubConnectionString,
eventProcessorHost.StorageConnectionString,
_logger,
blobContainer));
}

void IListener.Cancel()
Expand Down Expand Up @@ -216,6 +208,7 @@ public async Task ProcessEventsAsync(ProcessorPartitionContext context, IEnumera
// code, and capture/log/persist failed events, since they won't be retried.
if (messages.Any())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may want to consider capturing the messages.ToArray() return from line 153 and using that here; you're enumerating an IEnumerable<T> multiple times. Pragmatically, we return IReadOnlyList<EventData> from the EventProcessor<T>. Since you're already paying the array transform cost, may as well use it. I wonder if, long term, it would make sense to use IReadOnlyList instead of an array and avoid the transforms altogether.

{
context.CheckpointEvent = messages.Last();
await CheckpointAsync(context).ConfigureAwait(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,24 @@
<TargetFramework>netstandard2.0</TargetFramework>
<Description>Microsoft Azure WebJobs SDK EventHubs Extension</Description>
<Version>5.0.0-beta.1</Version>
<NoWarn>$(NoWarn);AZC0001;CS1591</NoWarn>
<NoWarn>$(NoWarn);AZC0001;CS1591;SA1636</NoWarn>
<AssemblyOriginatorKeyFile>sign.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.EventHubs" />
<PackageReference Include="Azure.Storage.Blobs" />
<PackageReference Include="Microsoft.Azure.WebJobs" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="../../Azure.Messaging.EventHubs/src/Azure.Messaging.EventHubs.csproj" />
</ItemGroup>

<ItemGroup>
<Compile Include="$(AzureCoreSharedSources)Argument.cs" />
<Compile Include="../../Azure.Messaging.EventHubs.Shared/src/Processor/StorageManager.cs"/>
<Compile Include="../../Azure.Messaging.EventHubs.Shared/src/Core/CancellationTokenExtensions.cs"/>
<Compile Include="../../Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/*.cs"/>
</ItemGroup>

</Project>
Loading