Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom auth option for event producer factory #92

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions deploy/templates/default-azuredeploy-webjobs.json
Original file line number Diff line number Diff line change
Expand Up @@ -611,18 +611,18 @@
"EventBatching:MaxEvents": "[parameters('JobMaxEvents')]",
"Checkpoint:BlobPrefix": "",
"Checkpoint:CheckpointBatchCount": 5,
"CheckpointStorage:ServiceManagedIdentityAuth": true,
"CheckpointStorage:AuthenticationType": "ManagedIdentity",
"CheckpointStorage:BlobStorageContainerUri": "[variables('checkpoint_storage_uri')]",
"TemplateStorage:ServiceManagedIdentityAuth": true,
"TemplateStorage:AuthenticationType": "ManagedIdentity",
"TemplateStorage:BlobStorageContainerUri": "[variables('template_storage_uri')]",
"InputEventHub:EventHubNamespaceFQDN": "[variables('eventhub_fqdn')]",
"InputEventHub:EventHubConsumerGroup": "$Default",
"InputEventHub:EventHubName": "devicedata",
"InputEventHub:ServiceManagedIdentityAuth": true,
"InputEventHub:AuthenticationType": "ManagedIdentity",
"NormalizationEventHub:EventHubNamespaceFQDN": "[variables('eventhub_fqdn')]",
"NormalizationEventHub:EventHubConsumerGroup": "$Default",
"NormalizationEventHub:EventHubName": "normalizeddata",
"NormalizationEventHub:ServiceManagedIdentityAuth": true,
"NormalizationEventHub:AuthenticationType": "ManagedIdentity",
"FhirVersion": "[parameters('FhirVersion')]",
"FhirService:Url": "[concat('@Microsoft.KeyVault(SecretUri=', reference(resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'fhirserver-url'), '2018-02-14').secretUriWithVersion, ')')]",
"FhirService:Authority": "[concat('@Microsoft.KeyVault(SecretUri=', reference(resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'fhirserver-authority'), '2018-02-14').secretUriWithVersion, ')')]",
Expand Down
18 changes: 4 additions & 14 deletions src/console/Normalize/Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@ public class Processor : IEventConsumer
private ITemplateManager _templateManager;
private ITelemetryLogger _logger;
private IConfiguration _env;
private IAsyncCollector<IMeasurement> _collector;

public Processor(
[Blob("template/%Template:DeviceContent%", FileAccess.Read)] string templateDefinition,
ITemplateManager templateManager,
IAsyncCollector<IMeasurement> collector,
IConfiguration configuration,
ITelemetryLogger logger)
{
_templateDefinition = templateDefinition;
_templateManager = templateManager;
_collector = collector;
_logger = logger;
_env = configuration;
}
Expand Down Expand Up @@ -70,21 +73,8 @@ public async Task ConsumeAsync(IEnumerable<IEventMessage> events)
});

var dataNormalizationService = new MeasurementEventNormalizationService(_logger, template);

var collector = CreateCollector();

await dataNormalizationService.ProcessAsync(eventHubEvents, collector).ConfigureAwait(false);
}

