Skip to content

Commit

Permalink
Merge branch 'main' into issue-934
Browse files Browse the repository at this point in the history
  • Loading branch information
dluc authored Dec 12, 2024
2 parents b963e97 + e8ac5fc commit 2a9a35b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 5 deletions.
13 changes: 10 additions & 3 deletions extensions/SQLServer/SQLServer/SqlServerMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,12 @@ private async Task InitAsync(CancellationToken cancellationToken)
{
if (this._isReady) { return; }

await this._initSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);

var lockAcquired = false;
try
{
await this._initSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
lockAcquired = true;

if (this._isReady) { return; }

await this.CacheSqlServerMajorVersionNumberAsync(cancellationToken).ConfigureAwait(false);
Expand All @@ -408,7 +410,12 @@ private async Task InitAsync(CancellationToken cancellationToken)
}
finally
{
this._initSemaphore.Release();
// Decrease the internal counter only it the lock was acquired,
// e.g. not when WaitAsync times out or throws some exception
if (lockAcquired)
{
this._initSemaphore.Release();
}
}
}

Expand Down
18 changes: 16 additions & 2 deletions service/Core/Pipeline/Queue/DevTools/SimpleQueues.cs
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,13 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs)
{
Task.Run(async () =>
{
var lockAcquired = false;
try
{
if (this._queue.Count >= this._config.FetchBatchSize) { return; }

await s_lock.WaitAsync(this._cancellation.Token).ConfigureAwait(false);
lockAcquired = true;

// Loop through all messages on storage
var messagesOnStorage = (await this._fileSystem.GetAllFileNamesAsync(this._queueName, "", this._cancellation.Token).ConfigureAwait(false)).ToList();
Expand Down Expand Up @@ -340,7 +342,12 @@ private void PopulateQueue(object? sender, ElapsedEventArgs elapsedEventArgs)
}
finally
{
s_lock.Release();
// Decrease the internal counter only it the lock was acquired,
// e.g. not when WaitAsync times out or throws some exception
if (lockAcquired)
{
s_lock.Release();
}
}
}, this._cancellation.Token);
}
Expand All @@ -354,11 +361,13 @@ private void DispatchMessage(object? sender, ElapsedEventArgs e)
{
Task.Run(async () =>
{
var lockAcquired = false;
try
{
if (this._queue.IsEmpty) { return; }

await s_lock.WaitAsync(this._cancellation.Token).ConfigureAwait(false);
lockAcquired = true;

this._log.LogTrace("Dispatching {MessageCount} messages", this._queue.Count);

Expand All @@ -373,7 +382,12 @@ private void DispatchMessage(object? sender, ElapsedEventArgs e)
}
finally
{
s_lock.Release();
// Decrease the internal counter only it the lock was acquired,
// e.g. not when WaitAsync times out or throws some exception
if (lockAcquired)
{
s_lock.Release();
}
}
}, this._cancellation.Token);
}
Expand Down

0 comments on commit 2a9a35b

Please sign in to comment.