Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a keyword argument to disable multithreading #3030

Merged
merged 24 commits into from
Jun 12, 2022
Merged

Conversation

nalimilan
Copy link
Member

First step towards fixing #2988. The first commits implements this by checking the global flag everywhere we call @spawn, while the other one defines macros to centralize the check, giving a cleaner code but a more complex implementation.

Maybe we'd better mark the API as internal/experimental for now just in case a more general mechanism is developed in Julia? For example, ideally DTable could just annotate the relevant parts with e.g. @singlethreading and all @spawn calls would automatically switch to @async.

src/DataFrames.jl Outdated Show resolved Hide resolved
Call `DataFrames.MULTITHREADING[] = false` to disable multithreading,
for example because distribution of work across threads is managed separately.
"""
const MULTITHREADING = Threads.Atomic{Bool}(true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe set it to Threads.nthreads() > 1? I.e. when there is only a single thread we disable multi-threading? Intuitively it should be a bit faster (but I have not looked at the code below yet, so maybe it does not matter)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I tried to do that so that we don't need to also check Threads.nthreads() > 1, but I realized that it would be suboptimal to spawn tasks if for some reason users set MULTITHREADING to true when Threads.nthreads() == 1. And this could happen if somebody does DataFrames.MULTITHREADING[] = false; ... something thread-unsafe... DataFrames.MULTITHREADING[] = true.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, I am just asking about what the default value for this variable should be when DataFrames.jl is loaded.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Then I tend to think always using true by default is simpler. The performance gain you refer to would be due to not having to check Threads.nthreads() > 1? That's probably negligible, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in line 264 (comment below) - you do not check for this. Maybe add a check and then we are OK as you propose.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that adding support for toggling arbitrary global flags in Dagger leads to unpredictable and non-composable behavior (because now other unrelated code sees the effect of Dagger changing this flag, and we also need to track the value of the flag per-worker, which adds extra overhead and complexity to Dagger).

Basically, in the absence of a composable mechanism (i.e. context variables), it's on the user to change this flag to something that is preferred. That's not a big deal, it's just something we need to document and communicate.

I'm sorry for not pointing this all out earlier.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me add some context to my decision with an example:

A user has a continuous processing pipeline that uses DataFrames. They're very happy with it, and the new DataFrames multithreading feature has made it fast and efficient.

This user now wants to add an HTTP server to this process that takes some of the results of this pipeline in real-time and serves them to clients. To do this, they use Dagger to spawn off some pre-processing operations using the DTable, which will produce the results that will be sent to the requesting client.

Now, they tested this new piece in isolation with test data, and it worked out really well, and the DTable effectively turned off DataFrames multithreading to ensure that the pre-processing tasks for HTTP clients ran efficiently.

With both pieces working well in isolation, they combine the codes together and run them in the same session (because they don't need a bunch of overhead from serializing data between workers). Suddenly, their continuous processing pipeline is struggling to perform well (because it's now running single-threaded), while the HTTP clients are being (quickly and efficiently) served stale data.

After some investigation, they find out what Dagger is doing, and are unhappy that simply using Dagger for data pre-processing slows down and effectively breaks their whole application. They file an issue on Dagger, and I have to point out that because of a certain design decision, this behavior is "expected". They are unhappy and stop using Dagger, opting to instead use some other approach. Dagger loses an interested user, and the user loses the benefits of Dagger's DTable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jpsamaroo - I fully understand that (and kind of expected - that is why I have asked for an explicit information from your side as we really want DataFrames.jl to play nicely with Dagger.jl in the long run). The problem is that even the singlethreading is not "strong enough" to give you what you want as it is using a shared counter (@tkf - please corret me if I am wrong).

Therefore what I propose to do:

  • for now use DataFrames.jl specific global getter/setter of multi threading that is intended to be handled on user's side
  • when we have a fully working and stable context passing solution in Dagger.jl we will add functionality that will respect this context (we will decide how to do it when the context-passing solution will be implemented in Dagger.jl)

OK?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That all sounds good to me!

Once context variables are used in DataFrames (either via ContextVariablesX or some Base-provided API) I will be happy to have Dagger propagate these variables (although I may add a new package that users can load to turn this on automatically, because this propagation still should be opt-in to be safe; automatically propagating a pointer, for example, would be very bad 😄).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I said, I designed singlethreaded(f) specifically to make it comptaible with context variable. The implementation can be switched to use a context variable once it's available. In fact, it can also be implemented using ContextVariablesX right now. This is exactly why I suggested separating UI and API (or ditching the enable/diable UI is also OK).

@@ -261,7 +261,7 @@ function _combine_rows_with_first!((firstrow,)::Ref{Any},
# Create up to one task per thread
# This has lower overhead than creating one task per group,
# but is optimal only if operations take roughly the same time for all groups
if VERSION >= v"1.4" && isthreadsafe(outcols, incols)
if VERSION >= v"1.4" && MULTITHREADING[] && isthreadsafe(outcols, incols)
Copy link
Member

@bkamins bkamins Mar 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern has a problem that we need to resolve before merging.

The problem is that after this check abut before you call @spawn_or_async (or @spawn_or_run) the value of MULTITHREADING[] might change.

What I propose is to set a local variable in this if part that captures if multi-threading should be used and pass this flag to @spawn_or_async.

You know the code better, so maybe this will not cause problems in any place the way you implemented it now, but it seems it would be safer to do this the way I propose.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I don't think it can be a problem though. In both cases we create a task, the only difference is whether we set t.sticky to true or false.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(from #3030 (comment))

in line 264 (comment below) - you do not check for this. Maybe add a check and then we are OK as you propose.
(or maybe what we have is equivalent it seems)

Yes, the two branches are equivalent when Threads.nthreads() == 1, which is why we don't check that currently. I guess we could add it for clarity though.

Suggested change
if VERSION >= v"1.4" && MULTITHREADING[] && isthreadsafe(outcols, incols)
if VERSION >= v"1.4" && MULTITHREADING[] && Threads.nthreads() > 1 && isthreadsafe(outcols, incols)

@nalimilan nalimilan changed the title Add a global flag do disable multithreading Add a global flag to disable multithreading Mar 27, 2022
@bkamins
Copy link
Member

bkamins commented Apr 8, 2022

@nalimilan - I have discussed with @jpsamaroo this PR.

The conclusion is that it would be best to avoid global flag setting mechanism. Instead going for singlethreading(f) (possibly undexported and annotated as experimental) wrapper would be better. The reason is that this way we would teach the users the API that would be more similar to target API. Later when context passing in Base Julia would be available we would just replace the internal temporary mechanizm handling of disabling multi-threading with the correct one.

What do you think about it?

docs/src/lib/functions.md Outdated Show resolved Hide resolved
docs/src/lib/functions.md Outdated Show resolved Hide resolved
src/other/utils.jl Outdated Show resolved Hide resolved
@bkamins bkamins linked an issue Apr 9, 2022 that may be closed by this pull request
src/other/utils.jl Outdated Show resolved Hide resolved
docs/src/lib/internals.md Outdated Show resolved Hide resolved
src/other/utils.jl Outdated Show resolved Hide resolved
@bkamins
Copy link
Member

bkamins commented Apr 9, 2022

For @spawn_or_acync and @spawn_or_run can you please add link to the source reference used to implement them (I want to check the approach to macro hygene in particular as it is always tricky).

@bkamins
Copy link
Member

bkamins commented Apr 9, 2022

@nalimilan followed your suggestions in the implementation.
I have checked the PR and now the only things that I think are left are documentation related.

@jpsamaroo + @tkf : your review at this point would be valuable (of course can you please focus on the API + implementation of multi-threading macros; I have checked all the places where they are used and this part is correct). Thank you!

@nalimilan
Copy link
Member Author

nalimilan commented Apr 9, 2022

For @spawn_or_acync and @spawn_or_run can you please add link to the source reference used to implement them (I want to check the approach to macro hygene in particular as it is always tricky).

For @spawn_or_async on Julia < 1.4 I copied the implementation of @async on older Julia releases: https://github.com/JuliaLang/julia/blob/v1.0.5/base/task.jl#L258
For @spawn_or_async and spawn_or_run on Julia >= 1.4 I adapted the implementation of @spawn which is similar to that of @async: https://github.com/JuliaLang/julia/blob/73c98630879c36a5336226e0d691bb7f80ef0e37/base/threadingconstructs.jl#L255 https://github.com/JuliaLang/julia/blob/73c98630879c36a5336226e0d691bb7f80ef0e37/base/task.jl#L487

@nalimilan
Copy link
Member Author

Yeah I also wonder whether a keyword argument wouldn't be more appropriate if we keep the idea that having a single Serial operation should disable all parallelism. That would probably also be simpler for users. The downside would be that you wouldn't be able to attach that information to the function that you can then easily pass down a call stack (you'd have to pass the value of the keyword argument separately).

@bkamins
Copy link
Member

bkamins commented Apr 25, 2022

The downside would be that you wouldn't be able to attach that information to the function that you can then easily pass down a call stack (you'd have to pass the value of the keyword argument separately).

Yes, but if you need to wrap something in Serial your code already is DataFrames.jl specific so I think kwarg would be equally acceptable.

Copy link
Member

@bkamins bkamins left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the conclusion is to change the implementation to use kwarg meaning that we want "serial" execution.

@nalimilan - would you have time to implement it? Thank you!

This kwarg should go to:
combine, select, select!, transform, transform!, subset, subset! (both for AbstractDataFrame and GroupedDataFrame).

@nalimilan
Copy link
Member Author

Last commit implements the keyword argument approach. I'm not sure what would be the best name for that argument though. Searching for code via JuliaHub shows no pattern, some packages use multithreaded, others threaded, threading, multithreading or multi_threading (and CSV.jl uses ntasks, but that's a bit different). Most packages doing this have much fewer users than us so it looks like we are in the position of influencing the convention. Maybe we could ask for opinions e.g. on Slack.

test/grouping.jl Outdated Show resolved Hide resolved
Copy link
Member

@bkamins bkamins left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a minor docstring layout comment. Thank you!

@bkamins
Copy link
Member

bkamins commented Jun 6, 2022

CI is failing :(.

@nalimilan nalimilan changed the title Add a global flag to disable multithreading Add a keyword argument to disable multithreading Jun 12, 2022
@bkamins
Copy link
Member

bkamins commented Jun 12, 2022

Looks good

@nalimilan nalimilan merged commit a682022 into main Jun 12, 2022
@nalimilan nalimilan deleted the nl/multithreading branch June 12, 2022 15:47
@bkamins
Copy link
Member

bkamins commented Jun 12, 2022

Bravo!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Is possible to restrict number of threads?
5 participants