private IAsyncCollector<IMeasurement> CreateCollector()
{
var eventHubProducerOptions = new EventProducerClientOptions();
_env.GetSection("NormalizationEventHub").Bind(eventHubProducerOptions);

var eventHubProducerFactory = new EventProducerClientFactory();
var eventHubProducerClient = eventHubProducerFactory.GetEventHubProducerClient(eventHubProducerOptions);

return new MeasurementToEventMessageAsyncCollector(new EventHubProducerService(eventHubProducerClient));
await dataNormalizationService.ProcessAsync(eventHubEvents, _collector).ConfigureAwait(false);
}
}
}
33 changes: 24 additions & 9 deletions src/console/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@
using Azure.Messaging.EventHubs;
using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Health.Common.Storage;
using Microsoft.Health.Events.Common;
using Microsoft.Health.Events.EventCheckpointing;
using Microsoft.Health.Events.EventConsumers;
using Microsoft.Health.Events.EventConsumers.Service;
using Microsoft.Health.Events.EventHubProcessor;
using Microsoft.Health.Events.EventProducers;
using Microsoft.Health.Events.Repository;
using Microsoft.Health.Fhir.Ingest.Console.Template;
using Microsoft.Health.Fhir.Ingest.Data;
using Microsoft.Health.Fhir.Ingest.Service;
using Microsoft.Health.Logging.Telemetry;
using System;
Expand All @@ -25,8 +28,8 @@ namespace Microsoft.Health.Fhir.Ingest.Console
{
public class Startup
{
private const string _deviceDataEventHubType = ApplicationType.Normalization;
private const string _normalizedDataEventHubType = ApplicationType.MeasurementToFhir;
private const string _normalizationAppType = ApplicationType.Normalization;
private const string _measurementToFhirAppType = ApplicationType.MeasurementToFhir;
public Startup(IConfiguration configuration)
{
Configuration = configuration;
Expand Down Expand Up @@ -67,13 +70,14 @@ public virtual List<IEventConsumer> ResolveEventConsumers(IServiceProvider servi
var templateManager = serviceProvider.GetRequiredService<TemplateManager>();
var logger = serviceProvider.GetRequiredService<ITelemetryLogger>();

if (applicationType == _deviceDataEventHubType)
if (applicationType == _normalizationAppType)
{
template = Configuration.GetSection("Template:DeviceContent").Value;
var deviceDataNormalization = new Normalize.Processor(template, templateManager, Configuration, logger);
var collector = ResolveEventCollector(serviceProvider);
var deviceDataNormalization = new Normalize.Processor(template, templateManager, collector, Configuration, logger);
eventConsumers.Add(deviceDataNormalization);
}
else if (applicationType == _normalizedDataEventHubType)
else if (applicationType == _measurementToFhirAppType)
{
template = Configuration.GetSection("Template:FhirMapping").Value;
var importService = serviceProvider.GetRequiredService<MeasurementFhirImportService>();
Expand Down Expand Up @@ -113,16 +117,27 @@ public virtual IEventConsumerService ResolveEventConsumerService(IServiceProvide
return new EventConsumerService(eventConsumers, logger);
}

public virtual IAsyncCollector<IMeasurement> ResolveEventCollector(IServiceProvider serviceProvider)
{
var eventHubProducerOptions = new EventHubClientOptions();
Configuration.GetSection("NormalizationEventHub").Bind(eventHubProducerOptions);

var eventHubProducerFactory = serviceProvider.GetRequiredService<IEventProducerClientFactory>();
var eventHubProducerClient = eventHubProducerFactory.GetEventHubProducerClient(eventHubProducerOptions);

return new MeasurementToEventMessageAsyncCollector(new EventHubProducerService(eventHubProducerClient));
}

public virtual EventProcessorClient ResolveEventProcessorClient(IServiceProvider serviceProvider)
{
var eventProcessorOptions = new EventProcessorClientFactoryOptions();
var eventProcessorOptions = new EventHubClientOptions();
var applicationType = GetConsoleApplicationType();

if (applicationType == _deviceDataEventHubType)
if (applicationType == _normalizationAppType)
{
Configuration.GetSection("InputEventHub").Bind(eventProcessorOptions);
}
else if (applicationType == _normalizedDataEventHubType)
else if (applicationType == _measurementToFhirAppType)
{
Configuration.GetSection("NormalizationEventHub").Bind(eventProcessorOptions);
}
Expand All @@ -131,7 +146,7 @@ public virtual EventProcessorClient ResolveEventProcessorClient(IServiceProvider
throw new Exception($"Unable to determine event processor options from application type {applicationType}");
}

var eventProcessorClientFactory = new EventProcessorClientFactory();
var eventProcessorClientFactory = serviceProvider.GetRequiredService<IEventProcessorClientFactory>();
var eventProcessorClientOptions = new EventProcessorClientOptions();
eventProcessorClientOptions.MaximumWaitTime = TimeSpan.FromSeconds(60);

Expand Down
8 changes: 4 additions & 4 deletions src/console/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,24 @@
"EventBatching:MaxEvents": 300,
"Checkpoint:BlobPrefix": "",
"Checkpoint:CheckpointBatchCount": 5,
"CheckpointStorage:ServiceManagedIdentityAuth": true,
"CheckpointStorage:AuthenticationType": "ManagedIdentity",
"CheckpointStorage:BlobStorageContainerUri": "",
"TemplateStorage:ServiceManagedIdentityAuth": true,
"TemplateStorage:AuthenticationType": "ManagedIdentity",
"TemplateStorage:BlobStorageContainerUri": "",
"Console:ApplicationType": "",
"FhirService:Authority": "",
"FhirService:ClientId": "",
"FhirService:ClientSecret": "",
"FhirService:Resource": "",
"FhirService:Url": "",
"InputEventHub:AuthenticationType": "ManagedIdentity",
"InputEventHub:EventHubNamespaceFQDN": "",
"InputEventHub:EventHubConsumerGroup": "$Default",
"InputEventHub:EventHubName": "devicedata",
"InputEventHub:ServiceManagedIdentityAuth": true,
"NormalizationEventHub:AuthenticationType": "ManagedIdentity",
"NormalizationEventHub:EventHubNamespaceFQDN": "",
"NormalizationEventHub:EventHubConsumerGroup": "$Default",
"NormalizationEventHub:EventHubName": "normalizeddata",
"NormalizationEventHub:ServiceManagedIdentityAuth": true,
"ResourceIdentity:ResourceIdentityServiceType": "Create",
"ResourceIdentity:DefaultDeviceIdentifierSystem": "",
"Template:DeviceContent": "devicecontent.json",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,37 @@ namespace Microsoft.Health.Common.Storage
{
public class BlobContainerClientFactory
{
public BlobContainerClient CreateStorageClient(BlobContainerClientOptions options)
public BlobContainerClient CreateStorageClient(BlobContainerClientOptions options, IAzureCredentialProvider provider = null)
{
EnsureArg.IsNotNull(options);
var containerUri = EnsureArg.IsNotNull(options.BlobStorageContainerUri);
var containerUri = EnsureArg.IsNotNull(options.BlobStorageContainerUri, nameof(options.BlobStorageContainerUri));
var blobUri = new BlobUriBuilder(containerUri);

if (options.ServiceManagedIdentityAuth)
if (options.AuthenticationType == AuthenticationType.ManagedIdentity)
{
var tokenCredential = new DefaultAzureCredential();
return new BlobContainerClient(containerUri, tokenCredential);
}
else if (!string.IsNullOrEmpty(options.ConnectionString))
else if (options.AuthenticationType == AuthenticationType.ConnectionString)
{
return new BlobContainerClient(containerUri.ToString(), blobUri.BlobContainerName);
EnsureArg.IsNotNull(options.ConnectionString, nameof(options.ConnectionString));
EnsureArg.IsNotNull(blobUri.BlobContainerName);

return new BlobContainerClient(options.ConnectionString, blobUri.BlobContainerName);
}
else if (options.AuthenticationType == AuthenticationType.Custom)
{
EnsureArg.IsNotNull(provider);

var tokenCredential = provider.GetCredential();
return new BlobContainerClient(containerUri, tokenCredential);
}
else
{
throw new Exception($"Unable to create blob container client for {blobUri}");
var ex = $"Unable to create blob container client for {blobUri}.";
var message = "No authentication type was specified for BlobContainerClientOptions";
throw new Exception($"{ex} {message}");
}
}

public BlobContainerClient CreateStorageClient(BlobContainerClientOptions options, IAzureCredentialProvider provider)
{
EnsureArg.IsNotNull(options);
var containerUri = EnsureArg.IsNotNull(options.BlobStorageContainerUri);

var tokenCredential = provider.GetCredential();
return new BlobContainerClient(containerUri, tokenCredential);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,24 @@

namespace Microsoft.Health.Common.Storage
{
public enum AuthenticationType
{
/// <summary>A managed identity is used to connect to Storage.</summary>
ManagedIdentity,

/// <summary>A connection string is used to connect to Storage.</summary>
ConnectionString,

/// <summary>A custom authentication method is used to connect to Storage.</summary>
Custom,
}

public class BlobContainerClientOptions
{
public AuthenticationType AuthenticationType { get; set; }

public Uri BlobStorageContainerUri { get; set; }

public string ConnectionString { get; set; }

public bool ServiceManagedIdentityAuth { get; set; }

public bool CustomizedAuth { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,30 @@
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

namespace Microsoft.Health.Events.EventHubProcessor
namespace Microsoft.Health.Events.Common
{
public class EventProcessorClientFactoryOptions
public enum AuthenticationType
{
/// <summary>A managed identity is used to connect to the Event Hub.</summary>
ManagedIdentity,

/// <summary>A connection string is used to connect to the Event Hub.</summary>
ConnectionString,

/// <summary>A custom authentication method is used to connect to the Event Hub.</summary>
Custom,
}

public class EventHubClientOptions
{
public AuthenticationType AuthenticationType { get; set; }

public string EventHubNamespaceFQDN { get; set; }

public string EventHubConsumerGroup { get; set; }

public string EventHubName { get; set; }

public string ConnectionString { get; set; }

public bool ServiceManagedIdentityAuth { get; set; }

public bool CustomizedAuth { get; set; }
}
}
33 changes: 33 additions & 0 deletions src/lib/Microsoft.Health.Events/Common/EventHubFormatter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using EnsureThat;

namespace Microsoft.Health.Events.Common
{
public static class EventHubFormatter
{
public static string GetEventHubFQDN(string host)
{
EnsureArg.IsNotEmptyOrWhiteSpace(host);

if (Uri.IsWellFormedUriString(host, UriKind.Absolute))
{
var uri = new Uri(host);
host = uri.Host;
}

if (Uri.CheckHostName(host) != UriHostNameType.Unknown)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, I personal prefer put validations first and let it fail as early as possible.

e.g.

if (!Uri.IsWellFormedUriString(host, UriKind.Absolute)) {
throw new Exception("not well formatted.");
}

var uri = new Uri(host);
host = uri.Host;

if(Uri.CheckHostName(host) == UriHostNameType.Unknown)
throw new Exception("host is unknown");
)

return host;

{
return host;
}
else
{
throw new Exception($"The event hub FQDN: {host} is not valid");
}
}
}
}
Loading