Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Fiber local state PoC #836

Closed
wants to merge 6 commits into from

Conversation

RaasAhsan
Copy link

@RaasAhsan RaasAhsan commented Apr 17, 2020

This a PoC for supporting fiber-local state inside cats.effect.IO. I began to implement some of the ideas I laid out in #828 . I didn't get much feedback in the original issue, so hopefully there can be some more discussion here. I'd certainly like to hear whether or not this even belongs in this library.

Changes

  • Added a new IO primitive operation, FiberLocal, which ideally encapsulates all fiber-local behavior
  • Support FiberLocal instruction in the IO interpreter.
  • Added a new type FiberRef that leverages the new IO op to implement fiber local state. Unfortunately this is specific to IO.

Usage

def printLine[A: Show](msg: A): IO[Unit] =
    IO.delay(println(msg.show))

  def program: IO[Unit] =
    for {
      ref <- FiberRef.of(5)
      a1  <- ref.get
      _   <- printLine(a1)
      _   <- ref.set(10)
      a2  <- ref.get
      _   <- printLine(a2)
    } yield ()

TODO/Ideas

  • Tests
  • Docs
  • Cleanup
  • Java ThreadLocal integration?
  • think about forking behavior

Adding onto some of the ideas in #828, you could start to implement some very basic, runtime non-invasive, application tracing. You could implement a userspace Tracer type with a tracepoint method that captures slices of stack traces and holds it in fiber-local state

cc @djspiewak @alexandru

@djspiewak
Copy link
Member

Oooooh! 👀

@@ -1622,4 +1624,6 @@ object IO extends IOInstances {
override def recover(e: Throwable) =
Pure(Left(e))
}

final private[effect] case class FiberLocal[+A](k: mutable.HashMap[FiberRefId, AnyRef] => A) extends IO[A]
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably doesn't need to be mapping function. It could return the state directly.
Using a mutable.HashMap might also have implications on thread safety

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe Introspect is a better name

@RaasAhsan
Copy link
Author

An alternative implementation I had in mind was to give the responsibility of storage to each FiberRef. So each FiberRef would keep a reference to some ConcurrentHashMap-like data structure that indexes values by a unique fiber ID. The benefit of this approach is that it only requires the fiber identity to be supported by the runtime.

On the other hand, it creates a more complex public API where users have to interact with fiber-local state via Resource, since state isn't automatically garbage collected when a fiber exits anymore. But I think we also lose the ability to do any useful state manipulation on forks and joins.

