Skip to content

Commit

Permalink
[Event Hubs Client] Processor Options Fix (Azure#15725)
Browse files Browse the repository at this point in the history
The focus of these changes is to correct an issue where the load balancing
interval specified in the processing options was not respected and to add
the intended timing options to the `EventProcessorClientOptions`, which,
despite being part of the approved design, were overlooked and not implemented
as part of the load balancing strategy changes.
  • Loading branch information
jsquire authored and suhas92 committed Oct 12, 2020
1 parent 11a01ff commit 182f699
Show file tree
Hide file tree
Showing 12 changed files with 173 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public EventProcessorClientOptions() { }
public Azure.Messaging.EventHubs.EventHubConnectionOptions ConnectionOptions { get { throw null; } set { } }
public string Identifier { get { throw null; } set { } }
public Azure.Messaging.EventHubs.Processor.LoadBalancingStrategy LoadBalancingStrategy { get { throw null; } set { } }
public System.TimeSpan LoadBalancingUpdateInterval { get { throw null; } set { } }
public System.TimeSpan? MaximumWaitTime { get { throw null; } set { } }
public System.TimeSpan PartitionOwnershipExpirationInterval { get { throw null; } set { } }
public int PrefetchCount { get { throw null; } set { } }
public long? PrefetchSizeInBytes { get { throw null; } set { } }
public Azure.Messaging.EventHubs.EventHubsRetryOptions RetryOptions { get { throw null; } set { } }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,9 @@ private static EventProcessorOptions CreateOptions(EventProcessorClientOptions c
TrackLastEnqueuedEventProperties = clientOptions.TrackLastEnqueuedEventProperties,
LoadBalancingStrategy = clientOptions.LoadBalancingStrategy,
PrefetchCount = clientOptions.PrefetchCount,
PrefetchSizeInBytes = clientOptions.PrefetchSizeInBytes
PrefetchSizeInBytes = clientOptions.PrefetchSizeInBytes,
LoadBalancingUpdateInterval = clientOptions.LoadBalancingUpdateInterval,
PartitionOwnershipExpirationInterval = clientOptions.PartitionOwnershipExpirationInterval
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ public class EventProcessorClientOptions
/// <summary>The prefetch size limit to use for the partition receiver.</summary>
private long? _prefetchSizeInBytes = default;

/// <summary>The desired amount of time to allow between load balancing verification attempts.</summary>
private TimeSpan _loadBalancingUpdateInterval = TimeSpan.FromSeconds(10);

/// <summary>The desired amount of time to consider a partition owned by a specific event processor.</summary>
private TimeSpan _partitionOwnershipExpirationInterval = TimeSpan.FromSeconds(30);

/// <summary>The set of options to use for configuring the connection to the Event Hubs service.</summary>
private EventHubConnectionOptions _connectionOptions = new EventHubConnectionOptions();

Expand Down Expand Up @@ -207,6 +213,48 @@ public long? PrefetchSizeInBytes
}
}

/// <summary>
/// The desired amount of time to allow between load balancing verification attempts.
/// </summary>
///
/// <value>If not specified, a load balancing interval of 10 seconds will be assumed.</value>
///
/// <remarks>
/// Because load balancing holds less priority than processing events, this interval
/// should be considered the minimum time that will elapse between verification attempts; operations
/// with higher priority may cause a minor delay longer than this interval for load balancing.
/// </remarks>
///
public TimeSpan LoadBalancingUpdateInterval
{
get => _loadBalancingUpdateInterval;

set
{
Argument.AssertNotNegative(value, nameof(LoadBalancingUpdateInterval));
_loadBalancingUpdateInterval = value;
}
}

