Skip to content

Commit

Permalink
Remove localize_vars. Serialize globals under Main.
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Jan 24, 2017
1 parent 06fa32c commit 154dbb7
Show file tree
Hide file tree
Showing 11 changed files with 524 additions and 89 deletions.
122 changes: 117 additions & 5 deletions base/clusterserialize.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,15 @@ type ClusterSerializer{I<:IO} <: AbstractSerializer
counter::Int
table::ObjectIdDict

sent_objects::Set{UInt64} # used by serialize (track objects sent)
pid::Int # Worker we are connected to.
tn_obj_sent::Set{UInt64} # TypeName objects sent
glbs_sent::Dict{UInt64, UInt64} # (key,value) -> (object_id, hash_value)
glbs_in_tnobj::Dict{UInt64, Vector{Symbol}} # Track globals referenced in
# anonymous functions.
anonfunc_id::UInt64

ClusterSerializer(io::I) = new(io, 0, ObjectIdDict(), Set{UInt64}())
ClusterSerializer(io::I) = new(io, 0, ObjectIdDict(), Base.worker_id_from_socket(io),
Set{UInt64}(), Dict{UInt64, UInt64}(), Dict{UInt64, Vector{Symbol}}(), 0)
end
ClusterSerializer(io::IO) = ClusterSerializer{typeof(io)}(io)

Expand All @@ -28,6 +34,9 @@ function deserialize(s::ClusterSerializer, ::Type{TypeName})
else
tn = deserialize_typename(s, number)
end

# retrieve arrays of global syms sent if any and deserialize them all.
foreach(sym->deserialize_global_from_main(s, sym), deserialize(s))
return tn
end

Expand All @@ -36,13 +45,116 @@ function serialize(s::ClusterSerializer, t::TypeName)
writetag(s.io, TYPENAME_TAG)

identifier = object_number(t)
send_whole = !(identifier in s.sent_objects)
send_whole = !(identifier in s.tn_obj_sent)
serialize(s, send_whole)
write(s.io, identifier)
if send_whole
# Track globals referenced in this anonymous function.
# This information is used to resend modified globals when we
# only send the identifier.
prev = s.anonfunc_id
s.anonfunc_id = identifier
serialize_typename(s, t)
push!(s.sent_objects, identifier)
s.anonfunc_id = prev
push!(s.tn_obj_sent, identifier)
finalizer(t, x->cleanup_tname_glbs(s, identifier))
end
# println(t.module, ":", t.name, ", id:", identifier, send_whole ? " sent" : " NOT sent")

# Send global refs if required.
syms = syms_2b_sent(s, identifier)
serialize(s, syms)
foreach(sym->serialize_global_from_main(s, sym), syms)
nothing
end

# Serialize a GlobalRef. While a TypeName is being serialized in full
# (`s.anonfunc_id != 0`), also record every non-type, non-module global that is
# bound in Main itself against that TypeName's identifier, then defer to the
# default GlobalRef serializer.
function serialize(s::ClusterSerializer, g::GlobalRef)
    sym = g.name
    # Cheap checks first: only refs into Main, seen while a TypeName is being sent.
    if g.mod === Main && s.anonfunc_id != 0 && isdefined(Main, sym)
        v = getfield(Main, sym)
        if !isa(v, DataType) && !isa(v, Module) && binding_module(Main, sym) === Main
            # Typed default avoids converting an `Any[]` into the Dict's
            # `Vector{Symbol}` value type on first insert.
            tracked = get!(s.glbs_in_tnobj, s.anonfunc_id, Symbol[])
            # A symbol referenced several times needs to be recorded only once.
            sym in tracked || push!(tracked, sym)
        end
    end

    invoke(serialize, Tuple{AbstractSerializer, GlobalRef}, s, g)
end

