Skip to content

Commit

Permalink
Continue renewing lock while processing (#22257)
Browse files Browse the repository at this point in the history
* Continue renewing lock while processing

* Fix transaction tests

* Fix docs for SBClient
  • Loading branch information
JoshLove-msft authored Jun 29, 2021
1 parent e0623b2 commit f553697
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public virtual bool IsClosed
internal IList<ServiceBusPlugin> Plugins { get; set; } = new List<ServiceBusPlugin>();

/// <summary>
/// Performs the task needed to clean up resources used by the <see cref="ServiceBusConnection" />,
/// including ensuring that the connection itself has been closed.
/// Performs the task needed to clean up resources used by the <see cref="ServiceBusClient" />,
/// including ensuring that the client itself has been closed.
/// </summary>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private async Task ProcessOneMessage(
Receiver.ReceiveMode == ServiceBusReceiveMode.PeekLock &&
AutoRenewLock)
{
renewLockCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
renewLockCancellationTokenSource = new CancellationTokenSource();
renewLock = RenewMessageLock(
message,
renewLockCancellationTokenSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ internal class SessionReceiverManager : ReceiverManager
private ServiceBusSessionReceiver _receiver;
private CancellationTokenSource _sessionLockRenewalCancellationSource;
private Task _sessionLockRenewalTask;
private CancellationTokenSource _sessionCancellationSource = new CancellationTokenSource();
private CancellationTokenSource _sessionCancellationSource;
private bool _receiveTimeout;

protected override ServiceBusReceiver Receiver => _receiver;
Expand Down Expand Up @@ -113,7 +113,7 @@ private async Task CreateAndInitializeSessionReceiver(
CancellationToken processorCancellationToken)
{
await CreateReceiver(processorCancellationToken).ConfigureAwait(false);
_sessionCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(processorCancellationToken);
_sessionCancellationSource = new CancellationTokenSource();

if (AutoRenewLock)
{
Expand Down Expand Up @@ -180,11 +180,17 @@ public override async Task CloseReceiverIfNeeded(
return;
}
_threadCount--;
if (_threadCount == 0 && !processorCancellationToken.IsCancellationRequested)

if (_threadCount == 0)
{
// Even if there are no current receive tasks, we should leave the
// receiver open if _keepOpenOnReceiveTimeout is true - which happens
// when a list of session Ids is specified and this list is less than the
// MaxConcurrentSessions.
if ((_receiveTimeout && !_keepOpenOnReceiveTimeout) ||
!AutoRenewLock ||
_sessionLockRenewalCancellationSource.IsCancellationRequested)
// if the session is cancelled we should still close the receiver
// as this means the session lock was lost.
_sessionCancellationSource.IsCancellationRequested)
{
await CloseReceiver(processorCancellationToken).ConfigureAwait(false);
}
Expand Down Expand Up @@ -257,15 +263,17 @@ public override async Task ReceiveAndProcessMessagesAsync(CancellationToken proc
// so simply return and allow this to be tried again on next thread
return;
}

using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(processorCancellationToken, _sessionCancellationSource.Token);
// loop within the context of this thread
while (!_sessionCancellationSource.Token.IsCancellationRequested)
while (!linkedTokenSource.Token.IsCancellationRequested)
{
errorSource = ServiceBusErrorSource.Receive;
IReadOnlyList<ServiceBusReceivedMessage> messages = await Receiver.ReceiveMessagesAsync(
maxMessages: 1,
maxWaitTime: _maxReceiveWaitTime,
isProcessor: true,
cancellationToken: _sessionCancellationSource.Token).ConfigureAwait(false);
cancellationToken: linkedTokenSource.Token).ConfigureAwait(false);
ServiceBusReceivedMessage message = messages.Count == 0 ? null : messages[0];
if (message == null)
{
Expand All @@ -277,21 +285,21 @@ public override async Task ReceiveAndProcessMessagesAsync(CancellationToken proc
await ProcessOneMessageWithinScopeAsync(
message,
DiagnosticProperty.ProcessSessionMessageActivityName,
_sessionCancellationSource.Token).ConfigureAwait(false);
linkedTokenSource.Token).ConfigureAwait(false);
}
}
catch (Exception ex)
when (!(ex is TaskCanceledException) ||
// If the user manually throws a TCE, then we should log it.
(!_sessionCancellationSource.IsCancellationRequested &&
// Even though the _sessionCancellationSource is linked to processorCancellationToken,
// we need to check both here in case the processor token gets cancelled before the
// session token is linked.
!processorCancellationToken.IsCancellationRequested))
!processorCancellationToken.IsCancellationRequested))
{
if (ex is ServiceBusException sbException && sbException.ProcessorErrorSource.HasValue)
if (ex is ServiceBusException sbException)
{
errorSource = sbException.ProcessorErrorSource.Value;
if (sbException.ProcessorErrorSource.HasValue)
{
errorSource = sbException.ProcessorErrorSource.Value;
}

// Signal cancellation so user event handlers can stop whatever processing they are doing
// as soon as we know the session lock has been lost. Note, we don't have analogous handling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -829,5 +829,44 @@ Task ProcessMessage(ProcessMessageEventArgs args)
await processor.StopProcessingAsync();
}
}

[Test]
public async Task AutoLockRenewalContinuesUntilProcessingCompletes()
{
var lockDuration = TimeSpan.FromSeconds(10);
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false, lockDuration: lockDuration))
{
await using var client = CreateClient();
var sender = client.CreateSender(scope.QueueName);
int messageCount = 10;

await sender.SendMessagesAsync(GetMessages(messageCount));

await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
{
MaxConcurrentCalls = 10
});

