diff --git a/core/js/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/js/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 91c0104582..7fd877c8e5 100644 --- a/core/js/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/js/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -27,4 +27,5 @@ private[effect] sealed abstract class WorkStealingThreadPool private () def reportFailure(cause: Throwable): Unit = () private[effect] def executeFiber(fiber: IOFiber[_]): Unit = { val _ = fiber } private[effect] def rescheduleFiber(fiber: IOFiber[_]): Unit = { val _ = fiber } + private[effect] def rescheduleFiberAndNotify(fiber: IOFiber[_]): Unit = { val _ = fiber } } diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala index 716e0936f6..65ac1e80bc 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkStealingThreadPool.scala @@ -282,21 +282,31 @@ private[effect] final class WorkStealingThreadPool( */ private[effect] def executeFiber(fiber: IOFiber[_]): Unit = { if (Thread.currentThread().isInstanceOf[WorkerThread]) { - rescheduleFiber(fiber) + rescheduleFiberAndNotify(fiber) } else { externalQueue.enqueue(fiber) notifyParked() } } + /** + * Reschedules the given fiber directly on the local work stealing queue on the same thread, + * but with the possibility to skip notifying other fibers of a potential steal target, which + * reduces contention in workloads running on fewer worker threads. This method executes an + * unchecked cast to a `WorkerThread` and should only ever be called directly from a + * `WorkerThread`. + */ + private[effect] def rescheduleFiber(fiber: IOFiber[_]): Unit = { + Thread.currentThread().asInstanceOf[WorkerThread].smartEnqueue(fiber, externalQueue) + } + /** * Reschedules the given fiber directly on the local work stealing queue on the same thread. * This method executes an unchecked cast to a `WorkerThread` and should only ever be called * directly from a `WorkerThread`. */ - private[effect] def rescheduleFiber(fiber: IOFiber[_]): Unit = { - Thread.currentThread().asInstanceOf[WorkerThread].enqueue(fiber, externalQueue) - notifyParked() + private[effect] def rescheduleFiberAndNotify(fiber: IOFiber[_]): Unit = { + Thread.currentThread().asInstanceOf[WorkerThread].enqueueAndNotify(fiber, externalQueue) } /** diff --git a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala index 1d1333a271..d2e788350c 100644 --- a/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala +++ b/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala @@ -61,10 +61,33 @@ private final class WorkerThread( @volatile private[unsafe] var sleeping: Boolean = false /** - * A forwarder method for enqueuing a fiber to the local work stealing queue. + * Enqueues a fiber to the local work stealing queue. This method always + * notifies another thread that a steal should be attempted from this queue. */ - def enqueue(fiber: IOFiber[_], external: ExternalQueue): Unit = + def enqueueAndNotify(fiber: IOFiber[_], external: ExternalQueue): Unit = { queue.enqueue(fiber, external) + pool.notifyParked() + } + + /** + * Enqueues a fiber to the local work stealing queue. This method can skip + * notifying another thread about potential work to be stolen if it can be + * determined that this is a mostly single fiber workload. + */ + def smartEnqueue(fiber: IOFiber[_], external: ExternalQueue): Unit = { + // Check if the local queue is empty **before** enqueueing the given fiber. + val empty = queue.isEmpty() + queue.enqueue(fiber, external) + if (tick == ExternalCheckIterationsMask || !empty) { + // On the next iteration, this worker thread will check for new work from + // the external queue, which means that another thread should be woken up + // to contend for the enqueued fiber. + // It could also be the case that the current queue was not empty before + // the fiber was enqueued, in which case another thread should wake up + // to help out. + pool.notifyParked() + } + } /** * A forwarder method for stealing work from the local work stealing queue in diff --git a/core/shared/src/main/scala/cats/effect/IOFiber.scala b/core/shared/src/main/scala/cats/effect/IOFiber.scala index 1f9cc8855f..0dd6303713 100644 --- a/core/shared/src/main/scala/cats/effect/IOFiber.scala +++ b/core/shared/src/main/scala/cats/effect/IOFiber.scala @@ -603,7 +603,7 @@ private final class IOFiber[A]( // println(s"<$name> spawning <$childName>") - reschedule(ec)(fiber) + rescheduleAndNotify(ec)(fiber) runLoop(succeeded(fiber, 0), nextIteration) @@ -872,18 +872,26 @@ private final class IOFiber[A]( } private[this] def reschedule(ec: ExecutionContext)(fiber: IOFiber[_]): Unit = - if (ec.isInstanceOf[WorkStealingThreadPool]) { + if (ec.isInstanceOf[WorkStealingThreadPool]) ec.asInstanceOf[WorkStealingThreadPool].rescheduleFiber(fiber) - } else { - try { - ec.execute(fiber) - } catch { - case _: RejectedExecutionException => - /* - * swallow this exception, since it means we're being externally murdered, - * so we should just... drop the runloop - */ - } + else + scheduleOnForeignEC(ec)(fiber) + + private[this] def rescheduleAndNotify(ec: ExecutionContext)(fiber: IOFiber[_]): Unit = + if (ec.isInstanceOf[WorkStealingThreadPool]) + ec.asInstanceOf[WorkStealingThreadPool].rescheduleFiberAndNotify(fiber) + else + scheduleOnForeignEC(ec)(fiber) + + private[this] def scheduleOnForeignEC(ec: ExecutionContext)(fiber: IOFiber[_]): Unit = + try { + ec.execute(fiber) + } catch { + case _: RejectedExecutionException => + /* + * swallow this exception, since it means we're being externally murdered, + * so we should just... drop the runloop + */ } // TODO figure out if the JVM ever optimizes this away diff --git a/core/shared/src/test/scala/cats/effect/Runners.scala b/core/shared/src/test/scala/cats/effect/Runners.scala index 58055380b7..fdd0e4da7b 100644 --- a/core/shared/src/test/scala/cats/effect/Runners.scala +++ b/core/shared/src/test/scala/cats/effect/Runners.scala @@ -287,8 +287,6 @@ trait Runners extends SpecificationLike with RunnersPlatform { outer => def mustEqual(a: A) = fa.flatMap { res => IO(res must beEqualTo(a)) } } - - def unsafeRun[A](ioa: IO[A])(implicit ticker: Ticker): Outcome[Option, Throwable, A] = try { var results: Outcome[Option, Throwable, A] = Outcome.Succeeded(None)