# Send/resend a global binding if
# a) nothing has been recorded for its object_id yet, i.e., the object is seen
#    for the first time, or,
# b) the recorded digest differs from the current one. The digest combines the
#    binding name with the hash of the value, so it changes when the value is
#    mutated AND when the same object is reached through a different global
#    name (e.g. `a = [1]; b = a`, with `a` already sent and `b` not), or
# c) is a bitstype (never tracked, always sent).
#
# Returns the de-duplicated list of symbols, recorded for `identifier`, that
# must accompany the TypeName.
function syms_2b_sent(s::ClusterSerializer, identifier)
    lst = Symbol[]
    check_syms = get(s.glbs_in_tnobj, identifier, Symbol[])
    for sym in check_syms
        v = getfield(Main, sym)

        if isbits(v)
            push!(lst, sym)
        else
            oid = object_id(v)
            if haskey(s.glbs_sent, oid)
                # We have sent this object before, see if it has changed or is
                # now being requested under a different binding name.
                s.glbs_sent[oid] != hash(sym, hash(v)) && push!(lst, sym)
            else
                push!(lst, sym)
            end
        end
    end
    return unique(lst)
end

# Write the global `sym` from Main to the stream as `isconst` flag followed by
# its value, and record what was sent so that syms_2b_sent can skip it next time.
function serialize_global_from_main(s::ClusterSerializer, sym)
    v = getfield(Main, sym)

    oid = object_id(v)
    record_v = true
    if isbits(v)
        # bits values are resent every time; nothing to track.
        record_v = false
    elseif !haskey(s.glbs_sent, oid)
        # set up a finalizer the first time this object is sent
        try
            finalizer(v, x->delete_global_tracker(s,x))
        catch ex
            # Do not track objects that cannot be finalized.
            if isa(ex, ErrorException)
                record_v = false
            else
                rethrow(ex)
            end
        end
    end
    # Must be the same digest that syms_2b_sent compares against. Including the
    # binding name means a second name for an already-sent object is still sent
    # (at the cost of alternating resends while both names are in use).
    record_v && (s.glbs_sent[oid] = hash(sym, hash(v)))

    serialize(s, isconst(Main, sym))
    serialize(s, v)
end

# Read one global sent by serialize_global_from_main (an `isconst` flag followed
# by the value) and bind it to `sym` in Main, as a `const` if the flag is set.
function deserialize_global_from_main(s::ClusterSerializer, sym)
    sym_isconst = deserialize(s)
    v = deserialize(s)
    # Quote the value before splicing it into the assignment. Interpolating it
    # raw would make `eval` treat a Symbol/Expr/QuoteNode value as code to run
    # (e.g. a global holding `:(1+2)` would arrive as `3`, and one holding
    # `:foo` would look up a variable `foo`) instead of as data to bind.
    val = QuoteNode(v)
    if sym_isconst
        eval(Main, :(const $sym = $val))
    else
        eval(Main, :($sym = $val))
    end
end

# Forget the entry recorded in `s.glbs_sent` for the object `v`, if there is one.
# Installed as the finalizer of tracked globals by serialize_global_from_main.
function delete_global_tracker(s::ClusterSerializer, v)
    key = object_id(v)
    if !haskey(s.glbs_sent, key)
        return nothing
    end
    return delete!(s.glbs_sent, key)

    # TODO: A global binding is released and gc'ed here but it continues
    # to occupy memory on the remote node. Would be nice to release memory
    # if possible.
end

# Drop the list of Main globals recorded for the TypeName numbered `identifier`.
cleanup_tname_glbs(s::ClusterSerializer, identifier) =
    delete!(s.glbs_in_tnobj, identifier)