@RaasAhsan RaasAhsan mentioned this pull request Apr 19, 2020
@@ -1593,7 +1593,7 @@ object IO extends IOInstances {
* signal downstream
*/
final private[effect] case class Async[+A](
k: (IOConnection, Either[Throwable, A] => Unit) => Unit,
k: (IOConnection, IOContext, Either[Throwable, A] => Unit) => Unit,
Copy link
Author

@RaasAhsan RaasAhsan Apr 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up having to supply the IOContext through the Async constructor so it could be threaded through multiple asynchronous operations that are logically linked to the same fiber (IOBracket implementation). Not sure if this totally makes sense, but it eliminates the Introspect constructor for now.

Ideally we could pass state via method parameters, but that would be a more invasive change given how the run-time is implemented. right now. I'm also not happy that IOContext has to be thread-safe now. But this implementation works.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe #681 and a revise of the bracket implementation would fix this.

* Represents the state of execution for a
* single fiber.
*/
final private[effect] class IOContext {
Copy link
Author

@RaasAhsan RaasAhsan Apr 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IOContext encapsulates state around the execution of a logical fiber. Its a thread-safe implementation for now, but ideally it doesn't have to be since it should only ever be accessed by one thread at a time. It could also hold tracing state in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would assume it's okay to not care about thread safety in this case. But I might be missing something.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is that IOContext needs to be threaded through multiple run-loops because of the IOBracket implementation, which also leverages the Async constructor. The "secondary" run-loops can manipulate fiber-local state on another thread (!!), and those changes need to propagate back to the original run-loop after the async callback is invoked. However, the current API doesn't lend itself to passing state to and back without some significant refactors. So the compromise for now is to make IOContext thread-safe so multiple threads/run-loops can access and manipulate that state safely.

Copy link
Member

@kubukoz kubukoz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm really excited about this!

As for the behavior in forking, I think we could make it customizable per ref. Some things you might want to pass to children in a copy of the context (e.g. trace/span ID) but some would be local to each fiber (I can't think of an example right now but I'm sure someone will have reasons).

Comment on lines +23 to +27
def get[A](ref: FiberRef[A]): IO[Option[A]] =
IO.Async { (_, ctx, cb) =>
val k = ref.asInstanceOf[FiberRef[Any]]
val v = ctx.getLocal(k).map(_.asInstanceOf[A])
cb(Right(v))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll need to come up with some mechanism to ensure more type safety, but we can leave that till we've decided on the semantics :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree. Fortunately, IOFiberRef and IOContext are intended to be internal APIs, so end users can't access it. The implementations of FiberRef and IOFiberRef guarantee that you can never perform an invalid cast, since the object reference itself is used as a key.


object IOContext {

def apply(): IOContext =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks referential transparency so probably unsafeApply would be more apt

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is intended to be an internal API, similar to IOConnection, which is implemented similarly. Probably need to add that as a comment

* Represents the state of execution for a
* single fiber.
*/
final private[effect] class IOContext {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would assume it's okay to not care about thread safety in this case. But I might be missing something.

*/
final private[effect] class IOContext {

// TODO: lazy?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was just a thought but probably not necessary 😅

@kubukoz
Copy link
Member

kubukoz commented May 2, 2020

@RaasAhsan could you add an example somewhere in the code? It'd be useful to see how it might look like in practice.

@RaasAhsan
Copy link
Author

RaasAhsan commented May 2, 2020

As for the behavior in forking, I think we could make it customizable per ref. Some things you might want to pass to children in a copy of the context (e.g. trace/span ID) but some would be local to each fiber (I can't think of an example right now but I'm sure someone will have reasons).

I agree, I think that would be the best way to unify these to find some sort of abstraction. Some variants I've thought of:

  1. Copy on merge - pass along a copy state to children. the parent and children can continue manipulating that state without either observing the others changes
  2. Copy on merge + merge on fork - pass along children to state, supply a merge function to combine them again on join (ZIO semantics)
  3. Shared reference - parents and children manipulate the same ref. almost like a Ref but its scoped to lineage of a fiber (Monix semantics)
  4. Isolated reference - no copying on fork, no merge on join. this is basically the PR implementation right now.

It's worth pointing out that option 2 is a special case of option 1 where the merge function merely drops the joined fiber's state value.

There are probably use cases for all of these, but like you said, I can only think of a few right now :)

could you add an example somewhere in the code? It'd be useful to see how it might look like in practice.

Added one in the coreJVM scope. let me know if that works for you

@RaasAhsan RaasAhsan mentioned this pull request May 16, 2020
@djspiewak djspiewak changed the base branch from master to series/2.x July 16, 2020 17:36
@oleg-py
Copy link
Contributor

oleg-py commented Aug 5, 2020

As a main proponent for "monix semantics" in monix, I want to point out that we still have this as a law:
https://github.com/typelevel/cats-effect/blob/series/2.x/laws/shared/src/main/scala/cats/effect/laws/ConcurrentLaws.scala#L92

  def startJoinIsIdentity[A](fa: F[A]) =
    F.start(fa).flatMap(_.join) <-> fa

And you can observe the violation for val fa = aFiberRef.set("foo")

If we want to keep the law (and that's a question), then the only reasonable behavior on join would be to actually replace the values from joined fiber into the current one, with no special logic. It's not enough for practical reasons though, so we have TaskLocal.isolate that makes a snapshot of a state and restores it afterwards, so composed with start it makes all fiber-local state have copy on fork + discard on join semantics.

It's a bit of a pain though, and configurable refs could reduce it, but not entirely. For example, monix TaskLocal is used in quill to keep the current transaction, and whether you want sharing or isolated reference depends on whether you are going to join the child fibers in the parent or they outlive it

@RaasAhsan RaasAhsan mentioned this pull request Oct 5, 2020
@RaasAhsan
Copy link
Author

I'm going to close this because CE3 will reach stable soon and we have a much simpler implementation for it in #1393

@RaasAhsan RaasAhsan closed this Nov 16, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants