From 8b2c7e2ba45212409170e78d3c0a7cdb46d0ccd8 Mon Sep 17 00:00:00 2001 From: Jesse Squire Date: Fri, 2 Oct 2020 14:12:32 -0400 Subject: [PATCH] [Event Hubs Client] Minor Improvements The focus of these changes is to make several minor improvements, which are largely unrelated to one another. These include: - Documentation for the expected delay when stopping an Event Processor - Better documentation for client lifetimes and cachability - Additional test scenarios for the Idempotent Producer - Fixing a CODEOWNERS typo --- .github/CODEOWNERS | 2 +- .../src/EventProcessorClient.cs | 23 ++++++++++ .../src/Consumer/EventHubConsumerClient.cs | 19 +++++--- .../Primitives/EventProcessor{TPartition}.cs | 23 ++++++++++ .../src/Primitives/PartitionReceiver.cs | 9 +++- .../src/Producer/EventHubProducerClient.cs | 23 +++++++--- .../Producer/IdempotentPublishingLiveTests.cs | 45 +++++++++++++++++++ 7 files changed, 129 insertions(+), 15 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index bbaa554b59dba..82a6d30ad68fe 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -55,7 +55,7 @@ /sdk/keyvault/ @schaabs @heaths # PRLabel: %Cognitive - Metrics Advisor -sdk/metricsadvisor/ @kinelski +/sdk/metricsadvisor/ @kinelski # PRLabel: %Search /sdk/search/ @brjohnstmsft @arv100kri @bleroy @tg-msft @heaths diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs index 172bb181f6e02..9365151d29fc5 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/src/EventProcessorClient.cs @@ -27,6 +27,13 @@ namespace Azure.Messaging.EventHubs     ///   group pairing to share work by using a common storage platform to communicate.  Fault tolerance is also built-in,     ///   allowing the processor to be resilient in the face of errors.     ///  + /// + /// + /// The is safe to cache and use for the lifetime of an application, and that is best practice when the application + /// processes events regularly or semi-regularly. The processor holds responsibility for efficient resource management, working to keep resource usage low during + /// periods of inactivity and manage health during periods of higher use. Calling either the or + /// method when processing is complete or as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up. + ///     /// [SuppressMessage("Usage", "CA1001:Types that own disposable fields should be disposable.", Justification = "Disposal is managed internally as part of the Stop operation.")] public class EventProcessorClient : EventProcessor @@ -554,6 +561,14 @@ public override void StartProcessing(CancellationToken cancellationToken = defau /// /// A instance to signal the request to cancel the stop operation. If the operation is successfully canceled, the will keep running. /// + /// + /// When stopping, the processor will update the ownership of partitions that it was responsible for processing and clean up network resources used for communication with + /// the Event Hubs service. As a result, this method will perform network I/O and may need to wait for partition reads that were active to complete. + /// + /// Due to service calls and network latency, an invocation of this method may take slightly longer than the specified or + /// if the wait time was not configured, the duration of the of the configured retry policy. + /// + /// public override Task StopProcessingAsync(CancellationToken cancellationToken = default) => base.StopProcessingAsync(cancellationToken); /// @@ -563,6 +578,14 @@ public override void StartProcessing(CancellationToken cancellationToken = defau /// /// A instance to signal the request to cancel the stop operation. If the operation is successfully canceled, the will keep running. /// + /// + /// When stopping, the processor will update the ownership of partitions that it was responsible for processing and clean up network resources used for communication with + /// the Event Hubs service. As a result, this method will perform network I/O and may need to wait for partition reads that were active to complete. + /// + /// Due to service calls and network latency, an invocation of this method may take slightly longer than the specified or + /// if the wait time was not configured, the duration of the of the configured retry policy. + /// + /// public override void StopProcessing(CancellationToken cancellationToken = default) => base.StopProcessing(cancellationToken); /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/EventHubConsumerClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/EventHubConsumerClient.cs index f79990437246c..fbf8e0808ea5e 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/EventHubConsumerClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Consumer/EventHubConsumerClient.cs @@ -20,18 +20,25 @@ namespace Azure.Messaging.EventHubs.Consumer { /// - /// A client responsible for reading from a specific Event Hub - /// as a member of a specific consumer group. + /// A client responsible for reading from a specific Event Hub + /// as a member of a specific consumer group. /// - /// A consumer may be exclusive, which asserts ownership over associated partitions for the consumer + /// A consumer may be exclusive, which asserts ownership over associated partitions for the consumer /// group to ensure that only one consumer from that group is reading the from the partition. - /// These exclusive consumers are sometimes referred to as "Epoch Consumers." + /// These exclusive consumers are sometimes referred to as "Epoch Consumers." /// - /// A consumer may also be non-exclusive, allowing multiple consumers from the same consumer + /// A consumer may also be non-exclusive, allowing multiple consumers from the same consumer /// group to be actively reading events from a given partition. These non-exclusive consumers are - /// sometimes referred to as "Non-Epoch Consumers." + /// sometimes referred to as "Non-Epoch Consumers." /// /// + /// + /// The is safe to cache and use for the lifetime of an application, and that is best practice when the application + /// reads events regularly or semi-regularly. The consumer holds responsibility for efficient resource management, working to keep resource usage low during + /// periods of inactivity and manage health during periods of higher use. Calling either the or + /// method as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up. + /// + /// public class EventHubConsumerClient : IAsyncDisposable { /// The name of the default consumer group in the Event Hubs service. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor{TPartition}.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor{TPartition}.cs index 53de29667b396..753d13c1f0401 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor{TPartition}.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor{TPartition}.cs @@ -30,6 +30,13 @@ namespace Azure.Messaging.EventHubs.Primitives /// /// The context of the partition for which an operation is being performed. /// + /// + /// The is safe to cache and use for the lifetime of an application, and that is best practice when the application + /// processes events regularly or semi-regularly. The processor holds responsibility for efficient resource management, working to keep resource usage low during + /// periods of inactivity and manage health during periods of higher use. Calling either the or + /// method when processing is complete or as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up. + /// + /// [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] public abstract class EventProcessor where TPartition : EventProcessorPartition, new() { @@ -371,6 +378,14 @@ public virtual void StartProcessing(CancellationToken cancellationToken = defaul /// /// A instance to signal the request to cancel the stop operation. If the operation is successfully canceled, the will keep running. /// + /// + /// When stopping, the processor will update the ownership of partitions that it was responsible for processing and clean up network resources used for communication with + /// the Event Hubs service. As a result, this method will perform network I/O and may need to wait for partition reads that were active to complete. + /// + /// Due to service calls and network latency, an invocation of this method may take slightly longer than the specified or + /// if the wait time was not configured, the duration of the of the configured retry policy. + /// + /// public virtual async Task StopProcessingAsync(CancellationToken cancellationToken = default) => await StopProcessingInternalAsync(true, cancellationToken).ConfigureAwait(false); @@ -381,6 +396,14 @@ public virtual async Task StopProcessingAsync(CancellationToken cancellationToke /// /// A instance to signal the request to cancel the stop operation. If the operation is successfully canceled, the will keep running. /// + /// + /// When stopping, the processor will update the ownership of partitions that it was responsible for processing and clean up network resources used for communication with + /// the Event Hubs service. As a result, this method will perform network I/O and may need to wait for partition reads that were active to complete. + /// + /// Due to service calls and network latency, an invocation of this method may take slightly longer than the specified or + /// if the wait time was not configured, the duration of the of the configured retry policy. + /// + /// public virtual void StopProcessing(CancellationToken cancellationToken = default) => StopProcessingInternalAsync(false, cancellationToken).EnsureCompleted(); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PartitionReceiver.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PartitionReceiver.cs index a832bb968565a..6946fdc17bd91 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PartitionReceiver.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/PartitionReceiver.cs @@ -22,9 +22,14 @@ namespace Azure.Messaging.EventHubs.Primitives /// /// /// - /// It is recommended that the EventProcessorClient or + /// It is recommended that the EventProcessorClient or /// be used for reading and processing events for the majority of scenarios. The partition receiver is - /// intended to enable scenarios with special needs which require more direct control. + /// intended to enable scenarios with special needs which require more direct control. + /// + /// The is safe to cache and use for the lifetime of an application, and that is best practice when the application + /// reads events regularly or semi-regularly. The receiver holds responsibility for efficient resource management, working to keep resource usage low during + /// periods of inactivity and manage health during periods of higher use. Calling either the or + /// method as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up. /// /// /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs index 914b784a696f5..5a84a3301b063 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs @@ -26,13 +26,24 @@ namespace Azure.Messaging.EventHubs.Producer /// /// /// - /// Allowing automatic routing of partitions is recommended when: - /// - The sending of events needs to be highly available. - /// - The event data should be evenly distributed among all available partitions. + /// + /// Allowing automatic routing of partitions is recommended when: + /// - The sending of events needs to be highly available. + /// - The event data should be evenly distributed among all available partitions. + /// /// - /// If no partition is specified, the following rules are used for automatically selecting one: - /// 1) Distribute the events equally amongst all available partitions using a round-robin approach. - /// 2) If a partition becomes unavailable, the Event Hubs service will automatically detect it and forward the message to another available partition. + /// + /// If no partition is specified, the following rules are used for automatically selecting one: + /// 1) Distribute the events equally amongst all available partitions using a round-robin approach. + /// 2) If a partition becomes unavailable, the Event Hubs service will automatically detect it and forward the message to another available partition. + /// + /// + /// + /// The is safe to cache and use for the lifetime of an application, and that is best practice when the application + /// publishes events regularly or semi-regularly. The producer holds responsibility for efficient resource management, working to keep resource usage low during + /// periods of inactivity and manage health during periods of higher use. Calling either the or + /// method as the application is shutting down will ensure that network resources and other unmanaged objects are properly cleaned up. + /// /// /// public class EventHubProducerClient : IAsyncDisposable diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs index 6894d6c487979..de7d0d2e605f8 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/IdempotentPublishingLiveTests.cs @@ -542,5 +542,50 @@ public async Task ProducerCanInitializeWithPartialPartitionOptions() Assert.That(async () => await producer.SendAsync(EventGenerator.CreateEvents(10), new SendEventOptions { PartitionId = partition }, cancellationSource.Token), Throws.Nothing); } } + + /// + /// Verifies that the is able to + /// perform operations when the idempotent publishing feature is enabled. + /// + /// + [Test] + public async Task ProducerIsRejectedWithPartitionOptionsForInvalidState() + { + await using (EventHubScope scope = await EventHubScope.CreateAsync(2)) + { + var connectionString = EventHubsTestEnvironment.Instance.BuildConnectionStringForEventHub(scope.EventHubName); + var options = new EventHubProducerClientOptions { EnableIdempotentPartitions = true }; + + var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + var partition = default(string); + var partitionProperties = default(PartitionPublishingProperties); + + // Create a producer for a small scope that will Send some events and read the properties. + + await using (var initialProducer = new EventHubProducerClient(connectionString, options)) + { + partition = (await initialProducer.GetPartitionIdsAsync(cancellationSource.Token)).Last(); + + await initialProducer.SendAsync(EventGenerator.CreateEvents(10), new SendEventOptions { PartitionId = partition }, cancellationSource.Token); + partitionProperties = await initialProducer.GetPartitionPublishingPropertiesAsync(partition); + } + + // Create a new producer using the previously read properties to set options for the partition. + + options.PartitionOptions.Add(partition, new PartitionPublishingOptions + { + ProducerGroupId = partitionProperties.ProducerGroupId, + OwnerLevel = partitionProperties.OwnerLevel, + StartingSequenceNumber = (partitionProperties.LastPublishedSequenceNumber - 5) + }); + + await using var producer = new EventHubProducerClient(connectionString, options); + + Assert.That(async () => await producer.SendAsync(EventGenerator.CreateEvents(10), new SendEventOptions { PartitionId = partition }, cancellationSource.Token), + Throws.InstanceOf().And.Property("Reason").EqualTo(EventHubsException.FailureReason.InvalidClientState)); + } + } } }