Skip to content

Commit

Permalink
Clear Event Hub Device Watermarks when source event hub changed (#114)
Browse files Browse the repository at this point in the history
* * Update the checkpoint blob path to include event hub details
* Clear checkpoints stored in blob paths pertaining to older event hub

* addressing PR comments.

* addressing PR comments.

* fix typo
  • Loading branch information
pallar-ms authored Jun 25, 2021
1 parent a0f6447 commit a4480db
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 19 deletions.
14 changes: 12 additions & 2 deletions src/console/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,18 @@ public virtual StorageCheckpointClient ResolveCheckpointClient(IServiceProvider
var checkpointBlobClient = factory.CreateStorageClient(checkpointContainerOptions);
var logger = serviceProvider.GetRequiredService<ITelemetryLogger>();

storageOptions.BlobPrefix = applicationType;
var checkpointClient = new StorageCheckpointClient(checkpointBlobClient, storageOptions, logger);
var eventProcessorOptions = new EventHubClientOptions();
if (applicationType == _normalizationAppType)
{
Configuration.GetSection("InputEventHub").Bind(eventProcessorOptions);
}
else if (applicationType == _measurementToFhirAppType)
{
Configuration.GetSection("NormalizationEventHub").Bind(eventProcessorOptions);
}

storageOptions.BlobPrefix = $"{applicationType}/{storageOptions.BlobPrefix}";
var checkpointClient = new StorageCheckpointClient(checkpointBlobClient, storageOptions, eventProcessorOptions, logger);
return checkpointClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@ public interface ICheckpointClient
Task PublishCheckpointAsync(string partitionId);

Task<Checkpoint> GetCheckpointForPartitionAsync(string partitionId);

Task ResetCheckpointsAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
using System.Threading;
using System.Threading.Tasks;
using Azure;
using Azure.Messaging.EventHubs;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using EnsureThat;
using Microsoft.Health.Events.Common;
using Microsoft.Health.Events.Model;
using Microsoft.Health.Events.Telemetry;
using Microsoft.Health.Logging.Telemetry;
Expand All @@ -23,27 +25,35 @@ namespace Microsoft.Health.Events.EventCheckpointing
{
public class StorageCheckpointClient : ICheckpointClient
{
private ConcurrentDictionary<string, Checkpoint> _checkpoints;
private ConcurrentDictionary<string, int> _lastCheckpointTracker;
private int _lastCheckpointMaxCount;
private BlobContainerClient _storageClient;
private ITelemetryLogger _log;

public StorageCheckpointClient(BlobContainerClient containerClient, StorageCheckpointOptions options, ITelemetryLogger log)
private readonly string _blobCheckpointPrefix;
private readonly string _blobPath;
private readonly ConcurrentDictionary<string, Checkpoint> _checkpoints;
private readonly int _lastCheckpointMaxCount;
private readonly ConcurrentDictionary<string, int> _lastCheckpointTracker;
private readonly ITelemetryLogger _log;
private readonly BlobContainerClient _storageClient;

public StorageCheckpointClient(BlobContainerClient containerClient, StorageCheckpointOptions storageCheckpointOptions, EventHubClientOptions eventHubClientOptions, ITelemetryLogger log)
{
EnsureArg.IsNotNull(containerClient);
EnsureArg.IsNotNull(options);
BlobPrefix = options.BlobPrefix;
EnsureArg.IsNotNull(containerClient, nameof(containerClient));
EnsureArg.IsNotNull(storageCheckpointOptions, nameof(storageCheckpointOptions));
EnsureArg.IsNotNull(eventHubClientOptions, nameof(eventHubClientOptions));

(string eventHubNamespaceFQDN, string eventHubName) = GetEventHubProperties(eventHubClientOptions);
EnsureArg.IsNotNullOrWhiteSpace(eventHubNamespaceFQDN, nameof(eventHubNamespaceFQDN));
EnsureArg.IsNotNullOrWhiteSpace(eventHubName, nameof(eventHubName));

// Blob path for checkpoints includes the event hub name to scope the checkpoints per source event hub.
_blobCheckpointPrefix = $"{storageCheckpointOptions.BlobPrefix}/checkpoint/";
_blobPath = $"{_blobCheckpointPrefix}{eventHubNamespaceFQDN}/{eventHubName}/";

_lastCheckpointMaxCount = int.Parse(options.CheckpointBatchCount);
_lastCheckpointMaxCount = int.Parse(storageCheckpointOptions.CheckpointBatchCount);
_checkpoints = new ConcurrentDictionary<string, Checkpoint>();
_lastCheckpointTracker = new ConcurrentDictionary<string, int>();
_storageClient = containerClient;
_log = log;
}

public string BlobPrefix { get; }

public BlobContainerClient GetBlobContainerClient()
{
return _storageClient;
Expand All @@ -55,7 +65,7 @@ public async Task UpdateCheckpointAsync(Checkpoint checkpoint)
EnsureArg.IsNotNullOrWhiteSpace(checkpoint.Id);
var lastProcessed = EnsureArg.IsNotNullOrWhiteSpace(checkpoint.LastProcessed.DateTime.ToString("MM/dd/yyyy hh:mm:ss.fff tt"));

var blobName = $"{BlobPrefix}/checkpoint/{checkpoint.Id}";
var blobName = $"{checkpoint.Prefix}{checkpoint.Id}";
var blobClient = _storageClient.GetBlobClient(blobName);

var metadata = new Dictionary<string, string>()
Expand Down Expand Up @@ -87,7 +97,7 @@ public async Task UpdateCheckpointAsync(Checkpoint checkpoint)

public Task<Checkpoint> GetCheckpointForPartitionAsync(string partitionIdentifier)
{
var prefix = $"{BlobPrefix}/checkpoint/{partitionIdentifier}";
var prefix = $"{_blobPath}{partitionIdentifier}";

Task<Checkpoint> GetCheckpointAsync()
{
Expand All @@ -103,7 +113,7 @@ Task<Checkpoint> GetCheckpointAsync()
DateTimeOffset.TryParse(str, null, DateTimeStyles.AssumeUniversal, out lastEventTimestamp);
}

checkpoint.Prefix = BlobPrefix;
checkpoint.Prefix = _blobPath;
checkpoint.Id = partitionId;
checkpoint.LastProcessed = lastEventTimestamp;
}
Expand Down Expand Up @@ -136,7 +146,7 @@ public async Task SetCheckpointAsync(IEventMessage eventArgs)
var checkpoint = new Checkpoint();
checkpoint.LastProcessed = eventArgs.EnqueuedTime;
checkpoint.Id = partitionId;
checkpoint.Prefix = BlobPrefix;
checkpoint.Prefix = _blobPath;

_checkpoints[partitionId] = checkpoint;
var count = _lastCheckpointTracker.AddOrUpdate(partitionId, 1, (key, value) => value + 1);
Expand All @@ -161,5 +171,70 @@ public async Task PublishCheckpointAsync(string partitionId)
Checkpoint checkpoint = _checkpoints[partitionId];
await UpdateCheckpointAsync(checkpoint);
}

/// <summary>
/// Deletes previously recorded checkpoints that are stored outside the current checkpoint blob path
/// (the path scoped to the current source event hub), so that stale checkpoints from an older
/// source event hub are not left behind.
/// </summary>
/// <remarks>
/// This is a best-effort operation: a failure to list or delete blobs is logged and never thrown,
/// and a failure to delete one blob does not stop the remaining blobs from being processed.
/// </remarks>
/// <returns>A task that completes once every candidate checkpoint blob has been examined.</returns>
public async Task ResetCheckpointsAsync()
{
    try
    {
        _log.LogTrace($"Entering {nameof(ResetCheckpointsAsync)}...");

        // Enumerate asynchronously so that paging through the container does not block a thread.
        await foreach (BlobItem blob in _storageClient.GetBlobsAsync(states: BlobStates.All, prefix: _blobCheckpointPrefix, cancellationToken: CancellationToken.None))
        {
            // _blobPath is a path prefix, so a checkpoint belongs to the current event hub only when
            // its name starts with it; a match elsewhere in the name must not protect a stale blob.
            if (blob.Name.StartsWith(_blobPath, StringComparison.OrdinalIgnoreCase))
            {
                continue;
            }

            try
            {
                await _storageClient.DeleteBlobAsync(blob.Name, cancellationToken: CancellationToken.None);
                _log.LogTrace($"Blob checkpoint path changed to {_blobPath}. Deleted checkpoint {blob.Name}.");
            }
#pragma warning disable CA1031
            catch (Exception ex)
#pragma warning restore CA1031
            {
                // Keep the original exception as the inner exception so the stack trace is not lost.
                _log.LogError(new Exception($"Unable to delete checkpoint {blob.Name} with error {ex.Message}", ex));
            }
        }

        _log.LogTrace($"Exiting {nameof(ResetCheckpointsAsync)}.");
    }
#pragma warning disable CA1031
    catch (Exception ex)
#pragma warning restore CA1031
    {
        // Keep the original exception as the inner exception so the stack trace is not lost.
        _log.LogError(new Exception($"Unable to reset checkpoints. {ex.Message}", ex));
    }
}

/// <summary>
/// Resolves the event hub namespace FQDN and event hub name for the supplied options.
/// </summary>
/// <param name="eventHubClientOptions">The event hub options to read the properties from.</param>
/// <returns>
/// When the authentication type is ConnectionString, the values parsed from the connection string;
/// otherwise (or when parsing fails) the EventHubNamespaceFQDN and EventHubName set on the options.
/// </returns>
private (string eventHubNamespaceFQDN, string eventHubName) GetEventHubProperties(EventHubClientOptions eventHubClientOptions)
{
    // If the authentication type for the event hub is ConnectionString, then parse the event hub properties (eventHubNamespaceFQDN and eventHubName) from the provided connection string,
    // else return the supplied eventHubClientOptions properties for eventHubNamespaceFQDN and eventHubName.
    var eventHubNamespaceFQDN = eventHubClientOptions.EventHubNamespaceFQDN;
    var eventHubName = eventHubClientOptions.EventHubName;

    if (eventHubClientOptions.AuthenticationType == AuthenticationType.ConnectionString)
    {
        EnsureArg.IsNotNull(eventHubClientOptions.ConnectionString, nameof(eventHubClientOptions.ConnectionString));

        try
        {
            var eventHubsConnectionStringProperties = EventHubsConnectionStringProperties.Parse(eventHubClientOptions.ConnectionString);
            eventHubNamespaceFQDN = eventHubsConnectionStringProperties.FullyQualifiedNamespace;
            eventHubName = eventHubsConnectionStringProperties.EventHubName;
        }
#pragma warning disable CA1031
        catch (Exception ex)
#pragma warning restore CA1031
        {
            // The constructor calls this method before it assigns _log, so the logger may still be null here.
            // Guard the call so a parse failure does not surface as a NullReferenceException; the caller's
            // subsequent validation of the returned values reports the problem instead.
            // The original exception is kept as the inner exception to preserve its stack trace.
            _log?.LogError(new Exception($"Unable to parse event hub properties. {ex.Message}", ex));
        }
    }

    return (eventHubNamespaceFQDN, eventHubName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ public class StorageCheckpointOptions
{
public const string Settings = "Checkpoint";

/// <summary>
/// Configurable prefix for the blob path where the checkpoints will be stored.
/// The provided prefix will be appended to the app type so as to have the checkpoints individually maintained per app type.
/// The entire blob path will consist of this blob prefix and the event hub details (event hub namespace FQDN and event hub name)
/// appended to it, to ensure that the checkpoints are appropriately managed if the source event hub changes.
/// For example, for the Normalization app if the provided BlobPrefix is "devicedata", the complete blob path for
/// the respective checkpoints will be - Normalization/devicedata/checkpoint/*eventHubNamespaceFQDN*/*eventHubName*/
/// </summary>
public string BlobPrefix { get; set; }

public string CheckpointBatchCount { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public async Task RunAsync(EventProcessorClient processor, CancellationToken ct)
{
EnsureArg.IsNotNull(processor);

// Reset previous checkpoints corresponding to an older source event hub (i.e. applicable if the source event hub changes)
await _checkpointClient.ResetCheckpointsAsync();

// Processes two types of events
// 1) Event hub events
// 2) Maximum wait events. These are generated when we have not received an event hub
Expand Down
Loading

0 comments on commit a4480db

Please sign in to comment.