Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continue renewing lock while processing #22257

Merged
merged 3 commits into from
Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public virtual bool IsClosed
internal IList<ServiceBusPlugin> Plugins { get; set; } = new List<ServiceBusPlugin>();

/// <summary>
/// Performs the task needed to clean up resources used by the <see cref="ServiceBusConnection" />,
/// including ensuring that the connection itself has been closed.
/// Performs the task needed to clean up resources used by the <see cref="ServiceBusClient" />,
/// including ensuring that the client itself has been closed.
/// </summary>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private async Task ProcessOneMessage(
Receiver.ReceiveMode == ServiceBusReceiveMode.PeekLock &&
AutoRenewLock)
{
renewLockCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
renewLockCancellationTokenSource = new CancellationTokenSource();
renewLock = RenewMessageLock(
message,
renewLockCancellationTokenSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ internal class SessionReceiverManager : ReceiverManager
private ServiceBusSessionReceiver _receiver;
private CancellationTokenSource _sessionLockRenewalCancellationSource;
private Task _sessionLockRenewalTask;
private CancellationTokenSource _sessionCancellationSource = new CancellationTokenSource();
private CancellationTokenSource _sessionCancellationSource;
private bool _receiveTimeout;

protected override ServiceBusReceiver Receiver => _receiver;
Expand Down Expand Up @@ -113,7 +113,7 @@ private async Task CreateAndInitializeSessionReceiver(
CancellationToken processorCancellationToken)
{
await CreateReceiver(processorCancellationToken).ConfigureAwait(false);
_sessionCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(processorCancellationToken);
_sessionCancellationSource = new CancellationTokenSource();

if (AutoRenewLock)
{
Expand Down Expand Up @@ -180,11 +180,17 @@ public override async Task CloseReceiverIfNeeded(
return;
}
_threadCount--;
if (_threadCount == 0 && !processorCancellationToken.IsCancellationRequested)

if (_threadCount == 0)
{
// Even if there are no current receive tasks, we should leave the
// receiver open if _keepOpenOnReceiveTimeout is true - which happens
// when a list of session Ids is specified and this list is less than the
// MaxConcurrentSessions.
if ((_receiveTimeout && !_keepOpenOnReceiveTimeout) ||
!AutoRenewLock ||
_sessionLockRenewalCancellationSource.IsCancellationRequested)
// if the session is cancelled we should still close the receiver
// as this means the session lock was lost.
_sessionCancellationSource.IsCancellationRequested)
{
await CloseReceiver(processorCancellationToken).ConfigureAwait(false);
}
Expand Down Expand Up @@ -257,15 +263,17 @@ public override async Task ReceiveAndProcessMessagesAsync(CancellationToken proc
// so simply return and allow this to be tried again on next thread
return;
}

using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(processorCancellationToken, _sessionCancellationSource.Token);
// loop within the context of this thread
while (!_sessionCancellationSource.Token.IsCancellationRequested)
while (!linkedTokenSource.Token.IsCancellationRequested)
{
errorSource = ServiceBusErrorSource.Receive;
IReadOnlyList<ServiceBusReceivedMessage> messages = await Receiver.ReceiveMessagesAsync(
maxMessages: 1,
maxWaitTime: _maxReceiveWaitTime,
isProcessor: true,
cancellationToken: _sessionCancellationSource.Token).ConfigureAwait(false);
cancellationToken: linkedTokenSource.Token).ConfigureAwait(false);
ServiceBusReceivedMessage message = messages.Count == 0 ? null : messages[0];
if (message == null)
{
Expand All @@ -277,21 +285,21 @@ public override async Task ReceiveAndProcessMessagesAsync(CancellationToken proc
await ProcessOneMessageWithinScopeAsync(
message,
DiagnosticProperty.ProcessSessionMessageActivityName,
_sessionCancellationSource.Token).ConfigureAwait(false);
linkedTokenSource.Token).ConfigureAwait(false);
}
}
catch (Exception ex)
when (!(ex is TaskCanceledException) ||
// If the user manually throws a TCE, then we should log it.
(!_sessionCancellationSource.IsCancellationRequested &&
// Even though the _sessionCancellationSource is linked to processorCancellationToken,
// we need to check both here in case the processor token gets cancelled before the
// session token is linked.
!processorCancellationToken.IsCancellationRequested))
!processorCancellationToken.IsCancellationRequested))
{
if (ex is ServiceBusException sbException && sbException.ProcessorErrorSource.HasValue)
if (ex is ServiceBusException sbException)
{
errorSource = sbException.ProcessorErrorSource.Value;
if (sbException.ProcessorErrorSource.HasValue)
{
errorSource = sbException.ProcessorErrorSource.Value;
}

// Signal cancellation so user event handlers can stop whatever processing they are doing
// as soon as we know the session lock has been lost. Note, we don't have analogous handling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -829,5 +829,44 @@ Task ProcessMessage(ProcessMessageEventArgs args)
await processor.StopProcessingAsync();
}
}

[Test]
public async Task AutoLockRenewalContinuesUntilProcessingCompletes()
{
var lockDuration = TimeSpan.FromSeconds(10);
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false, lockDuration: lockDuration))
{
await using var client = CreateClient();
var sender = client.CreateSender(scope.QueueName);
int messageCount = 10;

await sender.SendMessagesAsync(GetMessages(messageCount));

await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions
{
MaxConcurrentCalls = 10
});

