Merge pull request JuliaLang/julia#42255 from JuliaLang/backports-release-1.7

release-1.7: Backports for 1.7-rc2
KristofferC authored Oct 19, 2021
2 parents 7156ef2 + 75bd990 commit 30cae5b
Showing 6 changed files with 137 additions and 63 deletions.
8 changes: 4 additions & 4 deletions src/Distributed.jl
@@ -83,15 +83,15 @@ function _require_callback(mod::Base.PkgId)
     end
 end
 
-const REF_ID = Ref(1)
-next_ref_id() = (id = REF_ID[]; REF_ID[] = id+1; id)
+const REF_ID = Threads.Atomic{Int}(1)
+next_ref_id() = Threads.atomic_add!(REF_ID, 1)
 
 struct RRID
     whence::Int
     id::Int
 
-    RRID() = RRID(myid(),next_ref_id())
-    RRID(whence, id) = new(whence,id)
+    RRID() = RRID(myid(), next_ref_id())
+    RRID(whence, id) = new(whence, id)
 end
 
 hash(r::RRID, h::UInt) = hash(r.whence, hash(r.id, h))
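This hunk swaps a plain `Ref` counter for `Threads.Atomic{Int}`: two tasks can both read a `Ref` before either writes the increment back, handing out duplicate ids, whereas `atomic_add!` makes the read-modify-write a single indivisible step. A minimal standalone sketch of the difference (demo names, not the commit's code):

    using Base.Threads

    const COUNTER = Atomic{Int}(1)
    next_id() = atomic_add!(COUNTER, 1)  # returns the pre-increment value

    ids = Vector{Int}(undef, 1000)
    @sync for i in 1:1000
        Threads.@spawn ids[i] = next_id()
    end
    @assert allunique(ids)  # with a plain Ref, duplicate ids are possible here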
11 changes: 8 additions & 3 deletions src/cluster.jl
@@ -95,9 +95,10 @@ end
 @enum WorkerState W_CREATED W_CONNECTED W_TERMINATING W_TERMINATED
 mutable struct Worker
     id::Int
-    del_msgs::Array{Any,1}
+    msg_lock::Threads.ReentrantLock # Lock for del_msgs, add_msgs, and gcflag
+    del_msgs::Array{Any,1} # XXX: Could del_msgs and add_msgs be Channels?
     add_msgs::Array{Any,1}
-    gcflag::Bool
+    @atomic gcflag::Bool
     state::WorkerState
     c_state::Condition # wait for state changes
     ct_time::Float64 # creation time

@@ -133,7 +134,7 @@ mutable struct Worker
         if haskey(map_pid_wrkr, id)
             return map_pid_wrkr[id]
         end
-        w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func)
+        w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Condition(), time(), conn_func)
         w.initialized = Event()
         register_worker(w)
         w

@@ -471,6 +472,10 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
     # The `launch` method should add an object of type WorkerConfig for every
     # worker launched. It provides information required on how to connect
     # to it.
+
+    # FIXME: launched should be a Channel, launch_ntfy should be a Threads.Condition
+    # but both are part of the public interface. This means we currently can't use
+    # `Threads.@spawn` in the code below.
     launched = WorkerConfig[]
     launch_ntfy = Condition()
 
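The new `msg_lock` field establishes one lock guarding `del_msgs`, `add_msgs`, and `gcflag` together. A rough sketch of that discipline with a cut-down hypothetical struct (requires Julia 1.7 or later for `@atomic` fields):

    using Base.Threads

    mutable struct MiniWorker
        msg_lock::ReentrantLock
        del_msgs::Vector{Any}
        @atomic gcflag::Bool
        MiniWorker() = new(ReentrantLock(), Any[], false)
    end

    # Producers (e.g. finalizers) and the flush task both take msg_lock,
    # so the queue and the flag always change together.
    function enqueue_del!(w::MiniWorker, msg)
        lock(w.msg_lock) do
            push!(w.del_msgs, msg)
            @atomic w.gcflag = true
        end
    end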
10 changes: 3 additions & 7 deletions src/macros.jl
@@ -1,14 +1,10 @@
 # This file is a part of Julia. License is MIT: https://julialang.org/license
 
-let nextidx = 0
+let nextidx = Threads.Atomic{Int}(0)
     global nextproc
     function nextproc()
-        p = -1
-        if p == -1
-            p = workers()[(nextidx % nworkers()) + 1]
-            nextidx += 1
-        end
-        p
+        idx = Threads.atomic_add!(nextidx, 1)
+        return workers()[(idx % nworkers()) + 1]
     end
 end
 
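The rewritten `nextproc` keeps its round-robin behavior (the dead `p = -1` scaffolding is gone) but bumps the index atomically, so concurrent callers cannot be handed the same slot. The same pattern over a plain vector of stand-in worker ids, as a standalone sketch:

    using Base.Threads

    const demo_idx = Atomic{Int}(0)
    const demo_procs = [2, 3, 4]  # stand-in worker ids

    nextproc_demo() = demo_procs[(atomic_add!(demo_idx, 1) % length(demo_procs)) + 1]

    @assert [nextproc_demo() for _ in 1:6] == [2, 3, 4, 2, 3, 4]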
39 changes: 23 additions & 16 deletions src/messages.jl
@@ -126,23 +126,30 @@ function flush_gc_msgs(w::Worker)
     if !isdefined(w, :w_stream)
         return
     end
-    w.gcflag = false
-    new_array = Any[]
-    msgs = w.add_msgs
-    w.add_msgs = new_array
-    if !isempty(msgs)
-        remote_do(add_clients, w, msgs)
-    end
+    add_msgs = nothing
+    del_msgs = nothing
+    @lock w.msg_lock begin
+        if !w.gcflag # No work needed for this worker
+            return
+        end
+        @atomic w.gcflag = false
+        if !isempty(w.add_msgs)
+            add_msgs = w.add_msgs
+            w.add_msgs = Any[]
+        end
 
-    # del_msgs gets populated by finalizers, so be very careful here about ordering of allocations
-    # XXX: threading requires this to be atomic
-    new_array = Any[]
-    msgs = w.del_msgs
-    w.del_msgs = new_array
-    if !isempty(msgs)
-        #print("sending delete of $msgs\n")
-        remote_do(del_clients, w, msgs)
+        if !isempty(w.del_msgs)
+            del_msgs = w.del_msgs
+            w.del_msgs = Any[]
+        end
+    end
+    if add_msgs !== nothing
+        remote_do(add_clients, w, add_msgs)
+    end
+    if del_msgs !== nothing
+        remote_do(del_clients, w, del_msgs)
     end
+    return
 end
 
 # Boundary inserted between messages on the wire, used for recovering

@@ -187,7 +194,7 @@ end
 function flush_gc_msgs()
     try
         for w in (PGRP::ProcessGroup).workers
-            if isa(w,Worker) && w.gcflag && (w.state == W_CONNECTED)
+            if isa(w,Worker) && (w.state == W_CONNECTED) && w.gcflag
                 flush_gc_msgs(w)
             end
         end
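The reworked `flush_gc_msgs` holds `msg_lock` only long enough to swap the message buffers out, then calls `remote_do` after releasing it. A self-contained sketch of that snapshot-and-swap shape, with a `send` stand-in and hypothetical names:

    function drain!(lk::ReentrantLock, buf::Ref{Vector{Any}}, send::Function)
        msgs = nothing
        lock(lk) do
            if !isempty(buf[])
                msgs = buf[]    # take the whole batch...
                buf[] = Any[]   # ...and leave a fresh buffer behind
            end
        end
        # the potentially slow send happens with the lock released
        msgs === nothing || send(msgs)
        return
    end

    buf = Ref{Vector{Any}}(Any[:a, :b])
    drain!(ReentrantLock(), buf, msgs -> println("sending ", msgs))
    @assert isempty(buf[])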
113 changes: 84 additions & 29 deletions src/remotecall.jl
@@ -84,20 +84,24 @@ end
 
 function finalize_ref(r::AbstractRemoteRef)
     if r.where > 0 # Handle the case of the finalizer having been called manually
-        if islocked(client_refs)
-            # delay finalizer for later, when it's not already locked
+        if trylock(client_refs.lock) # trylock doesn't call wait which causes yields
+            try
+                delete!(client_refs.ht, r) # direct removal avoiding locks
+                if isa(r, RemoteChannel)
+                    send_del_client_no_lock(r)
+                else
+                    # send_del_client only if the reference has not been set
+                    r.v === nothing && send_del_client_no_lock(r)
+                    r.v = nothing
+                end
+                r.where = 0
+            finally
+                unlock(client_refs.lock)
+            end
+        else
             finalizer(finalize_ref, r)
             return nothing
         end
-        delete!(client_refs, r)
-        if isa(r, RemoteChannel)
-            send_del_client(r)
-        else
-            # send_del_client only if the reference has not been set
-            r.v === nothing && send_del_client(r)
-            r.v = nothing
-        end
-        r.where = 0
     end
     nothing
 end
@@ -229,13 +233,18 @@ del_client(rr::AbstractRemoteRef) = del_client(remoteref_id(rr), myid())
 del_client(id, client) = del_client(PGRP, id, client)
 function del_client(pg, id, client)
     lock(client_refs) do
-        rv = get(pg.refs, id, false)
-        if rv !== false
-            delete!(rv.clientset, client)
-            if isempty(rv.clientset)
-                delete!(pg.refs, id)
-                #print("$(myid()) collected $id\n")
-            end
-        end
+        _del_client(pg, id, client)
     end
     nothing
 end
+
+function _del_client(pg, id, client)
+    rv = get(pg.refs, id, false)
+    if rv !== false
+        delete!(rv.clientset, client)
+        if isempty(rv.clientset)
+            delete!(pg.refs, id)
+            #print("$(myid()) collected $id\n")
+        end
+    end
+    nothing
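Splitting `del_client` into a locked wrapper plus `_del_client` lets callers that already hold (or must avoid taking) the `client_refs` lock reach the core logic directly, as `finalize_ref` above and `send_del_client_no_lock` below do. The same wrapper-plus-core shape in miniature, with hypothetical names rather than Distributed's API:

    const counts_lock = ReentrantLock()
    const counts = Dict{Symbol,Int}()

    # Public entry point: takes the lock, then delegates.
    decr!(k::Symbol) = lock(() -> _decr!(k), counts_lock)

    # Core logic: assumes the caller already holds counts_lock.
    function _decr!(k::Symbol)
        n = get(counts, k, 0) - 1
        n <= 0 ? delete!(counts, k) : (counts[k] = n)
        nothing
    end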
@@ -247,25 +256,67 @@ function del_clients(pairs::Vector)
     end
 end
 
-const any_gc_flag = Condition()
+# The task below is coalescing the `flush_gc_msgs` call
+# across multiple producers, see `send_del_client`,
+# and `send_add_client`.
+# XXX: Is this worth the additional complexity?
+#      `flush_gc_msgs` has to iterate over all connected workers.
+const any_gc_flag = Threads.Condition()
 function start_gc_msgs_task()
-    errormonitor(@async while true
-        wait(any_gc_flag)
-        flush_gc_msgs()
-    end)
+    errormonitor(
+        Threads.@spawn begin
+            while true
+                lock(any_gc_flag) do
+                    # this might miss events
+                    wait(any_gc_flag)
+                end
+                flush_gc_msgs() # handles throws internally
+            end
+        end
+    )
 end
 
 # Function can be called within a finalizer
 function send_del_client(rr)
     if rr.where == myid()
         del_client(rr)
     elseif id_in_procs(rr.where) # process only if a valid worker
-        w = worker_from_id(rr.where)::Worker
-        push!(w.del_msgs, (remoteref_id(rr), myid()))
-        w.gcflag = true
+        process_worker(rr)
     end
 end
+
+function send_del_client_no_lock(rr)
+    # for gc context to avoid yields
+    if rr.where == myid()
+        _del_client(PGRP, remoteref_id(rr), myid())
+    elseif id_in_procs(rr.where) # process only if a valid worker
+        process_worker(rr)
+    end
+end
+
+function publish_del_msg!(w::Worker, msg)
+    lock(w.msg_lock) do
+        push!(w.del_msgs, msg)
+        @atomic w.gcflag = true
+    end
+    lock(any_gc_flag) do
+        notify(any_gc_flag)
+    end
+end
+
+function process_worker(rr)
+    w = worker_from_id(rr.where)::Worker
+    msg = (remoteref_id(rr), myid())
+
+    # Needs to aquire a lock on the del_msg queue
+    T = Threads.@spawn begin
+        publish_del_msg!($w, $msg)
+    end
+    Base.errormonitor(T)
+
+    return
+end
 
 function add_client(id, client)
     lock(client_refs) do
         rv = lookup_ref(id)
@@ -288,9 +339,13 @@ function send_add_client(rr::AbstractRemoteRef, i)
         # to the processor that owns the remote ref. it will add_client
         # itself inside deserialize().
         w = worker_from_id(rr.where)
-        push!(w.add_msgs, (remoteref_id(rr), i))
-        w.gcflag = true
-        notify(any_gc_flag)
+        lock(w.msg_lock) do
+            push!(w.add_msgs, (remoteref_id(rr), i))
+            @atomic w.gcflag = true
+        end
+        lock(any_gc_flag) do
+            notify(any_gc_flag)
+        end
     end
 end
 
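The `finalize_ref` hunk above is the central trick in this file: a finalizer must not wait or yield, so it uses `trylock` and, when the lock is contended, re-registers itself to run at a later GC pass. A minimal standalone imitation of that pattern, with a hypothetical registry rather than Distributed's client table:

    mutable struct Tracked
        id::Int
    end

    const REGISTRY_LOCK = ReentrantLock()
    const REGISTRY = Set{Int}()

    function finalize_tracked(t::Tracked)
        if trylock(REGISTRY_LOCK)  # never waits, so it is safe in a finalizer
            try
                delete!(REGISTRY, t.id)
            finally
                unlock(REGISTRY_LOCK)
            end
        else
            finalizer(finalize_tracked, t)  # postpone: retry at the next GC
        end
        nothing
    end

    t = Tracked(1)
    push!(REGISTRY, t.id)
    finalizer(finalize_tracked, t)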
19 changes: 15 additions & 4 deletions test/distributed_exec.jl
@@ -132,6 +132,18 @@ end
 testf(id_me)
 testf(id_other)
 
+function poll_while(f::Function; timeout_seconds::Integer = 60)
+    start_time = time_ns()
+    while f()
+        sleep(1)
+        if ( ( time_ns() - start_time )/1e9 ) > timeout_seconds
+            @error "Timed out" timeout_seconds
+            return false
+        end
+    end
+    return true
+end
+
 # Distributed GC tests for Futures
 function test_futures_dgc(id)
     f = remotecall(myid, id)
@@ -143,8 +155,7 @@ function test_futures_dgc(id)
     @test fetch(f) == id
     @test f.v !== nothing
     yield(); # flush gc msgs
-    @test remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, fid) == false
-
+    @test poll_while(() -> remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, fid))
 
     # if unfetched, it should be deleted after a finalize
     f = remotecall(myid, id)
@@ -153,7 +164,7 @@ function test_futures_dgc(id)
     @test f.v === nothing
     finalize(f)
     yield(); # flush gc msgs
-    @test remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, fid) == false
+    @test poll_while(() -> remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, fid))
 end
 
 test_futures_dgc(id_me)
@@ -243,7 +254,7 @@ function test_remoteref_dgc(id)
     @test remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, rrid) == true
     finalize(rr)
     yield(); # flush gc msgs
-    @test remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, rrid) == false
+    @test poll_while(() -> remotecall_fetch(k->(yield();haskey(Distributed.PGRP.refs, k)), id, rrid))
 end
 test_remoteref_dgc(id_me)
 test_remoteref_dgc(id_other)
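Since deletions are now flushed by a separate task, the tests cannot assert removal at a single instant; `poll_while` retries until the condition clears or a timeout expires. A local illustration of its contract, assuming the `poll_while` defined in the hunk above and the `Test` stdlib:

    using Test

    flag = Ref(false)
    @async (sleep(2); flag[] = true)

    # loops while the predicate is true; returns true once it turns false
    @test poll_while(() -> !flag[])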
