Skip to content

Commit

Permalink
Add a :dynamic scheduling option for Threads.@threads (JuliaLang#…
Browse files Browse the repository at this point in the history
…43919)

Co-authored-by: Julian Samaroo <jpsamaroo@jpsamaroo.me>
Co-authored-by: Takafumi Arakaki <29282+tkf@users.noreply.github.com>
Co-authored-by: Valentin Churavy <vchuravy@users.noreply.github.com>
  • Loading branch information
4 people authored and pull[bot] committed Apr 14, 2022
1 parent b174ed8 commit 02b3f81
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 17 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ Command-line option changes
Multi-threading changes
-----------------------

* A new `:dynamic` schedule option for `Threads.@threads` which is similar to the default behavior except iterations
will be scheduled dynamically to available worker threads rather than pinned to each thread. This option is more
composable with (possibly nested) `@spawn` and `@threads` loops ([#43919])

Build system changes
--------------------
Expand Down
85 changes: 72 additions & 13 deletions base/threadingconstructs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
"""
nthreads() = Int(unsafe_load(cglobal(:jl_n_threads, Cint)))

function threading_run(func)
function threading_run(fun, static)
ccall(:jl_enter_threaded_region, Cvoid, ())
n = nthreads()
tasks = Vector{Task}(undef, n)
for i = 1:n
t = Task(func)
t.sticky = true
ccall(:jl_set_task_tid, Cint, (Any, Cint), t, i-1)
t = Task(() -> fun(i)) # pass in tid
t.sticky = static
static && ccall(:jl_set_task_tid, Cint, (Any, Cint), t, i-1)
tasks[i] = t
schedule(t)
end
Expand All @@ -48,15 +48,14 @@ function _threadsfor(iter, lbody, schedule)
quote
local threadsfor_fun
let range = $(esc(range))
function threadsfor_fun(onethread=false)
function threadsfor_fun(tid=1; onethread=false)
r = range # Load into local variable
lenr = length(r)
# divide loop iterations among threads
if onethread
tid = 1
len, rem = lenr, 0
else
tid = threadid()
len, rem = divrem(lenr, nthreads())
end
# not enough iterations for all the threads?
Expand Down Expand Up @@ -86,15 +85,17 @@ function _threadsfor(iter, lbody, schedule)
end
end
end
if ccall(:jl_in_threaded_region, Cint, ()) != 0
if $(schedule === :dynamic)
threading_run(threadsfor_fun, false)
elseif ccall(:jl_in_threaded_region, Cint, ()) != 0
$(if schedule === :static
:(error("`@threads :static` cannot be used concurrently or nested"))
else
# only use threads when called from outside @threads
:(threadsfor_fun(true))
:(threadsfor_fun(onethread = true))
end)
else
threading_run(threadsfor_fun)
threading_run(threadsfor_fun, true)
end
nothing
end
Expand All @@ -110,15 +111,73 @@ A barrier is placed at the end of the loop which waits for all tasks to finish
execution.
The `schedule` argument can be used to request a particular scheduling policy.
The only currently supported value is `:static`, which creates one task per thread
and divides the iterations equally among them. Specifying `:static` is an error
if used from inside another `@threads` loop or from a thread other than 1.
Except for `:static` scheduling, how the iterations are assigned to tasks, and how the tasks
are assigned to the worker threads is undefined. The exact assignments can be different
for each execution. The scheduling option is a hint. The loop body code (including any code
transitively called from it) must not make assumptions about the distribution of iterations
to tasks or the worker thread in which they are executed. The loop body for each iteration
must be able to make forward progress independent of other iterations and be free from data
races. As such, synchronizations across iterations may deadlock.
For example, the above conditions imply that:
- The lock taken in an iteration *must* be released within the same iteration.
- Communicating between iterations using blocking primitives like `Channel`s is incorrect.
- Write only to locations not shared across iterations (unless a lock or atomic operation is used).
Schedule options are:
- `:static` creates one task per thread and divides the iterations equally among
them, assigning each task specifically to each thread.
Specifying `:static` is an error if used from inside another `@threads` loop
or from a thread other than 1.
- `:dynamic` will schedule iterations dynamically to available worker threads,
assuming that the workload for each iteration is uniform.
Without the scheduler argument, the exact scheduling is unspecified; i.e. it may be
different across Julia releases. Currently, the behavior is dependent on the calling thread.
The default is `:static` when called from thread 1. The loop will be executed without threading
when called from other threads.
The default schedule (used when no `schedule` argument is present) is subject to change.
For example, an illustration of the different scheduling strategies where `busywait`
is a non-yielding timed loop that runs for a number of seconds.
```julia-repl
julia> function busywait(seconds)
tstart = time_ns()
while (time_ns() - tstart) / 1e9 < seconds
end
end
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :static for i in 1:Threads.nthreads()
busywait(1)
end
end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :dynamic for i in 1:Threads.nthreads()
busywait(1)
end
end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
```
The `:dynamic` example takes 2 seconds since one of the non-occupied threads is able
to run two of the 1-second iterations to complete the for loop.
!!! compat "Julia 1.5"
The `schedule` argument is available as of Julia 1.5.
!!! compat "Julia 1.8"
The `:dynamic` option for the `schedule` argument is available as of Julia 1.8.
See also: [`@spawn`](@ref Threads.@spawn), [`nthreads()`](@ref Threads.nthreads),
[`threadid()`](@ref Threads.threadid), `pmap` in [`Distributed`](@ref man-distributed),
`BLAS.set_num_threads` in [`LinearAlgebra`](@ref man-linalg).
Expand All @@ -133,7 +192,7 @@ macro threads(args...)
# for now only allow quoted symbols
sched = nothing
end
if sched !== :static
if sched !== :static && sched !== :dynamic
throw(ArgumentError("unsupported schedule argument in @threads"))
end
elseif na == 1
Expand Down
61 changes: 57 additions & 4 deletions test/threads_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -722,15 +722,68 @@ let a = zeros(nthreads())
end

# static schedule
function _atthreads_static_schedule()
function _atthreads_static_schedule(n)
ids = zeros(Int, n)
Threads.@threads :static for i = 1:n
ids[i] = Threads.threadid()
end
return ids
end
@test _atthreads_static_schedule(nthreads()) == 1:nthreads()
@test _atthreads_static_schedule(1) == [1;]
@test_throws(
"`@threads :static` cannot be used concurrently or nested",
@threads(for i = 1:1; _atthreads_static_schedule(nthreads()); end),
)

# dynamic schedule
function _atthreads_dynamic_schedule(n)
inc = Threads.Atomic{Int}(0)
flags = zeros(Int, n)
Threads.@threads :dynamic for i = 1:n
Threads.atomic_add!(inc, 1)
flags[i] = 1
end
return inc[], flags
end
@test _atthreads_dynamic_schedule(nthreads()) == (nthreads(), ones(nthreads()))
@test _atthreads_dynamic_schedule(1) == (1, ones(1))
@test _atthreads_dynamic_schedule(10) == (10, ones(10))
@test _atthreads_dynamic_schedule(nthreads() * 2) == (nthreads() * 2, ones(nthreads() * 2))

# nested dynamic schedule
function _atthreads_dynamic_dynamic_schedule()
inc = Threads.Atomic{Int}(0)
Threads.@threads :dynamic for _ = 1:nthreads()
Threads.@threads :dynamic for _ = 1:nthreads()
Threads.atomic_add!(inc, 1)
end
end
return inc[]
end
@test _atthreads_dynamic_dynamic_schedule() == nthreads() * nthreads()

function _atthreads_static_dynamic_schedule()
ids = zeros(Int, nthreads())
inc = Threads.Atomic{Int}(0)
Threads.@threads :static for i = 1:nthreads()
ids[i] = Threads.threadid()
Threads.@threads :dynamic for _ = 1:nthreads()
Threads.atomic_add!(inc, 1)
end
end
return ids
return ids, inc[]
end
@test _atthreads_static_dynamic_schedule() == (1:nthreads(), nthreads() * nthreads())

# errors inside @threads :dynamic
function _atthreads_dynamic_with_error(a)
Threads.@threads :dynamic for i in eachindex(a)
error("user error in the loop body")
end
a
end
@test _atthreads_static_schedule() == [1:nthreads();]
@test_throws TaskFailedException @threads for i = 1:1; _atthreads_static_schedule(); end
@test_throws "user error in the loop body" _atthreads_dynamic_with_error(zeros(nthreads()))

try
@macroexpand @threads(for i = 1:10, j = 1:10; end)
Expand Down

0 comments on commit 02b3f81

Please sign in to comment.