int receivedCount = 0;
var tcs = new TaskCompletionSource<bool>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var tcs = new TaskCompletionSource<bool>();
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I had auto-merge enabled, but I don't think this will be a huge impact being that this is a test rather than the produce code.


async Task ProcessMessage(ProcessMessageEventArgs args)
{
var ct = Interlocked.Increment(ref receivedCount);
if (ct == messageCount)
{
tcs.SetResult(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tcs.SetResult(true);
tcs.TrySetResult(true);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not necessary due to the increment, but can't hurt.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be necessary and could potentially obscure other issues in the test.

}

await Task.Delay(lockDuration.Add(lockDuration));
}
processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += ExceptionHandler;

await processor.StartProcessingAsync();
await tcs.Task;
await processor.StopProcessingAsync();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ Task ExceptionHandler(ProcessErrorEventArgs args)

await processor.StartProcessingAsync();
await tcs.Task;
await processor.StopProcessingAsync();
await processor.CloseAsync();

if (settleMethod == "" || settleMethod == "Abandon")
{
Expand Down Expand Up @@ -1291,14 +1291,7 @@ public async Task ErrorSourceRespected(ServiceBusErrorSource errorSource)

Task CloseSessionHandler(ProcessSessionEventArgs arg)
{
try
{
throw new TestException();
}
finally
{
tcs.TrySetResult(true);
}
throw new TestException();
}

async Task SessionErrorHandler(ProcessErrorEventArgs eventArgs)
Expand Down Expand Up @@ -1388,6 +1381,9 @@ await client.AcceptSessionAsync(
case ServiceBusErrorSource.Complete:
await Task.Delay(delayDuration);
break;
case ServiceBusErrorSource.CloseSession:
tcs.TrySetResult(true);
break;
}
}
finally
Expand All @@ -1396,7 +1392,7 @@ await client.AcceptSessionAsync(
}
}
await tcs.Task;
await processor.StopProcessingAsync();
await processor.CloseAsync();
if (errorSource != ServiceBusErrorSource.AcceptSession)
{
Assert.AreEqual(1, messageCt);
Expand Down Expand Up @@ -1707,5 +1703,44 @@ async Task ProcessMessage(ProcessMessageEventArgs args)
Assert.IsNull(msg);
}
}

[Test]
public async Task AutoLockRenewalContinuesUntilProcessingCompletes()
{
var lockDuration = TimeSpan.FromSeconds(10);
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true, lockDuration: lockDuration))
{
await using var client = CreateClient();
var sender = client.CreateSender(scope.QueueName);
int messageCount = 10;

await sender.SendMessagesAsync(GetMessages(messageCount, "sessionId"));

await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions
{
MaxConcurrentCallsPerSession = 10
});

int receivedCount = 0;
var tcs = new TaskCompletionSource<bool>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var tcs = new TaskCompletionSource<bool>();
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);


async Task ProcessMessage(ProcessSessionMessageEventArgs args)
{
var ct = Interlocked.Increment(ref receivedCount);
if (ct == messageCount)
{
tcs.SetResult(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tcs.SetResult(true);
tcs.TrySetResult(true);

}

await Task.Delay(lockDuration.Add(lockDuration));
}
processor.ProcessMessageAsync += ProcessMessage;
processor.ProcessErrorAsync += ExceptionHandler;

await processor.StartProcessingAsync();
await tcs.Task;
await processor.StopProcessingAsync();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ public async Task CrossEntityTransactionProcessorRollback()
await using var queueB = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false);
await using var queueC = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false);
var senderA = client.CreateSender(queueA.QueueName);
var processorA = client.CreateProcessor(queueA.QueueName);
await using var processorA = client.CreateProcessor(queueA.QueueName);

var receiverA = client.CreateReceiver(queueA.QueueName);
var receiverB = client.CreateReceiver(queueB.QueueName);
Expand Down Expand Up @@ -828,7 +828,7 @@ public async Task CrossEntityTransactionProcessor()
await using var queueB = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false);
await using var queueC = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false);
var senderA = noTxClient.CreateSender(queueA.QueueName);
var processorA = client.CreateProcessor(queueA.QueueName);
await using var processorA = client.CreateProcessor(queueA.QueueName);

var receiverA = noTxClient.CreateReceiver(queueA.QueueName);
var receiverB = noTxClient.CreateReceiver(queueB.QueueName);
Expand Down Expand Up @@ -878,7 +878,7 @@ public async Task CrossEntityTransactionSessionProcessorRollback()
await using var queueB = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true);
await using var queueC = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true);
var senderA = client.CreateSender(queueA.QueueName);
var processorA = client.CreateSessionProcessor(queueA.QueueName);
await using var processorA = client.CreateSessionProcessor(queueA.QueueName);
var senderB = client.CreateSender(queueB.QueueName);
var senderC = client.CreateSender(queueC.QueueName);

Expand Down Expand Up @@ -923,7 +923,7 @@ public async Task CrossEntityTransactionSessionProcessor()
await using var noTxClient = CreateNoRetryClient();
var senderA = noTxClient.CreateSender(queueA.QueueName);

var processorA = client.CreateSessionProcessor(queueA.QueueName);
await using var processorA = client.CreateSessionProcessor(queueA.QueueName);
var senderB = client.CreateSender(queueB.QueueName);
var senderC = client.CreateSender(queueC.QueueName);

Expand Down