Skip to content

Commit

Permalink
Add threadpool support to runtime
Browse files Browse the repository at this point in the history
Adds support for Julia to be started with `--threads=auto|N[,M]` where
`N` specifies the number of threads in the default threadpool and `M`,
if provided, specifies the number of threads in the new interactive
threadpool.

Adds an optional first parameter to `Threads.@spawn`:
`[:default|:interactive]`. If `:interactive` is specified, the task will
be run by thread(s) in the interactive threadpool only (if there is
one).

Co-authored-by: K Pamnany <kpamnany@users.noreply.github.com>
  • Loading branch information
jpsamaroo and kpamnany committed Apr 19, 2022
1 parent b5871eb commit ccfa09c
Show file tree
Hide file tree
Showing 19 changed files with 398 additions and 96 deletions.
6 changes: 4 additions & 2 deletions base/options.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

# NOTE: This type needs to be kept in sync with jl_options in src/julia.h
# NOTE: This type needs to be kept in sync with jl_options in src/jloptions.h
struct JLOptions
quiet::Int8
banner::Int8
Expand All @@ -9,7 +9,9 @@ struct JLOptions
commands::Ptr{Ptr{UInt8}} # (e)eval, (E)print, (L)load
image_file::Ptr{UInt8}
cpu_target::Ptr{UInt8}
nthreads::Int32
nthreadpools::Int16
nthreads::Int16
nthreads_per_pool::Ptr{Int16}
nprocs::Int32
machine_file::Ptr{UInt8}
project::Ptr{UInt8}
Expand Down
78 changes: 45 additions & 33 deletions base/partr.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

module Partr

using ..Threads: SpinLock, nthreads
using ..Threads: SpinLock, nthreads, threadid

# a task minheap
mutable struct taskheap
Expand All @@ -16,12 +16,13 @@ end

# multiqueue minheap state
const heap_d = UInt32(8)
global heaps::Vector{taskheap} = Vector{taskheap}(undef, 0)
const heaps_lock = SpinLock()
global cong_unbias::UInt32 = typemax(UInt32)
const heaps = [Vector{taskheap}(undef, 0), Vector{taskheap}(undef, 0)]
const heaps_lock = [SpinLock(), SpinLock()]
const cong_unbias = [typemax(UInt32), typemax(UInt32)]


cong(max::UInt32, unbias::UInt32) = ccall(:jl_rand_ptls, UInt32, (UInt32, UInt32), max, unbias) + UInt32(1)
cong(max::UInt32, unbias::UInt32) =
ccall(:jl_rand_ptls, UInt32, (UInt32, UInt32), max, unbias) + UInt32(1)

function unbias_cong(max::UInt32)
return typemax(UInt32) - ((typemax(UInt32) % max) + UInt32(1))
Expand Down Expand Up @@ -60,46 +61,52 @@ function multiq_sift_down(heap::taskheap, idx::Int32)
end


function multiq_size()
function multiq_size(tpid::Int8)
nt = UInt32(Threads._nthreads_in_pool(tpid))
tp = tpid + 1
tpheaps = heaps[tp]
heap_c = UInt32(2)
heap_p = UInt32(length(heaps))
nt = UInt32(nthreads())
heap_p = UInt32(length(tpheaps))

if heap_c * nt <= heap_p
return heap_p
end

@lock heaps_lock begin
heap_p = UInt32(length(heaps))
nt = UInt32(nthreads())
@lock heaps_lock[tp] begin
heap_p = UInt32(length(tpheaps))
nt = UInt32(Threads._nthreads_in_pool(tpid))
if heap_c * nt <= heap_p
return heap_p
end

heap_p += heap_c * nt
newheaps = Vector{taskheap}(undef, heap_p)
copyto!(newheaps, heaps)
for i = (1 + length(heaps)):heap_p
copyto!(newheaps, tpheaps)
for i = (1 + length(tpheaps)):heap_p
newheaps[i] = taskheap()
end
global heaps = newheaps
global cong_unbias = unbias_cong(heap_p)
heaps[tp] = newheaps
cong_unbias[tp] = unbias_cong(heap_p)
end

return heap_p
end


function multiq_insert(task::Task, priority::UInt16)
tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), task)
heap_p = multiq_size(tpid)
tp = tpid + 1

task.priority = priority

heap_p = multiq_size()
rn = cong(heap_p, cong_unbias)
while !trylock(heaps[rn].lock)
rn = cong(heap_p, cong_unbias)
rn = cong(heap_p, cong_unbias[tp])
tpheaps = heaps[tp]
while !trylock(tpheaps[rn].lock)
rn = cong(heap_p, cong_unbias[tp])
end

