
Rewrote Dispatcher #3923

Merged (51 commits into typelevel:series/3.5.x, Mar 5, 2024)

Conversation

djspiewak (Member)

I have fixed many things. I possibly also made new problems. I think it's more maintainable now, but who am I to say?

I wrote a long comment in the middle which kind of explains the implementation, but the tl;dr is that we're now very explicit about the impure submission queue and its associated race conditions. Those race conditions are fully resolved by Worker, including the cancelation state machine. On the "actually do stuff" side is Executor, which is quite simple for the existing modes and quite complex for the new mode: sequentialCancelable.

Speaking of which, I added a third mode because I didn't realize it wasn't already the semantics of the second mode (sequential). Whoops. Anyway, we might not want to expose this one since I'm not sure anyone wants it, but I did all the work so here you go. Mostly I wanted to prove that it was possible. And it is… just hard.

Oh, I also removed and rewrote a bunch of tests which either weren't right, weren't reliable, or weren't interesting. I added a few new tests covering cases we probably should have always had. I don't think I missed anything, but who can say? Please enjoy.
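
For orientation, the user-facing surface all of this feeds is Dispatcher#unsafeToFutureCancelable, which hands back both the result future and an impure cancel hook. A minimal usage sketch (the mode and await flag here are only an example, not a prescription from this PR):

    import cats.effect.{IO, Resource}
    import cats.effect.std.Dispatcher
    import scala.concurrent.Future

    // sketch: submit an effect through a sequential dispatcher and keep both
    // the result future and the cancelation hook
    def submit(work: IO[Int]): Resource[IO, (Future[Int], () => Future[Unit])] =
      Dispatcher.sequential[IO](await = true).evalMap { dispatcher =>
        IO(dispatcher.unsafeToFutureCancelable(work))
      }

Calling the returned () => Future[Unit] requests cancelation; exactly when that future completes in each mode is what the rest of this thread hashes out.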

djspiewak (Member, Author) commented Dec 31, 2023

Hard for me to tell if this is a legit failure or not, but there's at least one legit bug here somewhere. Got this locally just now:

[error]     ! not hang when cancelling
[error]      cats.effect.TestTimeoutException: null (Runners.scala:109)
[error] cats.effect.Runners.$anonfun$timeout$1(Runners.scala:109)
[error] cats.effect.unsafe.WorkStealingThreadPool.$anonfun$sleep$1(WorkStealingThreadPool.scala:685)
[error] cats.effect.unsafe.WorkStealingThreadPool.$anonfun$sleep$1$adapted(WorkStealingThreadPool.scala:685)
[error] cats.effect.unsafe.WorkerThread.run(WorkerThread.scala:740)

JVM, 2.12.18, ARM64 (parallel, await = false)

djspiewak changed the base branch from series/3.x to series/3.5.x (January 12, 2024 15:43)
djspiewak force-pushed the feature/new-dispatcher branch from 43a91a1 to 8f0d60c (January 12, 2024 15:43)
djspiewak (Member, Author)

Hold on this until we can track down the uncaught supervisor state exceptions.

durban mentioned this pull request (Feb 5, 2024)
durban (Contributor) commented Feb 5, 2024

I believe I've fixed one of the sources of the uncaught exceptions: #3991 (feel free to merge that branch into this one). The affected test was complete / cancel race. There was a race between multiple completions of a CancelRequested(latch) (via trySuccess), and one of these would also submit a Fiber#cancel to the cancelling-supervisor. But another racer would've already completed the latch, so the cancel-future would be completed, the test could finish, and the supervisor could shut down before the submitted Fiber#cancel ever ran, hence the uncaught exception. The fix in #3991 is to handle the state properly and make sure there is only one completion of the latch.
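
To make the shape of that race concrete, here's a toy model of the state handling (the state constructors match the ones discussed in this thread; CancelCell, resolve, and the rest are hypothetical and are not the code in #3991):

    import java.util.concurrent.atomic.AtomicReference
    import scala.concurrent.Promise

    sealed trait State
    case object Unstarted extends State
    final case class Running(cancel: () => Unit) extends State
    final case class CancelRequested(latch: Promise[Unit]) extends State
    case object Completed extends State

    // several racers may each observe CancelRequested(latch) and try to finish it
    final class CancelCell(submitToSupervisor: (() => Unit) => Unit) {
      private val state = new AtomicReference[State](Unstarted)

      // only the racer whose compareAndSet wins submits the fiber's cancel and
      // completes the latch; losers back off, so the latch is completed exactly
      // once and nothing reaches the supervisor after it has shut down
      def resolve(observed: CancelRequested, fiberCancel: () => Unit): Unit =
        if (state.compareAndSet(observed, Completed)) {
          submitToSupervisor(fiberCancel)
          observed.latch.success(())
        }
    }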

There remain other sources of the same exception:

  • run multiple IOs in parallel: I don't really understand this test; it seems to intentionally shut down a dispatcher while things are still going on?
  • cancelation race does not block a worker: I think (but I'm not sure) this causes a Worker to finalize itself (by enqueueing PoisonPill), and right after that the cancelling-supervisor finalizes itself (because it's the next Resource to close); but the worker's queue is not empty yet, there could still be Registration.Finalizers in it, and these are then submitted to the (already closed) cancelling-supervisor.

durban (Contributor) commented Feb 5, 2024

For the sequential, non-cancelable dispatcher, is cancel() supposed to be a no-op? Or is the future returned by cancel() supposed to be completed only once the task is finished?

Because currently (i.e., in this PR) it nondeterministically behaves either way, based on whether cancel() wins or loses a race. I.e., the following test sometimes hangs, sometimes completes for Dispatcher.sequential[IO](await = true):

      "foobar" in real {
        D.use { dispatcher =>
          IO.fromFuture(IO {
            val (_, cancel) = dispatcher.unsafeToFutureCancelable(IO.never[Unit])
            val cancelFut = cancel()
            cancelFut
          })
        }.replicateA_(6).as(ok)
      }

djspiewak (Member, Author)

> For the sequential, non-cancelable dispatcher, is cancel() supposed to be a no-op? Or is the future returned by cancel() supposed to be completed only once the task is finished?

I think the latter is the behavior I would expect. cancel() should basically behave as if the whole task is wrapped in uncancelable. Nondeterministically behaving in one way or the other sounds like a gloriously weird bug.

Also I'm chewing on your branch right now. Will merge and then get to work on some of the other bits.

djspiewak (Member, Author) commented Feb 10, 2024

I fixed the weird parallel IO test to avoid depending on await/non-await semantics. I still see unhandled exceptions, though it's really difficult for me to reproduce them on this machine. Also, because of parallel spec execution, it's very possible the exceptions are coming from somewhere other than those tests, but I'm not sure.

Edit: Actually now that I think about it, the nondeterminism in sequential cancel() is consistent with what would happen if the action was just wrapped in uncancelable. Consider:

IO.never.uncancelable.start.flatMap(_.cancel)

This will nondeterministically hang depending on whether the cancel gets in before the fiber begins running. Same idea here. So I think it's kind of okay?

durban (Contributor) commented Feb 12, 2024

> This will nondeterministically hang depending on whether the cancel gets in before the fiber begins running. Same idea here. So I think it's kind of okay?

Except in the case of start/cancel, the uncancelable task either (1) doesn't even start executing (and cancel finishes quickly), or (2) executes "completely", i.e., hangs in never (and cancel also hangs).

With this dispatcher (note, we're still talking about sequential, cancelable = false, await = true) I think there is a (3): the task starts executing, but cancel finishes before the task is completed.

This test fails:

      "foobar2" in real {
        D.use { dispatcher =>
          IO.ref(0).flatMap { ctr1 =>
            IO.ref(0).flatMap { ctr2 =>
              IO.fromFuture(IO {
                val (_, cancel) = dispatcher.unsafeToFutureCancelable(IO.uncancelable { _ =>
                  ctr1.update(_ + 1) *> IO.sleep(0.1.second) *> ctr2.update(_ + 1)
                })
                val cancelFut = cancel()
                cancelFut
              }).flatMap { _ =>
                // if we're here, `cancel()` finished, so
                // either the task didn't run at all (i.e.,
                // it was cancelled before starting), or
                // it ran and already finished completely:
                (ctr1.get, ctr2.get).flatMapN { (v1, v2) =>
                  IO(v1 mustEqual v2)
                }
              }
            }
          }
        }.replicateA_(10000).as(ok)
      }

djspiewak (Member, Author)

Ah, okay I buy that this is a problem then.

djspiewak added this to the v3.5.next milestone (Feb 19, 2024)
djspiewak (Member, Author)

Minor update: I've seen this test fail once, but it's very difficult to get it to fail at all on my laptop. Something about the timing, I suspect. I may need some help tracking this one down.

durban (Contributor) commented Feb 21, 2024

I think I've fixed it. See #4012 (feel free to merge my branch directly into this PR).

The problem is with this comment:

// we can use unit as a cancel action here since it must always sequence *after* the task
// thus, the task must complete before the cancel action will be picked up

This is not (always) true:

  1. It is true if registerCancel in Worker modifies the state Unstarted -> Running(pure(())). In that case, cancel() will find the pure(()) in Running, and submit it to the same queue.
  2. However, if cancel() is faster, it changes the state Unstarted -> CancelRequested. Then registerCancel in Worker changes the state CancelRequested -> Running(pure(())) and submits the pure(()) to the separate cancel-supervisor. The cancel-supervisor "executes" the pure(()), then completes the latch from CancelRequested (which cancel() is waiting on). What the cancel-supervisor does is (or can be) parallel to Executor running the original task.

My fix (in 6a9cfa6) is to wait on a Deferred for the task to complete, instead of using the synthetic pure(()) cancellation action. (Since we're ignoring cancellation, this should be fine in either case.) Without the fix, I could reliably reproduce the bug with the test in a3c285d; with the fix, I can't any more.
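
A rough sketch of the idea behind that fix (nonCancelablePair is a hypothetical name; the real change lives in 6a9cfa6): the registered cancel action is just a wait on the task's own completion latch, so cancel() cannot resolve before the task does, whichever way the state race goes.

    import cats.effect.{Deferred, IO}

    // sketch: pair a non-cancelable task with a cancel action that merely awaits
    // the task's completion
    def nonCancelablePair[A](task: IO[A]): IO[(IO[A], IO[Unit])] =
      Deferred[IO, Unit].map { done =>
        val guarded = task.guarantee(done.complete(()).void)
        val cancelAction = done.get // resolves only once the task has finished
        (guarded, cancelAction)
      }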

mergify bot commented Mar 4, 2024

⚠️ The sha of the head commit of this PR conflicts with #4012. Mergify cannot evaluate rules on this PR. ⚠️

djspiewak (Member, Author)

Magnificent fix @durban! Sorry I had been too busy to get to this. Snagged it now. Thank you!

djspiewak (Member, Author)

Let's call this good. I got the 👍 from @armanbilge just now to merge without waiting for him, since @durban in particular has been all up in this PR's business. If nothing else, this should make the build a lot more stable!

djspiewak merged commit 4fc5628 into typelevel:series/3.5.x (Mar 5, 2024)
31 of 32 checks passed