int receivedCount = 0;
var tcs = new TaskCompletionSource<bool>();

async Task ProcessMessage(ProcessMessageEventArgs args)
{
var ct = Interlocked.Increment(ref receivedCount);
if (ct == messageCount)
{
tcs.SetResult(true);
}

await Task.Delay(lockDuration.Add(lockDuration));
}
processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += ExceptionHandler;

await processor.StartProcessingAsync();
await tcs.Task;
await processor.StopProcessingAsync();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ Task ExceptionHandler(ProcessErrorEventArgs args)

await processor.StartProcessingAsync();
await tcs.Task;
await processor.StopProcessingAsync();
await processor.CloseAsync();

if (settleMethod == "" || settleMethod == "Abandon")
{
Expand Down Expand Up @@ -1291,14 +1291,7 @@ public async Task ErrorSourceRespected(ServiceBusErrorSource errorSource)

Task CloseSessionHandler(ProcessSessionEventArgs arg)
{
try
{
throw new TestException();
}
finally
{
tcs.TrySetResult(true);
}
throw new TestException();
}

async Task SessionErrorHandler(ProcessErrorEventArgs eventArgs)
Expand Down Expand Up @@ -1388,6 +1381,9 @@ await client.AcceptSessionAsync(
case ServiceBusErrorSource.Complete:
await Task.Delay(delayDuration);
break;
case ServiceBusErrorSource.CloseSession:
tcs.TrySetResult(true);
break;
}
}
finally
Expand All @@ -1396,7 +1392,7 @@ await client.AcceptSessionAsync(
}
}
await tcs.Task;
await processor.StopProcessingAsync();
await processor.CloseAsync();
if (errorSource != ServiceBusErrorSource.AcceptSession)
{
Assert.AreEqual(1, messageCt);
Expand Down Expand Up @@ -1707,5 +1703,44 @@ async Task ProcessMessage(ProcessMessageEventArgs args)
Assert.IsNull(msg);
}
}

[Test]
public async Task AutoLockRenewalContinuesUntilProcessingCompletes()
{
var lockDuration = TimeSpan.FromSeconds(10);
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true, lockDuration: lockDuration))
{
await using var client = CreateClient();
var sender = client.CreateSender(scope.QueueName);
int messageCount = 10;

await sender.SendMessagesAsync(GetMessages(messageCount, "sessionId"));

await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
{
MaxConcurrentCallsPerSession = 10
});

int receivedCount = 0;
var tcs = new TaskCompletionSource<bool>();

async Task ProcessMessage(ProcessSessionMessageEventArgs args)
{
var ct = Interlocked.Increment(ref receivedCount);
if (ct == messageCount)
{
tcs.SetResult(true);
}

await Task.Delay(lockDuration.Add(lockDuration));
}
processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += ExceptionHandler;

await processor.StartProcessingAsync();
await tcs.Task;
await processor.StopProcessingAsync();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ public async Task CrossEntityTransactionProcessorRollback()
await using var queueB = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false);
await using var queueC = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false);
var senderA = client.CreateSender(queueA.QueueName);
var processorA = client.CreateProcessor(queueA.QueueName);
await using var processorA = client.CreateProcessor(queueA.QueueName);

var receiverA = client.CreateReceiver(queueA.QueueName);
var receiverB = client.CreateReceiver(queueB.QueueName);
Expand Down Expand Up @@ -828,7 +828,7 @@ public async Task CrossEntityTransactionProcessor()
await using var queueB = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false);
await using var queueC = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false);
var senderA = noTxClient.CreateSender(queueA.QueueName);
var processorA = client.CreateProcessor(queueA.QueueName);
await using var processorA = client.CreateProcessor(queueA.QueueName);

var receiverA = noTxClient.CreateReceiver(queueA.QueueName);
var receiverB = noTxClient.CreateReceiver(queueB.QueueName);
Expand Down Expand Up @@ -878,7 +878,7 @@ public async Task CrossEntityTransactionSessionProcessorRollback()
await using var queueB = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true);
await using var queueC = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true);
var senderA = client.CreateSender(queueA.QueueName);
var processorA = client.CreateSessionProcessor(queueA.QueueName);
await using var processorA = client.CreateSessionProcessor(queueA.QueueName);
var senderB = client.CreateSender(queueB.QueueName);
var senderC = client.CreateSender(queueC.QueueName);

Expand Down Expand Up @@ -923,7 +923,7 @@ public async Task CrossEntityTransactionSessionProcessor()
await using var noTxClient = CreateNoRetryClient();
var senderA = noTxClient.CreateSender(queueA.QueueName);

var processorA = client.CreateSessionProcessor(queueA.QueueName);
await using var processorA = client.CreateSessionProcessor(queueA.QueueName);
var senderB = client.CreateSender(queueB.QueueName);
var senderC = client.CreateSender(queueC.QueueName);

Expand Down

0 comments on commit f553697

Please sign in to comment.