diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs index 20e9e47dccd07..86a516076dc38 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs @@ -31,9 +31,6 @@ internal partial class BlobsCheckpointStore : StorageManager private static readonly string BlobsResourceDoesNotExist = "The Azure Storage Blobs container or blob used by the Event Processor Client does not exist."; #pragma warning restore CA1810 - /// A regular expression used to capture strings enclosed in double quotes. - private static readonly Regex DoubleQuotesExpression = new Regex("\"(.*)\"", RegexOptions.Compiled); - /// An ETag value to be used for permissive matching when querying Storage. private static readonly ETag IfNoneMatchAllTag = new ETag("*"); @@ -126,7 +123,7 @@ public override async Task> ListOw cancellationToken.ThrowIfCancellationRequested(); ListOwnershipStart(fullyQualifiedNamespace, eventHubName, consumerGroup); - List result = new List(); + var result = new List(); try { @@ -156,7 +153,7 @@ public override async Task> ListOw catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound) { ListOwnershipError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex); - throw new RequestFailedException(BlobsResourceDoesNotExist); + throw new RequestFailedException(BlobsResourceDoesNotExist, ex); } finally { @@ -231,14 +228,9 @@ public override async Task> ClaimO } // Small workaround to retrieve the eTag. The current storage SDK returns it enclosed in - // double quotes ('"ETAG_VALUE"' instead of 'ETAG_VALUE'). + // double quotes ("ETAG_VALUE" instead of ETAG_VALUE). - var match = DoubleQuotesExpression.Match(ownership.Version); - - if (match.Success) - { - ownership.Version = match.Groups[1].ToString(); - } + ownership.Version = ownership.Version?.Trim('"'); claimedOwnership.Add(ownership); OwnershipClaimed(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHubName, ownership.ConsumerGroup, ownership.OwnerIdentifier); @@ -250,7 +242,7 @@ public override async Task> ClaimO catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound || ex.ErrorCode == BlobErrorCode.BlobNotFound) { ClaimOwnershipError(ownership.PartitionId, ownership.FullyQualifiedNamespace, ownership.EventHubName, ownership.ConsumerGroup, ownership.OwnerIdentifier, ex); - throw new RequestFailedException(BlobsResourceDoesNotExist); + throw new RequestFailedException(BlobsResourceDoesNotExist, ex); } catch (Exception ex) { @@ -303,12 +295,14 @@ public override async Task> ListCheckpoint if (InitializeWithLegacyCheckpoints) { - // Legacy checkpoints are not normalized to lowercase + // Legacy checkpoints are not normalized to lowercase. + var legacyPrefix = string.Format(CultureInfo.InvariantCulture, FunctionsLegacyCheckpointPrefix, fullyQualifiedNamespace, eventHubName, consumerGroup); await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(prefix: legacyPrefix, cancellationToken: cancellationToken).ConfigureAwait(false)) { - // Skip new checkpoints and empty blobs + // Skip new checkpoints and empty blobs. + if (blob.Properties.ContentLength == 0) { continue; @@ -316,7 +310,8 @@ public override async Task> ListCheckpoint var partitionId = blob.Name.Substring(legacyPrefix.Length); - // Check whether there is already a checkpoint for this partition id + // Check whether there is already a checkpoint for this partition id. + if (checkpoints.Any(existingCheckpoint => string.Equals(existingCheckpoint.PartitionId, partitionId, StringComparison.Ordinal))) { continue; @@ -335,7 +330,7 @@ public override async Task> ListCheckpoint catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound) { ListCheckpointsError(fullyQualifiedNamespace, eventHubName, consumerGroup, ex); - throw new RequestFailedException(BlobsResourceDoesNotExist); + throw new RequestFailedException(BlobsResourceDoesNotExist, ex); } catch (Exception ex) { @@ -502,7 +497,8 @@ private async Task CreateLegacyCheckpoint(string fully } else { - // Skip checkpoints without an offset without logging an error + // Skip checkpoints without an offset without logging an error. + return null; } } @@ -563,6 +559,7 @@ public override async Task UpdateCheckpointAsync(EventProcessorCheckpoint checkp catch (RequestFailedException ex) when ((ex.ErrorCode == BlobErrorCode.BlobNotFound) || (ex.ErrorCode == BlobErrorCode.ContainerNotFound)) { // If the blob wasn't present, fall-back to trying to create a new one. + using var blobContent = new MemoryStream(Array.Empty()); await blobClient.UploadAsync(blobContent, metadata: metadata, cancellationToken: cancellationToken).ConfigureAwait(false); } @@ -570,7 +567,7 @@ public override async Task UpdateCheckpointAsync(EventProcessorCheckpoint checkp catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound) { UpdateCheckpointError(checkpoint.PartitionId, checkpoint.FullyQualifiedNamespace, checkpoint.EventHubName, checkpoint.ConsumerGroup, ex); - throw new RequestFailedException(BlobsResourceDoesNotExist); + throw new RequestFailedException(BlobsResourceDoesNotExist, ex); } catch (Exception ex) { diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs index 82d78116243c8..3da1c42110c8b 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/Processor/PartitionLoadBalancer.cs @@ -179,10 +179,16 @@ public virtual async ValueTask RunLoadBalancin .ConfigureAwait(false)) .ToList(); } + catch (TaskCanceledException) + { + throw; + } + catch (OperationCanceledException) + { + throw new TaskCanceledException(); + } catch (Exception ex) { - cancellationToken.ThrowIfCancellationRequested(); - // If ownership list retrieval fails, give up on the current cycle. There's nothing more we can do // without an updated ownership list. Set the EventHubName to null so it doesn't modify the exception // message. This exception message is used so the processor can retrieve the raw Operation string, and @@ -198,14 +204,12 @@ public virtual async ValueTask RunLoadBalancin return default; } - var unclaimedPartitions = new HashSet(partitionIds); - // Create a partition distribution dictionary from the complete ownership list we have, mapping an owner's identifier to the list of // partitions it owns. When an event processor goes down and it has only expired ownership, it will not be taken into consideration // by others. The expiration time defaults to 30 seconds, but it may be overridden by a derived class. + var unclaimedPartitions = new HashSet(partitionIds); var utcNow = GetDateTimeOffsetNow(); - var activeOwnership = default(EventProcessorPartitionOwnership); ActiveOwnershipWithDistribution.Clear(); @@ -290,7 +294,6 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio }); await StorageManager.ClaimOwnershipAsync(ownershipToRelinquish, cancellationToken).ConfigureAwait(false); - InstanceOwnership.Clear(); } @@ -422,7 +425,7 @@ public virtual async Task RelinquishOwnershipAsync(CancellationToken cancellatio // No ownership has been claimed. - return new ValueTask<(bool, EventProcessorPartitionOwnership)>((false, default(EventProcessorPartitionOwnership))); + return new ValueTask<(bool, EventProcessorPartitionOwnership)>((false, default)); } ///