Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First cut at Dispatcher #1303

Merged
merged 18 commits into from
Oct 18, 2020
Merged

Conversation

djspiewak
Copy link
Member

The idea here is actually pretty obvious in retrospect: a non-blocking supervised fiber-based dispatcher for sequencing effects. State is being managed by a pair of AtomicReferences: one of which contains an optional callback (and also encodes a double-check state to resolve races), while the other contains a LongMap with the registered effects. The dispatcher fiber loops continuously, checking the state for new entries. If none are available, it creates an async node and registers the callback in the latch reference, double-checking the state to ensure that nothing was written to the state reference in the interim.

If any new work is encountered, the dispatcher creates a fiber for each registered effect (which must be of type F[Unit]) and places that fiber into a Ref which is folded into the outer Resource scope, ensuring that child fibers are appropriately cleaned up when the dispatcher is shut down (note: here's a good and unavoidable use of the start function). Cancelation is carefully checked here.

Note that any use of the Runner post-shutdown will result in deadlocks and memory leaks. I don't check for this at present.

Anyway, on the calling side, all effects of type F[E] (for some E) are wrapped in the appropriate machinery to populate a Promise upon completion. Additionally, a bit of extra machinery is constructed to ensure that the state is unregistered if the cancelation function is called, or if the effect has already been started, that cancelation is then passed along to the carrier fiber.

We don't actually need to fully encode a queue or anything like it since we can just spawn as many fibers as we want. When I realized this, I was able to greatly simplify the state machine. In abstract, it's really just an async latch which can be fired from impure code, together with a list of F[Unit] actions with callbacks to tie up cancelation.

I want to add a few more things to the respective RunnerPlatforms, but this is pretty much ready for review.

Resolves #1299

@djspiewak djspiewak added the CE 3 label Oct 12, 2020
@djspiewak djspiewak changed the title Feature/dispatcher First cut at Dispatcher Oct 12, 2020
@RaasAhsan
Copy link

Regarding where this lives, I feel like if this is going to be the spiritual successor to ConcurrentEffect and UnsafeRun, it should just go into kernel. If libraries like http4s and fs2 are likely going to be pulling this in, I'm sure they (and downstream libraries + users) would be happy to avoid a dependency on std.

@djspiewak
Copy link
Member Author

Regarding where this lives, I feel like if this is going to be the spiritual successor to ConcurrentEffect and UnsafeRun, it should just go into kernel. If libraries like http4s and fs2 are likely going to be pulling this in, I'm sure they (and downstream libraries + users) would be happy to avoid a dependency on std.

Possibly, but I kind of see the modules in the following way:

  • kernel – Dependency of datatypes (like Monix)
  • std – Dependency of libraries (like http4s/fs2)
  • core – Dependency of users and library test suites

So from that standpoint, it's okay to stay in std, since everyone who wants it would already have this dependency.

@mpilquist
Copy link
Member

This is awesome.

@kubukoz
Copy link
Member

kubukoz commented Oct 12, 2020

I'll take a look tomorrow ☺️

Copy link
Member

@kubukoz kubukoz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I have no idea what's going on there so I can only look at it from a user's perspective 😅


import scala.concurrent.duration._

class DispatcherSpec extends BaseSpec {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having some examples of this being used with a callback-driven API would be nice (not necessarily in the form of tests)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I think this needs some more documentation. Probably just scaladoc alone is sufficient.

@djspiewak
Copy link
Member Author

@RaasAhsan The state machine is ready for a second review!

Copy link

@RaasAhsan RaasAhsan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple more super tiny comments :) I'm going to look at this more later, but 👍 for merging after these are addressed

F uncancelable { _ =>
for {
// for catching race conditions where we finished before we were in the map
completed <- F.ref(LongMap[Unit]())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My only gripe with this is that it can turn into a very slight memory leak. Basically, completed won't be garbage collected until the last fiber in a batch terminates. I may be splitting hairs over this though, so feel free to rightfully ignore this :D

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is okay. It's a relatively small amount of state, and the only way to avoid this is to do Ref[Ref[LongMap[Unit]], but the outer Ref still wouldn't be collected.

cancelToken = () => unsafeToFuture(token)

// double-check to resolve race condition here
if (canceled) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there's a benign race condition here, but I'm going to mention it anyways:

  1. Unsafe cancel sets canceled to true.
  2. registerCancel sets the cancel token here.
  3. Unsafe cancel reads the cancelToken and runs it.
  4. registerCancel reads canceled to be true and runs the token.

The cancel token is pretty much just a fiber.cancel, so I don't think we're violating correctness or leaking references, but it may just be worth fixing :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry took me a little while to scrape together the brain cells to understand this. :-) So I actually thought about that case when I was writing this! I didn't bother trying to resolve it because we already know cancel to be idempotent, so I think we can leave it.

@djspiewak
Copy link
Member Author

This is released as 3.0-80f5cc5

@djspiewak djspiewak merged commit d5a2213 into typelevel:series/3.x Oct 18, 2020
@djspiewak djspiewak deleted the feature/dispatcher branch July 3, 2022 23:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants