Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear Event Hub Device Watermarks when source event hub changed #114

Merged
merged 4 commits into from
Jun 25, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/console/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,18 @@ public virtual StorageCheckpointClient ResolveCheckpointClient(IServiceProvider
var checkpointBlobClient = factory.CreateStorageClient(checkpointContainerOptions);
var logger = serviceProvider.GetRequiredService<ITelemetryLogger>();

storageOptions.BlobPrefix = applicationType;
var checkpointClient = new StorageCheckpointClient(checkpointBlobClient, storageOptions, logger);
var eventProcessorOptions = new EventHubClientOptions();
if (applicationType == _normalizationAppType)
{
Configuration.GetSection("InputEventHub").Bind(eventProcessorOptions);
}
else if (applicationType == _measurementToFhirAppType)
{
Configuration.GetSection("NormalizationEventHub").Bind(eventProcessorOptions);
}

storageOptions.BlobPrefix = $"{applicationType}/{storageOptions.BlobPrefix}";
pallar-ms marked this conversation as resolved.
Show resolved Hide resolved
var checkpointClient = new StorageCheckpointClient(checkpointBlobClient, storageOptions, eventProcessorOptions, logger);
return checkpointClient;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@ public interface ICheckpointClient
Task PublishCheckpointAsync(string partitionId);

Task<Checkpoint> GetCheckpointForPartitionAsync(string partitionId);

Task ResetCheckpointsAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using EnsureThat;
using Microsoft.Health.Events.Common;
using Microsoft.Health.Events.Model;
using Microsoft.Health.Events.Telemetry;
using Microsoft.Health.Logging.Telemetry;
Expand All @@ -23,17 +24,25 @@ namespace Microsoft.Health.Events.EventCheckpointing
{
public class StorageCheckpointClient : ICheckpointClient
{
private ConcurrentDictionary<string, Checkpoint> _checkpoints;
private ConcurrentDictionary<string, int> _lastCheckpointTracker;
private int _lastCheckpointMaxCount;
private BlobContainerClient _storageClient;
private ITelemetryLogger _log;

public StorageCheckpointClient(BlobContainerClient containerClient, StorageCheckpointOptions options, ITelemetryLogger log)
private readonly string _blobCheckpointPrefix;
private readonly string _blobPath;
private readonly ConcurrentDictionary<string, Checkpoint> _checkpoints;
private readonly int _lastCheckpointMaxCount;
private readonly ConcurrentDictionary<string, int> _lastCheckpointTracker;
private readonly ITelemetryLogger _log;
private readonly BlobContainerClient _storageClient;

public StorageCheckpointClient(BlobContainerClient containerClient, StorageCheckpointOptions options, EventHubClientOptions eventHubClientOptions, ITelemetryLogger log)
{
EnsureArg.IsNotNull(containerClient);
EnsureArg.IsNotNull(options);
BlobPrefix = options.BlobPrefix;
EnsureArg.IsNotNull(eventHubClientOptions);

// Blob path for checkpoints includes the event hub name to scope the checkpoints per source event hub.
_blobCheckpointPrefix = $"{options.BlobPrefix}/checkpoint/";
pallar-ms marked this conversation as resolved.
Show resolved Hide resolved
_blobPath = eventHubClientOptions != default ?
pallar-ms marked this conversation as resolved.
Show resolved Hide resolved
$"{_blobCheckpointPrefix}{eventHubClientOptions.EventHubNamespaceFQDN}/{eventHubClientOptions.EventHubName}/" :
_blobCheckpointPrefix;
pallar-ms marked this conversation as resolved.
Show resolved Hide resolved

_lastCheckpointMaxCount = int.Parse(options.CheckpointBatchCount);
_checkpoints = new ConcurrentDictionary<string, Checkpoint>();
Expand All @@ -42,8 +51,6 @@ public StorageCheckpointClient(BlobContainerClient containerClient, StorageCheck
_log = log;
}

public string BlobPrefix { get; }

public BlobContainerClient GetBlobContainerClient()
{
return _storageClient;
Expand All @@ -55,7 +62,7 @@ public async Task UpdateCheckpointAsync(Checkpoint checkpoint)
EnsureArg.IsNotNullOrWhiteSpace(checkpoint.Id);
var lastProcessed = EnsureArg.IsNotNullOrWhiteSpace(checkpoint.LastProcessed.DateTime.ToString("MM/dd/yyyy hh:mm:ss.fff tt"));

var blobName = $"{BlobPrefix}/checkpoint/{checkpoint.Id}";
var blobName = $"{checkpoint.Prefix}{checkpoint.Id}";
var blobClient = _storageClient.GetBlobClient(blobName);

var metadata = new Dictionary<string, string>()
Expand Down Expand Up @@ -87,7 +94,7 @@ public async Task UpdateCheckpointAsync(Checkpoint checkpoint)

public Task<Checkpoint> GetCheckpointForPartitionAsync(string partitionIdentifier)
{
var prefix = $"{BlobPrefix}/checkpoint/{partitionIdentifier}";
var prefix = $"{_blobPath}{partitionIdentifier}";

Task<Checkpoint> GetCheckpointAsync()
{
Expand All @@ -103,7 +110,7 @@ Task<Checkpoint> GetCheckpointAsync()
DateTimeOffset.TryParse(str, null, DateTimeStyles.AssumeUniversal, out lastEventTimestamp);
}

checkpoint.Prefix = BlobPrefix;
checkpoint.Prefix = _blobPath;
checkpoint.Id = partitionId;
checkpoint.LastProcessed = lastEventTimestamp;
}
Expand Down Expand Up @@ -136,7 +143,7 @@ public async Task SetCheckpointAsync(IEventMessage eventArgs)
var checkpoint = new Checkpoint();
checkpoint.LastProcessed = eventArgs.EnqueuedTime;
checkpoint.Id = partitionId;
checkpoint.Prefix = BlobPrefix;
checkpoint.Prefix = _blobPath;

_checkpoints[partitionId] = checkpoint;
var count = _lastCheckpointTracker.AddOrUpdate(partitionId, 1, (key, value) => value + 1);
Expand All @@ -161,5 +168,28 @@ public async Task PublishCheckpointAsync(string partitionId)
Checkpoint checkpoint = _checkpoints[partitionId];
await UpdateCheckpointAsync(checkpoint);
}

/// <summary>
/// Deletes previously recorded checkpoints that are not stored under the current checkpoint blob path,
/// i.e. checkpoints left behind after the source event hub (namespace and/or name) changed.
/// </summary>
/// <remarks>
/// Every blob under the application's checkpoint prefix is listed, and each one whose name does not start
/// with the current blob path is deleted. Failures are logged and swallowed rather than rethrown; the first
/// failure stops the remaining cleanup for this call.
/// </remarks>
/// <returns>A task that completes once the cleanup has finished or has been abandoned after an error.</returns>
public async Task ResetCheckpointsAsync()
{
    try
    {
        foreach (BlobItem blob in _storageClient.GetBlobs(states: BlobStates.All, prefix: _blobCheckpointPrefix, cancellationToken: CancellationToken.None))
        {
            // A checkpoint belongs to the current source only when it lives *under* the current path.
            // A prefix match is used instead of a substring match so that a blob which merely embeds
            // the current path somewhere later in its name is still treated as stale.
            if (blob.Name.StartsWith(_blobPath, StringComparison.OrdinalIgnoreCase))
            {
                continue;
            }

            await _storageClient.DeleteBlobAsync(blob.Name, cancellationToken: CancellationToken.None);
        }
    }
#pragma warning disable CA1031
    catch (Exception ex)
#pragma warning restore CA1031
    {
        // Keep the original exception as the inner exception so its type and stack trace reach the log.
        _log.LogError(new Exception($"Unable to reset checkpoints. {ex.Message}", ex));
    }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public async Task RunAsync(EventProcessorClient processor, CancellationToken ct)
{
EnsureArg.IsNotNull(processor);

// Reset previous checkpoints corresponding to an older source event hub (i.e. applicable if the source event hub changes)
await _checkpointClient.ResetCheckpointsAsync();
pallar-ms marked this conversation as resolved.
Show resolved Hide resolved

// Processes two types of events
// 1) Event hub events
// 2) Maximum wait events. These are generated when we have not received an event hub
Expand Down
174 changes: 174 additions & 0 deletions test/Microsoft.Health.Events.UnitTest/StorageCheckpointTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Microsoft.Health.Events.Common;
using Microsoft.Health.Events.EventCheckpointing;
using Microsoft.Health.Logging.Telemetry;
using NSubstitute;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace Microsoft.Health.Events.UnitTest
{
/// <summary>
/// Unit tests for <see cref="StorageCheckpointClient"/> construction and
/// <see cref="StorageCheckpointClient.ResetCheckpointsAsync"/>.
/// </summary>
/// <remarks>
/// The blob container client is an NSubstitute mock. By default it lists three existing checkpoints
/// recorded for the "Normalization" application under the original event hub
/// (test.servicebus.windows.net / devicedata).
/// </remarks>
public class StorageCheckpointTests
{
    // Partition ids of the checkpoint blobs the mocked container reports as already existing.
    private static readonly string[] ExistingPartitionIds = { "1", "10", "20" };

    private readonly BlobContainerClient _blobContainerClient;
    private readonly EventHubClientOptions _eventHubClientOptions;
    private readonly ITelemetryLogger _logger;
    private readonly StorageCheckpointOptions _storageCheckpointOptions;

    private readonly string _blobCheckpointPrefix;
    private readonly string _blobPath;
    private readonly string _eventHubName;
    private readonly string _eventHubNamespaceFQDN;

    public StorageCheckpointTests()
    {
        _blobContainerClient = Substitute.For<BlobContainerClient>();

        _storageCheckpointOptions = new StorageCheckpointOptions()
        {
            BlobPrefix = "Normalization",
            CheckpointBatchCount = "5"
        };

        _eventHubClientOptions = new EventHubClientOptions();
        _eventHubNamespaceFQDN = "test.servicebus.windows.net";
        _eventHubName = "devicedata";

        // Blob path corresponds to current event hub name
        _blobCheckpointPrefix = $"{_storageCheckpointOptions.BlobPrefix}/checkpoint/";
        _blobPath = $"{_blobCheckpointPrefix}{_eventHubNamespaceFQDN}/{_eventHubName}/";

        SetupExistingCheckpoints(_blobCheckpointPrefix, _blobPath);

        _logger = Substitute.For<ITelemetryLogger>();
    }

    [Fact]
    public void GivenNullParameters_WhenStorageCheckpointClientCreated_Throws()
    {
        Assert.Throws<ArgumentNullException>(() => new StorageCheckpointClient(null, _storageCheckpointOptions, _eventHubClientOptions, _logger));
        Assert.Throws<ArgumentNullException>(() => new StorageCheckpointClient(_blobContainerClient, null, _eventHubClientOptions, _logger));
        Assert.Throws<ArgumentNullException>(() => new StorageCheckpointClient(_blobContainerClient, _storageCheckpointOptions, null, _logger));
    }

    [Fact]
    public async Task GivenUnchangedEventHubOptions_WhenResetCheckpointsAsyncCalled_ThenNoCheckpointsAreDeleted()
    {
        _eventHubClientOptions.EventHubNamespaceFQDN = _eventHubNamespaceFQDN;
        _eventHubClientOptions.EventHubName = _eventHubName;

        await CreateClient().ResetCheckpointsAsync();

        // Given that the source event hub didn't change, verify that no checkpoint deletions occurred.
        _blobContainerClient.Received(1).GetBlobs(states: BlobStates.All, prefix: _blobCheckpointPrefix, cancellationToken: CancellationToken.None);
        await _blobContainerClient.ReceivedWithAnyArgs(0).DeleteBlobAsync(null);
    }

    [Fact]
    public async Task GivenUpdatedEventHubNamespace_WhenResetCheckpointsAsyncCalled_ThenPreviousCheckpointsAreDeleted()
    {
        _eventHubClientOptions.EventHubNamespaceFQDN = "newtest.servicebus.windows.net";
        _eventHubClientOptions.EventHubName = _eventHubName;

        await CreateClient().ResetCheckpointsAsync();

        // Given that the event hub namespace changed and is therefore a new source, verify that the checkpoints corresponding to the old source will be deleted.
        await AssertExistingCheckpointsDeletedAsync(_blobCheckpointPrefix, _blobPath);
    }

    [Fact]
    public async Task GivenUpdatedEventHubName_WhenResetCheckpointsAsyncCalled_ThenPreviousCheckpointsAreDeleted()
    {
        _eventHubClientOptions.EventHubNamespaceFQDN = _eventHubNamespaceFQDN;
        _eventHubClientOptions.EventHubName = "newdevicedata";

        await CreateClient().ResetCheckpointsAsync();

        // Given that the event hub changed and is therefore a new source, verify that the checkpoints corresponding to the old source will be deleted.
        await AssertExistingCheckpointsDeletedAsync(_blobCheckpointPrefix, _blobPath);
    }

    [Fact]
    public async Task GivenUpdatedEventHubNamespaceAndEventHubName_WhenResetCheckpointsAsyncCalled_ThenPreviousCheckpointsAreDeleted()
    {
        _eventHubClientOptions.EventHubNamespaceFQDN = "newtest.servicebus.windows.net";
        _eventHubClientOptions.EventHubName = "newdevicedata";

        await CreateClient().ResetCheckpointsAsync();

        // Given that the event hub namespace and the event hub name changed and is therefore a new source, verify that the checkpoints corresponding to the old source will be deleted.
        await AssertExistingCheckpointsDeletedAsync(_blobCheckpointPrefix, _blobPath);
    }

    [Fact]
    public async Task GivenDifferentAppType_WhenResetCheckpointsAsyncCalled_ThenCheckpointsOfOtherAppsAreNotDeleted()
    {
        _eventHubClientOptions.EventHubNamespaceFQDN = _eventHubNamespaceFQDN;
        _eventHubClientOptions.EventHubName = "newdevicedata";
        _storageCheckpointOptions.BlobPrefix = "MeasurementToFhir";
        var fhirconvBlobCheckpointPrefix = $"{_storageCheckpointOptions.BlobPrefix}/checkpoint/";
        var fhirconvBlobPath = $"{fhirconvBlobCheckpointPrefix}{_eventHubNamespaceFQDN}/{_eventHubName}/";

        SetupExistingCheckpoints(fhirconvBlobCheckpointPrefix, fhirconvBlobPath);

        await CreateClient().ResetCheckpointsAsync();

        // Given that we are processing events for a different app type and the source changed, verify that the previous checkpoints corresponding to this app are deleted.
        await AssertExistingCheckpointsDeletedAsync(fhirconvBlobCheckpointPrefix, fhirconvBlobPath);

        // Given that we are processing events for a different app type, verify that the checkpoints corresponding to the other apps are not deleted.
        _blobContainerClient.Received(0).GetBlobs(states: BlobStates.All, prefix: _blobCheckpointPrefix, cancellationToken: CancellationToken.None);
        foreach (var partitionId in ExistingPartitionIds)
        {
            await _blobContainerClient.Received(0).DeleteBlobAsync($"{_blobPath}{partitionId}");
        }
    }

    /// <summary>
    /// Builds the client under test from the current state of the option fields, so each test can
    /// adjust the options before the client captures them.
    /// </summary>
    private StorageCheckpointClient CreateClient()
    {
        return new StorageCheckpointClient(_blobContainerClient, _storageCheckpointOptions, _eventHubClientOptions, _logger);
    }

    /// <summary>
    /// Makes the mocked container return one checkpoint blob per entry in <see cref="ExistingPartitionIds"/>,
    /// located under <paramref name="checkpointPath"/>, when blobs are listed with <paramref name="listingPrefix"/>.
    /// </summary>
    private void SetupExistingCheckpoints(string listingPrefix, string checkpointPath)
    {
        var mockBlobItems = new List<BlobItem>();
        foreach (var partitionId in ExistingPartitionIds)
        {
            mockBlobItems.Add(BlobsModelFactory.BlobItem(name: $"{checkpointPath}{partitionId}"));
        }

        // A single, final page: a null continuation token signals that no further pages exist.
        var mockPageBlobItems = Page<BlobItem>.FromValues(mockBlobItems, null, Substitute.For<Response>());
        var mockPageableBlobItems = Pageable<BlobItem>.FromPages(new[] { mockPageBlobItems });

        _blobContainerClient.GetBlobs(states: BlobStates.All, prefix: listingPrefix, cancellationToken: CancellationToken.None)
            .Returns(mockPageableBlobItems);
    }

    /// <summary>
    /// Verifies that blobs were listed exactly once with <paramref name="listingPrefix"/> and that each
    /// pre-existing checkpoint under <paramref name="checkpointPath"/> was deleted exactly once, with no
    /// other deletions.
    /// </summary>
    private async Task AssertExistingCheckpointsDeletedAsync(string listingPrefix, string checkpointPath)
    {
        _blobContainerClient.Received(1).GetBlobs(states: BlobStates.All, prefix: listingPrefix, cancellationToken: CancellationToken.None);
        await _blobContainerClient.ReceivedWithAnyArgs(ExistingPartitionIds.Length).DeleteBlobAsync(null);
        foreach (var partitionId in ExistingPartitionIds)
        {
            await _blobContainerClient.Received(1).DeleteBlobAsync($"{checkpointPath}{partitionId}");
        }
    }
}
}