Skip to content

Commit

Permalink
Add custom auth option for event producer factory (#92)
Browse files Browse the repository at this point in the history
* support custom auth option for event producer
  • Loading branch information
wi-y authored Mar 8, 2021
1 parent 0b90a99 commit 3dbcd56
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 106 deletions.
8 changes: 4 additions & 4 deletions deploy/templates/default-azuredeploy-webjobs.json
Original file line number Diff line number Diff line change
Expand Up @@ -611,18 +611,18 @@
"EventBatching:MaxEvents": "[parameters('JobMaxEvents')]",
"Checkpoint:BlobPrefix": "",
"Checkpoint:CheckpointBatchCount": 5,
"CheckpointStorage:ServiceManagedIdentityAuth": true,
"CheckpointStorage:AuthenticationType": "ManagedIdentity",
"CheckpointStorage:BlobStorageContainerUri": "[variables('checkpoint_storage_uri')]",
"TemplateStorage:ServiceManagedIdentityAuth": true,
"TemplateStorage:AuthenticationType": "ManagedIdentity",
"TemplateStorage:BlobStorageContainerUri": "[variables('template_storage_uri')]",
"InputEventHub:EventHubNamespaceFQDN": "[variables('eventhub_fqdn')]",
"InputEventHub:EventHubConsumerGroup": "$Default",
"InputEventHub:EventHubName": "devicedata",
"InputEventHub:ServiceManagedIdentityAuth": true,
"InputEventHub:AuthenticationType": "ManagedIdentity",
"NormalizationEventHub:EventHubNamespaceFQDN": "[variables('eventhub_fqdn')]",
"NormalizationEventHub:EventHubConsumerGroup": "$Default",
"NormalizationEventHub:EventHubName": "normalizeddata",
"NormalizationEventHub:ServiceManagedIdentityAuth": true,
"NormalizationEventHub:AuthenticationType": "ManagedIdentity",
"FhirVersion": "[parameters('FhirVersion')]",
"FhirService:Url": "[concat('@Microsoft.KeyVault(SecretUri=', reference(resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'fhirserver-url'), '2018-02-14').secretUriWithVersion, ')')]",
"FhirService:Authority": "[concat('@Microsoft.KeyVault(SecretUri=', reference(resourceId('Microsoft.KeyVault/vaults/secrets', variables('key_vault_name'),'fhirserver-authority'), '2018-02-14').secretUriWithVersion, ')')]",
Expand Down
18 changes: 4 additions & 14 deletions src/console/Normalize/Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@ public class Processor : IEventConsumer
private ITemplateManager _templateManager;
private ITelemetryLogger _logger;
private IConfiguration _env;
private IAsyncCollector<IMeasurement> _collector;

public Processor(
[Blob("template/%Template:DeviceContent%", FileAccess.Read)] string templateDefinition,
ITemplateManager templateManager,
IAsyncCollector<IMeasurement> collector,
IConfiguration configuration,
ITelemetryLogger logger)
{
_templateDefinition = templateDefinition;
_templateManager = templateManager;
_collector = collector;
_logger = logger;
_env = configuration;
}
Expand Down Expand Up @@ -70,21 +73,8 @@ public async Task ConsumeAsync(IEnumerable<IEventMessage> events)
});

var dataNormalizationService = new MeasurementEventNormalizationService(_logger, template);

var collector = CreateCollector();

await dataNormalizationService.ProcessAsync(eventHubEvents, collector).ConfigureAwait(false);
}

