Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear Event Hub Device Watermarks when source event hub changed #114

Merged
merged 4 commits into from
Jun 25, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/console/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,18 @@ public virtual StorageCheckpointClient ResolveCheckpointClient(IServiceProvider
var checkpointBlobClient = factory.CreateStorageClient(checkpointContainerOptions);
var logger = serviceProvider.GetRequiredService<ITelemetryLogger>();

storageOptions.BlobPrefix = applicationType;
var checkpointClient = new StorageCheckpointClient(checkpointBlobClient, storageOptions, logger);
var eventProcessorOptions = new EventHubClientOptions();
if (applicationType == _normalizationAppType)
{
Configuration.GetSection("InputEventHub").Bind(eventProcessorOptions);
}
else if (applicationType == _measurementToFhirAppType)
{
Configuration.GetSection("NormalizationEventHub").Bind(eventProcessorOptions);
}

storageOptions.BlobPrefix = $"{applicationType}/{storageOptions.BlobPrefix}";
pallar-ms marked this conversation as resolved.
Show resolved Hide resolved
var checkpointClient = new StorageCheckpointClient(checkpointBlobClient, storageOptions, eventProcessorOptions, logger);
return checkpointClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@ public interface ICheckpointClient
Task PublishCheckpointAsync(string partitionId);

Task<Checkpoint> GetCheckpointForPartitionAsync(string partitionId);

Task ResetCheckpointsAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Messaging.EventHubs;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using EnsureThat;
using Microsoft.Health.Events.Common;
using Microsoft.Health.Events.Model;
using Microsoft.Health.Events.Telemetry;
using Microsoft.Health.Logging.Telemetry;
Expand All @@ -23,27 +25,35 @@ namespace Microsoft.Health.Events.EventCheckpointing
{
public class StorageCheckpointClient : ICheckpointClient
{
private ConcurrentDictionary<string, Checkpoint> _checkpoints;
private ConcurrentDictionary<string, int> _lastCheckpointTracker;
private int _lastCheckpointMaxCount;
private BlobContainerClient _storageClient;
private ITelemetryLogger _log;

public StorageCheckpointClient(BlobContainerClient containerClient, StorageCheckpointOptions options, ITelemetryLogger log)
private readonly string _blobCheckpointPrefix;
private readonly string _blobPath;
private readonly ConcurrentDictionary<string, Checkpoint> _checkpoints;
private readonly int _lastCheckpointMaxCount;
private readonly ConcurrentDictionary<string, int> _lastCheckpointTracker;
private readonly ITelemetryLogger _log;
private readonly BlobContainerClient _storageClient;

/// <summary>
/// Initializes a new instance of the <see cref="StorageCheckpointClient"/> class whose checkpoint blobs are scoped to the source event hub.
/// </summary>
/// <param name="containerClient">Blob container client used to read, write and delete checkpoint blobs.</param>
/// <param name="storageCheckpointOptions">Supplies the blob prefix and the checkpoint batch count.</param>
/// <param name="eventHubClientOptions">Options of the source event hub; used to derive the event hub namespace and name that are embedded in the checkpoint blob path.</param>
/// <param name="log">Telemetry logger used to report checkpoint failures.</param>
public StorageCheckpointClient(BlobContainerClient containerClient, StorageCheckpointOptions storageCheckpointOptions, EventHubClientOptions eventHubClientOptions, ITelemetryLogger log)
{
    EnsureArg.IsNotNull(containerClient, nameof(containerClient));
    EnsureArg.IsNotNull(storageCheckpointOptions, nameof(storageCheckpointOptions));
    EnsureArg.IsNotNull(eventHubClientOptions, nameof(eventHubClientOptions));

    // Assign the logger before resolving the event hub properties: GetEventHubProperties reports
    // connection string parse failures through _log.
    _log = log;
    _storageClient = containerClient;

    (string eventHubNamespaceFQDN, string eventHubName) = GetEventHubProperties(eventHubClientOptions);
    EnsureArg.IsNotNullOrWhiteSpace(eventHubNamespaceFQDN, nameof(eventHubNamespaceFQDN));
    EnsureArg.IsNotNullOrWhiteSpace(eventHubName, nameof(eventHubName));

    // Blob path for checkpoints includes the event hub namespace and name to scope the checkpoints per source event hub.
    _blobCheckpointPrefix = $"{storageCheckpointOptions.BlobPrefix}/checkpoint/";
    _blobPath = $"{_blobCheckpointPrefix}{eventHubNamespaceFQDN}/{eventHubName}/";

    _lastCheckpointMaxCount = int.Parse(storageCheckpointOptions.CheckpointBatchCount);
    _checkpoints = new ConcurrentDictionary<string, Checkpoint>();
    _lastCheckpointTracker = new ConcurrentDictionary<string, int>();
}

public string BlobPrefix { get; }

public BlobContainerClient GetBlobContainerClient()
{
return _storageClient;
Expand All @@ -55,7 +65,7 @@ public async Task UpdateCheckpointAsync(Checkpoint checkpoint)
EnsureArg.IsNotNullOrWhiteSpace(checkpoint.Id);
var lastProcessed = EnsureArg.IsNotNullOrWhiteSpace(checkpoint.LastProcessed.DateTime.ToString("MM/dd/yyyy hh:mm:ss.fff tt"));

var blobName = $"{BlobPrefix}/checkpoint/{checkpoint.Id}";
var blobName = $"{checkpoint.Prefix}{checkpoint.Id}";
var blobClient = _storageClient.GetBlobClient(blobName);

var metadata = new Dictionary<string, string>()
Expand Down Expand Up @@ -87,7 +97,7 @@ public async Task UpdateCheckpointAsync(Checkpoint checkpoint)

public Task<Checkpoint> GetCheckpointForPartitionAsync(string partitionIdentifier)
{
var prefix = $"{BlobPrefix}/checkpoint/{partitionIdentifier}";
var prefix = $"{_blobPath}{partitionIdentifier}";

Task<Checkpoint> GetCheckpointAsync()
{
Expand All @@ -103,7 +113,7 @@ Task<Checkpoint> GetCheckpointAsync()
DateTimeOffset.TryParse(str, null, DateTimeStyles.AssumeUniversal, out lastEventTimestamp);
}

checkpoint.Prefix = BlobPrefix;
checkpoint.Prefix = _blobPath;
checkpoint.Id = partitionId;
checkpoint.LastProcessed = lastEventTimestamp;
}
Expand Down Expand Up @@ -136,7 +146,7 @@ public async Task SetCheckpointAsync(IEventMessage eventArgs)
var checkpoint = new Checkpoint();
checkpoint.LastProcessed = eventArgs.EnqueuedTime;
checkpoint.Id = partitionId;
checkpoint.Prefix = BlobPrefix;
checkpoint.Prefix = _blobPath;

_checkpoints[partitionId] = checkpoint;
var count = _lastCheckpointTracker.AddOrUpdate(partitionId, 1, (key, value) => value + 1);
Expand All @@ -161,5 +171,66 @@ public async Task PublishCheckpointAsync(string partitionId)
Checkpoint checkpoint = _checkpoints[partitionId];
await UpdateCheckpointAsync(checkpoint);
}

/// <summary>
/// Deletes previously recorded checkpoints whose blob name does not contain the current checkpoint blob path
/// (i.e. checkpoints that were written for a different source event hub).
/// Failures are logged and are not propagated to the caller.
/// </summary>
/// <returns>A task that completes once every checkpoint blob under the checkpoint prefix has been examined.</returns>
public async Task ResetCheckpointsAsync()
{
    try
    {
        // Enumerate asynchronously so that listing the container does not block the calling thread.
        await foreach (BlobItem blob in _storageClient.GetBlobsAsync(states: BlobStates.All, prefix: _blobCheckpointPrefix, cancellationToken: CancellationToken.None))
        {
            // Checkpoints under the current blob path belong to the current source event hub; keep them.
            if (blob.Name.Contains(_blobPath, StringComparison.OrdinalIgnoreCase))
            {
                continue;
            }

            try
            {
                await _storageClient.DeleteBlobAsync(blob.Name, cancellationToken: CancellationToken.None);
                _log.LogTrace($"Blob checkpoint path changed to {_blobPath}. Deleted checkpoint {blob.Name}.");
            }
#pragma warning disable CA1031
            catch (Exception ex)
#pragma warning restore CA1031
            {
                // A failure to delete one checkpoint must not prevent the remaining ones from being cleaned up.
                // The original exception is attached so its stack trace is not lost.
                _log.LogError(new Exception($"Unable to delete checkpoint {blob.Name} with error {ex.Message}", ex));
            }
        }
    }
#pragma warning disable CA1031
    catch (Exception ex)
#pragma warning restore CA1031
    {
        // Resetting checkpoints is best effort: log the failure and return normally.
        _log.LogError(new Exception($"Unable to reset checkpoints. {ex.Message}", ex));
    }
}

/// <summary>
/// Resolves the fully qualified namespace and the name of the event hub described by the supplied options.
/// </summary>
/// <param name="eventHubClientOptions">Event hub options. When the authentication type is ConnectionString, the connection string is parsed to obtain the values.</param>
/// <returns>
/// The event hub namespace and name. When the connection string cannot be parsed, the values configured
/// directly on <paramref name="eventHubClientOptions"/> are returned instead.
/// </returns>
private (string eventHubNamespaceFQDN, string eventHubName) GetEventHubProperties(EventHubClientOptions eventHubClientOptions)
{
    // If the authentication type for the event hub is ConnectionString, then parse the event hub properties (eventHubNamespaceFQDN and eventHubName) from the provided connection string,
    // else return the supplied eventHubClientOptions properties for eventHubNamespaceFQDN and eventHubName.
    var eventHubNamespaceFQDN = eventHubClientOptions.EventHubNamespaceFQDN;
    var eventHubName = eventHubClientOptions.EventHubName;

    if (eventHubClientOptions.AuthenticationType == AuthenticationType.ConnectionString)
    {
        EnsureArg.IsNotNull(eventHubClientOptions.ConnectionString, nameof(eventHubClientOptions.ConnectionString));

        try
        {
            var eventHubsConnectionStringProperties = EventHubsConnectionStringProperties.Parse(eventHubClientOptions.ConnectionString);
            eventHubNamespaceFQDN = eventHubsConnectionStringProperties.FullyQualifiedNamespace;
            eventHubName = eventHubsConnectionStringProperties.EventHubName;
        }
#pragma warning disable CA1031
        catch (Exception ex)
#pragma warning restore CA1031
        {
            // This method runs during construction, where the logger field may not be set yet (or may be null);
            // the null-conditional call keeps a parse failure from surfacing as a NullReferenceException.
            _log?.LogError(new Exception($"Unable to parse event hub properties. {ex.Message}", ex));
        }
    }

    return (eventHubNamespaceFQDN, eventHubName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public async Task RunAsync(EventProcessorClient processor, CancellationToken ct)
{
EnsureArg.IsNotNull(processor);

// Reset previous checkpoints corresponding to an older source event hub (i.e. applicable if the source event hub changes)
await _checkpointClient.ResetCheckpointsAsync();
pallar-ms marked this conversation as resolved.
Show resolved Hide resolved

// Processes two types of events
// 1) Event hub events
// 2) Maximum wait events. These are generated when we have not received an event hub
Expand Down
Loading