Skip to content

Commit

Permalink
Add metrics to count when worker blocked/respawned
Browse files Browse the repository at this point in the history
  • Loading branch information
armanbilge committed Dec 19, 2024
1 parent 40e909a commit 5144188
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 0 deletions.
11 changes: 11 additions & 0 deletions core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,8 @@ private[effect] final class WorkerThread[P <: AnyRef](
// This `WorkerThread` has already been prepared for blocking.
// There is no need to spawn another `WorkerThread`.
} else {
metrics.incrementBlockingCount()

// Spawn a new `WorkerThread` to take the place of this thread, as the
// current thread prepares to execute a blocking action.

Expand Down Expand Up @@ -902,6 +904,7 @@ private[effect] final class WorkerThread[P <: AnyRef](
// a worker thread about to run blocking code is **not** parked, and
// therefore, another worker thread would not even see it as a candidate
// for unparking.
metrics.incrementRespawnCount()
val idx = index
val clone =
new WorkerThread(
Expand Down Expand Up @@ -971,6 +974,14 @@ private[effect] object WorkerThread {
private[this] var idleTime: Long = 0
def getIdleTime(): Long = idleTime
def addIdleTime(x: Long): Unit = idleTime += x

private[this] var blockingCount: Long = 0
def getBlockingCount(): Long = blockingCount
def incrementBlockingCount(): Unit = blockingCount += 1

private[this] var respawnCount: Long = 0
def getRespawnCount(): Long = respawnCount
def incrementRespawnCount(): Unit = respawnCount += 1
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ trait WorkerThreadMetrics {
*/
def idleTime(): Long

/**
* The total number of times that this WorkerThread has switched to a blocking thread and been
* replaced.
*/
def blockingCount(): Long

/**
* The total number of times that this WorkerThread has been replaced by a newly spawned
* thread.
*/
def respawnCount(): Long

/**
* LocalQueue-specific metrics of this WorkerThread.
*/
Expand Down Expand Up @@ -244,6 +256,8 @@ object WorkStealingPoolMetrics {

private val metrics = wstp.metrices(idx)
def idleTime(): Long = metrics.getIdleTime()
def blockingCount(): Long = metrics.getBlockingCount()
def respawnCount(): Long = metrics.getBlockingCount()

val localQueue: LocalQueueMetrics = localQueueMetrics(wstp.localQueues(index))
val timerHeap: TimerHeapMetrics = timerHeapMetrics(wstp.sleepers(index))
Expand Down

0 comments on commit 5144188

Please sign in to comment.