diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs index 76fdd8e3f0775..a03b11bd710d8 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs @@ -11,6 +11,10 @@ public CreateMessageBatchOptions() { } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public override string ToString() { throw null; } } + public enum LinkCloseMode + { + Detach = 0, + } public sealed partial class ProcessErrorEventArgs : System.EventArgs { public ProcessErrorEventArgs(System.Exception exception, Azure.Messaging.ServiceBus.ServiceBusErrorSource errorSource, string fullyQualifiedNamespace, string entityPath) { } @@ -67,7 +71,7 @@ public ServiceBusClient(string fullyQualifiedNamespace, Azure.Core.TokenCredenti public ServiceBusClient(string fullyQualifiedNamespace, Azure.Core.TokenCredential credential, Azure.Messaging.ServiceBus.ServiceBusClientOptions options) { } public ServiceBusClient(string connectionString, Azure.Messaging.ServiceBus.ServiceBusClientOptions options) { } public string FullyQualifiedNamespace { get { throw null; } } - public bool IsDisposed { get { throw null; } } + public bool IsClosed { get { throw null; } } public Azure.Messaging.ServiceBus.ServiceBusTransportType TransportType { get { throw null; } } public virtual Azure.Messaging.ServiceBus.ServiceBusProcessor CreateProcessor(string queueName) { throw null; } public virtual Azure.Messaging.ServiceBus.ServiceBusProcessor CreateProcessor(string queueName, Azure.Messaging.ServiceBus.ServiceBusProcessorOptions options) { throw null; } @@ -182,12 +186,13 @@ public static partial class ServiceBusModelFactory [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public static Azure.Messaging.ServiceBus.Management.TopicProperties TopicProperties(string name, long maxSizeInMegabytes = (long)0, bool requiresDuplicateDetection = false, System.TimeSpan defaultMessageTimeToLive = default(System.TimeSpan), System.TimeSpan autoDeleteOnIdle = default(System.TimeSpan), System.TimeSpan duplicateDetectionHistoryTimeWindow = default(System.TimeSpan), bool enableBatchedOperations = false, Azure.Messaging.ServiceBus.Management.EntityStatus status = default(Azure.Messaging.ServiceBus.Management.EntityStatus)) { throw null; } } - public partial class ServiceBusProcessor + public partial class ServiceBusProcessor : System.IAsyncDisposable { protected ServiceBusProcessor() { } public bool AutoComplete { get { throw null; } } public string EntityPath { get { throw null; } } public string FullyQualifiedNamespace { get { throw null; } } + public bool IsClosed { get { throw null; } } public bool IsProcessing { get { throw null; } } public System.TimeSpan MaxAutoLockRenewalDuration { get { throw null; } } public int MaxConcurrentCalls { get { throw null; } } @@ -196,6 +201,8 @@ protected ServiceBusProcessor() { } public Azure.Messaging.ServiceBus.ReceiveMode ReceiveMode { get { throw null; } } public event System.Func ProcessErrorAsync { add { } remove { } } public event System.Func ProcessMessageAsync { add { } remove { } } + public virtual System.Threading.Tasks.Task CloseAsync(Azure.Messaging.ServiceBus.LinkCloseMode closeMode = Azure.Messaging.ServiceBus.LinkCloseMode.Detach, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public override bool Equals(object obj) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] @@ -256,10 +263,11 @@ public partial class ServiceBusReceiver : System.IAsyncDisposable protected ServiceBusReceiver() { } public string EntityPath { get { throw null; } } public string FullyQualifiedNamespace { get { throw null; } } - public bool IsDisposed { get { throw null; } } + public bool IsClosed { get { throw null; } } public int PrefetchCount { get { throw null; } } public Azure.Messaging.ServiceBus.ReceiveMode ReceiveMode { get { throw null; } } public virtual System.Threading.Tasks.Task AbandonMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual System.Threading.Tasks.Task CloseAsync(Azure.Messaging.ServiceBus.LinkCloseMode closeMode = Azure.Messaging.ServiceBus.LinkCloseMode.Detach, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.Task CompleteMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.Task DeadLetterMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.Task DeadLetterMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, string deadLetterReason, string deadLetterErrorDescription = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } @@ -325,10 +333,11 @@ public partial class ServiceBusSender : System.IAsyncDisposable protected ServiceBusSender() { } public string EntityPath { get { throw null; } } public string FullyQualifiedNamespace { get { throw null; } } - public bool IsDisposed { get { throw null; } } + public bool IsClosed { get { throw null; } } public string ViaEntityPath { get { throw null; } } public virtual System.Threading.Tasks.Task CancelScheduledMessageAsync(long sequenceNumber, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.Task CancelScheduledMessagesAsync(System.Collections.Generic.IEnumerable sequenceNumbers, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual System.Threading.Tasks.Task CloseAsync(Azure.Messaging.ServiceBus.LinkCloseMode closeMode = Azure.Messaging.ServiceBus.LinkCloseMode.Detach, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.ValueTask CreateMessageBatchAsync(Azure.Messaging.ServiceBus.CreateMessageBatchOptions options, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.ValueTask CreateMessageBatchAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } public virtual System.Threading.Tasks.ValueTask DisposeAsync() { throw null; } @@ -355,12 +364,13 @@ public ServiceBusSenderOptions() { } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public override string ToString() { throw null; } } - public partial class ServiceBusSessionProcessor + public partial class ServiceBusSessionProcessor : System.IAsyncDisposable { protected ServiceBusSessionProcessor() { } public bool AutoComplete { get { throw null; } } public string EntityPath { get { throw null; } } public string FullyQualifiedNamespace { get { throw null; } } + public bool IsClosed { get { throw null; } } public bool IsProcessing { get { throw null; } } public System.TimeSpan MaxAutoLockRenewalDuration { get { throw null; } } public int MaxConcurrentCallsPerSession { get { throw null; } } @@ -372,6 +382,8 @@ protected ServiceBusSessionProcessor() { } public event System.Func ProcessMessageAsync { add { } remove { } } public event System.Func SessionClosingAsync { add { } remove { } } public event System.Func SessionInitializingAsync { add { } remove { } } + public virtual System.Threading.Tasks.Task CloseAsync(Azure.Messaging.ServiceBus.LinkCloseMode closeMode = Azure.Messaging.ServiceBus.LinkCloseMode.Detach, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public System.Threading.Tasks.ValueTask DisposeAsync() { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public override bool Equals(object obj) { throw null; } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Client/ServiceBusClient.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Client/ServiceBusClient.cs index 6087018ae62f5..b35bb1bafc01e 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Client/ServiceBusClient.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Client/ServiceBusClient.cs @@ -32,13 +32,13 @@ public class ServiceBusClient : IAsyncDisposable public string FullyQualifiedNamespace => Connection.FullyQualifiedNamespace; /// - /// Indicates whether or not this has been disposed. + /// Indicates whether or not this has been closed. /// /// /// - /// true if the client is disposed; otherwise, false. + /// true if the client is closed; otherwise, false. /// - public bool IsDisposed { get; private set; } = false; + public bool IsClosed { get; private set; } = false; /// /// The transport type used for this . @@ -53,7 +53,6 @@ public class ServiceBusClient : IAsyncDisposable /// /// The instance of which can be mocked for testing. /// - /// internal ServiceBusEventSource Logger { get; set; } = ServiceBusEventSource.Log; /// @@ -70,20 +69,20 @@ public class ServiceBusClient : IAsyncDisposable [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Justification = "This signature must match the IAsyncDisposable interface.")] public virtual async ValueTask DisposeAsync() { - Logger.ClientDisposeStart(typeof(ServiceBusClient), Identifier); - IsDisposed = true; + Logger.ClientCloseStart(typeof(ServiceBusClient), Identifier); + IsClosed = true; try { await Connection.CloseAsync(CancellationToken.None).ConfigureAwait(false); } catch (Exception ex) { - Logger.ClientDisposeException(typeof(ServiceBusClient), Identifier, ex); + Logger.ClientCloseException(typeof(ServiceBusClient), Identifier, ex); throw; } finally { - Logger.ClientDisposeComplete(typeof(ServiceBusClient), Identifier); + Logger.ClientCloseComplete(typeof(ServiceBusClient), Identifier); } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs index 51d66405059db..ca8edd2001b24 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs @@ -105,9 +105,9 @@ internal ServiceBusEventSource() { } internal const int ManagementSerializedExceptionEvent = 42; internal const int RunOperationExceptionEvent = 43; - internal const int ClientDisposeStartEvent = 44; - internal const int ClientDisposeCompleteEvent = 45; - internal const int ClientDisposeExceptionEvent = 46; + internal const int ClientCloseStartEvent = 44; + internal const int ClientCloseCompleteEvent = 45; + internal const int ClientCloseExceptionEvent = 46; internal const int RenewSessionLockStartEvent = 47; internal const int RenewSessionLockCompleteEvent = 48; @@ -1185,31 +1185,31 @@ public virtual void ClientCreateExceptionCore( /// /// Indicates that a client is closing, which may correspond to /// a , , - /// or . + /// , or a . /// /// /// The type of client being closed. /// An identifier to associate with the client. /// [NonEvent] - public virtual void ClientDisposeStart( + public virtual void ClientCloseStart( Type clientType, string identifier) { if (IsEnabled()) { - ClientDisposeStartCore(clientType.Name, identifier ?? string.Empty); + ClientCloseStartCore(clientType.Name, identifier ?? string.Empty); } } - [Event(ClientDisposeStartEvent, Level = EventLevel.Verbose, Message = "Closing a {0} (Identifier '{1}').")] - public virtual void ClientDisposeStartCore( + [Event(ClientCloseStartEvent, Level = EventLevel.Verbose, Message = "Closing a {0} (Identifier '{1}').")] + public virtual void ClientCloseStartCore( string clientType, string identifier) { if (IsEnabled()) { - WriteEvent(ClientDisposeStartEvent, clientType, identifier); + WriteEvent(ClientCloseStartEvent, clientType, identifier); } } @@ -1222,24 +1222,24 @@ public virtual void ClientDisposeStartCore( /// An identifier to associate with the client. /// [NonEvent] - public virtual void ClientDisposeComplete( + public virtual void ClientCloseComplete( Type clientType, string identifier) { if (IsEnabled()) { - ClientDisposeCompleteCore(clientType.Name, identifier ?? string.Empty); + ClientCloseCompleteCore(clientType.Name, identifier ?? string.Empty); } } - [Event(ClientDisposeCompleteEvent, Level = EventLevel.Verbose, Message = "A {0} has been closed (Identifier '{1}').")] - public virtual void ClientDisposeCompleteCore( + [Event(ClientCloseCompleteEvent, Level = EventLevel.Verbose, Message = "A {0} has been closed (Identifier '{1}').")] + public virtual void ClientCloseCompleteCore( string clientType, string identifier) { if (IsEnabled()) { - WriteEvent(ClientDisposeCompleteEvent, clientType, identifier); + WriteEvent(ClientCloseCompleteEvent, clientType, identifier); } } @@ -1253,26 +1253,26 @@ public virtual void ClientDisposeCompleteCore( /// The message for the exception that occurred. /// [NonEvent] - public virtual void ClientDisposeException( + public virtual void ClientCloseException( Type clientType, string identifier, Exception exception) { if (IsEnabled()) { - ClientDisposeExceptionCore(clientType.Name, identifier ?? string.Empty, exception.ToString()); + ClientCloseExceptionCore(clientType.Name, identifier ?? string.Empty, exception.ToString()); } } - [Event(ClientDisposeExceptionEvent, Level = EventLevel.Error, Message = "An exception occurred while closing a {0} (Identifier '{1}'). Error Message: '{2}'")] - public virtual void ClientDisposeExceptionCore( + [Event(ClientCloseExceptionEvent, Level = EventLevel.Error, Message = "An exception occurred while closing a {0} (Identifier '{1}'). Error Message: '{2}'")] + public virtual void ClientCloseExceptionCore( string clientType, string identifier, string exception) { if (IsEnabled()) { - WriteEvent(ClientDisposeExceptionEvent, clientType, identifier, exception); + WriteEvent(ClientCloseExceptionEvent, clientType, identifier, exception); } } #endregion diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/LinkCloseMode.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/LinkCloseMode.cs new file mode 100644 index 0000000000000..c28d008fce7ef --- /dev/null +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/LinkCloseMode.cs @@ -0,0 +1,16 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +namespace Azure.Messaging.ServiceBus +{ + /// + /// The mode in which links will be closed. + /// + public enum LinkCloseMode + { + /// + /// The link will be detached when closing. + /// + Detach = 0 + } +} diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs index 156f76279361a..381f59368ea68 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs @@ -69,7 +69,8 @@ public ReceiverManager( } public virtual async Task CloseReceiverIfNeeded( - CancellationToken cancellationToken) + CancellationToken cancellationToken, + bool forceClose = false) { try { @@ -269,7 +270,7 @@ private async Task RenewMessageLock( TimeSpan delay = CalculateRenewDelay(message.LockedUntil); await Task.Delay(delay, cancellationToken).ConfigureAwait(false); - if (Receiver.IsDisposed) + if (Receiver.IsClosed) { break; } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs index 255e1af6374d1..f7789fab8bca5 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs @@ -16,15 +16,15 @@ namespace Azure.Messaging.ServiceBus { /// - /// The provides an abstraction around a set of that - /// allows using an event based model for processing received . It is constructed by calling - /// . + /// The provides an abstraction around a set of + /// that allows using an event based model for processing received . + /// It is constructed by calling . /// The message handler is specified with the /// property. The error handler is specified with the property. /// To start processing after the handlers have been specified, call . /// #pragma warning disable CA1001 // Types that own disposable fields should be disposable - public class ServiceBusProcessor + public class ServiceBusProcessor : IAsyncDisposable #pragma warning restore CA1001 // Types that own disposable fields should be disposable { private Func _processMessageAsync; @@ -145,6 +145,22 @@ public class ServiceBusProcessor internal int MaxConcurrentSessions { get; } internal int MaxConcurrentCallsPerSession { get; } + /// + /// Indicates whether or not this has been closed. + /// + /// + /// + /// true if the processor is closed; otherwise, false. + /// + public bool IsClosed + { + get => _closed; + private set => _closed = value; + } + + /// Indicates whether or not this instance has been closed. + private volatile bool _closed = false; + private readonly string[] _sessionIds; private readonly EntityScopeFactory _scopeFactory; private readonly IList _plugins; @@ -434,6 +450,7 @@ internal event Func SessionClosingAsync public virtual async Task StartProcessingAsync( CancellationToken cancellationToken = default) { + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusProcessor)); cancellationToken.ThrowIfCancellationRequested(); bool releaseGuard = false; try @@ -486,6 +503,12 @@ public virtual async Task StartProcessingAsync( private void InitializeReceiverManagers() { + if (_receiverManagers.Count > 0) + { + // already initialized - this can happen if stopping and then restarting + return; + } + if (IsSessionProcessor) { var numReceivers = _sessionIds.Length > 0 ? _sessionIds.Length : MaxConcurrentSessions; @@ -559,8 +582,9 @@ private void ValidateMessageHandler() /// /// Signals the processor to stop processing messaging. Should this method be - /// called while the processor is not running, an - /// is thrown. + /// called while the processor is not running, no action is taken. This method + /// will not close the underlying receivers, but will cause the receivers to stop + /// receiving. To close the underlying receivers, should be called. /// /// /// A instance to @@ -599,17 +623,6 @@ public virtual async Task StopProcessingAsync(CancellationToken cancellationToke ActiveReceiveTask.Dispose(); ActiveReceiveTask = null; - - foreach (ReceiverManager receiverManager in _receiverManagers) - { - await receiverManager.CloseReceiverIfNeeded( - cancellationToken) - .ConfigureAwait(false); - } - } - else - { - throw new InvalidOperationException(Resources.MessageProcessorIsNotRunning); } } catch (Exception exception) @@ -707,5 +720,37 @@ internal void EnsureNotRunningAndInvoke(Action action) throw new InvalidOperationException(Resources.RunningMessageProcessorCannotPerformOperation); } } + + /// + /// Performs the task needed to clean up resources used by the . + /// + /// The mode indicating what should happen to the link when closing. + /// An optional instance to signal the + /// request to cancel the operation. + public virtual async Task CloseAsync( + LinkCloseMode closeMode = LinkCloseMode.Detach, + CancellationToken cancellationToken = default) + { + IsClosed = true; + + if (IsProcessing) + { + await StopProcessingAsync(cancellationToken).ConfigureAwait(false); + } + foreach (ReceiverManager receiverManager in _receiverManagers) + { + await receiverManager.CloseReceiverIfNeeded( + cancellationToken, + forceClose: true) + .ConfigureAwait(false); + } + } + + /// + /// Performs the task needed to clean up resources used by the . + /// This is equivalent to calling with the default . + /// + public async ValueTask DisposeAsync() => + await CloseAsync().ConfigureAwait(false); } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs index 86a9b5d3b3072..17bbc21805d5e 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs @@ -7,7 +7,10 @@ using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; +using Azure.Core.Amqp; using Azure.Messaging.ServiceBus.Plugins; +using Microsoft.Azure.Amqp; +using Microsoft.Azure.Amqp.Framing; namespace Azure.Messaging.ServiceBus { @@ -19,7 +22,7 @@ namespace Azure.Messaging.ServiceBus /// property. The error handler is specified with the property. /// To start processing after the handlers have been specified, call . /// - public class ServiceBusSessionProcessor + public class ServiceBusSessionProcessor : IAsyncDisposable { private readonly ServiceBusProcessor _innerProcessor; @@ -44,6 +47,15 @@ public class ServiceBusSessionProcessor /// public bool AutoComplete => _innerProcessor.AutoComplete; + /// + /// Indicates whether or not this has been closed. + /// + /// + /// + /// true if the processor is closed; otherwise, false. + /// + public bool IsClosed => _innerProcessor.IsClosed; + /// /// Gets the maximum duration within which the session lock will be /// renewed automatically. @@ -209,5 +221,23 @@ public virtual async Task StopProcessingAsync(CancellationToken cancellationToke /// [EditorBrowsable(EditorBrowsableState.Never)] public override string ToString() => base.ToString(); + + /// + /// Performs the task needed to clean up resources used by the . + /// + /// The mode indicating what should happen to the link when closing. + /// An optional instance to signal the + /// request to cancel the operation. + public virtual async Task CloseAsync( + LinkCloseMode closeMode = LinkCloseMode.Detach, + CancellationToken cancellationToken = default) => + await _innerProcessor.CloseAsync(closeMode).ConfigureAwait(false); + + /// + /// Performs the task needed to clean up resources used by the . + /// This is equivalent to calling with the default . + /// + public async ValueTask DisposeAsync() => + await CloseAsync().ConfigureAwait(false); } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/SessionReceiverManager.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/SessionReceiverManager.cs index 67ed2563ffd0f..f1d7a9ede3d1e 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/SessionReceiverManager.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/SessionReceiverManager.cs @@ -163,8 +163,14 @@ private async Task CreateReceiver(CancellationToken processorCancellationToken) } public override async Task CloseReceiverIfNeeded( - CancellationToken processorCancellationToken) + CancellationToken processorCancellationToken, + bool forceClose = false) { + if (forceClose) + { + await CloseReceiver(processorCancellationToken).ConfigureAwait(false); + return; + } bool releaseSemaphore = false; try { @@ -177,7 +183,7 @@ public override async Task CloseReceiverIfNeeded( return; } _threadCount--; - if (_threadCount == 0) + if (_threadCount == 0 && !processorCancellationToken.IsCancellationRequested) { if (!_keepOpenOnReceiveTimeout || !AutoRenewLock || @@ -198,7 +204,7 @@ public override async Task CloseReceiverIfNeeded( private async Task CloseReceiver(CancellationToken cancellationToken) { - if (_receiver == null || _receiver.IsDisposed) + if (_receiver == null || _receiver.IsClosed) { return; } @@ -329,7 +335,7 @@ private async Task RenewSessionLock() TimeSpan delay = CalculateRenewDelay(_receiver.SessionLockedUntil); await Task.Delay(delay, sessionLockRenewalCancellationToken).ConfigureAwait(false); - if (_receiver.IsDisposed) + if (_receiver.IsClosed) { break; } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs index c9ac69c87c862..5cd8312a6531c 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs @@ -60,13 +60,20 @@ public class ServiceBusReceiver : IAsyncDisposable internal string Identifier { get; } /// - /// Indicates whether or not this has been disposed. + /// Indicates whether or not this has been closed. /// /// /// - /// true if the client is disposed; otherwise, false. + /// true if the receiver is closed; otherwise, false. /// - public bool IsDisposed { get; private set; } = false; + public bool IsClosed + { + get => _closed; + private set => _closed = value; + } + + /// Indicates whether or not this instance has been closed. + private volatile bool _closed = false; /// /// The policy to use for determining retry behavior for when an operation fails. @@ -184,6 +191,33 @@ internal ServiceBusReceiver( /// protected ServiceBusReceiver() { } + /// + /// Performs the task needed to clean up resources used by the . + /// + /// The mode indicating what should happen to the link when closing. + /// An optional instance to signal the + /// request to cancel the operation. + public virtual async Task CloseAsync( + LinkCloseMode closeMode = LinkCloseMode.Detach, + CancellationToken cancellationToken = default) + { + IsClosed = true; + Type clientType = GetType(); + + Logger.ClientCloseStart(clientType, Identifier); + try + { + await InnerReceiver.CloseAsync(CancellationToken.None).ConfigureAwait(false); + } + catch (Exception ex) + { + Logger.ClientCloseException(clientType, Identifier, ex); + throw; + } + + Logger.ClientCloseComplete(clientType, Identifier); + } + /// /// Receives a list of from the entity using mode. /// defaults to PeekLock mode. @@ -203,7 +237,7 @@ public virtual async Task> ReceiveMessa CancellationToken cancellationToken = default) { Argument.AssertAtLeast(maxMessages, 1, nameof(maxMessages)); - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver)); if (maxWaitTime.HasValue) { Argument.AssertPositive(maxWaitTime.Value, nameof(maxWaitTime)); @@ -388,7 +422,7 @@ private async Task> PeekMessagesInterna int maxMessages, CancellationToken cancellationToken) { - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver)); cancellationToken.ThrowIfCancellationRequested(); Logger.PeekMessageStart(Identifier, sequenceNumber, maxMessages); using DiagnosticScope scope = ScopeFactory.CreateScope( @@ -465,7 +499,7 @@ internal virtual async Task CompleteMessageAsync( CancellationToken cancellationToken = default) { ThrowIfLockTokenIsEmpty(lockToken); - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver)); Argument.AssertNotNullOrEmpty(lockToken, nameof(lockToken)); ThrowIfNotPeekLockMode(); cancellationToken.ThrowIfCancellationRequested(); @@ -542,7 +576,7 @@ internal virtual async Task AbandonMessageAsync( CancellationToken cancellationToken = default) { ThrowIfLockTokenIsEmpty(lockToken); - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver)); ThrowIfNotPeekLockMode(); cancellationToken.ThrowIfCancellationRequested(); Logger.AbandonMessageStart(Identifier, 1, lockToken); @@ -703,7 +737,7 @@ private async Task DeadLetterInternalAsync( CancellationToken cancellationToken = default) { ThrowIfLockTokenIsEmpty(lockToken); - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver)); cancellationToken.ThrowIfCancellationRequested(); ThrowIfNotPeekLockMode(); Logger.DeadLetterMessageStart(Identifier, 1, lockToken); @@ -783,7 +817,7 @@ internal virtual async Task DeferMessageAsync( CancellationToken cancellationToken = default) { ThrowIfLockTokenIsEmpty(lockToken); - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver)); ThrowIfNotPeekLockMode(); cancellationToken.ThrowIfCancellationRequested(); Logger.DeferMessageStart(Identifier, 1, lockToken); @@ -863,7 +897,7 @@ public virtual async Task> ReceiveDefer IEnumerable sequenceNumbers, CancellationToken cancellationToken = default) { - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver)); Argument.AssertNotNullOrEmpty(sequenceNumbers, nameof(sequenceNumbers)); cancellationToken.ThrowIfCancellationRequested(); var sequenceNumbersList = sequenceNumbers.ToList(); @@ -936,7 +970,7 @@ internal virtual async Task RenewMessageLockAsync( CancellationToken cancellationToken = default) { ThrowIfLockTokenIsEmpty(lockToken); - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver)); ThrowIfNotPeekLockMode(); ThrowIfSessionReceiver(); cancellationToken.ThrowIfCancellationRequested(); @@ -978,28 +1012,13 @@ private void ThrowIfSessionReceiver() /// /// Performs the task needed to clean up resources used by the . + /// This is equivalent to calling with the default . /// /// /// A task to be resolved on when the operation has completed. [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Justification = "This signature must match the IAsyncDisposable interface.")] - public virtual async ValueTask DisposeAsync() - { - IsDisposed = true; - Type clientType = GetType(); - - Logger.ClientDisposeStart(clientType, Identifier); - try - { - await InnerReceiver.CloseAsync(CancellationToken.None).ConfigureAwait(false); - } - catch (Exception ex) - { - Logger.ClientDisposeException(clientType, Identifier, ex); - throw; - } - - Logger.ClientDisposeComplete(clientType, Identifier); - } + public virtual async ValueTask DisposeAsync() => + await CloseAsync().ConfigureAwait(false); /// /// Determines whether the specified is equal to this instance. diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusSessionReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusSessionReceiver.cs index 259574b7209d6..2bf944418c26a 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusSessionReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusSessionReceiver.cs @@ -98,7 +98,7 @@ protected ServiceBusSessionReceiver() : base() { } /// The session state as byte array. public virtual async Task GetSessionStateAsync(CancellationToken cancellationToken = default) { - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusSessionReceiver)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSessionReceiver)); cancellationToken.ThrowIfCancellationRequested(); Logger.GetSessionStateStart(Identifier, SessionId); using DiagnosticScope scope = ScopeFactory.CreateScope( @@ -137,7 +137,7 @@ public virtual async Task SetSessionStateAsync( byte[] sessionState, CancellationToken cancellationToken = default) { - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusSessionReceiver)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSessionReceiver)); cancellationToken.ThrowIfCancellationRequested(); Logger.SetSessionStateStart(Identifier, SessionId); using DiagnosticScope scope = ScopeFactory.CreateScope( @@ -177,7 +177,7 @@ public virtual async Task SetSessionStateAsync( /// public virtual async Task RenewSessionLockAsync(CancellationToken cancellationToken = default) { - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusSessionReceiver)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSessionReceiver)); cancellationToken.ThrowIfCancellationRequested(); Logger.RenewSessionLockStart(Identifier, SessionId); using DiagnosticScope scope = ScopeFactory.CreateScope( diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/RuleManager/ServiceBusRuleManager.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/RuleManager/ServiceBusRuleManager.cs index 42089d9a05b14..790a0205fc2ad 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/RuleManager/ServiceBusRuleManager.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/RuleManager/ServiceBusRuleManager.cs @@ -214,18 +214,18 @@ public virtual async ValueTask DisposeAsync() { IsDisposed = true; - ServiceBusEventSource.Log.ClientDisposeStart(typeof(ServiceBusRuleManager), Identifier); + ServiceBusEventSource.Log.ClientCloseStart(typeof(ServiceBusRuleManager), Identifier); try { await InnerRuleManager.CloseAsync(CancellationToken.None).ConfigureAwait(false); } catch (Exception ex) { - ServiceBusEventSource.Log.ClientDisposeException(typeof(ServiceBusRuleManager), Identifier, ex); + ServiceBusEventSource.Log.ClientCloseException(typeof(ServiceBusRuleManager), Identifier, ex); throw; } - ServiceBusEventSource.Log.ClientDisposeComplete(typeof(ServiceBusRuleManager), Identifier); + ServiceBusEventSource.Log.ClientCloseComplete(typeof(ServiceBusRuleManager), Identifier); } /// diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs index 0864a522e0009..6d8334234deba 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs @@ -43,14 +43,20 @@ public class ServiceBusSender : IAsyncDisposable public string EntityPath { get; } /// - /// Indicates whether or not this has been disposed. + /// Indicates whether or not this has been closed. /// /// /// - /// true if the client is disposed; otherwise, false. + /// true if the sender is closed; otherwise, false. /// - /// - public bool IsDisposed { get; private set; } = false; + public bool IsClosed + { + get => _closed; + private set => _closed = value; + } + + /// Indicates whether or not this instance has been closed. + private volatile bool _closed = false; /// /// The instance of which can be mocked for testing. @@ -180,7 +186,7 @@ public virtual async Task SendMessagesAsync( CancellationToken cancellationToken = default) { Argument.AssertNotNull(messages, nameof(messages)); - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusSender)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSender)); IList messageList = messages.ToList(); if (messageList.Count == 0) { @@ -308,7 +314,7 @@ public virtual async ValueTask CreateMessageBatchAsync( CreateMessageBatchOptions options, CancellationToken cancellationToken = default) { - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusSender)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSender)); options = options?.Clone() ?? new CreateMessageBatchOptions(); cancellationToken.ThrowIfCancellationRequested(); Logger.CreateMessageBatchStart(Identifier); @@ -343,7 +349,7 @@ public virtual async Task SendMessagesAsync( CancellationToken cancellationToken = default) { Argument.AssertNotNull(messageBatch, nameof(messageBatch)); - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusSender)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSender)); cancellationToken.ThrowIfCancellationRequested(); Logger.SendMessageStart(Identifier, messageBatch.Count); using DiagnosticScope scope = CreateDiagnosticScope( @@ -425,7 +431,7 @@ public virtual async Task ScheduleMessagesAsync( CancellationToken cancellationToken = default) { Argument.AssertNotNullOrEmpty(messages, nameof(messages)); - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusSender)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSender)); cancellationToken.ThrowIfCancellationRequested(); var messageList = messages.ToList(); await ApplyPlugins(messageList).ConfigureAwait(false); @@ -483,7 +489,7 @@ public virtual async Task CancelScheduledMessagesAsync( IEnumerable sequenceNumbers, CancellationToken cancellationToken = default) { - Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusSender)); + Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSender)); cancellationToken.ThrowIfCancellationRequested(); var sequenceNumberList = sequenceNumbers.ToArray(); Logger.CancelScheduledMessagesStart(Identifier, sequenceNumberList); @@ -510,15 +516,16 @@ public virtual async Task CancelScheduledMessagesAsync( /// /// Performs the task needed to clean up resources used by the . /// - /// - /// A task to be resolved on when the operation has completed. - /// - [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Justification = "This signature must match the IAsyncDisposable interface.")] - public virtual async ValueTask DisposeAsync() + /// The mode indicating what should happen to the link when closing. + /// An optional instance to signal the + /// request to cancel the operation. + public virtual async Task CloseAsync( + LinkCloseMode closeMode = LinkCloseMode.Detach, + CancellationToken cancellationToken = default) { - IsDisposed = true; + IsClosed = true; - Logger.ClientDisposeStart(typeof(ServiceBusSender), Identifier); + Logger.ClientCloseStart(typeof(ServiceBusSender), Identifier); try { @@ -526,13 +533,23 @@ public virtual async ValueTask DisposeAsync() } catch (Exception ex) { - Logger.ClientDisposeException(typeof(ServiceBusSender), Identifier, ex); + Logger.ClientCloseException(typeof(ServiceBusSender), Identifier, ex); throw; } - Logger.ClientDisposeComplete(typeof(ServiceBusSender), Identifier); + Logger.ClientCloseComplete(typeof(ServiceBusSender), Identifier); } + /// + /// Performs the task needed to clean up resources used by the . + /// This is equivalent to calling with the default . + /// + /// + /// A task to be resolved on when the operation has completed. + [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Justification = "This signature must match the IAsyncDisposable interface.")] + public virtual async ValueTask DisposeAsync() => + await CloseAsync().ConfigureAwait(false); + /// /// Determines whether the specified is equal to this instance. /// diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Client/ServiceBusClientLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Client/ServiceBusClientLiveTests.cs index 9c1ada3a6bba4..ed837a3951d59 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Client/ServiceBusClientLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Client/ServiceBusClientLiveTests.cs @@ -38,7 +38,7 @@ public async Task GetChildClientFromClosedParentClientThrows(bool useSessions) Assert.AreEqual(message.Body.ToString(), receivedMessage.Body.ToString()); await client.DisposeAsync(); - Assert.IsTrue(client.IsDisposed); + Assert.IsTrue(client.IsClosed); if (!useSessions) { Assert.Throws(() => client.CreateReceiver(scope.QueueName)); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Client/ServiceBusClientTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Client/ServiceBusClientTests.cs index 4bd25e8513e90..83b2d8a0c17ce 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Client/ServiceBusClientTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Client/ServiceBusClientTests.cs @@ -240,7 +240,7 @@ public void ValidateClientProperties() ServiceBusClient client = new ServiceBusClient(fakeConnection); Assert.AreEqual("not-real.servicebus.windows.net", client.FullyQualifiedNamespace); Assert.IsNotNull(client.Identifier); - Assert.IsFalse(client.IsDisposed); + Assert.IsFalse(client.IsClosed); } /// diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs index a4f2ab05b6b98..1ca9abef50d7f 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs @@ -103,8 +103,8 @@ public async Task LogsEvents() _listener.SingleEventById(ServiceBusEventSource.CancelScheduledMessageCompleteEvent, e => e.Payload.Contains(sender.Identifier)); await receiver.DisposeAsync(); - _listener.SingleEventById(ServiceBusEventSource.ClientDisposeStartEvent, e => e.Payload.Contains(nameof(ServiceBusReceiver)) && e.Payload.Contains(receiver.Identifier)); - _listener.SingleEventById(ServiceBusEventSource.ClientDisposeCompleteEvent, e => e.Payload.Contains(nameof(ServiceBusReceiver)) && e.Payload.Contains(receiver.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.ClientCloseStartEvent, e => e.Payload.Contains(nameof(ServiceBusReceiver)) && e.Payload.Contains(receiver.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.ClientCloseCompleteEvent, e => e.Payload.Contains(nameof(ServiceBusReceiver)) && e.Payload.Contains(receiver.Identifier)); // link closed event is fired asynchronously, so add a small delay await Task.Delay(TimeSpan.FromSeconds(5)); _listener.SingleEventById(ServiceBusEventSource.ReceiveLinkClosedEvent, e => e.Payload.Contains(receiver.Identifier)); @@ -116,12 +116,12 @@ public async Task LogsEvents() _listener.SingleEventById(ServiceBusEventSource.MaxMessagesExceedsPrefetchEvent, e => e.Payload.Contains(receiver.Identifier)); await sender.DisposeAsync(); - _listener.SingleEventById(ServiceBusEventSource.ClientDisposeStartEvent, e => e.Payload.Contains(nameof(ServiceBusSender)) && e.Payload.Contains(sender.Identifier)); - _listener.SingleEventById(ServiceBusEventSource.ClientDisposeCompleteEvent, e => e.Payload.Contains(nameof(ServiceBusSender)) && e.Payload.Contains(sender.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.ClientCloseStartEvent, e => e.Payload.Contains(nameof(ServiceBusSender)) && e.Payload.Contains(sender.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.ClientCloseCompleteEvent, e => e.Payload.Contains(nameof(ServiceBusSender)) && e.Payload.Contains(sender.Identifier)); await client.DisposeAsync(); - _listener.SingleEventById(ServiceBusEventSource.ClientDisposeStartEvent, e => e.Payload.Contains(nameof(ServiceBusClient)) && e.Payload.Contains(client.Identifier)); - _listener.SingleEventById(ServiceBusEventSource.ClientDisposeCompleteEvent, e => e.Payload.Contains(nameof(ServiceBusClient)) && e.Payload.Contains(client.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.ClientCloseStartEvent, e => e.Payload.Contains(nameof(ServiceBusClient)) && e.Payload.Contains(client.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.ClientCloseCompleteEvent, e => e.Payload.Contains(nameof(ServiceBusClient)) && e.Payload.Contains(client.Identifier)); } } @@ -176,23 +176,23 @@ public async Task LogsSessionEvents() _listener.SingleEventById(ServiceBusEventSource.PeekMessageCompleteEvent, e => e.Payload.Contains(sessionReceiver.Identifier)); await receiver.DisposeAsync(); - _listener.SingleEventById(ServiceBusEventSource.ClientDisposeStartEvent, e => e.Payload.Contains(nameof(ServiceBusReceiver)) && e.Payload.Contains(receiver.Identifier)); - _listener.SingleEventById(ServiceBusEventSource.ClientDisposeCompleteEvent, e => e.Payload.Contains(nameof(ServiceBusReceiver)) && e.Payload.Contains(receiver.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.ClientCloseStartEvent, e => e.Payload.Contains(nameof(ServiceBusReceiver)) && e.Payload.Contains(receiver.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.ClientCloseCompleteEvent, e => e.Payload.Contains(nameof(ServiceBusReceiver)) && e.Payload.Contains(receiver.Identifier)); await sessionReceiver.DisposeAsync(); - _listener.SingleEventById(ServiceBusEventSource.ClientDisposeStartEvent, e => e.Payload.Contains(nameof(ServiceBusSessionReceiver)) && e.Payload.Contains(sessionReceiver.Identifier)); - _listener.SingleEventById(ServiceBusEventSource.ClientDisposeCompleteEvent, e => e.Payload.Contains(nameof(ServiceBusSessionReceiver)) && e.Payload.Contains(sessionReceiver.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.ClientCloseStartEvent, e => e.Payload.Contains(nameof(ServiceBusSessionReceiver)) && e.Payload.Contains(sessionReceiver.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.ClientCloseCompleteEvent, e => e.Payload.Contains(nameof(ServiceBusSessionReceiver)) && e.Payload.Contains(sessionReceiver.Identifier)); await Task.Delay(TimeSpan.FromSeconds(2)); _listener.SingleEventById(ServiceBusEventSource.ReceiveLinkClosedEvent, e => e.Payload.Contains(sessionReceiver.Identifier) && e.Payload.Contains(sessionReceiver.SessionId)); await sender.DisposeAsync(); - _listener.SingleEventById(ServiceBusEventSource.ClientDisposeStartEvent, e => e.Payload.Contains(nameof(ServiceBusSender)) && e.Payload.Contains(sender.Identifier)); - _listener.SingleEventById(ServiceBusEventSource.ClientDisposeCompleteEvent, e => e.Payload.Contains(nameof(ServiceBusSender)) && e.Payload.Contains(sender.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.ClientCloseStartEvent, e => e.Payload.Contains(nameof(ServiceBusSender)) && e.Payload.Contains(sender.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.ClientCloseCompleteEvent, e => e.Payload.Contains(nameof(ServiceBusSender)) && e.Payload.Contains(sender.Identifier)); await client.DisposeAsync(); - _listener.SingleEventById(ServiceBusEventSource.ClientDisposeStartEvent, e => e.Payload.Contains(nameof(ServiceBusClient)) && e.Payload.Contains(client.Identifier)); - _listener.SingleEventById(ServiceBusEventSource.ClientDisposeCompleteEvent, e => e.Payload.Contains(nameof(ServiceBusClient)) && e.Payload.Contains(client.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.ClientCloseStartEvent, e => e.Payload.Contains(nameof(ServiceBusClient)) && e.Payload.Contains(client.Identifier)); + _listener.SingleEventById(ServiceBusEventSource.ClientCloseCompleteEvent, e => e.Payload.Contains(nameof(ServiceBusClient)) && e.Payload.Contains(client.Identifier)); } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs index d154665b0c746..ed0a9acf3b373 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs @@ -1266,62 +1266,6 @@ public void StartProcessingExceptionLogsEvents() Times.Once); } - [Test] - public async Task StopProcessingExceptionLogsEvents() - { - var mockLogger = new Mock(); - var mockTransportReceiver = new Mock(); - var mockConnection = GetMockConnection(mockTransportReceiver); - mockTransportReceiver.Setup( - transportReceiver => transportReceiver.ReceiveMessagesAsync( - 1, - It.IsAny(), - It.IsAny())) - .Returns(Task.FromResult((IReadOnlyList) - new List - { - new ServiceBusReceivedMessage - { - LockTokenGuid = Guid.NewGuid() - } - })); - var processor = new ServiceBusProcessor(mockConnection.Object, "queueName", false, new ServiceBusPlugin[] { }, new ServiceBusProcessorOptions - { - AutoComplete = false, - MaxAutoLockRenewalDuration = TimeSpan.Zero - }) - { - Logger = mockLogger.Object - }; - processor.ProcessErrorAsync += ExceptionHandler; - processor.ProcessMessageAsync += MessageHandler; - - async Task MessageHandler(ProcessMessageEventArgs arg) - { - // simulate IO - await Task.Delay(1000); - } - - await processor.StartProcessingAsync(); - await processor.StopProcessingAsync(); - - Assert.That( - async () => await processor.StopProcessingAsync(), - Throws.InstanceOf()); - - mockLogger - .Verify( - log => log.StopProcessingStart( - processor.Identifier), - Times.Once); - mockLogger - .Verify( - log => log.StopProcessingException( - processor.Identifier, - It.IsAny()), - Times.Once); - } - private Mock GetMockConnection(Mock mockTransportReceiver) { var mockConnection = new Mock(); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusLiveTestBase.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusLiveTestBase.cs index 208c7b1324b0b..6c87ed034ad2b 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusLiveTestBase.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusLiveTestBase.cs @@ -29,7 +29,7 @@ protected ServiceBusClient GetNoRetryClient() options); } - protected ServiceBusClient GetClient(int tryTimeout = 10) + protected ServiceBusClient GetClient(int tryTimeout = 15) { var retryOptions = new ServiceBusRetryOptions(); if (tryTimeout != default) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs index c8a4445419522..ec92b55b87c05 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs @@ -412,7 +412,11 @@ Task ProcessErrors(ProcessErrorEventArgs args) await processor.StartProcessingAsync(); await taskCompletionSource.Task; Assert.True(exceptionReceivedHandlerCalled); - await processor.StopProcessingAsync(); + await processor.CloseAsync(); + + Assert.That( + async () => await processor.StartProcessingAsync(), + Throws.InstanceOf()); } [Test] @@ -439,9 +443,7 @@ public void StartStopMultipleTimes() processor.StopProcessingAsync(), processor.StopProcessingAsync() }; - Assert.That( - async () => await Task.WhenAll(stopTasks), - Throws.InstanceOf()); + Assert.DoesNotThrowAsync(async () => await Task.WhenAll(stopTasks)); } [Test] diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs index 1a399392de05f..4014bc2c35e9f 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs @@ -146,6 +146,8 @@ public void ProcessorOptionsSetOnClient() Assert.AreEqual(options.MaxAutoLockRenewalDuration, processor.MaxAutoLockRenewalDuration); Assert.AreEqual(fullyQualifiedNamespace, processor.FullyQualifiedNamespace); Assert.AreEqual(options.MaxReceiveWaitTime, processor.MaxReceiveWaitTime); + Assert.IsFalse(processor.IsClosed); + Assert.IsFalse(processor.IsProcessing); } [Test] diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs index 3910b724584c2..379053e52ca8a 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs @@ -960,7 +960,7 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args) } } await tcs.Task; - await processor.StopProcessingAsync(); + await processor.CloseAsync(); Assert.AreEqual(specifiedSessionCount, messageCt); @@ -1109,7 +1109,7 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args) } } await tcs.Task; - await processor.StopProcessingAsync(); + await processor.CloseAsync(); foreach (var sessionId in sessionIds) { Assert.True(receivedMessagesAfterLockLost.ContainsKey(sessionId)); @@ -1486,7 +1486,7 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args) [TestCase(10, 10, 1)] [TestCase(10, 5, 2)] [TestCase(10, 20, 5)] - [Timeout(60 * 1000 * 10)] + [Timeout(60 * 1000 * 15)] public async Task MaxCallsPerSessionRespected(int numSessions, int maxConcurrentSessions, int maxCallsPerSession) { await using (var scope = await ServiceBusScope.CreateWithQueue( @@ -1571,5 +1571,68 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args) Assert.AreEqual(totalMessages, messageCt); } } + + [Test] + public async Task StopProcessingDoesNotCloseLink() + { + await using (var scope = await ServiceBusScope.CreateWithQueue( + enablePartitioning: false, + enableSession: true)) + { + await using var client = GetClient(); + var sender = client.CreateSender(scope.QueueName); + await sender.SendMessageAsync(GetMessage("sessionId")); + var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions + { + AutoComplete = false, + MaxConcurrentSessions = 1 + }); + TaskCompletionSource tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + Task ProcessMessage(ProcessSessionMessageEventArgs args) + { + tcs.TrySetResult(true); + return Task.CompletedTask; + } + processor.ProcessMessageAsync += ProcessMessage; + processor.ProcessErrorAsync += ExceptionHandler; + + await processor.StartProcessingAsync(); + await tcs.Task; + await processor.StopProcessingAsync(); + Assert.IsFalse(processor.IsClosed); + Assert.IsFalse(processor.IsProcessing); + + Assert.That( + async () => await GetNoRetryClient().CreateSessionReceiverAsync( + scope.QueueName, + new ServiceBusSessionReceiverOptions { SessionId = "sessionId" }), + Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason)). + EqualTo(ServiceBusFailureReason.SessionCannotBeLocked)); + + // can restart + tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + await sender.SendMessageAsync(GetMessage("sessionId")); + await processor.StartProcessingAsync(); + Assert.IsTrue(processor.IsProcessing); + + await tcs.Task; + + // dispose will close the link, so we should be able to receive from this session + await processor.DisposeAsync(); + Assert.IsTrue(processor.IsClosed); + await sender.SendMessageAsync(GetMessage("sessionId")); + var receiver = await client.CreateSessionReceiverAsync(scope.QueueName, new ServiceBusSessionReceiverOptions + { + SessionId = "sessionId" + }); + var msg = await receiver.ReceiveMessageAsync(); + Assert.IsNotNull(msg); + + Assert.That( + async () => await processor.StartProcessingAsync(), + Throws.InstanceOf()); + } + } } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs index 754f23ad48d82..6412eea9d54ce 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs @@ -160,6 +160,8 @@ public void ProcessorOptionsSetOnClient() Assert.AreEqual(options.ReceiveMode, processor.ReceiveMode); Assert.AreEqual(options.MaxAutoLockRenewalDuration, processor.MaxAutoLockRenewalDuration); Assert.AreEqual(fullyQualifiedNamespace, processor.FullyQualifiedNamespace); + Assert.IsFalse(processor.IsClosed); + Assert.IsFalse(processor.IsProcessing); } [Test]