Skip to content

Commit

Permalink
Add idle time metric
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge committed Dec 19, 2024
1 parent a52da5c commit 40e909a
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
private[unsafe] val fiberBags: Array[WeakBag[Runnable]] = new Array(threadCount)
private[unsafe] val pollers: Array[P] =
new Array[AnyRef](threadCount).asInstanceOf[Array[P]]
private[unsafe] val metrices: Array[WorkerThread.Metrics] = new Array(threadCount)

def accessPoller(cb: P => Unit): Unit = {

Expand Down Expand Up @@ -160,6 +161,8 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
fiberBags(i) = fiberBag
val poller = system.makePoller()
pollers(i) = poller
val metrics = new WorkerThread.Metrics
metrices(i) = metrics

val thread =
new WorkerThread(
Expand All @@ -171,6 +174,7 @@ private[effect] final class WorkStealingThreadPool[P <: AnyRef](
sleepersHeap,
system,
poller,
metrics,
this)

workerThreads(i) = thread
Expand Down
28 changes: 23 additions & 5 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import java.lang.Long.MIN_VALUE
import java.util.concurrent.{LinkedTransferQueue, ThreadLocalRandom}
import java.util.concurrent.atomic.AtomicBoolean

import WorkerThread.Metrics

/**
* Implementation of the worker thread at the heart of the [[WorkStealingThreadPool]].
*
Expand All @@ -55,6 +57,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
private[this] var sleepers: TimerHeap,
private[this] val system: PollingSystem.WithPoller[P],
private[this] var _poller: P,
private[this] var metrics: Metrics,
// Reference to the `WorkStealingThreadPool` in which this thread operates.
pool: WorkStealingThreadPool[P])
extends Thread
Expand Down Expand Up @@ -401,8 +404,6 @@ private[effect] final class WorkerThread[P <: AnyRef](
}
}

now = System.nanoTime()

if (nextState != 4) {
// after being unparked, we re-check sleepers;
// if we find an already expired one, we go
Expand All @@ -423,7 +424,10 @@ private[effect] final class WorkerThread[P <: AnyRef](
def parkLoop(): Boolean = {
while (!done.get()) {
// Park the thread until further notice.
val start = System.nanoTime()
val polled = system.poll(_poller, -1, reportFailure)
now = System.nanoTime() // update now
metrics.addIdleTime(now - start)

// the only way we can be interrupted here is if it happened *externally* (probably sbt)
if (isInterrupted()) {
Expand Down Expand Up @@ -455,15 +459,17 @@ private[effect] final class WorkerThread[P <: AnyRef](
val nanos = triggerTime - now

if (nanos > 0L) {
val start = now
val polled = system.poll(_poller, nanos, reportFailure)
// we already parked and time passed, so update time again
// it doesn't matter if we timed out or were awakened, the update is free-ish
now = System.nanoTime()
metrics.addIdleTime(now - start)

if (isInterrupted()) {
pool.shutdown()
false // we know `done` is `true`
} else {
// we already parked and time passed, so update time again
// it doesn't matter if we timed out or were awakened, the update is free-ish
now = System.nanoTime()
if (parked.get()) {
// we were either awakened spuriously, or we timed out or polled an event
if (polled || (triggerTime - now <= 0)) {
Expand Down Expand Up @@ -506,6 +512,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
fiberBag = null
_active = null
_poller = null.asInstanceOf[P]
metrics = null

// Add this thread to the cached threads data structure, to be picked up
// by another thread in the future.
Expand Down Expand Up @@ -906,6 +913,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
sleepers,
system,
_poller,
metrics,
pool)
// Make sure the clone gets our old name:
val clonePrefix = pool.threadPrefix
Expand Down Expand Up @@ -956,3 +964,13 @@ private[effect] final class WorkerThread[P <: AnyRef](
def getSuspendedFiberCount(): Int =
fiberBag.size
}

private[effect] object WorkerThread {

final class Metrics {
private[this] var idleTime: Long = 0
def getIdleTime(): Long = idleTime
def addIdleTime(x: Long): Unit = idleTime += x
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ trait WorkerThreadMetrics {
*/
def index: Int

/**
* The total amount of time in nanoseconds that this WorkerThread has been parked.
*/
def idleTime(): Long

/**
* LocalQueue-specific metrics of this WorkerThread.
*/
Expand Down Expand Up @@ -236,6 +241,10 @@ object WorkStealingPoolMetrics {
idx: Int
): WorkerThreadMetrics = new WorkerThreadMetrics {
val index: Int = idx

private val metrics = wstp.metrices(idx)
def idleTime(): Long = metrics.getIdleTime()

val localQueue: LocalQueueMetrics = localQueueMetrics(wstp.localQueues(index))
val timerHeap: TimerHeapMetrics = timerHeapMetrics(wstp.sleepers(index))
}
Expand Down

0 comments on commit 40e909a

Please sign in to comment.