Add a keyword argument to disable multithreading #3030

Merged
merged 24 commits into from
Jun 12, 2022
Changes from 23 commits
3 changes: 3 additions & 0 deletions NEWS.md
@@ -39,6 +39,9 @@
* Add `resize!`, `keepat!`, `pop!`, `popfirst!`, and `popat!`,
make `deleteat!` signature more precise
([#3047](https://github.com/JuliaData/DataFrames.jl/pull/3047))
* New `threads` argument allows disabling multithreading in
`combine`, `select`, `select!`, `transform`, `transform!`, `subset` and `subset!`
([#3030](https://github.com/JuliaData/DataFrames.jl/pull/3030))

## Previously announced breaking changes

9 changes: 6 additions & 3 deletions docs/src/lib/functions.md
@@ -4,10 +4,13 @@ CurrentModule = DataFrames

# Functions

## Multi-threading support
## Multithreading support

Selected operations in DataFrames.jl automatically use multiple threads when available.
It is task-based and implemented using the `@spawn` macro from Julia Base.
By default, selected operations in DataFrames.jl automatically use multiple threads
when available. Multithreading is task-based and implemented using the `@spawn` macro from Julia Base.
Functions that take user-defined functions and may run them in parallel
accept a `threads` keyword argument which allows disabling multithreading
when the provided function requires serial execution or is not thread-safe.
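
As a rough illustration of when `threads=false` matters, the sketch below uses a hypothetical `logged_sum` helper (not part of DataFrames.jl) that pushes to a shared vector without any locking, so it should not be called from several tasks at once. It assumes a DataFrames.jl release that includes this change.

```julia
using DataFrames

# Hypothetical helper: records group sizes in a shared vector.
# The unsynchronized push! makes it unsafe to call from parallel tasks.
calls = Int[]
function logged_sum(x)
    push!(calls, length(x))
    return sum(x)
end

df = DataFrame(g = repeat(1:4, inner=3), v = 1:12)
gdf = groupby(df, :g)

# threads=false asks DataFrames.jl not to spawn tasks for this call,
# so logged_sum is applied to the groups one after another.
res = combine(gdf, :v => logged_sum => :total, threads=false)
```

With the default `threads=true` the same call may or may not spawn tasks, depending on the number of available threads and the data size, so the race would only show up intermittently.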

This is a list of operations that currently make use of multi-threading:
- `DataFrame` constructor with `copycols=true`; also recursively all functions
2 changes: 2 additions & 0 deletions docs/src/lib/internals.md
@@ -16,6 +16,8 @@ getmaxwidths
ourshow
ourstrwidth
@spawn_for_chunks
@spawn_or_run_task
@spawn_or_run
default_table_transformation
isreadonly
```
28 changes: 19 additions & 9 deletions src/abstractdataframe/reshape.jl
@@ -200,15 +200,15 @@ end
unstack(df::AbstractDataFrame, rowkeys, colkey, value;
renamecols::Function=identity, allowmissing::Bool=false,
allowduplicates::Bool=false, valuestransform=nothing,
fill=missing)
fill=missing, threads::Bool=true)
unstack(df::AbstractDataFrame, colkey, value;
renamecols::Function=identity, allowmissing::Bool=false,
allowduplicates::Bool=false, valuestransform=nothing,
fill=missing)
fill=missing, threads::Bool=true)
unstack(df::AbstractDataFrame;
renamecols::Function=identity, allowmissing::Bool=false,
allowduplicates::Bool=false, valuestransform=nothing,
fill=missing)
fill=missing, threads::Bool=true)

Unstack data frame `df`, i.e. convert it from long to wide format.

@@ -244,6 +244,10 @@ Row and column keys will be ordered in the order of their first appearance.
default is `missing`. If the `value` column is a `CategoricalVector` and
`fill` is not `missing` then in order to keep unstacked value columns also
`CategoricalVector` the `fill` must be passed as `CategoricalValue`
- `threads`: whether `valuestransform` may be run in separate tasks which
can execute in parallel (possibly being applied to multiple groups at the same time).
Whether or not tasks are actually spawned and their number are determined automatically.
Set to `false` if `valuestransform` requires serial execution or is not thread-safe.
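
A minimal sketch of the new keyword in use, assuming a DataFrames.jl version that includes this change. The data frame `long` and its column names are made up for illustration; `sum` is itself thread-safe, so `threads=false` is shown here only to demonstrate the call shape.

```julia
using DataFrames

# Long-format data with duplicate (id, key) pairs
long = DataFrame(id  = [1, 1, 2, 2, 2],
                 key = ["a", "a", "a", "b", "b"],
                 val = [1, 2, 3, 4, 5])

# valuestransform collapses the duplicates for each (id, key) pair;
# threads=false requests that it is applied to the groups serially.
wide = unstack(long, :id, :key, :val, valuestransform=sum, threads=false)
```

The `id = 1` row has no `"b"` entries, so its `b` cell is filled with the default `fill=missing`.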

# Examples

@@ -396,7 +400,8 @@ julia> unstack(df, :cols, :values, valuestransform=sum)
function unstack(df::AbstractDataFrame, rowkeys, colkey::ColumnIndex,
values::ColumnIndex; renamecols::Function=identity,
allowmissing::Bool=false, allowduplicates::Bool=false,
valuestransform=nothing, fill=missing)
valuestransform=nothing, fill=missing,
threads::Bool=true)
# first make sure that rowkeys are unique and
# normalize all selectors as strings
# if some of the selectors are wrong we will get an early error here
@@ -428,7 +433,8 @@ function unstack(df::AbstractDataFrame, rowkeys, colkey::ColumnIndex,
# Ref that will get unwrapped by combine
agg_fun = Ref∘valuestransform
end
df_op = combine(gdf, values => agg_fun => values_out)
df_op = combine(gdf, values => agg_fun => values_out,
threads=threads)

group_rows = find_group_row(gdf)
if !issorted(group_rows)
@@ -452,19 +458,23 @@ end
function unstack(df::AbstractDataFrame, colkey::ColumnIndex, values::ColumnIndex;
renamecols::Function=identity,
allowmissing::Bool=false, allowduplicates::Bool=false,
valuestransform=nothing, fill=missing)
valuestransform=nothing, fill=missing,
threads::Bool=true)
colkey_int = index(df)[colkey]
value_int = index(df)[values]
return unstack(df, Not(colkey_int, value_int), colkey_int, value_int,
renamecols=renamecols, allowmissing=allowmissing,
allowduplicates=allowduplicates, valuestransform=valuestransform, fill=fill)
allowduplicates=allowduplicates, valuestransform=valuestransform,
fill=fill, threads=threads)
end

unstack(df::AbstractDataFrame; renamecols::Function=identity,
allowmissing::Bool=false, allowduplicates::Bool=false,
valuestransform=nothing, fill=missing) =
valuestransform=nothing, fill=missing,
threads::Bool=true) =
unstack(df, :variable, :value, renamecols=renamecols, allowmissing=allowmissing,
allowduplicates=allowduplicates, valuestransform=valuestransform, fill=fill)
allowduplicates=allowduplicates, valuestransform=valuestransform,
fill=fill, threads=threads)

# we take into account the fact that idx, starts and ends are computed lazily
# so we rather directly reference the gdf.groups
154 changes: 106 additions & 48 deletions src/abstractdataframe/selection.jl
@@ -875,10 +875,14 @@ function select_transform!((nc,)::Ref{Any}, df::AbstractDataFrame, newdf::DataFr
end

"""
select!(df::AbstractDataFrame, args...; renamecols::Bool=true)
select!(args::Base.Callable, df::DataFrame; renamecols::Bool=true)
select!(gd::GroupedDataFrame, args...; ungroup::Bool=true, renamecols::Bool=true)
select!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true)
select!(df::AbstractDataFrame, args...;
renamecols::Bool=true, threads::Bool=true)
select!(args::Base.Callable, df::DataFrame;
renamecols::Bool=true, threads::Bool=true)
select!(gd::GroupedDataFrame, args...; ungroup::Bool=true,
renamecols::Bool=true, threads::Bool=true)
select!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true,
renamecols::Bool=true, threads::Bool=true)

Mutate `df` or `gd` in place to retain only columns or transformations specified by `args...` and
return it. The result is guaranteed to have the same number of rows as `df` or
@@ -906,27 +910,40 @@ $TRANSFORMATION_COMMON_RULES
column names should include the name of transformation functions or not.
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
frame or a `GroupedDataFrame`.
- `threads::Bool=true` : whether transformations may be run in separate tasks which
can execute in parallel (possibly being applied to multiple rows or groups at the same time).
Whether or not tasks are actually spawned and their number are determined automatically.
Set to `false` if some transformations require serial execution or are not thread-safe.

See [`select`](@ref) for examples.
"""
select!(df::DataFrame, @nospecialize(args...); renamecols::Bool=true) =
_replace_columns!(df, select(df, args..., copycols=false, renamecols=renamecols))

select!(df::SubDataFrame, @nospecialize(args...); renamecols::Bool=true) =
_replace_columns!(df, select(df, args..., copycols=true, renamecols=renamecols))

function select!(@nospecialize(arg::Base.Callable), df::AbstractDataFrame; renamecols::Bool=true)
select!(df::DataFrame, @nospecialize(args...);
renamecols::Bool=true, threads::Bool=true) =
_replace_columns!(df, select(df, args..., copycols=false,
renamecols=renamecols, threads=threads))

select!(df::SubDataFrame, @nospecialize(args...);
renamecols::Bool=true, threads::Bool=true) =
_replace_columns!(df, select(df, args..., copycols=true,
renamecols=renamecols, threads=threads))

function select!(@nospecialize(arg::Base.Callable), df::AbstractDataFrame;
renamecols::Bool=true, threads::Bool=true)
if arg isa Colon
throw(ArgumentError("First argument must be a transformation if the second argument is a data frame"))
end
return select!(df, arg)
return select!(df, arg, threads=threads)
end

"""
transform!(df::AbstractDataFrame, args...; renamecols::Bool=true)
transform!(args::Callable, df::AbstractDataFrame; renamecols::Bool=true)
transform!(gd::GroupedDataFrame, args...; ungroup::Bool=true, renamecols::Bool=true)
transform!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true)
transform!(df::AbstractDataFrame, args...;
renamecols::Bool=true, threads::Bool=true)
transform!(args::Callable, df::AbstractDataFrame;
renamecols::Bool=true, threads::Bool=true)
transform!(gd::GroupedDataFrame, args...;
ungroup::Bool=true, renamecols::Bool=true, threads::Bool=true)
transform!(f::Base.Callable, gd::GroupedDataFrame;
ungroup::Bool=true, renamecols::Bool=true, threads::Bool=true)

Mutate `df` or `gd` in place to add columns specified by `args...` and return it.
The result is guaranteed to have the same number of rows as `df`.
@@ -940,26 +957,36 @@ $TRANSFORMATION_COMMON_RULES
column names should include the name of transformation functions or not.
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
frame or a `GroupedDataFrame`.
- `threads::Bool=true` : whether transformations may be run in separate tasks which
can execute in parallel (possibly being applied to multiple rows or groups at the same time).
Whether or not tasks are actually spawned and their number are determined automatically.
Set to `false` if some transformations require serial execution or are not thread-safe.

See [`select`](@ref) for examples.
"""
transform!(df::AbstractDataFrame, @nospecialize(args...); renamecols::Bool=true) =
select!(df, :, args..., renamecols=renamecols)
transform!(df::AbstractDataFrame, @nospecialize(args...);
renamecols::Bool=true, threads::Bool=true) =
select!(df, :, args..., renamecols=renamecols, threads=threads)

function transform!(@nospecialize(arg::Base.Callable), df::AbstractDataFrame; renamecols::Bool=true)
function transform!(@nospecialize(arg::Base.Callable), df::AbstractDataFrame;
renamecols::Bool=true, threads::Bool=true)
if arg isa Colon
throw(ArgumentError("First argument must be a transformation if the second argument is a data frame"))
end
return transform!(df, arg)
return transform!(df, arg, threads=threads)
end

"""
select(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true)
select(args::Callable, df::DataFrame; renamecols::Bool=true)
select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true,
ungroup::Bool=true, renamecols::Bool=true)
select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true,
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
select(df::AbstractDataFrame, args...;
copycols::Bool=true, renamecols::Bool=true, threads::Bool=true)
select(args::Callable, df::DataFrame;
renamecols::Bool=true, threads::Bool=true)
select(gd::GroupedDataFrame, args...;
copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, threads::Bool=true)
select(f::Base.Callable, gd::GroupedDataFrame;
copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, threads::Bool=true)

Create a new data frame that contains columns from `df` or `gd` specified by
`args` and return it. The result is guaranteed to have the same number of rows
@@ -977,6 +1004,11 @@ $TRANSFORMATION_COMMON_RULES
data frame.
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
frame or a `GroupedDataFrame`.
- `threads::Bool=true` : whether transformations may be run in separate tasks which
can execute in parallel (possibly being applied to multiple rows or groups at the same time).
Whether or not tasks are actually spawned and their number are determined automatically.
Set to `false` if some transformations require serial execution or are not thread-safe.
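
The following sketch shows one case where setting `threads=false` could be appropriate: two transformations sharing a plain `Dict` cache. The names `cache` and `cached_square` are hypothetical and exist only for this example; a `Dict` is not safe for concurrent writes, so the transformations should not run in parallel tasks.

```julia
using DataFrames

# Hypothetical memoization cache shared by both transformations below
cache = Dict{Int,Int}()
cached_square(x) = get!(() -> x^2, cache, x)

df = DataFrame(x = [1, 2, 3, 2, 1])

# Both transformations write to `cache`, so serial execution is requested
out = select(df,
             :x => ByRow(cached_square) => :sq,
             :x => ByRow(x -> cached_square(x) + 1) => :sq1,
             threads=false)
```

An alternative design would be to guard the cache with a lock and keep the default `threads=true`; which is preferable depends on how expensive the transformation is.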


# Examples
```jldoctest
@@ -1230,24 +1262,30 @@ julia> select(gd, nrow, proprow, groupindices, eachindex)
8 │ 2 3 0.375 2 3
```
"""
select(df::AbstractDataFrame, @nospecialize(args...); copycols::Bool=true, renamecols::Bool=true) =
select(df::AbstractDataFrame, @nospecialize(args...);
copycols::Bool=true, renamecols::Bool=true, threads::Bool=true) =
manipulate(df, map(x -> broadcast_pair(df, x), args)...,
copycols=copycols, keeprows=true, renamecols=renamecols)

function select(@nospecialize(arg::Base.Callable), df::AbstractDataFrame; renamecols::Bool=true)
function select(@nospecialize(arg::Base.Callable), df::AbstractDataFrame;
renamecols::Bool=true, threads::Bool=true)
if arg isa Colon
throw(ArgumentError("First argument must be a transformation if the second argument is a data frame"))
end
return select(df, arg)
return select(df, arg, threads=threads)
end

"""
transform(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true)
transform(f::Callable, df::DataFrame; renamecols::Bool=true)
transform(gd::GroupedDataFrame, args...; copycols::Bool=true,
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true,
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
transform(df::AbstractDataFrame, args...;
copycols::Bool=true, renamecols::Bool=true, threads::Bool=true)
transform(f::Callable, df::DataFrame;
renamecols::Bool=true, threads::Bool=true)
transform(gd::GroupedDataFrame, args...;
copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, threads::Bool=true)
transform(f::Base.Callable, gd::GroupedDataFrame;
copycols::Bool=true, keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, threads::Bool=true)

Create a new data frame that contains columns from `df` or `gd` plus columns
specified by `args` and return it. The result is guaranteed to have the same
@@ -1264,6 +1302,11 @@ $TRANSFORMATION_COMMON_RULES
data frame.
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
frame or a `GroupedDataFrame`.
- `threads::Bool=true` : whether transformations may be run in separate tasks which
can execute in parallel (possibly being applied to multiple rows or groups at the same time).
Whether or not tasks are actually spawned and their number are determined automatically.
Set to `false` if some transformations require serial execution or are not thread-safe.
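
For a transformation that only reads its input and returns a fresh value, the keyword is expected to change how the work is scheduled but not the result. A small sketch, with a hypothetical `center` helper and made-up data, assuming a DataFrames.jl version that includes this change:

```julia
using DataFrames

df = DataFrame(g = [1, 1, 2, 2, 3], v = [10, 20, 30, 50, 70])
gdf = groupby(df, :g)

# Pure function: no shared state, so it is safe under either setting
center(v) = v .- sum(v) / length(v)

threaded = transform(gdf, :v => center => :centered)
serial   = transform(gdf, :v => center => :centered, threads=false)
```

Both calls should return equal data frames, with rows kept in the order of the parent `df`.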


Note that when the first argument is a `GroupedDataFrame`, `keepkeys=false`
is needed to be able to return a different value for the grouping column:
@@ -1297,23 +1340,30 @@ ERROR: ArgumentError: column :x in returned data frame is not equal to grouping

See [`select`](@ref) for more examples.
"""
transform(df::AbstractDataFrame, @nospecialize(args...); copycols::Bool=true, renamecols::Bool=true) =
select(df, :, args..., copycols=copycols, renamecols=renamecols)
transform(df::AbstractDataFrame, @nospecialize(args...);
copycols::Bool=true, renamecols::Bool=true, threads::Bool=true) =
select(df, :, args..., copycols=copycols,
renamecols=renamecols, threads=threads)

function transform(@nospecialize(arg::Base.Callable), df::AbstractDataFrame; renamecols::Bool=true)
function transform(@nospecialize(arg::Base.Callable), df::AbstractDataFrame;
renamecols::Bool=true, threads::Bool=true)
if arg isa Colon
throw(ArgumentError("First argument must be a transformation if the second argument is a data frame"))
end
return transform(df, arg)
return transform(df, arg, threads=threads)
end

"""
combine(df::AbstractDataFrame, args...; renamecols::Bool=true)
combine(f::Callable, df::AbstractDataFrame; renamecols::Bool=true)
combine(df::AbstractDataFrame, args...;
renamecols::Bool=true, threads::Bool=true)
combine(f::Callable, df::AbstractDataFrame;
renamecols::Bool=true, threads::Bool=true)
combine(gd::GroupedDataFrame, args...;
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, threads::Bool=true)
combine(f::Base.Callable, gd::GroupedDataFrame;
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
keepkeys::Bool=true, ungroup::Bool=true,
renamecols::Bool=true, threads::Bool=true)

Create a new data frame that contains columns from `df` or `gd` specified by
`args` and return it. The result can have any number of rows that is determined
@@ -1328,6 +1378,11 @@ $TRANSFORMATION_COMMON_RULES
data frame.
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
frame or a `GroupedDataFrame`.
- `threads::Bool=true` : whether transformations may be run in separate tasks which
can execute in parallel (possibly being applied to multiple rows or groups at the same time).
Whether or not tasks are actually spawned and their number are determined automatically.
Set to `false` if some transformations require serial execution or are not thread-safe.


# Examples
```jldoctest
@@ -1575,18 +1630,21 @@ julia> combine(gd, :, AsTable(Not(:a)) => sum, renamecols=false)
8 │ 4 1 8 9
```
"""
combine(df::AbstractDataFrame, @nospecialize(args...); renamecols::Bool=true) =
combine(df::AbstractDataFrame, @nospecialize(args...);
renamecols::Bool=true, threads::Bool=true) =
manipulate(df, map(x -> broadcast_pair(df, x), args)...,
copycols=true, keeprows=false, renamecols=renamecols)

function combine(@nospecialize(arg::Base.Callable), df::AbstractDataFrame; renamecols::Bool=true)
function combine(@nospecialize(arg::Base.Callable), df::AbstractDataFrame;
renamecols::Bool=true, threads::Bool=true)
if arg isa Colon
throw(ArgumentError("First argument must be a transformation if the second argument is a data frame"))
end
return combine(df, arg)
return combine(df, arg, threads=threads)
end

combine(@nospecialize(f::Pair), gd::AbstractDataFrame; renamecols::Bool=true) =
combine(@nospecialize(f::Pair), gd::AbstractDataFrame;
renamecols::Bool=true, threads::Bool=true) =
throw(ArgumentError("First argument must be a transformation if the second argument is a data frame. " *
"You can pass a `Pair` as the second argument of the transformation. If you want the return " *
"value to be processed as having multiple columns add `=> AsTable` suffix to the pair."))
@@ -1601,7 +1659,7 @@ function manipulate(df::DataFrame, @nospecialize(cs...); copycols::Bool, keeprow
end
end
return _manipulate(df, Any[normalize_selection(index(df), make_pair_concrete(c), renamecols) for c in cs_vec],
copycols, keeprows)
copycols, keeprows)
end

function _manipulate(df::AbstractDataFrame, normalized_cs::Vector{Any}, copycols::Bool, keeprows::Bool)