Skip to content

Commit

Permalink
[Event Hubs Client] Move Partition Initialization to the Background (#…
Browse files Browse the repository at this point in the history
…20733)

The focus of these changes is to o refactor partition initialization into
the background task responsible for processing the partition. Because
failures would still be surfaced through the error handler mechanism and
the flow for recovering would follow the existing partition processing failure,
there should be no change to usage patterns nor visible behavioral changes.
  • Loading branch information
jsquire authored May 5, 2021
1 parent 49f34a5 commit 24adb39
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -774,18 +774,16 @@ public virtual void EventProcessorPartitionProcessingStartError(string partition
/// <param name="identifier">A unique name used to identify the event processor.</param>
/// <param name="eventHubName">The name of the Event Hub that the processor is associated with.</param>
/// <param name="consumerGroup">The name of the consumer group that the processor is associated with.</param>
/// <param name="eventPosition">The description of the <see cref="EventPosition" /> used as the starting point for processing.</param>
///
[Event(39, Level = EventLevel.Verbose, Message = "Completed starting to process partition '{0}' using processor instance with identifier '{1}' for Event Hub: {2} and Consumer Group: {3}. Starting at position: {4}.")]
[Event(39, Level = EventLevel.Verbose, Message = "Completed starting to process partition '{0}' using processor instance with identifier '{1}' for Event Hub: {2} and Consumer Group: {3}.")]
public virtual void EventProcessorPartitionProcessingStartComplete(string partitionId,
string identifier,
string eventHubName,
string consumerGroup,
string eventPosition)
string consumerGroup)
{
if (IsEnabled())
{
WriteEvent(39, partitionId ?? string.Empty, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty, eventPosition ?? string.Empty);
WriteEvent(39, partitionId ?? string.Empty, identifier ?? string.Empty, eventHubName ?? string.Empty, consumerGroup ?? string.Empty);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,8 +582,8 @@ internal virtual async Task ProcessEventBatchAsync(TPartition partition,
/// </summary>
///
/// <param name="partition">The Event Hub partition whose processing should be started.</param>
/// <param name="startingPosition">The position within the event stream that processing should begin.</param>
/// <param name="cancellationSource">A <see cref="CancellationTokenSource"/> instance to signal the request to cancel the operation.</param>
/// <param name="startingPositionOverride">Allows for skipping partition initialization and directly overriding the position within the event stream where processing will begin.</param>
///
/// <returns>The <see cref="PartitionProcessor" /> encapsulating the processing task, its cancellation token, and associated state.</returns>
///
Expand All @@ -592,8 +592,8 @@ internal virtual async Task ProcessEventBatchAsync(TPartition partition,
/// </remarks>
///
internal virtual PartitionProcessor CreatePartitionProcessor(TPartition partition,
EventPosition startingPosition,
CancellationTokenSource cancellationSource)
CancellationTokenSource cancellationSource,
EventPosition? startingPositionOverride = null)
{
cancellationSource.Token.ThrowIfCancellationRequested<TaskCanceledException>();
var consumer = default(TransportConsumer);
Expand Down Expand Up @@ -630,6 +630,17 @@ async Task performProcessing()
var failedAttemptCount = 0;
var failedConsumerCount = 0;

// Determine the position to start processing from; this will occur during
// partition initialization normally, but may be superseded if an override
// was passed. In the event that initialization is run and encounters an
// exception, it takes responsibility for firing the error handler.

var startingPosition = startingPositionOverride switch
{
_ when startingPositionOverride.HasValue => startingPositionOverride.Value,
_ => await InitializePartitionForProcessingAsync(partition, cancellationSource.Token).ConfigureAwait(false)
};

// Create the connection to be used for spawning consumers; if the creation
// fails, then consider the processing task to be failed. The main processing
// loop will take responsibility for attempting to restart or relinquishing ownership.
Expand Down Expand Up @@ -1337,7 +1348,7 @@ string _ when (ActivePartitionProcessors.TryGetValue(partitionId, out var partit

if ((claimedOwnership != default) && (!ActivePartitionProcessors.ContainsKey(claimedOwnership.PartitionId)))
{
await TryStartProcessingPartitionAsync(claimedOwnership.PartitionId, cancellationToken).ConfigureAwait(false);
TryStartProcessingPartition(claimedOwnership.PartitionId, cancellationToken);
}
}

Expand All @@ -1363,7 +1374,7 @@ await Task.WhenAll(LoadBalancer.OwnedPartitionIds
if (!ActivePartitionProcessors.TryGetValue(partitionId, out var partitionProcessor) || partitionProcessor.ProcessingTask.IsCompleted)
{
await TryStopProcessingPartitionAsync(partitionId, ProcessingStoppedReason.OwnershipLost, cancellationToken).ConfigureAwait(false);
await TryStartProcessingPartitionAsync(partitionId, cancellationToken).ConfigureAwait(false);
TryStartProcessingPartition(partitionId, cancellationToken);
}
}))
.ConfigureAwait(false);
Expand All @@ -1382,31 +1393,24 @@ await Task.WhenAll(LoadBalancer.OwnedPartitionIds
}

/// <summary>
/// Attempts to begin processing the requested partition in the background and update tracking state
/// so that processing can be stopped.
/// Performs the actions needed to initialize a partition for processing; this
/// includes invoking the initialization handler and querying checkpoints.
/// </summary>
///
/// <param name="partitionId">The identifier of the Event Hub partition whose processing should be started.</param>
/// <param name="partition">The partition to initialize.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns><c>true</c> if processing was successfully started; otherwise, <c>false</c>.</returns>
/// <returns>The <see cref="EventPosition" /> to start processing from.</returns>
///
/// <remarks>
/// Exceptions encountered in this method will be logged and will result in the error handler being
/// invoked. They will not be surfaced to callers. This is intended to be a safe operation consumed
/// as part of the load balancing cycle, which is failure-tolerant.
/// This method will invoke the error handler should an exception be encountered; the
/// exception will then be bubbled to callers.
/// </remarks>
///
private async Task<bool> TryStartProcessingPartitionAsync(string partitionId,
CancellationToken cancellationToken)
private async Task<EventPosition> InitializePartitionForProcessingAsync(TPartition partition,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
Logger.EventProcessorPartitionProcessingStart(partitionId, Identifier, EventHubName, ConsumerGroup);

var partition = new TPartition { PartitionId = partitionId };
var operationDescription = Resources.OperationClaimOwnership;
var startingPosition = Options.DefaultStartingPosition;
var cancellationSource = default(CancellationTokenSource);

try
{
Expand All @@ -1416,25 +1420,62 @@ private async Task<bool> TryStartProcessingPartitionAsync(string partitionId,

// Query the available checkpoints for the partition.

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
operationDescription = Resources.OperationListCheckpoints;
var checkpoint = await GetCheckpointAsync(partition.PartitionId, cancellationToken).ConfigureAwait(false);

// Determine the starting position for processing the partition.

var checkpoint = await GetCheckpointAsync(partitionId, cancellationToken).ConfigureAwait(false);
operationDescription = Resources.OperationClaimOwnership;

if (checkpoint != null)
{
startingPosition = checkpoint.StartingPosition;
return checkpoint.StartingPosition;
}

return Options.DefaultStartingPosition;
}
catch (Exception ex)
{
// The error handler is invoked as a fire-and-forget task; the processor does not assume responsibility
// for observing or surfacing exceptions that may occur in the handler.

_ = InvokeOnProcessingErrorAsync(ex, partition, operationDescription, CancellationToken.None);
throw;
}
}

/// <summary>
/// Attempts to begin processing the requested partition in the background and update tracking state
/// so that processing can be stopped.
/// </summary>
///
/// <param name="partitionId">The identifier of the Event Hub partition whose processing should be started.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns><c>true</c> if processing was successfully started; otherwise, <c>false</c>.</returns>
///
/// <remarks>
/// Exceptions encountered in this method will be logged and will result in the error handler being
/// invoked. They will not be surfaced to callers. This is intended to be a safe operation consumed
/// as part of the load balancing cycle, which is failure-tolerant.
/// </remarks>
///
private bool TryStartProcessingPartition(string partitionId,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
Logger.EventProcessorPartitionProcessingStart(partitionId, Identifier, EventHubName, ConsumerGroup);

var partition = new TPartition { PartitionId = partitionId };
var cancellationSource = default(CancellationTokenSource);

try
{
// Create and register the partition processor. Ownership of the cancellationSource is transferred
// to the processor upon creation, including the responsibility for disposal.

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var processor = CreatePartitionProcessor(partition, startingPosition, cancellationSource);
var processor = CreatePartitionProcessor(partition, cancellationSource);

ActivePartitionProcessors.AddOrUpdate(partitionId, processor, (key, value) => processor);
cancellationSource = null;
Expand All @@ -1446,7 +1487,7 @@ private async Task<bool> TryStartProcessingPartitionAsync(string partitionId,
// The error handler is invoked as a fire-and-forget task; the processor does not assume responsibility
// for observing or surfacing exceptions that may occur in the handler.

_ = InvokeOnProcessingErrorAsync(ex, partition, operationDescription, CancellationToken.None);
_ = InvokeOnProcessingErrorAsync(ex, partition, Resources.OperationClaimOwnership, CancellationToken.None);
Logger.EventProcessorPartitionProcessingStartError(partitionId, Identifier, EventHubName, ConsumerGroup, ex.Message);

cancellationSource?.Cancel();
Expand All @@ -1455,7 +1496,7 @@ private async Task<bool> TryStartProcessingPartitionAsync(string partitionId,
}
finally
{
Logger.EventProcessorPartitionProcessingStartComplete(partitionId, Identifier, EventHubName, ConsumerGroup, startingPosition.ToString());
Logger.EventProcessorPartitionProcessingStartComplete(partitionId, Identifier, EventHubName, ConsumerGroup);
}
}

Expand Down
Loading

0 comments on commit 24adb39

Please sign in to comment.