/// <summary>
/// The desired amount of time to consider a partition owned by a specific event processor
/// instance before the ownership is considered stale and the partition becomes eligible to be
/// requested by another event processor that wishes to assume responsibility for processing it.
/// </summary>
///
/// <value>If not specified, an ownership interval of 30 seconds will be assumed.</value>
///
public TimeSpan PartitionOwnershipExpirationInterval
{
get => _partitionOwnershipExpirationInterval;

set
{
Argument.AssertNotNegative(value, nameof(PartitionOwnershipExpirationInterval));
_partitionOwnershipExpirationInterval = value;
}
}

/// <summary>
/// Gets or sets the options used for configuring the connection to the Event Hubs service.
/// </summary>
Expand Down Expand Up @@ -281,7 +329,9 @@ internal EventProcessorClientOptions Clone() =>
_maximumWaitTime = _maximumWaitTime,
_cacheEventCount = _cacheEventCount,
_prefetchCount = _prefetchCount,
_prefetchSizeInBytes = PrefetchSizeInBytes,
_prefetchSizeInBytes = _prefetchSizeInBytes,
_loadBalancingUpdateInterval = _loadBalancingUpdateInterval,
_partitionOwnershipExpirationInterval = _partitionOwnershipExpirationInterval,
_connectionOptions = ConnectionOptions.Clone(),
_retryOptions = RetryOptions.Clone()
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public void CloneProducesACopy()
CacheEventCount = 1,
PrefetchCount = 0,
PrefetchSizeInBytes = 200,
LoadBalancingUpdateInterval = TimeSpan.FromDays(99),
PartitionOwnershipExpirationInterval = TimeSpan.FromMilliseconds(5),
RetryOptions = new EventHubsRetryOptions { TryTimeout = TimeSpan.FromMinutes(1), Delay = TimeSpan.FromMinutes(4) },
ConnectionOptions = new EventHubConnectionOptions { TransportType = EventHubsTransportType.AmqpWebSockets }
};
Expand All @@ -48,6 +50,8 @@ public void CloneProducesACopy()
Assert.That(clone.CacheEventCount, Is.EqualTo(options.CacheEventCount), "The event cache size of the clone should match.");
Assert.That(clone.PrefetchCount, Is.EqualTo(options.PrefetchCount), "The prefetch count of the clone should match.");
Assert.That(clone.PrefetchSizeInBytes, Is.EqualTo(options.PrefetchSizeInBytes), "The prefetch byte size of the clone should match.");
Assert.That(clone.LoadBalancingUpdateInterval, Is.EqualTo(options.LoadBalancingUpdateInterval), "The load balancing interval of the clone should match.");
Assert.That(clone.PartitionOwnershipExpirationInterval, Is.EqualTo(options.PartitionOwnershipExpirationInterval), "The ownership interval of the clone should match.");
Assert.That(clone.ConnectionOptions.TransportType, Is.EqualTo(options.ConnectionOptions.TransportType), "The connection options of the clone should copy properties.");
Assert.That(clone.ConnectionOptions, Is.Not.SameAs(options.ConnectionOptions), "The connection options of the clone should be a copy, not the same instance.");
Assert.That(clone.RetryOptions.IsEquivalentTo(options.RetryOptions), Is.True, "The retry options of the clone should be considered equal.");
Expand Down Expand Up @@ -148,6 +152,34 @@ public void PrefetchSizeInBytesAllowsNull()
Assert.That(() => new EventProcessorClientOptions { PrefetchSizeInBytes = null }, Throws.Nothing);
}

/// <summary>
/// Verifies functionality of the <see cref="EventProcessorClientOptions.LoadBalancingUpdateInterval" />
/// property.
/// </summary>
///
[Test]
[TestCase(-1)]
[TestCase(-10)]
[TestCase(-100)]
public void LoadBalancingUpdateIntervalIsValidated(int intervalSeconds)
{
Assert.That(() => new EventProcessorClientOptions { LoadBalancingUpdateInterval = TimeSpan.FromSeconds(intervalSeconds) }, Throws.InstanceOf<ArgumentOutOfRangeException>());
}

