Split PortableThreadPool.WorkerThread start and loop body #84490

Merged
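The diff below adds PortableThreadPool.WorkerThread.NonBrowser.cs to the shared sources, moves the worker semaphore, WorkerThreadStart, and CreateWorkerThread into it, and leaves the extracted loop-body helpers WorkerDoWork and ShouldExitWorker in PortableThreadPool.WorkerThread.cs. The split leans on C# partial classes spanning files; a simplified sketch of that shape (illustrative names only, not the actual runtime sources):

```csharp
// Simplified illustration of the partial-class split: the shared file owns the
// loop-body helpers, and a platform-specific file owns the entry point that drives them.

// -- shared file (e.g. WorkerThread.cs) --
internal static partial class WorkerThreadSketch
{
    internal static void DoWork() { /* dequeue and dispatch queued work items */ }
    internal static bool ShouldExit() { /* timeout and thread-count bookkeeping */ return true; }
}

// -- platform-specific file (e.g. WorkerThread.NonBrowser.cs) --
internal static partial class WorkerThreadSketch
{
    internal static void Run()
    {
        while (true)
        {
            DoWork();             // shared loop body
            if (ShouldExit())     // shared exit decision
                break;
        }
    }
}
```

This leaves room for a platform-specific counterpart to drive the same helpers with a different entry point.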
@@ -2539,6 +2539,7 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.ThreadCounts.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WaitThread.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.NonBrowser.cs"/>
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerTracking.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Unix.cs" Condition="'$(TargetsUnix)' == 'true' or '$(TargetsBrowser)' == 'true' or '$(TargetsWasi)' == 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />
@@ -0,0 +1,81 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System.Diagnostics.Tracing;
+
+namespace System.Threading
+{
+    internal sealed partial class PortableThreadPool
+    {
+        /// <summary>
+        /// The worker thread infrastructure for the CLR thread pool.
+        /// </summary>
+        private static partial class WorkerThread
+        {
+
+            /// <summary>
+            /// Semaphore for controlling how many threads are currently working.
+            /// </summary>
+            private static readonly LowLevelLifoSemaphore s_semaphore =
+                new LowLevelLifoSemaphore(
+                    0,
+                    MaxPossibleThreadCount,
+                    AppContextConfigHelper.GetInt32Config(
+                        "System.Threading.ThreadPool.UnfairSemaphoreSpinLimit",
+                        SemaphoreSpinCountDefault,
+                        false),
+                    onWait: () =>
+                    {
+                        if (NativeRuntimeEventSource.Log.IsEnabled())
+                        {
+                            NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait(
+                                (uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
+                        }
+                    });
+
+            private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart;
+
+            private static void WorkerThreadStart()
+            {
+                Thread.CurrentThread.SetThreadPoolWorkerThreadName();
+
+                PortableThreadPool threadPoolInstance = ThreadPoolInstance;
+
+                if (NativeRuntimeEventSource.Log.IsEnabled())
+                {
+                    NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart(
+                        (uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
+                }
+
+                LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock;
+                LowLevelLifoSemaphore semaphore = s_semaphore;
+
+                while (true)
+                {
+                    bool spinWait = true;
+                    while (semaphore.Wait(ThreadPoolThreadTimeoutMs, spinWait))
+                    {
+                        WorkerDoWork(threadPoolInstance, ref spinWait);
+                    }
+
+                    if (ShouldExitWorker(threadPoolInstance, threadAdjustmentLock))
+                    {
+                        break;
+                    }
+                }
+            }
+
+
+            private static void CreateWorkerThread()
+            {
+                // Thread pool threads must start in the default execution context without transferring the context, so
+                // using UnsafeStart() instead of Start()
+                Thread workerThread = new Thread(s_workerThreadStart);
+                workerThread.IsThreadPoolThread = true;
+                workerThread.IsBackground = true;
+                // thread name will be set in thread proc
+                workerThread.UnsafeStart();
+            }
+        }
+    }
+}
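CreateWorkerThread in the file above starts the worker with UnsafeStart() so the pool thread begins in the default execution context rather than capturing the creating thread's context. A minimal standalone demo of that difference, assuming .NET 6 or later where Thread.UnsafeStart() is public API (the demo is illustrative and not part of this change):

```csharp
using System;
using System.Threading;

class UnsafeStartDemo
{
    // AsyncLocal values travel with the ExecutionContext.
    private static readonly AsyncLocal<string> s_ambient = new AsyncLocal<string>();

    static void Main()
    {
        s_ambient.Value = "value set on the main thread";

        // Start() flows the current ExecutionContext, so the AsyncLocal value is visible.
        var flowing = new Thread(() => Console.WriteLine($"Start():       {s_ambient.Value ?? "<none>"}"));
        flowing.Start();
        flowing.Join();

        // UnsafeStart() suppresses that flow, so the thread begins in the default
        // context -- the behavior thread pool worker threads want.
        var notFlowing = new Thread(() => Console.WriteLine($"UnsafeStart(): {s_ambient.Value ?? "<none>"}"));
        notFlowing.UnsafeStart();
        notFlowing.Join();
    }
}
```

The Start() thread sees the flowed value, while the UnsafeStart() thread does not.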
@@ -2,6 +2,7 @@
 // The .NET Foundation licenses this file to you under the MIT license.
 
 using System.Diagnostics.Tracing;
+using System.Runtime.CompilerServices;
 
 namespace System.Threading
 {
@@ -28,147 +29,112 @@ private static partial class WorkerThread
             // preexisting threads from running out of memory when using new stack space in low-memory situations.
             public const int EstimatedAdditionalStackUsagePerThreadBytes = 64 << 10; // 64 KB
 
-            /// <summary>
-            /// Semaphore for controlling how many threads are currently working.
-            /// </summary>
-            private static readonly LowLevelLifoSemaphore s_semaphore =
-                new LowLevelLifoSemaphore(
-                    0,
-                    MaxPossibleThreadCount,
-                    AppContextConfigHelper.GetInt32Config(
-                        "System.Threading.ThreadPool.UnfairSemaphoreSpinLimit",
-                        SemaphoreSpinCountDefault,
-                        false),
-                    onWait: () =>
-                    {
-                        if (NativeRuntimeEventSource.Log.IsEnabled())
-                        {
-                            NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait(
-                                (uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
-                        }
-                    });
-
-            private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart;
-
-            private static void WorkerThreadStart()
-            {
-                Thread.CurrentThread.SetThreadPoolWorkerThreadName();
-
-                PortableThreadPool threadPoolInstance = ThreadPoolInstance;
-
-                if (NativeRuntimeEventSource.Log.IsEnabled())
-                {
-                    NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart(
-                        (uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
-                }
-
-                LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock;
-                LowLevelLifoSemaphore semaphore = s_semaphore;
-
-                while (true)
-                {
-                    bool spinWait = true;
-                    while (semaphore.Wait(ThreadPoolThreadTimeoutMs, spinWait))
-                    {
-                        bool alreadyRemovedWorkingWorker = false;
-                        while (TakeActiveRequest(threadPoolInstance))
-                        {
-                            threadPoolInstance._separated.lastDequeueTime = Environment.TickCount;
-                            if (!ThreadPoolWorkQueue.Dispatch())
-                            {
-                                // ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have
-                                // already removed this working worker in the counts. This typically happens when hill climbing
-                                // decreases the worker thread count goal.
-                                alreadyRemovedWorkingWorker = true;
-                                break;
-                            }
-
-                            if (threadPoolInstance._separated.numRequestedWorkers <= 0)
-                            {
-                                break;
-                            }
-
-                            // In highly bursty cases with short bursts of work, especially in the portable thread pool
-                            // implementation, worker threads are being released and entering Dispatch very quickly, not finding
-                            // much work in Dispatch, and soon afterwards going back to Dispatch, causing extra thrashing on
-                            // data and some interlocked operations, and similarly when the thread pool runs out of work. Since
-                            // there is a pending request for work, introduce a slight delay before serving the next request.
-                            // The spin-wait is mainly for when the sleep is not effective due to there being no other threads
-                            // to schedule.
-                            Thread.UninterruptibleSleep0();
-                            if (!Environment.IsSingleProcessor)
-                            {
-                                Thread.SpinWait(1);
-                            }
-                        }
-
-                        // Don't spin-wait on the semaphore next time if the thread was actively stopped from processing work,
-                        // as it's unlikely that the worker thread count goal would be increased again so soon afterwards that
-                        // the semaphore would be released within the spin-wait window
-                        spinWait = !alreadyRemovedWorkingWorker;
-
-                        if (!alreadyRemovedWorkingWorker)
-                        {
-                            // If we woke up but couldn't find a request, or ran out of work items to process, we need to update
-                            // the number of working workers to reflect that we are done working for now
-                            RemoveWorkingWorker(threadPoolInstance);
-                        }
-                    }
-
-                    // The thread cannot exit if it has IO pending, otherwise the IO may be canceled
-                    if (IsIOPending)
-                    {
-                        continue;
-                    }
-
-                    threadAdjustmentLock.Acquire();
-                    try
-                    {
-                        // At this point, the thread's wait timed out. We are shutting down this thread.
-                        // We are going to decrement the number of existing threads to no longer include this one
-                        // and then change the max number of threads in the thread pool to reflect that we don't need as many
-                        // as we had. Finally, we are going to tell hill climbing that we changed the max number of threads.
-                        ThreadCounts counts = threadPoolInstance._separated.counts;
-                        while (true)
-                        {
-                            // Since this thread is currently registered as an existing thread, if more work comes in meanwhile,
-                            // this thread would be expected to satisfy the new work. Ensure that NumExistingThreads is not
-                            // decreased below NumProcessingWork, as that would be indicative of such a case.
-                            if (counts.NumExistingThreads <= counts.NumProcessingWork)
-                            {
-                                // In this case, enough work came in that this thread should not time out and should go back to work.
-                                break;
-                            }
-
-                            ThreadCounts newCounts = counts;
-                            short newNumExistingThreads = --newCounts.NumExistingThreads;
-                            short newNumThreadsGoal =
-                                Math.Max(
-                                    threadPoolInstance.MinThreadsGoal,
-                                    Math.Min(newNumExistingThreads, counts.NumThreadsGoal));
-                            newCounts.NumThreadsGoal = newNumThreadsGoal;
-
-                            ThreadCounts oldCounts =
-                                threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
-                            if (oldCounts == counts)
-                            {
-                                HillClimbing.ThreadPoolHillClimber.ForceChange(
-                                    newNumThreadsGoal,
-                                    HillClimbing.StateOrTransition.ThreadTimedOut);
-                                if (NativeRuntimeEventSource.Log.IsEnabled())
-                                {
-                                    NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads);
-                                }
-                                return;
-                            }
-
-                            counts = oldCounts;
-                        }
-                    }
-                    finally
-                    {
-                        threadAdjustmentLock.Release();
-                    }
-                }
-            }
+            [MethodImpl(MethodImplOptions.AggressiveInlining)]
+            private static void WorkerDoWork(PortableThreadPool threadPoolInstance, ref bool spinWait)
+            {
+                bool alreadyRemovedWorkingWorker = false;
+                while (TakeActiveRequest(threadPoolInstance))
+                {
+                    threadPoolInstance._separated.lastDequeueTime = Environment.TickCount;
+                    if (!ThreadPoolWorkQueue.Dispatch())
+                    {
+                        // ShouldStopProcessingWorkNow() caused the thread to stop processing work, and it would have
+                        // already removed this working worker in the counts. This typically happens when hill climbing
+                        // decreases the worker thread count goal.
+                        alreadyRemovedWorkingWorker = true;
+                        break;
+                    }
+
+                    if (threadPoolInstance._separated.numRequestedWorkers <= 0)
+                    {
+                        break;
+                    }
+
+                    // In highly bursty cases with short bursts of work, especially in the portable thread pool
+                    // implementation, worker threads are being released and entering Dispatch very quickly, not finding
+                    // much work in Dispatch, and soon afterwards going back to Dispatch, causing extra thrashing on
+                    // data and some interlocked operations, and similarly when the thread pool runs out of work. Since
+                    // there is a pending request for work, introduce a slight delay before serving the next request.
+                    // The spin-wait is mainly for when the sleep is not effective due to there being no other threads
+                    // to schedule.
+                    Thread.UninterruptibleSleep0();
+                    if (!Environment.IsSingleProcessor)
+                    {
+                        Thread.SpinWait(1);
+                    }
+                }
+
+                // Don't spin-wait on the semaphore next time if the thread was actively stopped from processing work,
+                // as it's unlikely that the worker thread count goal would be increased again so soon afterwards that
+                // the semaphore would be released within the spin-wait window
+                spinWait = !alreadyRemovedWorkingWorker;
+
+                if (!alreadyRemovedWorkingWorker)
+                {
+                    // If we woke up but couldn't find a request, or ran out of work items to process, we need to update
+                    // the number of working workers to reflect that we are done working for now
+                    RemoveWorkingWorker(threadPoolInstance);
+                }
+            }
+
+            // returns true if the worker is shutting down
+            // returns false if we should do another iteration
+            [MethodImpl(MethodImplOptions.AggressiveInlining)]
+            private static bool ShouldExitWorker(PortableThreadPool threadPoolInstance, LowLevelLock threadAdjustmentLock)
+            {
+                // The thread cannot exit if it has IO pending, otherwise the IO may be canceled
+                if (IsIOPending)
+                {
+                    return false;
+                }
+
+                threadAdjustmentLock.Acquire();
+                try
+                {
+                    // At this point, the thread's wait timed out. We are shutting down this thread.
+                    // We are going to decrement the number of existing threads to no longer include this one
+                    // and then change the max number of threads in the thread pool to reflect that we don't need as many
+                    // as we had. Finally, we are going to tell hill climbing that we changed the max number of threads.
+                    ThreadCounts counts = threadPoolInstance._separated.counts;
+                    while (true)
+                    {
+                        // Since this thread is currently registered as an existing thread, if more work comes in meanwhile,
+                        // this thread would be expected to satisfy the new work. Ensure that NumExistingThreads is not
+                        // decreased below NumProcessingWork, as that would be indicative of such a case.
+                        if (counts.NumExistingThreads <= counts.NumProcessingWork)
+                        {
+                            // In this case, enough work came in that this thread should not time out and should go back to work.
+                            return false;
+                        }
+
+                        ThreadCounts newCounts = counts;
+                        short newNumExistingThreads = --newCounts.NumExistingThreads;
+                        short newNumThreadsGoal =
+                            Math.Max(
+                                threadPoolInstance.MinThreadsGoal,
+                                Math.Min(newNumExistingThreads, counts.NumThreadsGoal));
+                        newCounts.NumThreadsGoal = newNumThreadsGoal;
+
+                        ThreadCounts oldCounts =
+                            threadPoolInstance._separated.counts.InterlockedCompareExchange(newCounts, counts);
+                        if (oldCounts == counts)
+                        {
+                            HillClimbing.ThreadPoolHillClimber.ForceChange(
+                                newNumThreadsGoal,
+                                HillClimbing.StateOrTransition.ThreadTimedOut);
+                            if (NativeRuntimeEventSource.Log.IsEnabled())
+                            {
+                                NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStop((uint)newNumExistingThreads);
+                            }
+                            return true;
+                        }
+
+                        counts = oldCounts;
+                    }
+                }
+                finally
+                {
+                    threadAdjustmentLock.Release();
+                }
+            }

@@ -300,17 +266,6 @@ private static bool TakeActiveRequest(PortableThreadPool threadPoolInstance)
                 }
                 return false;
             }
-
-            private static void CreateWorkerThread()
-            {
-                // Thread pool threads must start in the default execution context without transferring the context, so
-                // using UnsafeStart() instead of Start()
-                Thread workerThread = new Thread(s_workerThreadStart);
-                workerThread.IsThreadPoolThread = true;
-                workerThread.IsBackground = true;
-                // thread name will be set in thread proc
-                workerThread.UnsafeStart();
-            }
         }
     }
 }
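ShouldExitWorker publishes the reduced thread counts with an interlocked compare-exchange retry loop: snapshot the counts, compute the new value, attempt to swap it in, and retry from whatever value won if another thread raced ahead. A minimal standalone sketch of that pattern on a plain int (a simplified stand-in for the runtime's packed ThreadCounts struct, not part of this change):

```csharp
using System.Threading;

static class CompareExchangeRetryDemo
{
    private static int s_existingThreads = 5;

    // Decrements the counter only while it stays above a floor, mirroring the
    // snapshot / compute / publish / retry shape used by ShouldExitWorker.
    public static bool TryRetireThread(int minThreads)
    {
        int counts = Volatile.Read(ref s_existingThreads);
        while (true)
        {
            if (counts <= minThreads)
            {
                return false; // the pool still needs this thread; don't retire it
            }

            int newCounts = counts - 1;
            int oldCounts = Interlocked.CompareExchange(ref s_existingThreads, newCounts, counts);
            if (oldCounts == counts)
            {
                return true; // our update won the race
            }

            counts = oldCounts; // another thread changed the counts; retry from the fresh value
        }
    }
}
```

The real method follows the same shape, except the counter is the packed ThreadCounts struct and the floor is the pool's MinThreadsGoal.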