
Contention on Map when using IO.unsafeToFuture() #2663

Open
yanns opened this issue Dec 18, 2021 · 4 comments

@yanns
Contributor

yanns commented Dec 18, 2021

From https://discord.com/channels/632277896739946517/632278585700384799/921743315169325087

In a project based on Future, we introduce IO step by step. We use IO.unsafeToFuture() for interoperability.
I can observe the following locking:

[screenshot: profiler view showing thread contention, 2021-12-18 13:35]

The whole application runs with one main ExecutionContext using a ForkJoinPool built very similarly to scala.concurrent.ExecutionContext.opportunistic.

We build our own IORuntime to re-use the main ExecutionContext like this:

import cats.effect.unsafe.{IORuntimeConfig, Scheduler}

import java.util.concurrent.ScheduledThreadPoolExecutor
import scala.concurrent.{blocking, ExecutionContext}

object IORuntimeFactory {

  /**
   * To be able to run a [[cats.effect.IO]], for example to convert it into a
   * [[scala.concurrent.Future]], we need a [[cats.effect.unsafe.IORuntime]].
   * This one is built from an existing execution context. It does not follow
   * the Cats Effect best practices, as we are using the same execution
   * context for blocking and non-blocking operations.
   *
   * This should be OK as long as we don't use it heavily.
   *
   * In general, it's better to avoid using it, and to consider using a
   * [[cats.effect.std.Dispatcher]] instead.
   */
  def from(ec: ExecutionContext): cats.effect.unsafe.IORuntime = {
    val compute = ec
    val blockingEC = new ExecutionContext {
      override def execute(runnable: Runnable): Unit = ec.execute(() => blocking(runnable.run()))
      override def reportFailure(cause: Throwable): Unit = ec.reportFailure(cause)
    }
    cats.effect.unsafe.IORuntime(compute, blockingEC, scheduler, () => (), IORuntimeConfig())
  }

  /** same scheduler as in [[cats.effect.unsafe.IORuntime.global]] */
  val scheduler: Scheduler = {
    val threadPrefix: String = "io-scheduler"
    val scheduler = new ScheduledThreadPoolExecutor(
      1,
      { r =>
        val t = new Thread(r)
        t.setName(threadPrefix)
        t.setDaemon(true)
        t.setPriority(Thread.MAX_PRIORITY)
        t
      })
    scheduler.setRemoveOnCancelPolicy(true)
    Scheduler.fromScheduledExecutor(scheduler)
  }

}

This runtime could be instantiated many times, but for the contention observed above it was instantiated once and re-used.

@armanbilge
Member

Thanks for the detailed write-up! Just repeating a couple of notes from Discord for anyone reading the issue here:

  1. The SynchronizedMap here is used for tracking fibers for the new "fiber dump" feature introduced in 3.3.0. So, the easiest way to remove this overhead is simply to disable diagnostics with the -Dcats.effect.tracing.mode=off JVM argument.
  2. Cats Effect's own WorkStealingThreadPool is far better optimized for these diagnostics. So if possible, using the WorkStealingThreadPool as the shared ExecutionContext for your application is better than using an external ExecutionContext for the IORuntime.
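As a sketch, the flag from point 1 could also be wired into an sbt build instead of being passed on the command line (the sbt setting names are standard; only the flag itself comes from the Cats Effect tracing docs):

```scala
// build.sbt (sketch): pass the tracing flag to the forked JVM.
// -Dcats.effect.tracing.mode=off disables fiber tracing diagnostics,
// which removes the SynchronizedMap contention at the cost of losing
// fiber dumps and enhanced exception traces.
Compile / run / fork := true
Compile / run / javaOptions += "-Dcats.effect.tracing.mode=off"
```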

@yanns
Contributor Author

yanns commented Dec 19, 2021

> 1. The `SynchronizedMap` here is used for tracking fibers for the new "fiber dump" feature introduced in 3.3.0. So, the easiest way to remove this overhead is simply to disable diagnostics with the [`-Dcats.effect.tracing.mode=off`](https://typelevel.org/cats-effect/docs/tracing#configuration) JVM argument.

This is such a good feature that it's a difficult decision to disable it.

> 2. Cats Effect's own `WorkStealingThreadPool` is far better optimized for these diagnostics. So if possible, using the `WorkStealingThreadPool` as the shared `ExecutionContext` for your application is better than using an external `ExecutionContext` for the `IORuntime`.

OK I see.
Our application is heavily based on Future, and only a small portion of it uses IO, to enable a progressive migration.

As our main execution context is mostly used to run Futures, I don't see how WorkStealingThreadPool can work here.

scala.concurrent.impl.ExecutionContextImpl.DefaultThreadFactory is able to detect scala.concurrent.blocking code blocks and to add new threads if necessary, a feature that we cannot live without.
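To illustrate what that feature does, here is a small standard-library-only sketch: `scala.concurrent.blocking` signals the global `ExecutionContext` (via `scala.concurrent.BlockContext`) that a task is about to block, so the pool compensates with extra threads and more tasks can block simultaneously than there are cores. Names like `BlockingDemo` are illustrative only.

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.{blocking, Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockingDemo {
  // Runs n tasks that all block on the same latch at the same time.
  // Wrapping the wait in blocking { ... } lets the ForkJoinPool behind
  // ExecutionContext.global spawn compensating threads, so this
  // completes even when n exceeds the number of CPU cores; without
  // blocking, it could deadlock on a small machine.
  def runBlockingTasks(n: Int)(implicit ec: ExecutionContext): Int = {
    val latch = new CountDownLatch(n)
    val tasks = (1 to n).map { _ =>
      Future {
        blocking {
          latch.countDown()
          latch.await() // every task waits until all n have arrived
          1
        }
      }
    }
    Await.result(Future.sequence(tasks), 30.seconds).sum
  }

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val n = Runtime.getRuntime.availableProcessors() * 2
    println(s"completed ${runBlockingTasks(n)} of $n blocking tasks")
  }
}
```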
And we also have some metrics based on the ForkJoinPool, like getQueuedSubmissionCount:

val ThreadPoolMetrics: Map[MetricName, ForkJoinPool => Long] = {
  import ThreadPoolMetricsNames._
  Map(
    active -> (_.getActiveThreadCount),
    poolSize -> (_.getPoolSize),
    task -> (_.getActiveThreadCount),
    completedTask -> (_.getRunningThreadCount),
    queuedTasks -> (_.getQueuedSubmissionCount),
    stealCount -> (_.getStealCount)
  )
}

I could not find any way to get similar metrics from WorkStealingThreadPool.

In the end, either we keep the current execution context for the IORuntime as well, and live with the thread contention, or we use two execution contexts: one for the Future part and one for the IO part.

@armanbilge
Member

> scala.concurrent.impl.ExecutionContextImpl.DefaultThreadFactory is able to detect scala.concurrent.blocking code blocks to add new threads if necessary, a feature that we cannot live without.

Good news, WorkStealingThreadPool supports this 👍

> And we also have some metrics based on the ForkJoinPool, like getQueuedSubmissionCount:

WorkStealingThreadPool exposes similar metrics as MBeans, would that work for you? See the "Fiber Runtime Observability" section in https://github.com/typelevel/cats-effect/releases/tag/v3.3.0
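As a rough sketch of how such MBeans could be discovered from inside the application: the query below is plain JMX against the platform MBean server, but the `cats.effect.unsafe.metrics` domain name is my assumption based on the 3.3.0 release notes, so verify it against your Cats Effect version.

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName
import scala.jdk.CollectionConverters._

object RuntimeMBeans {
  // Lists the ObjectNames of all MBeans registered under the given
  // JMX domain on the platform MBean server.
  def mbeanNames(domain: String): List[String] = {
    val server = ManagementFactory.getPlatformMBeanServer
    server
      .queryNames(new ObjectName(domain + ":*"), null)
      .asScala
      .map(_.toString)
      .toList
      .sorted
  }

  def main(args: Array[String]): Unit = {
    // Assumed domain; empty if the runtime has not registered anything.
    val names = mbeanNames("cats.effect.unsafe.metrics")
    if (names.isEmpty) println("no Cats Effect runtime MBeans registered")
    else names.foreach(println)
  }
}
```

The same helper can be pointed at the `java.lang` domain to list the standard JVM MBeans (memory, threading, etc.) as a sanity check.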

@yanns
Contributor Author

yanns commented Dec 19, 2021

>> scala.concurrent.impl.ExecutionContextImpl.DefaultThreadFactory is able to detect scala.concurrent.blocking code blocks to add new threads if necessary, a feature that we cannot live without.
>
> Good news, WorkStealingThreadPool supports this 👍

I see now that `cats.effect.unsafe.WorkerThread` extends `scala.concurrent.BlockContext` 👍

>> And we also have some metrics based on the ForkJoinPool, like getQueuedSubmissionCount:
>
> WorkStealingThreadPool exposes similar metrics as MBeans, would that work for you? See the "Fiber Runtime Observability" section in https://github.com/typelevel/cats-effect/releases/tag/v3.3.0

Oh yes, this can be a way to achieve that. I'll have a look. Thanks!