/// <summary>
/// Verifies functionality of the <see cref="EventProcessorClientOptions.PartitionOwnershipExpirationInterval " />
/// property.
/// </summary>
///
[Test]
[TestCase(-1)]
[TestCase(-10)]
[TestCase(-100)]
public void PartitionOwnershipExpirationInterval(int intervalSeconds)
{
Assert.That(() => new EventProcessorClientOptions { PartitionOwnershipExpirationInterval = TimeSpan.FromSeconds(intervalSeconds) }, Throws.InstanceOf<ArgumentOutOfRangeException>());
}

/// <summary>
/// Verifies functionality of the <see cref="EventProcessorClientOptions.ConnectionOptions" />
/// property.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ void assertOptionsMatch(EventProcessorOptions expected,
MaximumWaitTime = TimeSpan.FromDays(54),
TrackLastEnqueuedEventProperties = true,
PrefetchCount = 5,
PrefetchSizeInBytes = 500
PrefetchSizeInBytes = 500,
LoadBalancingUpdateInterval = TimeSpan.FromDays(65),
PartitionOwnershipExpirationInterval = TimeSpan.FromMilliseconds(65)
};

var expectedOptions = InvokeCreateOptions(clientOptions);
Expand Down Expand Up @@ -1412,7 +1414,9 @@ public void ClientOptionsCanBeTranslated()
TrackLastEnqueuedEventProperties = true,
LoadBalancingStrategy = LoadBalancingStrategy.Greedy,
PrefetchCount = 9990,
PrefetchSizeInBytes = 400
PrefetchSizeInBytes = 400,
LoadBalancingUpdateInterval = TimeSpan.FromSeconds(45),
PartitionOwnershipExpirationInterval = TimeSpan.FromMilliseconds(44)
};

var defaultOptions = new EventProcessorOptions();
Expand All @@ -1429,10 +1433,10 @@ public void ClientOptionsCanBeTranslated()
Assert.That(processorOptions.LoadBalancingStrategy, Is.EqualTo(clientOptions.LoadBalancingStrategy), "The load balancing strategy should have been set.");
Assert.That(processorOptions.PrefetchCount, Is.EqualTo(clientOptions.PrefetchCount), "The prefetch count should have been set.");
Assert.That(processorOptions.PrefetchSizeInBytes, Is.EqualTo(clientOptions.PrefetchSizeInBytes), "The prefetch byte size should have been set.");
Assert.That(processorOptions.LoadBalancingUpdateInterval, Is.EqualTo(clientOptions.LoadBalancingUpdateInterval), "The load balancing interval should have been set.");
Assert.That(processorOptions.PartitionOwnershipExpirationInterval, Is.EqualTo(clientOptions.PartitionOwnershipExpirationInterval), "The partition ownership interval should have been set.");

Assert.That(processorOptions.DefaultStartingPosition, Is.EqualTo(defaultOptions.DefaultStartingPosition), "The default starting position should not have been set.");
Assert.That(processorOptions.LoadBalancingUpdateInterval, Is.EqualTo(defaultOptions.LoadBalancingUpdateInterval), "The load balancing interval should not have been set.");
Assert.That(processorOptions.PartitionOwnershipExpirationInterval, Is.EqualTo(defaultOptions.PartitionOwnershipExpirationInterval), "The partition ownership interval should not have been set.");
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,44 +36,44 @@ internal class PartitionLoadBalancer
///
private readonly Dictionary<string, List<EventProcessorPartitionOwnership>> ActiveOwnershipWithDistribution = new Dictionary<string, List<EventProcessorPartitionOwnership>>(StringComparer.OrdinalIgnoreCase);

/// <summary>
/// The minimum amount of time for an ownership to be considered expired without further updates.
/// </summary>
///
private TimeSpan OwnershipExpiration;

/// <summary>
/// The fully qualified Event Hubs namespace that the processor is associated with. This is likely
/// to be similar to <c>{yournamespace}.servicebus.windows.net</c>.
/// </summary>
///
public string FullyQualifiedNamespace { get; private set; }
public string FullyQualifiedNamespace { get; }

