From 280101402db73f4824825a568ea895e639d42e1b Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Mon, 7 Dec 2020 12:15:43 -0800 Subject: [PATCH] Add legacy checkpoint reading support to EventHubs (#17335) --- .../BlobsCheckpointStoreTests.cs | 607 +++++++++++++----- .../BlobsCheckpointStore.cs | 163 ++++- 2 files changed, 606 insertions(+), 164 deletions(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs index c98d6c5232428..8fc6b663a657f 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using System.Text; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.EventHubs.Consumer; @@ -87,7 +88,8 @@ public void ConstructorRequiresRetryPolicy() [Test] public async Task ListOwnershipLogsStartAndComplete() { - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/{Guid.NewGuid().ToString()}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -143,7 +145,8 @@ public async Task ClaimOwnershipLogsStartAndComplete() } }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient(), + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", _ => { }); + var target = new BlobsCheckpointStore(mockBlobContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -177,7 +180,7 @@ public void ClaimOwnershipLogsErrors() var expectedException = new DllNotFoundException("BOOM!"); var mockLog = new Mock(); - var mockContainerClient = new MockBlobContainerClient() { BlobClientUploadBlobException = expectedException }; + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", client => client.UploadBlobException = expectedException); var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); target.Logger = mockLog.Object; @@ -206,7 +209,8 @@ public async Task ClaimOwnershipForNewPartitionLogsOwnershipClaimed() } }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient(), + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", _ => { }); + var target = new BlobsCheckpointStore(mockBlobContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -240,7 +244,8 @@ public async Task ClaimOwnershipForExistingPartitionLogsOwnershipClaimed() } }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient { BlobInfo = blobInfo }, + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", client => client.BlobInfo = blobInfo); + var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -274,7 +279,8 @@ public async Task ClaimOwnershipForExistingPartitionWithWrongEtagLogsOwnershipNo } }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient { BlobInfo = blobInfo }, + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", client => client.BlobInfo = blobInfo); + var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -306,8 +312,10 @@ public void ClaimOwnershipForMissingPartitionThrowsAndLogsOwnershipNotClaimable( } }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient(), + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", _ => { }); + var target = new BlobsCheckpointStore(mockBlobContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -321,7 +329,8 @@ public void ClaimOwnershipForMissingPartitionThrowsAndLogsOwnershipNotClaimable( [Test] public async Task ListCheckpointsLogsStartAndComplete() { - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{Guid.NewGuid().ToString()}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -353,7 +362,8 @@ public async Task ListCheckpointsUsesOffsetAsTheStartingPositionWhenPresent() var expectedOffset = 13; var expectedStartingPosition = EventPosition.FromOffset(expectedOffset, false); - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{Guid.NewGuid().ToString()}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -382,7 +392,8 @@ public async Task ListCheckpointsUsesSequenceNumberAsTheStartingPositionWhenNoOf var expectedSequence = 133; var expectedStartingPosition = EventPosition.FromSequenceNumber(expectedSequence, false); - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{Guid.NewGuid().ToString()}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -409,7 +420,8 @@ public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumbe { var partitionId = "67"; - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{partitionId}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -433,6 +445,249 @@ public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumbe mockLogger.Verify(log => log.InvalidCheckpointFound(partitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup)); } + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task ListCheckpointsPreferredNewCheckpointOverLegacy() + { + string partitionId = Guid.NewGuid().ToString(); + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{partitionId}", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag), contentLength: 0), + "snapshot", + new Dictionary + { + {BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}, + {BlobMetadataKey.SequenceNumber, "960182"}, + {BlobMetadataKey.Offset, "14"} + }), + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/{partitionId}", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/{partitionId}", client => + { + client.Content = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386," + + "\"Offset\":\"13\"," + + "\"SequenceNumber\":960180" + + "}"); + }); + + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + + Assert.That(checkpoints, Has.One.Items, "A single checkpoint should have been returned."); + Assert.That(checkpoints.Single().StartingPosition, Is.EqualTo(EventPosition.FromOffset(14, false))); + Assert.That(checkpoints.Single().PartitionId, Is.EqualTo(partitionId)); + } + + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task ListCheckpointsMergesNewAndLegacyCheckpoints() + { + string partitionId1 = Guid.NewGuid().ToString(); + string partitionId2 = Guid.NewGuid().ToString(); + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{partitionId1}", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag), contentLength: 0), + "snapshot", + new Dictionary + { + {BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}, + {BlobMetadataKey.SequenceNumber, "960182"}, + {BlobMetadataKey.Offset, "14"} + }), + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/{partitionId2}", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/{partitionId2}", client => + { + client.Content = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386," + + "\"Offset\":\"13\"," + + "\"SequenceNumber\":960180" + + "}"); + }); + + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); + var checkpoints = (await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken())).ToArray(); + + Assert.That(checkpoints, Has.Exactly(2).Items, "Two checkpoints should have been returned."); + Assert.That(checkpoints[0].StartingPosition, Is.EqualTo(EventPosition.FromOffset(14, false))); + Assert.That(checkpoints[0].PartitionId, Is.EqualTo(partitionId1)); + Assert.That(checkpoints[1].StartingPosition, Is.EqualTo(EventPosition.FromOffset(13, false))); + Assert.That(checkpoints[1].PartitionId, Is.EqualTo(partitionId2)); + } + + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task ListCheckpointsUsesOffsetAsTheStartingPositionWhenPresentInLegacyCheckpoint() + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => + { + client.Content = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386," + + "\"Offset\":\"13\"," + + "\"SequenceNumber\":960180" + + "}"); + }); + + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + + Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); + Assert.That(checkpoints.Single().StartingPosition, Is.EqualTo(EventPosition.FromOffset(13, false))); + Assert.That(checkpoints.Single().PartitionId, Is.EqualTo("0")); + } + + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task ListCheckpointsUsesSequenceNumberAsTheStartingPositionWhenNoOffsetIsPresentInLegacyCheckpoint() + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => + { + client.Content = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386," + + "\"SequenceNumber\":960180" + + "}"); + }); + + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + + Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); + Assert.That(checkpoints.Single().StartingPosition, Is.EqualTo(EventPosition.FromSequenceNumber(960180, false))); + Assert.That(checkpoints.Single().PartitionId, Is.EqualTo("0")); + } + + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumberLegacyCheckpoint() + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => + { + client.Content = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386}"); + }); + + var mockLogger = new Mock(); + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); + + target.Logger = mockLogger.Object; + + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + + Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); + Assert.That(checkpoints.Any(), Is.False, "No valid checkpoints should exist."); + + mockLogger.Verify(log => log.InvalidCheckpointFound("0", FullyQualifiedNamespace, EventHubName, ConsumerGroup)); + } + + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the starting position is set correctly. + /// + /// + [TestCase("")] + [TestCase("{\"PartitionId\":\"0\",\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\",")] + [TestCase("\0\0\0")] + public async Task ListCheckpointsConsidersDataInvalidWithLegacyCheckpointBlobContainingInvalidJson(string json) + { + var blobList = new List + { + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => + { + client.Content = Encoding.UTF8.GetBytes(json); + }); + + var mockLogger = new Mock(); + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); + + target.Logger = mockLogger.Object; + + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + + Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); + Assert.That(checkpoints.Any(), Is.False, "No valid checkpoints should exist."); + + mockLogger.Verify(log => log.InvalidCheckpointFound("0", FullyQualifiedNamespace, EventHubName, ConsumerGroup)); + } + /// /// Verifies basic functionality of ListCheckpointsAsync and ensures the appropriate events are emitted when errors occur. /// @@ -460,7 +715,8 @@ public async Task ListCheckpointsLogsInvalidCheckpoint() { var partitionId = Guid.NewGuid().ToString(); - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{partitionId}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -511,14 +767,23 @@ public async Task UpdateCheckpointLogsStartAndCompleteWhenTheBlobExists() var blobInfo = BlobsModelFactory.BlobInfo(new ETag($@"""{MatchingEtag}"""), DateTime.UtcNow); - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/{Guid.NewGuid().ToString()}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), "snapshot", new Dictionary {{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}}) }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList, BlobInfo = blobInfo, BlobClientUploadBlobException = new Exception("Upload should not be called") }, + + var mockContainerClient = new MockBlobContainerClient() { Blobs = blobList }; + mockContainerClient.AddBlobClient("fqns/name/group/checkpoint/1", client => + { + client.BlobInfo = blobInfo; + client.UploadBlobException = new Exception("Upload should not be called"); + }); + + var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -543,14 +808,17 @@ public async Task UpdateCheckpointLogsStartAndCompleteWhenTheBlobDoesNotExist() PartitionId = PartitionId, }; - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/{Guid.NewGuid().ToString()}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), "snapshot", new Dictionary {{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}}) }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, + var mockBlobContainerClient = new MockBlobContainerClient() { Blobs = blobList }; + mockBlobContainerClient.AddBlobClient("fqns/name/group/checkpoint/1", _ => { }); + var target = new BlobsCheckpointStore(mockBlobContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -578,7 +846,13 @@ public void UpdateCheckpointLogsErrorsWhenTheBlobExists() var expectedException = new DllNotFoundException("BOOM!"); var mockLog = new Mock(); - var mockContainerClient = new MockBlobContainerClient() { BlobClientSetMetadataException = expectedException, BlobClientUploadBlobException = new Exception("Upload should not be called") }; + + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/checkpoint/1", client => + { + client.BlobClientSetMetadataException = expectedException; + client.UploadBlobException = new Exception("Upload should not be called"); + }); + var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); target.Logger = mockLog.Object; @@ -605,7 +879,10 @@ public void UpdateCheckpointLogsErrorsWhenTheBlobDoesNotExist() var expectedException = new DllNotFoundException("BOOM!"); var mockLog = new Mock(); - var mockContainerClient = new MockBlobContainerClient() { BlobClientUploadBlobException = expectedException }; + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/checkpoint/1", client => + { + client.UploadBlobException = expectedException; + }); var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); target.Logger = mockLog.Object; @@ -630,7 +907,8 @@ public void UpdateCheckpointForMissingContainerThrowsAndLogsCheckpointUpdateErro }; var ex = new RequestFailedException(404, BlobErrorCode.ContainerNotFound.ToString(), BlobErrorCode.ContainerNotFound.ToString(), null); - var target = new BlobsCheckpointStore(new MockBlobContainerClient(blobClientUploadBlobException: ex), + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/checkpoint/1", client => client.UploadBlobException = ex); + var target = new BlobsCheckpointStore(mockBlobContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -773,10 +1051,7 @@ public void ClaimOwnershipAsyncRetriesAndSurfacesRetriableExceptionsWhenVersionI var expectedServiceCalls = (maximumRetries + 1); var serviceCalls = 0; - var mockRetryPolicy = new Mock(); - var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, mockRetryPolicy.Object); var ownership = new EventProcessorPartitionOwnership { @@ -791,11 +1066,15 @@ public void ClaimOwnershipAsyncRetriesAndSurfacesRetriableExceptionsWhenVersionI .Setup(policy => policy.CalculateRetryDelay(It.Is(value => value == exception), It.Is(value => value <= maximumRetries))) .Returns(TimeSpan.FromMilliseconds(5)); - mockContainerClient.BlobClientUploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/ownership/pid", client => { - serviceCalls++; - throw exception; - }; + client.UploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + { + serviceCalls++; + throw exception; + }; + }); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, mockRetryPolicy.Object); // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -820,9 +1099,6 @@ public void ClaimOwnershipAsyncSurfacesNonRetriableExceptionsWhenVersionIsNull(E var expectedServiceCalls = 1; var serviceCalls = 0; - var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); - var ownership = new EventProcessorPartitionOwnership { FullyQualifiedNamespace = "ns", @@ -832,11 +1108,15 @@ public void ClaimOwnershipAsyncSurfacesNonRetriableExceptionsWhenVersionIsNull(E PartitionId = "pid" }; - mockContainerClient.BlobClientUploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/ownership/pid", client => { - serviceCalls++; - throw exception; - }; + client.UploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + { + serviceCalls++; + throw exception; + }; + }); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -864,9 +1144,6 @@ public void ClaimOwnershipAsyncRetriesAndSurfacesRetriableExceptionsWhenVersionI var serviceCalls = 0; var mockRetryPolicy = new Mock(); - var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, mockRetryPolicy.Object); - var ownership = new EventProcessorPartitionOwnership { FullyQualifiedNamespace = "ns", @@ -881,13 +1158,15 @@ public void ClaimOwnershipAsyncRetriesAndSurfacesRetriableExceptionsWhenVersionI .Setup(policy => policy.CalculateRetryDelay(It.Is(value => value == exception), It.Is(value => value <= maximumRetries))) .Returns(TimeSpan.FromMilliseconds(5)); - mockContainerClient.BlobInfo = Mock.Of(); - - mockContainerClient.BlobClientSetMetadataAsyncCallback = (metadata, conditions, token) => + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/ownership/pid", client => { - serviceCalls++; - throw exception; - }; + client.SetMetadataAsyncCallback = (metadata, conditions, token) => + { + serviceCalls++; + throw exception; + }; + }); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, mockRetryPolicy.Object); // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -911,10 +1190,6 @@ public void ClaimOwnershipAsyncSurfacesNonRetriableExceptionsWhenVersionIsNotNul { var expectedServiceCalls = 1; var serviceCalls = 0; - - var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); - var ownership = new EventProcessorPartitionOwnership { FullyQualifiedNamespace = "ns", @@ -925,13 +1200,15 @@ public void ClaimOwnershipAsyncSurfacesNonRetriableExceptionsWhenVersionIsNotNul Version = "eTag" }; - mockContainerClient.BlobInfo = Mock.Of(); - - mockContainerClient.BlobClientSetMetadataAsyncCallback = (metadata, conditions, token) => + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/ownership/pid", client => { - serviceCalls++; - throw exception; - }; + client.SetMetadataAsyncCallback = (metadata, conditions, token) => + { + serviceCalls++; + throw exception; + }; + }); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -954,9 +1231,6 @@ public void ClaimOwnershipAsyncSurfacesNonRetriableExceptionsWhenVersionIsNotNul [TestCase("eTag")] public async Task ClaimOwnershipAsyncDelegatesTheCancellationToken(string version) { - var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); - var ownership = new EventProcessorPartitionOwnership { FullyQualifiedNamespace = "ns", @@ -973,32 +1247,36 @@ public async Task ClaimOwnershipAsyncDelegatesTheCancellationToken(string versio // UploadAsync will be called if eTag is null; SetMetadataAsync is used otherwise. - if (version == null) + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/ownership/pid", client => { - mockContainerClient.BlobClientUploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + if (version == null) { - if (!stateBeforeCancellation.HasValue) + client.UploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => { - stateBeforeCancellation = token.IsCancellationRequested; - cancellationSource.Cancel(); - stateAfterCancellation = token.IsCancellationRequested; - } - }; - } - else - { - mockContainerClient.BlobInfo = Mock.Of(); - - mockContainerClient.BlobClientSetMetadataAsyncCallback = (metadata, conditions, token) => + if (!stateBeforeCancellation.HasValue) + { + stateBeforeCancellation = token.IsCancellationRequested; + cancellationSource.Cancel(); + stateAfterCancellation = token.IsCancellationRequested; + } + }; + } + else { - if (!stateBeforeCancellation.HasValue) + client.BlobInfo = Mock.Of(); + + client.SetMetadataAsyncCallback = (metadata, conditions, token) => { - stateBeforeCancellation = token.IsCancellationRequested; - cancellationSource.Cancel(); - stateAfterCancellation = token.IsCancellationRequested; - } - }; - } + if (!stateBeforeCancellation.HasValue) + { + stateBeforeCancellation = token.IsCancellationRequested; + cancellationSource.Cancel(); + stateAfterCancellation = token.IsCancellationRequested; + } + }; + } + }); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); await checkpointStore.ClaimOwnershipAsync(new List() { ownership }, cancellationSource.Token); @@ -1159,7 +1437,18 @@ public void UpdateCheckpointAsyncRetriesAndSurfacesRetriableExceptionsWhenTheBlo var serviceCalls = 0; var blobInfo = BlobsModelFactory.BlobInfo(new ETag($@"""{MatchingEtag}"""), DateTime.UtcNow); - var mockContainerClient = new MockBlobContainerClient { BlobInfo = blobInfo, BlobClientUploadBlobException = new Exception("Upload should not be called") }; + + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/checkpoint/pid", client => + { + client.BlobInfo = blobInfo; + client.UploadBlobException = new Exception("Upload should not be called"); + client.SetMetadataAsyncCallback = (metadata, conditions, token) => + { + serviceCalls++; + throw exception; + }; + }); + var mockRetryPolicy = new Mock(); var checkpointStore = new BlobsCheckpointStore(mockContainerClient, mockRetryPolicy.Object); @@ -1175,12 +1464,6 @@ public void UpdateCheckpointAsyncRetriesAndSurfacesRetriableExceptionsWhenTheBlo .Setup(policy => policy.CalculateRetryDelay(It.Is(value => value == exception), It.Is(value => value <= maximumRetries))) .Returns(TimeSpan.FromMilliseconds(5)); - mockContainerClient.BlobClientSetMetadataAsyncCallback = (metadata, conditions, token) => - { - serviceCalls++; - throw exception; - }; - // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -1206,9 +1489,7 @@ public void UpdateCheckpointAsyncRetriesAndSurfacesRetriableExceptionsWhenTheBlo var expectedServiceCalls = (maximumRetries + 1); var serviceCalls = 0; - var mockContainerClient = new MockBlobContainerClient(); var mockRetryPolicy = new Mock(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, mockRetryPolicy.Object); var checkpoint = new EventProcessorCheckpoint { @@ -1226,11 +1507,15 @@ public void UpdateCheckpointAsyncRetriesAndSurfacesRetriableExceptionsWhenTheBlo .Setup(policy => policy.CalculateRetryDelay(It.Is(value => value == exception), It.Is(value => value <= maximumRetries))) .Returns(TimeSpan.FromMilliseconds(5)); - mockContainerClient.BlobClientUploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/checkpoint/pid", client => { - serviceCalls++; - throw exception; - }; + client.UploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + { + serviceCalls++; + throw exception; + }; + }); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, mockRetryPolicy.Object); // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -1256,7 +1541,16 @@ public void UpdateCheckpointAsyncSurfacesNonRetriableExceptionsWhenTheBlobExists var serviceCalls = 0; var blobInfo = BlobsModelFactory.BlobInfo(new ETag($@"""{MatchingEtag}"""), DateTime.UtcNow); - var mockContainerClient = new MockBlobContainerClient { BlobInfo = blobInfo, BlobClientUploadBlobException = new Exception("Upload should not be called") }; + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/checkpoint/pid", client => + { + client.BlobInfo = blobInfo; + client.UploadBlobException = new Exception("Upload should not be called"); + client.SetMetadataAsyncCallback = (metadata, conditions, token) => + { + serviceCalls++; + throw exception; + }; + }); var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var checkpoint = new EventProcessorCheckpoint @@ -1267,12 +1561,6 @@ public void UpdateCheckpointAsyncSurfacesNonRetriableExceptionsWhenTheBlobExists PartitionId = "pid" }; - mockContainerClient.BlobClientSetMetadataAsyncCallback = (metadata, conditions, token) => - { - serviceCalls++; - throw exception; - }; - // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -1296,9 +1584,6 @@ public void UpdateCheckpointAsyncSurfacesNonRetriableExceptionsWhenTheBlobDoesNo var expectedServiceCalls = 1; var serviceCalls = 0; - var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); - var checkpoint = new EventProcessorCheckpoint { FullyQualifiedNamespace = "ns", @@ -1307,11 +1592,15 @@ public void UpdateCheckpointAsyncSurfacesNonRetriableExceptionsWhenTheBlobDoesNo PartitionId = "pid" }; - mockContainerClient.BlobClientUploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/checkpoint/pid", client => { - serviceCalls++; - throw exception; - }; + client.UploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + { + serviceCalls++; + throw exception; + }; + }); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -1333,7 +1622,25 @@ public void UpdateCheckpointAsyncSurfacesNonRetriableExceptionsWhenTheBlobDoesNo public async Task UpdateCheckpointAsyncDelegatesTheCancellationTokenWhenTheBlobExists() { var blobInfo = BlobsModelFactory.BlobInfo(new ETag($@"""{MatchingEtag}"""), DateTime.UtcNow); - var mockContainerClient = new MockBlobContainerClient { BlobInfo = blobInfo, BlobClientUploadBlobException = new Exception("Upload should not be called") }; + + using var cancellationSource = new CancellationTokenSource(); + var stateBeforeCancellation = default(bool?); + var stateAfterCancellation = default(bool?); + + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/checkpoint/pid", client => + { + client.BlobInfo = blobInfo; + client.UploadBlobException = new Exception("Upload should not be called"); + client.SetMetadataAsyncCallback = (metadata, conditions, token) => + { + if (!stateBeforeCancellation.HasValue) + { + stateBeforeCancellation = token.IsCancellationRequested; + cancellationSource.Cancel(); + stateAfterCancellation = token.IsCancellationRequested; + } + }; + }); var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var checkpoint = new EventProcessorCheckpoint @@ -1344,20 +1651,6 @@ public async Task UpdateCheckpointAsyncDelegatesTheCancellationTokenWhenTheBlobE PartitionId = "pid" }; - using var cancellationSource = new CancellationTokenSource(); - var stateBeforeCancellation = default(bool?); - var stateAfterCancellation = default(bool?); - - mockContainerClient.BlobClientSetMetadataAsyncCallback = (metadata, conditions, token) => - { - if (!stateBeforeCancellation.HasValue) - { - stateBeforeCancellation = token.IsCancellationRequested; - cancellationSource.Cancel(); - stateAfterCancellation = token.IsCancellationRequested; - } - }; - await checkpointStore.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), cancellationSource.Token); Assert.That(stateBeforeCancellation.HasValue, Is.True, "State before cancellation should have been captured."); @@ -1374,9 +1667,6 @@ public async Task UpdateCheckpointAsyncDelegatesTheCancellationTokenWhenTheBlobE [Test] public async Task UpdateCheckpointAsyncDelegatesTheCancellationTokenWhenTheBlobDoesNotExist() { - var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); - var checkpoint = new EventProcessorCheckpoint { FullyQualifiedNamespace = "ns", @@ -1389,15 +1679,20 @@ public async Task UpdateCheckpointAsyncDelegatesTheCancellationTokenWhenTheBlobD var stateBeforeCancellation = default(bool?); var stateAfterCancellation = default(bool?); - mockContainerClient.BlobClientUploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/checkpoint/pid", client => { - if (!stateBeforeCancellation.HasValue) + client.UploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => { - stateBeforeCancellation = token.IsCancellationRequested; - cancellationSource.Cancel(); - stateAfterCancellation = token.IsCancellationRequested; - } - }; + if (!stateBeforeCancellation.HasValue) + { + stateBeforeCancellation = token.IsCancellationRequested; + cancellationSource.Cancel(); + stateAfterCancellation = token.IsCancellationRequested; + } + }; + }); + + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); await checkpointStore.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), cancellationSource.Token); @@ -1437,21 +1732,15 @@ private class MockBlobContainerClient : BlobContainerClient public override string AccountName { get; } public override string Name { get; } internal IEnumerable Blobs; - internal BlobInfo BlobInfo; - internal Exception BlobClientUploadBlobException; - internal Exception BlobClientSetMetadataException; internal Exception GetBlobsAsyncException; internal Action GetBlobsAsyncCallback; - internal Action, BlobRequestConditions, IProgress, AccessTier?, StorageTransferOptions, CancellationToken> BlobClientUploadAsyncCallback; - internal Action, BlobRequestConditions, CancellationToken> BlobClientSetMetadataAsyncCallback; + internal Dictionary BlobClients = new (); public MockBlobContainerClient(string accountName = "blobAccount", string containerName = "container", - Exception getBlobsAsyncException = null, - Exception blobClientUploadBlobException = null) + Exception getBlobsAsyncException = null) { GetBlobsAsyncException = getBlobsAsyncException; - BlobClientUploadBlobException = blobClientUploadBlobException; Blobs = Enumerable.Empty(); AccountName = accountName; Name = containerName; @@ -1466,37 +1755,37 @@ public override AsyncPageable GetBlobsAsync(BlobTraits traits = BlobTr throw GetBlobsAsyncException; } - return new MockAsyncPageable(Blobs); + return new MockAsyncPageable(Blobs.Where(b => prefix == null || b.Name.StartsWith(prefix, StringComparison.Ordinal))); } public override BlobClient GetBlobClient(string blobName) { - return new MockBlobClient(blobName, BlobInfo, BlobClientUploadBlobException, BlobClientSetMetadataException, BlobClientUploadAsyncCallback, BlobClientSetMetadataAsyncCallback); + return BlobClients[blobName]; + } + + internal MockBlobContainerClient AddBlobClient(string name, Action configure) + { + var client = new MockBlobClient(name); + configure(client); + BlobClients[name] = client; + return this; } } private class MockBlobClient : BlobClient { public override string Name { get; } + internal BlobInfo BlobInfo; - internal Exception BlobClientUploadBlobException; + internal Exception UploadBlobException; internal Exception BlobClientSetMetadataException; - private Action, BlobRequestConditions, IProgress, AccessTier?, StorageTransferOptions, CancellationToken> UploadAsyncCallback; - private Action, BlobRequestConditions, CancellationToken> SetMetadataAsyncCallback; - - public MockBlobClient(string blobName, - BlobInfo blobInfo = null, - Exception blobClientUploadBlobException = null, - Exception blobClientSetMetadataException = null, - Action, BlobRequestConditions, IProgress, AccessTier?, StorageTransferOptions, CancellationToken> uploadAsyncCallback = null, - Action, BlobRequestConditions, CancellationToken> setMetadataAsyncCallback = null) - { - BlobClientUploadBlobException = blobClientUploadBlobException; - BlobClientSetMetadataException = blobClientSetMetadataException; - UploadAsyncCallback = uploadAsyncCallback; - SetMetadataAsyncCallback = setMetadataAsyncCallback; + internal byte[] Content; + internal Action, BlobRequestConditions, IProgress, AccessTier?, StorageTransferOptions, CancellationToken> UploadAsyncCallback; + internal Action, BlobRequestConditions, CancellationToken> SetMetadataAsyncCallback; + + public MockBlobClient(string blobName) + { Name = blobName; - BlobInfo = blobInfo; } public override Task> SetMetadataAsync(IDictionary metadata, BlobRequestConditions conditions = null, CancellationToken cancellationToken = default(CancellationToken)) @@ -1525,9 +1814,9 @@ public override Task> UploadAsync(Stream content, Blob { UploadAsyncCallback?.Invoke(content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, cancellationToken); - if (BlobClientUploadBlobException != null) + if (UploadBlobException != null) { - throw BlobClientUploadBlobException; + throw UploadBlobException; } if (BlobInfo != null) @@ -1540,6 +1829,12 @@ public override Task> UploadAsync(Stream content, Blob BlobsModelFactory.BlobContentInfo(new ETag("etag"), DateTime.UtcNow, new byte[] { }, string.Empty, 0L), Mock.Of())); } + + public override async Task DownloadToAsync(Stream destination, CancellationToken cancellationToken) + { + await destination.WriteAsync(Content, 0, Content.Length, cancellationToken); + return Mock.Of(); + } } private class MockAsyncPageable : AsyncPageable diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs index 2e73deaa5f37a..2f27f1cc54554 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs @@ -5,6 +5,8 @@ using System.Collections.Generic; using System.Globalization; using System.IO; +using System.Linq; +using System.Text.Json; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; @@ -14,6 +16,7 @@ using Azure.Messaging.EventHubs.Primitives; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; +using Azure.Storage.Blobs.Specialized; namespace Azure.Messaging.EventHubs.Processor { @@ -41,6 +44,13 @@ internal partial class BlobsCheckpointStore : StorageManager /// private const string CheckpointPrefix = "{0}/{1}/{2}/checkpoint/"; + /// + /// Specifies a string that filters the results to return only legacy checkpoint blobs whose name begins + /// with the specified prefix. + /// + /// + private const string LegacyCheckpointPrefix = "{0}/{1}/{2}/"; + /// /// Specifies a string that filters the results to return only ownership blobs whose name begins /// with the specified prefix. @@ -61,21 +71,30 @@ internal partial class BlobsCheckpointStore : StorageManager /// private EventHubsRetryPolicy RetryPolicy { get; } + /// + /// Indicates whether to read legacy checkpoints when no current version checkpoints are available. + /// + /// + private bool ReadLegacyCheckpoints { get; } + /// /// Initializes a new instance of the class. /// /// /// The client used to interact with the Azure Blob Storage service. /// The retry policy to use as the basis for interacting with the Storage Blobs service. + /// Indicates whether to read legacy checkpoint when no current version checkpoint is available for a partition. /// public BlobsCheckpointStore(BlobContainerClient blobContainerClient, - EventHubsRetryPolicy retryPolicy) + EventHubsRetryPolicy retryPolicy, + bool readLegacyCheckpoints = false) { Argument.AssertNotNull(blobContainerClient, nameof(blobContainerClient)); Argument.AssertNotNull(retryPolicy, nameof(retryPolicy)); ContainerClient = blobContainerClient; RetryPolicy = retryPolicy; + ReadLegacyCheckpoints = readLegacyCheckpoints; BlobsCheckpointStoreCreated(nameof(BlobsCheckpointStore), blobContainerClient.AccountName, blobContainerClient.Name); } @@ -275,11 +294,9 @@ public override async Task> ListCheckpoint cancellationToken.ThrowIfCancellationRequested(); ListCheckpointsStart(fullyQualifiedNamespace, eventHubName, consumerGroup); - var prefix = string.Format(CultureInfo.InvariantCulture, CheckpointPrefix, fullyQualifiedNamespace.ToLowerInvariant(), eventHubName.ToLowerInvariant(), consumerGroup.ToLowerInvariant()); - var checkpointCount = 0; - - async Task> listCheckpointsAsync(CancellationToken listCheckpointsToken) + async Task> listCheckpointsAsync(CancellationToken listCheckpointsToken) { + var prefix = string.Format(CultureInfo.InvariantCulture, CheckpointPrefix, fullyQualifiedNamespace.ToLowerInvariant(), eventHubName.ToLowerInvariant(), consumerGroup.ToLowerInvariant()); var checkpoints = new List(); await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(traits: BlobTraits.Metadata, prefix: prefix, cancellationToken: listCheckpointsToken).ConfigureAwait(false)) @@ -322,13 +339,81 @@ async Task> listCheckpointsAsync(Cancellat } } - checkpointCount = checkpoints.Count; return checkpoints; }; + async Task> listLegacyCheckpointsAsync(List existingCheckpoints, CancellationToken listCheckpointsToken) + { + var legacyPrefix = string.Format(CultureInfo.InvariantCulture, LegacyCheckpointPrefix, fullyQualifiedNamespace.ToLowerInvariant(), eventHubName.ToLowerInvariant(), consumerGroup.ToLowerInvariant()); + var checkpoints = new List(); + + await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(prefix: legacyPrefix, cancellationToken: listCheckpointsToken).ConfigureAwait(false)) + { + // Skip new checkpoints and empty blobs + if (blob.Properties.ContentLength == 0) + { + continue; + } + + var partitionId = blob.Name.Substring(legacyPrefix.Length); + + // Check whether there is already a checkpoint for this partition id + if (existingCheckpoints.Any(existingCheckpoint => string.Equals(existingCheckpoint.PartitionId, partitionId, StringComparison.Ordinal))) + { + continue; + } + + var startingPosition = default(EventPosition?); + + BlobBaseClient blobClient = ContainerClient.GetBlobClient(blob.Name); + using var memoryStream = new MemoryStream(); + await blobClient.DownloadToAsync(memoryStream, listCheckpointsToken).ConfigureAwait(false); + + TryReadLegacyCheckpoint( + memoryStream.GetBuffer().AsSpan(0, (int)memoryStream.Length), + out long? offset, + out long? sequenceNumber); + + if (offset.HasValue) + { + startingPosition = EventPosition.FromOffset(offset.Value, false); + } + else if (sequenceNumber.HasValue) + { + startingPosition = EventPosition.FromSequenceNumber(sequenceNumber.Value, false); + } + + if (startingPosition.HasValue) + { + checkpoints.Add(new BlobStorageCheckpoint + { + FullyQualifiedNamespace = fullyQualifiedNamespace, + EventHubName = eventHubName, + ConsumerGroup = consumerGroup, + PartitionId = partitionId, + StartingPosition = startingPosition.Value, + Offset = offset, + SequenceNumber = sequenceNumber + }); + } + else + { + InvalidCheckpointFound(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup); + } + } + + return checkpoints; + }; + + List checkpoints = null; try { - return await ApplyRetryPolicy(listCheckpointsAsync, cancellationToken).ConfigureAwait(false); + checkpoints = await ApplyRetryPolicy(listCheckpointsAsync, cancellationToken).ConfigureAwait(false); + if (ReadLegacyCheckpoints) + { + checkpoints.AddRange(await ApplyRetryPolicy(ct => listLegacyCheckpointsAsync(checkpoints, ct), cancellationToken).ConfigureAwait(false)); + } + return checkpoints; } catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound) { @@ -342,7 +427,7 @@ async Task> listCheckpointsAsync(Cancellat } finally { - ListCheckpointsComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, checkpointCount); + ListCheckpointsComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, checkpoints?.Count ?? 0); } } @@ -493,6 +578,68 @@ async Task wrapper(CancellationToken token) return result; } + /// + /// Attempts to read a legacy checkpoint JSON format and extract an offset and a sequence number + /// + /// The binary representation of the checkpoint JSON. + /// The parsed offset. null if not found. + /// The parsed sequence number. null if not found. + /// + /// Sample checkpoint JSON: + /// { + /// "PartitionId":"0", + /// "Owner":"681d365b-de1b-4288-9733-76294e17daf0", + /// "Token":"2d0c4276-827d-4ca4-a345-729caeca3b82", + /// "Epoch":386, + /// "Offset":"8591964920", + /// "SequenceNumber":960180 + /// } + /// + private static void TryReadLegacyCheckpoint(Span data, out long? offset, out long? sequenceNumber) + { + offset = null; + sequenceNumber = null; + + var jsonReader = new Utf8JsonReader(data); + try + { + if (!jsonReader.Read() || jsonReader.TokenType != JsonTokenType.StartObject) return; + + while (jsonReader.Read() && jsonReader.TokenType == JsonTokenType.PropertyName) + { + switch (jsonReader.GetString()) + { + case "Offset": + if (!jsonReader.Read() || + jsonReader.GetString() is not string offsetString || + !long.TryParse(offsetString, out long offsetValue)) + { + return; + } + + offset = offsetValue; + break; + case "SequenceNumber": + if (!jsonReader.Read() || + !jsonReader.TryGetInt64(out long sequenceNumberValue)) + { + return; + } + + sequenceNumber = sequenceNumberValue; + break; + default: + jsonReader.Skip(); + break; + } + } + } + catch (JsonException) + { + // Ignore this because if the data is malformed, it will be treated as if the checkpoint didn't exist. + } + } + /// /// Indicates that an attempt to retrieve a list of ownership has completed. ///