diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 8c23c00c8f..3a23414cf6 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -98,7 +98,7 @@ private final class WorkerThread( private val indexTransfer: LinkedTransferQueue[Integer] = new LinkedTransferQueue() private[this] val runtimeBlockingExpiration: Duration = pool.runtimeBlockingExpiration - val nameIndex: Int = pool.blockedWorkerThreadNamingIndex.incrementAndGet() + val nameIndex: Int = pool.blockedWorkerThreadNamingIndex.getAndIncrement() // Constructor code. { @@ -668,6 +668,9 @@ private final class WorkerThread( val idx = index val clone = new WorkerThread(idx, queue, parked, external, fiberBag, pool) + // Make sure the clone gets our old name: + val clonePrefix = pool.threadPrefix + clone.setName(s"$clonePrefix-$idx") pool.replaceWorker(idx, clone) pool.blockedWorkerThreadCounter.incrementAndGet() clone.start() diff --git a/tests/jvm/src/test/scala/cats/effect/unsafe/WorkerThreadNameSpec.scala b/tests/jvm/src/test/scala/cats/effect/unsafe/WorkerThreadNameSpec.scala index 31efb94d28..f3c936e011 100644 --- a/tests/jvm/src/test/scala/cats/effect/unsafe/WorkerThreadNameSpec.scala +++ b/tests/jvm/src/test/scala/cats/effect/unsafe/WorkerThreadNameSpec.scala @@ -54,19 +54,29 @@ class WorkerThreadNameSpec extends BaseSpec with TestInstances { "WorkerThread" should { "rename itself when entering and exiting blocking region" in real { for { + _ <- IO.cede computeThread <- threadInfo (computeThreadName, _) = computeThread blockerThread <- IO.blocking(threadInfo).flatten (blockerThreadName, blockerThreadId) = blockerThread _ <- IO.cede + // The new worker (which replaced the thread which became a blocker) should also have a correct name + newComputeThread <- threadInfo + (newComputeThreadName, _) = newComputeThread // Force the previously blocking thread to become a compute thread by converting // the pool of compute threads (size=1) to blocker threads resetComputeThreads <- List.fill(2)(threadInfo <* IO.blocking(())).parSequence } yield { // Start with the regular prefix computeThreadName must startWith("io-compute") + // Correct WSTP index (threadCount is 1, so the only possible index is 0) + computeThreadName must endWith("-0") // Check that entering a blocking region changes the name blockerThreadName must startWith("io-blocker") + // Check that the replacement compute thread has correct name + newComputeThreadName must startWith("io-compute") + // And index + newComputeThreadName must endWith("-0") // Check that the same thread is renamed again when it is readded to the compute pool val resetBlockerThread = resetComputeThreads.collectFirst { case (name, `blockerThreadId`) => name @@ -75,6 +85,8 @@ class WorkerThreadNameSpec extends BaseSpec with TestInstances { "blocker thread not found after reset") resetBlockerThread must beSome((_: String).startsWith("io-compute")) .setMessage("blocker thread name was not reset") + resetBlockerThread must beSome((_: String).endsWith("-0")) + .setMessage("blocker thread index was not correct") } } }