Skip to content

Commit

Permalink
[Event Hubs] Track Two (Link Close Fix) (#9352)
Browse files Browse the repository at this point in the history
The focus of these changes is to explicitly close the session associated
with an AMQP link when the link itself is closed via the FaultTolerantAmqpObject
abstraction.
  • Loading branch information
jsquire authored Jan 7, 2020
1 parent f2f0ed4 commit 02c143c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,14 @@ protected AmqpClient(string host,
Credential = credential;
MessageConverter = messageConverter ?? new AmqpMessageConverter();
ConnectionScope = connectionScope ?? new AmqpConnectionScope(ServiceEndpoint, eventHubName, credential, clientOptions.TransportType, clientOptions.Proxy);
ManagementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(timeout => ConnectionScope.OpenManagementLinkAsync(timeout, CancellationToken.None), link => link.SafeClose());

ManagementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
timeout => ConnectionScope.OpenManagementLinkAsync(timeout, CancellationToken.None),
link =>
{
link.Session?.SafeClose();
link.SafeClose();
});
}
finally
{
Expand Down
27 changes: 16 additions & 11 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,22 @@ public AmqpConsumer(string eventHubName,
RetryPolicy = retryPolicy;
MessageConverter = messageConverter;

ReceiveLink = new FaultTolerantAmqpObject<ReceivingAmqpLink>(timeout =>
ConnectionScope.OpenConsumerLinkAsync(
consumerGroup,
partitionId,
CurrentEventPosition,
timeout,
prefetchCount ?? DefaultPrefetchCount,
ownerLevel,
trackLastEnqueuedEventProperties,
CancellationToken.None),
link => link.SafeClose());
ReceiveLink = new FaultTolerantAmqpObject<ReceivingAmqpLink>(
timeout =>
ConnectionScope.OpenConsumerLinkAsync(
consumerGroup,
partitionId,
CurrentEventPosition,
timeout,
prefetchCount ?? DefaultPrefetchCount,
ownerLevel,
trackLastEnqueuedEventProperties,
CancellationToken.None),
link =>
{
link.Session?.SafeClose();
link.SafeClose();
});
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,14 @@ public AmqpProducer(string eventHubName,
RetryPolicy = retryPolicy;
ConnectionScope = connectionScope;
MessageConverter = messageConverter;
SendLink = new FaultTolerantAmqpObject<SendingAmqpLink>(timeout => CreateLinkAndEnsureProducerStateAsync(partitionId, timeout, CancellationToken.None), link => link.SafeClose());

SendLink = new FaultTolerantAmqpObject<SendingAmqpLink>(
timeout => CreateLinkAndEnsureProducerStateAsync(partitionId, timeout, CancellationToken.None),
link =>
{
link.Session?.SafeClose();
link.SafeClose();
});
}

/// <summary>
Expand Down

0 comments on commit 02c143c

Please sign in to comment.