heap = heaps[rn]
heap = tpheaps[rn]
if heap.ntasks >= length(heap.tasks)
resize!(heap.tasks, length(heap.tasks) * 2)
end
Expand All @@ -122,34 +129,37 @@ function multiq_deletemin()
local rn1, rn2
local prio1, prio2

tid = Threads.threadid()
tp = ccall(:jl_threadpoolid, Int8, (Int16,), tid-1) + 1
tpheaps = heaps[tp]

@label retry
GC.safepoint()
heap_p = UInt32(length(heaps))
heap_p = UInt32(length(tpheaps))
for i = UInt32(0):heap_p
if i == heap_p
return nothing
end
rn1 = cong(heap_p, cong_unbias)
rn2 = cong(heap_p, cong_unbias)
prio1 = heaps[rn1].priority
prio2 = heaps[rn2].priority
rn1 = cong(heap_p, cong_unbias[tp])
rn2 = cong(heap_p, cong_unbias[tp])
prio1 = tpheaps[rn1].priority
prio2 = tpheaps[rn2].priority
if prio1 > prio2
prio1 = prio2
rn1 = rn2
elseif prio1 == prio2 && prio1 == typemax(UInt16)
continue
end
if trylock(heaps[rn1].lock)
if prio1 == heaps[rn1].priority
if trylock(tpheaps[rn1].lock)
if prio1 == tpheaps[rn1].priority
break
end
unlock(heaps[rn1].lock)
unlock(tpheaps[rn1].lock)
end
end

heap = heaps[rn1]
heap = tpheaps[rn1]
task = heap.tasks[1]
tid = Threads.threadid()
if ccall(:jl_set_task_tid, Cint, (Any, Cint), task, tid-1) == 0
unlock(heap.lock)
@goto retry
Expand All @@ -171,9 +181,11 @@ end


function multiq_check_empty()
for i = UInt32(1):length(heaps)
if heaps[i].ntasks != 0
return false
for j = UInt32(1):length(heaps)
for i = UInt32(1):length(heaps[j])
if heaps[j][i].ntasks != 0
return false
end
end
end
return true
Expand Down
4 changes: 4 additions & 0 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ true
istaskfailed(t::Task) = (load_state_acquire(t) === task_state_failed)

Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1)
function Threads.threadpool(t::Task)
tpid = ccall(:jl_get_task_threadpoolid, Int8, (Any,), t)
return tpid == 0 ? :default : :interactive
end

task_result(t::Task) = t.result

Expand Down
2 changes: 1 addition & 1 deletion base/threadcall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const threadcall_restrictor = Semaphore(max_ccall_threads)
The `@threadcall` macro is called in the same way as [`ccall`](@ref) but does the work
in a different thread. This is useful when you want to call a blocking C
function without causing the main `julia` thread to become blocked. Concurrency
function without causing the current `julia` thread to become blocked. Concurrency
is limited by size of the libuv thread pool, which defaults to 4 threads but
can be increased by setting the `UV_THREADPOOL_SIZE` environment variable and
restarting the `julia` process.
Expand Down
107 changes: 87 additions & 20 deletions base/threadingconstructs.jl
Original file line number Diff line number Diff line change
@@ -1,27 +1,66 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

export threadid, nthreads, @threads, @spawn
export threadid, nthreads, @threads, @spawn,
threadpool, nthreadpools, nthreads_in_pool

"""
Threads.threadid()
Threads.threadid() -> Int
Get the ID number of the current thread of execution. The master thread has ID `1`.
Get the ID number of the current thread of execution. The master thread has
ID `1`.
"""
threadid() = Int(ccall(:jl_threadid, Int16, ())+1)

# Inclusive upper bound on threadid()
"""
Threads.nthreads()
Threads.nthreads() -> Int
Get the number of threads available to the Julia process. This is the inclusive upper bound
on [`threadid()`](@ref).
Get the number of threads (across all thread pools) available to the Julia
process. This is the inclusive upper bound on [`threadid()`](@ref).
See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the
[`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the
[`Distributed`](@ref man-distributed) standard library.
"""
nthreads() = Int(unsafe_load(cglobal(:jl_n_threads, Cint)))

"""
Threads.threadpool(tid = threadid()) -> Symbol
Returns the specified thread's threadpool; either `:default` or `:interactive`.
"""
function threadpool(tid = threadid())
tpid = ccall(:jl_threadpoolid, Int8, (Int16,), tid-1)
return tpid == 0 ? :default : :interactive
end

