Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs Client] Idempotent Producer Client #15034

Merged
merged 1 commit into from
Sep 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.Designer.cs
100755 → 100644

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Resources.resx
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@
<value>The requested transport type, '{0}' is not supported.</value>
</data>
<data name="CannotSendWithPartitionIdAndPartitionKey" xml:space="preserve">
<value>A producer created for a specific partition cannot send events using a partition key. This producer is associated with partition '{0}'.</value>
<value>An event cannot be published using both a partition key and a partition identifier. This operation specified partition key `{0}` and partition id `{1}`.</value>
</data>
<data name="UnsupportedCredential" xml:space="preserve">
<value>The credential is not a known and supported credential type. Please use a JWT credential or shared key credential.</value>
Expand Down Expand Up @@ -289,6 +289,12 @@
<value>One or more exceptions occured during event processing. Please see the inner exceptions for more detail.</value>
</data>
<data name="OnlyOneSharedAccessAuthorizationMayBeSpecified" xml:space="preserve">
<value>The authorization for a connection string may specifiy a shared key or precomputed shared access signature, but not both. Please verify that your connection string does not have the `SharedAccessSignature` token if you are passing the `SharedKeyName` and `SharedKey`.</value>
<value>The authorization for a connection string may specify a shared key or pre-computed shared access signature, but not both. Please verify that your connection string does not have the `SharedAccessSignature` token if you are passing the `SharedKeyName` and `SharedKey`.</value>
</data>
<data name="CannotPublishToGateway" xml:space="preserve">
<value>The producer was configured to use features that require publishing to a specific partition. Publishing with automatic routing or using a partition key is not supported by this producer.</value>
</data>
<data name="IdempotentAlreadyPublished" xml:space="preserve">
<value>These events have already been successfully published. When idempotent publishing is enabled, events that were acknowledged by the Event Hubs service may not be published again.</value>
</data>
</root>
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ public EventHubProducerClient(string connectionString, string eventHubName, Azur
public virtual System.Threading.Tasks.Task<string[]> GetPartitionIdsAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Messaging.EventHubs.PartitionProperties> GetPartitionPropertiesAsync(string partitionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(Azure.Messaging.EventHubs.Producer.EventDataBatch eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventBatch, Azure.Messaging.EventHubs.Producer.SendEventOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventSet, Azure.Messaging.EventHubs.Producer.SendEventOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SendAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventHubs.EventData> eventBatch, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
[System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)]
public override string ToString() { throw null; }
Expand Down Expand Up @@ -495,9 +495,9 @@ public partial class PartitionPublishingProperties
{
protected internal PartitionPublishingProperties(bool isIdempotentPublishingEnabled, long? producerGroupId, short? ownerLevel, int? lastPublishedSequenceNumber) { }
public bool IsIdempotentPublishingEnabled { get { throw null; } }
public int? LastPublishedSequenceNumber { get { throw null; } set { } }
public short? OwnerLevel { get { throw null; } set { } }
public long? ProducerGroupId { get { throw null; } set { } }
public int? LastPublishedSequenceNumber { get { throw null; } }
public short? OwnerLevel { get { throw null; } }
public long? ProducerGroupId { get { throw null; } }
}
public partial class SendEventOptions
{
Expand Down
9 changes: 8 additions & 1 deletion sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Core;
using Azure.Messaging.EventHubs.Diagnostics;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.Amqp;

namespace Azure.Messaging.EventHubs.Amqp
Expand Down Expand Up @@ -374,11 +375,15 @@ public override async Task<PartitionProperties> GetPartitionPropertiesAsync(stri
/// </summary>
///
/// <param name="partitionId">The identifier of the partition to which the transport producer should be bound; if <c>null</c>, the producer is unbound.</param>
/// <param name="requestedFeatures">The flags specifying the set of special transport features that should be opted-into.</param>
/// <param name="partitionOptions">The set of options, if any, that should be considered when initializing the producer.</param>
/// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
///
/// <returns>A <see cref="TransportProducer"/> configured in the requested manner.</returns>
///
public override TransportProducer CreateProducer(string partitionId,
TransportProducerFeatures requestedFeatures,
PartitionPublishingOptions partitionOptions,
EventHubsRetryPolicy retryPolicy)
{
Argument.AssertNotClosed(_closed, nameof(AmqpClient));
Expand All @@ -389,7 +394,9 @@ public override TransportProducer CreateProducer(string partitionId,
partitionId,
ConnectionScope,
MessageConverter,
retryPolicy
retryPolicy,
requestedFeatures,
partitionOptions
);
}

Expand Down
126 changes: 113 additions & 13 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ internal class AmqpProducer : TransportProducer
///
private string PartitionId { get; }

/// <summary>
/// The flags specifying the set of special transport features that this producer has opted-into.
/// </summary>
///
private TransportProducerFeatures ActiveFeatures { get; }

/// <summary>
/// The policy to use for determining retry behavior for when an operation fails.
/// </summary>
Expand Down Expand Up @@ -92,6 +98,17 @@ internal class AmqpProducer : TransportProducer
///
private long? MaximumMessageSize { get; set; }

/// <summary>
/// The set of partition publishing properties active for this producer at the time it was initialized.
/// </summary>
///
/// <remarks>
/// It is important to note that these properties are a snapshot of the service state at the time when the
/// producer was initialized; they do not necessarily represent the current state of the service.
/// </remarks>
///
private PartitionPublishingProperties InitializedPartitionProperties { get; set; }

/// <summary>
/// Initializes a new instance of the <see cref="AmqpProducer"/> class.
/// </summary>
Expand All @@ -101,6 +118,8 @@ internal class AmqpProducer : TransportProducer
/// <param name="connectionScope">The AMQP connection context for operations.</param>
/// <param name="messageConverter">The converter to use for translating between AMQP messages and client types.</param>
/// <param name="retryPolicy">The retry policy to consider when an operation fails.</param>
/// <param name="requestedFeatures">The flags specifying the set of special transport features that should be opted-into.</param>
/// <param name="partitionOptions">The set of options, if any, that should be considered when initializing the producer.</param>
///
/// <remarks>
/// As an internal type, this class performs only basic sanity checks against its arguments. It
Expand All @@ -115,8 +134,12 @@ public AmqpProducer(string eventHubName,
string partitionId,
AmqpConnectionScope connectionScope,
AmqpMessageConverter messageConverter,
EventHubsRetryPolicy retryPolicy)
EventHubsRetryPolicy retryPolicy,
TransportProducerFeatures requestedFeatures = TransportProducerFeatures.None,
PartitionPublishingOptions partitionOptions = null)
{
partitionOptions ??= new PartitionPublishingOptions();

Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
Argument.AssertNotNull(connectionScope, nameof(connectionScope));
Argument.AssertNotNull(messageConverter, nameof(messageConverter));
Expand All @@ -127,9 +150,10 @@ public AmqpProducer(string eventHubName,
RetryPolicy = retryPolicy;
ConnectionScope = connectionScope;
MessageConverter = messageConverter;
ActiveFeatures = requestedFeatures;

SendLink = new FaultTolerantAmqpObject<SendingAmqpLink>(
timeout => CreateLinkAndEnsureProducerStateAsync(partitionId, timeout, CancellationToken.None),
timeout => CreateLinkAndEnsureProducerStateAsync(partitionId, partitionOptions, timeout, CancellationToken.None),
link =>
{
link.Session?.SafeClose();
Expand Down Expand Up @@ -211,7 +235,6 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
if (!MaximumMessageSize.HasValue)
{
var failedAttemptCount = 0;
var retryDelay = default(TimeSpan?);
var tryTimeout = RetryPolicy.CalculateTryTimeout(0);

while ((!cancellationToken.IsCancellationRequested) && (!_closed))
Expand All @@ -223,13 +246,13 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
}
catch (Exception ex)
{
Exception activeEx = ex.TranslateServiceException(EventHubName);
++failedAttemptCount;

// Determine if there should be a retry for the next attempt; if so enforce the delay but do not quit the loop.
// Otherwise, bubble the exception.

++failedAttemptCount;
retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);
var activeEx = ex.TranslateServiceException(EventHubName);
var retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);

if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellationRequested))
{
Expand All @@ -247,13 +270,7 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
}
}

// If MaximumMessageSize has not been populated nor exception thrown
// by this point, then cancellation has been requested.

if (!MaximumMessageSize.HasValue)
{
throw new TaskCanceledException();
}
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
}

// Ensure that there was a maximum size populated; if none was provided,
Expand All @@ -265,6 +282,74 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
return new AmqpEventBatch(MessageConverter, options);
}

/// <summary>
/// Reads the set of partition publishing properties active for this producer at the time it was initialized.
/// </summary>
///
/// <param name="cancellationToken">The cancellation token to consider when creating the link.</param>
///
/// <returns>The set of <see cref="PartitionPublishingProperties" /> observed when the producer was initialized.</returns>
///
/// <remarks>
/// It is important to note that these properties are a snapshot of the service state at the time when the
/// producer was initialized; they do not necessarily represent the current state of the service.
/// </remarks>
///
public override async ValueTask<PartitionPublishingProperties> ReadInitializationPublishingPropertiesAsync(CancellationToken cancellationToken)
{
Argument.AssertNotClosed(_closed, nameof(AmqpProducer));

// If the properties were already initialized, use them.

if (InitializedPartitionProperties != null)
{
return InitializedPartitionProperties;
}

// Initialize the properties by forcing the link to be opened.

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var failedAttemptCount = 0;
var tryTimeout = RetryPolicy.CalculateTryTimeout(0);

while ((!cancellationToken.IsCancellationRequested) && (!_closed))
{
try
{
await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout)).ConfigureAwait(false);
break;
}
catch (Exception ex)
{
++failedAttemptCount;

// Determine if there should be a retry for the next attempt; if so enforce the delay but do not quit the loop.
// Otherwise, bubble the exception.

var activeEx = ex.TranslateServiceException(EventHubName);
var retryDelay = RetryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);

if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!cancellationToken.IsCancellationRequested))
{
await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
tryTimeout = RetryPolicy.CalculateTryTimeout(failedAttemptCount);
}
else if (ex is AmqpException)
{
ExceptionDispatchInfo.Capture(activeEx).Throw();
}
else
{
throw;
}
}
}

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
return InitializedPartitionProperties;
}

