Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a keyword argument to disable multithreading #3030

Merged
merged 24 commits into from
Jun 12, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
* Add `allcombinations` function that returns a data frame created
from all combinations of the passed vectors
([#3031](https://github.com/JuliaData/DataFrames.jl/pull/3031))
* New `multithreaded` argument allows disabling multithreading in
`combine`, `select`, `select!`, `transform`, `transform!`, `subset` and `subset!`
([#3030](https://github.com/JuliaData/DataFrames.jl/pull/3030))

## Previously announced breaking changes

Expand Down
9 changes: 6 additions & 3 deletions docs/src/lib/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ CurrentModule = DataFrames

# Functions

## Multi-threading support
## Multithreading support

Selected operations in DataFrames.jl automatically use multiple threads when available.
It is task-based and implemented using the `@spawn` macro from Julia Base.
By default, selected operations in DataFrames.jl automatically use multiple threads
when available. It is task-based and implemented using the `@spawn` macro from Julia Base.
Functions that take user-defined functions and may run them in parallel
accept a `multithreaded` keyword argument which allows disabling multithreading
when the provided function requires serial execution or is not thread-safe.

This is a list of operations that currently make use of multithreading:
- `DataFrame` constructor with `copycols=true`; also recursively all functions
Expand Down
2 changes: 2 additions & 0 deletions docs/src/lib/internals.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ getmaxwidths
ourshow
ourstrwidth
@spawn_for_chunks
@spawn_or_run_task
@spawn_or_run
default_table_transformation
isreadonly
```
183 changes: 123 additions & 60 deletions src/abstractdataframe/selection.jl

Large diffs are not rendered by default.

52 changes: 35 additions & 17 deletions src/abstractdataframe/subset.jl
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ function assert_bool_vec(@nospecialize(fun))
end

function _get_subset_conditions(df::Union{AbstractDataFrame, GroupedDataFrame},
(args,)::Ref{Any}, skipmissing::Bool)
(args,)::Ref{Any}, skipmissing::Bool, multithreaded::Bool)
cs_vec = []
for v in map(x -> broadcast_pair(df isa GroupedDataFrame ? parent(df) : df, x), args)
if v isa AbstractVecOrMat{<:Pair}
Expand Down Expand Up @@ -106,10 +106,12 @@ function _get_subset_conditions(df::Union{AbstractDataFrame, GroupedDataFrame},
@assert !isempty(conditions)

if df isa AbstractDataFrame
df_conditions = select(df, conditions..., copycols=!(df isa DataFrame))
df_conditions = select(df, conditions...,
copycols=!(df isa DataFrame), multithreaded=multithreaded)
else
df_conditions = select(df, conditions...,
copycols=!(parent(df) isa DataFrame), keepkeys=false)
copycols=!(parent(df) isa DataFrame), keepkeys=false,
multithreaded=multithreaded)
end

@assert ncol(df_conditions) == length(conditions)
Expand Down Expand Up @@ -151,9 +153,11 @@ function _get_subset_conditions(df::Union{AbstractDataFrame, GroupedDataFrame},
end

"""
subset(df::AbstractDataFrame, args...; skipmissing::Bool=false, view::Bool=false)
subset(gdf::GroupedDataFrame, args...; skipmissing::Bool=false, view::Bool=false,
ungroup::Bool=true)
subset(df::AbstractDataFrame, args...;
skipmissing::Bool=false, view::Bool=false, multithreaded::Bool=true)
subset(gdf::GroupedDataFrame, args...;
skipmissing::Bool=false, view::Bool=false,
ungroup::Bool=true, multithreaded::Bool=true)

Return a copy of data frame `df` or parent of `gdf` containing only rows for
which all values produced by transformation(s) `args` for a given row are
Expand All @@ -180,6 +184,11 @@ If `view=true` a `SubDataFrame` view is returned instead of a `DataFrame`.
If `ungroup=false` the resulting data frame is re-grouped based on the same
grouping columns as `gdf` and a `GroupedDataFrame` is returned.

If `multithreaded=true` (the default) transformations may be run in separate tasks which
can execute in parallel (possibly being applied to multiple rows or groups at the same time).
Whether or not tasks are actually spawned and their number are determined automatically.
Set to `false` if some transformations require serial execution or are not thread-safe.

If a `GroupedDataFrame` is passed then it must include all groups present in the
`parent` data frame, like in [`select!`](@ref).

Expand Down Expand Up @@ -260,18 +269,19 @@ julia> subset(groupby(df, :y), :v => x -> minimum(x) > 5)
```
"""
function subset(df::AbstractDataFrame, @nospecialize(args...);
skipmissing::Bool=false, view::Bool=false)
skipmissing::Bool=false, view::Bool=false, multithreaded::Bool=true)
if isempty(args)
row_selector = axes(df, 1)
else
row_selector = _get_subset_conditions(df, Ref{Any}(args), skipmissing)
row_selector = _get_subset_conditions(df, Ref{Any}(args),
skipmissing, multithreaded)
end
return view ? Base.view(df, row_selector, :) : df[row_selector, :]
end

function subset(gdf::GroupedDataFrame, @nospecialize(args...);
skipmissing::Bool=false, view::Bool=false,
ungroup::Bool=true)
ungroup::Bool=true, multithreaded::Bool=true)
df = parent(gdf)
if isempty(args)
if nrow(parent(gdf)) > 0 && minimum(gdf.groups) == 0
Expand All @@ -281,17 +291,19 @@ function subset(gdf::GroupedDataFrame, @nospecialize(args...);
end
row_selector = axes(df, 1)
else
row_selector = _get_subset_conditions(gdf, Ref{Any}(args), skipmissing)
row_selector = _get_subset_conditions(gdf, Ref{Any}(args),
skipmissing, multithreaded)
end
res = view ? Base.view(df, row_selector, :) : df[row_selector, :]
# TODO: in some cases it might be faster to groupby gdf.groups[row_selector]
return ungroup ? res : groupby(res, groupcols(gdf))
end

"""
subset!(df::AbstractDataFrame, args...; skipmissing::Bool=false)
subset!(gdf::GroupedDataFrame{DataFrame}, args..., skipmissing::Bool=false,
ungroup::Bool=true)
subset!(df::AbstractDataFrame, args...;
skipmissing::Bool=false, multithreaded::Bool=true)
subset!(gdf::GroupedDataFrame{DataFrame}, args...;
skipmissing::Bool=false, ungroup::Bool=true, multithreaded::Bool=true)

Update data frame `df` or the parent of `gdf` in place to contain only rows for
which all values produced by transformation(s) `args` for a given row are `true`.
Expand All @@ -316,6 +328,11 @@ described for [`select`](@ref) with the restriction that:
If `ungroup=false` the passed `GroupedDataFrame` `gdf` is updated (preserving
the order of its groups) and returned.

If `multithreaded=true` (the default) transformations may be run in separate tasks which
can execute in parallel (possibly being applied to multiple rows or groups at the same time).
Whether or not tasks are actually spawned and their number are determined automatically.
Set to `false` if some transformations require serial execution or are not thread-safe.

If `GroupedDataFrame` is subsetted then it must include all groups present in
the `parent` data frame, like in [`select!`](@ref). In this case the passed
`GroupedDataFrame` is updated to have correct groups after its parent is
Expand Down Expand Up @@ -416,14 +433,15 @@ julia> df
2 │ 4 false false missing 12
```
"""
function subset!(df::AbstractDataFrame, @nospecialize(args...); skipmissing::Bool=false)
function subset!(df::AbstractDataFrame, @nospecialize(args...);
skipmissing::Bool=false, multithreaded::Bool=true)
isempty(args) && return df
row_selector = _get_subset_conditions(df, Ref{Any}(args), skipmissing)
row_selector = _get_subset_conditions(df, Ref{Any}(args), skipmissing, multithreaded)
return deleteat!(df, findall(!, row_selector))
end

function subset!(gdf::GroupedDataFrame, @nospecialize(args...); skipmissing::Bool=false,
ungroup::Bool=true)
ungroup::Bool=true, multithreaded::Bool=true)
df = parent(gdf)
if isempty(args)
if nrow(parent(gdf)) > 0 && minimum(gdf.groups) == 0
Expand All @@ -436,7 +454,7 @@ function subset!(gdf::GroupedDataFrame, @nospecialize(args...); skipmissing::Boo
ngroups = length(gdf)
groups = gdf.groups
lazy_lock = gdf.lazy_lock
row_selector = _get_subset_conditions(gdf, Ref{Any}(args), skipmissing)
row_selector = _get_subset_conditions(gdf, Ref{Any}(args), skipmissing, multithreaded)
res = deleteat!(df, findall(!, row_selector))
if nrow(res) == length(groups) # we have not removed any rows
return ungroup ? res : gdf
Expand Down
25 changes: 14 additions & 11 deletions src/groupeddataframe/complextransforms.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ _ncol(df::AbstractDataFrame) = ncol(df)
_ncol(x::Union{NamedTuple, DataFrameRow}) = length(x)

function _combine_multicol((firstres,)::Ref{Any}, wfun::Ref{Any}, gd::GroupedDataFrame,
wincols::Ref{Any})
wincols::Ref{Any}, multithreaded::Bool)
@assert only(wfun) isa Base.Callable
@assert only(wincols) isa Union{Nothing, AbstractVector, Tuple, NamedTuple}
firstmulticol = firstres isa MULTI_COLS_TYPE
Expand All @@ -17,13 +17,14 @@ function _combine_multicol((firstres,)::Ref{Any}, wfun::Ref{Any}, gd::GroupedDat
idx_agg = NOTHING_IDX_AGG
end
return _combine_with_first(Ref{Any}(wrap(firstres)), wfun, gd, wincols,
firstmulticol, idx_agg)
firstmulticol, idx_agg, multithreaded)
end

function _combine_with_first((first,)::Ref{Any},
(f,)::Ref{Any}, gd::GroupedDataFrame,
(incols,)::Ref{Any},
firstmulticol::Bool, idx_agg::Vector{Int})
firstmulticol::Bool, idx_agg::Vector{Int},
multithreaded::Bool)
@assert first isa Union{NamedTuple, DataFrameRow, AbstractDataFrame}
@assert f isa Base.Callable
@assert incols isa Union{Nothing, AbstractVector, Tuple, NamedTuple}
Expand Down Expand Up @@ -76,7 +77,8 @@ function _combine_with_first((first,)::Ref{Any},
gd,
Ref{Any}(incols),
Ref{Any}(targetcolnames),
firstmulticol)
firstmulticol,
multithreaded)
end
return idx, outcols, collect(Symbol, finalcolnames)
end
Expand Down Expand Up @@ -238,7 +240,8 @@ function _combine_rows_with_first!((firstrow,)::Ref{Any},
gd::GroupedDataFrame,
(incols,)::Ref{Any},
(colnames,)::Ref{Any},
firstmulticol::Bool)
firstmulticol::Bool,
multithreaded::Bool)
@assert firstrow isa Union{NamedTuple, DataFrameRow}
@assert outcols isa NTuple{N, AbstractVector} where N
@assert f isa Base.Callable
Expand All @@ -261,7 +264,7 @@ function _combine_rows_with_first!((firstrow,)::Ref{Any},
# Create up to one task per thread
# This has lower overhead than creating one task per group,
# but is optimal only if operations take roughly the same time for all groups
if VERSION >= v"1.4" && isthreadsafe(outcols, incols)
if VERSION >= v"1.4" && multithreaded && isthreadsafe(outcols, incols)
basesize = max(1, cld(len - 1, Threads.nthreads()))
partitions = Iterators.partition(2:len, basesize)
else
Expand All @@ -273,11 +276,11 @@ function _combine_rows_with_first!((firstrow,)::Ref{Any},
tasks = Vector{Task}(undef, length(partitions))
for (tid, idx) in enumerate(partitions)
tasks[tid] =
@spawn _combine_rows_with_first_task!(tid, first(idx), last(idx), first(idx),
outcols, outcolsref,
type_widened, widen_type_lock,
f, gd, starts, ends, incols, colnames,
firstcoltype(firstmulticol))
@spawn_or_run_task multithreaded _combine_rows_with_first_task!(tid, first(idx), last(idx), first(idx),
outcols, outcolsref,
type_widened, widen_type_lock,
f, gd, starts, ends, incols, colnames,
firstcoltype(firstmulticol))
end

# Workaround JuliaLang/julia#38931:
Expand Down
Loading