diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs b/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs index 143dadce3e888..247e133431ed6 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/api/Azure.Messaging.EventHubs.netstandard2.0.cs @@ -464,9 +464,10 @@ public EventHubProducerClient(string connectionString, string eventHubName, Azur public override int GetHashCode() { throw null; } public virtual System.Threading.Tasks.Task GetPartitionIdsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.Task GetPartitionPropertiesAsync(string partitionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual System.Threading.Tasks.Task ReadPartitionPublishingPropertiesAsync(string partitionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.Task SendAsync(Azure.Messaging.EventHubs.Producer.EventDataBatch eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable eventSet, Azure.Messaging.EventHubs.Producer.SendEventOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable eventSet, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public override string ToString() { throw null; } } @@ -490,6 +491,12 @@ public PartitionPublishingOptions() { } public short? OwnerLevel { get { throw null; } set { } } public long? ProducerGroupId { get { throw null; } set { } } public int? StartingSequenceNumber { get { throw null; } set { } } + [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] + public override bool Equals(object obj) { throw null; } + [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] + public override int GetHashCode() { throw null; } + [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] + public override string ToString() { throw null; } } public partial class PartitionPublishingProperties { @@ -498,6 +505,12 @@ protected internal PartitionPublishingProperties(bool isIdempotentPublishingEnab public int? LastPublishedSequenceNumber { get { throw null; } } public short? OwnerLevel { get { throw null; } } public long? ProducerGroupId { get { throw null; } } + [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] + public override bool Equals(object obj) { throw null; } + [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] + public override int GetHashCode() { throw null; } + [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] + public override string ToString() { throw null; } } public partial class SendEventOptions { diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventBatch.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventBatch.cs index e39fdc50fc691..b7da325e8e767 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventBatch.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpEventBatch.cs @@ -52,16 +52,17 @@ internal class AmqpEventBatch : TransportEventBatch public override long SizeInBytes => _sizeBytes; /// - /// The publishing sequence number assigned to the first event in the batch at the time - /// the batch was successfully published. + /// A flag that indicates whether space should be reserved for a publishing + /// sequence number when the event size is measured. If false, a sequence + /// number is not used in size calculations. /// /// /// - /// The starting published sequence number is only populated and relevant when certain features + /// The sequence number is only populated and relevant when certain features /// of the producer are enabled. For example, it is used by idempotent publishing. /// /// - public override int? StartingPublishedSequenceNumber { get; set; } + public override bool ReserveSpaceForSequenceNumber { get; } /// /// The count of events contained in the batch. @@ -93,9 +94,11 @@ internal class AmqpEventBatch : TransportEventBatch /// /// The converter to use for translating into the corresponding AMQP message. /// The set of options to apply to the batch. + /// A flag that indicates whether space should be reserved for a publishing sequence number when the event size is measured. If false, a sequence number is not used in size calculations. /// public AmqpEventBatch(AmqpMessageConverter messageConverter, - CreateBatchOptions options) + CreateBatchOptions options, + bool reserveSpaceForSequenceNumber) { Argument.AssertNotNull(messageConverter, nameof(messageConverter)); Argument.AssertNotNull(options, nameof(options)); @@ -104,13 +107,13 @@ public AmqpEventBatch(AmqpMessageConverter messageConverter, MessageConverter = messageConverter; Options = options; MaximumSizeInBytes = options.MaximumSizeInBytes.Value; + ReserveSpaceForSequenceNumber = reserveSpaceForSequenceNumber; // Initialize the size by reserving space for the batch envelope. using AmqpMessage envelope = messageConverter.CreateBatchFromEvents(Enumerable.Empty(), options.PartitionKey); ReservedSize = envelope.SerializedMessageSize; _sizeBytes = ReservedSize; - } /// @@ -127,7 +130,12 @@ public override bool TryAdd(EventData eventData) Argument.AssertNotNull(eventData, nameof(eventData)); Argument.AssertNotDisposed(_disposed, nameof(EventDataBatch)); - AmqpMessage eventMessage = MessageConverter.CreateMessageFromEvent(eventData, Options.PartitionKey); + if (ReserveSpaceForSequenceNumber) + { + eventData.PendingPublishSequenceNumber = int.MaxValue; + } + + var eventMessage = MessageConverter.CreateMessageFromEvent(eventData, Options.PartitionKey); try { @@ -152,6 +160,7 @@ public override bool TryAdd(EventData eventData) } finally { + eventData.PendingPublishSequenceNumber = default; eventMessage?.Dispose(); } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs index 649fd1ef25b7e..1f6e499fa1524 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.Globalization; +using System.Runtime.CompilerServices; using System.Runtime.ExceptionServices; using System.Threading; using System.Threading.Tasks; @@ -277,9 +278,9 @@ public override async ValueTask CreateBatchAsync(CreateBatc // default to the maximum size allowed by the link. options.MaximumSizeInBytes ??= MaximumMessageSize; - Argument.AssertInRange(options.MaximumSizeInBytes.Value, EventHubProducerClient.MinimumBatchSizeLimit, MaximumMessageSize.Value, nameof(options.MaximumSizeInBytes)); - return new AmqpEventBatch(MessageConverter, options); + + return new AmqpEventBatch(MessageConverter, options, IsSequenceMeasurementRequired(ActiveFeatures)); } /// @@ -574,6 +575,18 @@ protected virtual async Task CreateLinkAndEnsureProducerStateAs return link; } + /// + /// Determines if measuring a sequence number is required to accurately calculate + /// the size of an event. + /// + /// + /// The set of features which are active for the producer. + /// + /// true if a sequence number should be measured; otherwise, false. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static bool IsSequenceMeasurementRequired(TransportProducerFeatures activeFeatures) => ((activeFeatures & TransportProducerFeatures.IdempotentPublishing) != 0); + /// /// Uses the minimum value of the two specified instances. /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportEventBatch.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportEventBatch.cs index c3a098d5af5b0..168d315736b12 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportEventBatch.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Core/TransportEventBatch.cs @@ -31,23 +31,17 @@ internal abstract class TransportEventBatch : IDisposable public abstract long SizeInBytes { get; } /// - /// The publishing sequence number assigned to the first event in the batch at the time - /// the batch was successfully published. + /// A flag that indicates whether space should be reserved for a publishing + /// sequence number when the event size is measured. If false, a sequence + /// number is not used in size calculations. /// /// - /// - /// The sequence number of the first event in the batch, if the batch was successfully - /// published by a sequence-aware producer. If the producer was not configured to apply - /// sequence numbering or if the batch has not yet been successfully published, this member - /// will be null. - /// - /// /// - /// The starting published sequence number is only populated and relevant when certain features + /// The sequence number is only populated and relevant when certain features /// of the producer are enabled. For example, it is used by idempotent publishing. /// /// - public abstract int? StartingPublishedSequenceNumber { get; set; } + public abstract bool ReserveSpaceForSequenceNumber { get; } /// /// The count of events contained in the batch. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs index e1289e06e181b..cf4c0dae67a31 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/EventData.cs @@ -323,7 +323,7 @@ protected EventData(ReadOnlyMemory eventBody, /// Transitions the pending publishing sequence number to the published sequence number. /// /// - internal void CommitPublishingSequenceNumber() + internal void CommitPublishingState() { PublishedSequenceNumber = PendingPublishSequenceNumber; PendingPublishSequenceNumber = default; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/CreateBatchOptions.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/CreateBatchOptions.cs old mode 100755 new mode 100644 index e5eed9223b67f..703168c855bbf --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/CreateBatchOptions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/CreateBatchOptions.cs @@ -40,20 +40,6 @@ public long? MaximumSizeInBytes } } - /// - /// Creates a new copy of the current , cloning its attributes into a new instance. - /// - /// - /// A new copy of . - /// - internal CreateBatchOptions Clone() => - new CreateBatchOptions - { - PartitionId = PartitionId, - PartitionKey = PartitionKey, - _maximumSizeInBytes = MaximumSizeInBytes - }; - /// /// Determines whether the specified is equal to this instance. /// @@ -82,5 +68,19 @@ internal CreateBatchOptions Clone() => /// [EditorBrowsable(EditorBrowsableState.Never)] public override string ToString() => base.ToString(); + + /// + /// Creates a new copy of the current , cloning its attributes into a new instance. + /// + /// + /// A new copy of . + /// + internal new CreateBatchOptions Clone() => + new CreateBatchOptions + { + PartitionId = PartitionId, + PartitionKey = PartitionKey, + _maximumSizeInBytes = _maximumSizeInBytes + }; } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventDataBatch.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventDataBatch.cs index f1b8166706943..60bc8f766772b 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventDataBatch.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventDataBatch.cs @@ -58,11 +58,7 @@ public sealed class EventDataBatch : IDisposable /// of the producer are enabled. For example, it is used by idempotent publishing. /// /// - public int? StartingPublishedSequenceNumber - { - get => InnerBatch.StartingPublishedSequenceNumber; - internal set => InnerBatch.StartingPublishedSequenceNumber = value; - } + public int? StartingPublishedSequenceNumber { get; internal set; } /// /// The count of events contained in the batch. diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs index ce99b97f046ad..3ce07f263c725 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/EventHubProducerClient.cs @@ -46,6 +46,9 @@ public class EventHubProducerClient : IAsyncDisposable /// The set of default publishing options to use when no specific options are requested. private static readonly SendEventOptions DefaultSendOptions = new SendEventOptions(); + /// The set of default publishing options to use when no specific options are requested. + private static readonly CreateBatchOptions DefaultCreateBatchOptions = new CreateBatchOptions(); + /// Sets how long a dedicated would sit in memory before its would remove and close it. private static readonly TimeSpan PartitionProducerLifespan = TimeSpan.FromMinutes(5); @@ -391,6 +394,44 @@ public virtual async Task GetPartitionPropertiesAsync(strin return await Connection.GetPartitionPropertiesAsync(partitionId, RetryPolicy, cancellationToken).ConfigureAwait(false); } + /// + /// A set of information about the state of publishing for a partition, as observed by the . This + /// data can always be read, but will only be populated with information relevant to the features which are active for the producer client. + /// + /// + /// The unique identifier of a partition associated with the Event Hub. + /// An optional instance to signal the request to cancel the operation. + /// + /// The set of information about the publishing state of the requested partition, within the context of this producer. + /// + public virtual async Task ReadPartitionPublishingPropertiesAsync(string partitionId, + CancellationToken cancellationToken = default) + { + Argument.AssertNotClosed(IsClosed, nameof(EventHubProducerClient)); + Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId)); + + var partitionState = PartitionState.GetOrAdd(partitionId, new PartitionPublishingState(partitionId)); + cancellationToken.ThrowIfCancellationRequested(); + + try + { + await partitionState.PublishingGuard.WaitAsync(cancellationToken).ConfigureAwait(false); + + if (!partitionState.IsInitialized) + { + + cancellationToken.ThrowIfCancellationRequested(); + await InitializePartitionStateAsync(partitionState, cancellationToken).ConfigureAwait(false); + } + + return CreatePublishingPropertiesFromPartitionState(Options, partitionState); + } + finally + { + partitionState.PublishingGuard.Release(); + } + } + /// /// Sends an event to the associated Event Hub using a batched approach. If the size of the event exceeds the /// maximum size of a single batch, an exception will be triggered and the send will fail. @@ -442,7 +483,7 @@ internal virtual async Task SendAsync(EventData eventData, /// validated until this method is invoked. The call will fail if the size of the specified set of events exceeds the maximum allowable size of a single batch. /// /// - /// The set of event data to send. + /// The set of event data to send. /// An optional instance to signal the request to cancel the operation. /// /// A task to be resolved on when the operation has completed. @@ -456,8 +497,8 @@ internal virtual async Task SendAsync(EventData eventData, /// /// /// - public virtual async Task SendAsync(IEnumerable eventBatch, - CancellationToken cancellationToken = default) => await SendAsync(eventBatch, null, cancellationToken).ConfigureAwait(false); + public virtual async Task SendAsync(IEnumerable eventSet, + CancellationToken cancellationToken = default) => await SendAsync(eventSet, null, cancellationToken).ConfigureAwait(false); /// /// Sends a set of events to the associated Event Hub using a batched approach. Because the batch is implicitly created, the size of the event set is not @@ -483,7 +524,7 @@ public virtual async Task SendAsync(IEnumerable eventSet, SendEventOptions options, CancellationToken cancellationToken = default) { - options ??= DefaultSendOptions; + options = options?.Clone() ?? DefaultSendOptions; Argument.AssertNotNull(eventSet, nameof(eventSet)); AssertSinglePartitionReference(options.PartitionId, options.PartitionKey); @@ -573,7 +614,7 @@ public virtual async Task SendAsync(EventDataBatch eventBatch, public virtual async ValueTask CreateBatchAsync(CreateBatchOptions options, CancellationToken cancellationToken = default) { - options = options?.Clone() ?? new CreateBatchOptions(); + options = options?.Clone() ?? DefaultCreateBatchOptions; AssertSinglePartitionReference(options.PartitionId, options.PartitionKey); TransportEventBatch transportBatch = await PartitionProducerPool.EventHubProducer.CreateBatchAsync(options, cancellationToken).ConfigureAwait(false); @@ -862,7 +903,7 @@ private async Task SendIdempotentAsync(IReadOnlyList eventSet, foreach (var eventData in eventSet) { - eventData.CommitPublishingSequenceNumber(); + eventData.CommitPublishingState(); } } catch @@ -1010,27 +1051,50 @@ private async Task InitializePartitionStateAsync(PartitionPublishingState partit return; } - var producer = PartitionProducerPool.GetPooledProducer(partitionState.PartitionId, PartitionProducerLifespan); - var properties = await producer.TransportProducer.ReadInitializationPublishingPropertiesAsync(cancellationToken).ConfigureAwait(false); + var attempts = 0; + var pooledProducer = PartitionProducerPool.GetPooledProducer(partitionState.PartitionId, PartitionProducerLifespan); - partitionState.ProducerGroupId = properties.ProducerGroupId; - partitionState.OwnerLevel = properties.OwnerLevel; - partitionState.LastPublishedSequenceNumber = properties.LastPublishedSequenceNumber; + while (!cancellationToken.IsCancellationRequested) + { + try + { + await using var _ = pooledProducer.ConfigureAwait(false); + var properties = await pooledProducer.TransportProducer.ReadInitializationPublishingPropertiesAsync(cancellationToken).ConfigureAwait(false); - // If the state was not initialized and no exception has occurred, then the service is behaving - // unexpectedly and the client should be considered invalid. + partitionState.ProducerGroupId = properties.ProducerGroupId; + partitionState.OwnerLevel = properties.OwnerLevel; + partitionState.LastPublishedSequenceNumber = properties.LastPublishedSequenceNumber; - if (!partitionState.IsInitialized) - { - throw new EventHubsException(false, EventHubName, EventHubsException.FailureReason.InvalidClientState); + // If the state was not initialized and no exception has occurred, then the service is behaving + // unexpectedly and the client should be considered invalid. + + if (!partitionState.IsInitialized) + { + throw new EventHubsException(false, EventHubName, EventHubsException.FailureReason.InvalidClientState); + } + + EventHubsEventSource.Log.IdempotentPublishInitializeState( + EventHubName, + partitionState.PartitionId, + partitionState.ProducerGroupId.Value, + partitionState.OwnerLevel.Value, + partitionState.LastPublishedSequenceNumber.Value); + + return; + } + catch (EventHubsException eventHubException) + when (eventHubException.Reason == EventHubsException.FailureReason.ClientClosed && ShouldRecreateProducer(pooledProducer.TransportProducer, partitionState.PartitionId)) + { + if (++attempts >= MaximumCreateProducerAttempts) + { + throw; + } + + pooledProducer = PartitionProducerPool.GetPooledProducer(partitionState.PartitionId, PartitionProducerLifespan); + } } - EventHubsEventSource.Log.IdempotentPublishInitializeState( - EventHubName, - partitionState.PartitionId, - partitionState.ProducerGroupId.Value, - partitionState.OwnerLevel.Value, - partitionState.LastPublishedSequenceNumber.Value); + throw new TaskCanceledException(); } /// @@ -1186,5 +1250,23 @@ private static int NextSequence(int currentSequence) /// [MethodImpl(MethodImplOptions.AggressiveInlining)] private static bool RequiresStatefulPartitions(EventHubProducerClientOptions options) => options.EnableIdempotentPartitions; + + /// + /// Creates a set of publishing properties based on the configuration of a producer and the current + /// partition publishing state. + /// + /// + /// The options that describe the configuration of the producer. + /// The current state of publishing for the partition, as observed by the producer.. + /// + /// The set of properties that represents the current state. + /// + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static PartitionPublishingProperties CreatePublishingPropertiesFromPartitionState(EventHubProducerClientOptions options, + PartitionPublishingState state) => + new PartitionPublishingProperties(options.EnableIdempotentPartitions, + state.ProducerGroupId, + state.OwnerLevel, + state.LastPublishedSequenceNumber); } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs index 5b02ef5891356..88537813d0e0f 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingOptions.cs @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -using Azure.Messaging.EventHubs.Core; +using System.ComponentModel; namespace Azure.Messaging.EventHubs.Producer { @@ -79,6 +79,35 @@ public class PartitionPublishingOptions /// public int? StartingSequenceNumber { get; set; } + /// + /// Determines whether the specified is equal to this instance. + /// + /// + /// The to compare with this instance. + /// + /// true if the specified is equal to this instance; otherwise, false. + /// + [EditorBrowsable(EditorBrowsableState.Never)] + public override bool Equals(object obj) => base.Equals(obj); + + /// + /// Returns a hash code for this instance. + /// + /// + /// A hash code for this instance, suitable for use in hashing algorithms and data structures like a hash table. + /// + [EditorBrowsable(EditorBrowsableState.Never)] + public override int GetHashCode() => base.GetHashCode(); + + /// + /// Converts the instance to string representation. + /// + /// + /// A that represents this instance. + /// + [EditorBrowsable(EditorBrowsableState.Never)] + public override string ToString() => base.ToString(); + /// /// Creates a new copy of the current , cloning its attributes into a new instance. /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs index 31383b3ef499d..2f605aee18cd3 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/PartitionPublishingProperties.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. +using System.ComponentModel; + namespace Azure.Messaging.EventHubs.Producer { /// @@ -36,6 +38,35 @@ public class PartitionPublishingProperties /// public int? LastPublishedSequenceNumber { get; } + /// + /// Determines whether the specified is equal to this instance. + /// + /// + /// The to compare with this instance. + /// + /// true if the specified is equal to this instance; otherwise, false. + /// + [EditorBrowsable(EditorBrowsableState.Never)] + public override bool Equals(object obj) => base.Equals(obj); + + /// + /// Returns a hash code for this instance. + /// + /// + /// A hash code for this instance, suitable for use in hashing algorithms and data structures like a hash table. + /// + [EditorBrowsable(EditorBrowsableState.Never)] + public override int GetHashCode() => base.GetHashCode(); + + /// + /// Converts the instance to string representation. + /// + /// + /// A that represents this instance. + /// + [EditorBrowsable(EditorBrowsableState.Never)] + public override string ToString() => base.ToString(); + /// /// Initializes a new instance of the class. /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/SendEventOptions.cs b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/SendEventOptions.cs old mode 100755 new mode 100644 index f8adc582e2870..8d7fabefa12e0 --- a/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/SendEventOptions.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/src/Producer/SendEventOptions.cs @@ -114,5 +114,18 @@ internal SendEventOptions(string partitionId, /// [EditorBrowsable(EditorBrowsableState.Never)] public override string ToString() => base.ToString(); + + /// + /// Creates a new copy of the current , cloning its attributes into a new instance. + /// + /// + /// A new copy of . + /// + internal SendEventOptions Clone() => + new SendEventOptions + { + PartitionId = PartitionId, + PartitionKey = PartitionKey + }; } } diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpEventBatchTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpEventBatchTests.cs old mode 100755 new mode 100644 index 6e4d0fcd60c1d..e8a621c1081d0 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpEventBatchTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Amqp/AmqpEventBatchTests.cs @@ -18,6 +18,7 @@ namespace Azure.Messaging.EventHubs.Tests /// The suite of tests for the /// class. /// + /// [TestFixture] public class AmqpEventBatchTests { @@ -28,7 +29,7 @@ public class AmqpEventBatchTests [Test] public void ConstructorValidatesTheMessageConverter() { - Assert.That(() => new AmqpEventBatch(null, new CreateBatchOptions { MaximumSizeInBytes = 31 }), Throws.ArgumentNullException); + Assert.That(() => new AmqpEventBatch(null, new CreateBatchOptions { MaximumSizeInBytes = 31 }, default), Throws.ArgumentNullException); } /// @@ -43,7 +44,7 @@ public void ConstructorValidatesTheOptions() CreateBatchFromEventsHandler = (_e, _p) => Mock.Of() }; - Assert.That(() => new AmqpEventBatch(mockConverter, null), Throws.ArgumentNullException); + Assert.That(() => new AmqpEventBatch(mockConverter, null, default), Throws.ArgumentNullException); } /// @@ -58,7 +59,7 @@ public void ConstructorValidatesTheMaximumSize() CreateBatchFromEventsHandler = (_e, _p) => Mock.Of() }; - Assert.That(() => new AmqpEventBatch(mockConverter, new CreateBatchOptions { MaximumSizeInBytes = null }), Throws.ArgumentNullException); + Assert.That(() => new AmqpEventBatch(mockConverter, new CreateBatchOptions { MaximumSizeInBytes = null }, default), Throws.ArgumentNullException); } /// @@ -76,7 +77,7 @@ public void ConstructorSetsTheMaximumSize() CreateBatchFromEventsHandler = (_e, _p) => Mock.Of() }; - var batch = new AmqpEventBatch(mockConverter, options); + var batch = new AmqpEventBatch(mockConverter, options, default); Assert.That(batch.MaximumSizeInBytes, Is.EqualTo(maximumSize)); } @@ -98,7 +99,7 @@ public void ConstructorInitializesTheSizeToABatchEnvelope() .Setup(message => message.SerializedMessageSize) .Returns(batchEnvelopeSize); - var batch = new AmqpEventBatch(mockConverter, new CreateBatchOptions { MaximumSizeInBytes = 27 }); + var batch = new AmqpEventBatch(mockConverter, new CreateBatchOptions { MaximumSizeInBytes = 27 }, default); Assert.That(batch.SizeInBytes, Is.EqualTo(batchEnvelopeSize)); } @@ -115,7 +116,7 @@ public void TryAddValidatesTheEvent() CreateBatchFromEventsHandler = (_e, _p) => Mock.Of() }; - var batch = new AmqpEventBatch(mockConverter, new CreateBatchOptions { MaximumSizeInBytes = 25 }); + var batch = new AmqpEventBatch(mockConverter, new CreateBatchOptions { MaximumSizeInBytes = 25 }, default); Assert.That(() => batch.TryAdd(null), Throws.ArgumentNullException); } @@ -132,12 +133,98 @@ public void TryAddValidatesNotDisposed() CreateBatchFromEventsHandler = (_e, _p) => Mock.Of() }; - var batch = new AmqpEventBatch(mockConverter, new CreateBatchOptions { MaximumSizeInBytes = 25 }); + var batch = new AmqpEventBatch(mockConverter, new CreateBatchOptions { MaximumSizeInBytes = 25 }, default); batch.Dispose(); Assert.That(() => batch.TryAdd(new EventData(new byte[0])), Throws.InstanceOf()); } + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + [TestCase(true)] + [TestCase(false)] + public void TryAddHonorsTheMeasureSequenceNumber(bool measureSequenceNumber) + { + var maximumSize = 50; + var batchEnvelopeSize = 0; + var capturedSequence = default(int?); + var options = new CreateBatchOptions { MaximumSizeInBytes = maximumSize }; + var mockEnvelope = new Mock(); + var mockEvent = new Mock(); + + var mockConverter = new InjectableMockConverter + { + CreateBatchFromEventsHandler = (_e, _p) => mockEnvelope.Object, + + CreateMessageFromEventHandler = (_e, _p) => + { + capturedSequence = _e.PendingPublishSequenceNumber; + return mockEvent.Object; + } + }; + + mockEnvelope + .Setup(message => message.SerializedMessageSize) + .Returns(batchEnvelopeSize); + + mockEvent + .Setup(message => message.SerializedMessageSize) + .Returns(maximumSize); + + var batch = new AmqpEventBatch(mockConverter, options, measureSequenceNumber); + batch.TryAdd(EventGenerator.CreateEvents(1).Single()); + + var expectationConstraint = (measureSequenceNumber) + ? Is.Not.Null + : Is.Null; + + Assert.That(capturedSequence, expectationConstraint); + } + + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public void TryAddRemovesTheMeasureSequenceNumber() + { + var maximumSize = 50; + var batchEnvelopeSize = 0; + var capturedEvent = default(EventData); + var options = new CreateBatchOptions { MaximumSizeInBytes = maximumSize }; + var mockEnvelope = new Mock(); + var mockEvent = new Mock(); + + var mockConverter = new InjectableMockConverter + { + CreateBatchFromEventsHandler = (_e, _p) => mockEnvelope.Object, + + CreateMessageFromEventHandler = (_e, _p) => + { + capturedEvent = _e; + return mockEvent.Object; + } + }; + + mockEnvelope + .Setup(message => message.SerializedMessageSize) + .Returns(batchEnvelopeSize); + + mockEvent + .Setup(message => message.SerializedMessageSize) + .Returns(maximumSize); + + var batch = new AmqpEventBatch(mockConverter, options, true); + batch.TryAdd(EventGenerator.CreateEvents(1).Single()); + + Assert.That(capturedEvent.PublishedSequenceNumber, Is.Null); + } + /// /// Verifies functionality of the /// method. @@ -165,7 +252,7 @@ public void TryAddDoesNotAcceptAnEventBiggerThanTheMaximumSize() .Setup(message => message.SerializedMessageSize) .Returns(maximumSize); - var batch = new AmqpEventBatch(mockConverter, options); + var batch = new AmqpEventBatch(mockConverter, options, default); Assert.That(batch.TryAdd(new EventData(new byte[0])), Is.False, "An event of the maximum size is too large due to the reserved overhead."); } @@ -197,7 +284,7 @@ public void TryAddAcceptsAnEventSmallerThanTheMaximumSize() .Setup(message => message.SerializedMessageSize) .Returns(eventMessageSize); - var batch = new AmqpEventBatch(mockConverter, options); + var batch = new AmqpEventBatch(mockConverter, options, default); Assert.That(batch.TryAdd(new EventData(new byte[0])), Is.True); } @@ -239,7 +326,7 @@ public void TryAddAcceptEventsUntilTheMaximumSizeIsReached() eventMessages[index] = message.Object; } - var batch = new AmqpEventBatch(mockConverter, options); + var batch = new AmqpEventBatch(mockConverter, options, default); for (var index = 0; index < eventMessages.Length; ++index) { @@ -283,7 +370,7 @@ public void TryAddSetsTheCount() // Add the messages to the batch; all should be accepted. - var batch = new AmqpEventBatch(mockConverter, options); + var batch = new AmqpEventBatch(mockConverter, options, default); for (var index = 0; index < eventMessages.Length; ++index) { @@ -312,7 +399,7 @@ public void AsEnumerableValidatesTheTypeParameter() .Setup(message => message.SerializedMessageSize) .Returns(0); - var batch = new AmqpEventBatch(mockConverter, options); + var batch = new AmqpEventBatch(mockConverter, options, default); Assert.That(() => batch.AsEnumerable(), Throws.InstanceOf()); } @@ -347,7 +434,7 @@ public void AsEnumerableReturnsTheEvents() eventMessages[index] = message.Object; } - var batch = new AmqpEventBatch(mockConverter, options); + var batch = new AmqpEventBatch(mockConverter, options, default); for (var index = 0; index < eventMessages.Length; ++index) { @@ -396,7 +483,7 @@ public void DisposeCleansUpBatchMessages() // Add the messages to the batch; all should be accepted. - var batch = new AmqpEventBatch(mockConverter, options); + var batch = new AmqpEventBatch(mockConverter, options, default); for (var index = 0; index < eventMessages.Length; ++index) { @@ -442,7 +529,7 @@ public void ClearClearsTheCount() // Add the messages to the batch; all should be accepted. - var batch = new AmqpEventBatch(mockConverter, options); + var batch = new AmqpEventBatch(mockConverter, options, default); for (var index = 0; index < eventMessages.Length; ++index) { @@ -473,7 +560,7 @@ public void ClearClearsTheSize() .Setup(message => message.SerializedMessageSize) .Returns(9959); - var batch = new AmqpEventBatch(mockConverter, new CreateBatchOptions { MaximumSizeInBytes = 99 }); + var batch = new AmqpEventBatch(mockConverter, new CreateBatchOptions { MaximumSizeInBytes = 99 }, default); batch.Clear(); Assert.That(batch.SizeInBytes, Is.EqualTo(GetReservedSize(batch))); @@ -508,7 +595,7 @@ public void DisposeClearsTheCount() // Add the messages to the batch; all should be accepted. - var batch = new AmqpEventBatch(mockConverter, options); + var batch = new AmqpEventBatch(mockConverter, options, default); for (var index = 0; index < eventMessages.Length; ++index) { @@ -539,7 +626,7 @@ public void DisposeClearsTheSize() .Setup(message => message.SerializedMessageSize) .Returns(9959); - var batch = new AmqpEventBatch(mockConverter, new CreateBatchOptions { MaximumSizeInBytes = 99 }); + var batch = new AmqpEventBatch(mockConverter, new CreateBatchOptions { MaximumSizeInBytes = 99 }, default); batch.Dispose(); Assert.That(batch.SizeInBytes, Is.EqualTo(GetReservedSize(batch))); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventDataTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventDataTests.cs index 5bd82fe1e2023..5aa9065d480bc 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventDataTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Core/EventDataTests.cs @@ -55,7 +55,7 @@ public void BodyAsStreamAllowsAnEmptyBody() } /// - /// Verifies functionality of the + /// Verifies functionality of the /// property. /// /// @@ -71,7 +71,7 @@ public void CommitPublishingSequenceNumberTransitionsState() Assert.That(eventData.PendingPublishSequenceNumber, Is.EqualTo(expectedSequence), "The pending sequence number should have been set."); - eventData.CommitPublishingSequenceNumber(); + eventData.CommitPublishingState(); Assert.That(eventData.PublishedSequenceNumber, Is.EqualTo(expectedSequence), "The published sequence number should have been set."); Assert.That(eventData.PendingPublishSequenceNumber, Is.EqualTo(default(int?)), "The pending sequence number should have been cleared."); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventDataBatchTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventDataBatchTests.cs index a52516cf577dd..fc900112c0ca4 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventDataBatchTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventDataBatchTests.cs @@ -96,7 +96,6 @@ public void PropertyAccessIsDelegatedToTheTransportClient() Assert.That(batch.MaximumSizeInBytes, Is.EqualTo(mockBatch.MaximumSizeInBytes), "The maximum size should have been delegated."); Assert.That(batch.SizeInBytes, Is.EqualTo(mockBatch.SizeInBytes), "The size should have been delegated."); Assert.That(batch.Count, Is.EqualTo(mockBatch.Count), "The count should have been delegated."); - Assert.That(batch.StartingPublishedSequenceNumber, Is.EqualTo(mockBatch.StartingPublishedSequenceNumber), "The starting published sequence number should have been delegated."); } /// @@ -276,7 +275,7 @@ private class MockTransportBatch : TransportEventBatch public override long SizeInBytes { get; } = 100; - public override int? StartingPublishedSequenceNumber { get; set; } = 300; + public override bool ReserveSpaceForSequenceNumber { get; } = true; public override int Count { get; } = 400; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs index 101335151e29a..db3240f12ba24 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/EventHubProducerClientTests.cs @@ -313,6 +313,138 @@ public async Task GetPartitionPropertiesUsesTheRetryPolicy() Assert.That(mockConnection.GetPartitionPropertiesInvokedWith, Is.SameAs(retryPolicy), "Either the call was not delegated or the retry policy was not passed."); } + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public async Task ReadPartitionPublishingPropertiesAsyncValidatesNotClosed() + { + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + var connectionString = "Endpoint=sb://somehost.com;SharedAccessKeyName=ABC;SharedAccessKey=123;EntityPath=somehub"; + var producer = new EventHubProducerClient(connectionString); + await producer.CloseAsync(cancellationSource.Token); + + Assert.That(async () => await producer.ReadPartitionPublishingPropertiesAsync("0", cancellationSource.Token), Throws.InstanceOf()); + } + + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + [TestCase(null)] + [TestCase("")] + public void ReadPartitionPublishingPropertiesAsyncValidatesThePartition(string partitionId) + { + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + var producer = new EventHubProducerClient(new MockConnection()); + Assert.That(async () => await producer.ReadPartitionPublishingPropertiesAsync(partitionId, cancellationSource.Token), Throws.InstanceOf()); + } + + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public async Task ReadPartitionPublishingPropertiesAsyncInitializesPartitionState() + { + var expectedPartition = "5"; + var expectedProperties = new PartitionPublishingProperties(true, 123, 456, 798); + var mockTransport = new Mock(); + var connection = new MockConnection(() => mockTransport.Object); + + var clientOptions = new EventHubProducerClientOptions + { + EnableIdempotentPartitions = true + }; + + clientOptions.PartitionOptions.Add("0", new PartitionPublishingOptions()); + clientOptions.PartitionOptions.Add("1", new PartitionPublishingOptions()); + + clientOptions.PartitionOptions.Add(expectedPartition, new PartitionPublishingOptions + { + ProducerGroupId = 999, + OwnerLevel = 999, + StartingSequenceNumber = 999 + }); + + var producer = new EventHubProducerClient(connection, clientOptions); + + mockTransport + .Setup(transportProducer => transportProducer.ReadInitializationPublishingPropertiesAsync(It.IsAny())) + .ReturnsAsync(expectedProperties) + .Verifiable(); + + var partitionStateCollection = GetPartitionState(producer); + Assert.That(partitionStateCollection, Is.Not.Null, "The collection for partition state should have been initialized with the client."); + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + await producer.ReadPartitionPublishingPropertiesAsync(expectedPartition, cancellationSource.Token); + + Assert.That(partitionStateCollection.TryGetValue(expectedPartition, out var partitionState), Is.True, "The state collection should have an entry for the partition."); + Assert.That(partitionState.ProducerGroupId, Is.EqualTo(expectedProperties.ProducerGroupId), "The producer group should match."); + Assert.That(partitionState.OwnerLevel, Is.EqualTo(expectedProperties.OwnerLevel), "The owner level should match."); + Assert.That(partitionState.LastPublishedSequenceNumber, Is.EqualTo(expectedProperties.LastPublishedSequenceNumber), "The sequence number should match."); + + mockTransport.VerifyAll(); + } + + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public async Task ReadPartitionPublishingPropertiesAsyncReturnsPartitionState() + { + var expectedPartition = "5"; + var expectedProperties = new PartitionPublishingProperties(true, 123, 456, 798); + var mockTransport = new Mock(); + var connection = new MockConnection(() => mockTransport.Object); + + var producer = new EventHubProducerClient(connection, new EventHubProducerClientOptions + { + EnableIdempotentPartitions = true + }); + + var partitionStateCollection = GetPartitionState(producer); + Assert.That(partitionStateCollection, Is.Not.Null, "The collection for partition state should have been initialized with the client."); + + partitionStateCollection[expectedPartition] = new PartitionPublishingState(expectedPartition) + { + ProducerGroupId = expectedProperties.ProducerGroupId, + OwnerLevel = expectedProperties.OwnerLevel, + LastPublishedSequenceNumber = expectedProperties.LastPublishedSequenceNumber + }; + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(EventHubsTestEnvironment.Instance.TestExecutionTimeLimit); + + var readProperties = await producer.ReadPartitionPublishingPropertiesAsync(expectedPartition, cancellationSource.Token); + + Assert.That(readProperties, Is.Not.Null, "The read properties should have been created."); + Assert.That(readProperties.ProducerGroupId, Is.EqualTo(expectedProperties.ProducerGroupId), "The producer group should match."); + Assert.That(readProperties.OwnerLevel, Is.EqualTo(expectedProperties.OwnerLevel), "The owner level should match."); + Assert.That(readProperties.LastPublishedSequenceNumber, Is.EqualTo(expectedProperties.LastPublishedSequenceNumber), "The sequence number should match."); + + mockTransport + .Verify(transportProducer => transportProducer.ReadInitializationPublishingPropertiesAsync( + It.IsAny()), + Times.Never, + "Partition state should not have been initialized twice."); + } + + /// /// Verifies functionality of the /// method. @@ -519,7 +651,9 @@ public async Task SendInvokesTheTransportProducer() (IEnumerable calledWithEvents, SendEventOptions calledWithOptions) = transportProducer.SendCalledWith; Assert.That(calledWithEvents, Is.EquivalentTo(events), "The events should contain same elements."); - Assert.That(calledWithOptions, Is.SameAs(options), "The options should be the same instance"); + Assert.That(calledWithOptions, Is.Not.SameAs(options), "The options should not be the same instance."); + Assert.That(calledWithOptions.PartitionId, Is.EqualTo(options.PartitionId), "The partition id of the options should match."); + Assert.That(calledWithOptions.PartitionKey, Is.SameAs(options.PartitionKey), "The partition key of the options should match."); } /// @@ -583,7 +717,7 @@ public void SendIdempotentDoesNotAllowResending() var events = EventGenerator.CreateEvents(5).Select(item => { item.PendingPublishSequenceNumber = 5; - item.CommitPublishingSequenceNumber(); + item.CommitPublishingState(); return item; }); @@ -1137,7 +1271,7 @@ public void SendIdempotentDoesNotAllowResendingWithABatchContainingPublishedEven var events = EventGenerator.CreateEvents(5).Skip(4).Select(item => { item.PendingPublishSequenceNumber = 5; - item.CommitPublishingSequenceNumber(); + item.CommitPublishingState(); return item; }); @@ -2292,7 +2426,7 @@ public void RetryLogicDetectsAnEmbeddedAmqpErrorForOperationCanceled() var cancellationTokenSource = new CancellationTokenSource(); transportProducer - .Setup(transportProducer => transportProducer.SendAsync(events, options, cancellationTokenSource.Token)) + .Setup(transportProducer => transportProducer.SendAsync(events, It.Is(paramOptions => paramOptions.PartitionId == options.PartitionId), cancellationTokenSource.Token)) .Throws(embeddedException); Assert.That(async () => await producerClient.SendAsync(events, options, cancellationTokenSource.Token), Throws.InstanceOf()); @@ -2517,7 +2651,7 @@ private class MockTransportBatch : TransportEventBatch public override long MaximumSizeInBytes { get; } public override long SizeInBytes { get; } - public override int? StartingPublishedSequenceNumber { get; set; } + public override bool ReserveSpaceForSequenceNumber { get; } public override int Count => Events.Count; public override IEnumerable AsEnumerable() => (IEnumerable)Events; public override void Clear() => Events.Clear(); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/SendEventOptionsTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/SendEventOptionsTests.cs new file mode 100644 index 0000000000000..73aa6d2584fa0 --- /dev/null +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/SendEventOptionsTests.cs @@ -0,0 +1,39 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using Azure.Messaging.EventHubs.Producer; +using NUnit.Framework; + +namespace Azure.Messaging.EventHubs.Tests +{ + /// + /// The suite of tests for the + /// class. + /// + /// + [TestFixture] + public class SendEventOptionsTests + { + /// + /// Verifies functionality of the + /// method. + /// + /// + [Test] + public void CloneProducesACopy() + { + var options = new SendEventOptions + { + PartitionId = "0", + PartitionKey = "some_partition_123" + }; + + var clone = options.Clone(); + Assert.That(clone, Is.Not.Null, "The clone should not be null."); + Assert.That(clone, Is.TypeOf(), "The clone should be a SendEventOptions instance."); + Assert.That(clone, Is.Not.SameAs(options), "The clone should not the same reference as the options."); + Assert.That(clone.PartitionId, Is.EqualTo(options.PartitionId), "The partition identifier of the clone should match."); + Assert.That(clone.PartitionKey, Is.EqualTo(options.PartitionKey), "The partition key of the clone should match."); + } + } +} diff --git a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs index 78c05281b783f..0aa222530a529 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs/tests/Producer/TransportProducerPoolTests.cs @@ -382,7 +382,7 @@ private class MockTransportBatch : TransportEventBatch public override long MaximumSizeInBytes { get; } public override long SizeInBytes { get; } public override int Count { get; } - public override int? StartingPublishedSequenceNumber { get; set; } + public override bool ReserveSpaceForSequenceNumber { get; } public override bool TryAdd(EventData eventData) => throw new NotImplementedException(); public override IEnumerable AsEnumerable() => throw new NotImplementedException(); public override void Dispose() => throw new NotImplementedException();