# TODO: cleanup from s.tn_obj_sent
6 changes: 4 additions & 2 deletions base/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ n_waiters(c::Condition) = length(c.waitq)
@schedule
Wrap an expression in a [`Task`](@ref) and add it to the local machine's scheduler queue.
Similar to [`@async`](@ref) except that an enclosing `@sync` does NOT wait for tasks
started with an `@schedule`.
"""
macro schedule(expr)
expr = :(()->($expr))
:(enq_work(Task($(esc(expr)))))
thunk = esc(:(()->($expr)))
:(enq_work(Task($thunk)))
end

## scheduler and work queue
Expand Down
31 changes: 0 additions & 31 deletions base/expr.jl
Original file line number Diff line number Diff line change
Expand Up @@ -188,37 +188,6 @@ end

## some macro utilities ##

# Collect, into `lst`, the symbols appearing in `e` that name globals to be
# sent from Main. Returns `lst`.
find_vars(e) = find_vars(e, [])
function find_vars(e, lst)
    if isa(e, Expr)
        # Quoted code and :top / :core references are never scanned.
        skip = e.head === :quote || e.head === :top || e.head === :core
        if !skip
            foreach(arg -> find_vars(arg, lst), e.args)
        end
    elseif isa(e, Symbol) && current_module() === Main && isdefined(e)
        # Main runs on process 1, so send globals from there, excluding
        # things defined in Base.
        if !isdefined(Base, e) || eval(Base, e) !== eval(current_module(), e)
            push!(lst, e)
        end
    end
    return lst
end

# Wrap an expression in "let a=a,b=b,..." for each variable it references.
# The wrapping is expressed as a `:localize` Expr whose first argument is the
# expression and whose remaining arguments are the variables from find_vars;
# these are passed through `esc` when `esca` is true (the default).
localize_vars(expr) = localize_vars(expr, true)
function localize_vars(expr, esca)
    vars = find_vars(expr)
    # requires a special feature of the front end that knows how to insert
    # the correct variables. the list of free variables cannot be computed
    # from a macro.
    captured = esca ? map(esc, vars) : vars
    return Expr(:localize, expr, captured...)
end

function pushmeta!(ex::Expr, sym::Symbol, args::Any...)
if isempty(args)
tag = sym
Expand Down
49 changes: 34 additions & 15 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1937,8 +1937,8 @@ end
## higher-level functions: spawn, pmap, pfor, etc. ##

let nextidx = 0
global chooseproc
function chooseproc(thunk::Function)
global nextproc
function nextproc()
p = -1
if p == -1
p = workers()[(nextidx % nworkers()) + 1]
Expand All @@ -1950,16 +1950,16 @@ end

spawnat(p, thunk) = sync_add(remotecall(thunk, p))

spawn_somewhere(thunk) = spawnat(chooseproc(thunk),thunk)
spawn_somewhere(thunk) = spawnat(nextproc(),thunk)

macro spawn(expr)
expr = localize_vars(esc(:(()->($expr))), false)
:(spawn_somewhere($expr))
thunk = esc(:(()->($expr)))
:(spawn_somewhere($thunk))
end

macro spawnat(p, expr)
expr = localize_vars(esc(:(()->($expr))), false)
:(spawnat($(esc(p)), $expr))
thunk = esc(:(()->($expr)))
:(spawnat($(esc(p)), $thunk))
end

"""
Expand All @@ -1969,11 +1969,8 @@ Equivalent to `fetch(@spawn expr)`.
See [`fetch`](@ref) and [`@spawn`](@ref).
"""
macro fetch(expr)
expr = localize_vars(esc(:(()->($expr))), false)
quote
thunk = $expr
remotecall_fetch(thunk, chooseproc(thunk))
end
thunk = esc(:(()->($expr)))
:(remotecall_fetch($thunk, nextproc()))
end

"""
Expand All @@ -1983,8 +1980,8 @@ Equivalent to `fetch(@spawnat p expr)`.
See [`fetch`](@ref) and [`@spawnat`](@ref).
"""
macro fetchfrom(p, expr)
expr = localize_vars(esc(:(()->($expr))), false)
:(remotecall_fetch($expr, $(esc(p))))
thunk = esc(:(()->($expr)))
:(remotecall_fetch($thunk, $(esc(p))))
end

"""
Expand Down Expand Up @@ -2140,7 +2137,7 @@ macro parallel(args...)
else
thecall = :(preduce($(esc(reducer)), $(make_preduce_body(var, body)), $(esc(r))))
end
localize_vars(thecall)
thecall
end


Expand Down Expand Up @@ -2286,3 +2283,25 @@ function getindex(r::RemoteChannel, args...)
end
return remotecall_fetch(getindex, r.where, r, args...)
end

"""
clear!(syms, pids=workers(); mod=Main)
Clears global bindings in modules by initializing them to `nothing`.
`syms` should be of type `Symbol` or a collection of `Symbol`s. `pids` and `mod`
identify the processes and the module in which global variables are to be
reinitialized. Only those names found to be defined under `mod` are cleared.
An exception is raised if a global constant is requested to be cleared.
"""
function clear!(syms, pids=workers(); mod=Main)
    # One task per process; @sync waits until every remote clear has finished.
    @sync for pid in pids
        @async remotecall_wait(clear_impl!, pid, syms, mod)
    end
