Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a keyword argument to disable multithreading #3030

Merged
merged 24 commits into from
Jun 12, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
([#3012](https://github.com/JuliaData/DataFrames.jl/issues/3012))
* Guarantee that `permute!` and `invpermute!` throw on invalid input
([#3035](https://github.com/JuliaData/DataFrames.jl/pull/3035))
* New experimental functions `DataFrames.singlethreaded` and `DataFrames.setmultithreading`
allow disabling multithreading in all DataFrames.jl operations
([#3030](https://github.com/JuliaData/DataFrames.jl/pull/3030))

## Previously announced breaking changes

Expand Down
21 changes: 17 additions & 4 deletions docs/src/lib/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@ CurrentModule = DataFrames

# Functions

## Multi-threading support

Selected operations in DataFrames.jl automatically use multiple threads when available.
It is task-based and implemented using the `@spawn` macro from Julia Base.
## Multithreading support

By default, selected operations in DataFrames.jl automatically use multiple threads
when available. It is task-based and implemented using the `@spawn` macro from Julia Base.
Multithreading can be disabled when running a particular block of code using the
[`DataFrames.singlethreaded() do... end`(@ref) syntax. It can also be disabled
globally by calling [`DataFrames.setmultithreading(true)`](@ref).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
globally by calling [`DataFrames.setmultithreading(true)`](@ref).
globally by calling [`DataFrames.setmultithreading(false)`](@ref).

DataFrames.setmultithreading(true) sounds like enabling multithreading, not disabling it. Maybe I'm misunderstanding the sentence though.

This is useful in particular to run functions which are not thread-safe, or when
distribution of work across threads is managed separately.
These functions are considered as experimental and may change or be removed once
a cross-package mechanism for multithreading configuration is developed.
nalimilan marked this conversation as resolved.
Show resolved Hide resolved

This is a list of operations that currently make use of multi-threading:
- `DataFrame` constructor with `copycols=true`; also recursively all functions
Expand Down Expand Up @@ -176,3 +183,9 @@ pairs
```@docs
isapprox
```

# Multithreading configuration (experimental)
```@docs
setmultithreading
singlethreaded
```
2 changes: 2 additions & 0 deletions docs/src/lib/internals.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ getmaxwidths
ourshow
ourstrwidth
@spawn_for_chunks
@spawn_or_async
@spawn_or_run
default_table_transformation
isreadonly
```
5 changes: 3 additions & 2 deletions src/dataframe/dataframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ struct DataFrame <: AbstractDataFrame
# we write into columns as we know that it is guaranteed
# that it was freshly allocated in the outer constructor
@static if VERSION >= v"1.4"
if copycols && len >= 1_000_000 && length(columns) > 1 && Threads.nthreads() > 1
if copycols && ismultithreaded() &&
len >= 1_000_000 && length(columns) > 1 && Threads.nthreads() > 1
@sync for i in eachindex(columns)
Threads.@spawn columns[i] = _preprocess_column(columns[i], len, copycols)
end
Expand Down Expand Up @@ -533,7 +534,7 @@ function _threaded_getindex(selected_rows::AbstractVector,
df_columns::AbstractVector,
idx::AbstractIndex)
@static if VERSION >= v"1.4"
if length(selected_rows) >= 1_000_000 && Threads.nthreads() > 1
if length(selected_rows) >= 1_000_000 && ismultithreaded() && Threads.nthreads() > 1
new_columns = Vector{AbstractVector}(undef, length(selected_columns))
@sync for i in eachindex(new_columns)
Threads.@spawn new_columns[i] = df_columns[selected_columns[i]][selected_rows]
Expand Down
12 changes: 6 additions & 6 deletions src/groupeddataframe/complextransforms.jl
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ function _combine_rows_with_first!((firstrow,)::Ref{Any},
# Create up to one task per thread
# This has lower overhead than creating one task per group,
# but is optimal only if operations take roughly the same time for all groups
if VERSION >= v"1.4" && isthreadsafe(outcols, incols)
if VERSION >= v"1.4" && ismultithreaded() && isthreadsafe(outcols, incols)
basesize = max(1, cld(len - 1, Threads.nthreads()))
partitions = Iterators.partition(2:len, basesize)
else
Expand All @@ -273,11 +273,11 @@ function _combine_rows_with_first!((firstrow,)::Ref{Any},
tasks = Vector{Task}(undef, length(partitions))
for (tid, idx) in enumerate(partitions)
tasks[tid] =
@spawn _combine_rows_with_first_task!(tid, first(idx), last(idx), first(idx),
outcols, outcolsref,
type_widened, widen_type_lock,
f, gd, starts, ends, incols, colnames,
firstcoltype(firstmulticol))
@spawn_or_async _combine_rows_with_first_task!(tid, first(idx), last(idx), first(idx),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a lot of code that isn't thread-safe is also not safe for @async concurrency. Silly example:

julia> v = []
Any[]

julia> @sync for i = 1:100
           @async begin
               sleep(rand())
               push!(v, i)
           end
       end

julia> issorted(v)
false

Perhaps folks will push an ID to a list or something inside a combine, and expect singlethreaded() will give no concurrency so it works?

My actual concern is loading data with Downloads.jl or HTTP.jl; both have a lot of issues with concurrency right now, and even @async is enough to quickly lock up release versions of HTTP.jl. So if you do something like "for each group, download a file associated to that group and compute some statistic of it" (a thing I actually do) then this will have issues.

So I think it would be great if there was a way to disallow concurrent execution of a function altogether, from green threads or threads.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jrevels pointed out we often don't really want to refer to threads but rather tasks, which kind of generalizes my point above. Maybe it should be singletask instead of singlethreaded? (or NotPure as suggested in the issue)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good point - I think @async should be also disabled.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, too bad. I hoped we could support this without adding branches all over the place which make the code hard to read and add special code paths that need a specific test for each case. This applies in particular to splitapplycombine.jl:669. The only way to avoid adding branches I can see is having the @run_or_async macro run the code immediately, and return a pseudo-Task object that supports with and fetch but makes them no-ops. But that's quite complex too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we can simply run the passed expression and return a Task that returns its value. It's a bit of a waste but not worse than spawning multiple tasks when the number of threads is 1. The cost is negligible anyway, which I why we always spawn tasks in these functions.

outcols, outcolsref,
type_widened, widen_type_lock,
f, gd, starts, ends, incols, colnames,
firstcoltype(firstmulticol))
end

# Workaround JuliaLang/julia#38931:
Expand Down
7 changes: 3 additions & 4 deletions src/groupeddataframe/splitapplycombine.jl
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,7 @@ function _combine(gd::GroupedDataFrame,
for i in eachindex(cs_norm, optional_transform, tasks)
cs_i = cs_norm[i]
optional_i = optional_transform[i]

tasks[i] = @spawn if length(gd) > 0 && isagg(cs_i, gd)
tasks[i] = @spawn_or_async if length(gd) > 0 && isagg(cs_i, gd)
_combine_process_agg(Ref{Any}(cs_i), optional_i, parentdf, gd,
seen_cols, trans_res, idx_agg[])
elseif keeprows && cs_i isa Pair && first(last(cs_i)) === identity &&
Expand Down Expand Up @@ -761,8 +760,8 @@ function _combine(gd::GroupedDataFrame,

@sync for i in eachindex(trans_res)
let i=i
@spawn reorder_cols!(trans_res, i, trans_res[i].col, trans_res[i].col_idx,
keeprows, idx_keeprows, gd)
@spawn_or_run reorder_cols!(trans_res, i, trans_res[i].col, trans_res[i].col_idx,
keeprows, idx_keeprows, gd)
end
end

Expand Down
12 changes: 6 additions & 6 deletions src/groupeddataframe/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ function row_group_slots(cols::NTuple{N, AbstractVector},
refmap
end
@sync for (seeni, range_chunk) in zip(seen_vec, range_chunks)
@spawn for i in range_chunk
@spawn_or_run for i in range_chunk
@inbounds begin
local refs_i
let i=i # Workaround for julia#15276
Expand All @@ -378,7 +378,7 @@ function row_group_slots(cols::NTuple{N, AbstractVector},
end
else
@sync for (seeni, range_chunk) in zip(seen_vec, range_chunks)
@spawn for i in range_chunk
@spawn_or_run for i in range_chunk
@inbounds begin
local refs_i
let i=i # Workaround for julia#15276
Expand Down Expand Up @@ -414,10 +414,10 @@ function row_group_slots(cols::NTuple{N, AbstractVector},
else
xl = view(x, 1:len ÷ 2)
xr = view(x, len ÷ 2 + 1:len)
t1 = @spawn reduce_or!(xl)
t2 = @spawn reduce_or!(xr)
fetch(t1)
fetch(t2)
@sync begin
@spawn_or_run reduce_or!(xl)
@spawn_or_run reduce_or!(xr)
end
xl[1] .|= xr[1]
end
return
Expand Down
5 changes: 3 additions & 2 deletions src/join/composer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ function compose_inner_table(joiner::DataFrameJoiner,
left_ixs, right_ixs = find_inner_rows(joiner)

@static if VERSION >= v"1.4"
if Threads.nthreads() > 1 && length(left_ixs) >= 1_000_000
if ismultithreaded() && Threads.nthreads() > 1 && length(left_ixs) >= 1_000_000
dfl_task = Threads.@spawn joiner.dfl[left_ixs, :]
dfr_noon_task = Threads.@spawn joiner.dfr[right_ixs, Not(joiner.right_on)]
dfl = fetch(dfl_task)
Expand Down Expand Up @@ -238,7 +238,8 @@ function _compose_joined_table(joiner::DataFrameJoiner, kind::Symbol, makeunique
@assert col_idx == ncol(joiner.dfl_on) + 1

@static if VERSION >= v"1.4"
if Threads.nthreads() > 1 && target_nrow >= 1_000_000 && length(cols) > col_idx
if ismultithreaded() && Threads.nthreads() > 1 &&
target_nrow >= 1_000_000 && length(cols) > col_idx
@sync begin
for col in eachcol(dfl_noon)
cols_i = left_idxs[col_idx]
Expand Down
194 changes: 193 additions & 1 deletion src/other/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,115 @@ end

funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner))


const SINGLETHREADING = Threads.Atomic{Bool}(false)
const SINGLETHREADING_DEPTH = Threads.Atomic{Int}(0)

ismultithreaded() = SINGLETHREADING_DEPTH[] == 0

"""
DataFrames.singlethreaded(f)

Run function `f` while disabling multithreading in all DataFrames.jl operations.
This is useful in particular to run functions which are not thread-safe, or when
distribution of work across threads is managed separately.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this API design for a while, I personally find it limiting to mix thread-safety and resource control issues.

A more granular fix for thread-safety issues is to make it configurable per-invocation basis. For example, the function f used as combine(gd, :y => f) may not be safe to be executed on multiple tasks. However, f on its own may invoke DataFrames APIs that can be safely parallelized. However, since these APIs are executed within the invocation of combine(gd, :y => f), a context-based solution cannot (easily) be used. (Reading @nalimilan's comment #2988 (comment), there may be a PR to add the keyword argument later. But I think it'd be better to not advertise singlethreaded as a mechanism for thread-safety if so.)

On the other hand, for resource control, a context-based solution is a good choice. However, for making composable ecosystem for parallel computing, I think it is better to treat "parallelized context" (= singlethreaded-like-API) as a hint. If you are doing gropuby of a large DataFrame in a child task where there are only 4 parent tasks on 8 core machines, it probably still is a better choice to parallelize groupby. So, from this perspective, I'd just try to be very conservative when choosing basesize in the "parallelized context." Furthermore, since this works only if this API is used across packages, I suggest creating a simple package for this and use it in DataFrames. (A small upside is that the version of said API does not have to be tied to DataFrames version. Since it is purely a performance hint, it may not have to be considered breaking if DataFrames.jl stops supporting this.)

Combining the above two comments, what I suggest is to only provide a keyword argument for supporting thread-unsafe user-defined functions as a surface DataFrames API. DataFrames can internally start relying on an external package for performance hints and let programmers who want to tune the performance invoke the external package.

All that said, I also realize these subtle differences may be hard to understand for many (especially new) Julia programmers. I understand DataFrames.jl needs to keep API very simple to be user-friendly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point of separating thread-safety and resource control is very good.

@tkf - do I understand you correctly that you propose that:

  1. For now we in DataFrames.jl focus only on thread safety issues.
  2. Resource control should be for now left out of the scope of changes in DataFrames.jl and we should wait till this "extra package" is made available.

If this is correct then is there any concrete plan/timeline that could be sketched for this "extra package"?

@jpsamaroo - what do you think about this proposal?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nalimilan - what @tkf proposes is a very good. But maybe for simplicity what we should do now is:

  1. define setmultithreading only for now as a poor man's solution for both thread safety and resource control.
  2. later also support whatever the "extra package" adds

The reason is that I believe setmultithreading API is simple for users to understand. It will be marked as experimental so we can remove it when we have something better.

I know that this is not a super clean design but we could signal to the users that we are planning to have something better in the future but this is a simple temporary solution. What do you think?

The alternative, if we follow @tkf suggestion is to add singlethreaded::Bool=false kwarg to all functions that take a function and potentially do multi-threading.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another idea (probably not very good but it came to my mind so I am sharing it 😄):

for thread-safety we could define wrapper NotPure(fun) and then write e.g. :a => NotPure(x -> rand()) => :b and then we would not even have to add any kwarg.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do I understand you correctly

Yes, I think you do!

Also, I created ParallelismHints.jl (ref tkf/ParallelismHints.jl#1) as a POC "external package" that I was talking about.

for thread-safety we could define wrapper NotPure(fun) and then write e.g. :a => NotPure(x -> rand()) => :b and then we would not even have to add any kwarg.

I actually like this idea very much 👍 and even actually wanted to suggest this. For example, not a DataFrames API but mapreduce(f, NotPure(op), xs) is somewhat parallelizable. But I suggested the keyword argument because it may look "simpler" in some sense. OTOH, people use API like ByRow so maybe it's not crazy?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes the NotPure approach is interesting as it works generally without having to add arguments everywhere. It also allows passing a wrapped function through several call levels if needed. We already considered a similar approach for map on PooledArray (JuliaData/PooledArrays.jl#63).

Julia recently gained support for declaring various levels of purity (JuliaLang/julia#43852), but AFAICT there's no way to mark a function as not pure given that it's the default, nor to mark a function as thread-unsafe.


*See also*: [`DataFrames.setmultithreading`](@ref) to disable multithreading globally

!!! note

This function is considered as experimental and may change or be removed once
a cross-package mechanism for multithreading configuration is developed.

Currently, it disables multithreading for any DataFrames.jl
operations which may be run while `f` is running (e.g. if tasks using data
frames have been spawned on multiple threads).
This may change in the future.

# Examples
```jldoctest
julia> df = DataFrame(x=repeat(1:5, inner=2), y=1:10);

julia> gd = groupby(df, :x);

julia> counter = 0;

julia> f(x) = (sleep(0.1); global counter += 1); # Thread-unsafe function

julia> DataFrames.singlethreaded() do
combine(gd, :y => f)
end
5×2 DataFrame
Row │ x y_f
│ Int64 Int64
─────┼──────────────
1 │ 1 1
2 │ 2 2
3 │ 3 3
4 │ 4 4
5 │ 5 5
```
"""
function singlethreaded(f)
Threads.atomic_add!(SINGLETHREADING_DEPTH, 1)
try
return f()
finally
Threads.atomic_sub!(SINGLETHREADING_DEPTH, 1)
end
end

"""
DataFrames.setmultithreading(enable::Bool)

Enable or disable multithreading permanently in all DataFrames.jl operations.
This is useful in particular to run functions which are not thread-safe, or when
distribution of work across threads is managed separately.

*See also*: [`DataFrames.singlethreaded`](@ref) to disable multithreading only
for a specific code block

!!! note

This function is considered as experimental and may change or be removed once
a cross-package mechanism for multithreading configuration is developed.

# Examples
```jldoctest
julia> df = DataFrame(x=repeat(1:5, inner=2), y=1:10);

julia> gd = groupby(df, :x);

julia> counter = 0;

julia> f(x) = (sleep(0.1); global counter += 1); # Thread-unsafe function

julia> DataFrames.setmultithreading(false);

julia> combine(gd, :y => f)
5×2 DataFrame
Row │ x y_f
│ Int64 Int64
─────┼──────────────
1 │ 1 1
2 │ 2 2
3 │ 3 3
4 │ 4 4
5 │ 5 5

julia> DataFrames.setmultithreading(true);
```
"""
function setmultithreading(enable::Bool)
old_state = Threads.atomic_xchg!(SINGLETHREADING, !enable)
if !enable && !old_state
Threads.atomic_add!(SINGLETHREADING_DEPTH, 1)
elseif enable && old_state
Threads.atomic_sub!(SINGLETHREADING_DEPTH, 1)
end
return enable
end

# Compute chunks of indices, each with at least `basesize` entries
# This method ensures balanced sizes by avoiding a small last chunk
function split_indices(len::Integer, basesize::Integer)
Expand Down Expand Up @@ -159,7 +268,7 @@ if VERSION >= v"1.4"

nt = Threads.nthreads()
len = length(x)
if nt > 1 && len > basesize
if ismultithreaded() && nt > 1 && len > basesize
tasks = [Threads.@spawn begin
for i in p
local $(esc(lidx)) = @inbounds x[i]
Expand Down Expand Up @@ -215,6 +324,89 @@ macro spawn_for_chunks(basesize, ex)
return _spawn_for_chunks_helper(ex.args[1], ex.args[2], basesize)
end

"""
@spawn_or_async expr

Equivalent to `Threads.@spawn` if `DataFrames.ismultithreaded() === true`
and to `@async` otherwise.
"""
macro spawn_or_async end

"""
@spawn_or_run expr

Equivalent to `Threads.@spawn` if `DataFrames.ismultithreaded() === true`,
otherwise simply runs `expr`.
"""
macro spawn_or_run end

if VERSION >= v"1.4"
macro spawn_or_async(expr)
letargs = Base._lift_one_interp!(expr)

thunk = esc(:(()->($expr)))
var = esc(Base.sync_varname)
quote
bkamins marked this conversation as resolved.
Show resolved Hide resolved
let $(letargs...)
local task = Task($thunk)
task.sticky = !DataFrames.ismultithreaded()
if $(Expr(:islocal, var))
@static if VERSION >= v"1.5.0"
put!($var, task)
else
push!($var, task)
end
end
schedule(task)
task
end
end
end

macro spawn_or_run(expr)
letargs = Base._lift_one_interp!(expr)

thunk = esc(:(()->($expr)))
var = esc(Base.sync_varname)
quote
let $(letargs...)
if DataFrames.ismultithreaded()
local task = Task($thunk)
task.sticky = false
if $(Expr(:islocal, var))
@static if VERSION >= v"1.5.0"
put!($var, task)
else
push!($var, task)
end
end
schedule(task)
else
$thunk()
end
nothing
end
end
end
else
# This is the definition of @async in Base
macro spawn_or_async(expr)
thunk = esc(:(()->($expr)))
var = esc(Base.sync_varname)
quote
local task = Task($thunk)
if $(Expr(:isdefined, var))
push!($var, task)
end
schedule(task)
end
end

macro spawn_or_run(expr)
esc(:($expr; nothing))
end
end

function _nt_like_hash(v, h::UInt)
length(v) == 0 && return hash(NamedTuple(), h)

Expand Down
Loading