"""
Threads.nthreadpools() -> Int
Returns the number of threadpools currently configured.
"""
nthreadpools() = Int(unsafe_load(cglobal(:jl_n_threadpools, Cint)))

"""
Threads.nthreads_in_pool(pool::Symbol) -> Int
Returns the number of threads in the specified pool (`:default` or `:interactive`).
"""
function nthreads_in_pool(pool::Symbol)
if pool == :default
tpid = Int8(0)
elseif pool == :interactive
tpid = Int8(1)
else
error("invalid threadpool specified")
end
return _nthreads_in_pool(tpid)
end
function _nthreads_in_pool(tpid::Int8)
p = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint}))
return Int(unsafe_load(p, tpid + 1))
end


function threading_run(fun, static)
ccall(:jl_enter_threaded_region, Cvoid, ())
n = nthreads()
Expand All @@ -48,7 +87,7 @@ function _threadsfor(iter, lbody, schedule)
quote
local threadsfor_fun
let range = $(esc(range))
function threadsfor_fun(tid=1; onethread=false)
function threadsfor_fun(tid = 1; onethread = false)
r = range # Load into local variable
lenr = length(r)
# divide loop iterations among threads
Expand Down Expand Up @@ -232,35 +271,63 @@ macro threads(args...)
end

"""
Threads.@spawn expr
Threads.@spawn [:default|:interactive] expr
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available thread.
The task is allocated to a thread after it becomes available. To wait for the task
to finish, call [`wait`](@ref) on the result of this macro, or call [`fetch`](@ref) to
wait and then obtain its return value.
Create a [`Task`](@ref) and [`schedule`](@ref) it to run on any available
thread in the specified threadpool (`:default` if unspecified). The task is
allocated to a thread once one becomes available. To wait for the task to
finish, call [`wait`](@ref) on the result of this macro, or call
[`fetch`](@ref) to wait and then obtain its return value.
Values can be interpolated into `@spawn` via `\$`, which copies the value directly into the
constructed underlying closure. This allows you to insert the _value_ of a variable,
isolating the asynchronous code from changes to the variable's value in the current task.
Values can be interpolated into `@spawn` via `\$`, which copies the value
directly into the constructed underlying closure. This allows you to insert
the _value_ of a variable, isolating the asynchronous code from changes to
the variable's value in the current task.
!!! note
See the manual chapter on threading for important caveats.
See the manual chapter on [multi-threading](@ref man-multi-threading)
for important caveats. See also the chapter on [threadpools](@ref man-threadpools).
!!! compat "Julia 1.3"
This macro is available as of Julia 1.3.
!!! compat "Julia 1.4"
Interpolating values via `\$` is available as of Julia 1.4.
!!! compat "Julia 1.9"
A threadpool may be specified as of Julia 1.9.
"""
macro spawn(expr)
letargs = Base._lift_one_interp!(expr)
macro spawn(args...)
tpid = Int8(0)
na = length(args)
if na == 2
ttype, ex = args
if ttype isa QuoteNode
ttype = ttype.value
elseif ttype isa Symbol
# TODO: allow unquoted symbols
ttype = nothing
end
if ttype === :interactive
tpid = Int8(1)
elseif ttype !== :default
throw(ArgumentError("unsupported threadpool in @spawn: $ttype"))
end
elseif na == 1
ex = args[1]
else
throw(ArgumentError("wrong number of arguments in @spawn"))
end

letargs = Base._lift_one_interp!(ex)

thunk = esc(:(()->($expr)))
thunk = esc(:(()->($ex)))
var = esc(Base.sync_varname)
quote
let $(letargs...)
local task = Task($thunk)
task.sticky = false
ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), task, $tpid)
if $(Expr(:islocal, var))
put!($var, task)
end
Expand Down
5 changes: 4 additions & 1 deletion doc/src/base/multi-threading.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ Base.Threads.foreach
Base.Threads.@spawn
Base.Threads.threadid
Base.Threads.nthreads
Base.Threads.threadpool
Base.Threads.nthreadpools
Base.Threads.nthreads_in_pool
```

See also [Multi-Threading](@ref man-multithreading).
Expand Down Expand Up @@ -49,7 +52,7 @@ Base.Threads.atomic_min!
Base.Threads.atomic_fence
```

## ccall using a threadpool (Experimental)
## ccall using a libuv threadpool (Experimental)

```@docs
Base.@threadcall
Expand Down
Loading

0 comments on commit ccfa09c

Please sign in to comment.