From 30ff14ef56958b04ab7814ef139df7f540935f54 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 3 May 2023 01:23:23 +0200 Subject: [PATCH 1/4] Add failing test --- .../test/scala/cats/effect/unsafe/WorkerThreadNameSpec.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/jvm/src/test/scala/cats/effect/unsafe/WorkerThreadNameSpec.scala b/tests/jvm/src/test/scala/cats/effect/unsafe/WorkerThreadNameSpec.scala index 31efb94d28..2ccef9927d 100644 --- a/tests/jvm/src/test/scala/cats/effect/unsafe/WorkerThreadNameSpec.scala +++ b/tests/jvm/src/test/scala/cats/effect/unsafe/WorkerThreadNameSpec.scala @@ -54,6 +54,7 @@ class WorkerThreadNameSpec extends BaseSpec with TestInstances { "WorkerThread" should { "rename itself when entering and exiting blocking region" in real { for { + _ <- IO.cede computeThread <- threadInfo (computeThreadName, _) = computeThread blockerThread <- IO.blocking(threadInfo).flatten @@ -65,6 +66,8 @@ class WorkerThreadNameSpec extends BaseSpec with TestInstances { } yield { // Start with the regular prefix computeThreadName must startWith("io-compute") + // correct WSTP index (threadCount is 1, so the only possible index is 0) + computeThreadName must endWith("-0") // Check that entering a blocking region changes the name blockerThreadName must startWith("io-blocker") // Check that the same thread is renamed again when it is readded to the compute pool @@ -75,6 +78,8 @@ class WorkerThreadNameSpec extends BaseSpec with TestInstances { "blocker thread not found after reset") resetBlockerThread must beSome((_: String).startsWith("io-compute")) .setMessage("blocker thread name was not reset") + resetBlockerThread must beSome((_: String).endsWith("-0")) + .setMessage("blocker thread index was not correct") } } } From dadbc38c3b46f7394aa9a77a55347bb3f66fe6a4 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 3 May 2023 01:27:44 +0200 Subject: [PATCH 2/4] Fix initial nameIndex --- core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 8c23c00c8f..d5918d9060 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -98,7 +98,7 @@ private final class WorkerThread( private val indexTransfer: LinkedTransferQueue[Integer] = new LinkedTransferQueue() private[this] val runtimeBlockingExpiration: Duration = pool.runtimeBlockingExpiration - val nameIndex: Int = pool.blockedWorkerThreadNamingIndex.incrementAndGet() + val nameIndex: Int = pool.blockedWorkerThreadNamingIndex.getAndIncrement() // Constructor code. { From f813fc3f5dffbade27adb34d5a01c5a3e7c5ac87 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 3 May 2023 02:44:57 +0200 Subject: [PATCH 3/4] Add failing test for new worker naming --- .../scala/cats/effect/unsafe/WorkerThreadNameSpec.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/jvm/src/test/scala/cats/effect/unsafe/WorkerThreadNameSpec.scala b/tests/jvm/src/test/scala/cats/effect/unsafe/WorkerThreadNameSpec.scala index 2ccef9927d..f3c936e011 100644 --- a/tests/jvm/src/test/scala/cats/effect/unsafe/WorkerThreadNameSpec.scala +++ b/tests/jvm/src/test/scala/cats/effect/unsafe/WorkerThreadNameSpec.scala @@ -60,16 +60,23 @@ class WorkerThreadNameSpec extends BaseSpec with TestInstances { blockerThread <- IO.blocking(threadInfo).flatten (blockerThreadName, blockerThreadId) = blockerThread _ <- IO.cede + // The new worker (which replaced the thread which became a blocker) should also have a correct name + newComputeThread <- threadInfo + (newComputeThreadName, _) = newComputeThread // Force the previously blocking thread to become a compute thread by converting // the pool of compute threads (size=1) to blocker threads resetComputeThreads <- List.fill(2)(threadInfo <* IO.blocking(())).parSequence } yield { // Start with the regular prefix computeThreadName must startWith("io-compute") - // correct WSTP index (threadCount is 1, so the only possible index is 0) + // Correct WSTP index (threadCount is 1, so the only possible index is 0) computeThreadName must endWith("-0") // Check that entering a blocking region changes the name blockerThreadName must startWith("io-blocker") + // Check that the replacement compute thread has correct name + newComputeThreadName must startWith("io-compute") + // And index + newComputeThreadName must endWith("-0") // Check that the same thread is renamed again when it is readded to the compute pool val resetBlockerThread = resetComputeThreads.collectFirst { case (name, `blockerThreadId`) => name From 7723cf857c9c35c220f56633f836fcd89fbc9cf6 Mon Sep 17 00:00:00 2001 From: Daniel Urban Date: Wed, 3 May 2023 02:49:14 +0200 Subject: [PATCH 4/4] Fix new worker naming --- core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index d5918d9060..3a23414cf6 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -668,6 +668,9 @@ private final class WorkerThread( val idx = index val clone = new WorkerThread(idx, queue, parked, external, fiberBag, pool) + // Make sure the clone gets our old name: + val clonePrefix = pool.threadPrefix + clone.setName(s"$clonePrefix-$idx") pool.replaceWorker(idx, clone) pool.blockedWorkerThreadCounter.incrementAndGet() clone.start()