diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln index 29a6ec78e5ec4..5fb4ca4c149b6 100755 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/Azure.Messaging.EventHubs.Processor.sln @@ -11,6 +11,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Core.TestFramework", EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "External", "External", "{797FF941-76FD-45FD-AC17-A73DFE2BA621}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Azure.Messaging.EventHubs", "..\Azure.Messaging.EventHubs\src\Azure.Messaging.EventHubs.csproj", "{D82CC3DD-E6CC-4688-BDA8-16E7A303C35B}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -29,12 +31,17 @@ Global {7DFF0E65-DC9A-410D-9A11-AD6A06860FE1}.Debug|Any CPU.Build.0 = Debug|Any CPU {7DFF0E65-DC9A-410D-9A11-AD6A06860FE1}.Release|Any CPU.ActiveCfg = Release|Any CPU {7DFF0E65-DC9A-410D-9A11-AD6A06860FE1}.Release|Any CPU.Build.0 = Release|Any CPU + {D82CC3DD-E6CC-4688-BDA8-16E7A303C35B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D82CC3DD-E6CC-4688-BDA8-16E7A303C35B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D82CC3DD-E6CC-4688-BDA8-16E7A303C35B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D82CC3DD-E6CC-4688-BDA8-16E7A303C35B}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection GlobalSection(NestedProjects) = preSolution {7DFF0E65-DC9A-410D-9A11-AD6A06860FE1} = {797FF941-76FD-45FD-AC17-A73DFE2BA621} + {D82CC3DD-E6CC-4688-BDA8-16E7A303C35B} = {797FF941-76FD-45FD-AC17-A73DFE2BA621} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {44BD3BD5-61DF-464D-8627-E00B0BC4B3A3} diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj index 9b4a53d270560..6f06c7f05c43e 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Azure.Messaging.EventHubs.Processor.csproj @@ -13,7 +13,10 @@ - + + + + diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/EventProcessorClientEventSource.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/EventProcessorClientEventSource.cs old mode 100755 new mode 100644 index 91184e9dc24f2..7545c37c53f19 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/EventProcessorClientEventSource.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/Diagnostics/EventProcessorClientEventSource.cs @@ -178,5 +178,26 @@ public virtual void UpdateCheckpointError(string partitionId, WriteEvent(25, partitionId ?? string.Empty, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, errorMessage ?? string.Empty); } } + + /// + /// Indicates that the process of cleaning up after startup validation has experienced an exception. + /// + /// + /// A unique name used to identify the event processor. + /// The name of the Event Hub that the processor is associated with. + /// The name of the consumer group that the processor is associated with. + /// The message for the exception that occurred. + /// + [Event(26, Level = EventLevel.Error, Message = "An exception occurred while attempting to perform cleanup after validating the processor configuration and permissions during startup for processor instance with identifier '{1}' for Event Hub: {2} and Consumer Group: {3}. Error Message: '{4}'")] + public virtual void ValidationCleanupError(string identifier, + string eventHubName, + string consumerGroup, + string errorMessage) + { + if (IsEnabled()) + { + WriteEvent(26, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, errorMessage ?? string.Empty); + } + } } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs index e5c20d66f52c0..8864c73564083 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs @@ -7,10 +7,12 @@ using System.ComponentModel; using System.Diagnostics.CodeAnalysis; using System.Globalization; +using System.IO; using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; using Azure.Core; +using Azure.Core.Pipeline; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Core; using Azure.Messaging.EventHubs.Diagnostics; @@ -53,6 +55,9 @@ public class EventProcessorClient : EventProcessor /// The primitive for synchronizing access during start and set handler operations. private readonly SemaphoreSlim ProcessorStatusGuard = new SemaphoreSlim(1, 1); + /// The client provided to perform storage operations related to checkpoints and ownership. + private BlobContainerClient _containerClient; + /// The handler to be called just before event processing starts for a given partition. private Func _partitionInitializingAsync; @@ -398,6 +403,8 @@ public EventProcessorClient(BlobContainerClient checkpointStore, EventProcessorClientOptions clientOptions) : base((clientOptions ?? DefaultClientOptions).CacheEventCount, consumerGroup, connectionString, eventHubName, CreateOptions(clientOptions)) { Argument.AssertNotNull(checkpointStore, nameof(checkpointStore)); + + _containerClient = checkpointStore; StorageManager = CreateStorageManager(checkpointStore); } @@ -425,6 +432,8 @@ public EventProcessorClient(BlobContainerClient checkpointStore, EventProcessorClientOptions clientOptions = default) : base((clientOptions ?? DefaultClientOptions).CacheEventCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, CreateOptions(clientOptions)) { Argument.AssertNotNull(checkpointStore, nameof(checkpointStore)); + + _containerClient = checkpointStore; StorageManager = CreateStorageManager(checkpointStore); } @@ -452,6 +461,8 @@ public EventProcessorClient(BlobContainerClient checkpointStore, EventProcessorClientOptions clientOptions = default) : base((clientOptions ?? DefaultClientOptions).CacheEventCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, CreateOptions(clientOptions)) { Argument.AssertNotNull(checkpointStore, nameof(checkpointStore)); + + _containerClient = checkpointStore; StorageManager = CreateStorageManager(checkpointStore); } @@ -479,6 +490,8 @@ public EventProcessorClient(BlobContainerClient checkpointStore, EventProcessorClientOptions clientOptions = default) : base((clientOptions ?? DefaultClientOptions).CacheEventCount, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, CreateOptions(clientOptions)) { Argument.AssertNotNull(checkpointStore, nameof(checkpointStore)); + + _containerClient = checkpointStore; StorageManager = CreateStorageManager(checkpointStore); } @@ -587,40 +600,15 @@ protected EventProcessorClient() : base() /// /// A instance to signal the request to cancel the start operation. This won't affect the once it starts running. /// - public override async Task StartProcessingAsync(CancellationToken cancellationToken = default) - { - cancellationToken.ThrowIfCancellationRequested(); - var releaseGuard = false; - - try - { - await ProcessorStatusGuard.WaitAsync(cancellationToken).ConfigureAwait(false); - releaseGuard = true; - - if (_processEventAsync == null) - { - throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartEventProcessorWithoutHandler, nameof(ProcessEventAsync))); - } - - if (_processErrorAsync == null) - { - throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartEventProcessorWithoutHandler, nameof(ProcessErrorAsync))); - } - - await base.StartProcessingAsync(cancellationToken).ConfigureAwait(false); - } - catch (OperationCanceledException) - { - throw new TaskCanceledException(); - } - finally - { - if (releaseGuard) - { - ProcessorStatusGuard.Release(); - } - } - } + /// + /// As the processor starts, it will attempt to detect configuration and permissions errors that would prevent it from + /// being able to recover without intervention. For example, an incorrect connection string or the inability to write to the + /// storage container would be detected. These exceptions will be packaged as an , and will cause + /// to fail. + /// + /// + public override async Task StartProcessingAsync(CancellationToken cancellationToken = default) => + await StartProcessingInternalAsync(true, cancellationToken).ConfigureAwait(false); /// /// Signals the to begin processing events. Should this method be called while the processor @@ -629,40 +617,15 @@ public override async Task StartProcessingAsync(CancellationToken cancellationTo /// /// A instance to signal the request to cancel the start operation. This won't affect the once it starts running. /// - public override void StartProcessing(CancellationToken cancellationToken = default) - { - cancellationToken.ThrowIfCancellationRequested(); - var releaseGuard = false; - - try - { - ProcessorStatusGuard.Wait(cancellationToken); - releaseGuard = true; - - if (_processEventAsync == null) - { - throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartEventProcessorWithoutHandler, nameof(ProcessEventAsync))); - } - - if (_processErrorAsync == null) - { - throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartEventProcessorWithoutHandler, nameof(ProcessErrorAsync))); - } - - base.StartProcessing(cancellationToken); - } - catch (OperationCanceledException) - { - throw new TaskCanceledException(); - } - finally - { - if (releaseGuard) - { - ProcessorStatusGuard.Release(); - } - } - } + /// + /// As the processor starts, it will attempt to detect configuration and permissions errors that would prevent it from + /// being able to recover without intervention. For example, an incorrect connection string or the inability to write to the + /// storage container would be detected. These exceptions will be packaged as an , and will cause + /// to fail. + /// + /// + public override void StartProcessing(CancellationToken cancellationToken = default) => + StartProcessingInternalAsync(false, cancellationToken).EnsureCompleted(); /// /// Signals the to stop processing events. Should this method be called while the processor @@ -808,23 +771,8 @@ internal Task UpdateCheckpointAsync(EventData eventData, /// starting location set. /// /// - protected override async Task> ListCheckpointsAsync(CancellationToken cancellationToken) - { - var checkpoints = await StorageManager.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, cancellationToken).ConfigureAwait(false); - - // If there was no initialization handler, no custom starting positions - // could have been specified. Return the checkpoints without further processing. - - if (_partitionInitializingAsync == null) - { - return checkpoints; - } - - // Process the checkpoints to inject mock checkpoints for partitions that - // specify a custom default and do not have an actual checkpoint. - - return ProcessCheckpointStartingPositions(checkpoints); - } + protected override Task> ListCheckpointsAsync(CancellationToken cancellationToken) => + throw new InvalidOperationException(Resources.ListCheckpointsAsyncObsolete); /// /// Returns a checkpoint for the Event Hub, consumer group, and partition ID associated with the @@ -1092,48 +1040,138 @@ protected override async Task OnPartitionProcessingStoppedAsync(EventProcessorPa } /// - /// Creates a to use for interacting with durable storage. - /// - /// - /// The client responsible for interaction with durable storage, responsible for persisting checkpoints and load-balancing state. - /// - /// A with the requested configuration. - /// - private StorageManager CreateStorageManager(BlobContainerClient checkpointStore) => new BlobsCheckpointStore(checkpointStore, RetryPolicy); - - /// - /// Processes the starting positions for checkpoints, ensuring that any overrides set by the - /// handler are respected when no natural checkpoint exists for the partition. + /// Signals the to begin processing events. Should this method be called while the processor + /// is running, no action is taken. /// /// - /// The checkpoint set to process. + /// When true, the method will be executed asynchronously; otherwise, it will execute synchronously. + /// A instance to signal the request to cancel the start operation. This won't affect the once it starts running. /// - /// An enumerable consisting of the and a set of artificial checkpoints for any overrides applied to the starting position. - /// - private IEnumerable ProcessCheckpointStartingPositions(IEnumerable sourceCheckpoints) + private async Task StartProcessingInternalAsync(bool async, + CancellationToken cancellationToken) { - var knownCheckpoints = new HashSet(); + cancellationToken.ThrowIfCancellationRequested(); + + var capturedValidationException = default(Exception); + var releaseGuard = false; + + try + { + // Acquire the semaphore used to synchronize processor starts and stops, respecting + // the async flag. When this is held, the state of the processor is stable. + + if (async) + { + await ProcessorStatusGuard.WaitAsync(cancellationToken).ConfigureAwait(false); + } + else + { + ProcessorStatusGuard.Wait(cancellationToken); + } + + releaseGuard = true; + + // Validate that the required handlers are assigned. + + if (_processEventAsync == null) + { + throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartEventProcessorWithoutHandler, nameof(ProcessEventAsync))); + } + + if (_processErrorAsync == null) + { + throw new InvalidOperationException(string.Format(CultureInfo.CurrentCulture, Resources.CannotStartEventProcessorWithoutHandler, nameof(ProcessErrorAsync))); + } + + // Allow the base class to perform its startup operation; this will include validation for + // the basic Event Hubs and storage configuration. + + if (async) + { + await base.StartProcessingAsync(cancellationToken).ConfigureAwait(false); + } + else + { + base.StartProcessing(cancellationToken); + } - // Return the checkpoints and track which partitions they belong to. + // Because the base class has no understanding of what concrete storage type is in use and + // does not directly make use of some of its operations, such as writing a checkpoint. Validate + // these additional needs if a storage client is available. - foreach (var checkpoint in sourceCheckpoints) + if (_containerClient != null) + { + try + { + if (async) + { + await ValidateStartupAsync(async, _containerClient, cancellationToken).ConfigureAwait(false); + } + else + { + ValidateStartupAsync(async, _containerClient, cancellationToken).EnsureCompleted(); + } + } + catch (AggregateException ex) + { + // Capture the validation exception and log, but do not throw. Because this is + // a fatal exception and the processing task was already started, StopProcessing + // will need to be called, which requires the semaphore. The validation exception + // will be handled after the start operation has officially completed and the + // semaphore has been released. + + capturedValidationException = ex.Flatten(); + } + } + } + catch (OperationCanceledException) { - knownCheckpoints.Add(checkpoint.PartitionId); - yield return checkpoint; + throw new TaskCanceledException(); + } + finally + { + if (releaseGuard) + { + ProcessorStatusGuard.Release(); + } } - // For any partitions with custom default starting point, emit an artificial - // checkpoint if a natural checkpoint did not exist. + // If there was a validation exception captured, then stop the processor now + // that it is safe to do so. - foreach (var partition in PartitionStartingPositionDefaults.Keys) + if (capturedValidationException != null) { - if (!knownCheckpoints.Contains(partition)) + try + { + if (async) + { + await StopProcessingAsync(CancellationToken.None).ConfigureAwait(false); + } + else + { + StopProcessing(CancellationToken.None); + } + } + catch { - yield return CreateCheckpointWithDefaultStartingPosition(partition); + // An exception is expected here, as the processor configuration was invalid and + // processing was canceled. It will have already been logged; ignore it here. } + + ExceptionDispatchInfo.Capture(capturedValidationException).Throw(); } } + /// + /// Creates a to use for interacting with durable storage. + /// + /// + /// The client responsible for interaction with durable storage, responsible for persisting checkpoints and load-balancing state. + /// + /// A with the requested configuration. + /// + private StorageManager CreateStorageManager(BlobContainerClient checkpointStore) => new BlobsCheckpointStore(checkpointStore, RetryPolicy); + /// /// Creates a checkpoint with a default starting position set. /// @@ -1154,6 +1192,71 @@ private EventProcessorCheckpoint CreateCheckpointWithDefaultStartingPosition(str }; } + /// + /// Performs the tasks needed to validate basic configuration and permissions of the dependencies needed for + /// the processor to function. + /// + /// + /// When true, the method will be executed asynchronously; otherwise, it will execute synchronously. + /// The to use for validating storage operations. + /// A instance to signal the request to cancel the start operation. + /// + /// Any validation failures will result in an aggregate exception. + /// + private async Task ValidateStartupAsync(bool async, + BlobContainerClient containerClient, + CancellationToken cancellationToken = default) + { + var blobClient = containerClient.GetBlobClient(Guid.NewGuid().ToString("N")); + + // Write an blob with metadata, simulating the approach used for checkpoint and ownership + // data creation. + + try + { + using var blobContent = new MemoryStream(Array.Empty()); + var blobMetadata = new Dictionary {{ "name", blobClient.Name }}; + + if (async) + { + await blobClient.UploadAsync(blobContent, metadata: blobMetadata, cancellationToken: cancellationToken).ConfigureAwait(false); + } + else + { + blobClient.Upload(blobContent, metadata: blobMetadata, cancellationToken: cancellationToken); + } + } + catch (OperationCanceledException) + { + throw; + } + catch (Exception ex) + { + throw new AggregateException(ex); + } + finally + { + // Remove the test blob if written; do so without respecting a cancellation request to + // ensure that the container is left in a consistent state. + + try + { + if (async) + { + await blobClient.DeleteIfExistsAsync(cancellationToken: CancellationToken.None).ConfigureAwait(false); + } + else + { + blobClient.DeleteIfExists(cancellationToken: CancellationToken.None); + } + } + catch (Exception ex) + { + Logger.ValidationCleanupError(Identifier, EventHubName, ConsumerGroup, ex.Message); + } + } + } + /// /// Invokes a specified action only if this instance is not running. /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/DiagnosticsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/DiagnosticsTests.cs index f0574bd250fcf..7cc507c01a6d4 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/DiagnosticsTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Diagnostics/DiagnosticsTests.cs @@ -11,12 +11,11 @@ using Azure.Messaging.EventHubs.Diagnostics; using Azure.Messaging.EventHubs.Primitives; using Azure.Messaging.EventHubs.Processor.Diagnostics; -using Azure.Messaging.EventHubs.Tests; using Moq; using Moq.Protected; using NUnit.Framework; -namespace Azure.Messaging.EventHubs.Processor.Tests +namespace Azure.Messaging.EventHubs.Tests { /// /// The suite of tests for validating the diagnostics instrumentation diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs index 07b45e7d2cf32..4f87159442199 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientLiveTests.cs @@ -13,6 +13,7 @@ using Azure.Messaging.EventHubs.Primitives; using Azure.Messaging.EventHubs.Processor; using Azure.Messaging.EventHubs.Producer; +using Azure.Storage.Blobs; using Moq; using NUnit.Framework; @@ -531,6 +532,184 @@ public async Task ProcessorClientBeginsWithTheNextEventAfterCheckpointing() } } + /// + /// Verifies that the can read a set of published events. + /// + /// + [Test] + [TestCase(true)] + [TestCase(false)] + public async Task ProcessorClientDetectsAnInvalidEventHubsConnectionString(bool async) + { + // Setup the environment. + + await using EventHubScope scope = await EventHubScope.CreateAsync(2); + var connectionString = "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=FakeSharedAccessKey;SharedAccessKey=<< FAKE >>"; + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + // Create the processor and attempt to start. + + var processor = new EventProcessorClient(Mock.Of(), EventHubConsumerClient.DefaultConsumerGroupName, connectionString, scope.EventHubName); + processor.ProcessErrorAsync += _ => Task.CompletedTask; + processor.ProcessEventAsync += _ => Task.CompletedTask; + + if (async) + { + Assert.That(async () => await processor.StartProcessingAsync(cancellationSource.Token), Throws.InstanceOf()); + } + else + { + Assert.That(() => processor.StartProcessing(cancellationSource.Token), Throws.InstanceOf()); + } + + await processor.StopProcessingAsync(cancellationSource.Token); + cancellationSource.Cancel(); + } + + /// + /// Verifies that the can read a set of published events. + /// + /// + [Test] + [TestCase(true)] + [TestCase(false)] + public async Task ProcessorClientDetectsAnInvalidEventHubName(bool async) + { + // Setup the environment. + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + // Create the processor and attempt to start. + + var processor = new EventProcessorClient(Mock.Of(), EventHubConsumerClient.DefaultConsumerGroupName, EventHubsTestEnvironment.Instance.EventHubsConnectionString, "fake"); + processor.ProcessErrorAsync += _ => Task.CompletedTask; + processor.ProcessEventAsync += _ => Task.CompletedTask; + + if (async) + { + Assert.That(async () => await processor.StartProcessingAsync(cancellationSource.Token), Throws.InstanceOf()); + } + else + { + Assert.That(() => processor.StartProcessing(cancellationSource.Token), Throws.InstanceOf()); + } + + await processor.StopProcessingAsync(cancellationSource.Token); + cancellationSource.Cancel(); + } + + /// + /// Verifies that the can read a set of published events. + /// + /// + [Test] + [TestCase(true)] + [TestCase(false)] + public async Task ProcessorClientDetectsAnInvalidConsumerGroup(bool async) + { + // Setup the environment. + + await using EventHubScope scope = await EventHubScope.CreateAsync(2); + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + // Create the processor and attempt to start. + + var processor = new EventProcessorClient(Mock.Of(), "fake", EventHubsTestEnvironment.Instance.EventHubsConnectionString, scope.EventHubName); + processor.ProcessErrorAsync += _ => Task.CompletedTask; + processor.ProcessEventAsync += _ => Task.CompletedTask; + + if (async) + { + Assert.That(async () => await processor.StartProcessingAsync(cancellationSource.Token), Throws.InstanceOf()); + } + else + { + Assert.That(() => processor.StartProcessing(cancellationSource.Token), Throws.InstanceOf()); + } + + await processor.StopProcessingAsync(cancellationSource.Token); + cancellationSource.Cancel(); + } + + /// + /// Verifies that the can read a set of published events. + /// + /// + [Test] + [TestCase(true)] + [TestCase(false)] + public async Task ProcessorClientDetectsAnInvalidStorageConnectionString(bool async) + { + // Setup the environment. + + await using EventHubScope eventHubScope = await EventHubScope.CreateAsync(2); + await using StorageScope storageScope = await StorageScope.CreateAsync(); + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + // Create the processor and attempt to start. + + var storageConnectionString = StorageTestEnvironment.Instance.StorageConnectionString.Replace(StorageTestEnvironment.Instance.StorageEndpointSuffix, "fake.com"); + var containerClient = new BlobContainerClient(storageConnectionString, storageScope.ContainerName); + var processor = new EventProcessorClient(containerClient, eventHubScope.ConsumerGroups[0], EventHubsTestEnvironment.Instance.EventHubsConnectionString, eventHubScope.EventHubName); + processor.ProcessErrorAsync += _ => Task.CompletedTask; + processor.ProcessEventAsync += _ => Task.CompletedTask; + + if (async) + { + Assert.That(async () => await processor.StartProcessingAsync(cancellationSource.Token), Throws.InstanceOf()); + } + else + { + Assert.That(() => processor.StartProcessing(cancellationSource.Token), Throws.InstanceOf()); + } + + await processor.StopProcessingAsync(cancellationSource.Token); + cancellationSource.Cancel(); + } + + /// + /// Verifies that the can read a set of published events. + /// + /// + [Test] + [TestCase(true)] + [TestCase(false)] + public async Task ProcessorClientDetectsAnInvalidStorageContainer(bool async) + { + // Setup the environment. + + await using EventHubScope eventHubScope = await EventHubScope.CreateAsync(2); + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + // Create the processor and attempt to start. + + var containerClient = new BlobContainerClient(StorageTestEnvironment.Instance.StorageConnectionString, "fake"); + var processor = new EventProcessorClient(containerClient, eventHubScope.ConsumerGroups[0], EventHubsTestEnvironment.Instance.EventHubsConnectionString, eventHubScope.EventHubName); + processor.ProcessErrorAsync += _ => Task.CompletedTask; + processor.ProcessEventAsync += _ => Task.CompletedTask; + + if (async) + { + Assert.That(async () => await processor.StartProcessingAsync(cancellationSource.Token), Throws.InstanceOf()); + } + else + { + Assert.That(() => processor.StartProcessing(cancellationSource.Token), Throws.InstanceOf()); + } + + await processor.StopProcessingAsync(cancellationSource.Token); + cancellationSource.Cancel(); + } + /// /// Creates an that uses mock storage and /// a connection based on a connection string. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs index 34b4aef747070..d37bcce2365ed 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/Processor/EventProcessorClientTests.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Reflection; using System.Threading; @@ -13,7 +14,9 @@ using Azure.Messaging.EventHubs.Primitives; using Azure.Messaging.EventHubs.Processor; using Azure.Messaging.EventHubs.Processor.Diagnostics; +using Azure.Storage; using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; using Moq; using NUnit.Framework; @@ -390,6 +393,103 @@ void assertStartProcessing() cancellationSource.Cancel(); } + /// + /// Verifies functionality of the + /// and methods. + /// + /// + [Test] + [TestCase(true)] + [TestCase(false)] + public async Task StartProcessingValidatesBlobsCanBeWritten(bool async) + { + using var cancellationSource = new CancellationTokenSource(); + + var capturedException = default(Exception); + var expectedException = new AccessViolationException("Stop violating my access!"); + var mockContainerClient = new Mock(); + var mockBlobClient = new MockBlobClient("dummy") { UploadException = expectedException }; + + mockContainerClient + .Setup(client => client.GetBlobClient(It.IsAny())) + .Returns(mockBlobClient); + + var processorClient = new TestEventProcessorClient(mockContainerClient.Object, "consumerGroup", "namespace", "eventHub", Mock.Of(), Mock.Of(), default); + processorClient.ProcessEventAsync += eventArgs => Task.CompletedTask; + processorClient.ProcessErrorAsync += eventArgs => Task.CompletedTask; + + try + { + if (async) + { + await processorClient.StartProcessingAsync(cancellationSource.Token); + } + else + { + processorClient.StartProcessing(cancellationSource.Token); + } + } + catch (Exception ex) + { + capturedException = ex; + } + + Assert.That(capturedException, Is.Not.Null, "An exception should have been thrown."); + Assert.That(capturedException, Is.InstanceOf(), "A validation exception should be surfaced as an AggregateException."); + Assert.That(((AggregateException)capturedException).InnerExceptions.Count, Is.EqualTo(1), "There should have been a single validation exception."); + + var innerException = ((AggregateException)capturedException).InnerExceptions.First(); + Assert.That(innerException, Is.SameAs(expectedException), "The source of the validation exception should have been exposed."); + Assert.That(processorClient.IsRunning, Is.False, "The processor should not be running after a validation exception."); + } + + /// + /// Verifies functionality of the + /// and methods. + /// + /// + [Test] + [TestCase(true)] + [TestCase(false)] + public async Task StartProcessingLogsWhenValidationCleanupFails(bool async) + { + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + var expectedException = new AccessViolationException("Stop violating my access!"); + var mockLogger = new Mock(); + var mockContainerClient = new Mock(); + var mockBlobClient = new MockBlobClient("dummy") { DeleteException = expectedException }; + + mockContainerClient + .Setup(client => client.GetBlobClient(It.IsAny())) + .Returns(mockBlobClient); + + var processorClient = new TestEventProcessorClient(mockContainerClient.Object, "consumerGroup", "namespace", "eventHub", Mock.Of(), Mock.Of(), default); + + processorClient.Logger = mockLogger.Object; + processorClient.ProcessEventAsync += eventArgs => Task.CompletedTask; + processorClient.ProcessErrorAsync += eventArgs => Task.CompletedTask; + + if (async) + { + Assert.That(async () => await processorClient.StartProcessingAsync(cancellationSource.Token), Throws.Nothing); + } + else + { + Assert.That(() => processorClient.StartProcessing(cancellationSource.Token), Throws.Nothing); + } + + mockLogger.Verify(log => log.ValidationCleanupError( + processorClient.Identifier, + processorClient.EventHubName, + processorClient.ConsumerGroup, expectedException.Message), + Times.Once); + + await processorClient.StopProcessingAsync(cancellationSource.Token).IgnoreExceptions(); + cancellationSource.Cancel(); + } + /// /// Verifies functionality of the /// and methods. @@ -1072,31 +1172,33 @@ public async Task EventProcessingLogsExceptions() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// [Test] - public async Task ListCheckpointsDelegatesToTheStorageManager() + public async Task GetCheckpointAsyncDelegatesToTheStorageManager() { using var cancellationSource = new CancellationTokenSource(); cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + var partitionId = "5"; var mockStorageManager = new Mock(); var processorClient = new TestEventProcessorClient(mockStorageManager.Object, "consumerGroup", "namespace", "eventHub", Mock.Of(), Mock.Of(), default); mockStorageManager - .Setup(storage => storage.ListCheckpointsAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .ReturnsAsync(default(IEnumerable)); + .Setup(storage => storage.GetCheckpointAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(default(EventProcessorCheckpoint)); - await processorClient.InvokeListCheckpointsAsync(cancellationSource.Token); + await processorClient.InvokeGetCheckpointAsync(partitionId, cancellationSource.Token); Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled."); mockStorageManager - .Verify(storage => storage.ListCheckpointsAsync( + .Verify(storage => storage.GetCheckpointAsync( processorClient.FullyQualifiedNamespace, processorClient.EventHubName, processorClient.ConsumerGroup, + partitionId, It.IsAny()), Times.Once); @@ -1104,12 +1206,12 @@ public async Task ListCheckpointsDelegatesToTheStorageManager() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// [Test] - public async Task ListCheckpointsIncludesInitializeEventHandlerStartingPositionWhenNoNaturalCheckpointExists() + public async Task GetCheckpointIncludesInitializeEventHandlerStartingPositionWhenNoNaturalCheckpointExists() { using var cancellationSource = new CancellationTokenSource(); cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); @@ -1120,15 +1222,9 @@ public async Task ListCheckpointsIncludesInitializeEventHandlerStartingPositionW var mockStorageManager = new Mock(); var processorClient = new TestEventProcessorClient(mockStorageManager.Object, "consumerGroup", "namespace", "eventHub", Mock.Of(), Mock.Of(), options); - var sourceCheckpoints = new[] - { - new EventProcessorCheckpoint { PartitionId = "7", StartingPosition = EventPosition.FromOffset(111) }, - new EventProcessorCheckpoint { PartitionId = "4", StartingPosition = EventPosition.FromOffset(222) } - }; - mockStorageManager - .Setup(storage => storage.ListCheckpointsAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .ReturnsAsync(sourceCheckpoints); + .Setup(storage => storage.GetCheckpointAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(default(EventProcessorCheckpoint)); processorClient.PartitionInitializingAsync += eventArgs => { @@ -1139,26 +1235,22 @@ public async Task ListCheckpointsIncludesInitializeEventHandlerStartingPositionW await processorClient.InvokeOnInitializingPartitionAsync(new TestEventProcessorPartition(partitionId), cancellationSource.Token); Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled."); - var checkpoints = (await processorClient.InvokeListCheckpointsAsync(cancellationSource.Token))?.ToList(); + var checkpoint = await processorClient.InvokeGetCheckpointAsync(partitionId, cancellationSource.Token); Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled."); - Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); - Assert.That(checkpoints.Count, Is.EqualTo(sourceCheckpoints.Length + 1), "The source checkpoints and the initialized partition should have been in the set."); - - var partitionCheckpoint = checkpoints.SingleOrDefault(checkpoint => checkpoint.PartitionId == partitionId); - Assert.That(partitionCheckpoint, Is.Not.Null, "A checkpoint for the initialized partition should have been injected."); - Assert.That(partitionCheckpoint.StartingPosition, Is.EqualTo(startingPosition), "The injected checkpoint should have respected the value that the initialization event handler set."); + Assert.That(checkpoint, Is.Not.Null, "A checkpoint should have been injected for the partition."); + Assert.That(checkpoint.StartingPosition, Is.EqualTo(startingPosition), "The injected checkpoint should have respected the value that the initialization event handler set."); cancellationSource.Cancel(); } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// [Test] - public async Task ListCheckpointsPrefersNaturalCheckpointOverInitializeEventHandlerStartingPosition() + public async Task GetCheckpointPrefersNaturalCheckpointOverInitializeEventHandlerStartingPosition() { using var cancellationSource = new CancellationTokenSource(); cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); @@ -1170,16 +1262,9 @@ public async Task ListCheckpointsPrefersNaturalCheckpointOverInitializeEventHand var mockStorageManager = new Mock(); var processorClient = new TestEventProcessorClient(mockStorageManager.Object, "consumerGroup", "namespace", "eventHub", Mock.Of(), Mock.Of(), options); - var sourceCheckpoints = new[] - { - new EventProcessorCheckpoint { PartitionId = "7", StartingPosition = EventPosition.FromOffset(111) }, - new EventProcessorCheckpoint { PartitionId = "4", StartingPosition = EventPosition.FromOffset(222) }, - new EventProcessorCheckpoint { PartitionId = partitionId, StartingPosition = checkpointStartingPosition } - }; - mockStorageManager - .Setup(storage => storage.ListCheckpointsAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .ReturnsAsync(sourceCheckpoints); + .Setup(storage => storage.GetCheckpointAsync(It.IsAny(), It.IsAny(), It.IsAny(), partitionId, It.IsAny())) + .ReturnsAsync(new EventProcessorCheckpoint { PartitionId = partitionId, StartingPosition = checkpointStartingPosition }); processorClient.PartitionInitializingAsync += eventArgs => { @@ -1190,26 +1275,22 @@ public async Task ListCheckpointsPrefersNaturalCheckpointOverInitializeEventHand await processorClient.InvokeOnInitializingPartitionAsync(new TestEventProcessorPartition(partitionId), cancellationSource.Token); Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled."); - var checkpoints = (await processorClient.InvokeListCheckpointsAsync(cancellationSource.Token))?.ToList(); + var checkpoint = await processorClient.InvokeGetCheckpointAsync(partitionId, cancellationSource.Token); Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled."); - Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); - Assert.That(checkpoints.Count, Is.EqualTo(sourceCheckpoints.Length), "The source checkpoints should have been in the set."); - - var partitionCheckpoint = checkpoints.SingleOrDefault(checkpoint => checkpoint.PartitionId == partitionId); - Assert.That(partitionCheckpoint, Is.Not.Null, "A checkpoint for the initialized partition should exist naturally."); - Assert.That(partitionCheckpoint.StartingPosition, Is.EqualTo(checkpointStartingPosition), "The natural checkpoint should have respected the value that the initialization event handler set."); + Assert.That(checkpoint, Is.Not.Null, "A checkpoints should have been found for the partition."); + Assert.That(checkpoint.StartingPosition, Is.EqualTo(checkpointStartingPosition), "The natural checkpoint should have respected the value that the initialization event handler set."); cancellationSource.Cancel(); } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// [Test] - public async Task ListCheckpointsReturnsNaturalCheckpointsWhenNoInitializeEventHandlerIsRegistered() + public async Task GetCheckpointReturnsNaturalCheckpointsWhenNoInitializeEventHandlerIsRegistered() { using var cancellationSource = new CancellationTokenSource(); cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); @@ -1219,28 +1300,40 @@ public async Task ListCheckpointsReturnsNaturalCheckpointsWhenNoInitializeEventH var mockStorageManager = new Mock(); var processorClient = new TestEventProcessorClient(mockStorageManager.Object, "consumerGroup", "namespace", "eventHub", Mock.Of(), Mock.Of(), options); - var sourceCheckpoints = new[] - { - new EventProcessorCheckpoint { PartitionId = "7", StartingPosition = EventPosition.FromOffset(111) }, - new EventProcessorCheckpoint { PartitionId = "4", StartingPosition = EventPosition.FromOffset(222) } - }; - mockStorageManager - .Setup(storage => storage.ListCheckpointsAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) - .ReturnsAsync(sourceCheckpoints); + .Setup(storage => storage.GetCheckpointAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(default(EventProcessorCheckpoint)); await processorClient.InvokeOnInitializingPartitionAsync(new TestEventProcessorPartition(partitionId), cancellationSource.Token); Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled."); - var checkpoints = (await processorClient.InvokeListCheckpointsAsync(cancellationSource.Token))?.ToList(); + var checkpoint = await processorClient.InvokeGetCheckpointAsync(partitionId, cancellationSource.Token); + Assert.That(cancellationSource.IsCancellationRequested, Is.False, "The cancellation token should not have been signaled."); + Assert.That(checkpoint, Is.Null, "No handler was registered for the partition; no checkpoint should have been injected."); - Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); - Assert.That(checkpoints.Count, Is.EqualTo(sourceCheckpoints.Length), "The source checkpoints should have been in the set."); + cancellationSource.Cancel(); + } - var partitionCheckpoint = checkpoints.SingleOrDefault(checkpoint => checkpoint.PartitionId == partitionId); - Assert.That(partitionCheckpoint, Is.Null, "No handler was registered for the partition; no checkpoint should have been injected."); + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public void ListCheckpointsIsDeprecatedAndThrows() + { + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + var mockStorageManager = new Mock(); + var processorClient = new TestEventProcessorClient(mockStorageManager.Object, "consumerGroup", "namespace", "eventHub", Mock.Of(), Mock.Of(), default); + + mockStorageManager + .Setup(storage => storage.ListCheckpointsAsync(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .ReturnsAsync(default(IEnumerable)); + Assert.That(async () => await processorClient.InvokeListCheckpointsAsync(cancellationSource.Token), Throws.InstanceOf()); cancellationSource.Cancel(); } @@ -1535,11 +1628,23 @@ internal TestEventProcessorClient(StorageManager storageManager, InjectedConnection = connection; } + internal TestEventProcessorClient(BlobContainerClient containerClient, + string consumerGroup, + string fullyQualifiedNamespace, + string eventHubName, + TokenCredential credential, + EventHubConnection connection, + EventProcessorClientOptions options) : base(containerClient, consumerGroup, fullyQualifiedNamespace, eventHubName, credential, options) + { + InjectedConnection = connection; + } + public Task InvokeOnProcessingEventBatchAsync(IEnumerable events, EventProcessorPartition partition, CancellationToken cancellationToken) => base.OnProcessingEventBatchAsync(events, partition, cancellationToken); public Task InvokeOnProcessingErrorAsync(Exception exception, EventProcessorPartition partition, string operationDescription, CancellationToken cancellationToken) => base.OnProcessingErrorAsync(exception, partition, operationDescription, cancellationToken); public Task InvokeOnInitializingPartitionAsync(EventProcessorPartition partition, CancellationToken cancellationToken) => base.OnInitializingPartitionAsync(partition, cancellationToken); public Task InvokeOnPartitionProcessingStoppedAsync(EventProcessorPartition partition, ProcessingStoppedReason reason, CancellationToken cancellationToken) => base.OnPartitionProcessingStoppedAsync(partition, reason, cancellationToken); public Task> InvokeListCheckpointsAsync(CancellationToken cancellationToken) => base.ListCheckpointsAsync(cancellationToken); + public Task InvokeGetCheckpointAsync(string partitionId, CancellationToken cancellationToken) => base.GetCheckpointAsync(partitionId, cancellationToken); public Task> InvokeListOwnershipAsync(CancellationToken cancellationToken) => base.ListOwnershipAsync(cancellationToken); public Task> InvokeClaimOwnershipAsync(IEnumerable desiredOwnership, CancellationToken cancellationToken) => base.ClaimOwnershipAsync(desiredOwnership, cancellationToken); protected override EventHubConnection CreateConnection() => InjectedConnection; @@ -1553,5 +1658,69 @@ public class TestEventProcessorPartition : EventProcessorPartition { public TestEventProcessorPartition(string partitionId) { PartitionId = partitionId; } } + + /// + /// A mock used for testing purposes. + /// + /// + public class MockBlobClient : BlobClient + { + public override string Name { get; } + public Exception UploadException; + public Exception DeleteException; + + public MockBlobClient(string blobName) + { + Name = blobName; + } + + public override Task> UploadAsync(Stream content, BlobHttpHeaders httpHeaders = null, IDictionary metadata = null, BlobRequestConditions conditions = null, IProgress progressHandler = null, AccessTier? accessTier = null, StorageTransferOptions transferOptions = default, CancellationToken cancellationToken = default) + { + if (UploadException != null) + { + throw UploadException; + } + + return Task.FromResult( + Response.FromValue( + BlobsModelFactory.BlobContentInfo(new ETag("etag"), new DateTimeOffset(2015, 10, 27, 00, 00, 00, 00, TimeSpan.Zero), Array.Empty(), string.Empty, 0L), + Mock.Of())); + } + + public override Response Upload(Stream content, BlobHttpHeaders httpHeaders = null, IDictionary metadata = null, BlobRequestConditions conditions = null, IProgress progressHandler = null, AccessTier? accessTier = null, StorageTransferOptions transferOptions = default, CancellationToken cancellationToken = default) + { + if (UploadException != null) + { + throw UploadException; + } + + return Response.FromValue( + BlobsModelFactory.BlobContentInfo(new ETag("etag"), new DateTimeOffset(2015, 10, 27, 00, 00, 00, 00, TimeSpan.Zero), Array.Empty(), string.Empty, 0L), + Mock.Of()); + } + + public override Task> DeleteIfExistsAsync(DeleteSnapshotsOption snapshotsOption = DeleteSnapshotsOption.None, BlobRequestConditions conditions = null, CancellationToken cancellationToken = default) + { + if (DeleteException != null) + { + throw DeleteException; + } + + return Task.FromResult(Response.FromValue(true, Mock.Of())); + } + + public override Response DeleteIfExists(DeleteSnapshotsOption snapshotsOption = DeleteSnapshotsOption.None, BlobRequestConditions conditions = null, CancellationToken cancellationToken = default) + { + if (DeleteException != null) + { + throw DeleteException; + } + + return Response.FromValue(true, Mock.Of()); + } + + public override Task> GetPropertiesAsync(BlobRequestConditions conditions = null, CancellationToken cancellationToken = default) => + Task.FromResult(Response.FromValue(Mock.Of(), Mock.Of())); + } } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.Designer.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.Designer.cs index d0e3034bac962..8a4720d4a0901 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.Designer.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.Designer.cs @@ -736,5 +736,16 @@ internal static string IdempotentAlreadyPublished return ResourceManager.GetString("IdempotentAlreadyPublished", resourceCulture); } } + + /// + /// Looks up a localized string similar to The ListCheckpointsAsync method has been superseded by GetCheckpointAsync and should no longer be called.. + /// + internal static string ListCheckpointsAsyncObsolete + { + get + { + return ResourceManager.GetString("ListCheckpointsAsyncObsolete", resourceCulture); + } + } } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.resx b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.resx index 40c511371738b..c558626cfbd8b 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.resx +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.resx @@ -297,4 +297,7 @@ These events have already been successfully published. When idempotent publishing is enabled, events that were acknowledged by the Event Hubs service may not be published again. + + The ListCheckpointsAsync method has been superseded by GetCheckpointAsync and should no longer be called. + \ No newline at end of file diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs index 9b0cb0f0988c0..a29f2587528ed 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs @@ -415,6 +415,13 @@ private EventProcessor(int eventBatchMaximumCount, /// /// A instance to signal the request to cancel the start operation. This won't affect the once it starts running. /// + /// + /// As the processor starts, it will attempt to detect configuration and permissions errors that would prevent it from + /// being able to recover without intervention. For example, an incorrect connection string or the inability to query the + /// Event Hub would be detected. These exceptions will be packaged as an , and will cause + /// to fail. + /// + /// public virtual async Task StartProcessingAsync(CancellationToken cancellationToken = default) => await StartProcessingInternalAsync(true, cancellationToken).ConfigureAwait(false); @@ -425,6 +432,13 @@ public virtual async Task StartProcessingAsync(CancellationToken cancellationTok /// /// A instance to signal the request to cancel the start operation. This won't affect the once it starts running. /// + /// + /// As the processor starts, it will attempt to detect configuration and permissions errors that would prevent it from + /// being able to recover without intervention. For example, an incorrect connection string or the inability to query the + /// Event Hub would be detected. These exceptions will be packaged as an , and will cause + /// to fail. + /// + /// public virtual void StartProcessing(CancellationToken cancellationToken = default) => StartProcessingInternalAsync(false, cancellationToken).EnsureCompleted(); @@ -790,6 +804,54 @@ async Task performProcessing() ); } + /// + /// Performs the tasks needed to validate basic configuration and permissions of the dependencies needed for + /// the processor to function. + /// + /// + /// When true, the method will be executed asynchronously; otherwise, it will execute synchronously. + /// A instance to signal the request to cancel the start operation. + /// + /// Any validation failures will result in an aggregate exception. + /// + internal virtual async Task ValidateStartupAsync(bool async, + CancellationToken cancellationToken = default) + { + var validationTask = Task.WhenAll + ( + ValidateEventHubsConnectionAsync(cancellationToken), + ValidateStorageConnectionAsync(cancellationToken) + ); + + if (async) + { + try + { + await validationTask.ConfigureAwait(false); + } + catch + { + // If the validation task has an exception, it will be the aggregate exception + // that we wish to surface. Use that if it is available. + + if (validationTask.Exception != null) + { + throw validationTask.Exception; + } + + throw; + } + } + else + { + // Wait is used over GetAwaiter().GetResult() because it will + // ensure an AggregateException is thrown rather than unwrapping and + // throwing only the first exception. + + validationTask.Wait(cancellationToken); + } + } + /// /// Creates an to use for communicating with the Event Hubs service. /// @@ -1021,6 +1083,7 @@ private async Task StartProcessingInternalAsync(bool async, cancellationToken.ThrowIfCancellationRequested(); Logger.EventProcessorStart(Identifier, EventHubName, ConsumerGroup); + var capturedValidationException = default(Exception); var releaseGuard = false; try @@ -1059,6 +1122,37 @@ private async Task StartProcessingInternalAsync(bool async, ActivePartitionProcessors.Clear(); _runningProcessorTask = RunProcessingAsync(_runningProcessorCancellationSource.Token); + + // Validate the processor configuration and ensuring basic permissions are held for + // service operations. + + try + { + if (async) + { + await ValidateStartupAsync(async, cancellationToken).ConfigureAwait(false); + } + else + { + ValidateStartupAsync(async, cancellationToken).EnsureCompleted(); + } + } + catch (AggregateException ex) + { + // Capture the validation exception and log, but do not throw. Because this is + // a fatal exception and the processing task was already started, StopProcessing + // will need to be called, which requires the semaphore. The validation exception + // will be handled after the start operation has officially completed and the + // semaphore has been released. + + capturedValidationException = ex.Flatten(); + Logger.EventProcessorStartError(Identifier, EventHubName, ConsumerGroup, ex.Message); + + // Canceling the main source here won't cause a problem and will help expedite stopping + // the processor later. + + _runningProcessorCancellationSource?.Cancel(); + } } catch (OperationCanceledException ex) { @@ -1083,6 +1177,31 @@ private async Task StartProcessingInternalAsync(bool async, ProcessorRunningGuard.Release(); } } + + // If there was a validation exception captured, then stop the processor now + // that it is safe to do so. + + if (capturedValidationException != null) + { + try + { + if (async) + { + await StopProcessingInternalAsync(async, CancellationToken.None).ConfigureAwait(false); + } + else + { + StopProcessingInternalAsync(async, CancellationToken.None).EnsureCompleted(); + } + } + catch + { + // An exception is expected here, as the processor configuration was invalid and + // processing was canceled. It will have already been logged; ignore it here. + } + + ExceptionDispatchInfo.Capture(capturedValidationException).Throw(); + } } /// @@ -1615,6 +1734,40 @@ private Task InvokeOnProcessingErrorAsync(Exception exception, string operationDescription, CancellationToken cancellationToken) => Task.Run(() => OnProcessingErrorAsync(exception, partition, operationDescription, cancellationToken), CancellationToken.None); + /// + /// Performs the actions needed to validate the connection to the requested + /// Event Hub. + /// + /// + /// A instance to signal the request to cancel the validation. + /// + private async Task ValidateEventHubsConnectionAsync(CancellationToken cancellationToken = default) + { + // Validate that the Event Hubs connection is valid by querying properties of the Event Hub. + // This is core functionality for the processor to discover partitions and validates read access. + + var connection = CreateConnection(); + await using var connectionAwaiter = connection.ConfigureAwait(false); + await connection.GetPropertiesAsync(RetryPolicy, cancellationToken).ConfigureAwait(false); + } + + /// + /// Performs the actions needed to validate the connection to the storage + /// provider for checkpoints and ownership. + /// + /// + /// A instance to signal the request to cancel the validation. + /// + private async Task ValidateStorageConnectionAsync(CancellationToken cancellationToken) + { + // Because the processor does not have knowledge of what storage implementation is in use, + // it cannot perform any specific in-depth validations. Use the standard checkpoint query + // for an invalid partition; this should ensure that the basic storage connection can be made + // and that a read operation is valid. + + await GetCheckpointAsync("-1", cancellationToken).ConfigureAwait(false); + } + /// /// Creates a to use for interacting with durable storage. /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.Infrastructure.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.Infrastructure.cs index f584ac2f24f15..32e8582162c9f 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.Infrastructure.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.Infrastructure.cs @@ -78,6 +78,10 @@ public async Task ReadLastEnqueuedEventPropertiesReadsPropertiesWhenThePartition .Setup(processor => processor.CreateConsumer(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .Returns(Mock.Of()); + mockProcessor + .Setup(processor => processor.ValidateStartupAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); Assert.That(mockProcessor.Object.Status, Is.EqualTo(EventProcessorStatus.Running), "The processor should not fault if a load balancing cycle fails."); @@ -131,6 +135,10 @@ public async Task ReadLastEnqueuedEventPropertiesThrowsWhenThePartitionIsNotOwne .Setup(processor => processor.CreateConsumer(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .Returns(Mock.Of()); + mockProcessor + .Setup(processor => processor.ValidateStartupAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); Assert.That(mockProcessor.Object.Status, Is.EqualTo(EventProcessorStatus.Running), "The processor should not fault if a load balancing cycle fails."); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs index b0a8a9fca188b..b6103ce0b28f4 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.MainProcessingLoop.cs @@ -48,6 +48,12 @@ public async Task BackgroundProcessingDispatchesTopLevelExceptions() .Setup(processor => processor.CreateConnection()) .Throws(expectedException); + mockProcessor + .Setup(processor => processor.ValidateStartupAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + // Delay the return from the error handler by slightly longer than cancellation is triggered in // order to validate that the handler call does not block or delay other processor operations. @@ -98,6 +104,12 @@ public async Task BackgroundProcessingLogsTopLevelExceptions() .Setup(processor => processor.CreateConnection()) .Throws(expectedException); + mockProcessor + .Setup(processor => processor.ValidateStartupAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + mockLogger .Setup(log => log.EventProcessorTaskError(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) .Callback(() => completionSource.TrySetResult(true)); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.StartStop.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.StartStop.cs index 4b2c9c648d78f..8c7ed4a438e31 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.StartStop.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Primitives/EventProcessorTests.StartStop.cs @@ -70,6 +70,12 @@ public async Task StartProcessingStartsTheProcessing(bool async) .Setup(processor => processor.CreateConnection()) .Returns(Mock.Of()); + mockProcessor + .Setup(processor => processor.ValidateStartupAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + if (async) { await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); @@ -126,6 +132,12 @@ public async Task StartProcessingStartsTheLoadBalancer(bool async) .Setup(processor => processor.CreateConnection()) .Returns(mockConnection.Object); + mockProcessor + .Setup(processor => processor.ValidateStartupAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + if (async) { await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); @@ -172,6 +184,12 @@ public async Task StartProcessingDoesNotAttemptToStartWhenRunning(bool async) .Setup(processor => processor.CreateConnection()) .Returns(Mock.Of()); + mockProcessor + .Setup(processor => processor.ValidateStartupAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + if (async) { await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); @@ -206,6 +224,220 @@ public async Task StartProcessingDoesNotAttemptToStartWhenRunning(bool async) await mockProcessor.Object.StopProcessingAsync(cancellationSource.Token).IgnoreExceptions(); } + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + [TestCase(true)] + [TestCase(false)] + public async Task StartProcessingValidatesTheEventHubsConnectionCanBeCreated(bool async) + { + using var cancellationSource = new CancellationTokenSource(); + + var capturedException = default(Exception); + var expectedException = new DivideByZeroException("The universe will now end."); + var mockProcessor = new Mock>(4, "consumerGroup", "namespace", "eventHub", Mock.Of(), default(EventProcessorOptions)) { CallBase = true }; + + mockProcessor + .Setup(processor => processor.CreateConnection()) + .Throws(expectedException); + + try + { + if (async) + { + await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); + } + else + { + mockProcessor.Object.StartProcessing(cancellationSource.Token); + } + } + catch (Exception ex) + { + capturedException = ex; + } + + Assert.That(capturedException, Is.Not.Null, "An exception should have been thrown."); + Assert.That(capturedException, Is.InstanceOf(), "A validation exception should be surfaced as an AggregateException."); + Assert.That(((AggregateException)capturedException).InnerExceptions.Count, Is.EqualTo(1), "There should have been a single validation exception."); + + var innerException = ((AggregateException)capturedException).InnerExceptions.First(); + Assert.That(innerException, Is.SameAs(expectedException), "The source of the validation exception should have been exposed."); + Assert.That(mockProcessor.Object.IsRunning, Is.False, "The processor should not be running after a validation exception."); + } + + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + [TestCase(true)] + [TestCase(false)] + public async Task StartProcessingValidatesTheEventHubCanBeQueried(bool async) + { + using var cancellationSource = new CancellationTokenSource(); + + var capturedException = default(Exception); + var expectedException = new DivideByZeroException("The universe will now end."); + var mockConnection = new Mock(); + var mockProcessor = new Mock>(4, "consumerGroup", "namespace", "eventHub", Mock.Of(), default(EventProcessorOptions)) { CallBase = true }; + + mockProcessor + .Setup(processor => processor.CreateConnection()) + .Returns(mockConnection.Object); + + mockConnection + .Setup(connection => connection.GetPropertiesAsync( + It.IsAny(), + It.IsAny())) + .Throws(expectedException); + + try + { + if (async) + { + await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); + } + else + { + mockProcessor.Object.StartProcessing(cancellationSource.Token); + } + } + catch (Exception ex) + { + capturedException = ex; + } + + Assert.That(capturedException, Is.Not.Null, "An exception should have been thrown."); + Assert.That(capturedException, Is.InstanceOf(), "A validation exception should be surfaced as an AggregateException."); + Assert.That(((AggregateException)capturedException).InnerExceptions.Count, Is.EqualTo(1), "There should have been a single validation exception."); + + var innerException = ((AggregateException)capturedException).InnerExceptions.First(); + Assert.That(innerException, Is.SameAs(expectedException), "The source of the validation exception should have been exposed."); + Assert.That(mockProcessor.Object.IsRunning, Is.False, "The processor should not be running after a validation exception."); + } + + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + [TestCase(true)] + [TestCase(false)] + public async Task StartProcessingValidatesCheckpointsCanBeQueried(bool async) + { + using var cancellationSource = new CancellationTokenSource(); + + var capturedException = default(Exception); + var expectedException = new DivideByZeroException("The universe will now end."); + var mockProcessor = new Mock>(4, "consumerGroup", "namespace", "eventHub", Mock.Of(), default(EventProcessorOptions)) { CallBase = true }; + + mockProcessor + .Setup(processor => processor.CreateConnection()) + .Returns(Mock.Of()); + + mockProcessor + .Protected() + .Setup("GetCheckpointAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .Throws(expectedException); + + try + { + if (async) + { + await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); + } + else + { + mockProcessor.Object.StartProcessing(cancellationSource.Token); + } + } + catch (Exception ex) + { + capturedException = ex; + } + + Assert.That(capturedException, Is.Not.Null, "An exception should have been thrown."); + Assert.That(capturedException, Is.InstanceOf(), "A validation exception should be surfaced as an AggregateException."); + Assert.That(((AggregateException)capturedException).InnerExceptions.Count, Is.EqualTo(1), "There should have been a single validation exception."); + + var innerException = ((AggregateException)capturedException).InnerExceptions.First(); + Assert.That(innerException, Is.SameAs(expectedException), "The source of the validation exception should have been exposed."); + Assert.That(mockProcessor.Object.IsRunning, Is.False, "The processor should not be running after a validation exception."); + } + + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + [TestCase(true)] + [TestCase(false)] + public async Task StartProcessingSurfacesMultipleValidationFailures(bool async) + { + using var cancellationSource = new CancellationTokenSource(); + + var capturedException = default(Exception); + var eventHubException = new DivideByZeroException("The universe will now end."); + var storageException = new FormatException("I find your format offensive."); + var mockConnection = new Mock(); + var mockProcessor = new Mock>(4, "consumerGroup", "namespace", "eventHub", Mock.Of(), default(EventProcessorOptions)) { CallBase = true }; + + mockProcessor + .Setup(processor => processor.CreateConnection()) + .Returns(mockConnection.Object); + + mockProcessor + .Protected() + .Setup("GetCheckpointAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .Throws(storageException); + + mockConnection + .Setup(connection => connection.GetPropertiesAsync( + It.IsAny(), + It.IsAny())) + .Throws(eventHubException); + + try + { + if (async) + { + await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); + } + else + { + mockProcessor.Object.StartProcessing(cancellationSource.Token); + } + } + catch (Exception ex) + { + capturedException = ex; + } + + Assert.That(capturedException, Is.Not.Null, "An exception should have been thrown."); + Assert.That(capturedException, Is.InstanceOf(), "A validation exception should be surfaced as an AggregateException."); + Assert.That(mockProcessor.Object.IsRunning, Is.False, "The processor should not be running after a validation exception."); + + var aggregateException = (AggregateException)capturedException; + Assert.That(aggregateException.InnerExceptions.Count, Is.EqualTo(2), "There should have been two validation exceptions."); + + var eventHubInnerException = aggregateException.InnerExceptions.Where(ex => ReferenceEquals(ex, eventHubException)).FirstOrDefault(); + Assert.That(eventHubInnerException, Is.Not.Null, "The Event Hub exception should have been surfaced."); + + var storageInnerException = aggregateException.InnerExceptions.Where(ex => ReferenceEquals(ex, storageException)).FirstOrDefault(); + Assert.That(storageInnerException, Is.Not.Null, "The storage exception should have been surfaced."); + } + /// /// Verifies functionality of the /// method. @@ -560,6 +792,12 @@ public async Task StopProcessingSurfacesExceptions(bool async) .Callback(() => completionSource.TrySetResult(true)) .Throws(expectedException); + mockProcessor + .Setup(processor => processor.ValidateStartupAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); await completionSource.Task; @@ -601,6 +839,12 @@ public async Task StopProcessingResetsState(bool async) .Throws(expectedException) .Returns(Mock.Of()); + mockProcessor + .Setup(processor => processor.ValidateStartupAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + // Starting the processor should result in an exception on the first call, which should leave it in a faulted state. await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); @@ -738,6 +982,12 @@ public async Task StopProcessingLogsErrorDuringShutdown(bool async) .Setup(processor => processor.CreateConnection()) .Returns(default(EventHubConnection)); + mockProcessor + .Setup(processor => processor.ValidateStartupAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); Assert.That(mockProcessor.Object.IsRunning, Is.True, "The processor should be running."); Assert.That(mockProcessor.Object.Status, Is.EqualTo(EventProcessorStatus.Running), "The processor status should report that it is running."); @@ -797,6 +1047,12 @@ public async Task StopProcessingLogsFaultedTaskDuringShutdown(bool async) .Setup(processor => processor.CreateConnection()) .Throws(expectedException); + mockProcessor + .Setup(processor => processor.ValidateStartupAsync( + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + await mockProcessor.Object.StartProcessingAsync(cancellationSource.Token); Assert.That(mockProcessor.Object.IsRunning, Is.False, "The processor should have faulted during startup."); Assert.That(GetRunningProcessorTask(mockProcessor.Object).IsFaulted, Is.True, "The task for processing should be faulted.");