diff --git a/benchmarks/shared/src/main/scala/cats/effect/benchmarks/MapCallsBenchmark.scala b/benchmarks/shared/src/main/scala/cats/effect/benchmarks/MapCallsBenchmark.scala index 6105d4818e..2306e30cfc 100644 --- a/benchmarks/shared/src/main/scala/cats/effect/benchmarks/MapCallsBenchmark.scala +++ b/benchmarks/shared/src/main/scala/cats/effect/benchmarks/MapCallsBenchmark.scala @@ -40,7 +40,7 @@ class MapCallsBenchmark { import MapCallsBenchmark.test @Benchmark - def one(): Long = test(12000, 1) + def one(): Long = test(1, 1) @Benchmark def batch30(): Long = test(12000 / 30, 30) diff --git a/build.sbt b/build.sbt index 336bb115b6..e14f0a6816 100644 --- a/build.sbt +++ b/build.sbt @@ -182,6 +182,20 @@ val mimaSettings = Seq( // change in encoding of value classes in generic methods https://github.com/lightbend/mima/issues/423 exclude[IncompatibleSignatureProblem]("cats.effect.Blocker.apply"), exclude[IncompatibleSignatureProblem]("cats.effect.Blocker.fromExecutorService"), + // Tracing - https://github.com/typelevel/cats-effect/pull/854 + exclude[DirectMissingMethodProblem]("cats.effect.IO#Async.apply"), + exclude[DirectMissingMethodProblem]("cats.effect.IO#Bind.apply"), + exclude[IncompatibleResultTypeProblem]("cats.effect.IO#Async.k"), + exclude[DirectMissingMethodProblem]("cats.effect.IO#Async.copy"), + exclude[IncompatibleResultTypeProblem]("cats.effect.IO#Async.copy$default$1"), + exclude[DirectMissingMethodProblem]("cats.effect.IO#Async.this"), + exclude[DirectMissingMethodProblem]("cats.effect.IO#Bind.copy"), + exclude[DirectMissingMethodProblem]("cats.effect.IO#Bind.this"), + exclude[DirectMissingMethodProblem]("cats.effect.IO#Map.index"), + exclude[IncompatibleMethTypeProblem]("cats.effect.IO#Map.copy"), + exclude[IncompatibleResultTypeProblem]("cats.effect.IO#Map.copy$default$3"), + exclude[IncompatibleMethTypeProblem]("cats.effect.IO#Map.this"), + exclude[IncompatibleMethTypeProblem]("cats.effect.IO#Map.apply"), // revise Deferred, MVarConcurrent, LinkedLongMap - https://github.com/typelevel/cats-effect/pull/918 exclude[IncompatibleResultTypeProblem]("cats.effect.concurrent.Deferred#State#Unset.waiting"), exclude[DirectMissingMethodProblem]("cats.effect.concurrent.Deferred#State#Unset.copy"), @@ -233,7 +247,7 @@ lazy val sharedSourcesSettings = Seq( lazy val root = project .in(file(".")) .disablePlugins(MimaPlugin) - .aggregate(coreJVM, coreJS, lawsJVM, lawsJS) + .aggregate(coreJVM, coreJS, lawsJVM, lawsJS, tracingTests) .settings(skipOnPublishSettings) lazy val core = crossProject(JSPlatform, JVMPlatform) @@ -298,6 +312,37 @@ lazy val laws = crossProject(JSPlatform, JVMPlatform) lazy val lawsJVM = laws.jvm lazy val lawsJS = laws.js +lazy val FullTracingTest = config("fulltracing").extend(Test) + +lazy val tracingTests = project + .in(file("tracing-tests")) + .dependsOn(coreJVM) + .settings(commonSettings ++ skipOnPublishSettings) + .settings( + libraryDependencies ++= Seq( + "org.typelevel" %%% "cats-laws" % CatsVersion, + "org.typelevel" %%% "discipline-scalatest" % DisciplineScalatestVersion % Test + ) + ) + .configs(FullTracingTest) + .settings(inConfig(FullTracingTest)(Defaults.testSettings): _*) + .settings( + unmanagedSourceDirectories in FullTracingTest += { + baseDirectory.value.getParentFile / "src" / "fulltracing" / "scala" + }, + test in Test := (test in Test).dependsOn(test in FullTracingTest).value, + fork in Test := true, + fork in FullTracingTest := true, + javaOptions in Test ++= Seq( + "-Dcats.effect.tracing=true", + "-Dcats.effect.stackTracingMode=cached" + ), + javaOptions in FullTracingTest ++= Seq( + "-Dcats.effect.tracing=true", + "-Dcats.effect.stackTracingMode=full" + ) + ) + lazy val benchmarksPrev = project .in(file("benchmarks/vPrev")) .settings(commonSettings ++ skipOnPublishSettings ++ sharedSourcesSettings) diff --git a/core/js/src/main/scala/cats/effect/internals/IOPlatform.scala b/core/js/src/main/scala/cats/effect/internals/IOPlatform.scala index d79747909f..0436804fdb 100644 --- a/core/js/src/main/scala/cats/effect/internals/IOPlatform.scala +++ b/core/js/src/main/scala/cats/effect/internals/IOPlatform.scala @@ -45,15 +45,6 @@ private[effect] object IOPlatform { } } - /** - * Establishes the maximum stack depth for `IO#map` operations - * for JavaScript. - * - * The default for JavaScript is 32, from which we subtract 1 - * as an optimization. - */ - final val fusionMaxStackDepth = 31 - /** Returns `true` if the underlying platform is the JVM, * `false` if it's JavaScript. */ final val isJVM = false diff --git a/core/js/src/main/scala/cats/effect/internals/IOTimer.scala b/core/js/src/main/scala/cats/effect/internals/IOTimer.scala index 10d0343223..9a99936b89 100644 --- a/core/js/src/main/scala/cats/effect/internals/IOTimer.scala +++ b/core/js/src/main/scala/cats/effect/internals/IOTimer.scala @@ -33,7 +33,7 @@ final private[internals] class IOTimer(ec: ExecutionContext) extends Timer[IO] { def sleep(timespan: FiniteDuration): IO[Unit] = IO.Async(new IOForkedStart[Unit] { - def apply(conn: IOConnection, cb: Either[Throwable, Unit] => Unit): Unit = { + def apply(conn: IOConnection, ctx: IOContext, cb: Either[Throwable, Unit] => Unit): Unit = { val task = setTimeout(timespan.toMillis, ec, new ScheduledTick(conn, cb)) // On the JVM this would need a ForwardCancelable, // but not on top of JS as we don't have concurrency diff --git a/core/js/src/main/scala/cats/effect/internals/TracingPlatform.scala b/core/js/src/main/scala/cats/effect/internals/TracingPlatform.scala new file mode 100644 index 0000000000..3a9decc42e --- /dev/null +++ b/core/js/src/main/scala/cats/effect/internals/TracingPlatform.scala @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals + +object TracingPlatform { + final val isCachedStackTracing: Boolean = false + + final val isFullStackTracing: Boolean = false + + final val isStackTracing: Boolean = isFullStackTracing || isCachedStackTracing + + final val traceBufferSize: Int = 32 +} diff --git a/core/jvm/src/main/java/cats/effect/internals/TracingPlatform.java b/core/jvm/src/main/java/cats/effect/internals/TracingPlatform.java new file mode 100644 index 0000000000..099598e12b --- /dev/null +++ b/core/jvm/src/main/java/cats/effect/internals/TracingPlatform.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals; + +import java.util.Optional; + +/** + * Holds platform-specific flags that control tracing behavior. + * + * The Scala compiler inserts a volatile bitmap access for module field accesses. + * Because the `tracingMode` flag is read in various IO combinators, we are opting + * to define it in a Java source file to avoid the volatile access. + * + * INTERNAL API. + */ +public final class TracingPlatform { + + /** + * Sets stack tracing mode for a JVM process, which controls + * how much stack trace information is captured. + * Acceptable values are: NONE, CACHED, FULL. + */ + private static final String stackTracingMode = Optional.ofNullable(System.getProperty("cats.effect.stackTracingMode")) + .filter(x -> !x.isEmpty()) + .orElse("cached"); + + public static final boolean isCachedStackTracing = stackTracingMode.equalsIgnoreCase("cached"); + + public static final boolean isFullStackTracing = stackTracingMode.equalsIgnoreCase("full"); + + public static final boolean isStackTracing = isFullStackTracing || isCachedStackTracing; + + /** + * The number of trace lines to retain during tracing. If more trace + * lines are produced, then the oldest trace lines will be discarded. + * Automatically rounded up to the nearest power of 2. + */ + public static final int traceBufferSize = Optional.ofNullable(System.getProperty("cats.effect.traceBufferSize")) + .filter(x -> !x.isEmpty()) + .flatMap(x -> { + try { + return Optional.of(Integer.valueOf(x)); + } catch (Exception e) { + return Optional.empty(); + } + }) + .orElse(128); + +} diff --git a/core/jvm/src/main/scala/cats/effect/internals/IOPlatform.scala b/core/jvm/src/main/scala/cats/effect/internals/IOPlatform.scala index da98b4e0e3..e3a378d633 100644 --- a/core/jvm/src/main/scala/cats/effect/internals/IOPlatform.scala +++ b/core/jvm/src/main/scala/cats/effect/internals/IOPlatform.scala @@ -20,7 +20,7 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer import cats.effect.IO import scala.concurrent.blocking import scala.concurrent.duration.{Duration, FiniteDuration} -import scala.util.{Either, Try} +import scala.util.Either private[effect] object IOPlatform { @@ -73,37 +73,6 @@ private[effect] object IOPlatform { } } - /** - * Establishes the maximum stack depth for `IO#map` operations. - * - * The default is `128`, from which we subtract one as an - * optimization. This default has been reached like this: - * - * - according to official docs, the default stack size on 32-bits - * Windows and Linux was 320 KB, whereas for 64-bits it is 1024 KB - * - according to measurements chaining `Function1` references uses - * approximately 32 bytes of stack space on a 64 bits system; - * this could be lower if "compressed oops" is activated - * - therefore a "map fusion" that goes 128 in stack depth can use - * about 4 KB of stack space - * - * If this parameter becomes a problem, it can be tuned by setting - * the `cats.effect.fusionMaxStackDepth` system property when - * executing the Java VM: - * - *
- * java -Dcats.effect.fusionMaxStackDepth=32 \ - * ... - *- */ - final val fusionMaxStackDepth = - Option(System.getProperty("cats.effect.fusionMaxStackDepth", "")) - .filter(s => s != null && s.nonEmpty) - .flatMap(s => Try(s.toInt).toOption) - .filter(_ > 0) - .map(_ - 1) - .getOrElse(127) - /** * Returns `true` if the underlying platform is the JVM, * `false` if it's JavaScript. diff --git a/core/jvm/src/main/scala/cats/effect/internals/IOTimer.scala b/core/jvm/src/main/scala/cats/effect/internals/IOTimer.scala index cf5e897712..0049769200 100644 --- a/core/jvm/src/main/scala/cats/effect/internals/IOTimer.scala +++ b/core/jvm/src/main/scala/cats/effect/internals/IOTimer.scala @@ -38,7 +38,7 @@ final private[internals] class IOTimer private (ec: ExecutionContext, sc: Schedu override def sleep(timespan: FiniteDuration): IO[Unit] = IO.Async(new IOForkedStart[Unit] { - def apply(conn: IOConnection, cb: T[Unit]): Unit = { + def apply(conn: IOConnection, ctx: IOContext, cb: T[Unit]): Unit = { // Doing what IO.cancelable does val ref = ForwardCancelable() conn.push(ref.cancel) diff --git a/core/shared/src/main/scala/cats/effect/IO.scala b/core/shared/src/main/scala/cats/effect/IO.scala index 517fa312c7..f534da9f62 100644 --- a/core/shared/src/main/scala/cats/effect/IO.scala +++ b/core/shared/src/main/scala/cats/effect/IO.scala @@ -18,7 +18,7 @@ package cats package effect import cats.effect.internals._ -import cats.effect.internals.IOPlatform.fusionMaxStackDepth +import cats.effect.internals.TracingPlatform.{isCachedStackTracing, isFullStackTracing} import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration._ @@ -26,6 +26,7 @@ import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException} import scala.util.control.NonFatal import scala.util.{Failure, Left, Right, Success, Try} import cats.data.Ior +import cats.effect.tracing.{IOEvent, IOTrace} /** * A pure abstraction representing the intention to perform a @@ -100,18 +101,18 @@ sealed abstract class IO[+A] extends internals.IOBinaryCompat[A] { * failures would be completely silent and `IO` references would * never terminate on evaluation. */ - final def map[B](f: A => B): IO[B] = - this match { - case Map(source, g, index) => - // Allowed to do fixed number of map operations fused before - // resetting the counter in order to avoid stack overflows; - // See `IOPlatform` for details on this maximum. - if (index != fusionMaxStackDepth) Map(source, g.andThen(f), index + 1) - else Map(this, f, 0) - case _ => - Map(this, f, 0) + final def map[B](f: A => B): IO[B] = { + val trace = if (isCachedStackTracing) { + IOTracing.cached(4, f.getClass) + } else if (isFullStackTracing) { + IOTracing.uncached(4) + } else { + null } + Map(this, f, trace) + } + /** * Monadic bind on `IO`, used for sequentially composing two `IO` * actions, where the value produced by the first `IO` is passed as @@ -127,8 +128,17 @@ sealed abstract class IO[+A] extends internals.IOBinaryCompat[A] { * failures would be completely silent and `IO` references would * never terminate on evaluation. */ - final def flatMap[B](f: A => IO[B]): IO[B] = - Bind(this, f) + final def flatMap[B](f: A => IO[B]): IO[B] = { + val trace = if (isCachedStackTracing) { + IOTracing.cached(3, f.getClass) + } else if (isFullStackTracing) { + IOTracing.uncached(3) + } else { + null + } + + Bind(this, f, trace) + } /** * Materializes any sequenced exceptions into value space, where @@ -144,7 +154,7 @@ sealed abstract class IO[+A] extends internals.IOBinaryCompat[A] { * @see [[IO.raiseError]] */ def attempt: IO[Either[Throwable, A]] = - Bind(this, AttemptIO.asInstanceOf[A => IO[Either[Throwable, A]]]) + Bind(this, AttemptIO.asInstanceOf[A => IO[Either[Throwable, A]]], null) /** * Produces an `IO` reference that should execute the source on @@ -327,7 +337,7 @@ sealed abstract class IO[+A] extends internals.IOBinaryCompat[A] { IORunLoop.step(this) match { case Pure(a) => Some(a) case RaiseError(e) => throw e - case self @ Async(_, _) => + case self @ Async(_, _, _) => IOPlatform.unsafeResync(self, limit) case _ => // $COVERAGE-OFF$ @@ -564,7 +574,7 @@ sealed abstract class IO[+A] extends internals.IOBinaryCompat[A] { * be released */ final def bracket[B](use: A => IO[B])(release: A => IO[Unit]): IO[B] = - bracketCase(use)((a, _) => release(a)) + IOBracket(this)(use)((a, _) => release(a)) /** * Returns a new `IO` task that treats the source task as the @@ -672,7 +682,7 @@ sealed abstract class IO[+A] extends internals.IOBinaryCompat[A] { * Implements `ApplicativeError.handleErrorWith`. */ def handleErrorWith[AA >: A](f: Throwable => IO[AA]): IO[AA] = - IO.Bind(this, new IOFrame.ErrorHandler(f)) + IO.Bind(this, new IOFrame.ErrorHandler(f), null) /** * Zips both this action and the parameter in parallel. @@ -706,7 +716,7 @@ sealed abstract class IO[+A] extends internals.IOBinaryCompat[A] { * in case it ends in success */ def redeem[B](recover: Throwable => B, map: A => B): IO[B] = - IO.Bind(this, new IOFrame.Redeem(recover, map)) + IO.Bind(this, new IOFrame.Redeem(recover, map), null) /** * Returns a new value that transforms the result of the source, @@ -738,7 +748,7 @@ sealed abstract class IO[+A] extends internals.IOBinaryCompat[A] { * in case of success */ def redeemWith[B](recover: Throwable => IO[B], bind: A => IO[B]): IO[B] = - IO.Bind(this, new IOFrame.RedeemWith(recover, bind)) + IO.Bind(this, new IOFrame.RedeemWith(recover, bind), null) override def toString: String = this match { case Pure(a) => s"IO($a)" @@ -1116,8 +1126,14 @@ object IO extends IOInstances { * Any exceptions thrown by the effect will be caught and sequenced * into the `IO`. */ - def delay[A](body: => A): IO[A] = - Delay(() => body) + def delay[A](body: => A): IO[A] = { + val nextIo = Delay(() => body) + if (isFullStackTracing) { + IOTracing.decorated(nextIo, 1) + } else { + nextIo + } + } /** * Suspends a synchronous side effect which produces an `IO` in `IO`. @@ -1127,8 +1143,14 @@ object IO extends IOInstances { * thrown by the side effect will be caught and sequenced into the * `IO`. */ - def suspend[A](thunk: => IO[A]): IO[A] = - Suspend(() => thunk) + def suspend[A](thunk: => IO[A]): IO[A] = { + val nextIo = Suspend(() => thunk) + if (isFullStackTracing) { + IOTracing.decorated(nextIo, 2) + } else { + nextIo + } + } /** * Suspends a pure value in `IO`. @@ -1140,7 +1162,14 @@ object IO extends IOInstances { * (when evaluated) than `IO(42)`, due to avoiding the allocation of * extra thunks. */ - def pure[A](a: A): IO[A] = Pure(a) + def pure[A](a: A): IO[A] = { + val nextIo = Pure(a) + if (isFullStackTracing) { + IOTracing.decorated(nextIo, 0) + } else { + nextIo + } + } /** Alias for `IO.pure(())`. */ val unit: IO[Unit] = pure(()) @@ -1205,12 +1234,21 @@ object IO extends IOInstances { * * @see [[asyncF]] and [[cancelable]] */ - def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = - Async { (_, cb) => + def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A] = { + val trace = if (isCachedStackTracing) { + IOTracing.cached(5, k.getClass) + } else if (isFullStackTracing) { + IOTracing.uncached(5) + } else { + null + } + + Async[A]((_, _, cb) => { val cb2 = Callback.asyncIdempotent(null, cb) try k(cb2) catch { case NonFatal(t) => cb2(Left(t)) } - } + }, trace = trace) + } /** * Suspends an asynchronous side effect in `IO`, this being a variant @@ -1236,20 +1274,32 @@ object IO extends IOInstances { * * @see [[async]] and [[cancelable]] */ - def asyncF[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] = - Async { (conn, cb) => - // Must create new connection, otherwise we can have a race - // condition b/t the bind continuation and `startCancelable` below - val conn2 = IOConnection() - conn.push(conn2.cancel) - // The callback handles "conn.pop()" - val cb2 = Callback.asyncIdempotent(conn, cb) - val fa = - try k(cb2) - catch { case NonFatal(t) => IO(cb2(Left(t))) } - IORunLoop.startCancelable(fa, conn2, Callback.report) + def asyncF[A](k: (Either[Throwable, A] => Unit) => IO[Unit]): IO[A] = { + val trace = if (isCachedStackTracing) { + IOTracing.cached(6, k.getClass) + } else if (isFullStackTracing) { + IOTracing.uncached(6) + } else { + null } + Async[A]( + (conn, _, cb) => { + // Must create new connection, otherwise we can have a race + // condition b/t the bind continuation and `startCancelable` below + val conn2 = IOConnection() + conn.push(conn2.cancel) + // The callback handles "conn.pop()" + val cb2 = Callback.asyncIdempotent(conn, cb) + val fa = + try k(cb2) + catch { case NonFatal(t) => IO(cb2(Left(t))) } + IORunLoop.startCancelable(fa, conn2, Callback.report) + }, + trace = trace + ) + } + /** * Builds a cancelable `IO`. * @@ -1289,27 +1339,39 @@ object IO extends IOInstances { * @see [[asyncF]] for a more potent version that does hook into * the underlying cancelation model */ - def cancelable[A](k: (Either[Throwable, A] => Unit) => CancelToken[IO]): IO[A] = - Async { (conn, cb) => - val cb2 = Callback.asyncIdempotent(conn, cb) - val ref = ForwardCancelable() - conn.push(ref.cancel) - // Race condition test — no need to execute `k` if it was already cancelled, - // ensures that fiber.cancel will always wait for the finalizer if `k` - // is executed — note that `isCanceled` is visible here due to `push` - if (!conn.isCanceled) - ref.complete( - try k(cb2) - catch { - case NonFatal(t) => - cb2(Left(t)) - IO.unit - } - ) - else - ref.complete(IO.unit) + def cancelable[A](k: (Either[Throwable, A] => Unit) => CancelToken[IO]): IO[A] = { + val trace = if (isCachedStackTracing) { + IOTracing.cached(7, k.getClass) + } else if (isFullStackTracing) { + IOTracing.uncached(7) + } else { + null } + Async[A]( + (conn, _, cb) => { + val cb2 = Callback.asyncIdempotent(conn, cb) + val ref = ForwardCancelable() + conn.push(ref.cancel) + // Race condition test — no need to execute `k` if it was already cancelled, + // ensures that fiber.cancel will always wait for the finalizer if `k` + // is executed — note that `isCanceled` is visible here due to `push` + if (!conn.isCanceled) + ref.complete( + try k(cb2) + catch { + case NonFatal(t) => + cb2(Left(t)) + IO.unit + } + ) + else + ref.complete(IO.unit) + }, + trace = trace + ) + } + /** * Constructs an `IO` which sequences the specified exception. * @@ -1320,7 +1382,14 @@ object IO extends IOInstances { * * @see [[IO#attempt]] */ - def raiseError[A](e: Throwable): IO[A] = RaiseError(e) + def raiseError[A](e: Throwable): IO[A] = { + val nextIo = RaiseError(e) + if (isFullStackTracing) { + IOTracing.decorated(nextIo, 8) + } else { + nextIo + } + } /** * Constructs an `IO` which evaluates the given `Future` and @@ -1478,7 +1547,7 @@ object IO extends IOInstances { * }}} */ val cancelBoundary: IO[Unit] = { - val start: Start[Unit] = (_, cb) => cb(Callback.rightUnit) + val start: Start[Unit] = (_, _, cb) => cb(Callback.rightUnit) Async(start, trampolineAfter = true) } @@ -1559,8 +1628,19 @@ object IO extends IOInstances { def contextShift(ec: ExecutionContext): ContextShift[IO] = IOContextShift(ec) + /** + * Returns the accumulated trace of the currently active fiber. + */ + val trace: IO[IOTrace] = + IO.Async { (_, ctx, cb) => + cb(Right(ctx.trace())) + } + /* -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= */ /* IO's internal encoding: */ + // In the Bind, Map, and Async constructors, you will notice that traces + // are typed as an `AnyRef`. This seems to avoid a performance hit when + // tracing is disabled, particularly with the C2 JIT compiler. /** Corresponds to [[IO.pure]]. */ final private[effect] case class Pure[+A](a: A) extends IO[A] @@ -1575,12 +1655,12 @@ object IO extends IOInstances { final private[effect] case class Suspend[+A](thunk: () => IO[A]) extends IO[A] /** Corresponds to [[IO.flatMap]]. */ - final private[effect] case class Bind[E, +A](source: IO[E], f: E => IO[A]) extends IO[A] + final private[effect] case class Bind[E, +A](source: IO[E], f: E => IO[A], trace: AnyRef) extends IO[A] /** Corresponds to [[IO.map]]. */ - final private[effect] case class Map[E, +A](source: IO[E], f: E => A, index: Int) extends IO[A] with (E => IO[A]) { + final private[effect] case class Map[E, +A](source: IO[E], f: E => A, trace: AnyRef) extends IO[A] with (E => IO[A]) { override def apply(value: E): IO[A] = - new Pure(f(value)) + Pure(f(value)) } /** @@ -1598,10 +1678,13 @@ object IO extends IOInstances { * signal downstream */ final private[effect] case class Async[+A]( - k: (IOConnection, Either[Throwable, A] => Unit) => Unit, - trampolineAfter: Boolean = false + k: (IOConnection, IOContext, Either[Throwable, A] => Unit) => Unit, + trampolineAfter: Boolean = false, + trace: AnyRef = null ) extends IO[A] + final private[effect] case class Trace[A](source: IO[A], trace: IOEvent) extends IO[A] + /** * An internal state for that optimizes changes to * [[internals.IOConnection]]. diff --git a/core/shared/src/main/scala/cats/effect/internals/ForwardCancelable.scala b/core/shared/src/main/scala/cats/effect/internals/ForwardCancelable.scala index f1ac626cfa..ea30c9eb2b 100644 --- a/core/shared/src/main/scala/cats/effect/internals/ForwardCancelable.scala +++ b/core/shared/src/main/scala/cats/effect/internals/ForwardCancelable.scala @@ -35,11 +35,11 @@ final private[effect] class ForwardCancelable private () { private[this] val state = new AtomicReference[State](init) val cancel: CancelToken[IO] = { - @tailrec def loop(conn: IOConnection, cb: Callback.T[Unit]): Unit = + @tailrec def loop(conn: IOConnection, ctx: IOContext, cb: Callback.T[Unit]): Unit = state.get() match { case current @ Empty(list) => if (!state.compareAndSet(current, Empty(cb :: list))) - loop(conn, cb) + loop(conn, ctx, cb) case Active(token) => state.lazySet(finished) // GC purposes diff --git a/core/shared/src/main/scala/cats/effect/internals/IOBracket.scala b/core/shared/src/main/scala/cats/effect/internals/IOBracket.scala index 59b7d7b940..180478e354 100644 --- a/core/shared/src/main/scala/cats/effect/internals/IOBracket.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IOBracket.scala @@ -30,7 +30,7 @@ private[effect] object IOBracket { * Implementation for `IO.bracketCase`. */ def apply[A, B](acquire: IO[A])(use: A => IO[B])(release: (A, ExitCase[Throwable]) => IO[Unit]): IO[B] = - IO.Async { (conn, cb) => + IO.Async[B] { (conn, ctx, cb) => // Placeholder for the future finalizer val deferredRelease = ForwardCancelable() conn.push(deferredRelease.cancel) @@ -38,9 +38,13 @@ private[effect] object IOBracket { // was cancelled already, to ensure that `cancel` really blocks if we // start `acquire` — n.b. `isCanceled` is visible here due to `push` if (!conn.isCanceled) { - // Note `acquire` is uncancelable due to usage of `IORunLoop.start` + // Note `acquire` is uncancelable due to usage of `IORunLoop.restart` // (in other words it is disconnected from our IOConnection) - IORunLoop.start[A](acquire, new BracketStart(use, release, conn, deferredRelease, cb)) + // We don't need to explicitly pass back a reference to `ctx` because + // it is held in `RestartCallback` and `BracketStart`. + // Updates to it in the run-loop will be visible when the callback is + // invoked, even across asynchronous boundaries. + IORunLoop.restart(acquire, ctx, new BracketStart(use, release, ctx, conn, deferredRelease, cb)) } else { deferredRelease.complete(IO.unit) } @@ -50,6 +54,7 @@ private[effect] object IOBracket { final private class BracketStart[A, B]( use: A => IO[B], release: (A, ExitCase[Throwable]) => IO[Unit], + ctx: IOContext, conn: IOConnection, deferredRelease: ForwardCancelable, cb: Callback.T[B] @@ -67,6 +72,7 @@ private[effect] object IOBracket { // Introducing a light async boundary, otherwise executing the required // logic directly will yield a StackOverflowException result = ea + ec.execute(this) } @@ -87,7 +93,7 @@ private[effect] object IOBracket { fb.flatMap(frame) } // Actual execution - IORunLoop.startCancelable(onNext, conn, cb) + IORunLoop.restartCancelable(onNext, conn, ctx, cb) } case error @ Left(_) => @@ -100,7 +106,7 @@ private[effect] object IOBracket { * Implementation for `IO.guaranteeCase`. */ def guaranteeCase[A](source: IO[A], release: ExitCase[Throwable] => IO[Unit]): IO[A] = - IO.Async { (conn, cb) => + IO.Async { (conn, ctx, cb) => // Light async boundary, otherwise this will trigger a StackOverflowException ec.execute(new Runnable { def run(): Unit = { @@ -113,10 +119,11 @@ private[effect] object IOBracket { // the connection was already cancelled — n.b. we don't need // to trigger `release` otherwise, because it already happened if (!conn.isCanceled) { - IORunLoop.startCancelable(onNext, conn, cb) + IORunLoop.restartCancelable(onNext, conn, ctx, cb) } } }) + // TODO: Trace here? } final private class BracketReleaseFrame[A, B](a: A, releaseFn: (A, ExitCase[Throwable]) => IO[Unit]) diff --git a/core/shared/src/main/scala/cats/effect/internals/IOContext.scala b/core/shared/src/main/scala/cats/effect/internals/IOContext.scala new file mode 100644 index 0000000000..cdfbc129c5 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/internals/IOContext.scala @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals + +import cats.effect.tracing.{IOEvent, IOTrace} +import cats.effect.internals.TracingPlatform.traceBufferSize + +/** + * INTERNAL API — Holds state related to the execution of + * an IO and should be threaded across multiple invocations + * of the run-loop associated with the same fiber. + */ +final private[effect] class IOContext() { + + private[this] val events: RingBuffer[IOEvent] = new RingBuffer(traceBufferSize) + private[this] var captured: Int = 0 + private[this] var omitted: Int = 0 + + def pushEvent(fr: IOEvent): Unit = { + captured += 1 + if (events.push(fr) != null) omitted += 1 + } + + def trace(): IOTrace = + IOTrace(events.toList, captured, omitted) + +} diff --git a/core/shared/src/main/scala/cats/effect/internals/IOForkedStart.scala b/core/shared/src/main/scala/cats/effect/internals/IOForkedStart.scala index 743565d13d..a868da3f3f 100644 --- a/core/shared/src/main/scala/cats/effect/internals/IOForkedStart.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IOForkedStart.scala @@ -53,8 +53,8 @@ private[effect] object IOForkedStart { @tailrec def detect(task: IO[_], limit: Int = 8): Boolean = if (limit > 0) { task match { - case IO.Async(k, _) => k.isInstanceOf[IOForkedStart[_]] - case IO.Bind(other, _) => detect(other, limit - 1) + case IO.Async(k, _, _) => k.isInstanceOf[IOForkedStart[_]] + case IO.Bind(other, _, _) => detect(other, limit - 1) case IO.Map(other, _, _) => detect(other, limit - 1) case IO.ContextSwitch(other, _, _) => detect(other, limit - 1) case _ => false diff --git a/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala index 9a84a81bda..d867777903 100644 --- a/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IOParMap.scala @@ -29,7 +29,7 @@ private[effect] object IOParMap { def apply[A, B, C](cs: ContextShift[IO], fa: IO[A], fb: IO[B])(f: (A, B) => C): IO[C] = IO.Async( new IOForkedStart[C] { - def apply(conn: IOConnection, cb: Callback.T[C]) = + def apply(conn: IOConnection, ctx: IOContext, cb: Callback.T[C]) = // For preventing stack-overflow errors; using a // trampolined execution context, so no thread forks TrampolineEC.immediate.execute(new ParMapRunnable(cs, fa, fb, f, conn, cb)) diff --git a/core/shared/src/main/scala/cats/effect/internals/IORace.scala b/core/shared/src/main/scala/cats/effect/internals/IORace.scala index ad61b93571..a2aa57d4b4 100644 --- a/core/shared/src/main/scala/cats/effect/internals/IORace.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IORace.scala @@ -58,7 +58,7 @@ private[effect] object IORace { Logger.reportFailure(err) } - val start: Start[Either[A, B]] = (conn, cb) => { + val start: Start[Either[A, B]] = (conn, _, cb) => { val active = new AtomicBoolean(true) // Cancelable connection for the left value val connL = IOConnection() @@ -94,7 +94,7 @@ private[effect] object IORace { * Implementation for `IO.racePair` */ def pair[A, B](cs: ContextShift[IO], lh: IO[A], rh: IO[B]): IO[Pair[A, B]] = { - val start: Start[Pair[A, B]] = (conn, cb) => { + val start: Start[Pair[A, B]] = (conn, _, cb) => { val active = new AtomicBoolean(true) // Cancelable connection for the left value val connL = IOConnection() diff --git a/core/shared/src/main/scala/cats/effect/internals/IORunLoop.scala b/core/shared/src/main/scala/cats/effect/internals/IORunLoop.scala index 9e7217ddca..582cf439dd 100644 --- a/core/shared/src/main/scala/cats/effect/internals/IORunLoop.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IORunLoop.scala @@ -17,7 +17,10 @@ package cats.effect.internals import cats.effect.IO -import cats.effect.IO.{Async, Bind, ContextSwitch, Delay, Map, Pure, RaiseError, Suspend} +import cats.effect.IO.{Async, Bind, ContextSwitch, Delay, Map, Pure, RaiseError, Suspend, Trace} +import cats.effect.tracing.IOEvent +import cats.effect.internals.TracingPlatform.isStackTracing + import scala.util.control.NonFatal private[effect] object IORunLoop { @@ -31,14 +34,20 @@ private[effect] object IORunLoop { * with the result when completed. */ def start[A](source: IO[A], cb: Either[Throwable, A] => Unit): Unit = - loop(source, IOConnection.uncancelable, cb.asInstanceOf[Callback], null, null, null) + loop(source, IOConnection.uncancelable, cb.asInstanceOf[Callback], null, null, null, null) + + def restart[A](source: IO[A], ctx: IOContext, cb: Either[Throwable, A] => Unit): Unit = + loop(source, IOConnection.uncancelable, cb.asInstanceOf[Callback], ctx, null, null, null) /** * Evaluates the given `IO` reference, calling the given callback * with the result when completed. */ def startCancelable[A](source: IO[A], conn: IOConnection, cb: Either[Throwable, A] => Unit): Unit = - loop(source, conn, cb.asInstanceOf[Callback], null, null, null) + loop(source, conn, cb.asInstanceOf[Callback], null, null, null, null) + + def restartCancelable[A](source: IO[A], conn: IOConnection, ctx: IOContext, cb: Either[Throwable, A] => Unit): Unit = + loop(source, conn, cb.asInstanceOf[Callback], ctx, null, null, null) /** * Loop for evaluating an `IO` value. @@ -51,6 +60,7 @@ private[effect] object IORunLoop { source: Current, cancelable: IOConnection, cb: Either[Throwable, Any] => Unit, + ctxRef: IOContext, rcbRef: RestartCallback, bFirstRef: Bind, bRestRef: CallStack @@ -58,6 +68,7 @@ private[effect] object IORunLoop { var currentIO: Current = source // Can change on a context switch var conn: IOConnection = cancelable + var ctx: IOContext = ctxRef var bFirst: Bind = bFirstRef var bRest: CallStack = bRestRef var rcb: RestartCallback = rcbRef @@ -70,7 +81,12 @@ private[effect] object IORunLoop { while ({ currentIO match { - case Bind(fa, bindNext) => + case bind @ Bind(fa, bindNext, _) => + if (isStackTracing) { + if (ctx eq null) ctx = new IOContext() + val trace = bind.trace + if (trace ne null) ctx.pushEvent(trace.asInstanceOf[IOEvent]) + } if (bFirst ne null) { if (bRest eq null) bRest = new ArrayStack() bRest.push(bFirst) @@ -111,6 +127,11 @@ private[effect] object IORunLoop { } case bindNext @ Map(fa, _, _) => + if (isStackTracing) { + if (ctx eq null) ctx = new IOContext() + val trace = bindNext.trace + if (trace ne null) ctx.pushEvent(trace.asInstanceOf[IOEvent]) + } if (bFirst ne null) { if (bRest eq null) bRest = new ArrayStack() bRest.push(bFirst) @@ -118,10 +139,17 @@ private[effect] object IORunLoop { bFirst = bindNext.asInstanceOf[Bind] currentIO = fa - case async @ Async(_, _) => + case async @ Async(_, _, _) => if (conn eq null) conn = IOConnection() + // We need to initialize an IOContext because the continuation + // may produce trace frames e.g. IOBracket. + if (ctx eq null) ctx = new IOContext() if (rcb eq null) rcb = new RestartCallback(conn, cb.asInstanceOf[Callback]) - rcb.start(async, bFirst, bRest) + if (isStackTracing) { + val trace = async.trace + if (trace ne null) ctx.pushEvent(trace.asInstanceOf[IOEvent]) + } + rcb.start(async, ctx, bFirst, bRest) return case ContextSwitch(next, modify, restore) => @@ -131,8 +159,13 @@ private[effect] object IORunLoop { if (conn ne old) { if (rcb ne null) rcb.contextSwitch(conn) if (restore ne null) - currentIO = Bind(next, new RestoreContext(old, restore)) + currentIO = Bind(next, new RestoreContext(old, restore), null) } + + case Trace(source, frame) => + if (ctx eq null) ctx = new IOContext() + ctx.pushEvent(frame) + currentIO = source } if (hasUnboxed) { @@ -169,6 +202,7 @@ private[effect] object IORunLoop { var currentIO: Current = source var bFirst: Bind = null var bRest: CallStack = null + var ctx: IOContext = null // Values from Pure and Delay are unboxed in this var, // for code reuse between Pure and Delay var hasUnboxed: Boolean = false @@ -176,7 +210,12 @@ private[effect] object IORunLoop { while ({ currentIO match { - case Bind(fa, bindNext) => + case bind @ Bind(fa, bindNext, _) => + if (isStackTracing) { + if (ctx eq null) ctx = new IOContext() + val trace = bind.trace + if (trace ne null) ctx.pushEvent(trace.asInstanceOf[IOEvent]) + } if (bFirst ne null) { if (bRest eq null) bRest = new ArrayStack() bRest.push(bFirst) @@ -217,17 +256,28 @@ private[effect] object IORunLoop { } case bindNext @ Map(fa, _, _) => + if (isStackTracing) { + if (ctx eq null) ctx = new IOContext() + val trace = bindNext.trace + if (trace ne null) ctx.pushEvent(trace.asInstanceOf[IOEvent]) + } if (bFirst ne null) { if (bRest eq null) bRest = new ArrayStack() bRest.push(bFirst) } bFirst = bindNext.asInstanceOf[Bind] currentIO = fa + if (ctx eq null) ctx = new IOContext() + + case Trace(source, frame) => + if (ctx eq null) ctx = new IOContext() + ctx.pushEvent(frame) + currentIO = source case _ => // Cannot inline the code of this method — as it would // box those vars in scala.runtime.ObjectRef! - return suspendAsync(currentIO.asInstanceOf[IO[A]], bFirst, bRest) + return suspendAsync(currentIO.asInstanceOf[IO[A]], ctx, bFirst, bRest) } if (hasUnboxed) { @@ -252,11 +302,11 @@ private[effect] object IORunLoop { // $COVERAGE-ON$ } - private def suspendAsync[A](currentIO: IO[A], bFirst: Bind, bRest: CallStack): IO[A] = + private def suspendAsync[A](currentIO: IO[A], ctx: IOContext, bFirst: Bind, bRest: CallStack): IO[A] = // Encountered an instruction that can't be interpreted synchronously, // so suspend it in an Async node that can be invoked later. - Async { (conn, cb) => - loop(currentIO, conn, cb.asInstanceOf[Callback], null, bFirst, bRest) + Async { (conn, _, cb) => + loop(currentIO, conn, cb.asInstanceOf[Callback], ctx, null, bFirst, bRest) } /** @@ -329,6 +379,7 @@ private[effect] object IORunLoop { private[this] var trampolineAfter = false private[this] var bFirst: Bind = _ private[this] var bRest: CallStack = _ + private[this] var ctx: IOContext = _ // Used in combination with trampolineAfter = true private[this] var value: Either[Throwable, Any] = _ @@ -336,29 +387,33 @@ private[effect] object IORunLoop { def contextSwitch(conn: IOConnection): Unit = this.conn = conn - def start(task: IO.Async[Any], bFirst: Bind, bRest: CallStack): Unit = { + def start(task: IO.Async[Any], ctx: IOContext, bFirst: Bind, bRest: CallStack): Unit = { canCall = true this.bFirst = bFirst this.bRest = bRest this.trampolineAfter = task.trampolineAfter + this.ctx = ctx + // Go, go, go - task.k(conn, this) + task.k(conn, ctx, this) } private[this] def signal(either: Either[Throwable, Any]): Unit = { // Allow GC to collect val bFirst = this.bFirst val bRest = this.bRest + val ctx = this.ctx this.bFirst = null this.bRest = null + this.ctx = null // Auto-cancelable logic: in case the connection was cancelled, // we interrupt the bind continuation if (!conn.isCanceled) either match { case Right(success) => - loop(Pure(success), conn, cb, this, bFirst, bRest) + loop(Pure(success), conn, cb, ctx, this, bFirst, bRest) case Left(e) => - loop(RaiseError(e), conn, cb, this, bFirst, bRest) + loop(RaiseError(e), conn, cb, ctx, this, bFirst, bRest) } } diff --git a/core/shared/src/main/scala/cats/effect/internals/IOShift.scala b/core/shared/src/main/scala/cats/effect/internals/IOShift.scala index 3eccb05286..fa01e57a8b 100644 --- a/core/shared/src/main/scala/cats/effect/internals/IOShift.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IOShift.scala @@ -25,7 +25,7 @@ private[effect] object IOShift { /** Implementation for `IO.shift`. */ def apply(ec: ExecutionContext): IO[Unit] = IO.Async(new IOForkedStart[Unit] { - def apply(conn: IOConnection, cb: Callback.T[Unit]): Unit = + def apply(conn: IOConnection, ctx: IOContext, cb: Callback.T[Unit]): Unit = ec.execute(new Tick(cb)) }) diff --git a/core/shared/src/main/scala/cats/effect/internals/IOStart.scala b/core/shared/src/main/scala/cats/effect/internals/IOStart.scala index 10f0711183..db00a7b96a 100644 --- a/core/shared/src/main/scala/cats/effect/internals/IOStart.scala +++ b/core/shared/src/main/scala/cats/effect/internals/IOStart.scala @@ -26,7 +26,7 @@ private[effect] object IOStart { * Implementation for `IO.start`. */ def apply[A](cs: ContextShift[IO], fa: IO[A]): IO[Fiber[IO, A]] = { - val start: Start[Fiber[IO, A]] = (_, cb) => { + val start: Start[Fiber[IO, A]] = (_, _, cb) => { // Memoization val p = Promise[Either[Throwable, A]]() diff --git a/core/shared/src/main/scala/cats/effect/internals/IOTracing.scala b/core/shared/src/main/scala/cats/effect/internals/IOTracing.scala new file mode 100644 index 0000000000..7a001e1b5a --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/internals/IOTracing.scala @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals + +import java.util.concurrent.ConcurrentHashMap + +import cats.effect.IO +import cats.effect.IO.Trace +import cats.effect.tracing.IOEvent + +private[effect] object IOTracing { + + def decorated[A](source: IO[A], tag: Int): IO[A] = + Trace(source, buildFrame(tag)) + + def uncached(tag: Int): IOEvent = + buildFrame(tag) + + def cached(tag: Int, clazz: Class[_]): IOEvent = + buildCachedFrame(tag, clazz) + + private def buildCachedFrame(tag: Int, clazz: Class[_]): IOEvent = { + val currentFrame = frameCache.get(clazz) + if (currentFrame eq null) { + val newFrame = buildFrame(tag) + frameCache.put(clazz, newFrame) + newFrame + } else { + currentFrame + } + } + + private def buildFrame(tag: Int): IOEvent = + IOEvent.StackTrace(tag, new Throwable()) + + /** + * Global cache for trace frames. Keys are references to lambda classes. + * Should converge to the working set of traces very quickly for hot code paths. + */ + private[this] val frameCache: ConcurrentHashMap[Class[_], IOEvent] = new ConcurrentHashMap() + +} diff --git a/core/shared/src/main/scala/cats/effect/internals/RingBuffer.scala b/core/shared/src/main/scala/cats/effect/internals/RingBuffer.scala new file mode 100644 index 0000000000..26c18fdb05 --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/internals/RingBuffer.scala @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals + +/** + * Provides a fast, mutable ring buffer. + * + * INTERNAL API. + */ +final private[internals] class RingBuffer[A <: AnyRef](size: Int) { + + import RingBuffer._ + + // These two probably don't need to be allocated every single time, maybe in Java? + private[this] val length = nextPowerOfTwo(size) + private[this] val mask = length - 1 + + private[this] val array: Array[AnyRef] = new Array(length) + private[this] var index: Int = 0 + + def push(a: A): A = { + val wi = index & mask + val old = array(wi).asInstanceOf[A] + array(wi) = a + index += 1 + old + } + + def isEmpty: Boolean = + index == 0 + + def capacity: Int = + length + + def toList: List[A] = { + val end = index + val start = Math.max(end - length, 0) + (start until end).toList + .map(i => array(i & mask).asInstanceOf[A]) + } + +} + +object RingBuffer { + + // N.B. this can overflow + private def nextPowerOfTwo(i: Int): Int = { + var n = 1 + while (n < i) { + n *= 2 + } + n + } + +} diff --git a/core/shared/src/main/scala/cats/effect/internals/package.scala b/core/shared/src/main/scala/cats/effect/internals/package.scala index 5590d0a4c3..a24e14033c 100644 --- a/core/shared/src/main/scala/cats/effect/internals/package.scala +++ b/core/shared/src/main/scala/cats/effect/internals/package.scala @@ -22,5 +22,5 @@ package object internals { * Handy alias for the registration functions of [[IO.Async]]. */ private[effect] type Start[+A] = - (IOConnection, Callback.T[A]) => Unit + (IOConnection, IOContext, Callback.T[A]) => Unit } diff --git a/core/shared/src/main/scala/cats/effect/tracing/IOEvent.scala b/core/shared/src/main/scala/cats/effect/tracing/IOEvent.scala new file mode 100644 index 0000000000..624e01432c --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/tracing/IOEvent.scala @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.tracing + +sealed abstract class IOEvent + +object IOEvent { + + final case class StackTrace(tag: Int, throwable: Throwable) extends IOEvent { + def stackTrace: List[StackTraceElement] = + throwable.getStackTrace().toList + } + +} diff --git a/core/shared/src/main/scala/cats/effect/tracing/IOTrace.scala b/core/shared/src/main/scala/cats/effect/tracing/IOTrace.scala new file mode 100644 index 0000000000..c86f7dd2bf --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/tracing/IOTrace.scala @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.tracing + +import cats.effect.IO + +final case class IOTrace(events: List[IOEvent], captured: Int, omitted: Int) { + + import IOTrace._ + + def printFiberTrace(options: PrintingOptions = PrintingOptions.Default): IO[Unit] = + IO(System.err.println(showFiberTrace(options))) + + def showFiberTrace(options: PrintingOptions = PrintingOptions.Default): String = { + val TurnRight = "╰" + val InverseTurnRight = "╭" + val Junction = "├" + val Line = "│" + + if (options.showFullStackTraces) { + val stackTraces = events.collect { case e: IOEvent.StackTrace => e } + + val header = s"IOTrace: $captured frames captured, $omitted omitted\n" + val body = stackTraces.zipWithIndex + .map { + case (st, index) => + val nameTag = tagToName(st.tag) + val op = if (index == 0) s"$InverseTurnRight $nameTag\n" else s"$Junction $nameTag\n" + val relevantLines = st.stackTrace + .drop(options.ignoreStackTraceLines) + .take(options.maxStackTraceLines) + val lines = relevantLines.zipWithIndex + .map { + case (ste, i) => + val junc = if (i == relevantLines.length - 1) TurnRight else Junction + val codeLine = renderStackTraceElement(ste) + s"$Line $junc $codeLine" + } + .mkString("", "\n", "\n") + + s"$op$lines$Line" + } + .mkString("\n") + + header + body + } else { + val acc0 = s"IOTrace: $captured frames captured, $omitted omitted\n" + val acc1 = events.zipWithIndex + .map { + case (event, index) => + val junc = if (index == events.length - 1) TurnRight else Junction + val message = event match { + case ev: IOEvent.StackTrace => { + val first = bestTraceElement(ev.stackTrace) + val nameTag = tagToName(ev.tag) + val codeLine = first.map(renderStackTraceElement).getOrElse("(...)") + s"$nameTag at $codeLine" + } + } + s" $junc $message" + } + .mkString(acc0, "\n", "\n") + + acc1 + } + } +} + +private[effect] object IOTrace { + + // Number of lines to drop from the top of the stack trace + def stackTraceIgnoreLines = 3 + + private[this] val anonfunRegex = "^\\$+anonfun\\$+(.+)\\$+\\d+$".r + + private[this] val stackTraceFilter = List( + "cats.effect.", + "cats.", + "sbt.", + "java.", + "sun.", + "scala." + ) + + private def renderStackTraceElement(ste: StackTraceElement): String = { + val methodName = demangleMethod(ste.getMethodName) + s"${ste.getClassName}.$methodName (${ste.getFileName}:${ste.getLineNumber})" + } + + private def bestTraceElement(frames: List[StackTraceElement]): Option[StackTraceElement] = + frames.dropWhile(l => stackTraceFilter.exists(b => l.getClassName.startsWith(b))).headOption + + private def demangleMethod(methodName: String): String = + anonfunRegex.findFirstMatchIn(methodName) match { + case Some(mat) => mat.group(1) + case None => methodName + } + + private def tagToName(tag: Int): String = + tag match { + case 0 => "pure" + case 1 => "delay" + case 2 => "suspend" + case 3 => "flatMap" + case 4 => "map" + case 5 => "async" + case 6 => "asyncF" + case 7 => "cancelable" + case 8 => "raiseError" + case _ => "???" + } +} diff --git a/core/shared/src/main/scala/cats/effect/tracing/PrintingOptions.scala b/core/shared/src/main/scala/cats/effect/tracing/PrintingOptions.scala new file mode 100644 index 0000000000..8686384a5f --- /dev/null +++ b/core/shared/src/main/scala/cats/effect/tracing/PrintingOptions.scala @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.tracing + +/** + * @param showFullStackTraces Whether or not to show the entire stack trace + * @param maxStackTraceLines When `showFullStackTraces` is `true`, the maximum number of stack trace + * elements to print + * @param ignoreStackTraceLines When `showFullStackTraces` is `true`, the number of stack trace elements + * to ignore from the start + */ +final case class PrintingOptions private (showFullStackTraces: Boolean, + maxStackTraceLines: Int, + ignoreStackTraceLines: Int) { + def withShowFullStackTraces(showFullStackTraces: Boolean): PrintingOptions = + copy(showFullStackTraces = showFullStackTraces) + + def withMaxStackTraceLines(maxStackTraceLines: Int): PrintingOptions = + copy(maxStackTraceLines = maxStackTraceLines) + + def withIgnoreStackTraceLines(ignoreStackTraceLines: Int): PrintingOptions = + copy(ignoreStackTraceLines = ignoreStackTraceLines) +} + +object PrintingOptions { + val Default = PrintingOptions( + showFullStackTraces = false, + maxStackTraceLines = Int.MaxValue, + ignoreStackTraceLines = 3 // the number of frames to ignore because of IOTracing + ) +} diff --git a/core/shared/src/test/scala/cats/effect/internals/IOContextTests.scala b/core/shared/src/test/scala/cats/effect/internals/IOContextTests.scala new file mode 100644 index 0000000000..62b4073ad0 --- /dev/null +++ b/core/shared/src/test/scala/cats/effect/internals/IOContextTests.scala @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals + +import cats.effect.tracing.IOEvent +import org.scalatest.matchers.should.Matchers +import org.scalatest.funsuite.AnyFunSuite + +class IOContextTests extends AnyFunSuite with Matchers { + + val traceBufferSize: Int = cats.effect.internals.TracingPlatform.traceBufferSize + val throwable = new Throwable() + + test("push traces") { + val ctx = new IOContext() + + val t1 = IOEvent.StackTrace(0, throwable) + val t2 = IOEvent.StackTrace(1, throwable) + + ctx.pushEvent(t1) + ctx.pushEvent(t2) + + val trace = ctx.trace + trace.events shouldBe List(t1, t2) + trace.captured shouldBe 2 + trace.omitted shouldBe 0 + } + + test("track omitted frames") { + val ctx = new IOContext() + + for (_ <- 0 until (traceBufferSize + 10)) { + ctx.pushEvent(IOEvent.StackTrace(0, throwable)) + } + + val trace = ctx.trace() + trace.events.length shouldBe traceBufferSize + trace.captured shouldBe (traceBufferSize + 10) + trace.omitted shouldBe 10 + } + +} diff --git a/core/shared/src/test/scala/cats/effect/internals/RingBufferTests.scala b/core/shared/src/test/scala/cats/effect/internals/RingBufferTests.scala new file mode 100644 index 0000000000..e10f74d049 --- /dev/null +++ b/core/shared/src/test/scala/cats/effect/internals/RingBufferTests.scala @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect.internals + +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +class RingBufferTests extends AnyFunSuite with Matchers with TestUtils { + test("empty ring buffer") { + val buffer = new RingBuffer[Integer](2) + buffer.isEmpty shouldBe true + } + + test("non-empty ring buffer") { + val buffer = new RingBuffer[Integer](2) + buffer.push(0) + buffer.isEmpty shouldBe false + } + + test("size is a power of two") { + new RingBuffer[Integer](0).capacity shouldBe 1 + new RingBuffer[Integer](1).capacity shouldBe 1 + new RingBuffer[Integer](2).capacity shouldBe 2 + new RingBuffer[Integer](3).capacity shouldBe 4 + new RingBuffer[Integer](127).capacity shouldBe 128 + new RingBuffer[Integer](13000).capacity shouldBe 16384 + } + + test("writing elements") { + val buffer = new RingBuffer[Integer](4) + for (i <- 0 to 3) buffer.push(i) + buffer.toList shouldBe List(0, 1, 2, 3) + } + + test("overwriting elements") { + val buffer = new RingBuffer[Integer](4) + for (i <- 0 to 100) buffer.push(i) + buffer.toList shouldBe List(97, 98, 99, 100) + } +} diff --git a/laws/shared/src/main/scala/cats/effect/laws/util/TestContext.scala b/laws/shared/src/main/scala/cats/effect/laws/util/TestContext.scala index 3354140416..d46b7f8485 100644 --- a/laws/shared/src/main/scala/cats/effect/laws/util/TestContext.scala +++ b/laws/shared/src/main/scala/cats/effect/laws/util/TestContext.scala @@ -17,7 +17,7 @@ package cats.effect.laws.util import cats.effect.internals.Callback.T -import cats.effect.internals.{IOConnection, IOForkedStart, IOShift} +import cats.effect.internals.{IOConnection, IOContext, IOForkedStart, IOShift} import cats.effect._ import scala.collection.immutable.SortedSet @@ -190,7 +190,7 @@ final class TestContext private () extends ExecutionContext { self => override def shift: F[Unit] = F.liftIO(IO.Async(new IOForkedStart[Unit] { - def apply(conn: IOConnection, cb: T[Unit]): Unit = + def apply(conn: IOConnection, ctx: IOContext, cb: T[Unit]): Unit = self.execute(tick(cb)) })) diff --git a/laws/shared/src/test/scala/cats/effect/IOTests.scala b/laws/shared/src/test/scala/cats/effect/IOTests.scala index f8f2a94d5f..56e6d65343 100644 --- a/laws/shared/src/test/scala/cats/effect/IOTests.scala +++ b/laws/shared/src/test/scala/cats/effect/IOTests.scala @@ -527,14 +527,6 @@ class IOTests extends BaseTestsSuite { io.unsafeRunSync() shouldBe 1 } - test("map is stack-safe for unsafeRunSync") { - import IOPlatform.{fusionMaxStackDepth => max} - val f = (x: Int) => x + 1 - val io = (0 until (max * 10000)).foldLeft(IO(0))((acc, _) => acc.map(f)) - - io.unsafeRunSync() shouldEqual max * 10000 - } - testAsync("parMap2 for successful values") { implicit ec => implicit val cs: ContextShift[IO] = ec.ioContextShift diff --git a/laws/shared/src/test/scala/cats/effect/SyncIOTests.scala b/laws/shared/src/test/scala/cats/effect/SyncIOTests.scala index aaa1daf0c2..43e5a59566 100644 --- a/laws/shared/src/test/scala/cats/effect/SyncIOTests.scala +++ b/laws/shared/src/test/scala/cats/effect/SyncIOTests.scala @@ -17,7 +17,6 @@ package cats package effect -import cats.effect.internals.IOPlatform import cats.effect.laws.discipline.SyncEffectTests import cats.effect.laws.discipline.arbitrary._ import cats.implicits._ @@ -154,14 +153,6 @@ class SyncIOTests extends BaseTestsSuite { io.unsafeRunSync() shouldEqual 2 } - test("map is stack-safe for unsafeRunSync") { - import IOPlatform.{fusionMaxStackDepth => max} - val f = (x: Int) => x + 1 - val io = (0 until (max * 10000)).foldLeft(SyncIO(0))((acc, _) => acc.map(f)) - - io.unsafeRunSync() shouldEqual max * 10000 - } - testAsync("IO#redeem(throw, f) <-> IO#map") { implicit ec => check { (io: IO[Int], f: Int => Int) => io.redeem(e => throw e, f) <-> io.map(f) diff --git a/site/src/main/mdoc/guides/tracing.md b/site/src/main/mdoc/guides/tracing.md new file mode 100644 index 0000000000..52ae266b89 --- /dev/null +++ b/site/src/main/mdoc/guides/tracing.md @@ -0,0 +1,205 @@ +--- +layout: docsplus +title: "Tracing" +position: 2 +--- + + + +## Introduction +Tracing is an advanced feature of `IO` that offers insight into the execution +graph of a fiber. This unlocks a lot of power for developers in the realm of +debugging and introspection, not only in local development environments +but also in critical production settings. + +A notable pain point of working with asynchronous code on the JVM is that +stack traces no longer provide valuable context of the execution path that +a program takes. This limitation is even more pronounced with Scala's `Future` +(pre- 2.13), where an asynchronous boundary is inserted after each operation. +`IO` suffers a similar problem, but even a synchronous `IO` program's stack +trace is polluted with the details of the `IO` run-loop. + +`IO` solves this problem by collecting a stack trace at various `IO` +operations that a fiber executes, and knitting them together to produce a more +coherent view of the fiber's execution path. For example, here is a trace of a +sample program that is running in cached stack tracing mode: + +``` +IOTrace: 13 frames captured, 0 omitted + ├ flatMap at org.simpleapp.example.Example.run (Example.scala:67) + ├ flatMap at org.simpleapp.example.Example.program (Example.scala:57) + ├ flatMap at org.simpleapp.example.Example.program (Example.scala:58) + ├ flatMap at org.simpleapp.example.Example.program (Example.scala:59) + ├ flatMap at org.simpleapp.example.Example.program (Example.scala:60) + ├ async at org.simpleapp.example.Example.program (Example.scala:60) + ├ flatMap at org.simpleapp.example.Example.program (Example.scala:61) + ├ flatMap at org.simpleapp.example.Example.program (Example.scala:60) + ├ flatMap at org.simpleapp.example.Example.program2 (Example.scala:51) + ├ map at org.simpleapp.example.Example.program2 (Example.scala:52) + ├ map at org.simpleapp.example.Example.program (Example.scala:60) + ├ map at org.simpleapp.example.Example.program (Example.scala:62) + ╰ flatMap at org.simpleapp.example.Example.run (Example.scala:67) +``` + +However, fiber tracing isn't limited to collecting stack traces. Tracing +has many use cases that improve developer experience and aid in understanding +how our applications work. These features are described below. A **bolded name** +indicates that the feature has been merged into master. + +1. **Asynchronous stack tracing**. This is essentially what is described above, +where stack frames are collected across asynchronous boundaries for a given +fiber. +2. Combinator inference. Stack traces can be walked to determine what +combinator was actually called by user code. For example, `void` and `as` are +combinators that are derived from `map`, and should appear in the fiber trace +rather than `map`. +3. Intermediate values. The intermediate values that an `IO` program encounters +can be converted to a string to render. This can aid in understanding the +actions that a program takes. +4. Thread tracking. A fiber is scheduled on potentially many threads throughout +its lifetime. Knowing what thread a fiber is running on, and when it shifts +threads is a powerful tool for understanding and debugging the concurrency of +an application. +5. Tree rendering. By collecting a trace of all `IO` operations, a pretty tree +or graph can be rendered to visualize fiber execution. +6. Fiber identity. Fibers, like threads, are unique and can therefore assume an +identity. If user code can observe fiber identity, powerful observability tools +can be built on top of it. For example, another shortcoming of asynchronous +code is that it becomes tedious to correlate log messages across asynchronous +boundaries (thread IDs aren't very useful). With fiber identity, log messages +produced by a single fiber can be associated with a unique, stable identifier. +7. Fiber ancestry graph. If fibers can assume an identity, an ancestry graph +can be formed, where nodes are fibers and edges represent a fork/join +relationship. +8. Asynchronous deadlock detection. Even when working with asynchronously +blocking code, fiber deadlocks aren't impossible. Being able to detect +deadlocks or infer when a deadlock can happen makes writing concurrent code +much easier. +9. Live fiber trace dumps. Similar to JVM thread dumps, the execution status +and trace information of all fibers in an application can be extracted for +debugging purposes. +10. Monad transformer analysis. + +As note of caution, fiber tracing generally introduces overhead to +applications in the form of higher CPU usage, memory and GC pressure. +Always remember to performance test your applications with tracing enabled +before deploying it to a production environment! + +## Asynchronous stack tracing +### Configuration +The stack tracing mode of an application is configured by the system property +`cats.effect.stackTracingMode`. There are three stack tracing modes: `DISABLED`, +`CACHED` and `FULL`. These values are case-insensitive. + +To prevent unbounded memory usage, stack traces for a fiber are accumulated +in an internal buffer as execution proceeds. If more traces are collected than +the buffer can retain, then the older traces will be overwritten. The default +size for the buffer is 128, but can be changed via the system property +`cats.effect.traceBufferSize`. Keep in mind that the buffer size will always +be rounded up to a power of 2. + +For example, to enable full stack tracing and a trace buffer size of 1024, +specify the following system properties: +``` +-Dcats.effect.stackTracingMode=full -Dcats.effect.traceBufferSize=1024 +``` + +#### DISABLED +No tracing is instrumented by the program and so incurs negligible impact to +performance. If a trace is requested, it will be empty. + +#### CACHED +When cached stack tracing is enabled, a stack trace is captured and cached for +every `map`, `flatMap` and `async` call in a program. + +The stack trace cache is indexed by the lambda class reference, so cached +tracing may produce inaccurate fiber traces under several scenarios: +1. Monad transformer composition +2. A named function is supplied to `map`, `async` or `flatMap` at multiple +call-sites + +We measured less than a 30% performance hit when cached tracing is enabled +for a completely synchronous `IO` program, but it will most likely be much less +for any program that performs any sort of I/O. We strongly recommend +benchmarking applications that make use of tracing. + +This is the recommended mode to run in most production applications and is +enabled by default. + +#### FULL +When full stack tracing is enabled, a stack trace is captured for most `IO` +combinators including `pure`, `delay`, `suspend`, `raiseError` as well as those +traced in cached mode. + +Stack traces are collected *on every invocation*, so naturally most programs +will experience a significant performance hit. This mode is mainly useful for +debugging in development environments. + +### Requesting and printing traces +Once the global tracing flag is configured, `IO` programs will automatically +begin collecting traces. The trace for a fiber can be accessed at any point +during its execution via the `IO.trace` combinator. This is the `IO` equivalent +of capturing a thread's stack trace. + +After we have a fiber trace, we can print it to the console, not unlike how +Java exception stack traces are printed with `printStackTrace`. `printFiberTrace` +can be called to print fiber traces to the consoles. Printing behavior can be +customized by passing in a `PrintingOptions` instance. By default, a fiber trace +is rendered in a very compact presentation that includes the most relevant stack +trace element from each fiber operation. + +```scala +import cats.effect.IO + +def program: IO[Unit] = + for { + _ <- IO(println("Started the program")) + trace <- IO.trace + _ <- trace.printFiberTrace() + } yield () +``` + +Keep in mind that the scope and amount of information that traces hold will +change over time as additional fiber tracing features are merged into master. + +### Complete example +Here is a sample program that demonstrates tracing in action. + +```scala +// Pass the following system property to your JVM: +// -Dcats.effect.stackTracingMode=full + +import cats.effect.tracing.PrintingOptions +import cats.implicits._ +import cats.effect.{ExitCode, IO, IOApp} + +import scala.util.Random + +object Example extends IOApp { + + val options = PrintingOptions.Default + .withShowFullStackTraces(true) + .withMaxStackTraceLines(8) + + def fib(n: Int, a: Long = 0, b: Long = 1): IO[Long] = + IO(a + b).flatMap { b2 => + if (n > 0) + fib(n - 1, b, b2) + else + IO.pure(b2) + } + + def program: IO[Unit] = + for { + x <- fib(20) + _ <- IO(println(s"The 20th fibonacci number is $x")) + _ <- IO(Random.nextBoolean()).ifM(IO.raiseError(new Throwable("")), IO.unit) + } yield () + + override def run(args: List[String]): IO[ExitCode] = + for { + _ <- program.handleErrorWith(_ => IO.trace.flatMap(_.printFiberTrace(options))) + } yield ExitCode.Success + +} +``` diff --git a/site/src/main/mdoc/tutorial/tutorial.md b/site/src/main/mdoc/guides/tutorial.md similarity index 100% rename from site/src/main/mdoc/tutorial/tutorial.md rename to site/src/main/mdoc/guides/tutorial.md diff --git a/site/src/main/resources/microsite/data/menu.yml b/site/src/main/resources/microsite/data/menu.yml index 4a6efc8ba8..f4bdd4615d 100644 --- a/site/src/main/resources/microsite/data/menu.yml +++ b/site/src/main/resources/microsite/data/menu.yml @@ -88,6 +88,14 @@ options: url: typeclasses/concurrent-effect.html menu_section: typeclasses - - title: Tutorial - url: tutorial/tutorial.html - menu_section: tutorial + - title: Guides + url: guides/ + menu_section: guides + + nested_options: + - title: Tracing + url: guides/tracing.html + menu_section: guides + - title: Tutorial + url: guides/tutorial.html + menu_section: guides diff --git a/tracing-tests/src/fulltracing/scala/cats/effect/FullStackTracingTests.scala b/tracing-tests/src/fulltracing/scala/cats/effect/FullStackTracingTests.scala new file mode 100644 index 0000000000..b882a12475 --- /dev/null +++ b/tracing-tests/src/fulltracing/scala/cats/effect/FullStackTracingTests.scala @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +import cats.effect.tracing.{IOEvent, IOTrace} +import org.scalatest.funsuite.AsyncFunSuite +import org.scalatest.matchers.should.Matchers + +import scala.concurrent.ExecutionContext + +class FullStackTracingTests extends AsyncFunSuite with Matchers { + implicit override def executionContext: ExecutionContext = ExecutionContext.Implicits.global + implicit val timer: Timer[IO] = IO.timer(executionContext) + implicit val cs: ContextShift[IO] = IO.contextShift(executionContext) + + def traced[A](io: IO[A]): IO[IOTrace] = + io.flatMap(_ => IO.trace) + + test("full stack tracing captures map frames") { + val task = IO.pure(0).map(_ + 1).map(_ + 1) + + for (r <- traced(task).unsafeToFuture()) yield { + r.captured shouldBe 4 + r.events.collect { case e: IOEvent.StackTrace => e }.filter(_.tag == 4).length shouldBe 2 + } + } + + test("full stack tracing captures bind frames") { + val task = IO.pure(0).flatMap(a => IO(a + 1)).flatMap(a => IO(a + 1)) + + for (r <- traced(task).unsafeToFuture()) yield { + r.captured shouldBe 6 + r.events.collect { case e: IOEvent.StackTrace => e }.filter(_.tag == 3).length shouldBe 3 // the extra one is used to capture the trace + } + } + + test("full stack tracing captures async frames") { + val task = IO.async[Int](_(Right(0))).flatMap(a => IO(a + 1)).flatMap(a => IO(a + 1)) + + for (r <- traced(task).unsafeToFuture()) yield { + r.captured shouldBe 6 + r.events.collect { case e: IOEvent.StackTrace => e }.filter(_.tag == 5).length shouldBe 1 + } + } + + test("full stack tracing captures pure frames") { + val task = IO.pure(0).flatMap(a => IO.pure(a + 1)) + + for (r <- traced(task).unsafeToFuture()) yield { + r.captured shouldBe 4 + r.events.collect { case e: IOEvent.StackTrace => e }.filter(_.tag == 0).length shouldBe 2 + } + } + + test("full stack tracing captures delay frames") { + val task = IO(0).flatMap(a => IO(a + 1)) + + for (r <- traced(task).unsafeToFuture()) yield { + r.captured shouldBe 4 + r.events.collect { case e: IOEvent.StackTrace => e }.filter(_.tag == 1).length shouldBe 2 + } + } + + test("full stack tracing captures suspend frames") { + val task = IO.suspend(IO(1)).flatMap(a => IO.suspend(IO(a + 1))) + + for (r <- traced(task).unsafeToFuture()) yield { + r.captured shouldBe 6 + r.events.collect { case e: IOEvent.StackTrace => e }.filter(_.tag == 2).length shouldBe 2 + } + } + + test("full stack tracing captures raiseError frames") { + val task = IO(0).flatMap(_ => IO.raiseError(new Throwable())).handleErrorWith(_ => IO.unit) + + for (r <- traced(task).unsafeToFuture()) yield { + r.captured shouldBe 5 + r.events.collect { case e: IOEvent.StackTrace => e }.filter(_.tag == 8).length shouldBe 1 + } + } +} diff --git a/tracing-tests/src/test/scala/cats/effect/CachedStackTracingTests.scala b/tracing-tests/src/test/scala/cats/effect/CachedStackTracingTests.scala new file mode 100644 index 0000000000..9938203663 --- /dev/null +++ b/tracing-tests/src/test/scala/cats/effect/CachedStackTracingTests.scala @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +import cats.effect.tracing.{IOEvent, IOTrace} +import org.scalatest.funsuite.AsyncFunSuite +import org.scalatest.matchers.should.Matchers + +import scala.concurrent.ExecutionContext + +class CachedStackTracingTests extends AsyncFunSuite with Matchers { + implicit override def executionContext: ExecutionContext = ExecutionContext.Implicits.global + implicit val timer: Timer[IO] = IO.timer(executionContext) + implicit val cs: ContextShift[IO] = IO.contextShift(executionContext) + + def traced[A](io: IO[A]): IO[IOTrace] = + io.flatMap(_ => IO.trace) + + test("cached stack tracing captures map frames") { + val task = IO.pure(0).map(_ + 1).map(_ + 1) + + for (r <- traced(task).unsafeToFuture()) yield { + r.captured shouldBe 3 + r.events.collect { case e: IOEvent.StackTrace => e }.filter(_.tag == 4).length shouldBe 2 + } + } + + test("cached stack tracing captures bind frames") { + val task = IO.pure(0).flatMap(a => IO(a + 1)).flatMap(a => IO(a + 1)) + + for (r <- traced(task).unsafeToFuture()) yield { + r.captured shouldBe 3 + r.events + .collect { case e: IOEvent.StackTrace => e } + .filter(_.tag == 3) + .length shouldBe 3 // extra one is to capture the trace + } + } + + test("cached stack tracing captures async frames") { + val task = IO.async[Int](_(Right(0))).flatMap(a => IO(a + 1)).flatMap(a => IO(a + 1)) + + for (r <- traced(task).unsafeToFuture()) yield { + r.captured shouldBe 4 + r.events.collect { case e: IOEvent.StackTrace => e }.filter(_.tag == 5).length shouldBe 1 + } + } +} diff --git a/tracing-tests/src/test/scala/cats/effect/TracingTests.scala b/tracing-tests/src/test/scala/cats/effect/TracingTests.scala new file mode 100644 index 0000000000..4518089439 --- /dev/null +++ b/tracing-tests/src/test/scala/cats/effect/TracingTests.scala @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2017-2019 The Typelevel Cats-effect Project Developers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cats.effect + +import cats.effect.tracing.IOTrace +import org.scalatest.funsuite.AsyncFunSuite +import org.scalatest.matchers.should.Matchers + +import scala.concurrent.ExecutionContext + +class TracingTests extends AsyncFunSuite with Matchers { + implicit override def executionContext: ExecutionContext = ExecutionContext.Implicits.global + implicit val timer: Timer[IO] = IO.timer(executionContext) + implicit val cs: ContextShift[IO] = IO.contextShift(executionContext) + + def traced[A](io: IO[A]): IO[IOTrace] = + io.flatMap(_ => IO.trace) + + test("traces are preserved across asynchronous boundaries") { + val task = for { + a <- IO.pure(1) + _ <- IO.shift + b <- IO.pure(1) + } yield a + b + + for (r <- traced(task).unsafeToFuture()) yield { + r.captured shouldBe 4 + } + } +}