private IAsyncCollector<IMeasurement> CreateCollector()
{
var eventHubProducerOptions = new EventProducerClientOptions();
_env.GetSection("NormalizationEventHub").Bind(eventHubProducerOptions);

var eventHubProducerFactory = new EventProducerClientFactory();
var eventHubProducerClient = eventHubProducerFactory.GetEventHubProducerClient(eventHubProducerOptions);

return new MeasurementToEventMessageAsyncCollector(new EventHubProducerService(eventHubProducerClient));
await dataNormalizationService.ProcessAsync(eventHubEvents, _collector).ConfigureAwait(false);
}
}
}
33 changes: 24 additions & 9 deletions src/console/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@
using Azure.Messaging.EventHubs;
using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Health.Common.Storage;
using Microsoft.Health.Events.Common;
using Microsoft.Health.Events.EventCheckpointing;
using Microsoft.Health.Events.EventConsumers;
using Microsoft.Health.Events.EventConsumers.Service;
using Microsoft.Health.Events.EventHubProcessor;
using Microsoft.Health.Events.EventProducers;
using Microsoft.Health.Events.Repository;
using Microsoft.Health.Fhir.Ingest.Console.Template;
using Microsoft.Health.Fhir.Ingest.Data;
using Microsoft.Health.Fhir.Ingest.Service;
using Microsoft.Health.Logging.Telemetry;
using System;
Expand All @@ -25,8 +28,8 @@ namespace Microsoft.Health.Fhir.Ingest.Console
{
public class Startup
{
private const string _deviceDataEventHubType = ApplicationType.Normalization;
private const string _normalizedDataEventHubType = ApplicationType.MeasurementToFhir;
private const string _normalizationAppType = ApplicationType.Normalization;
private const string _measurementToFhirAppType = ApplicationType.MeasurementToFhir;
public Startup(IConfiguration configuration)
{
Configuration = configuration;
Expand Down Expand Up @@ -67,13 +70,14 @@ public virtual List<IEventConsumer> ResolveEventConsumers(IServiceProvider servi
var templateManager = serviceProvider.GetRequiredService<TemplateManager>();
var logger = serviceProvider.GetRequiredService<ITelemetryLogger>();

if (applicationType == _deviceDataEventHubType)
if (applicationType == _normalizationAppType)
{
template = Configuration.GetSection("Template:DeviceContent").Value;
var deviceDataNormalization = new Normalize.Processor(template, templateManager, Configuration, logger);
var collector = ResolveEventCollector(serviceProvider);
var deviceDataNormalization = new Normalize.Processor(template, templateManager, collector, Configuration, logger);
eventConsumers.Add(deviceDataNormalization);
}
else if (applicationType == _normalizedDataEventHubType)
else if (applicationType == _measurementToFhirAppType)
{
template = Configuration.GetSection("Template:FhirMapping").Value;
var importService = serviceProvider.GetRequiredService<MeasurementFhirImportService>();
Expand Down Expand Up @@ -113,16 +117,27 @@ public virtual IEventConsumerService ResolveEventConsumerService(IServiceProvide
return new EventConsumerService(eventConsumers, logger);
}

public virtual IAsyncCollector<IMeasurement> ResolveEventCollector(IServiceProvider serviceProvider)
{
var eventHubProducerOptions = new EventHubClientOptions();
Configuration.GetSection("NormalizationEventHub").Bind(eventHubProducerOptions);

var eventHubProducerFactory = serviceProvider.GetRequiredService<IEventProducerClientFactory>();
var eventHubProducerClient = eventHubProducerFactory.GetEventHubProducerClient(eventHubProducerOptions);

return new MeasurementToEventMessageAsyncCollector(new EventHubProducerService(eventHubProducerClient));
}

public virtual EventProcessorClient ResolveEventProcessorClient(IServiceProvider serviceProvider)
{
var eventProcessorOptions = new EventProcessorClientFactoryOptions();
var eventProcessorOptions = new EventHubClientOptions();
var applicationType = GetConsoleApplicationType();

if (applicationType == _deviceDataEventHubType)
if (applicationType == _normalizationAppType)
{
Configuration.GetSection("InputEventHub").Bind(eventProcessorOptions);
}
else if (applicationType == _normalizedDataEventHubType)
else if (applicationType == _measurementToFhirAppType)
{
Configuration.GetSection("NormalizationEventHub").Bind(eventProcessorOptions);
}
Expand All @@ -131,7 +146,7 @@ public virtual EventProcessorClient ResolveEventProcessorClient(IServiceProvider
throw new Exception($"Unable to determine event processor options from application type {applicationType}");
}

var eventProcessorClientFactory = new EventProcessorClientFactory();
var eventProcessorClientFactory = serviceProvider.GetRequiredService<IEventProcessorClientFactory>();
var eventProcessorClientOptions = new EventProcessorClientOptions();
eventProcessorClientOptions.MaximumWaitTime = TimeSpan.FromSeconds(60);

Expand Down
8 changes: 4 additions & 4 deletions src/console/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,24 @@
"EventBatching:MaxEvents": 300,
"Checkpoint:BlobPrefix": "",
"Checkpoint:CheckpointBatchCount": 5,
"CheckpointStorage:ServiceManagedIdentityAuth": true,
"CheckpointStorage:AuthenticationType": "ManagedIdentity",
"CheckpointStorage:BlobStorageContainerUri": "",
"TemplateStorage:ServiceManagedIdentityAuth": true,
"TemplateStorage:AuthenticationType": "ManagedIdentity",
"TemplateStorage:BlobStorageContainerUri": "",
"Console:ApplicationType": "",
"FhirService:Authority": "",
"FhirService:ClientId": "",
"FhirService:ClientSecret": "",
"FhirService:Resource": "",
"FhirService:Url": "",
"InputEventHub:AuthenticationType": "ManagedIdentity",
"InputEventHub:EventHubNamespaceFQDN": "",
"InputEventHub:EventHubConsumerGroup": "$Default",
"InputEventHub:EventHubName": "devicedata",
"InputEventHub:ServiceManagedIdentityAuth": true,
"NormalizationEventHub:AuthenticationType": "ManagedIdentity",
"NormalizationEventHub:EventHubNamespaceFQDN": "",
"NormalizationEventHub:EventHubConsumerGroup": "$Default",
"NormalizationEventHub:EventHubName": "normalizeddata",
"NormalizationEventHub:ServiceManagedIdentityAuth": true,
"ResourceIdentity:ResourceIdentityServiceType": "Create",
"ResourceIdentity:DefaultDeviceIdentifierSystem": "",
"Template:DeviceContent": "devicecontent.json",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,37 @@ namespace Microsoft.Health.Common.Storage
{
public class BlobContainerClientFactory
{
public BlobContainerClient CreateStorageClient(BlobContainerClientOptions options)
public BlobContainerClient CreateStorageClient(BlobContainerClientOptions options, IAzureCredentialProvider provider = null)
{
EnsureArg.IsNotNull(options);
var containerUri = EnsureArg.IsNotNull(options.BlobStorageContainerUri);
var containerUri = EnsureArg.IsNotNull(options.BlobStorageContainerUri, nameof(options.BlobStorageContainerUri));
var blobUri = new BlobUriBuilder(containerUri);

if (options.ServiceManagedIdentityAuth)
if (options.AuthenticationType == AuthenticationType.ManagedIdentity)
{
var tokenCredential = new DefaultAzureCredential();
return new BlobContainerClient(containerUri, tokenCredential);
}
else if (!string.IsNullOrEmpty(options.ConnectionString))
else if (options.AuthenticationType == AuthenticationType.ConnectionString)
{
return new BlobContainerClient(containerUri.ToString(), blobUri.BlobContainerName);
EnsureArg.IsNotNull(options.ConnectionString, nameof(options.ConnectionString));
EnsureArg.IsNotNull(blobUri.BlobContainerName);

return new BlobContainerClient(options.ConnectionString, blobUri.BlobContainerName);
}
else if (options.AuthenticationType == AuthenticationType.Custom)
{
EnsureArg.IsNotNull(provider);

var tokenCredential = provider.GetCredential();
return new BlobContainerClient(containerUri, tokenCredential);
}
else
{
throw new Exception($"Unable to create blob container client for {blobUri}");
var ex = $"Unable to create blob container client for {blobUri}.";
var message = "No authentication type was specified for BlobContainerClientOptions";
throw new Exception($"{ex} {message}");
}
}

public BlobContainerClient CreateStorageClient(BlobContainerClientOptions options, IAzureCredentialProvider provider)
{
EnsureArg.IsNotNull(options);
var containerUri = EnsureArg.IsNotNull(options.BlobStorageContainerUri);

var tokenCredential = provider.GetCredential();
return new BlobContainerClient(containerUri, tokenCredential);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,24 @@

namespace Microsoft.Health.Common.Storage
{
public enum AuthenticationType
{
/// <summary>A managed identity is used to connect to Storage.</summary>
ManagedIdentity,

/// <summary>A connection string is used to connect to Storage.</summary>
ConnectionString,

/// <summary>A custom authentication method is used to connect to Storage.</summary>
Custom,
}

public class BlobContainerClientOptions
{
public AuthenticationType AuthenticationType { get; set; }

public Uri BlobStorageContainerUri { get; set; }

public string ConnectionString { get; set; }

public bool ServiceManagedIdentityAuth { get; set; }

public bool CustomizedAuth { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,30 @@
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

namespace Microsoft.Health.Events.EventHubProcessor
namespace Microsoft.Health.Events.Common
{
public class EventProcessorClientFactoryOptions
public enum AuthenticationType
{
/// <summary>A managed identity is used to connect to the Event Hub.</summary>
ManagedIdentity,

/// <summary>A connection string is used to connect to the Event Hub.</summary>
ConnectionString,

/// <summary>A custom authentication method is used to connect to the Event Hub.</summary>
Custom,
}

public class EventHubClientOptions
{
public AuthenticationType AuthenticationType { get; set; }

public string EventHubNamespaceFQDN { get; set; }

public string EventHubConsumerGroup { get; set; }

public string EventHubName { get; set; }

public string ConnectionString { get; set; }

public bool ServiceManagedIdentityAuth { get; set; }

public bool CustomizedAuth { get; set; }
}
}
33 changes: 33 additions & 0 deletions src/lib/Microsoft.Health.Events/Common/EventHubFormatter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using EnsureThat;

namespace Microsoft.Health.Events.Common
{
public static class EventHubFormatter
{
public static string GetEventHubFQDN(string host)
{
EnsureArg.IsNotEmptyOrWhiteSpace(host);

if (Uri.IsWellFormedUriString(host, UriKind.Absolute))
{
var uri = new Uri(host);
host = uri.Host;
}

if (Uri.CheckHostName(host) != UriHostNameType.Unknown)
{
return host;
}
else
{
throw new Exception($"The event hub FQDN: {host} is not valid");
}
}
}
}
Loading

0 comments on commit 3dbcd56

Please sign in to comment.