Skip to content

Commit

Permalink
Allow workers to have a user-given number identifier
Browse files Browse the repository at this point in the history
  • Loading branch information
nickrobinson251 committed Oct 21, 2024
1 parent 3766e93 commit 5278957
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 20 deletions.
13 changes: 6 additions & 7 deletions src/ReTestItems.jl
Original file line number Diff line number Diff line change
Expand Up @@ -471,18 +471,17 @@ function _runtests_in_current_env(
end

# Start a new `Worker` with `nworker_threads` threads and run `worker_init_expr` on it.
# The provided `worker_num` is only for logging purposes, and not persisted as part of the worker.
function start_worker(proj_name, nworker_threads::String, worker_init_expr::Expr, ntestitems::Int; worker_num=nothing)
w = Worker(; threads=nworker_threads)
i = worker_num == nothing ? "" : " $worker_num"
function start_worker(proj_name, nworker_threads::String, worker_init_expr::Expr, ntestitems::Int; worker_num)
w = Worker(; threads=nworker_threads, num=worker_num)
# remote_fetch here because we want to make sure the worker is all setup before starting to eval testitems
remote_fetch(w, quote
using ReTestItems, Test
Test.TESTSET_PRINT_ENABLE[] = false
const GLOBAL_TEST_CONTEXT = ReTestItems.TestContext($proj_name, $ntestitems)
GLOBAL_TEST_CONTEXT.setups_evaled = ReTestItems.TestSetupModules()
nthreads_str = $nworker_threads
@info "Starting test worker$($i) on pid = $(Libc.getpid()), with $nthreads_str threads"
num = $worker_num
@info "Starting test worker $(num) on pid=$(Libc.getpid()), with $(nthreads_str) threads"
$(worker_init_expr.args...)
nothing
end)
Expand Down Expand Up @@ -589,7 +588,7 @@ function manage_worker(
@warn "Memory usage ($(Base.Ryu.writefixed(memory_percent(), 1))%) is higher than threshold ($(Base.Ryu.writefixed(memory_threshold_percent, 1))%). Restarting process for worker $worker_num to try to free memory."
terminate!(worker)
wait(worker)
worker = robust_start_worker(proj_name, cfg.nworker_threads, cfg.worker_init_expr, ntestitems)
worker = robust_start_worker(proj_name, cfg.nworker_threads, cfg.worker_init_expr, ntestitems; worker_num)
end
testitem.workerid[] = worker.pid
timeout = something(testitem.timeout, cfg.testitem_timeout)
Expand Down Expand Up @@ -688,7 +687,7 @@ function manage_worker(
end
# The worker was terminated, so replace it unless there are no more testitems to run
if testitem !== nothing
worker = robust_start_worker(proj_name, cfg.nworker_threads, cfg.worker_init_expr, ntestitems)
worker = robust_start_worker(proj_name, cfg.nworker_threads, cfg.worker_init_expr, ntestitems; worker_num)
end
# Now loop back around to reschedule the testitem
continue
Expand Down
28 changes: 15 additions & 13 deletions src/workers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ Base.fetch(f::Future) = fetch(f.value)

mutable struct Worker
lock::ReentrantLock # protects the .futures field; no other fields are modified after construction
pid::Int
num::Int # user given ID
pid::Int # process ID
process::Base.Process
socket::TCPSocket
messages::Task
Expand All @@ -73,7 +74,7 @@ end
function terminate!(w::Worker, from::Symbol=:manual)
already_terminated = @atomicswap :monotonic w.terminated = true
if !already_terminated
@debug "terminating worker $(w.pid) from $from"
@debug "terminating $worker from $from"
end
wte = WorkerTerminatedException(w)
@lock w.lock begin
Expand Down Expand Up @@ -114,7 +115,7 @@ end
# Called when timeout_profile_wait is non-zero.
function trigger_profile(w::Worker, timeout_profile_wait, from::Symbol=:manual)
if !Sys.iswindows()
@debug "sending profile request to worker $(w.pid) from $from"
@debug "sending profile request to $worker from $from"
if Sys.islinux()
kill(w.process, 10) # SIGUSR1
elseif Sys.isbsd()
Expand Down Expand Up @@ -144,21 +145,22 @@ end
# wait until our spawned tasks have all finished
Base.wait(w::Worker) = fetch(w.process_watch) && fetch(w.messages) && fetch(w.output)

Base.show(io::IO, w::Worker) = print(io, "Worker(pid=$(w.pid)", w.terminated ? ", terminated=true, termsignal=$(w.process.termsignal)" : "", ")")
Base.show(io::IO, w::Worker) = print(io, "Worker(num=$(w.num), pid=$(w.pid)", w.terminated ? ", terminated=true, termsignal=$(w.process.termsignal)" : "", ")")

# used in testing to ensure all created workers are
# eventually cleaned up properly
const GLOBAL_CALLBACK_PER_WORKER = Ref{Any}()

function Worker(;
num::Int=rand(1:typemax(Int32)),
env::AbstractDict=ENV,
dir::String=pwd(),
threads::String="auto",
exeflags=`--threads=$threads`,
connect_timeout::Int=60,
worker_redirect_io::IO=stdout,
worker_redirect_fn=(io, pid, line)->println(io, " Worker $pid: $line")
)
worker_redirect_fn=(io, pid, line)->println(io, " Worker $num/$pid: $line")
)
# below copied from Distributed.launch
env = Dict{String, String}(env)
pathsep = Sys.iswindows() ? ";" : ":"
Expand Down Expand Up @@ -194,7 +196,7 @@ function Worker(;
return Sockets.connect(parse(Int, split(port_str, ':')[2]))
end
# create worker
w = Worker(ReentrantLock(), pid, proc, sock, Task(nothing), Task(nothing), Task(nothing), Dict{UInt64, Future}(), false)
w = Worker(ReentrantLock(), num, pid, proc, sock, Task(nothing), Task(nothing), Task(nothing), Dict{UInt64, Future}(), false)
## start a task to watch for worker process termination, notify the event when the task starts
e1 = Threads.Event()
w.process_watch = Threads.@spawn watch_and_terminate!(w, $e1)
Expand Down Expand Up @@ -233,7 +235,7 @@ function redirect_worker_output(io::IO, w::Worker, fn, proc, ev::Threads.Event)
end
end
catch e
# @error "Error redirecting worker output $(w.pid)" exception=(e, catch_backtrace())
# @error "Error redirecting $worker output" exception=(e, catch_backtrace())
terminate!(w, :redirect_worker_output)
e isa EOFError || e isa Base.IOError || rethrow()
finally
Expand All @@ -252,13 +254,13 @@ function process_responses(w::Worker, ev::Threads.Event)
while isopen(w.socket) && !w.terminated
# get the next Response from the worker
r = deserialize(w.socket)
@assert r isa Response "Received invalid response from worker $(w.pid): $(r)"
# println("Received response $(r) from worker $(w.pid)")
@assert r isa Response "Received invalid response from $worker: $(r)"
# println("Received response $(r) from $worker")
@lock lock begin
@assert haskey(reqs, r.id) "Received response for unknown request $(r.id) from worker $(w.pid)"
@assert haskey(reqs, r.id) "Received response for unknown request $(r.id) from $worker"
# look up the Future for this request
fut = pop!(reqs, r.id)
@assert !isready(fut.value) "Received duplicate response for request $(r.id) from worker $(w.pid)"
@assert !isready(fut.value) "Received duplicate response for request $(r.id) from $worker"
if r.error !== nothing
# this allows rethrowing the exception from the worker to the caller
close(fut.value, r.error)
Expand All @@ -268,7 +270,7 @@ function process_responses(w::Worker, ev::Threads.Event)
end
end
catch e
# @error "Error processing responses from worker $(w.pid)" exception=(e, catch_backtrace())
# @error "Error processing responses from $worker" exception=(e, catch_backtrace())
terminate!(w, :process_responses)
e isa EOFError || e isa Base.IOError || rethrow()
end
Expand Down

0 comments on commit 5278957

Please sign in to comment.