/// <summary>
/// The name of the Event Hub that the processor is connected to, specific to the
/// Event Hubs namespace that contains it.
/// </summary>
///
public string EventHubName { get; private set; }
public string EventHubName { get; }

/// <summary>
/// The name of the consumer group this load balancer is associated with. Events will be
/// read only in the context of this group.
/// </summary>
///
public string ConsumerGroup { get; private set; }
public string ConsumerGroup { get; }

/// <summary>
/// The identifier of the EventProcessorClient that owns this load balancer.
/// </summary>
///
public string OwnerIdentifier { get; private set; }
public string OwnerIdentifier { get; }

/// <summary>
/// The minimum amount of time for an ownership to be considered expired without further updates.
/// </summary>
///
public TimeSpan OwnershipExpirationInterval { get; }

/// <summary>
/// The minimum amount of time to be elapsed between two load balancing verifications.
/// </summary>
///
public TimeSpan LoadBalanceInterval { get; set; } = TimeSpan.FromSeconds(10);
public TimeSpan LoadBalanceInterval { get; internal set; }

/// <summary>
/// Indicates whether the load balancer believes itself to be in a balanced state
Expand Down Expand Up @@ -110,14 +110,16 @@ internal class PartitionLoadBalancer
/// <param name="consumerGroup">The name of the consumer group this load balancer is associated with.</param>
/// <param name="fullyQualifiedNamespace">The fully qualified Event Hubs namespace that the processor is associated with.</param>
/// <param name="eventHubName">The name of the Event Hub that the processor is associated with.</param>
/// <param name="ownershipExpiration">The minimum amount of time for an ownership to be considered expired without further updates.</param>
/// <param name="ownershipExpirationInterval">The minimum amount of time for an ownership to be considered expired without further updates.</param>
/// <param name="loadBalancingInterval">The minimum amount of time to be elapsed between two load balancing verifications.</param>
///
public PartitionLoadBalancer(StorageManager storageManager,
string identifier,
string consumerGroup,
string fullyQualifiedNamespace,
string eventHubName,
TimeSpan ownershipExpiration)
TimeSpan ownershipExpirationInterval,
TimeSpan loadBalancingInterval)
{
Argument.AssertNotNull(storageManager, nameof(storageManager));
Argument.AssertNotNullOrEmpty(identifier, nameof(identifier));
Expand All @@ -130,7 +132,8 @@ public PartitionLoadBalancer(StorageManager storageManager,
FullyQualifiedNamespace = fullyQualifiedNamespace;
EventHubName = eventHubName;
ConsumerGroup = consumerGroup;
OwnershipExpiration = ownershipExpiration;
OwnershipExpirationInterval = ownershipExpirationInterval;
LoadBalanceInterval = loadBalancingInterval;
}

/// <summary>
Expand All @@ -139,6 +142,13 @@ public PartitionLoadBalancer(StorageManager storageManager,
///
protected PartitionLoadBalancer()
{
// Because this constructor is used heavily in testing, initialize the
// critical timing properties to their default option values.

var options = new EventProcessorOptions();

LoadBalanceInterval = options.LoadBalancingUpdateInterval;
OwnershipExpirationInterval = options.PartitionOwnershipExpirationInterval;
}

/// <summary>
Expand Down Expand Up @@ -202,7 +212,7 @@ public virtual async ValueTask<EventProcessorPartitionOwnership> RunLoadBalancin

foreach (EventProcessorPartitionOwnership ownership in completeOwnershipList)
{
if (utcNow.Subtract(ownership.LastModifiedTime) < OwnershipExpiration && !string.IsNullOrEmpty(ownership.OwnerIdentifier))
if (utcNow.Subtract(ownership.LastModifiedTime) < OwnershipExpirationInterval && !string.IsNullOrEmpty(ownership.OwnerIdentifier))
{
activeOwnership = ownership;

Expand Down
Loading

0 comments on commit 182f699

Please sign in to comment.