/// <summary>
/// Closes the connection to the transport producer instance.
/// </summary>
Expand Down Expand Up @@ -430,6 +515,7 @@ protected virtual async Task SendAsync(Func<AmqpMessage> messageFactory,
/// </summary>
///
/// <param name="partitionId">The identifier of the Event Hub partition to which it is bound; if unbound, <c>null</c>.</param>
/// <param name="partitionOptions">The set of options, if any, that should be considered when initializing the producer.</param>
/// <param name="timeout">The timeout to apply when creating the link.</param>
/// <param name="cancellationToken">The cancellation token to consider when creating the link.</param>
///
Expand All @@ -443,6 +529,7 @@ protected virtual async Task SendAsync(Func<AmqpMessage> messageFactory,
/// </remarks>
jsquire marked this conversation as resolved.
Show resolved Hide resolved
///
protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureProducerStateAsync(string partitionId,
PartitionPublishingOptions partitionOptions,
TimeSpan timeout,
CancellationToken cancellationToken)
{
Expand All @@ -465,6 +552,19 @@ protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureProducerStateAs
await Task.Delay(15, cancellationToken).ConfigureAwait(false);
MaximumMessageSize = (long)link.Settings.MaxMessageSize;
}

if (InitializedPartitionProperties == null)
{
if ((ActiveFeatures & TransportProducerFeatures.IdempotentPublishing) != 0)
{
throw new NotImplementedException(nameof(CreateLinkAndEnsureProducerStateAsync));
jsquire marked this conversation as resolved.
Show resolved Hide resolved
}
else
{
InitializedPartitionProperties = new PartitionPublishingProperties(false, null, null, null);
}
}

}
catch (Exception ex)
{
Expand Down
Loading