From 0f5290a17f39ac9bf817a23fb06459d6ded04efc Mon Sep 17 00:00:00 2001 From: tan Date: Tue, 13 Aug 2019 11:53:44 +0530 Subject: [PATCH] do not use myid() to differentiate master & worker Occasionally while adding a large number of workers and particularly worker-worker connections are not lazy, it is possible to encounter the following error: ``` ERROR (unhandled task failure): MethodError: no method matching manage(::Base.Distributed.DefaultClusterManager, ::Int64, ::WorkerConfig, ::Symbol) Closest candidates are: manage(!Matched::Base.Distributed.SSHManager, ::Integer, ::WorkerConfig, ::Symbol) at distributed/managers.jl:224 manage(!Matched::Base.Distributed.LocalManager, ::Integer, ::WorkerConfig, ::Symbol) at distributed/managers.jl:337 manage(!Matched::Union{ClusterManagers.PBSManager, ClusterManagers.QRSHManager, ClusterManagers.SGEManager}, ::Int64, ::WorkerConfig, ::Symbol) at /home/jrun/.julia/v0.6/ClusterManagers/src/qsub.jl:115 ... Stacktrace: [1] deregister_worker(::Base.Distributed.ProcessGroup, ::Int64) at ./distributed/cluster.jl:903 [2] message_handler_loop(::TCPSocket, ::TCPSocket, ::Bool) at ./distributed/process_messages.jl:220 [3] process_tcp_streams(::TCPSocket, ::TCPSocket, ::Bool) at ./distributed/process_messages.jl:118 [4] (::Base.Distributed.##101#102{TCPSocket,TCPSocket,Bool})() at ./event.jl:73 ``` It can be simulated with this exact sequence of events: - worker2 in process of connecting to master - master has received the worker2s listen port, connected to it, sent the JoinPGRP message to it - master is now aware of worker2, and has added it to its list of workers - worker2 has still not processed the JoinPGRP message, so it is still unaware of its worker id - worker3 now connects to master - master sends the JoinPGRP message along with list of existing workers that includes worker2 - worker3 connects to worker2 - worker2 receives a new connection from worker3 and attempts to process it - worker3 faces an error and exits, thus breaking the connection - worker2 gets an error processing message from worker3 - goes into error handling - the current error handling code sees the self pid as 1 and incorrectly thinks it is the master - attempts to process the worker disconnection as a master and gets the error we see The MethodError prevents proper cleanup at the worker where it happens. The issue seems to be that it is not correct to identify whether a Julia process is master or worker by looking at the process id. Instead we should have a dedicated indicator for that. This change adds a new local process role variable that is set to `:master` by default, but is set to `:worker` when `start_worker` is invoked. This allows a process to know that it is running as a worker irrespective of whether it has received a process id or not. --- stdlib/Distributed/src/cluster.jl | 11 ++++++++++- stdlib/Distributed/test/distributed_exec.jl | 10 ++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl index 3caf002b3cd30..47b2fc2f3a082 100644 --- a/stdlib/Distributed/src/cluster.jl +++ b/stdlib/Distributed/src/cluster.jl @@ -361,6 +361,8 @@ process as a worker using TCP/IP sockets for transport. `cookie` is a [`cluster_cookie`](@ref). """ function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager()) + myrole!(:worker) + # On workers, the default cluster manager connects via TCP sockets. Custom # transports will need to call this function with their own manager. global cluster_manager @@ -783,12 +785,19 @@ end # globals const LPROC = LocalProcess() +const LPROCROLE = Ref{Symbol}(:master) const HDR_VERSION_LEN=16 const HDR_COOKIE_LEN=16 const map_pid_wrkr = Dict{Int, Union{Worker, LocalProcess}}() const map_sock_wrkr = IdDict() const map_del_wrkr = Set{Int}() +# whether process is a master or worker in a distributed setup +myrole() = LPROCROLE[] +function myrole!(proctype::Symbol) + LPROCROLE[] = proctype +end + # cluster management related API """ myid() @@ -1108,7 +1117,7 @@ function deregister_worker(pg, pid) end end - if myid() == 1 && isdefined(w, :config) + if myid() == 1 && (myrole() === :master) && isdefined(w, :config) # Notify the cluster manager of this workers death manage(w.manager, w.id, w.config, :deregister) if PGRP.topology != :all_to_all || isclusterlazy() diff --git a/stdlib/Distributed/test/distributed_exec.jl b/stdlib/Distributed/test/distributed_exec.jl index b76a9414cd558..737381869851d 100644 --- a/stdlib/Distributed/test/distributed_exec.jl +++ b/stdlib/Distributed/test/distributed_exec.jl @@ -45,6 +45,16 @@ end id_me = myid() id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))] +# Test role +@everywhere using Distributed +@test Distributed.myrole() === :master +for wid = workers() + wrole = remotecall_fetch(wid) do + Distributed.myrole() + end + @test wrole === :worker +end + # Test remote() let pool = default_worker_pool()