Skip to content

Commit

Permalink
Fix SemaphoreSlim release logic (#941)
Browse files Browse the repository at this point in the history
Under some circumstances, SemaphoreSlim.WaitAsync() can fail, e.g.
throwing an OperationCanceledException. In these cases, the internal
semaphore counter didn't change, so SemaphoreSlim.Release() should not
be called, to avoid exceptions.

Classes affected:
- SqlServerMemory (initialization)
- SimpleQueue (message dispatching)
  • Loading branch information
dluc authored Dec 12, 2024
1 parent 695fddb commit e8ac5fc
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 5 deletions.
13 changes: 10 additions & 3 deletions extensions/SQLServer/SQLServer/SqlServerMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,12 @@ private async Task InitAsync(CancellationToken cancellationToken)
{
if (this._isReady) { return; }

await this._initSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

var lockAcquired = false;
try
{
await this._initSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
lockAcquired = true;

if (this._isReady) { return; }

await this.CacheSqlServerMajorVersionNumberAsync(cancellationToken).ConfigureAwait(false);
Expand All @@ -408,7 +410,12 @@ private async Task InitAsync(CancellationToken cancellationToken)
}
finally
{
this._initSemaphore.Release();
// Decrease the internal counter only it the lock was acquired,
// e.g. not when WaitAsync times out or throws some exception
if (lockAcquired)
{
this._initSemaphore.Release();
}
}
}

Expand Down
18 changes: 16 additions & 2 deletions service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,13 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs)
{
Task.Run(async () =>
{
var lockAcquired = false;
try
{
if (this._queue.Count >= this._config.FetchBatchSize) { return; }

await s_lock.WaitAsync(this._cancellation.Token).ConfigureAwait(false);
lockAcquired = true;

// Loop through all messages on storage
var messagesOnStorage = (await this._fileSystem.GetAllFileNamesAsync(this._queueName, "", this._cancellation.Token).ConfigureAwait(false)).ToList();
Expand Down Expand Up @@ -340,7 +342,12 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs)
}
finally
{
s_lock.Release();
// Decrease the internal counter only it the lock was acquired,
// e.g. not when WaitAsync times out or throws some exception
if (lockAcquired)
{
s_lock.Release();
}
}
}, this._cancellation.Token);
}
Expand All @@ -354,11 +361,13 @@ private void DispatchMessage(object? sender, ElapsedEventArgs e)
{
Task.Run(async () =>
{
var lockAcquired = false;
try
{
if (this._queue.IsEmpty) { return; }

await s_lock.WaitAsync(this._cancellation.Token).ConfigureAwait(false);
lockAcquired = true;

this._log.LogTrace("Dispatching {MessageCount} messages", this._queue.Count);

Expand All @@ -373,7 +382,12 @@ private void DispatchMessage(object? sender, ElapsedEventArgs e)
}
finally
{
s_lock.Release();
// Decrease the internal counter only it the lock was acquired,
// e.g. not when WaitAsync times out or throws some exception
if (lockAcquired)
{
s_lock.Release();
}
}
}, this._cancellation.Token);
}
Expand Down

0 comments on commit e8ac5fc

Please sign in to comment.