Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming thread safety #581

Closed
wants to merge 23 commits into from
Closed

Conversation

JamesWrigley
Copy link
Collaborator

There's a few changes in here, I would recommend reviewing each commit individually. The most important one is 96a0c46.

Side note: I see that we have both take!(::StreamStore, ::UInt) and take!(::StreamingValue). The former is unused AFAICT, and it differs from the latter in that it will wait for the store to be notified when empty instead of spinning. Should we use it instead? Or delete it?

JamesWrigley and others added 23 commits November 26, 2024 12:28
Because it doesn't actually do anything now.
Using `myid()` with `workers()` meant that when the context was initialized with
a single worker the processor list would be: `[OSProc(1), OSProc(1)]`. `procs()`
will always include PID 1 and any other workers, which is what we want.
This shouldn't be necessary since we `wait()` for the given task in another
`errormonitor()`'d task.
This was observed when running the `Single task running forever` test with
multiple threads.
Necessary because `stream_pull_values!()` may throw different exceptions
depending on whether the exception occurred locally or remotely.
Otherwise it's possible that the scheduler will close the DTask before all
outputs have been sent, which would cause the downstream tasks to hang. This is
how it could happen:
1. A streaming task starts.
2. The output handler task calls `take!(::ProcessRingBuffer)` on an output
   buffer, finds it empty, and `yield()`'s.
3. The task executes, pushes its output to the output buffers, reaches `max_evals`
   and finishes.
4. The scheduler finishes the corresponding DTask.
5. The `take!(::ProcessRingBuffer)` call resumes. The buffer isn't empty anymore
   but it calls `task_may_cancel(; must_force=true)` before continuing and
   throws an exception since the scheduler has finished the DTask. The result is
   that the last output is never sent, and the exeption is swallowed by the
   output handler started by `initialize_output_stream!()`.
6. Downstream tasks don't get that last result so they never reach `max_evals`
   and spin forever.

Fixed by storing the handler tasks in the `StreamStore` and closing them in
`close(::StreamStore)`.

Also increased the timeout of the 'Single task running forever' task because it
will sometimes timeout before the default 10s is up.
@JamesWrigley JamesWrigley self-assigned this Dec 1, 2024
@JamesWrigley
Copy link
Collaborator Author

@jpsamaroo jpsamaroo force-pushed the jps/stream2 branch 4 times, most recently from 16827af to a765cbe Compare December 9, 2024 16:43
@jpsamaroo
Copy link
Member

Most of these changes were cherry-picked or manually ported into #463, which is generally passing CI, so I'm going to close this. Thanks for putting these fixes together!

@jpsamaroo jpsamaroo closed this Dec 9, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants