Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable EventHubs WebJobs extension live tests #17029

Merged
11 commits merged into from
Nov 24, 2020
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Ext
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests", "tests\Microsoft.Azure.WebJobs.Extensions.EventHubs.Tests.csproj", "{91E0D968-2082-4959-A294-6F1B790ECECF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Azure.Core.TestFramework", "..\..\core\Azure.Core.TestFramework\src\Azure.Core.TestFramework.csproj", "{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -44,5 +46,17 @@ Global
{91E0D968-2082-4959-A294-6F1B790ECECF}.Release|x64.Build.0 = Release|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Release|x86.ActiveCfg = Release|Any CPU
{91E0D968-2082-4959-A294-6F1B790ECECF}.Release|x86.Build.0 = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x64.ActiveCfg = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x64.Build.0 = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x86.ActiveCfg = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Debug|x86.Build.0 = Debug|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|Any CPU.Build.0 = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x64.ActiveCfg = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x64.Build.0 = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x86.ActiveCfg = Release|Any CPU
{DCDC9E7A-94F5-4765-B6B1-DCA4E0E04EF7}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private static LogLevel GetLogLevel(Exception ex)
private IAsyncCollector<EventData> BuildFromAttribute(EventHubAttribute attribute)
{
EventHubProducerClient client = _options.Value.GetEventHubProducerClient(attribute.EventHubName, attribute.Connection);
return new EventHubAsyncCollector(new EventHubProducerClientImpl(client));
return new EventHubAsyncCollector(new EventHubProducerClientImpl(client, _loggerFactory));
}

private static string ConvertEventDataToString(EventData x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,12 @@ internal EventProcessorHost GetEventProcessorHost(IConfiguration config, string

// Use blob prefix support available in EPH starting in 2.2.6
EventProcessorHost host = new EventProcessorHost(
eventHubName: eventHubName,
pakrym marked this conversation as resolved.
Show resolved Hide resolved
eventHubPath: actualPath,
consumerGroupName: consumerGroup,
eventHubConnectionString: sb.ToString(),
storageConnectionString: storageConnectionString,
leaseContainerName: LeaseContainerName,
legacyCheckpointStorageBlobPrefix: blobPrefix,
exceptionHandler: _exceptionHandler);

return host;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.WebJobs.Logging;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs
{
/// TODO: Remove when https://github.com/Azure/azure-sdk-for-net/issues/9117 is fixed
internal class EventHubProducerClientImpl : IEventHubProducerClient
{
private readonly EventHubProducerClient _client;
private readonly ILogger _logger;

public EventHubProducerClientImpl(EventHubProducerClient client)
public EventHubProducerClientImpl(EventHubProducerClient client, ILoggerFactory loggerFactory)
{
_client = client;
_logger = loggerFactory?.CreateLogger(LogCategories.Executor);
}

public async Task<IEventDataBatch> CreateBatchAsync(CancellationToken cancellationToken)
Expand All @@ -24,7 +28,9 @@ public async Task<IEventDataBatch> CreateBatchAsync(CancellationToken cancellati

public async Task SendAsync(IEventDataBatch batch, CancellationToken cancellationToken)
{
await _client.SendAsync(((EventDataBatchImpl) batch).Batch, cancellationToken).ConfigureAwait(false);
_logger?.LogDebug("Sending events to EventHub");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some tests rely on this message.

var eventDataBatch = ((EventDataBatchImpl) batch).Batch;
await _client.SendAsync(eventDataBatch, cancellationToken).ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using Microsoft.Azure.WebJobs.EventHubs.Listeners;
Expand All @@ -26,11 +25,6 @@ namespace Microsoft.Azure.WebJobs.EventHubs
internal sealed class EventHubListener : IListener, IEventProcessorFactory, IScaleMonitorProvider
{
private static readonly Dictionary<string, object> EmptyScope = new Dictionary<string, object>();
private readonly string _functionId;
private readonly string _eventHubName;
private readonly string _consumerGroup;
private readonly string _connectionString;
private readonly string _storageConnectionString;
private readonly ITriggeredFunctionExecutor _executor;
private readonly EventProcessorHost _eventProcessorHost;
private readonly bool _singleDispatch;
Expand All @@ -42,28 +36,26 @@ internal sealed class EventHubListener : IListener, IEventProcessorFactory, ISca

public EventHubListener(
string functionId,
string eventHubName,
string consumerGroup,
string connectionString,
string storageConnectionString,
ITriggeredFunctionExecutor executor,
EventProcessorHost eventProcessorHost,
bool singleDispatch,
EventHubOptions options,
ILogger logger,
BlobContainerClient blobContainer = null)
{
_functionId = functionId;
_eventHubName = eventHubName;
_consumerGroup = consumerGroup;
_connectionString = connectionString;
_storageConnectionString = storageConnectionString;
_executor = executor;
_eventProcessorHost = eventProcessorHost;
_singleDispatch = singleDispatch;
_options = options;
_logger = logger;
_scaleMonitor = new Lazy<EventHubsScaleMonitor>(() => new EventHubsScaleMonitor(_functionId, _eventHubName, _consumerGroup, _connectionString, _storageConnectionString, _logger, blobContainer));
_scaleMonitor = new Lazy<EventHubsScaleMonitor>(() => new EventHubsScaleMonitor(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be ThreadSafetyMode.PublicationOnly or is it expensive to potentially throw one out in a race?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a benefit to it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand it, a minor one. Instead of grabbing a primitive to ensure that only one thread can create the value, it will allow multiple threads to race for it and keep just the first via atomic swap.

functionId,
eventProcessorHost.EventHubName,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not extremely happy with the data flow here but I started with removing the duplication first.

eventProcessorHost.ConsumerGroupName,
eventProcessorHost.EventHubConnectionString,
eventProcessorHost.StorageConnectionString,
_logger,
blobContainer));
}

void IListener.Cancel()
Expand Down Expand Up @@ -216,6 +208,7 @@ public async Task ProcessEventsAsync(ProcessorPartitionContext context, IEnumera
// code, and capture/log/persist failed events, since they won't be retried.
if (messages.Any())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may want to consider capturing the messages.ToArray() return from line 153 and using that here; you're enumerating an IEnumerable<T> multiple times. Pragmatically, we return IReadOnlyList<EventData> from the EventProcessor<T>. Since you're already paying the array transform cost, may as well use it. I wonder if, long term, it would make sense to use IReadOnlyList instead of an array and avoid the transforms altogether.

{
context.CheckpointEvent = messages.Last();
await CheckpointAsync(context).ConfigureAwait(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<TargetFramework>netstandard2.0</TargetFramework>
<Description>Microsoft Azure WebJobs SDK EventHubs Extension</Description>
<Version>5.0.0-beta.1</Version>
<NoWarn>$(NoWarn);AZC0001;CS1591</NoWarn>
<NoWarn>$(NoWarn);AZC0001;CS1591;SA1636</NoWarn>
<AssemblyOriginatorKeyFile>sign.snk</AssemblyOriginatorKeyFile>
</PropertyGroup>

Expand All @@ -14,4 +14,11 @@
<PackageReference Include="Microsoft.Azure.WebJobs" />
</ItemGroup>

<ItemGroup>
<Compile Include="$(AzureCoreSharedSources)Argument.cs" />
<Compile Include="../../Azure.Messaging.EventHubs.Shared/src/Processor/StorageManager.cs"/>
<Compile Include="../../Azure.Messaging.EventHubs.Shared/src/Core/CancellationTokenExtensions.cs"/>
<Compile Include="../../Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/*.cs"/>
</ItemGroup>

</Project>
Loading