end

# Convenience methods: accept a single Symbol and/or a single process id by
# wrapping them in one-element collections and forwarding to the method above.
function clear!(sym::Symbol, pid::Int; mod=Main)
    return clear!([sym], [pid]; mod=mod)
end
function clear!(sym::Symbol, pids=workers(); mod=Main)
    return clear!([sym], pids; mod=mod)
end
function clear!(syms, pid::Int; mod=Main)
    return clear!(syms, [pid]; mod=mod)
end

# Runs on the target process: clear each symbol of a collection in turn.
function clear_impl!(syms, mod::Module)
    for sym in syms
        clear_impl!(sym, mod)
    end
end

# Runs on the target process: rebind `sym` to `nothing` in `mod`, but only if
# it is already defined there; otherwise evaluates to `false`.
function clear_impl!(sym::Symbol, mod::Module)
    isdefined(mod, sym) || return false
    return eval(mod, :(global $sym = nothing))
end
7 changes: 3 additions & 4 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,9 @@ end
Like `@schedule`, `@async` wraps an expression in a `Task` and adds it to the local
machine's scheduler queue. Additionally it adds the task to the set of items that the
nearest enclosing `@sync` waits for. `@async` also wraps the expression in a `let x=x, y=y, ...`
block to create a new scope with copies of all variables referenced in the expression.
nearest enclosing `@sync` waits for.
"""
macro async(expr)
expr = localize_vars(esc(:(()->($expr))), false)
:(async_run_thunk($expr))
thunk = esc(:(()->($expr)))
:(async_run_thunk($thunk))
end
73 changes: 73 additions & 0 deletions doc/src/manual/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,79 @@ have, it ends up having a substantial cost due to compilers (LLVM and GCC) not g
around the added overflow checks. If this improves in the future, we could consider defaulting
to checked integer arithmetic in Julia, but for now, we have to live with the possibility of overflow.

### What are the possible causes of an `UndefVarError` during remote execution?

As the error states, an immediate cause of an `UndefVarError` on a remote node is that a binding
by that name does not exist. Let us explore some of the possible causes.

```julia
julia> module Foo
foo() = remotecall_fetch(x->x, 2, "Hello")
end

julia> Foo.foo()
ERROR: On worker 2:
UndefVarError: Foo not defined
```

The closure `x->x` carries a reference to `Foo`, and since `Foo` is unavailable on node 2,
an `UndefVarError` is thrown.

Globals under modules other than `Main` are not serialized by value to the remote node. Only a reference is sent.
Functions which create global bindings (except under `Main`) may cause an `UndefVarError` to be thrown later.

```julia
julia> @everywhere module Foo
function foo()
global gvar = "Hello"
remotecall_fetch(()->gvar, 2)
end
end

julia> Foo.foo()
ERROR: On worker 2:
UndefVarError: gvar not defined
```

In the above example, `@everywhere module Foo` defined `Foo` on all nodes. However, the call to `Foo.foo()` created
a new global binding `gvar` only on the local node; the binding was not found on node 2, resulting in an `UndefVarError`.

Note that this does not apply to globals created under module `Main`. Globals under module `Main` are serialized
and new bindings are created under `Main` on the remote node.

```julia
julia> gvar_self = "Node1"
"Node1"

julia> remotecall_fetch(()->gvar_self, 2)
"Node1"

julia> remotecall_fetch(whos, 2)
From worker 2: Base 41762 KB Module
From worker 2: Core 27337 KB Module
From worker 2: Foo 2477 bytes Module
From worker 2: Main 46191 KB Module
From worker 2: gvar_self 13 bytes String
```

This does not apply to `function` or `type` declarations. However, anonymous functions bound to global
variables are serialized, as can be seen below.

```julia
julia> bar() = 1
bar (generic function with 1 method)

julia> remotecall_fetch(bar, 2)
ERROR: On worker 2:
UndefVarError: #bar not defined

julia> anon_bar = ()->1
(::#21) (generic function with 1 method)

julia> remotecall_fetch(anon_bar, 2)
1
```
## Packages and Modules
### What is the difference between "using" and "importall"?
Expand Down
Loading

0 comments on commit 154dbb7

Please sign in to comment.