Skip to content

Commit

Permalink
[6.0] Modify thread pool thread counting to be a bit more defensive (d…
Browse files Browse the repository at this point in the history
…otnet#70479)

* [6.0] Modify thread pool thread counting to be a bit more defensive

- Port of dotnet#70478 to 6.0
- An unexpected underflow in one or more thread counts can lead to a large number of threads to be created continually
- Prevented underflows in changes to thread counts, such that following an unexpected underflow, subsequent paired increments and decrements would avoid repeating the underflow
- Verified by creating an unexpected underflow in the debugger

* Address feedback
  • Loading branch information
kouvel authored Jun 11, 2022
1 parent 2cf1ec2 commit 2ecbdbb
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,61 +31,52 @@ private void SetInt16Value(short value, byte shift) =>
/// </summary>
public short NumProcessingWork
{
get => GetInt16Value(NumProcessingWorkShift);
get
{
short value = GetInt16Value(NumProcessingWorkShift);
Debug.Assert(value >= 0);
return value;
}
set
{
Debug.Assert(value >= 0);
SetInt16Value(value, NumProcessingWorkShift);
SetInt16Value(Math.Max((short)0, value), NumProcessingWorkShift);
}
}

public void SubtractNumProcessingWork(short value)
{
Debug.Assert(value >= 0);
Debug.Assert(value <= NumProcessingWork);

_data -= (ulong)(ushort)value << NumProcessingWorkShift;
}

public void InterlockedDecrementNumProcessingWork()
{
Debug.Assert(NumProcessingWorkShift == 0);

ThreadCounts counts = new ThreadCounts(Interlocked.Decrement(ref _data));
Debug.Assert(counts.NumProcessingWork >= 0);
}

/// <summary>
/// Number of thread pool threads that currently exist.
/// </summary>
public short NumExistingThreads
{
get => GetInt16Value(NumExistingThreadsShift);
get
{
short value = GetInt16Value(NumExistingThreadsShift);
Debug.Assert(value >= 0);
return value;
}
set
{
Debug.Assert(value >= 0);
SetInt16Value(value, NumExistingThreadsShift);
SetInt16Value(Math.Max((short)0, value), NumExistingThreadsShift);
}
}

public void SubtractNumExistingThreads(short value)
{
Debug.Assert(value >= 0);
Debug.Assert(value <= NumExistingThreads);

_data -= (ulong)(ushort)value << NumExistingThreadsShift;
}

/// <summary>
/// Max possible thread pool threads we want to have.
/// </summary>
public short NumThreadsGoal
{
get => GetInt16Value(NumThreadsGoalShift);
get
{
short value = GetInt16Value(NumThreadsGoalShift);
Debug.Assert(value > 0);
return value;
}
set
{
Debug.Assert(value > 0);
SetInt16Value(value, NumThreadsGoalShift);
SetInt16Value(Math.Max((short)1, value), NumThreadsGoalShift);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,14 @@ private static void WorkerThreadStart()
// Since this thread is currently registered as an existing thread, if more work comes in meanwhile,
// this thread would be expected to satisfy the new work. Ensure that NumExistingThreads is not
// decreased below NumProcessingWork, as that would be indicative of such a case.
short numExistingThreads = counts.NumExistingThreads;
if (numExistingThreads <= counts.NumProcessingWork)
if (counts.NumExistingThreads <= counts.NumProcessingWork)
{
// In this case, enough work came in that this thread should not time out and should go back to work.
break;
}

ThreadCounts newCounts = counts;
newCounts.SubtractNumExistingThreads(1);
short newNumExistingThreads = (short)(numExistingThreads - 1);
short newNumExistingThreads = --newCounts.NumExistingThreads;
short newNumThreadsGoal =
Math.Max(
threadPoolInstance.MinThreadsGoal,
Expand Down Expand Up @@ -159,7 +157,23 @@ private static void WorkerThreadStart()
/// </summary>
private static void RemoveWorkingWorker(PortableThreadPool threadPoolInstance)
{
threadPoolInstance._separated.counts.InterlockedDecrementNumProcessingWork();
// A compare-exchange loop is used instead of Interlocked.Decrement or Interlocked.Add to defensively prevent
// NumProcessingWork from underflowing. See the setter for NumProcessingWork.
ThreadCounts counts = threadPoolInstance._separated.counts;
while (true)
{
ThreadCounts newCounts = counts;
newCounts.NumProcessingWork--;

ThreadCounts countsBeforeUpdate =
threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (countsBeforeUpdate == counts)
{
break;
}

counts = countsBeforeUpdate;
}

// It's possible that we decided we had thread requests just before a request came in,
// but reduced the worker count *after* the request came in. In this case, we might
Expand Down Expand Up @@ -221,8 +235,8 @@ internal static void MaybeAddWorkingWorker(PortableThreadPool threadPoolInstance
while (true)
{
ThreadCounts newCounts = counts;
newCounts.SubtractNumProcessingWork((short)toCreate);
newCounts.SubtractNumExistingThreads((short)toCreate);
newCounts.NumProcessingWork -= (short)toCreate;
newCounts.NumExistingThreads -= (short)toCreate;

ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
if (oldCounts == counts)
Expand Down Expand Up @@ -259,7 +273,7 @@ internal static bool ShouldStopProcessingWorkNow(PortableThreadPool threadPoolIn
}

ThreadCounts newCounts = counts;
newCounts.SubtractNumProcessingWork(1);
newCounts.NumProcessingWork--;

ThreadCounts oldCounts = threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);

Expand Down

0 comments on commit 2ecbdbb

Please sign in to comment.