Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Single threaded alternative processor #773

Merged
merged 14 commits into from
Jun 10, 2024

Conversation

JelleAalbers
Copy link
Member

@JelleAalbers JelleAalbers commented Nov 2, 2023

What is the problem / what does the code in this PR do
This adds a single-threaded alternative processor backend that avoids the mailbox system and uses less memory.

Strax has a custom-built concurrency backend (the 'mailbox system'). It works, but it has problems and we hope to eventually replace it. @jmosbacher has done much work towards this; see also the discussion in #81.

A single-threaded processor won't work for the DAQ, but it could help reprocessing, analysis, and debugging:

  • Use less memory, by only reading in data when it is strictly needed and deleting it as soon as possible. Mailboxes with max_workers=1 / lazy mode do a reasonable job at this already, so the difference should not be dramatic in practice.
  • Avoid hangs, deadlocks, and errors from mailboxes. There are places in strax where exceptions don't get reported but instead cause a mailbox hang, which is annoying during development.
  • Make it easy to track how much time is spent on a plugin. We could add a printout of how long each plugin took, as we had in pax. (Profilers also give you that info and much besides, but you don't enable them in production as they slow things down.) Even without that, it is easier to read and reason about the debug log, since you know only one thing happens at a time.
  • Provide some alternative processor, available as a fallback and minimal reference implementation for fancier processing backends.

Can you briefly describe how it works?

I started from a commit from Yossi's #410 to allow processors to be selected at runtime by the context. Currently BaseProcessor is a bit empty, we can see if there is more we can generalize to there.

The SingleThreadProcessor is an alternative to the ThreadedMailboxProcessor. To keep the processor classes comparable, I put most of the mailbox-replacing logic into a PostOffice class. There is only one PostOffice instance per processor, it handles all the data types ('topics'), and it is much simpler than a single mailbox since it needs no threading or locking. We could eventually split this off into a separate package, maybe together with mailboxes.

The trickiest part was dealing with rechunking in savers. The current code assumed it had its own thread available to independently pull on an iterator in save_from. In single-threaded processing we instead have to send chunks to _save_chunk individually (as we do in multiprocessing), so then the rechunking logic is skipped. I thus factored it out into a separate Rechunker class and wrapped that around the Saver when we are single-threading. Might be useful to have the rechunking logic separate anyway.

For testing, I let some tests in test_core run on both processors, and there are some asserts. Maybe you'd like to see some dedicated unit tests and docs as well, let me know.

The default processor is not changed, you have to add processor='single_thread' to your get_array/get_df/etc call to switch to the single-threaded processor. I would propose we test it in some reprocessing jobs first, if it works, we could then make it the default for max_workers=1.

Can you give a minimal working example (or illustrate with a figure)?
This shows the mprof output for the full processing of a tiny (~90 second) background run, starting from lz4 raw records on my laptop, with all needed online resources already downloaded. First for the current mailbox processing:

st.make('026195', 'event_basics')

mailbox

and for single-threaded processing:

st.make('026195', 'event_basics', processor='single_thread')

single_thread

For reviewing, note the number of lines changed is deceptively large. The stuff in strax.processor basically just got moved to strax.processors.threaded_mailbox. I kept strax.mailbox in place since mailboxes also used directly in the rechunker script. (And strax.processor is now present only to keep an old straxen test running that imports from it directly.)

@JelleAalbers JelleAalbers changed the title Single threaded alterantive processor Single threaded alternative processor Nov 2, 2023
@coveralls
Copy link

coveralls commented Nov 20, 2023

Coverage Status

coverage: 90.645% (-0.08%) from 90.724%
when pulling 89a643a on JelleAalbers:single_thread
into b55b0d3 on AxFoundation:master.

@JelleAalbers JelleAalbers marked this pull request as ready for review November 20, 2023 13:36
@dachengx dachengx self-requested a review November 21, 2023 12:58
@FaroutYLq FaroutYLq self-requested a review December 19, 2023 16:54
@FaroutYLq
Copy link
Collaborator

@JelleAalbers big thanks for the deep deep PR! We will review before next round of heavy low-level processing, for which I think this PR will mostly benefit about.

@dachengx
Copy link
Collaborator

Hey @JelleAalbers , can I resolve the conflicts and push to your forked repo?

@dachengx
Copy link
Collaborator

dachengx commented Jun 9, 2024

Thanks @JelleAalbers . This finally motivated me to read through and understand more about the ThreadedMailboxProcessor. Would you please educate me on why the SingleThreadProcessor uses less memory than the ThreadedMailboxProcessor?

In particular:

Use less memory, by only reading in data when it is strictly needed and deleting it as soon as possible.
At which lines of codesThreadedMailboxProcessor does not delete data when the data is not really needed?

Copy link
Collaborator

@dachengx dachengx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @JelleAalbers I tested the PR with no error or different result.

@JelleAalbers
Copy link
Member Author

Hi Dacheng, great! Thanks for testing.

In threaded mode, the operating system ultimately controls what strax code runs when (though we guide this with various locks/conditions in lazy mode). I think it's possible a thread gets interrupted for a while between when it does the final operation on some data and when it deletes it.

One dirty optimization definitely helped a little: https://github.com/AxFoundation/strax/pull/773/files#diff-0052f4762c34c9846086909453739399c2c86e4ce7cd5f15a39553f8dab348eeR209. It's explained more in the stackoverflow linked below that line. Basically, if you yield and then del data, the data is still kept around until the next time the iterator is run. So you have to del first, keep the data in an anonymous container, and pop it when you yield. This is nothing thread-specific though, and I think there are other places in strax where we could do this. (But the marginal savings might not be worth it.)

I hope the single threaded processor allows better tracking of where and when the memory and time is used. If so, we might get more savings in the future. When I looked at it, there was extra memory of about a chunk worth of data allocated somewhere outside the python layer -- at least no python tool I tried could find a reference to it. (There was no leak, i.e. it did not grow, so maybe this was just some kind of preallocation?)

@dachengx
Copy link
Collaborator

@JelleAalbers Yes the PR helps a lot with tracking the memory usage. Thanks for the explanation. I will merge the PR after the testing is finished.

@dachengx dachengx merged commit d0cbc1b into AxFoundation:master Jun 10, 2024
9 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants